博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ActiveMQ 入门使用p2p模型-主动消费
阅读量:2385 次
发布时间:2019-05-10

本文共 5312 字,大约阅读时间需要 17 分钟。

生产者

package clc.active;import com.sun.xml.internal.bind.v2.runtime.unmarshaller.XmlVisitor;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageProducer;import javax.jms.Session;/** * ClassName: TestProducer
* Description: 发送一个字符串文本到ActiveMQ
* date: 2019/1/15 10:16 AM
* * @author chengluchao * @since JDK 1.8 */public class TestProducer { /** * 发送消息到ActiveMQ中,具体的消息内容为参数信息 * 开发JMS相关代码过程中,使用的接口类型都是javax.jms包下的类型. * * @param datas - 消息内容. */ public void sendTextMessage(String datas) { //连接工厂 ConnectionFactory factory = null; //连接 Connection connection = null; //目的地 Destination destination = null; //会话 Session session = null; //消息发送者 MessageProducer producer = null; //消息对象 Message message = null; try { //创建连接工厂,连接ActiveMq服务的连接工厂 //创建工厂,构造方法有三个参数,分别是用户名,密码,连接地址 //无参构造,有默认的连接地址,本地连接。localhost // 单参数构造,无验证模式的,没有用户的认证, // 三个参数的构造,有认证+指定地址。默认端口是61616 factory = new ActiveMQConnectionFactory("guest", "guest", "tcp://2.2.2.4:61616"); //通过工厂,创建连接对象 //创建连接的方法有重载,其中有createConnection(String username, String password); //可以再创建爱你连接工厂时,只传递连接地址,不传递用户信息。 connection = factory.createConnection(); //建议启动连接,消息的发送者不是必须启动连接。消息的消费者必须启动连接。 //producer在发送消息的时候,会检查是否启动了连接,如果未启动,自动启动。 //如果有特殊的配置,建议配置完毕后再启动连接 connection.start(); //通过连接对象,创建会话对象。 /* *创建会话的时候,必须传递两个参数,分别代表是否支持事物和如何确认消息处理。 * transacted - 是否支持事物,数据类型是boolean。true-支持,false-不支持 * true - 支持事物,第二个参数对producer默认无效。建议传递的数据是Session.SESSION_TEANSACTED * false - 不支持事物,常用参数。第二个参数必须传递,且必须有效。 * * acknowledgeMode - 如何确认消息的处理。使用确认机制实现的。 * AUTO_ACKNOWLEDGE - 自动确认消息。消息的消费者处理后,自动确认。常用.商业开发不推荐。 * CLIENT_ACKNOWLEDGE - 客户端手动确认,消息的消费者处理后,必须手工确认。 * DUPS_OK_ACKNOWLEDGE - 有副本的客户端手动确认。 * 一个消息可以多次处理。 * 可以略低Session的消耗,在可以容忍重复消息时使用(不推荐使用) */ session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); //创建目的地, destination = session.createQueue("first-mq"); //通过会话对象,创建爱你消息的发送者producer //创建的消息发送者,发送的消息一定到指定的目的地中。 //创建producer的时候可以不指定目的地,在发送消息的时候指定目的地。 producer = session.createProducer(destination); //创建文本消息对象,作为具体数据内容的载体。 message = session.createTextMessage(datas); //使用producer发送消息到ActiveMQ的目的地,如果消息发送失败,抛出异常 producer.send(message); System.out.println("消息已发送"); } catch (Exception e) { e.printStackTrace(); } finally { // 回收资源 //消息发送者 if (producer != null) { try { producer.close(); } catch (JMSException jmse) { jmse.printStackTrace(); } } //会话对象 if (session != null) { try { session.close(); } catch (JMSException jmse) { jmse.printStackTrace(); } } //连接对象 if (connection != null) { try { connection.close(); } catch (JMSException jmse) { jmse.printStackTrace(); } } } } public static void main(String[] args) { TestProducer producer = new TestProducer(); producer.sendTextMessage("clccc"); }}

消费者:

package clc.active;import org.apache.activemq.ActiveMQConnectionFactory;import org.testng.annotations.Test;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.Queue;import javax.jms.Session;import javax.jms.TextMessage;/** * ClassName: TestConsumer
* Description: 消费消息
* date: 2019/1/15 11:21 AM
* * @author chengluchao * @since JDK 1.8 */public class TestConsumer { /** * 消费消息 * * @throws Exception */ @Test public void TestMQConsumerQueue() throws Exception { //1、创建工厂连接对象,需要制定ip和端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://2.2.2.4:61616"); //2、使用连接工厂创建一个连接对象 Connection connection = connectionFactory.createConnection(); //3、开启连接 connection.start(); //4、使用连接对象创建会话(session)对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //指定目的地获取消息 Destination destination = session.createQueue("first-mq"); //6、使用会话对象创建生产者对象 MessageConsumer consumer = session.createConsumer(destination); //获取队列中的消息,receive方法是一个主动获取消息的订单,执行一次,拉取一个消息 Message message = consumer.receive(); System.out.println(((TextMessage) message).getText()); //、关闭资源 consumer.close(); session.close(); connection.close(); }}

 

posted @
2019-01-15 15:18 阅读(
...) 评论(
...)

转载地址:http://cgcab.baihongyu.com/

你可能感兴趣的文章
openstack之 glance_image和instances存储目录解析
查看>>
centos7(三节点)搭建ceph环境
查看>>
将linux(ubuntu)安装到U盘下面--便携式ubuntu和使用dd制作U盘安装工具
查看>>
linux之强大的find命令
查看>>
python使用变量操作mysql语句
查看>>
linux bridge 网桥详解
查看>>
ceph&openstack发展前景
查看>>
Mysql之主键、外键和各种索引
查看>>
ceph&云计算
查看>>
python main()函数 name == ‘main’:
查看>>
flask一个基本的http响应流程
查看>>
linux常见的文件及目录操作12个命令
查看>>
挂载ceph的rbd块存储作为本地磁盘块
查看>>
ceph的块设备的两种使用方式及代码示例
查看>>
查看python中模块的所有方法
查看>>
ceph对象存储的配置与S3、swift接口的使用
查看>>
python通过librados库通过底层的rados操作ceph的对象存储和块存储
查看>>
在客户端使用python来调用boto S3 API来操作librados库
查看>>
ceph存储数据的详细流程(CRUSH)
查看>>
linux内核模块详解
查看>>