论坛首页 Java企业应用论坛

【入门】基于ActiveMQ 的发布/订阅(Pub/Sub) Chat 示例,上传了源码

浏览 10112 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (16)
作者 正文
   发表时间:2010-12-08   最后修改:2011-05-18

环境需求:

1. JDK 1.5 或者以上

2. Apache Ant , 在写本文时,用的是 Ant 1.7.1

3. ActiveMQ , 在写本文时,用的是 Apache ActiveMQ 5.4.1

技术需求:

1. JMS(Java Message Service)  

2. JNDI(Java Naming and Directory Interface )

 

在JMS的“发布/订阅(pub/sub) ”模型中,消息的发布者(Publisher) 通过主题(Topic) 发布消息,订阅者(Subscriber) 通过订阅主题获取消息。 一个主题可以同时有多个订阅者. 通过这种方式我们可以实现广播式(broadcast)消息。

为了更好的理解"发布/订阅(Pub/Sub)"模式,我在《Java消息服务器 第二版》 上找到了一个很好的例子来说明他的使用。不过书上只提供了相关代码供我们理解,没有讲述整个创建过程,在这里打算记录下整个构建实例的过程:

 

1.  创建项目目录入下图所示,并将activemq-all-*.jar 复制到项目的classpath中:

 

 

2. 编写Chat代码:

 

 

public class Chat implements MessageListener {
	private TopicSession pubSession;
	private TopicPublisher pub;
	private TopicConnection conn;
	private String username;

	public Chat(String topicFactory, String topicName, String username)
			throws NamingException, JMSException {

		// 创建 JNDI context
		InitialContext ctx = new InitialContext();

		//1. 创建 TopicConnectionFacotry
		TopicConnectionFactory factory = (TopicConnectionFactory) ctx
				.lookup(topicFactory);
		//2. 创建 TopicConnection
		TopicConnection connection = factory.createTopicConnection();

		//3. 根据 Connection 创建 JMS 会话
		TopicSession pubSession = (TopicSession) connection.createSession(
				false, Session.AUTO_ACKNOWLEDGE);
		TopicSession subSession = (TopicSession) connection.createSession(
				false, Session.AUTO_ACKNOWLEDGE);

		//4. 创建 Topic
		Topic topic = (Topic) ctx.lookup(topicName);

		//5. 创建 发布者 和 订阅者
		TopicPublisher pub = pubSession.createPublisher(topic);
		TopicSubscriber sub = subSession.createSubscriber(topic, null, true);

		//6. 为发布者设置消息监听
		sub.setMessageListener(this);

		this.conn = connection;
		this.pub = pub;
		this.pubSession = pubSession;
		this.username = username;

		//7. 开启JMS连接
		connection.start();
	}

	protected void writeMessage(String txt) {
		try {
			TextMessage message = pubSession.createTextMessage();
			message.setText(username + ": " + txt);

			pub.publish(message);
		} catch (JMSException e) {
			e.printStackTrace();
		}

	}

