博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ActiveMQ笔记之点对点队列(Point-to-Point)
阅读量:5255 次
发布时间:2019-06-14

本文共 7163 字,大约阅读时间需要 23 分钟。

1. 点对点通信

点对点是一种一对一通信方式,更像是有一个队列,一个人往队列里放消息,另一个人从队列中取消息,其最大的特点是一个消息只会被消费一次,即使有多个消费者同时消费,他们消费的也是不同的消息。

 

2. 简单实现

添加依赖

添加Maven依赖:

org.apache.activemq
activemq-all
5.15.2

 

activemq.properties

在resource下创建一个activemq.properties,用来保存activemq的用户名、密码、连接地址等信息:

username = xxxpasswd = xxxurl = tcp://xx.xx.xx.xx:61616

 

ActiveMqUtils

创建一个工具类,用来获取连接,因为工厂类一般都是比较重量级的类,不应该重复创建:

package org.cc11001100.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.JMSException;import java.io.IOException;import java.util.Properties;/** * @author: CC11001100 * @date: 2017/11/8 18:20 * @email: CC11001100@qq.com */public class ActiveMqUtils {    private static ConnectionFactory connectionFactory;    static{        try {            Properties properties = new Properties();            properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream("activemq.properties"));            connectionFactory=new ActiveMQConnectionFactory(properties.getProperty("username"),                    properties.getProperty("passwd"),                    properties.getProperty("url"));        } catch (IOException e) {            e.printStackTrace();        }    }    /**     * 获取JMS连接     *     * @return JMS Connection     */    public static Connection getConnection(){        try {            return connectionFactory.createConnection();        } catch (JMSException e) {            e.printStackTrace();        }        return null;    }}

 

SenderUtils

创建发送消息的工具类:

package org.cc11001100.activemq;import javax.jms.*;import java.util.function.Function;/** * @author: CC11001100 * @date: 2017/11/8 18:12 * @email: CC11001100@qq.com */public class SenderUtils {    /**     * 向指定的队列发送消息     *     * @param queueName 发送到哪个队列     * @param generateMessage 使用这个方法产生要发送的消息     */    public static void send(String queueName, Function
generateMessage){ Connection conn=null; Session session=null; MessageProducer messageProducer=null; try { conn = ActiveMqUtils.getConnection(); assert conn != null; conn.start(); session=conn.createSession(true, Session.AUTO_ACKNOWLEDGE); /*队列名是区分大小写的,如果不存在的话会自动创建一个*/ Queue queue=session.createQueue(queueName); messageProducer=session.createProducer(queue); /*设置非持久化,持久化的意思是要求发送的时候接收方要在线*/ messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); // 生成消息并发送 Message message = generateMessage.apply(session); messageProducer.send(message); /*在提交的时候消息才会真正的发出去*/ session.commit(); } catch (JMSException e) { e.printStackTrace(); }finally{ if(messageProducer!=null){ try { messageProducer.close(); } catch (JMSException e) { e.printStackTrace(); } } if(session!=null){ try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if(conn!=null){ try { conn.close(); } catch (JMSException e) { e.printStackTrace(); } } } }}

注意:在session.commit()之前消息是不会被发送出去的。

 

ReceiverUtils

创建接收消息的工具类:

package org.cc11001100.activemq;import javax.jms.*;import java.util.function.Function;/** * @author: CC11001100 * @date: 2017/11/8 18:37 * @email: CC11001100@qq.com */public class ReceiverUtils {    /**     * 从指定队列中接收一个消息     *     * @param queueName 队列名称     * @return 接收到的消息内容     */    public static Message receive(String queueName){        Connection conn=null;        Session session=null;        MessageConsumer messageConsumer=null;        try {            conn=ActiveMqUtils.getConnection();            assert conn != null;            conn.start();            session=conn.createSession(true,Session.AUTO_ACKNOWLEDGE);            Queue queue=session.createQueue(queueName);            messageConsumer=session.createConsumer(queue);            /*这是一个阻塞式的方法,在接收到消息之前会一直阻塞着*/            Message message=messageConsumer.receive();            session.commit();            return message;        } catch (JMSException e) {            e.printStackTrace();        }finally{            if(messageConsumer!=null){                try {                    messageConsumer.close();                } catch (JMSException e) {                    e.printStackTrace();                }            }            if(session!=null){                try {                    session.close();                } catch (JMSException e) {                    e.printStackTrace();                }            }            if(conn!=null){                try {                    conn.close();                } catch (JMSException e) {                    e.printStackTrace();                }            }        }        return null;    }    /**     * 从指定队列接收一个消息并将它传递给回调方法处理,返回处理后的结果     *     * @param queueName 队列名称     * @param callback 处理消息的回调方法     * @param 
处理消息后的返回值 * @return 处理消息后的返回值 */ public static
T receive(String queueName, Function
callback){ Message message = receive(queueName); assert message!=null; return callback.apply(message); }}

 

Main

创建测试类:

package org.cc11001100.activemq;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.TextMessage;import java.util.concurrent.TimeUnit;import java.util.function.Consumer;/** * @author: CC11001100 * @date: 2017/11/8 18:49 * @email: CC11001100@qq.com */public class Main {    public static void main(String[] args) {        final String QUEUE_NAME = "FOO_QUEUE";        // 生产者        new Thread(()->{            while(true){                try {                    TimeUnit.SECONDS.sleep(1);                } catch (InterruptedException e) {                    e.printStackTrace();                }                SenderUtils.send(QUEUE_NAME, session -> {                    try {                        return session.createTextMessage(Long.toString(System.currentTimeMillis()));                    } catch (JMSException e) {                        e.printStackTrace();                    }                    return null;                });            }        }).start();        // 消费者        new Thread(()->{            while(true){                ReceiverUtils.receive(QUEUE_NAME, message->{                    if(message instanceof TextMessage){                        try {                            TextMessage textMessage = (TextMessage) message;                            System.out.println(textMessage.getText());                        } catch (JMSException e) {                            e.printStackTrace();                        }                    }                    return message;                });            }        }).start();    }}

转载于:https://www.cnblogs.com/cc11001100/p/7805958.html

你可能感兴趣的文章
详解两种C#自动实现DLL(OCX)控件注册的方法
查看>>
浅谈echo、print、var_dump()、print_r()的区别
查看>>
jQuery 知识积累
查看>>
Sublime Text 3中文乱码问题的解决(最有效)
查看>>
如何将json格式的string字符串转换为string数组
查看>>
c++三/五法则
查看>>
强人工智能之“全本的鹦鹉”
查看>>
spring事物配置,声明式事务管理和基于@Transactional注解的使用(转)
查看>>
【BZOJ3215/3216】[ZJOI2013]话旧/话旧2(组合数学,动态规划)
查看>>
原生JS例子
查看>>
PHP+MYSQL实现分页
查看>>
python网络爬虫抓取动态网页并将数据存入数据库MySQL
查看>>
PHP 表单和用户输入
查看>>
Marissa Mayer 的职业建议..... how about yours?
查看>>
JavaScript- 获得TreeView CheckBox里选中项的值
查看>>
MSSQLSERVER数据库- 作业调度定时备份数据库
查看>>
动态列 pivot 到表变量
查看>>
PHP 相关软件下载
查看>>
[转]ASP.NET MVC中的两个Action之间值的传递--TempData
查看>>
整流桥
查看>>