1. 点对点通信
点对点是一种一对一通信方式,更像是有一个队列,一个人往队列里放消息,另一个人从队列中取消息,其最大的特点是一个消息只会被消费一次,即使有多个消费者同时消费,他们消费的也是不同的消息。
2. 简单实现
添加依赖
添加Maven依赖:
org.apache.activemq activemq-all 5.15.2
activemq.properties
在resource下创建一个activemq.properties,用来保存activemq的用户名、密码、连接地址等信息:
username = xxxpasswd = xxxurl = tcp://xx.xx.xx.xx:61616
ActiveMqUtils
创建一个工具类,用来获取连接,因为工厂类一般都是比较重量级的类,不应该重复创建:
package org.cc11001100.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.JMSException;import java.io.IOException;import java.util.Properties;/** * @author: CC11001100 * @date: 2017/11/8 18:20 * @email: CC11001100@qq.com */public class ActiveMqUtils { private static ConnectionFactory connectionFactory; static{ try { Properties properties = new Properties(); properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream("activemq.properties")); connectionFactory=new ActiveMQConnectionFactory(properties.getProperty("username"), properties.getProperty("passwd"), properties.getProperty("url")); } catch (IOException e) { e.printStackTrace(); } } /** * 获取JMS连接 * * @return JMS Connection */ public static Connection getConnection(){ try { return connectionFactory.createConnection(); } catch (JMSException e) { e.printStackTrace(); } return null; }}
SenderUtils
创建发送消息的工具类:
package org.cc11001100.activemq;import javax.jms.*;import java.util.function.Function;/** * @author: CC11001100 * @date: 2017/11/8 18:12 * @email: CC11001100@qq.com */public class SenderUtils { /** * 向指定的队列发送消息 * * @param queueName 发送到哪个队列 * @param generateMessage 使用这个方法产生要发送的消息 */ public static void send(String queueName, FunctiongenerateMessage){ Connection conn=null; Session session=null; MessageProducer messageProducer=null; try { conn = ActiveMqUtils.getConnection(); assert conn != null; conn.start(); session=conn.createSession(true, Session.AUTO_ACKNOWLEDGE); /*队列名是区分大小写的,如果不存在的话会自动创建一个*/ Queue queue=session.createQueue(queueName); messageProducer=session.createProducer(queue); /*设置非持久化,持久化的意思是要求发送的时候接收方要在线*/ messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); // 生成消息并发送 Message message = generateMessage.apply(session); messageProducer.send(message); /*在提交的时候消息才会真正的发出去*/ session.commit(); } catch (JMSException e) { e.printStackTrace(); }finally{ if(messageProducer!=null){ try { messageProducer.close(); } catch (JMSException e) { e.printStackTrace(); } } if(session!=null){ try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if(conn!=null){ try { conn.close(); } catch (JMSException e) { e.printStackTrace(); } } } }}
注意:在session.commit()之前消息是不会被发送出去的。
ReceiverUtils
创建接收消息的工具类:
package org.cc11001100.activemq;import javax.jms.*;import java.util.function.Function;/** * @author: CC11001100 * @date: 2017/11/8 18:37 * @email: CC11001100@qq.com */public class ReceiverUtils { /** * 从指定队列中接收一个消息 * * @param queueName 队列名称 * @return 接收到的消息内容 */ public static Message receive(String queueName){ Connection conn=null; Session session=null; MessageConsumer messageConsumer=null; try { conn=ActiveMqUtils.getConnection(); assert conn != null; conn.start(); session=conn.createSession(true,Session.AUTO_ACKNOWLEDGE); Queue queue=session.createQueue(queueName); messageConsumer=session.createConsumer(queue); /*这是一个阻塞式的方法,在接收到消息之前会一直阻塞着*/ Message message=messageConsumer.receive(); session.commit(); return message; } catch (JMSException e) { e.printStackTrace(); }finally{ if(messageConsumer!=null){ try { messageConsumer.close(); } catch (JMSException e) { e.printStackTrace(); } } if(session!=null){ try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if(conn!=null){ try { conn.close(); } catch (JMSException e) { e.printStackTrace(); } } } return null; } /** * 从指定队列接收一个消息并将它传递给回调方法处理,返回处理后的结果 * * @param queueName 队列名称 * @param callback 处理消息的回调方法 * @param处理消息后的返回值 * @return 处理消息后的返回值 */ public static T receive(String queueName, Function callback){ Message message = receive(queueName); assert message!=null; return callback.apply(message); }}
Main
创建测试类:
package org.cc11001100.activemq;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.TextMessage;import java.util.concurrent.TimeUnit;import java.util.function.Consumer;/** * @author: CC11001100 * @date: 2017/11/8 18:49 * @email: CC11001100@qq.com */public class Main { public static void main(String[] args) { final String QUEUE_NAME = "FOO_QUEUE"; // 生产者 new Thread(()->{ while(true){ try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } SenderUtils.send(QUEUE_NAME, session -> { try { return session.createTextMessage(Long.toString(System.currentTimeMillis())); } catch (JMSException e) { e.printStackTrace(); } return null; }); } }).start(); // 消费者 new Thread(()->{ while(true){ ReceiverUtils.receive(QUEUE_NAME, message->{ if(message instanceof TextMessage){ try { TextMessage textMessage = (TextMessage) message; System.out.println(textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } return message; }); } }).start(); }}