Using RabbitMQ’s Delay Capability for Periodic/Delayed Transaction Status Checking (with the Go Library streadway/amqp)

This article is inspired by an article by Mr. Eko Kurniawan Khannedy, in which he shares his thoughts on handling delayed jobs with RabbitMQ. The original article can be found here.

Rizky Ramadhan
7 min read · May 29, 2021
rodeo:gopher riding rabbit

Pain Point

So, this is a real problem that I faced in my project. A business rule requires our application to retry checkStatus against a third-party API whenever the first flagging attempt to that API times out. A transaction is considered done once checkStatus against the third-party API responds with either success or fail. As long as the call to the third party keeps timing out, the checkStatus process is retried. Starting from that baseline, our initial idea was to use a cronjob to solve the case.

Actually, the easiest solution to this problem is to add a “check status” screen to the application, so that users can trigger retry-checkStatus against the third-party API themselves. But this approach is risky: a user could abuse the retry-checkStatus flow by submitting “check status” repeatedly and in bulk, which could result in a denial of service. Given that risk, we decided to rethink our approach to the retry-checkStatus issue.

Considering the cronjob approach, we also did a little research and identified some issues that could be harmful in production if we went ahead with it. This solution requires an application (i.e. cronjob-apps) that the cronjob calls periodically, plus a database that stores the data ready to be processed by cronjob-apps. We found early on that the cronjob-apps-database combination would be costly in resources and logically complex. Costly because on every run (say we set cron to run every 10 minutes) cronjob-apps queries a bulk of data from the database, which is an I/O-heavy scan. Logically complex because many conditional cases must be covered across different users to fully solve the issue. Moreover, there may be intervals, say from 11 P.M. to 12 A.M., with no transactions at all, yet the job still runs every 10 minutes and keeps querying/scanning the data, wasting database server resources.

Things to Realize

Now we have a pain point that we need to take care of. We care about the business flow, the reliability of our database, and at the same time our application’s performance. So we wondered how to solve this problem in the most modest way. Fortunately, we came across the article by Mr. Eko Khannedy, which describes using RabbitMQ’s ability to delay messages that arrive at the broker. With RabbitMQ’s x-delayed-message exchange, only the data that actually needs processing reaches the database. No extravagant resource usage.

To make a long story short, with RabbitMQ and its x-delayed-message exchange our team found a better way to handle the retry-checkStatus issue with the third-party API. When a timeout occurs on the first flagging attempt, a checkStatus message is sent to RabbitMQ with the header “x-delay” set to, e.g., 5000 (milliseconds). RabbitMQ recognizes the x-delay header and pushes the message to the consumer/worker about 5000 ms after the message arrives. The consumer that receives the message calls checkStatus on the API. If the response is success or fail, the checkStatus message is removed from the queue; if the response is still a timeout, the message is requeued with the same x-delay as before (5000 ms). No extra resources are needed, and no over-querying of the database occurs.
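The decision the worker makes after each checkStatus call can be sketched in Go as below. This is only a minimal illustration, not our production code: the names `status`, `action`, `nextAction`, and `retryDelay` are hypothetical, and the 5-second delay simply mirrors the example value above.

```go
package main

import (
	"fmt"
	"time"
)

// status is a hypothetical result type for one checkStatus call.
type status int

const (
	statusSuccess status = iota // third party confirmed success
	statusFailed                // third party confirmed failure
	statusTimeout               // third party did not answer in time
)

// retryDelay mirrors the example x-delay value of 5000 ms.
const retryDelay = 5 * time.Second

// action tells the worker what to do with the message it just handled.
type action struct {
	Requeue bool          // true: publish the message again with x-delay
	Delay   time.Duration // delay to put in the x-delay header when requeuing
}

// nextAction requeues only on timeout; success and fail are both final.
func nextAction(s status) action {
	if s == statusTimeout {
		return action{Requeue: true, Delay: retryDelay}
	}
	return action{}
}

func main() {
	for _, s := range []status{statusSuccess, statusFailed, statusTimeout} {
		a := nextAction(s)
		fmt.Printf("status=%d requeue=%t x-delay=%dms\n", s, a.Requeue, a.Delay.Milliseconds())
	}
}
```

Keeping the decision in a small pure function like this makes it easy to test without a running broker.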

simple flow of the case

How We Setup the Solution

First things first: to enable delayed messages, we need to install the delay plugin on our RabbitMQ instance.

Installing Delay Plugin

The plugin resides in the plugins folder, whose location differs by OS. You can see the path for your OS below.

where our plugins reside

Check your plugins folder. To get the delay capability, the plugin “rabbitmq_delayed_message_exchange-3.x.x” must exist.

list of existing plugin of your rabbitMQ

You can download the plugin from this link, then copy the .ez file to your plugins folder.

community plugin, let you download plugin

Once the plugin has been copied to the plugins folder, run the command below:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Once the plugin has been enabled, we are ready to start using it.

Create Exchange for Delayed Job

In our project, the application acts as a producer to RabbitMQ and communicates with an exchange, not with a queue. The thing to remember is that the delay capability lives in the exchange. The exchange type “x-delayed-message” can be chosen when we create a new exchange. When a message arrives at the exchange, the exchange holds it for the time given in the “x-delay” header sent by the producer before sending it on to the queue. Set up the new exchange as in the picture below.

Create Exchange to Delay Job RabbitMQ

Creating Queue to Accept the Message

After creating the exchange, the next step is to create the queue from which messages are pushed to the consumer.

Create Queue to Enable Push Message to Consumer

Binding Exchange to Queue

The producer sends data to the exchange, and the consumer gets data from the queue. For the consumer to receive data sent to the exchange, we need to set up a binding from the exchange to the queue we want.

Click the newly created exchange on the Exchanges tab,

choose the newly created “my-exchange”

then find the binding form to create a new binding, and fill in “my-queue” in the field below, so that messages from the producer flow through exchange “my-exchange” to the consumer via queue “my-queue”.

binding exchange to queue

As a result, every message our producer sends to “my-exchange” (the delayed-job exchange) is forwarded to the queue after the delay given by its x-delay value.

Exchange binding on RabbitMQ
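The same three steps (exchange, queue, binding) can also be done from code instead of the management UI. The sketch below only records the calls it would make, so it runs without a broker. The `declarer` interface and `logDeclarer` are hypothetical stand-ins: with streadway/amqp, a thin adapter around `*amqp.Channel` could forward them to its `ExchangeDeclare`, `QueueDeclare`, and `QueueBind` methods. The plugin expects an `x-delayed-type` argument on the exchange; the value “direct” and the routing key “check-status” are assumptions for illustration.

```go
package main

import "fmt"

// declarer is a hypothetical, simplified stand-in for the broker channel.
type declarer interface {
	ExchangeDeclare(name, kind string, args map[string]interface{}) error
	QueueDeclare(name string) error
	QueueBind(queue, routingKey, exchange string) error
}

// logDeclarer records the calls instead of talking to a broker.
type logDeclarer struct{ calls []string }

func (l *logDeclarer) ExchangeDeclare(name, kind string, args map[string]interface{}) error {
	l.calls = append(l.calls, fmt.Sprintf("exchange %s type=%s x-delayed-type=%v", name, kind, args["x-delayed-type"]))
	return nil
}

func (l *logDeclarer) QueueDeclare(name string) error {
	l.calls = append(l.calls, "queue "+name)
	return nil
}

func (l *logDeclarer) QueueBind(queue, routingKey, exchange string) error {
	l.calls = append(l.calls, fmt.Sprintf("bind %s -> %s key=%q", exchange, queue, routingKey))
	return nil
}

// setupTopology declares the delayed exchange, the queue, and the binding.
func setupTopology(d declarer, exchange, queue, routingKey string) error {
	// "x-delayed-type" tells the plugin how to route once the delay is over;
	// "direct" is an assumption here.
	args := map[string]interface{}{"x-delayed-type": "direct"}
	if err := d.ExchangeDeclare(exchange, "x-delayed-message", args); err != nil {
		return fmt.Errorf("declare exchange: %w", err)
	}
	if err := d.QueueDeclare(queue); err != nil {
		return fmt.Errorf("declare queue: %w", err)
	}
	if err := d.QueueBind(queue, routingKey, exchange); err != nil {
		return fmt.Errorf("bind queue: %w", err)
	}
	return nil
}

func main() {
	d := &logDeclarer{}
	// "check-status" is a hypothetical routing key.
	if err := setupTopology(d, "my-exchange", "my-queue", "check-status"); err != nil {
		fmt.Println("setup failed:", err)
		return
	}
	for _, c := range d.calls {
		fmt.Println(c)
	}
}
```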

Coding Part

In this article we share our producer-consumer delayed-message code in Go. We use the popular streadway/amqp library for the RabbitMQ part. Fortunately, the RabbitMQ tutorial page already explains clearly how to use the library as a producer and as a consumer; our part is only to modify that code so it solves our specific problem. On the producer side, we add a header with x-delay set to 5000 ms. On the consumer side, we make the consumer not only consume messages but also re-publish each message with the same x-delay of 5000 ms. This gives us an endless queue-requeue loop that demonstrates the delayed-message implementation.

Create Producer

  • line#15: connect to the RabbitMQ server. The connection abstracts the socket connection and takes care of protocol version negotiation, authentication, and so on for us
  • line#19: create a channel, which is where most of the API for getting things done resides
  • line#25: to send, we must declare an exchange to send to; then we can publish a message to it. “my-exchange” is the name of the exchange that can delay a message before forwarding it to the queue
  • line#33: the x-delay parameter in the headers is set to 5000, so a message entering the exchange waits 5000 ms (5 s) before being sent to the queue
producer’s source code
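For readers who cannot see the screenshot, here is a minimal, broker-free sketch of the part that matters on the producer side: attaching the x-delay header. The `delayedMessage` type and `newDelayedMessage` function are hypothetical stand-ins for the fields we set on `amqp.Publishing`; in a real producer these values would be passed to the channel’s `Publish` method with “my-exchange” as the exchange name.

```go
package main

import "fmt"

// delayedMessage is a hypothetical, library-neutral stand-in for the
// fields of amqp.Publishing that matter for a delayed message.
type delayedMessage struct {
	ContentType string
	Headers     map[string]interface{}
	Body        []byte
}

// newDelayedMessage builds a message whose "x-delay" header asks the
// delayed exchange to hold it for delayMs milliseconds.
func newDelayedMessage(body string, delayMs int32) delayedMessage {
	return delayedMessage{
		ContentType: "text/plain",
		Headers:     map[string]interface{}{"x-delay": delayMs},
		Body:        []byte(body),
	}
}

func main() {
	// The body text is made up for illustration.
	msg := newDelayedMessage("check status", 5000)

	// With streadway/amqp these fields would map onto amqp.Publishing
	// (Headers as amqp.Table) and be published to "my-exchange".
	fmt.Printf("exchange=%s x-delay=%v body=%s\n", "my-exchange", msg.Headers["x-delay"], msg.Body)
}
```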

Create Consumer

  • line#25: initialize the queue object. If the queue does not exist, this command creates it
  • line#34: set the consumer to receive messages from the queue named “my-queue”, which is bound to the “my-exchange” exchange
  • line#47: we’re about to tell the server to deliver us the messages from the queue. Since it pushes messages to us asynchronously, we read them from a channel (returned by amqp::Consume) in a goroutine
  • line#52: after consuming a message, the consumer sends it back with the header x-delay set to 5000 ms, so that a status check every x seconds can be simulated
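The consume-then-republish loop described in the bullets above can be sketched without a broker as follows. Everything here is a simplified assumption: `delivery` stands in for `amqp.Delivery`, `republisher` is a hypothetical wrapper around the channel’s `Publish` call, and `recorder` just remembers what would have been sent.

```go
package main

import "fmt"

// delivery is a hypothetical stand-in for amqp.Delivery (body only).
type delivery struct{ Body []byte }

// republisher is a hypothetical wrapper; a real one would publish to the
// delayed exchange with the "x-delay" header set to delayMs.
type republisher interface {
	PublishDelayed(exchange string, body []byte, delayMs int32) error
}

// recorder remembers what would have been published.
type recorder struct{ sent []string }

func (r *recorder) PublishDelayed(exchange string, body []byte, delayMs int32) error {
	r.sent = append(r.sent, fmt.Sprintf("%s|%s|%d", exchange, body, delayMs))
	return nil
}

// consume reads deliveries until msgs is closed and sends each one back
// with the same delay, returning how many messages it handled.
func consume(msgs <-chan delivery, pub republisher, exchange string, delayMs int32) (int, error) {
	handled := 0
	for d := range msgs {
		if err := pub.PublishDelayed(exchange, d.Body, delayMs); err != nil {
			return handled, err
		}
		handled++
	}
	return handled, nil
}

func main() {
	// Two made-up deliveries; a real consumer gets this channel from the broker.
	msgs := make(chan delivery, 2)
	msgs <- delivery{Body: []byte("tx-1")}
	msgs <- delivery{Body: []byte("tx-2")}
	close(msgs)

	rec := &recorder{}
	done := make(chan struct{})
	go func() { // read in a goroutine, as in the tutorial-style consumer
		defer close(done)
		n, err := consume(msgs, rec, "my-exchange", 5000)
		fmt.Println("handled:", n, "err:", err)
	}()
	<-done

	for _, s := range rec.sent {
		fmt.Println(s)
	}
}
```

In the real consumer the deliveries channel never closes, so the loop keeps running and each message comes back after its delay.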

Test the Result

Here is the result of the test. Every 5 s, a delayed message enters the queue, is consumed, and is sent back with the same delay.

every 5 second, message requeued to rabbitmq

I hope this article is useful. Never stop being curious … :)
