Skip to content

Publish subscribe on ruby on rails

  • by

Using RabbitMQ bunny and sneakers

This tutorial contain example how to implement service oriented architecture on ruby on rails, using RabbitMQ, bunny as producer and sneakers as consumer. This introduction accommodating step by step from first installation.

I got idea from this medium post and adding some additional step.

Initiation and environment setup

First of all, we create skeleton of rails, using command: rails new {branch_name} -d mysql, this function create basic rails file and folder and using database mysql instead default sqlite.

After files are initiated, go to folder via cd {branch_name} and then type bundle to install rails dependencies. After success go to config/database.yml and set some environment variable like host, username and password.

default: &default  
adapter: mysql2
encoding: utf8
pool: <%= ENV.fetch("RAILS_MAX_THREADS") { 5 } %>
host: <%= ENV.fetch("RAILS_HOST_DB") %>
username: <%= ENV.fetch("RAILS_USERNAME_DB") %>
password: <%= ENV.fetch("RAILS_PASSWORD_DB") %>
socket: /tmp/mysql.sock

For development convenience i set some environment on .env and read using gem dotenv-rails, on Gemfile add gemfile 'dotenv-rails' on group :development, :test section and type bundle again in terminal. After dependencies were updated, create .env file on root folder and fill like

RAILS_HOST_DB=127.0.0.1
RAILS_USERNAME_DB=root
RAILS_PASSWORD_DB=secret_password

also on config/application.rb add new line Dotenv::Railtie.load

Do initialize database using command rails db:create , and voila now on your mysql instance, two database created.

We continue to create model using command rails generate model User first_name:string last_name:string email:string , it will automatically generate model, test/model and migration file on db/migrate.

Try to runrails db:migrate to create table and column on schema database.

Since we try using rspec and shoulda matcher to test, we adding additional gem on test section

gem ‘rspec-rails’
gem ‘shoulda-matchers’

and again type bundle and additional rails generate rspec:install to create spec_helper and rails_helper

Finally we back to model User.rb and add simple validation like validates :first_name, :last_name, :email, presence: true to make validation of mandatory variable

Set publisher

Install rabbitMq client, one of gem is bunny using command:gem install bunny and set initializer. You can use initializers to hold configuration settings that should be made after all of the frameworks and plugins are loaded.

create new file on config/initializers/publisher/bunny_publisher.rb than set logger and connection. for simpliest setup can be used like Bunny.new("amqp://guest:guest@localhost:5672") but we can also set more advance setting like :

@connection ||= begin
  instance = Bunny.new(
    addresses: 'localhost:5672',
    username: 'guest',
    password: 'guest',
    vhost: '/',
    logger: Rails.logger
  )
  instance.start
  instance
end

It will create connection when not initiated before.

After initializer created you can create service on app/services/user_publisher.rb , with main method publish

def publish(options = {})
  channel = ::Publisher::BunnyPublisher.connection.create_channel
  exchange = channel.exchange(
    'sneakers',
    type: 'direct',
    durable: true
  )
  headers = { 'x-delay' => options[:delay_time].to_i * 1_000 } if options[:delay_time].present?
  exchange.publish(payload.to_json, routing_key: QUEUE_NAME, headers: headers)
end

make sure queue name in publisher same with customer, than publisher ready to use.

Set consumer

For background processing consumer we choose sneakers, install via gem install sneakers

create initializer on config/initializers/sneakers.rb and set configuration:

Sneakers.configure  connection: Connection.sneakers,
  exchange: 'sneakers,
  exchange_type: :direct,
  runner_config_file: nil,
  metric: nil,
  workers: 1,
  log: STDOUT,
  pid_path: 'sneakers.pid',
  timeout_job_after: 5.minutes,
  env: ENV['RAILS_ENV'], 
  durable: true,
  ack: true,
  heartbeat: 2,
  handler: Sneakers::Handlers::Maxretry
Sneakers.logger = Rails.logger
Sneakers.logger.level = Logger::WARN

we choose ack option to be true to make sure message must be acknowledge when process is finished on consumer

and then we create worker on app/workers/user_create.rb

include Sneakers::Worker
QUEUE_NAME = ::UserPublisher::QUEUE_NAME
from_queue QUEUE_NAME, arguments: { 'x-dead-letter-exchange': "#{QUEUE_NAME}-retry" }
def work(msg)
  data = ActiveSupport::JSON.decode(msg)
  data['users'].each do |user|
    update_user(user.to_h)
  end
  ack!
rescue StandardError => e
  create_log(false, data, message: e.message)
  reject!
end

so on above code we get message from rabbitMq and we decode and iterate and update 1 by one, when no raise error it will call ack! to inform in RabbitMq if message already done processed

now we can call consumer rake, via rake sneakers:run, make sure in ENV WORKERS=UserCreate it scan worker folders with class_name UserCreate

Test publishing and consuming

To publish you can call UserPublisher and it will consumed automatically.

irb(main):002:0> user_params = [{id: 1, first_name: 'first'}]
=> [{:id=>1, :first_name=>"first"}]
irb(main):003:0> UserPublisher.new(user_params).publish

on cosumer it will wait for any message

rake sneakers:run           

2019-08-17T05:36:41Z p-83921 t-owt7p5uik DEBUG: [worker-user.create:1:d5ckph][#<Thread:0x00007fd5b0885d78 run>][user.create][#<Sneakers::Configuration:0x00007fd5b6efd240>] New worker: subscribing.
2019-08-17T05:36:41Z p-83921 t-owt7p5uik DEBUG: [worker-user.create:1:d5ckph][#<Thread:0x00007fd5b0885d78 run>][user.create][#<Sneakers::Configuration:0x00007fd5b6efd240>] New worker: I'm alive.
2019-08-17T05:41:03Z p-83921 t-owt7x84vw DEBUG: [worker-user.create:1:d5ckph][#<Thread:0x00007fd5b225af78@/Users/ndaru/.rbenv/versions/2.5.0/lib/ruby/gems/2.5.0/gems/bunny-2.14.2/lib/bunny/consumer_work_pool.rb:101 run>][user.create][#<Sneakers::Configuration:0x00007fd5b6efd240>] Working off: "{\"users\":[{\"id\":1,\"first_name\":\"first\"}]}"

You can trace repository in here

Leave a Reply

Your email address will not be published. Required fields are marked *