	public void onMessage(Message msg) {
		TextMessage txtMsg = (TextMessage) msg;
		try {
			System.out.println(txtMsg.getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

	public void close() throws JMSException {
		this.conn.close();
	}

	public static void main(String[] args) throws NamingException,
			JMSException, IOException {
		if (args.length != 3) {
			System.out.println("Factory, Topic, or username missing");
		}

		Chat chat = new Chat(args[0], args[1], args[2]);

		BufferedReader cmd = new BufferedReader(
				new InputStreamReader(System.in));

		while (true) {
			String s = cmd.readLine();

			if (s.equalsIgnoreCase("exit")) {
				chat.close();
				System.exit(0);
			} else {
				chat.writeMessage(s);
			}
		}
	}
}

 

  3.由于里我们使用了JNDI, 所以我们需要编辑jndi.properties。内容如下:

 

 

# START SNIPPET: jndi
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory

# use the following property to configure the default connector
java.naming.provider.url = tcp://localhost:61616

java.naming.security.principal=system
java.naming.security.credentials=manager

# use the following property to specify the JNDI name the connection factory
# should appear as. 
#connectionFactoryNames = connectionFactory, queueConnectionFactory, topicConnectionFactry
connectionFactoryNames = topicConnectionFactry


# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
#queue.MyQueue = example.ChatQue
topic.chat = example.chat

# register some topics in JNDI using the form
# topic.[jndiName] = [physicalName]
#topic.MyTopic = example.ChatTop

# END SNIPPET: jndi

 

4. 到这里已经基本完成Chat 编码工作,使用如下指令即可运行这个示例:

 

《Java 消息服务》原文 写道
java com.dayang.jms.demo.Chat [topicConnectionFactory] [topicName] [userName]

 

  不过如果没有设置相关classpath,是不可能通过这个指令来成功运行这个Demo,在这里我打算使用Ant来帮我完成这个工作

 

5. 编写build.xml脚本如下:

 

 

<?xml version="1.0" encoding="utf-8" ?>
<project name="chat" default="run" basedir=".">
	<property name="src.dir" value="src" />
	<property name="build.dir" value="build" />
	<property name="classes.dir" value="${build.dir}/classes" />
	<property name="jar.dir" value="${build.dir}/jar" />
	<property name="lib.dir" value="libs"/>		
	
	<!-- 设置main函数所在类 -->
	<property name="main-class" value="com.dayang.jms.chat.Chat" />
	
	<!-- 定义classpath -->
	<path id="classpath">
		<fileset dir="${lib.dir}" includes="**/*.jar" />
	</path>
	
	<!-- 创建构建目录,用于存放构建生成的文件 -->
	<target name="init">
		<mkdir dir="${build.dir}"/>
	</target>
	
	<!-- 编译 -->
	<target name="compile" depends="init">
		<mkdir dir="${classes.dir}"/>
		<javac srcdir="${src.dir}" destdir="${classes.dir}" 
			classpathref="classpath"/>
		<!-- copy properties file to classpath -->
		<copy todir="${classes.dir}">
			<fileset dir="${src.dir}" excludes="**.*.jar" />
		</copy>
	</target>
	
	<!-- 打包 -->
	<target name="jar" depends="compile">
		<mkdir dir="${jar.dir}"/>
		<jar destfile="${jar.dir}/${ant.project.name}.jar" 
				basedir="${classes.dir}">
			<manifest>
				<attribute name="Main-Class" value="${main-class}" />
			</manifest>
		</jar>
	</target>
	
	<!-- 运行client1 -->
	<target name="run1" depends="jar">
		<java fork="true" classname="${main-class}">
			<arg value="topicConnectionFactry"/>
			<arg value="chat"/>
			<arg value="client1"/>
			<classpath>
				<path refid="classpath"/>
				<path location="${jar.dir}/${ant.project.name}.jar"/>
			</classpath>
		</java>
	</target>
	
	<!-- 运行client2 -->
	<target name="run2" depends="jar">
		<java fork="true" classname="${main-class}">
			<arg value="topicConnectionFactry"/>
			<arg value="chat"/>
			<arg value="client2"/>
			<classpath>
				<path refid="classpath"/>
				<path location="${jar.dir}/${ant.project.name}.jar"/>
			</classpath>
		</java>
	</target>
	
	<target name="clean">
		<delete dir="${build.dir}"/>
	</target>
	
	<target name="clean-build" depends="clean,jar"/>
	
</project>
 

 

6. 打开两个控制台窗口,分别使用ant run1 和 ant run2 指令来运行程序, 如果成功我们将看到如下结果:


 

写在最后:

这个示例仅仅简单的说了JMS 发布/订阅 API的基本使用,更多特性需要在以后的使用中进行摸索。

发布/订阅 除了能够提供“1对多”的消息专递方式之外,还提供了消息持久化的特性。他允许订阅者在上线后接收离线时的消息,关于这部分特性,以及“发布/订阅”的应用场景打算在以后的文章中慢慢阐述。

参考资料:

1. JMS: http://baike.baidu.com/view/157103.htm

2. ActiveMQ: http://baike.baidu.com/view/433374.htm

3. JNDI http://baike.baidu.com/view/209575.htm

4. 《Java消息服务器 第二版》

 

5. Ant Manual http://ant.apache.org/manual/index.html

 

2011-05-18: 新增加了Demo的源代码, 需要的可以下载附件JSMDemo.rar

JMSDemo工程目录结构如下:


 

 

  • 大小: 21.7 KB
  • 大小: 88.1 KB
  • 大小: 47.6 KB
   发表时间:2010-12-08  
好贴,就是有点简单。

我想知道的几个问题是:
1、并发性
2、稳定性
3、性能,吞吐量,各种persistent的问题
4、集群/负载均衡
0 请登录后投票
   发表时间:2010-12-09  
androidleader 写道
好贴,就是有点简单。

我想知道的几个问题是:
1、并发性
2、稳定性
3、性能,吞吐量,各种persistent的问题
4、集群/负载均衡

人家就是写个helloworld的demo,
nc啊问这些问题.....
0 请登录后投票
   发表时间:2010-12-09  
GRDJE 写道
androidleader 写道
好贴,就是有点简单。

我想知道的几个问题是:
1、并发性
2、稳定性
3、性能,吞吐量,各种persistent的问题
4、集群/负载均衡

人家就是写个helloworld的demo,
nc啊问这些问题.....

+1
0 请登录后投票
   发表时间:2010-12-09  
liubey 写道
GRDJE 写道
androidleader 写道
好贴,就是有点简单。

我想知道的几个问题是:
1、并发性
2、稳定性
3、性能,吞吐量,各种persistent的问题
4、集群/负载均衡

人家就是写个helloworld的demo,
nc啊问这些问题.....

+1

我只是希望有路过的高手出来指点一二

nc的几位,激动个啥。。。
0 请登录后投票
   发表时间:2010-12-09  
androidleader 写道
好贴,就是有点简单。

我想知道的几个问题是:
1、并发性
2、稳定性
3、性能,吞吐量,各种persistent的问题
4、集群/负载均衡


如果你对这些感兴趣你可以在文中提到的参考资料里找找·· ActiveMQ 文档提供了各种测试方法....

并发,稳定,吞吐量基本因需求而定, 消息传递有很多协议, 每种协议对应的应用场景都有所不同,这里有篇文章对MOM选型以及性能说的很全面:
http://wiki.secondlife.com/wiki/Message_Queue_Evaluation_Notes#Zero_MQ


0 请登录后投票
   发表时间:2010-12-09  
witcheryne 写道
androidleader 写道
好贴,就是有点简单。

我想知道的几个问题是:
1、并发性
2、稳定性
3、性能,吞吐量,各种persistent的问题
4、集群/负载均衡


如果你对这些感兴趣你可以在文中提到的参考资料里找找·· ActiveMQ 文档提供了各种测试方法....

并发,稳定,吞吐量基本因需求而定, 消息传递有很多协议, 每种协议对应的应用场景都有所不同,这里有篇文章对MOM选型以及性能说的很全面:
http://wiki.secondlife.com/wiki/Message_Queue_Evaluation_Notes#Zero_MQ



1、发现现有的几个版本,5.3 5.4都不是很稳定,5.2稳定些
   1) 有丢消息或消息重复问题
   2) 连续启停,failover机制有问题,存在幽灵队列
2、发现没有什么好的测试方法
   1) 测试的benchmark,网上有一个,
   2) loadrunner不太好使,自己写程序测不太可靠;
