At Citrine, we have a long data pipeline that processes data uploaded by our users. This data can be as simple as a basic PDF document, or as complex as a rigidly structured set of materials properties in a CSV or JSON file.To make it easy for clients in multiple languages to join the fun, we chose RabbitMQ for messaging. This post has a few tips & tricks you might find handy when using RabbitMQ with Ruby and Rails.

Subscribe with Sneakers

Sneakers is a fantastic library for letting your Rails application subscribe to RabbitMQ message queues. It’s very easy to configure with your Rails environment, so you have access to ActiveRecord or anything else you need. It’s also easy to get going on Heroku with a simple Procfile configuration:


-- Procfile
web: bundle exec puma -p $PORT -c config/puma.rb
worker: WORKERS=’EmailWorker’ bundle exec rake sneakers:run
bigworker: WORKERS=’BigWorker’ bundle exec rake sneakers:run

We spread our workers out across different dynos to maximize resource utilization and to easily scale our bigger, slower jobs. For example, a particular job might need a lot of memory or CPU but the rest of your workers are lightweight.


You can also override the global sneakers configuration (set in config/sneakers.rb) per-worker, which is handy for particular jobs that might need to override a timeout or prefetch setting. Sneakers makes this really easy:


-- app/workers/one_worker.rb
class EmailWorker
  include Sneakers::Worker
  from_queue ‘queue.email_queue’,
             :timeout_job_after => 180  #override config/sneakers.rb

Test your Workflow

Testing your Sneakers workers (or your entire RabbitMQ workflow) can be cumbersome. We found a library, Moqueue, that eases a lot of this pain. It’s quite old and documentation is sparse (or non-existent, but you can view a cached version of the introductory blog post here ), but thankfully it’s not rocket science. The gist is that it manages your message queues in memory and doesn’t require a running AMQP broker. To get started, you’ll need a bit of boilerplate. First, you need a few methods in your test helper:


-- test/test_helper.rb
module MoqueueRunner
  def run
    logger.info ‘Worker subscribed.’
    @queue.subscribe {|header, msg| self.work msg }
    logger.info ‘Worker alive.’
  end
end

def make_saferun(clazz)
  clazz.include(MoqueueRunner) # custom Moqueue runner for sneakers workers
end

This code just makes our Sneakers workers play nicely with Moqueue’s AMQP situation. Each test also needs to set up the queues it uses, set the worker to use the MoqueueRunner, and finally make our internal AMQP publishers use Moqueue. Note that our publishing class is custom, but allows for the injection of a custom AMQP channel for testing & flexibility.


-- test/models/one_worker_test.rb
def setup
  overload_amqp # start moqueue
  make_saferun(EmailWorker) # set the runner
  # setup the queue for this test
  @mq = MQ.new
  @qname = ‘queue.email_queue’
  @topic = @mq.topic(‘queue’)
  @queue = @mq.queue(@qname) 
  @queue.bind(@topic, :key => @qname)
  # make your publisher use Moqueue by adding
  AMQP::ConnectionManager.stub :get_publisher, { pub: @topic, keyname: key } do
    @publisher = Publisher::MyPublisher.new(“queue.another_queue”, @mq)
    end
  end

def teardown
  reset_broker # reset the queues for each test
end

OK! You’re almost ready to test. Here’s a simple test that makes sure a queue contains a message we think it does. This particular test will make sure that upon delivery of a certain message, an e-mail will get sent.


-- test/models/one_worker_test.rb
test ‘email sent on message receive’
  worker = EmailWorker.new(@queue)
  worker.run
  @topic.publish( { ‘message’ => “send email”, email: ‘president@whitehouse.gov’ }, key: “queue.email_queue”)
  assert_not ActionMailer::Base.deliveries.empty?
end

You can also test some information about the queues themselves.


-- test/models/one_worker_test.rb
test ‘message contains email’
  d = EmailObject.create(email:’president@whitehouse.gov’)
  d.publish_self
  assert @queue.received_messages.size >= 1
  assert @queue.received_messages[0][‘email’] == ‘president@whitehouse.gov’
end

There you have it! It’s not the easiest thing in the world, but you can at least test some behavior with your asynchronous workers.



Comment