本文共 5743 字,大约阅读时间需要 19 分钟。
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
org.springframework.boot spring-boot-starter-activemq
## URL of the ActiveMQ broker. Auto-generated by default. For instance `tcp://localhost:61616`# failover:(tcp://localhost:61616,tcp://localhost:61617)# tcp://localhost:61616spring.activemq.broker-url=tcp://localhost:61616spring.activemq.in-memory=true#如果此处设置为true,需要加如下的依赖包,否则会自动配置失败,报JmsMessagingTemplate注入失败spring.activemq.pool.enabled=false#默认情况下activemq提供的是queue模式,若要使用topic模式需要配置下面配置#spring.jms.pub-sub-domain=true
加入@EnableJms注解就是异步,没有加 @EnableJms注解则默认是同步。
@EnableJms@SpringBootApplicationpublic class QuartzsApplication { public static void main(String[] args) { SpringApplication.run(QuartzsApplication.class, args); }}
在JMS中,TOPIC实现了分发和订阅,当你分发一个消息,所有订阅这个消息的服务都能得到这个服务,所以从0到许多个订阅者都能得到一个消息的拷贝,只有在消息代理收到消息时有一个有效订阅时的订阅者才能得到这个消息的拷贝。
JMS Queue实现了负载均衡,一个消息只能被一个消费者接受,当没有消费者可用时,这个消息会被保存直到有 一个可用的消费者,一个queue可以有很多消费者,他们之间实现了负载均衡,所以Queue实现了一个可靠的JMS负载均衡。
package com.primeton.quartzs.activeMQ;import javax.jms.Destination;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.jms.annotation.JmsListener;import org.springframework.jms.core.JmsMessagingTemplate;import org.springframework.stereotype.Service;import java.util.ArrayList;@Service("producer")public class Producer { @Autowired // 也可以注入JmsTemplate,JmsMessagingTemplate对JmsTemplate进行了封装 private JmsMessagingTemplate jmsTemplate; // 发送消息,destination是发送到的队列,message是待发送的消息 public void sendMessage(Destination destination, final String message){ jmsTemplate.convertAndSend(destination, message); } @JmsListener(destination="out.queue")//实现双向队列 public void consumerMessage(String text){ System.out.println("从out.queue队列收到的回复报文为:"+text); }}
package com.primeton.quartzs.activeMQ;import org.springframework.jms.annotation.JmsListener;import org.springframework.stereotype.Component;@Componentpublic class Consumer { // 使用JmsListener配置消费者监听的队列,其中text是接收到的消息 @JmsListener(destination = "mytest.queue") public void receiveQueue(String text) { System.out.println("Consumer收到的报文为:"+text); }}
package com.primeton.quartzs.activeMQ;import org.springframework.jms.annotation.JmsListener;import org.springframework.messaging.handler.annotation.SendTo;import org.springframework.stereotype.Component;@Componentpublic class Consumer2 { @JmsListener(destination = "mytest.queue") @SendTo("out.queue") public String receiveQueue(String text) { System.out.println("Consumer2收到的报文为:"+text); return "return message"+text; }}
在生产者上加入out.queue
@JmsListener(destination="out.queue")//实现双向队列 public void consumerMessage(String text){ System.out.println("从out.queue队列收到的回复报文为:"+text); }
消费者注解@SendTo("out.queue")
@JmsListener(destination = "mytest.queue") @SendTo("out.queue") public String receiveQueue(String text) { System.out.println("Consumer2收到的报文为:"+text); return "return message"+text;}
在消费者的注解@JmsListener加上containerFactory
// 使用JmsListener配置消费者监听的队列,其中text是接收到的消息@JmsListener(destination = "mytest.queue", containerFactory = "jmsListenerContainerQueue")public void receiveQueue(String text) { System.out.println("Consumer收到的报文为:"+text);}@JmsListener(destination = "mytest.topic", containerFactory = "jmsListenerContainerTopic")public void testTopicCusumer(String test){ System.out.println(test);}
package com.primeton.quartzs;import com.primeton.quartzs.activeMQ.Producer;import org.apache.activemq.command.ActiveMQQueue;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;import javax.jms.Destination;@RunWith(SpringRunner.class)@SpringBootTestpublic class QuartzsApplicationTests { @Autowired private Producer producer; @Test public void contextLoads() { Destination destination = new ActiveMQQueue("mytest.queue"); for(int i=0; i<10; i++){ producer.sendMessage(destination, "myname is chhliu!!!"); } }}
Consumer2收到的报文为:myname is chhliu!!!从out.queue队列收到的回复报文为:return messagemyname is chhliu!!!Consumer收到的报文为:myname is chhliu!!!Consumer2收到的报文为:myname is chhliu!!!从out.queue队列收到的回复报文为:return messagemyname is chhliu!!!Consumer收到的报文为:myname is chhliu!!!Consumer2收到的报文为:myname is chhliu!!!从out.queue队列收到的回复报文为:return messagemyname is chhliu!!!Consumer收到的报文为:myname is chhliu!!!Consumer2收到的报文为:myname is chhliu!!!Consumer收到的报文为:myname is chhliu!!!从out.queue队列收到的回复报文为:return messagemyname is chhliu!!!Consumer2收到的报文为:myname is chhliu!!!从out.queue队列收到的回复报文为:return messagemyname is chhliu!!!Consumer收到的报文为:myname is chhliu!!!
## URL of the ActiveMQ broker. Auto-generated by default. For instance `tcp://localhost:61616`# failover:(tcp://localhost:61616,tcp://localhost:61617)# tcp://localhost:61616#spring.activemq.broker-url=tcp://localhost:61616#true时用内置activeMQ,否则用自己本机安装的activeMQspring.activemq.in-memory=true#如果此处设置为true,需要加如下的依赖包,否则会自动配置失败,报JmsMessagingTemplate注入失败spring.activemq.pool.enabled=false#默认情况下activemq提供的是queue模式,若要使用topic模式需要配置下面配置#spring.jms.pub-sub-domain=true
如果大家有什么问题,可以在下方留言,想要源码的可以到我的资源库下载:
---------------------------------------------------------一个人的态度,决定他的高度。---------------------------------------------------------
转载地址:http://nypti.baihongyu.com/