Adding and Listening to RabbitMQ Queues Dynamically at Runtime with Spring Boot
RabbitMQ is the most widely deployed open source message broker. Queues in RabbitMQ are ordered collections of messages. Messages are enqueued and dequeued (delivered to consumers) in a FIFO manner. RabbitMQ uses AMQP (Advanced Message Queuing Protocol) as its messaging protocol. It enables communication that is independent of the platform, and the messages it sends mainly consist of header, properties and message fields.
In this article, we will explain how to add a queue to a RabbitMQ server at runtime with Spring Boot, and how to start listening to the added queue without restarting the application.
Now let’s see how to create our dynamic queue structure step by step.
RabbitMQ Global Configuration
/**
 * Global RabbitMQ configuration for the application.
 *
 * <p>Exposes the message converters, the {@link RabbitTemplate}, the {@link RabbitAdmin} and the
 * {@link RabbitListenerEndpointRegistry} as beans, and customises how {@code @RabbitListener}
 * endpoints are registered through {@link RabbitListenerConfigurer}.
 */
@Configuration
public class RabbitMqConfiguration implements RabbitListenerConfigurer {

    @Autowired
    private ConnectionFactory connectionFactory;

    /** JSON converter installed on the {@link RabbitTemplate} returned by {@link #rabbitTemplate()}. */
    @Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /** Jackson converter installed on the handler method factory used by the listeners. */
    @Bean
    public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
        return new MappingJackson2MessageConverter();
    }

    /** Template bound to the injected connection factory, using the producer JSON converter. */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        final RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(producerJackson2MessageConverter());
        return template;
    }

    /** Admin bean built on the injected connection factory. */
    @Bean
    public RabbitAdmin rabbitAdmin() {
        return new RabbitAdmin(connectionFactory);
    }

    /** Registry handed to the registrar in {@link #configureRabbitListeners}. */
    @Bean
    public RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry() {
        return new RabbitListenerEndpointRegistry();
    }

    /** Handler method factory that uses the consumer Jackson converter. */
    @Bean
    public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
        final DefaultMessageHandlerMethodFactory handlerFactory = new DefaultMessageHandlerMethodFactory();
        handlerFactory.setMessageConverter(consumerJackson2MessageConverter());
        return handlerFactory;
    }

    /** Additional JSON {@link MessageConverter} bean. */
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * Plugs the container factory, the endpoint registry bean and the handler method factory bean
     * into the listener registrar.
     *
     * @param registrar the registrar to configure
     */
    @Override
    public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {
        registrar.setContainerFactory(newListenerContainerFactory());
        registrar.setEndpointRegistry(rabbitListenerEndpointRegistry());
        registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
    }

    /**
     * Creates the container factory used for every registered listener: injected connection
     * factory, prefetch count of 1, and consecutive active/idle triggers of 1.
     */
    private SimpleRabbitListenerContainerFactory newListenerContainerFactory() {
        final SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(connectionFactory);
        containerFactory.setPrefetchCount(1);
        containerFactory.setConsecutiveActiveTrigger(1);
        containerFactory.setConsecutiveIdleTrigger(1);
        return containerFactory;
    }
}
Let’s look at how to add a new queue to the RabbitMQ service at the runtime stage after making the global configurations for the application.
First, let’s create the RabbitQueueService interface, where we will do our operations related to the queue.
/**
 * Operations for managing queues at runtime: creating a queue, attaching it to or detaching it
 * from a listener identified by a listener id, and checking whether a listener already has it.
 */
