序列方式(点到点方式,P2P)的特性:

1.受托人包含经营者和顾客;

2.序列中的信息只有由一个使用人应用;

3.顾客能够随时随地消費序列中的信息;

activemq消息持久化方式-activemq和kafka区别-第1张图片序列方式和主题模式的差别:

1.提早定阅,序列方式:顾客不用提早定阅就可以消費信息;主题模式:仅有提早定阅的顾客才可以取得成功消費信息;

2.好几个顾客派发信息:序列方式:信息只有均值消費,别的顾客消費的信息不可以被别的顾客反复消費;主题模式:每一个定阅者还可以在主题模式下消費每条信息;

实例编码:

电影制片人:

import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class ActiveMQProducer { public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)"; public static final String QUEUE_NAME = "queue01"; public static void main(String[] args) throws JMSException { //建立联接加工厂 ,,依照定的url地址给出默认设置的登录名和登陆密码 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL); //根据联接加工厂获得connection联接 并运行浏览 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //建立对话session 必须2个主要参数,第一个事务管理,第二个查收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立到达站(挑选是序列或是主题风格) Queue queue = session.createQueue(QUEUE_NAME); //建立信息的经营者 MessageProducer messageProducer = session.createProducer(queue); //根据应用信息经营者messageProducer生产制造3条信息发送至序列中 for (int i = 1; i " i); //根据messageProducer 公布信息 messageProducer.send(textMessage); } //关掉資源 messageProducer.close(); session.close(); connection.close(); System.out.println("信息发送至MQ取得成功"); }}

顾客1:

import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class ActiveMQConsumer { public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)"; public static final String QUEUE_NAME="queue01"; public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL); //根据联接加工厂获得connection联接 并运行浏览 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //建立对话session 必须2个主要参数,第一个事务管理,第二个查收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立到达站(挑选是序列或是主题风格) Queue queue = session.createQueue(QUEUE_NAME); //建立信息的顾客 MessageConsumer messageConsumer = session.createConsumer(queue); while (true){ //从序列中获得信息 receive未设定较大時间 是堵塞的, TextMessage textMessage = (TextMessage) messageConsumer.receive(); if (textMessage !=null){ System.out.println("顾客接收到信息---->" textMessage.getText()); }else { break; } } messageConsumer.close(); session.close(); connection.close(); }}

輸出:

INFO | Successfully connected to tcp://192.168.1.17:61616顾客接收到信息---->msg---->2顾客接收到信息---->msg---->4顾客接收到信息---->msg---->6

顾客2:

import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;import java.io.IOException;public class ActiveMQConsumerListener { public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)"; public static final String QUEUE_NAME = "queue01"; public static void main(String[] args) throws JMSException, IOException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL); //根据联接加工厂获得connection联接 并运行浏览 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //建立对话session 必须2个主要参数,第一个事务管理,第二个查收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立到达站(挑选是序列或是主题风格) Queue queue = session.createQueue(QUEUE_NAME); //建立信息的顾客 MessageConsumer messageConsumer = session.createConsumer(queue); //根据监视的体制消費信息 messageConsumer.setMessageListener((message) -> { if (message != null && message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { System.out.println("顾客接收到信息---->" textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); //不关掉控制面板 假如不用这样的话,在下面很有可能在联接的情况下会关掉了,导致没法消費的难题 System.in.read(); messageConsumer.close(); session.close(); connection.close(); }}

輸出:

INFO | Successfully connected to tcp://192.168.1.17:61616顾客接收到信息---->msg---->1顾客接收到信息---->msg---->3顾客接收到信息---->msg---->5顾客接收到信息---->msg---->7activemq消息持久化方式-activemq和kafka区别-第2张图片顾客总数:表明顾客总数;

挂起来信息数:等候耗费的信息数,即当今未排长队的信息数;

信息入队:信息进到序列;(此数据只能提升或降低,重新启动后会清零);

信息摘帽:序列外的信息能够解释为顾客耗费的总数(重新启动后会消除);

持续性实例编码:

ActiveMQ分布式锁就是指经营者转化成的数据信息在被顾客消費以前储存到数据库查询中,随后被顾客消費以后从数据库查询中删掉。

电影制片人:

import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class ActiveMQProducer { public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)"; public static final String QUEUE_NAME = "queue02"; public static void main(String[] args) throws JMSException { //建立联接加工厂 ,,依照定的url地址给出默认设置的登录名和登陆密码 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL); //根据联接加工厂获得connection联接 并运行浏览 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //建立对话session 必须2个主要参数,第一个事务管理,第二个查收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立到达站(挑选是序列或是主题风格) Queue queue = session.createQueue(QUEUE_NAME); //建立信息的经营者 MessageProducer messageProducer = session.createProducer(queue); // 信息分布式锁 messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); //根据应用信息经营者messageProducer生产制造3条信息发送至序列中 for (int i = 1; i " i); //根据messageProducer 公布信息 messageProducer.send(textMessage); } //关掉資源 messageProducer.close(); session.close(); connection.close(); System.out.println("信息发送至MQ取得成功"); }}

编码:message producer . setdelivery mode(delivery mode . persistent);

顾客:

import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class ActiveMQConsumer { public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)"; public static final String QUEUE_NAME="queue02"; public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL); //根据联接加工厂获得connection联接 并运行浏览 Connection connection = activeMQConnectionFactory.createConnection(); connection.setClientID("client-queue02-01"); connection.start(); //建立对话session 必须2个主要参数,第一个事务管理,第二个查收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立到达站(挑选是序列或是主题风格) Queue queue = session.createQueue(QUEUE_NAME); //建立信息的顾客 MessageConsumer messageConsumer = session.createConsumer(queue); while (true){ //从序列中获得信息 receive未设定较大時间 是堵塞的, TextMessage textMessage = (TextMessage) messageConsumer.receive(); if (textMessage !=null){ System.out.println("顾客接收到信息---->" textMessage.getText()); }else { break; } } messageConsumer.close(); session.close(); connection.close(); }}

检测:

1.最先运作经营者,ActiveMQProducer。

2.查验数据库查询:

activemq消息持久化方式-activemq和kafka区别-第3张图片3.运作使用人,主题活动MQConsumer并輸出:

INFO | Successfully connected to tcp://192.168.1.17:61616顾客接收到信息---->msg---->1顾客接收到信息---->msg---->2顾客接收到信息---->msg---->3顾客接收到信息---->msg---->4顾客接收到信息---->msg---->5顾客接收到信息---->msg---->6顾客接收到信息---->msg---->7

4.再度查验数据库查询,信息已被删掉。

评论(0条)

刀客源码 游客评论