Using RabbitMQ with Bunny and Sneakers
This tutorial shows how to implement a service-oriented architecture in Ruby on Rails using RabbitMQ, with Bunny as the producer and Sneakers as the consumer. It walks through every step, starting from the initial installation.
I got the idea from this Medium post and added some additional steps.
Initiation and environment setup
First, we create the Rails skeleton with the command rails new {branch_name} -d mysql. This command generates the basic Rails files and folders and uses MySQL as the database instead of the default SQLite.
Once the files are generated, go to the folder with cd {branch_name} and run bundle to install the Rails dependencies. When that succeeds, open config/database.yml and read the host, username and password from environment variables:
default: &default
  adapter: mysql2
  encoding: utf8
  pool: <%= ENV.fetch("RAILS_MAX_THREADS") { 5 } %>
  host: <%= ENV.fetch("RAILS_HOST_DB") %>
  username: <%= ENV.fetch("RAILS_USERNAME_DB") %>
  password: <%= ENV.fetch("RAILS_PASSWORD_DB") %>
  socket: /tmp/mysql.sock
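To see how the ERB tags in this file behave, here is a minimal sketch that renders a similar template outside Rails, using only Ruby's standard library. The template constant and the render_database_config helper are names I made up for illustration; they are not part of Rails. The sketch shows that ENV.fetch with a block falls back to a default, while ENV.fetch without one raises KeyError when the variable is missing.

```ruby
require "erb"
require "yaml"

# Illustrative template mirroring part of config/database.yml above.
DATABASE_TEMPLATE = <<~YAML
  default:
    adapter: mysql2
    pool: <%= ENV.fetch("RAILS_MAX_THREADS") { 5 } %>
    host: <%= ENV.fetch("RAILS_HOST_DB") %>
YAML

# Hypothetical helper: evaluate the ERB tags, then parse the result as YAML.
def render_database_config(template)
  YAML.safe_load(ERB.new(template).result)
end

ENV.delete("RAILS_MAX_THREADS") # not set, so the block default is used
ENV["RAILS_HOST_DB"] = "127.0.0.1"
config = render_database_config(DATABASE_TEMPLATE)
puts config["default"]["pool"]
puts config["default"]["host"]

# Without a default, a missing variable fails fast instead of silently
# producing an empty host.
ENV.delete("RAILS_HOST_DB")
missing_host_error =
  begin
    render_database_config(DATABASE_TEMPLATE)
    nil
  rescue KeyError => e
    e
  end
puts missing_host_error.class
```

Failing fast on a missing host, username or password is usually what you want here, since a half-configured database connection is harder to debug than a clear KeyError at boot.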
For development convenience I keep these environment variables in a .env file and load them with the dotenv-rails gem. In the Gemfile, add gem 'dotenv-rails' to the group :development, :test section and run bundle again in the terminal. After the dependencies are updated, create a .env file in the root folder and fill it in like this:
RAILS_HOST_DB=127.0.0.1
RAILS_USERNAME_DB=root
RAILS_PASSWORD_DB=secret_password
Also add a new line, Dotenv::Railtie.load, to config/application.rb.
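As a rough illustration of what loading a .env file amounts to, the sketch below parses KEY=VALUE lines into a hash. It is a simplified stand-in I wrote for this post, not the dotenv gem's implementation (the real gem also handles quoting, export prefixes and more), and load_env_lines is a hypothetical name. Like dotenv's default loading, it keeps a value that is already set instead of overriding it.

```ruby
# Simplified illustration only; not the dotenv gem's actual parser.
def load_env_lines(text, env = ENV)
  text.each_line do |line|
    line = line.strip
    next if line.empty? || line.start_with?("#")

    key, value = line.split("=", 2)
    next if value.nil?

    # Keep values that are already set, e.g. exported in the shell.
    env[key] ||= value
  end
  env
end

sample = <<~ENVFILE
  # database settings
  RAILS_HOST_DB=127.0.0.1
  RAILS_USERNAME_DB=root
  RAILS_PASSWORD_DB=secret_password
ENVFILE

# A plain hash stands in for ENV; pretend the username came from the shell.
env = { "RAILS_USERNAME_DB" => "deploy" }
load_env_lines(sample, env)
p env
```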
Initialize the database with the command rails db:create. Two databases are now created on your MySQL instance, one for development and one for test.
Next, we create a model with the command rails generate model User first_name:string last_name:string email:string. It automatically generates the model, the model test and a migration file in db/migrate.
Run rails db:migrate to create the table and its columns in the database schema.
Since we test with RSpec and shoulda-matchers, we add two more gems to the test group:
gem 'rspec-rails'
gem 'shoulda-matchers'
Run bundle again, followed by rails generate rspec:install, to create spec_helper and rails_helper.
Finally, we go back to the User model (app/models/user.rb) and add a simple validation, validates :first_name, :last_name, :email, presence: true, to make these attributes mandatory.
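To make concrete what a presence validation rejects, here is a plain-Ruby sketch that runs without Rails. It is only an approximation of the rule (nil, empty and whitespace-only values count as missing), not how ActiveModel implements it, and missing_fields is a hypothetical helper name.

```ruby
REQUIRED_FIELDS = %i[first_name last_name email].freeze

# Hypothetical helper: returns the required fields that are nil, empty,
# or whitespace-only, which is what a presence validation rejects.
def missing_fields(attributes, required = REQUIRED_FIELDS)
  required.select do |field|
    value = attributes[field]
    value.nil? || value.to_s.strip.empty?
  end
end

valid_user   = { first_name: "Ada", last_name: "Lovelace", email: "ada@example.com" }
invalid_user = { first_name: "Ada", last_name: "   ", email: nil }

p missing_fields(valid_user)
p missing_fields(invalid_user)
```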
Set up the publisher
Install a RabbitMQ client. One such gem is Bunny, installed with gem install bunny. Then set up an initializer. Initializers hold configuration settings that should be applied after all of the frameworks and plugins are loaded.
Create a new file at config/initializers/publisher/bunny_publisher.rb, then set the logger and the connection. The simplest setup is Bunny.new("amqp://guest:guest@localhost:5672"), but we can also use more advanced settings like:
@connection ||= begin
  instance = Bunny.new(
    addresses: 'localhost:5672',
    username: 'guest',
    password: 'guest',
    vhost: '/',
    logger: Rails.logger
  )
  instance.start
  instance
end
The ||= operator creates the connection only if it has not been initialized before.
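The memoization used for the connection can be sketched without a running RabbitMQ. In the sketch below, FakeConnection is a made-up stand-in for the object returned by Bunny.new, and wrapping the memoized value in a class method on Publisher::BunnyPublisher is my assumption about how the initializer is organized. It shows that the begin block runs only on the first call, so start is called once and every caller gets the same object.

```ruby
# Stand-in for a Bunny connection so the sketch runs without RabbitMQ.
class FakeConnection
  attr_reader :start_count

  def initialize
    @start_count = 0
  end

  def start
    @start_count += 1
    self
  end
end

module Publisher
  class BunnyPublisher
    # Class-level memoization: the begin block runs only on the first call.
    def self.connection
      @connection ||= begin
        instance = FakeConnection.new # the real code calls Bunny.new(...) here
        instance.start
        instance
      end
    end
  end
end

first  = Publisher::BunnyPublisher.connection
second = Publisher::BunnyPublisher.connection
puts first.equal?(second)
puts first.start_count
```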
After the initializer is created, you can create a service at app/services/user_publisher.rb with publish as its main method:
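def publish(options = {})
  # Open a channel on the shared Bunny connection.
  channel = ::Publisher::BunnyPublisher.connection.create_channel
  exchange = channel.exchange(
    'sneakers',
    type: 'direct',
    durable: true
  )
  # Convert delay_time (seconds) to milliseconds; headers stays nil without a delay.
  headers = { 'x-delay' => options[:delay_time].to_i * 1_000 } if options[:delay_time].present?
  # payload and QUEUE_NAME are expected to be defined elsewhere in the class.
  exchange.publish(payload.to_json, routing_key: QUEUE_NAME, headers: headers)
end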
Make sure the queue name in the publisher is the same as in the consumer; then the publisher is ready to use.
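To check what the publisher actually sends, here is a self-contained sketch that swaps the Bunny exchange for a small recording object. Several parts are assumptions for illustration: RecordingExchange is made up, the exchange is injected through the constructor instead of being created from a channel, the queue name "user.create" is borrowed from the worker log shown later in this post, and the payload shape (users wrapped under a users key) is inferred from that same log.

```ruby
require "json"

# Records publish calls instead of talking to RabbitMQ (stand-in for a Bunny exchange).
class RecordingExchange
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(body, options = {})
    @published << { body: body, options: options }
  end
end

class UserPublisher
  QUEUE_NAME = "user.create" # assumed value, taken from the worker log

  def initialize(users, exchange:)
    @users = users
    @exchange = exchange
  end

  def publish(options = {})
    delay = options[:delay_time]
    # delay_time is given in seconds; the x-delay header is in milliseconds.
    headers = { "x-delay" => delay.to_i * 1_000 } unless delay.nil?
    @exchange.publish(payload.to_json, routing_key: QUEUE_NAME, headers: headers)
  end

  private

  # Assumed payload shape: the users wrapped under a "users" key.
  def payload
    { users: @users }
  end
end

exchange = RecordingExchange.new
publisher = UserPublisher.new([{ id: 1, first_name: "first" }], exchange: exchange)
publisher.publish(delay_time: 5)
publisher.publish

delayed, immediate = exchange.published
puts delayed[:body]
p delayed[:options]
p immediate[:options]
```

Injecting the exchange keeps the sketch testable without a broker; in the real service the exchange comes from the channel, as in the publish method above.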
Set up the consumer
For the background-processing consumer we choose Sneakers, installed with gem install sneakers. Create an initializer at config/initializers/sneakers.rb and set the configuration:
Sneakers.configure connection: Connection.sneakers,
                   exchange: 'sneakers',
                   exchange_type: :direct,
                   runner_config_file: nil,
                   metric: nil,
                   workers: 1,
                   log: STDOUT,
                   pid_path: 'sneakers.pid',
                   timeout_job_after: 5.minutes,
                   env: ENV['RAILS_ENV'],
                   durable: true,
                   ack: true,
                   heartbeat: 2,
                   handler: Sneakers::Handlers::Maxretry
Sneakers.logger = Rails.logger
Sneakers.logger.level = Logger::WARN
We set the ack option to true so that each message must be acknowledged once the consumer has finished processing it.
Then we create a worker at app/workers/user_create.rb:
class UserCreate
  include Sneakers::Worker

  QUEUE_NAME = ::UserPublisher::QUEUE_NAME
  from_queue QUEUE_NAME, arguments: { 'x-dead-letter-exchange': "#{QUEUE_NAME}-retry" }

  def work(msg)
    data = ActiveSupport::JSON.decode(msg)
    data['users'].each do |user|
      update_user(user.to_h)
    end
    ack!
  rescue StandardError => e
    # update_user and create_log are helper methods not shown in this snippet.
    create_log(false, data, message: e.message)
    reject!
  end
end
In the code above, we receive the message from RabbitMQ, decode it, then iterate over the users and update them one by one. When no error is raised, the worker calls ack! to tell RabbitMQ that the message has been processed; otherwise it logs the error and calls reject!.
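The acknowledge-or-reject flow can be tried out in plain Ruby, without Sneakers or RabbitMQ. In this sketch the class name UserCreateSketch, the in-memory updated and errors lists, and the rule that a user needs an id are all made up for illustration; the method returns :ack or :reject in place of calling ack! and reject!.

```ruby
require "json"

# Plain-Ruby stand-in for the worker; symbols replace Sneakers' ack! and reject!.
class UserCreateSketch
  attr_reader :updated, :errors

  def initialize
    @updated = []
    @errors = []
  end

  def work(msg)
    data = JSON.parse(msg)
    data.fetch("users").each { |user| update_user(user.to_h) }
    :ack
  rescue StandardError => e
    # Any failure (bad JSON, missing key, failed update) rejects the message.
    @errors << e.message
    :reject
  end

  private

  # Hypothetical update step: requires an id and records the attributes.
  def update_user(attrs)
    raise ArgumentError, "user id is missing" unless attrs.key?("id")

    @updated << attrs
  end
end

worker = UserCreateSketch.new
ok_result  = worker.work('{"users":[{"id":1,"first_name":"first"}]}')
bad_json   = worker.work("not json")
missing_id = worker.work('{"users":[{"first_name":"x"}]}')

p ok_result, bad_json, missing_id
p worker.updated
```

Note that a message with several users is rejected as a whole if any single update fails, even though earlier users in the same message were already updated.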
Now we can start the consumer with the rake task rake sneakers:run. Make sure the environment variable WORKERS=UserCreate is set, so that the task loads the worker with the class name UserCreate.
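As a rough picture of how a WORKERS value maps to worker classes, the sketch below splits a comma-separated string and looks each class up by name. This is my simplified approximation, not Sneakers' own code; resolve_workers and the UserDelete class are hypothetical and exist only for the example.

```ruby
# Empty classes standing in for worker files under app/workers.
class UserCreate; end
class UserDelete; end # hypothetical second worker

# Assumed behaviour: split a comma-separated WORKERS value and
# look up each class by its name.
def resolve_workers(value)
  value.to_s.split(",").map(&:strip).reject(&:empty?).map do |name|
    Object.const_get(name)
  end
end

p resolve_workers("UserCreate")
p resolve_workers("UserCreate, UserDelete")
```

A misspelled class name raises NameError in this sketch, which is a reminder to double-check the value of WORKERS when the consumer starts without subscribing to any queue.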
Test publishing and consuming
To publish, call UserPublisher and the message will be consumed automatically.
irb(main):002:0> user_params = [{id: 1, first_name: 'first'}]
=> [{:id=>1, :first_name=>"first"}]
irb(main):003:0> UserPublisher.new(user_params).publish
On the consumer side, the worker waits for any message:
rake sneakers:run
2019-08-17T05:36:41Z p-83921 t-owt7p5uik DEBUG: [worker-user.create:1:d5ckph][#<Thread:0x00007fd5b0885d78 run>][user.create][#<Sneakers::Configuration:0x00007fd5b6efd240>] New worker: subscribing.
2019-08-17T05:36:41Z p-83921 t-owt7p5uik DEBUG: [worker-user.create:1:d5ckph][#<Thread:0x00007fd5b0885d78 run>][user.create][#<Sneakers::Configuration:0x00007fd5b6efd240>] New worker: I'm alive.
2019-08-17T05:41:03Z p-83921 t-owt7x84vw DEBUG: [worker-user.create:1:d5ckph][#<Thread:0x00007fd5b225af78@/Users/ndaru/.rbenv/versions/2.5.0/lib/ruby/gems/2.5.0/gems/bunny-2.14.2/lib/bunny/consumer_work_pool.rb:101 run>][user.create][#<Sneakers::Configuration:0x00007fd5b6efd240>] Working off: "{\"users\":[{\"id\":1,\"first_name\":\"first\"}]}"
You can browse the repository here.