博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【java-activemq】 activemq发布订阅模式
阅读量:6662 次
发布时间:2019-06-25

本文共 5600 字,大约阅读时间需要 18 分钟。

hot3.png

Destination destination = session.createTopic("xxxx");

import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.DeliveryMode;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import javax.jms.Topic;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;public class TopicPublisher {	private static String user = ActiveMQConnection.DEFAULT_USER;  	private static String password =ActiveMQConnection.DEFAULT_PASSWORD;  	private static String url =  "tcp://localhost:61616";  	public static void main(String[] args)throws Exception {  		// ConnectionFactory :连接工厂,JMS 用它创建连接  		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url);  		// Connection :JMS 客户端到JMS Provider 的连接  		Connection connection = connectionFactory.createConnection();  		// Connection 启动  		connection.start();  		System.out.println("Connection is start...");  		// Session: 一个发送或接收消息的线程  		Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);  		// Topicr :消息的目的地;消息发送给谁.  		Topic  topic = session.createTopic("example.A");  		// MessageProducer:消息发送者  		MessageProducer producer = session.createProducer(topic);  		// 设置不持久化,此处学习,实际根据项目决定  		producer.setDeliveryMode(DeliveryMode.PERSISTENT);  		// 构造消息,此处写死,项目就是参数,或者方法获取  		sendMessage(session, producer);  		session.commit();  		connection.close();  		System.out.println("send text ok.");  	}  	public static void sendMessage(Session session, MessageProducer producer)  throws Exception {  		for (int i = 1; i <= 100; i++) {//有限制,达到1000就不行  			TextMessage message = session.createTextMessage("ActiveMq 发送的消息" + i);  			// 发送消息到目的地方  			System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);  			producer.send(message);  		}  	}  }
import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.Session;import javax.jms.TextMessage;import javax.jms.Topic;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;public class TopicSubscriber{	private static String user = ActiveMQConnection.DEFAULT_USER;  	private static String password =ActiveMQConnection.DEFAULT_PASSWORD;  	private static String url = "tcp://localhost:61616";  	private static boolean flag=true;	public static void main(String[] args) throws Exception{  		// ConnectionFactory :连接工厂,JMS 用它创建连接  		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url);  		// Connection :JMS 客户端到JMS Provider 的连接  		Connection connection = connectionFactory.createConnection();  		connection.start();  		// Session: 一个发送或接收消息的线程  		final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);  		// Destination :消息的目的地;消息发送给谁.  		Topic topic=session.createTopic("example.B");  		// 消费者,消息接收者  		MessageConsumer consumer = session.createConsumer(topic);  		consumer.setMessageListener(new MessageListener(){//有事务限制  			@Override  			public void onMessage(Message message) {  				try {  					TextMessage textMessage=(TextMessage)message;  					System.out.println(textMessage.getText());  				} catch (JMSException e1) {  					e1.printStackTrace();  				}  				try {  					session.commit();  				} catch (JMSException e) {  					e.printStackTrace();  				}  			}  		});  		//		  另外一种接受方式 		//		while(flag){  		//			TextMessage message = (TextMessage)consumer.receive(1);  		//			if(message != null){  		//				if("stop".equals( message.getText())){  		//					flag = false;  		//				}  		//			}  		//		}  	}}
import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.Session;import javax.jms.TextMessage;import javax.jms.Topic;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;public class TopicSubscriber2{	private static String user = ActiveMQConnection.DEFAULT_USER;  	private static String password =ActiveMQConnection.DEFAULT_PASSWORD;  	private static String url = "tcp://localhost:61616";  	private static boolean flag=true;	public static void main(String[] args) throws Exception{  		// ConnectionFactory :连接工厂,JMS 用它创建连接  		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url);  		// Connection :JMS 客户端到JMS Provider 的连接  		Connection connection = connectionFactory.createConnection();  		connection.start();  		// Session: 一个发送或接收消息的线程  		final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);  		// Destination :消息的目的地;消息发送给谁.  		Topic topic=session.createTopic("example.A");  		// 消费者,消息接收者  		MessageConsumer consumer = session.createConsumer(topic);  		consumer.setMessageListener(new MessageListener(){//有事务限制  			@Override  			public void onMessage(Message message) {  				try {  					TextMessage textMessage=(TextMessage)message;  					System.out.println(textMessage.getText());  				} catch (JMSException e1) {  					e1.printStackTrace();  				}  				try {  					session.commit();  				} catch (JMSException e) {  					e.printStackTrace();  				}  			}  		});  		//		  另外一种接受方式 		//		while(flag){  		//			TextMessage message = (TextMessage)consumer.receive(1);  		//			if(message != null){  		//				if("stop".equals( message.getText())){  		//					flag = false;  		//				}  		//			}  		//		}  	}}

转载于:https://my.oschina.net/v512345/blog/1530090

你可能感兴趣的文章
ES查看segment大小
查看>>
java----序列化与反序列化中及java序列化本质就是存储一个对象,然后在其他地方在调用它...
查看>>
C#编程(四)
查看>>
javascript中五种基本数据类型
查看>>
MySQL 5.7最新版本的2个bug
查看>>
从jvm的角度来看单例模式
查看>>
基于msm8909高通平台Android驱动开发之hello程序
查看>>
【Little Demo】左右按钮tab选项卡双切换
查看>>
【POJ1037】A decorative fence(DP)
查看>>
SAP安装前添加虚拟网卡步骤
查看>>
React Conf 2017 干货总结 1: React + ES next = ♥
查看>>
互联网+时代,是更加开放还是封闭
查看>>
iOS单元測试:Specta + Expecta + OCMock + OHHTTPStubs + KIF
查看>>
Cocos2d-x 3.x版2048游戏开发
查看>>
Scroller源码解析
查看>>
strcmp在CTF中的案例
查看>>
C标准提前定义宏,调试时加打印非常实用
查看>>
List&lt;InvestInfoDO&gt; invest = advertiseDao6.qryInvestInfo(InvestInfoDO1);怎样获得list的实体类;...
查看>>
【Java编程】建立一个简单的JDBC连接-Drivers, Connection, Statement and PreparedStatement
查看>>
Android ble蓝牙问题
查看>>