0 请登录后投票
   发表时间:2010-12-09  
androidleader 写道
witcheryne 写道
androidleader 写道
好贴,就是有点简单。

我想知道的几个问题是:
1、并发性
2、稳定性
3、性能,吞吐量,各种persistent的问题
4、集群/负载均衡


如果你对这些感兴趣你可以在文中提到的参考资料里找找·· ActiveMQ 文档提供了各种测试方法....

并发,稳定,吞吐量基本因需求而定, 消息传递有很多协议, 每种协议对应的应用场景都有所不同,这里有篇文章对MOM选型以及性能说的很全面:
http://wiki.secondlife.com/wiki/Message_Queue_Evaluation_Notes#Zero_MQ



1、发现现有的几个版本,5.3 5.4都不是很稳定,5.2稳定些
   1) 有丢消息或消息重复问题
   2) 连续启停,failover机制有问题,存在幽灵队列
2、发现没有什么好的测试方法
   1) 测试的benchmark,网上有一个,
   2) loadrunner不太好使,自己写程序测不太可靠;


这个你了解的比我多,性能测试方面我不太清楚, 关于测试方法希望你能分享一下。

我用JMeter测ActiveMQ 5.4.1, 开500个线程,1秒间隔, 循环 10 次~ 没发现什么异常...
我们用ActiveMQ主要目的是代替原先的SocketServer,将消息传递独立出来,解决C/S和B/S应用集成的问题。

需要高并发·你试试 ZeroMQ : http://www.infoq.com/cn/news/2010/09/introduction-zero-mq,
基于AMQP协议,用Erlang写的RabbitMQ你也可以试试: http://www.infoq.com/cn/articles/AMQP-RabbitMQ



0 请登录后投票
   发表时间:2010-12-09  
jms--Java Message Service lz貌似写错了
0 请登录后投票
   发表时间:2010-12-09  
grady 写道
jms--Java Message Service lz貌似写错了

多谢~ 立刻纠正...
0 请登录后投票
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics