EN
Spring Boot 2 - broadcast messages to each application instance using RabbitMQ
3
points
In this article, we would like to show you how to create a Spring Boot 2 application that uses RabbitMQ to broadcast messages to every running instance of the application.
Spring Boot 2 Applications
In this section, you can find a simplified solution that implements broadcasting.
Hint: this article extends the project described in another (linked) article.
Producer
ProducerConfig.java
file:
package com.example.demo;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Producer-side AMQP configuration.
 *
 * <p>Declares the fanout exchange that {@code ProducerService} publishes to.
 */
@Configuration
public class ProducerConfig {

    /** Name of the exchange used for broadcasting. */
    private static final String EXCHANGE_NAME = "my-exchange";

    /**
     * Defines the fanout exchange named {@code my-exchange}.
     *
     * @return the exchange definition
     */
    @Bean
    public FanoutExchange broadcastExchange() {
        FanoutExchange exchange = new FanoutExchange(EXCHANGE_NAME);
        return exchange;
    }
}
ProducerController.java
file:
package com.example.demo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
/**
 * Web controller with a single endpoint that triggers a broadcast
 * through {@link ProducerService}.
 */
@Controller
public class ProducerController {

    /** Service that publishes the message; injected by Spring. */
    @Autowired
    private ProducerService producerService;

    /**
     * Handles requests mapped to {@code /test} by broadcasting a fixed test message.
     */
    @RequestMapping("/test")
    @ResponseBody
    public void test() {
        final String payload = "Testing message...";
        producerService.broadcastMessage(payload);
    }
}
ProducerService.java
file:
package com.example.demo;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * Publishes messages to the {@code my-exchange} exchange so that they reach
 * every queue bound to it (the consumers bind one {@code my-queue-*} queue each).
 */
@Service
public class ProducerService {

    /** Exchange the messages are sent to. */
    private static final String EXCHANGE_NAME = "my-exchange";

    /** Routing key passed with every message; intentionally empty. */
    private static final String ROUTING_KEY = "";

    /** Template used for publishing; injected by Spring. */
    @Autowired
    private RabbitTemplate template;

    /**
     * Converts the given text and sends it to {@code my-exchange} with an empty routing key.
     *
     * @param message the text to broadcast
     */
    public void broadcastMessage(String message) {
        template.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message);
    }
}
Consumer
ConsumerConfig.java
file:
package com.example.demo;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.UUID;
/**
 * Consumer-side AMQP configuration.
 *
 * <p>Each application instance generates its own identifier, declares a private queue
 * named after it, binds that queue to the {@code my-exchange} fanout exchange and
 * attaches a listener container to it.
 */
@Configuration
@EnableRabbit
public class ConsumerConfig {

    /** Name of the exchange the instance queue is bound to. */
    private static final String EXCHANGE_NAME = "my-exchange";

    /** Prefix shared by all per-instance queue names. */
    private static final String QUEUE_NAME_PREFIX = "my-queue-";

    /**
     * Generates the identifier of this application instance.
     *
     * @return {@code "id-"} followed by a random UUID; a new value on every start
     */
    @Bean
    public String instanceId() {
        return "id-" + UUID.randomUUID();
    }

    /**
     * Defines the fanout exchange named {@code my-exchange}.
     *
     * @return the exchange definition
     */
    @Bean
    public FanoutExchange broadcastExchange() {
        return new FanoutExchange(EXCHANGE_NAME);
    }

    /**
     * Defines the queue that belongs to this application instance only.
     *
     * <p>The queue name contains a random identifier that changes on every start, so the
     * queue must not outlive the instance: a durable queue would stay on the broker
     * after shutdown and keep accumulating broadcast messages that nobody consumes.
     * It is therefore declared non-durable, exclusive and auto-delete.
     *
     * @param instanceId identifier of this application instance
     * @return the queue definition
     */
    @Bean
    public Queue instanceQueue(String instanceId) {
        // Arguments: name, durable = false, exclusive = true, autoDelete = true.
        return new Queue(queueNameFor(instanceId), false, true, true);
    }

    /**
     * Binds the instance queue to the broadcast exchange.
     *
     * @param instanceQueue     the queue of this instance
     * @param broadcastExchange the fanout exchange
     * @return the binding definition
     */
    @Bean
    public Binding instanceBinding(Queue instanceQueue, FanoutExchange broadcastExchange) {
        return BindingBuilder.bind(instanceQueue).to(broadcastExchange);
    }

    /**
     * Creates the listener container that consumes from the instance queue and
     * passes every message to the given listener adapter.
     *
     * @param instanceId        identifier of this application instance
     * @param connectionFactory connection factory used by the container
     * @param listenerAdapter   listener invoked for each received message
     * @return the configured container
     */
    @Bean
    public SimpleMessageListenerContainer container(String instanceId, ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // Same helper as in instanceQueue(), so the declared and consumed names cannot drift.
        container.setQueueNames(queueNameFor(instanceId));
        container.setMessageListener(listenerAdapter);
        return container;
    }

    /** Builds the queue name for the given instance identifier. */
    private static String queueNameFor(String instanceId) {
        return QUEUE_NAME_PREFIX + instanceId;
    }
}
ConsumerService.java
file:
package com.example.demo;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
/**
 * Receives broadcast messages and prints them to standard output.
 */
@Service
public class ConsumerService {

    /** Name of the method the listener adapter delegates to. */
    private static final String HANDLER_METHOD = "handleMessage";

    /**
     * Creates a listener adapter that delegates to {@link #handleMessage(String)}
     * on the given service.
     *
     * @param consumerService the service whose handler method is invoked
     * @return the listener adapter
     */
    @Bean
    public MessageListenerAdapter listenerAdapter(ConsumerService consumerService) {
        MessageListenerAdapter adapter = new MessageListenerAdapter(consumerService, HANDLER_METHOD);
        return adapter;
    }

    /**
     * Prints the received message, prefixed with {@code "Message: "}.
     *
     * @param message the received text
     */
    public void handleMessage(String message) {
        String line = "Message: " + message;
        System.out.println(line);
    }
}