All Collections
Data Destinations
Building a Queue Consumer
Building a Queue Consumer

How do I pick up events from RabbitMQ queues?

B
Written by Ben Mills
Updated over a week ago

Using RabbitMQ queues as your destination is an efficient way to stream thousands of data records per second. But how exactly do you connect to the queue and consume your data? 

In this tutorial, we'll walk through an example python script that consumes events from a queue and prints them to a screen.

A Note Before We Start

This tutorial is based on a python tutorial from rabbitmq.com. It uses the pika python library as recommended by the RabbitMQ team; however, their examples assume that the RabbitMQ server is running on localhost, which is not the case when you're consuming from a BitBrew queue. So, as you will see, we will add some python code in order to specify the settings needed to access your queue.

The RabbitMQ website has example code in many other programming languages, which you are free to use instead, but you will have to specify the needed settings in your chosen language.

Build A Python Queue Consumer

Without further ado, let's show you how to build a consumer.

Start the Script

First, you need to create a python script and import the pika library, ssl and json. Here's the start of the script:

#!/usr/bin/env python3
import pika
import ssl
import json

Specify Connections to RabbitMQ

Next, you need to connect to your organization's virtual host (vhost) on BitBrew's hosted RabbitMQ instance. All of the queues you created are located in your tenant's vhost. 

The name of your vhost will be in the format of tenant{tenantId}. So, if your tenant ID is banana-cars, your vhost is tenantbanana-cars .

You can find your tenant ID in the Account tab under Settings in the BitBrew Dashboard app.

In the next part of the script, provide your Dashboard-generated Queue Credentials and the connection information:

  • username: {generatedUserName} 

  • password: {generatedPassword}

  • vhost: tenant{tenantId} 

  • queue: {queueName} 

  • ack: False

  • host: rabbitmq-production.kubernetes.bitbrew.com 

  • port: 5671

Note:  ack=True  means that consumer acknowledgements will take place automatically and will not need to be done manually.

username = 'iJd8E3Vb@bitbrew.com'
password = 'uncrackableGeneratedPassword'
vhost = 'tenantbanana-cars'
queue = 'myQueueName'
ack = False
host = 'rabbitmq-production.kubernetes.bitbrew.com'
port = 5671

This part of the script provides the information needed for connecting.

Configure SSL and Open a Connection

Next, you need to configure ssl and open a connection.

ssl_context = ssl.create_default_context()
ssl_options = pika.SSLOptions(ssl_context, host)
credentials = pika.PlainCredentials(username, password)
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host, port, vhost, credentials, ssl_options=ssl_options)

Define A Callback Function

Then, you need to define a callback function. This is the meat of your consumer, which determines what you do with the data you consume. 

Our example will extract the events from our queue and print them out to the screen.

def callback(ch, method, properties, body):
    print(body)

Tell your script to use this callback function for events from your queue:

channel.basic_consume(queue, callback,
                      auto_ack=(not ack))

Start the Consumer

Lastly, tell the channel to start consuming.

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

The Whole Example Script Together

#!/usr/bin/env python3
import pika
import ssl
import json

username = 'iJd8E3Vb@bitbrew.com'
password = 'uncrackableGeneratedPassword'
vhost = 'tenantbanana-cars'
queue = 'myQueueName'
ack = False
host = 'rabbitmq-production.kubernetes.bitbrew.com'
port = 5671

ssl_context = ssl.create_default_context()
ssl_options = pika.SSLOptions(ssl_context, host)
credentials = pika.PlainCredentials(username, password)
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host, port, vhost, credentials, ssl_options=ssl_options)
)

def callback(ch, method, properties, body):
    print(body)

channel = connection.channel()
channel.basic_consume(queue, callback, auto_ack=(not ack))
print(' [*] Waiting for messages. To exit, press CTRL+C')
channel.start_consuming()

Conclusion

Running this script will consume all the records in your queue and display the events on the screen. 

This is a great first exercise to try on your own as you work to develop more sophisticated callback methods to leverage your event data and build application features around it.

Did this answer your question?