img

Rabbitmq安装及配置使用

旁观者 1年前 ⋅ 1021 阅读

消息队列有很多应用场景,包括异步,削峰等,以下是rabbitmq的安装和springboot简单的整合使用

安装erlang

yum install -y epel-release
wget https://packages.erlang-solutions.com/erlang/rpm/centos/7/x86_64/esl-erlang_22.1-1~centos~7_amd64.rpm
yum install esl-erlang_22.1-1~centos~7_amd64.rpm
erl -version

安装Rabbitmq


3.6.5:

wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm

rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

开启可视化

rabbitmq-plugins enable rabbitmq_management

远程访问

cd /etc/rabbitmq

touch rabbitmq.config

#写入并保存:

[{rabbit, [{loopback_users, []}]}].


3.8.6:

wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.8.6/rabbitmq-server-3.8.6-1.noarch.rpm

yum localinstall rabbitmq-server-3.8.6-1.el7.noarch.rpm

开启可视化

rabbitmq-plugins enable rabbitmq_management

远程访问配置

cd /etc/rabbitmq

touch rabbitmq.conf

##写入并保存

loopback_users=none


重启服务:

systemctl restart rabbitmq-server

访问视图:

ip:15672 guest/guest

Springboot整合RabbitMq

1.pom引入jar

<!--rabbitmq-->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
</dependency>

2.添加配置(开放端口,在rabbitmq新添加非guest的user)

spring.rabbitmq.host=xxx.xxx.xxx.xxx
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin

3.配置Configuration

//定义Exchage和Queue,并绑定
@Bean
public FanoutExchange fanoutExchange() {
    return new FanoutExchange("myFanoutExchange", true, false);//不同的转换器return new DirectExchange/FanoutExchage/TopicExchange 
}

@Bean
public Queue fanoutQueue() {
    return new Queue("myFanoutQueueA", true);
}

@Bean
public Binding bindingFanout() {
    return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());//.with("routing")
}

4.Producer发送

@Autowired
private RabbitTemplate rabbitTemplate;

@RequestMapping("send")
@ResponseBody
public Result sendByDirect(@RequestParam String message) {
    rabbitTemplate.convertAndSend("myDirectExchange", "routing", message,new CorrelationData(UUID.randomUUID().toString()));
    return Result.success();
}

5.Consumer接收

@RabbitHandler
@RabbitListener(queues = "myDirectQueue")
public void processDirect(String msg) {
    log.info("rabbitmq消息接收:"+msg);
}

6.消息的可靠投递(配置回调)

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。 RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

⚫ confirm 确认模式

⚫ return 退回模式

rabbitmq 整个消息投递的路径为:

producer—>rabbitmq broker—>exchange—>queue—>consumer

⚫ 消息从 producer 到 exchange 则会返回一个 confirmCallback 。

⚫ 消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。

我们将利用这两个 callback 控制消息的可靠性投递

消息的可靠投递小结

➢ 设置ConnectionFactory的publisher-confirms=“true” 开启 确认模式。

➢ 使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。

➢ 设置ConnectionFactory的publisher-returns=“true” 开启 退回模式。

➢ 使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。


a.配置文件:

#新版本使用这个即可

#spring.rabbitmq.publisher-confirm-type=correlated

#老版本的话用下面这个即可 开启确认模式

spring.rabbitmq.publisher-confirms=true

#开启return

spring.rabbitmq.publisher-returns=true


b.producer添加Bean

@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate();
    rabbitTemplate.setConnectionFactory(connectionFactory);

    // 开启Mandatory, 才能触发回调函数,无论消息推送结果如何都强制调用回调函数
    rabbitTemplate.setMandatory(true);
	//Broker能否接收
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            log.info("是否成功:" + ack);
            log.info("相关数据:" + correlationData);
            log.info("原因:" + cause);
        }
    });

	//找不到路由等情况
    rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            log.info("消息:" + message);
            log.info("回应码:" + replyCode);
            log.info("回应信息:" + replyText);
            log.info("交换机:" + exchange);
            log.info("路由键:" + routingKey);
        }
    });
    return rabbitTemplate;
}

c.consumer配置listener

ackReceiver

@Component
public class MyAckReceiver implements ChannelAwareMessageListener {
	@Override
	public void onMessage(Message message, Channel channel) throws Exception {
		// 消息的唯一性ID
		long deliveryTag = message.getMessageProperties().getDeliveryTag();

		try {
			String msg = message.toString();
			System.out.println("消息: " + msg);
			System.out.println("消息来自: "+message.getMessageProperties().getConsumerQueue());
			// 手动确认
			channel.basicAck(deliveryTag, true);
		} catch (Exception e) {
			// 拒绝策略
			channel.basicReject(deliveryTag, false);
			e.printStackTrace();
		}
	}
}

配置Listener

@Configuration
public class MessageListenerConfig {

	@Autowired
	private CachingConnectionFactory cachingConnectionFactory;

	@Autowired
	private MyAckReceiver myAckReceiver;

	@Bean
	public SimpleMessageListenerContainer simpleMessageListenerContainer() {
		SimpleMessageListenerContainer container = new  SimpleMessageListenerContainer(cachingConnectionFactory);

		// 监听队列名
		container.setQueueNames("myDirectQueue","myFanoutQueueA","myFanoutQueueB","myTopicQueueA","myTopicQueueB");
		// 当前消费者数量
		container.setConcurrentConsumers(1);
		// 最大消费者数量
		container.setMaxConcurrentConsumers(1);
		// 手动确认
		container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
		// 设置监听器
		container.setMessageListener(myAckReceiver);
		return container;
	}
}

全部评论: 0

    我有话说: