博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SpringBoot整合ActiveMQ消息队列和双向队列、点对点与发布订阅
阅读量:4149 次
发布时间:2019-05-25

本文共 5743 字,大约阅读时间需要 19 分钟。

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

 1.添加SpringBoot集成ActiveMQ所需依赖 

org.springframework.boot
spring-boot-starter-activemq

2.配置application.properties文件

## URL of the ActiveMQ broker. Auto-generated by default. For instance `tcp://localhost:61616`# failover:(tcp://localhost:61616,tcp://localhost:61617)# tcp://localhost:61616spring.activemq.broker-url=tcp://localhost:61616spring.activemq.in-memory=true#如果此处设置为true,需要加如下的依赖包,否则会自动配置失败,报JmsMessagingTemplate注入失败spring.activemq.pool.enabled=false#默认情况下activemq提供的是queue模式,若要使用topic模式需要配置下面配置#spring.jms.pub-sub-domain=true

3.在启动类中使用同步、异步消息队列

加入@EnableJms注解就是异步,没有加 @EnableJms注解则默认是同步。

@EnableJms@SpringBootApplicationpublic class QuartzsApplication {	public static void main(String[] args) {		SpringApplication.run(QuartzsApplication.class, args);	}}

4.点对点模式和发布订阅模式

  • 点对点模式:生产者发送一条消息到queue,只有一个消费者能收到。
  • 发布订阅模式:发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。

在JMS中,TOPIC实现了分发和订阅,当你分发一个消息,所有订阅这个消息的服务都能得到这个服务,所以从0到许多个订阅者都能得到一个消息的拷贝,只有在消息代理收到消息时有一个有效订阅时的订阅者才能得到这个消息的拷贝。

JMS Queue实现了负载均衡,一个消息只能被一个消费者接受,当没有消费者可用时,这个消息会被保存直到有 一个可用的消费者,一个queue可以有很多消费者,他们之间实现了负载均衡,所以Queue实现了一个可靠的JMS负载均衡。

5.Producer消息生产者

