消息队列有很多应用场景,包括异步,削峰等,以下是rabbitmq的安装和springboot简单的整合使用
安装erlang
yum install -y epel-release
wget https://packages.erlang-solutions.com/erlang/rpm/centos/7/x86_64/esl-erlang_22.1-1~centos~7_amd64.rpm
yum install esl-erlang_22.1-1~centos~7_amd64.rpm
erl -version
安装Rabbitmq
3.6.5:
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
开启可视化
rabbitmq-plugins enable rabbitmq_management
远程访问
cd /etc/rabbitmq
touch rabbitmq.config
#写入并保存:
[{rabbit, [{loopback_users, []}]}].
3.8.6:
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.8.6/rabbitmq-server-3.8.6-1.noarch.rpm
yum localinstall rabbitmq-server-3.8.6-1.el7.noarch.rpm
开启可视化
rabbitmq-plugins enable rabbitmq_management
远程访问配置
cd /etc/rabbitmq
touch rabbitmq.conf
##写入并保存
loopback_users=none
重启服务:
systemctl restart rabbitmq-server
访问视图:
ip:15672 guest/guest
Springboot整合RabbitMq
1.pom引入jar
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
2.添加配置(开放端口,在rabbitmq新添加非guest的user)
spring.rabbitmq.host=xxx.xxx.xxx.xxx
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
3.配置Configuration
//定义Exchage和Queue,并绑定
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("myFanoutExchange", true, false);//不同的转换器return new DirectExchange/FanoutExchage/TopicExchange
}
@Bean
public Queue fanoutQueue() {
return new Queue("myFanoutQueueA", true);
}
@Bean
public Binding bindingFanout() {
return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());//.with("routing")
}
4.Producer发送
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("send")
@ResponseBody
public Result sendByDirect(@RequestParam String message) {
rabbitTemplate.convertAndSend("myDirectExchange", "routing", message,new CorrelationData(UUID.randomUUID().toString()));
return Result.success();
}
5.Consumer接收
@RabbitHandler
@RabbitListener(queues = "myDirectQueue")
public void processDirect(String msg) {
log.info("rabbitmq消息接收:"+msg);
}
6.消息的可靠投递(配置回调)
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。 RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
⚫ confirm 确认模式
⚫ return 退回模式
rabbitmq 整个消息投递的路径为:
producer—>rabbitmq broker—>exchange—>queue—>consumer
⚫ 消息从 producer 到 exchange 则会返回一个 confirmCallback 。
⚫ 消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。
我们将利用这两个 callback 控制消息的可靠性投递
消息的可靠投递小结
➢ 设置ConnectionFactory的publisher-confirms=“true” 开启 确认模式。
➢ 使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。
➢ 设置ConnectionFactory的publisher-returns=“true” 开启 退回模式。
➢ 使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。
a.配置文件:
#新版本使用这个即可
#spring.rabbitmq.publisher-confirm-type=correlated
#老版本的话用下面这个即可 开启确认模式
spring.rabbitmq.publisher-confirms=true
#开启return
spring.rabbitmq.publisher-returns=true
b.producer添加Bean
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
// 开启Mandatory, 才能触发回调函数,无论消息推送结果如何都强制调用回调函数
rabbitTemplate.setMandatory(true);
//Broker能否接收
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("是否成功:" + ack);
log.info("相关数据:" + correlationData);
log.info("原因:" + cause);
}
});
//找不到路由等情况
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息:" + message);
log.info("回应码:" + replyCode);
log.info("回应信息:" + replyText);
log.info("交换机:" + exchange);
log.info("路由键:" + routingKey);
}
});
return rabbitTemplate;
}
c.consumer配置listener
ackReceiver
@Component
public class MyAckReceiver implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
// 消息的唯一性ID
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
String msg = message.toString();
System.out.println("消息: " + msg);
System.out.println("消息来自: "+message.getMessageProperties().getConsumerQueue());
// 手动确认
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
// 拒绝策略
channel.basicReject(deliveryTag, false);
e.printStackTrace();
}
}
}
配置Listener
@Configuration
public class MessageListenerConfig {
@Autowired
private CachingConnectionFactory cachingConnectionFactory;
@Autowired
private MyAckReceiver myAckReceiver;
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);
// 监听队列名
container.setQueueNames("myDirectQueue","myFanoutQueueA","myFanoutQueueB","myTopicQueueA","myTopicQueueB");
// 当前消费者数量
container.setConcurrentConsumers(1);
// 最大消费者数量
container.setMaxConcurrentConsumers(1);
// 手动确认
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 设置监听器
container.setMessageListener(myAckReceiver);
return container;
}
}
注意:本文归作者所有,未经作者允许,不得转载