How To Publish Messages To RabbitMQ In Java Development?

In this article, Java development experts share an overview of RabbitMQ and show how to publish messages to it. All the code and methods shared in this post are intended for reference purposes only. If anything is unclear, ask the experts in the comments.

RabbitMQ is a cross-platform message broker: it accepts incoming messages from publishers and delivers them to one or more message listeners.

[Figure: RabbitMQ architecture]

RabbitMQ has client libraries for many languages, including Java, Erlang, Python, Objective-C, Ruby, PHP, C#, and JavaScript.

It can store messages on disk (when messages are published as persistent to durable queues) so that they are not lost if the server fails, and it provides a recovery mechanism to restore the previous state.

RabbitMQ is used not only as a message broker but also to integrate applications running on different platforms.

RabbitMQ also supports RPC-style remote invocation (comparable to RMI, Remote Method Invocation) using dynamically created queues. It ships with plugins for administration and for tracing messages, and it has a plugin for the STOMP protocol that supports WebSockets.

RabbitMQ overview:

RabbitMQ provides a Java client for interacting with the message broker. The main classes involved are:

  • Channel: This interface is used for protocol-specific operations. RabbitMQ speaks the AMQP protocol, which lets clients written in different languages and running on different platforms exchange messages.
  • Connection: This interface represents an open connection to RabbitMQ; it is used to create channels, close the connection, and register listeners.
  • ConnectionFactory: This class holds the connection settings for RabbitMQ, such as user name, password, and host details.

Creating Connection to RabbitMQ:

ConnectionFactory factory = new ConnectionFactory();

factory.setUsername(userName);

factory.setPassword(password);

factory.setVirtualHost(virtualHost);

factory.setHost(hostName);

factory.setPort(portNumber);

Connection conn = factory.newConnection();

Here userName, password, virtualHost, hostName and portNumber are values that we supply.

OR

We can construct a URI from these fields:

ConnectionFactory factory = new ConnectionFactory();

factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");

Connection conn = factory.newConnection();
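When the URI is assembled from separate fields, reserved characters in the user name, password, or virtual host need percent-encoding; in particular, the default virtual host "/" is written as %2F in an AMQP URI. The following is a minimal sketch using only the JDK. The class AmqpUriSketch and its methods are hypothetical helpers for illustration, not part of the RabbitMQ client.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper for illustration; not part of the RabbitMQ client library.
final class AmqpUriSketch {

    // Percent-encodes one URI component. URLEncoder targets HTML forms and
    // writes spaces as '+', so those are rewritten to %20 for use in a URI.
    static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8").replace("+", "%20");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always available", e);
        }
    }

    // Builds amqp://user:password@host:port/vhost with each field encoded.
    static String build(String userName, String password, String hostName,
                        int portNumber, String virtualHost) {
        return "amqp://" + encode(userName) + ":" + encode(password)
                + "@" + hostName + ":" + portNumber + "/" + encode(virtualHost);
    }
}
```

The resulting string could then be handed to factory.setUri(...) in place of a hand-written URI.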

Creating channel using Connection Object:

Channel channel = conn.createChannel();

This channel is used to send messages to and receive messages from RabbitMQ.

Disconnecting RabbitMQ:

channel.close();

conn.close();

Once the channel and connection objects are closed, we can no longer send or receive messages through them.

Registering Exchanges and Queues:

Before sending messages to or receiving messages from RabbitMQ, we need to declare the exchanges and queues involved and bind them together using the channel. If a declared exchange or queue does not exist yet, it is created.

channel.exchangeDeclare(exchangeName, "direct", true);

String queueName = channel.queueDeclare().getQueue();

channel.queueBind(queueName, exchangeName, routingKey);

Here queueDeclare() with no arguments creates a server-named, non-durable, exclusive, auto-delete queue. The routingKey passed to queueBind acts as the binding key that maps the queue to the exchange named exchangeName; the exchange then routes each message to its bound queues according to the message's routing key.

Exchange Types:

direct: Each queue is bound to the exchange with a routing key. When a message arrives at the exchange, it is copied to a queue only if the message's routing key exactly matches that queue's binding key. Generally, this type is used for unicast messages.


Declaring direct Exchange:

channel.exchangeDeclare(exchangeName, "direct", true);
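To make the exact-match rule concrete, here is a small in-memory model of direct routing. It is only a sketch of the routing decision and does not talk to a broker; DirectExchangeSketch and its methods are hypothetical names chosen for this illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory model for illustration only; it does not connect to RabbitMQ.
final class DirectExchangeSketch {

    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new LinkedHashMap<>();

    // Records that queueName is bound to this exchange with bindingKey.
    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queueName);
    }

    // Returns the queues that would receive a message published with routingKey:
    // only those whose binding key is exactly equal to it.
    List<String> route(String routingKey) {
        return new ArrayList<>(bindings.getOrDefault(routingKey, new ArrayList<>()));
    }
}
```

A queue may be bound with several keys, and several queues may share one key; a routing key with no matching binding yields an empty list in this model.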

topic: A topic exchange treats the binding key as a pattern rather than a literal value. Routing keys are words separated by dots; in a binding pattern, * matches exactly one word and # matches zero or more words. A message is copied to a queue only if its routing key matches the pattern the queue was bound with. This supports the publish/subscribe model, and this exchange type is generally used for multicasting messages.


Declaring topic Exchange:

channel.exchangeDeclare(exchangeName, "topic", true);
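Topic patterns are easy to confuse with regular expressions, so a sketch of the matching rule may help. It assumes the AMQP convention that * stands for exactly one dot-separated word and # for zero or more words. TopicMatchSketch is a hypothetical helper written for this article; the broker does this matching itself, and its implementation will differ.

```java
// Illustrative matcher for topic binding patterns; not the broker's implementation.
final class TopicMatchSketch {

    // Returns true if routingKey matches the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        String[] patternWords = pattern.split("\\.", -1);
        String[] keyWords = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return match(patternWords, 0, keyWords, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words; try every possible split
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word but the key has none left
        }
        // '*' matches any single word; otherwise the words must be equal
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }
}
```

With this rule, the pattern kern.* accepts kern.critical but not kern.critical.disk, while kern.# accepts both, as well as kern on its own.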

fanout: This is the simplest exchange type. It broadcasts every message it receives to all the queues bound to it, ignoring the routing key.

[Figure: fanout routing example]

Declaring fanout exchange:

channel.exchangeDeclare(exchangeName, "fanout", true);

headers: This type of exchange routes on message header attributes instead of the routing key. A message is copied to a queue only if its headers match the attributes given when the queue was bound. The x-match binding argument decides whether all attributes must match (all) or a single match is enough (any): if its value is any, the message is sent to the queue as soon as one attribute matches.

Declaring Header Exchange:

channel.exchangeDeclare(exchangeName, "headers", true);
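The all/any decision can be sketched as a comparison of two maps: the arguments supplied when the queue was bound and the headers carried by a message. This is a simplified model under stated assumptions: x-match defaults to all, arguments whose names start with x- are skipped, and values are compared with equals. HeadersMatchSketch is a hypothetical name, and the broker's own matching may differ in detail.

```java
import java.util.Map;
import java.util.Objects;

// Simplified model of headers-exchange matching; for illustration only.
final class HeadersMatchSketch {

    // bindingArguments: arguments given when the queue was bound (may include "x-match").
    // messageHeaders:   headers attached to the published message.
    static boolean matches(Map<String, Object> bindingArguments,
                           Map<String, Object> messageHeaders) {
        boolean matchAny = "any".equals(bindingArguments.get("x-match"));
        for (Map.Entry<String, Object> argument : bindingArguments.entrySet()) {
            if (argument.getKey().startsWith("x-")) {
                continue; // control arguments such as x-match are not compared
            }
            boolean hit = messageHeaders.containsKey(argument.getKey())
                    && Objects.equals(argument.getValue(), messageHeaders.get(argument.getKey()));
            if (matchAny && hit) {
                return true;  // "any": one matching attribute is enough
            }
            if (!matchAny && !hit) {
                return false; // "all": one mismatch rules the queue out
            }
        }
        // "all" with no mismatches matches; "any" with no hits does not
        return !matchAny;
    }
}
```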

The third argument to exchangeDeclare is durability, that is, whether the exchange should be persisted. If the value is true, the exchange is still available after a server restart; otherwise it is not.

Publishing Messages to RabbitMQ:

We publish messages to RabbitMQ through the channel, using its basicPublish method.

byte[] messageBytes = "Hello, world!".getBytes();

channel.basicPublish(exchangeName, routingKey, null, messageBytes);

Here the first argument is the exchange name, the second is the message's routing key, the third is the message properties (null here, meaning none), and the fourth is the message body as a byte array.
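Because the body travels as raw bytes, the publisher and the consumer have to agree on a character encoding. Calling getBytes() with no argument uses the platform default charset, which may differ between machines. A minimal sketch of an explicit UTF-8 round trip follows; MessageBodySketch is a hypothetical helper, not part of the client library.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper showing an explicit, symmetric text encoding for message bodies.
final class MessageBodySketch {

    // Publisher side: turn text into the byte[] that would be passed to basicPublish.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: turn the received body back into text with the same charset.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }
}
```

Using the same charset constant on both sides keeps non-ASCII text intact regardless of each machine's default encoding.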

Attaching properties to message:

byte[] messageBytes = "Hello, world!".getBytes();

channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBytes);

 

Adding Custom Properties to message:

Using AMQP.BasicProperties.Builder we can build our own set of properties and pass them to basicPublish.

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()

    .contentType("text/plain")

    .deliveryMode(2) // 2 = persistent

    .priority(1)

    .userId("test")

    .build();

byte[] messageBytes = "Hello, world!".getBytes();

channel.basicPublish(exchangeName, routingKey, properties, messageBytes);

Consuming the Messages:

Consumer is the interface provided by the RabbitMQ API for receiving messages; a consumer is registered on a channel. Individual subscriptions are identified by a consumerTag. We pass this consumerTag when subscribing to a queue, and each consumer has its own consumerTag even when several share the same channel. A queue may have one or more consumers, and RabbitMQ uses the consumerTag to identify each of them uniquely.

Eg:

boolean autoAck = false;

channel.basicConsume(queueName, autoAck, "myConsumerTag", new DefaultConsumer(channel) {

    @Override

    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

        String routingKey = envelope.getRoutingKey();

        String contentType = properties.getContentType();

        long deliveryTag = envelope.getDeliveryTag();

        // Acknowledge this single delivery (multiple = false).

        channel.basicAck(deliveryTag, false);

    }

});

 

Here we have set auto-acknowledgement to false, so we must acknowledge each delivered message ourselves, which we do with basicAck inside the handleDelivery method.

 

Useful methods in connection object:

 

addShutdownListener(ShutdownListener listener): registers a listener that is invoked when the connection is shut down.

isOpen: tests whether the connection is open or closed.

getCloseReason: returns the ShutdownSignalException describing why the connection was closed, from which we can get a reference to the connection or channel that fired it.

close: manually closes the connection to RabbitMQ.

Advanced Configuration options for Connection Factory:

Consumers are dispatched on threads that belong to the connection. Instead of creating a thread each time, we can use a thread pool that supplies the threads on which messages are consumed.

We can create an ExecutorService backed by a fixed-size thread pool and pass it to the connection factory's newConnection method.

 

ExecutorService es = Executors.newFixedThreadPool(20);

Connection conn = factory.newConnection(es);
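For readers less familiar with ExecutorService, the sketch below shows only the pooling idea, with simulated deliveries instead of a broker: a fixed number of threads handles many tasks and the pool is shut down explicitly at the end. ConsumerPoolSketch and handleAll are hypothetical names used for this illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-alone illustration of a fixed thread pool; no RabbitMQ connection is involved.
final class ConsumerPoolSketch {

    // Runs messageCount simulated deliveries on poolSize threads
    // and returns how many of them were handled.
    static int handleAll(int poolSize, int messageCount) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger handled = new AtomicInteger();
        for (int i = 0; i < messageCount; i++) {
            // Each task stands in for one handleDelivery call.
            pool.execute(() -> handled.incrementAndGet());
        }
        pool.shutdown(); // stop accepting work, let queued tasks finish
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the pool", e);
        }
        return handled.get();
    }
}
```

The explicit shutdown matters in the RabbitMQ case as well: as far as the client documentation describes it, an executor supplied by the application is not shut down when the connection closes, so the application should do that itself.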

Re-Try configuration for network failures:

If the connection between the client and the RabbitMQ server is lost because of a network failure, the client can retry the connection at regular intervals. First we enable automatic recovery, and then we set the interval after which the client retries the connection to the RabbitMQ server.

Eg:

factory.setAutomaticRecoveryEnabled(true);

factory.setNetworkRecoveryInterval(10000); //10 sec.
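The client performs this recovery itself once the two settings above are in place. Purely to illustrate what retrying at a fixed interval means, for example around an initial connection attempt, which automatic recovery is generally not expected to cover, here is a stand-alone sketch. RetrySketch and its retry method are hypothetical and are not part of the RabbitMQ client.

```java
import java.util.concurrent.Callable;

// Hypothetical fixed-interval retry helper; not how the client implements recovery.
final class RetrySketch {

    // Calls attempt up to maxAttempts times, sleeping intervalMillis between failures.
    // Returns the first successful result or throws once every attempt has failed.
    static <T> T retry(Callable<T> attempt, int maxAttempts, long intervalMillis) {
        Exception last = null;
        for (int i = 1; i <= maxAttempts; i++) {
            try {
                return attempt.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
            if (i < maxAttempts) {
                try {
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting to retry", ie);
                }
            }
        }
        throw new IllegalStateException("all " + maxAttempts + " attempts failed", last);
    }
}
```

A call such as RetrySketch.retry(() -> factory.newConnection(), 5, 10000L) would, under these assumptions, try to open the connection up to five times, ten seconds apart.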

Recovery Listener:

When automatic recovery is enabled, the objects returned by connectionFactory.newConnection and connection.createChannel implement the Recoverable interface, so we can attach recovery listeners to them and detach them again. Once the connection is recovered, these listeners are invoked.

Example:

Sample Java class to send a message to RabbitMQ:

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

public class Sender {

    private static final String EXCHANGE_NAME = "logs";

    // newConnection() and close() can throw TimeoutException as well as IOException.

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("localhost");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = "Hello World!!!";

        // A fanout exchange ignores the routing key, so it is left empty.

        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));

        System.out.println(" [x] Sent '" + message + "'");

        channel.close();

        connection.close();

    }

}

Sample Java class to receive the message from RabbitMQ:

import com.rabbitmq.client.AMQP;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.Consumer;

import com.rabbitmq.client.DefaultConsumer;

import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class Receiver {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("localhost");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // Server-named temporary queue, bound to the fanout exchange.

        String queueName = channel.queueDeclare().getQueue();

        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {

            @Override

            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String message = new String(body, "UTF-8");

                System.out.println(" [x] Received '" + message + "'");

            }

        };

        // autoAck = true: deliveries are acknowledged automatically.

        channel.basicConsume(queueName, true, consumer);

    }

}

We hope this article has served its purpose. You can still put your questions to the Java development experts and get answers soon. For queries related to Java, RabbitMQ, and other IT topics, you can write to the IT professionals at any time.

Conclusion: RabbitMQ is a message broker that can be used to integrate different applications, and it provides client APIs for many languages. Using a connection and a channel we can send messages to and receive messages from RabbitMQ, and using the durable flag we can persist exchange and queue definitions. Using the different exchange types we can send unicast and multicast messages.

 

Author bio:-

James Warner works as a content developer at a reputed Java development company. You can share your thoughts regarding this post with her and suggest some topics. She will pick the interesting ones and share her thoughts on them.

 
