// Creates the topic named "xxxx" through the session and keeps it in a variable of the
// general Destination type.
// NOTE(review): "xxxx" looks like a placeholder topic name, and `session` is declared
// outside this snippet — confirm both against the surrounding code.
Destination destination = session.createTopic("xxxx");
import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.DeliveryMode;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import javax.jms.Topic;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;public class TopicPublisher { private static String user = ActiveMQConnection.DEFAULT_USER; private static String password =ActiveMQConnection.DEFAULT_PASSWORD; private static String url = "tcp://localhost:61616"; public static void main(String[] args)throws Exception { // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url); // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = connectionFactory.createConnection(); // Connection 启动 connection.start(); System.out.println("Connection is start..."); // Session: 一个发送或接收消息的线程 Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); // Topicr :消息的目的地;消息发送给谁. Topic topic = session.createTopic("example.A"); // MessageProducer:消息发送者 MessageProducer producer = session.createProducer(topic); // 设置不持久化,此处学习,实际根据项目决定 producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 构造消息,此处写死,项目就是参数,或者方法获取 sendMessage(session, producer); session.commit(); connection.close(); System.out.println("send text ok."); } public static void sendMessage(Session session, MessageProducer producer) throws Exception { for (int i = 1; i <= 100; i++) {//有限制,达到1000就不行 TextMessage message = session.createTextMessage("ActiveMq 发送的消息" + i); // 发送消息到目的地方 System.out.println("发送消息:" + "ActiveMq 发送的消息" + i); producer.send(message); } } }
import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.Session;import javax.jms.TextMessage;import javax.jms.Topic;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;public class TopicSubscriber{ private static String user = ActiveMQConnection.DEFAULT_USER; private static String password =ActiveMQConnection.DEFAULT_PASSWORD; private static String url = "tcp://localhost:61616"; private static boolean flag=true; public static void main(String[] args) throws Exception{ // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url); // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = connectionFactory.createConnection(); connection.start(); // Session: 一个发送或接收消息的线程 final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // Destination :消息的目的地;消息发送给谁. Topic topic=session.createTopic("example.B"); // 消费者,消息接收者 MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(new MessageListener(){//有事务限制 @Override public void onMessage(Message message) { try { TextMessage textMessage=(TextMessage)message; System.out.println(textMessage.getText()); } catch (JMSException e1) { e1.printStackTrace(); } try { session.commit(); } catch (JMSException e) { e.printStackTrace(); } } }); // 另外一种接受方式 // while(flag){ // TextMessage message = (TextMessage)consumer.receive(1); // if(message != null){ // if("stop".equals( message.getText())){ // flag = false; // } // } // } }}
import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.Session;import javax.jms.TextMessage;import javax.jms.Topic;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;public class TopicSubscriber2{ private static String user = ActiveMQConnection.DEFAULT_USER; private static String password =ActiveMQConnection.DEFAULT_PASSWORD; private static String url = "tcp://localhost:61616"; private static boolean flag=true; public static void main(String[] args) throws Exception{ // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url); // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = connectionFactory.createConnection(); connection.start(); // Session: 一个发送或接收消息的线程 final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // Destination :消息的目的地;消息发送给谁. Topic topic=session.createTopic("example.A"); // 消费者,消息接收者 MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(new MessageListener(){//有事务限制 @Override public void onMessage(Message message) { try { TextMessage textMessage=(TextMessage)message; System.out.println(textMessage.getText()); } catch (JMSException e1) { e1.printStackTrace(); } try { session.commit(); } catch (JMSException e) { e.printStackTrace(); } } }); // 另外一种接受方式 // while(flag){ // TextMessage message = (TextMessage)consumer.receive(1); // if(message != null){ // if("stop".equals( message.getText())){ // flag = false; // } // } // } }}