NATS
NATS is a message broker for distributed systems.
Installation
- Add the dependency to your shard.yml:
  dependencies:
    nats:
      github: jgaskins/nats
- Run shards install
Usage
You can use NATS in a publish/subscribe or request/reply paradigm.
Publish/Subscribe
For publish/subscribe, let's consider the following struct, shared between services, representing an event that will be published by one service and picked up by another:
require "uuid"
require "json"
require "uuid/json"
struct UserRegisteredEvent
  include JSON::Serializable

  getter id : UUID
  getter email : String
  getter name : String

  def initialize(@id, @email, @name)
  end
end
In one service, we can subscribe to the subject that receives every user-registration event:
require "nats"
nats = NATS::Client.new(URI.parse(ENV["NATS_URL"]))
# Subscribe to all messages on "customers.registration" with an optional queue
# group. A message will only be delivered to a single client in a given queue
# group.
nats.subscribe "customers.registration", queue_group: "cart-service" do |msg|
  new_user = UserRegisteredEvent.from_json(msg.body_io)

  # This message represents that a new customer has registered, presumably sent
  # by our identity/authentication/user service. We create a record for this
  # customer in our own database so we don't always need to request the info
  # from that service.
  UserQuery.new.create_from_message(new_user)
end
# Accept wildcard messages. The `*` matches exactly one token, so this would
# match:
# - orders.commercial.shipped
# - orders.individual.shipped
nats.subscribe "orders.*.shipped", queue_group: "cart-service" do |msg|
  # ...
end
# Since the subscribe blocks above do not block execution, we need to keep the
# main fiber from exiting. In a real-world app, you might trap a TERM/INT signal
# to allow the app to close the connection gracefully.
sleep
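To make the wildcard rule concrete, here is a minimal sketch of how subject matching behaves, written in plain Crystal. The `subject_matches?` helper is hypothetical and is not part of this shard; the real matching is done by the NATS server. It assumes the standard NATS rules: `*` matches exactly one token, and `>` (valid only as the last token) matches one or more trailing tokens.

```crystal
# Hypothetical helper for illustration only; the NATS server does the real
# matching. Returns true if `subject` matches the subscription `pattern`.
def subject_matches?(pattern : String, subject : String) : Bool
  pattern_tokens = pattern.split('.')
  subject_tokens = subject.split('.')

  pattern_tokens.each_with_index do |token, index|
    # `>` matches everything that remains, but needs at least one token.
    return index < subject_tokens.size if token == ">"
    # The subject ran out of tokens before the pattern did.
    return false if index >= subject_tokens.size
    # `*` matches any single token; anything else must match literally.
    return false unless token == "*" || token == subject_tokens[index]
  end

  # Without a trailing `>`, both must have the same number of tokens.
  pattern_tokens.size == subject_tokens.size
end

puts subject_matches?("orders.*.shipped", "orders.commercial.shipped")   # => true
puts subject_matches?("orders.*.shipped", "orders.commercial.fulfilled") # => false
puts subject_matches?("orders.*.shipped", "orders.shipped")              # => false
puts subject_matches?("orders.>", "orders.individual.shipped")           # => true
```

The sketch does not validate patterns (for example, a `>` in the middle of a pattern), so treat it as a reading aid rather than a reference implementation.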
Then, from another service, we can publish to that subject:
require "nats"
nats = NATS::Client.new(URI.parse(ENV["NATS_URL"]))
# We can publish a message with a given subject. In this example, we'll
# publish a message saying Jolene has registered.
nats.publish "customers.registration", UserRegisteredEvent.new(
  id: UUID.random,
  name: "Jolene",
  email: "jolene@gonnatakeyourman.com",
).to_json
nats.close
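Since both services share the event type, it can help to see what actually travels in the message body. The following sketch uses only the standard library (no NATS connection) to round-trip the event through JSON, which is what the publisher and subscriber above do on either side of the broker. The UUID and email address are made-up sample values.

```crystal
require "uuid"
require "json"
require "uuid/json"

# Same shape as the shared event defined earlier, repeated so this sketch
# runs on its own.
struct UserRegisteredEvent
  include JSON::Serializable

  getter id : UUID
  getter email : String
  getter name : String

  def initialize(@id, @email, @name)
  end
end

# Hypothetical sample values, used only for illustration.
event = UserRegisteredEvent.new(
  id: UUID.new("c0a8012e-7b1c-4f6e-9d3a-5e2f1b0a9c7d"),
  email: "jolene@example.com",
  name: "Jolene",
)

# Publishing side: the event becomes a JSON string.
payload = event.to_json
puts payload

# Subscribing side: the JSON string becomes an event again.
decoded = UserRegisteredEvent.from_json(payload)
puts decoded == event
```

Keeping the event a plain serializable struct means either side can be tested without a running NATS server.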
Request/Reply
Let's consider an orders service that we may want to send requests to.
require "uuid"
require "json"
require "uuid/json"
require "db"
module Orders
  struct Get
    include JSON::Serializable

    getter id : UUID

    def initialize(@id)
    end
  end
end

struct Order
  include DB::Serializable
  include JSON::Serializable

  getter id : UUID
  getter address : String
  getter city : String
  getter state : String
  getter postal_code : String
end
Defining the request handler
require "nats"
nats = NATS::Client.new(URI.parse(ENV["NATS_URL"]))
# Subscribe to the subject that the request will be sent to
nats.subscribe "orders.get" do |msg|
  request = Orders::Get.from_json(msg.body_io)
  order = OrderQuery.new.with_id(request.id)

  # Send the order back to whoever made the request
  nats.reply msg, order.to_json
end
Sending the request
require "nats"
nats = NATS::Client.new(URI.parse(ENV["NATS_URL"]))
# Send the request to the subject it is expected to be received on:
order = nats.request "orders.get",
  message: Orders::Get.new(order_id).to_json,
  timeout: 5.seconds # A timeout must be specified
pp order
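To illustrate why a request needs a timeout, here is an in-process analogy of request/reply built on Crystal channels. It is a sketch of the pattern, not how this shard is implemented: the `Request` record and the `request` helper are hypothetical, and the private reply channel stands in for the reply-to inbox that NATS attaches to a request.

```crystal
# A request carries a payload plus a private channel for the answer, playing
# the role of the reply-to inbox in NATS.
record Request, payload : String, reply_to : Channel(String)

# Hypothetical helper: send a request and wait up to `wait` for a reply.
# Returns nil if nobody answers in time.
def request(requests : Channel(Request), payload : String, wait : Time::Span) : String?
  reply_to = Channel(String).new(1)
  requests.send Request.new(payload, reply_to)

  select
  when reply = reply_to.receive
    return reply
  when timeout(wait)
    return nil
  end
end

requests = Channel(Request).new(8)

# Responder fiber: the in-process analogue of the "orders.get" handler.
spawn do
  while req = requests.receive?
    req.reply_to.send "order for #{req.payload}"
  end
end

answered = request(requests, "42", wait: 1.second)
puts answered.inspect

# Nothing is listening on this channel, so the request times out.
unanswered = request(Channel(Request).new(8), "42", wait: 100.milliseconds)
puts unanswered.inspect
```

Without the timeout branch, the second call would wait forever, which is the same situation a NATS request is in when no service is subscribed to the subject.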
Development
TODO Write development instructions here
Contributing
- Fork it (https://github.com/jgaskins/nats/fork)
- Create your feature branch (git checkout -b my-new-feature)
- Commit your changes (git commit -am 'Add some feature')
- Push to the branch (git push origin my-new-feature)
- Create a new Pull Request
Contributors
- Jamie Gaskins - creator and maintainer