package com.primeton.quartzs.activeMQ;import javax.jms.Destination;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.jms.annotation.JmsListener;import org.springframework.jms.core.JmsMessagingTemplate;import org.springframework.stereotype.Service;import java.util.ArrayList;@Service("producer")public class Producer {    @Autowired // 也可以注入JmsTemplate,JmsMessagingTemplate对JmsTemplate进行了封装    private JmsMessagingTemplate jmsTemplate;    // 发送消息,destination是发送到的队列,message是待发送的消息    public void sendMessage(Destination destination, final String message){        jmsTemplate.convertAndSend(destination, message);    }    @JmsListener(destination="out.queue")//实现双向队列    public void consumerMessage(String text){        System.out.println("从out.queue队列收到的回复报文为:"+text);    }}

6.Consumer两个消费者

package com.primeton.quartzs.activeMQ;import org.springframework.jms.annotation.JmsListener;import org.springframework.stereotype.Component;@Componentpublic class Consumer {    // 使用JmsListener配置消费者监听的队列,其中text是接收到的消息    @JmsListener(destination = "mytest.queue")    public void receiveQueue(String text) {        System.out.println("Consumer收到的报文为:"+text);    }}
package com.primeton.quartzs.activeMQ;import org.springframework.jms.annotation.JmsListener;import org.springframework.messaging.handler.annotation.SendTo;import org.springframework.stereotype.Component;@Componentpublic class Consumer2 {    @JmsListener(destination = "mytest.queue")    @SendTo("out.queue")    public String receiveQueue(String text) {        System.out.println("Consumer2收到的报文为:"+text);        return "return message"+text;    }}

7.实现消息队列和双向队列

在生产者上加入out.queue

@JmsListener(destination="out.queue")//实现双向队列    public void consumerMessage(String text){        System.out.println("从out.queue队列收到的回复报文为:"+text);    }

消费者注解@SendTo("out.queue")

@JmsListener(destination = "mytest.queue")    @SendTo("out.queue")    public String receiveQueue(String text) {        System.out.println("Consumer2收到的报文为:"+text);        return "return message"+text;}

8.实现发布订阅

在消费者的注解@JmsListener加上containerFactory

// 使用JmsListener配置消费者监听的队列,其中text是接收到的消息@JmsListener(destination = "mytest.queue", containerFactory = "jmsListenerContainerQueue")public void receiveQueue(String text) {    System.out.println("Consumer收到的报文为:"+text);}@JmsListener(destination = "mytest.topic", containerFactory = "jmsListenerContainerTopic")public void testTopicCusumer(String test){    System.out.println(test);}

9.编写测试类

package com.primeton.quartzs;import com.primeton.quartzs.activeMQ.Producer;import org.apache.activemq.command.ActiveMQQueue;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;import javax.jms.Destination;@RunWith(SpringRunner.class)@SpringBootTestpublic class QuartzsApplicationTests {	@Autowired	private Producer producer;	@Test	public void contextLoads() {		Destination destination = new ActiveMQQueue("mytest.queue");		for(int i=0; i<10; i++){			producer.sendMessage(destination, "myname is chhliu!!!");		}	}}

10.查看测试结果

Consumer2收到的报文为:myname is chhliu!!!从out.queue队列收到的回复报文为:return messagemyname is chhliu!!!Consumer收到的报文为:myname is chhliu!!!Consumer2收到的报文为:myname is chhliu!!!从out.queue队列收到的回复报文为:return messagemyname is chhliu!!!Consumer收到的报文为:myname is chhliu!!!Consumer2收到的报文为:myname is chhliu!!!从out.queue队列收到的回复报文为:return messagemyname is chhliu!!!Consumer收到的报文为:myname is chhliu!!!Consumer2收到的报文为:myname is chhliu!!!Consumer收到的报文为:myname is chhliu!!!从out.queue队列收到的回复报文为:return messagemyname is chhliu!!!Consumer2收到的报文为:myname is chhliu!!!从out.queue队列收到的回复报文为:return messagemyname is chhliu!!!Consumer收到的报文为:myname is chhliu!!!

11.在application.properties下配置activeMQ时需要注意地方

## URL of the ActiveMQ broker. Auto-generated by default. For instance `tcp://localhost:61616`# failover:(tcp://localhost:61616,tcp://localhost:61617)# tcp://localhost:61616#spring.activemq.broker-url=tcp://localhost:61616#true时用内置activeMQ,否则用自己本机安装的activeMQspring.activemq.in-memory=true#如果此处设置为true,需要加如下的依赖包,否则会自动配置失败,报JmsMessagingTemplate注入失败spring.activemq.pool.enabled=false#默认情况下activemq提供的是queue模式,若要使用topic模式需要配置下面配置#spring.jms.pub-sub-domain=true

如果大家有什么问题,可以在下方留言,想要源码的可以到我的资源库下载:

 

---------------------------------------------------------一个人的态度,决定他的高度。---------------------------------------------------------

转载地址:http://nypti.baihongyu.com/

你可能感兴趣的文章
【Python基础1】变量和字符串定义
查看>>
【Python基础2】python字符串方法及格式设置
查看>>
【Python】random生成随机数
查看>>
【Python基础3】数字类型与常用运算
查看>>
Jenkins迁移jobs
查看>>
【Python基础4】for循环、while循环与if分支
查看>>
【Python基础5】列表和元组
查看>>
【Python基础6】格式化字符串
查看>>
【Python基础7】字典
查看>>
【Python基础8】函数参数
查看>>
【Python基础9】浅谈深浅拷贝及变量赋值
查看>>
Jenkins定制一个具有筛选功能的列表视图
查看>>
【Python基础10】探索模块
查看>>
【Python】将txt文件转换为html
查看>>
[Linux]Shell脚本实现按照模块信息拆分文件内容
查看>>
idea添加gradle模块报错The project is already registered
查看>>
在C++中如何实现模板函数的外部调用
查看>>
在C++中,关键字explicit有什么作用
查看>>
C++中异常的处理方法以及使用了哪些关键字
查看>>
如何定义和实现一个类的成员函数为回调函数
查看>>