public interface RabbitQueueService {
/**
 * Adds a new queue for the given exchange and routing key.
 *
 * <p>NOTE: the implementation in this file also attaches the new queue to the listener whose id
 * equals {@code exchangeName}.
 *
 * @param queueName    name of the queue to add
 * @param exchangeName name of the exchange the queue is bound to
 * @param routingKey   routing key used for the binding
 */
void addNewQueue(String queueName,String exchangeName,String routingKey);
/**
 * Makes the listener with the given id start consuming from the given queue.
 *
 * @param listenerId id of the listener
 * @param queueName  name of the queue to attach
 */
void addQueueToListener(String listenerId,String queueName);
/**
 * Stops the listener with the given id from consuming the given queue.
 *
 * <p>NOTE: the implementation in this file also deletes the queue through {@code RabbitAdmin}
 * after detaching it.
 *
 * @param listenerId id of the listener
 * @param queueName  name of the queue to detach
 */
void removeQueueFromListener(String listenerId,String queueName);
/**
 * Checks whether the listener with the given id currently has the given queue.
 *
 * @param listenerId id of the listener
 * @param queueName  name of the queue to look for
 * @return {@code Boolean.TRUE} if the queue is found on the listener, {@code Boolean.FALSE} otherwise
 */
Boolean checkQueueExistOnListener(String listenerId,String queueName);
}
These are the methods for adding a new queue, listening to the added queue in the application, removing a queue from a listener, and checking whether a queue is already attached to a listener.
@Service
@Log4j2
public class RabbitQueueServiceImpl implements RabbitQueueService {
@Autowired
private RabbitAdmin rabbitAdmin;
@Autowired
private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
@Override
public void addNewQueue(String queueName, String exchangeName, String routingKey) {
Queue queue = new Queue(queueName, true, false, false);
Binding binding = new Binding(
queueName,
Binding.DestinationType.QUEUE,
exchangeName,
routingKey,
null
);
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareBinding(binding);
this.addQueueToListener(exchangeName,queueName);
}
@Override
public void addQueueToListener(String listenerId, String queueName) {
log.info("adding queue : " + queueName + " to listener with id : " + listenerId);
if (!checkQueueExistOnListener(listenerId,queueName)) {
this.getMessageListenerContainerById(listenerId).addQueueNames(queueName);
log.info("queue ");
} else {
log.info("given queue name : " + queueName + " not exist on given listener id : " + listenerId);
}
}
@Override
public void removeQueueFromListener(String listenerId, String queueName) {
log.info("removing queue : " + queueName + " from listener : " + listenerId);
if (checkQueueExistOnListener(listenerId,queueName)) {
this.getMessageListenerContainerById(listenerId).removeQueueNames(queueName);
log.info("deleting queue from rabbit management");
this.rabbitAdmin.deleteQueue(queueName);
} else {
log.info("given queue name : " + queueName + " not exist on given listener id : " + listenerId);
}
}
@Override
public Boolean checkQueueExistOnListener(String listenerId, String queueName) {
try {
log.info("checking queueName : " + queueName + " exist on listener id : " + listenerId);
log.info("getting queueNames");
String[] queueNames = this.getMessageListenerContainerById(listenerId).getQueueNames();
log.info("queueNames : " + new Gson().toJson(queueNames));
if (queueNames != null) {
log.info("checking " + queueName + " exist on active queues");
for (String name : queueNames) {
log.info("name : " + name + " with checking name : " + queueName);
if (name.equals(queueName)) {
log.info("queue name exist on listener, returning true");
return Boolean.TRUE;
}
}
return Boolean.FALSE;
} else {
log.info("there is no queue exist on listener");
return Boolean.FALSE;
}
} catch (Exception e) {
log.error("Error on checking queue exist on listener");
log.error("error message : " + ExceptionUtils.getMessage(e));
log.error("trace : " + ExceptionUtils.getStackTrace(e));
return Boolean.FALSE;
}
}
private AbstractMessageListenerContainer getMessageListenerContainerById(String listenerId) {
log.info("getting message listener container by id : " + listenerId);
return ((AbstractMessageListenerContainer) this.rabbitListenerEndpointRegistry
.getListenerContainer(listenerId)
);
}
}
We can define a receiver for the queues we created at the runtime stage and complete our process.
@Service
@Log4j2
public class RabbitMqTestSetRunnerConsumerService {
@Autowired
private RegressionRunnerService regressionRunnerService;
@RabbitListener(id = "test-set-runner-exchange",queues = {"test-set-runner-queue"},concurrency = "2")
public void receiver(Long testSetId) {
log.info("received Message from rabbit : " + testSetId);
try {
this.regressionRunnerService.runRegressionTestSet(testSetId);
log.info("completed " + testSetId + " task");
} catch (Exception e) {
log.error("Error on running test set");
log.error("Error message : " + ExceptionUtils.getMessage(e));
log.error("Error trace : " + ExceptionUtils.getStackTrace(e));
}
}
}
That’s all it takes :)