Back
Featured image of post RabbitMQ fanout exchange

RabbitMQ fanout exchange

A message queue is a form of asynchronous service-to-service communication used in serverless and microservices architectures. One of the good example is by using RabbitMQ. In this article, we will demonstrate how to perform a publish and subscribe process via RabbitMQ

Prerequisite:

  • Please ensure you’ve installed RabbitMQ in your local machine.
    • Alternatively, you may run RabbitMQ docker container
    • docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management
  • We will using RabbitMQ Javascript client amqplib. Please ensure you’ve setup a new project with this package installed.
    • Alternatively, you can also clone this project to have application-ready demonstrations.

We are going to demonstrate the Fanout exchange message queue flow as shown in the figure below

exchange flow

Code demonstration

We will run publisher.js and comsumer.js separately.

publisher.js

const amqp = require("amqplib");
async function fanoutExchange(){
    try{
        const rabbitmqUrl = "amqp://localhost:5672";
        const connection = await amqp.connect(rabbitmqUrl);
        const exchange = "transports";
        const exchangeType = "fanout";
        const routingKey = "";
        const options = {};
        const payload = {
            vehicleType: "car",
            numberOfPassenger: 3,
        };
        let channel = await connection.createChannel();
        await channel.assertExchange(exchange, exchangeType, options);
        channel.publish(
            exchange,
            routingKey,
            Buffer.from(JSON.stringify(payload)),
            options
        );
    }catch(error){
        console.error(error)
    }
}
fanoutExchange()

consumer.js

const amqp = require("amqplib");
async function fanoutExchangeConsumer(){
    try{
        const rabbitmqUrl = "amqp://localhost:5672";
        const connection = await amqp.connect(rabbitmqUrl);
        const exchange = "transports";
        const exchangeType = "fanout";
        const routingKey = "";
        const options = {};
        let channel = await connection.createChannel();
        await channel.assertExchange(exchange, exchangeType, options);
        const { queue } = await channel.assertQueue("", options);
        channel.bindQueue(queue, exchange, routingKey);
        channel.consume(queue, (data) => {
            console.log("Received", JSON.parse(data.content.toString()));
            channel.ack(data, false, true);
        });
    }catch(error){
        console.error(error)
    }
}
fanoutExchangeConsumer()

Before we start:

  • To have better visualization, please run the publisher.js in 1 terminal and consumer.js in 2 terminals. You can either use the split terminal in Visual Studio Code or run the files in your machine terminals.
  • If you’re not using the example from the provided Git Repository. Please ensure you run the consumer.js in 2 terminals followed by running publisher.js in 1 terminal

Result for this example

Steps to demo from Git Repository

  • Open a new terminal
  • npm run dev to start the server
  • Open 2 new terminals and run the following command:
    • npm run consumer
  • Make a POST request to http://localhost:3000/api/fanout/exchange/transports with your custom payload.

Result

Explanations

  • Publisher

    1. Before the process started, we need to first connect to the RabbitMQ.
    2. The next step is to make connection with the channel and start creating our desired exchange by using assertExchange() method
      • In here we specify the following argument’s value:
        • exchange name is transports in this example
        • exchange type is fanout
        • we didn’t specify the options in this example but feel free to have a look on the available options at here
    3. And now, we can start publishing our message to the exchange by using publish() method with the following arguments:
      • we can use back the exchange name we specified earlier ‘transports’
      • routingKey, we could just leave it as empty as it doesn’t apply anything for fanout exchange type
      • payload, Publish message only accepts buffer payload. To achieve this, we can first stringify our payload if it’s an object and convert it into a Buffer by using Buffer.from
      • options, we didn’t specify the options in this example but feel free to have a look at the available options at here
  • Comsumer

    1. Before the process started, we need to first connect to the RabbitMQ.
    2. The next step is to make connection with the channel and start creating our desired exchange by using assertExchange() method
      • In here we specify the following argument’s value:
        • exchange name is transports in this example
        • exchange type is fanout
        • we didn’t specify the options in this example but feel free to have a look at the available options at here
    3. And now we can start to create our queue and bind with the exchange we created earlier.
      • As you can observe from the code, we are putting it as an empty string when creating the queue channel.assertQueue("",options). This is because we do not need to bother with the queue name when binding the exchange and queue. By putting an empty string for the queue name, it will return us a unique and random queue name such as amq.gen-JzTY20BRgKO-HjmUJj0wLg
      • There are some useful options we can take note of when creating the queue:
        • durable : if true, the queue still stays alive even if we restart the broker.
        • exclusive : if true, the queue will be deleted when connection is closed.
        • expires : specify the time in a millisecond to delete the queue when no consumer is connecting to the queue.
        • checkout other options here
        • We could just leave the routing key an empty for the binding between exchange and queue because it does not apply anything for the fanout exchange.
        • Check out the direct and topic exchange for the use case of routing key
    4. After the binding process is finished, we can now consume the message whenever there is an incoming message from the broker by using .consume() method
    5. When we received the message, we can now acknowledge the message by using ack() method so that the broker will know we successfully retrieve the message.
comments powered by Disqus