博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
MQ 消息列队
阅读量:4031 次
发布时间:2019-05-24

本文共 7190 字,大约阅读时间需要 23 分钟。

是什么

消息队列(简称MQ)是一种应用程序对应用程序的通信方法。应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们。消息指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直传递接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求,

在项目中我们出了mq以外还有一个产品叫 TongLinkQ ,TongLinkQ给mq的功能是几乎一样的,只是是有国内的中间价厂商东方通公司研发的,该产品在国家很多大的重点项目中也得到了应用,所以如果是做项目用的话小弟还是建议没必要用mq,mq有他的好处,但是TongLinkQ 也有自己的强项,谁讲话,不是什么都是越好的越好,而是适合自己的才是最好的.....呵呵 好了 不废话了 看正题

图解
通讯方式
MQ有两种通讯模式,即数据报 (Datagram) 方式和请求/应答(Request/Reply) 方式:其中,Datagram方式通常又被称为"Send And Forget"(发送/忽略),是最简单的通讯模式,应用程序只需在创建完消息之后,利用MQ的API将消息发送到队列中,它充分利用了MQ确保消息传输,并且传一次且仅传一次(once and once only)的优势,发送端应用程序无需关心消息何时被处理;Request/Reply(请求/应答)方式相对复杂一些,在消息发出之后,你需要等待对方的处理结果,在这种情况下,我们通常需要考虑其他一些问题,如:
等待应答的时间是多少?
如果没有收到应答,是否再次发出请求?
应答发出之前是否会有数据库操作或其他交易被执行?
本次请求/应答过程的会话(session)信息是否需要被保留?
开发流程
Ø
         第一步是让应用程序与队列管理器连接。它通过 MQConnect 调用来进行此连接。
Ø
         下一步使用 MQOpen 调用为输出打开一个队列。
Ø
         然后应用程序使用 MQPut 调用将其数据放到队列上。
Ø
         要接收数据,应用程序调用 MQOpen 调用打开输入队列。
Ø
         应用程序使用 MQGet 调用从队列上接收数据。
<IBM WEBSPHERE MQ基础教程>
<精通WEBSPHERE MQ开发>
以 WebSphere MQ Windows V5.3版为例来介绍WebSphere MQ服务端在Windows XP(sp2) 下的安装和配置。
Windows下 WebSphere MQ 服务端的安装:
( 1)把WebSphere MQ Windows版服务器CD-ROM插入CD-ROM驱动器。
( 2)如果安装了自动运行,那么会启动安装进程。如果不启动,则双击CD-ROM上的根目录中的Setup图标以启动安装程序。
(3)请等待,直到出现"WebSphere MQ 安装启动板"窗口为止。
(4)如果需要更改安装的本地语言,单击"选择语言"图标,然后从列表中选择所需的语言。
(5)选择必备软件选项。
选择典型安装后,安装界面上的每个安装项右边有一个对钩号(表示已安装),反之则为一个叉号(表示还没有安装,如果要装 MQ,则必须先把这些软件装好)。
如果出现了叉号:
1)单击项目左边的 "+"号以显示安装连接;
2)选择要使用的安装源的选项,从以下各项选择:
  • WebSphere MQ CD
  • 因特网
  • 网络
( 6)安装完成时,单击项目左边的"-"符号。
注意:对于定制安装,可能不需要所有的必备软件。
( 7)安装完所有的必备软件,然后选择"网络先决条件"选项。
( 8)选择"WebSphere MQ"安装选项。
( 9)选择启动WebSphere MQ安装程序,然后等待,直到显示了带有欢迎信息的"WebSphere MQ安装"窗口为止。
( 10)单击"下一步"按钮继续。
( 11)阅读面板上的信息和许可证条款,选择接受,然后单击"下一步"。
( 12)如果机器上未安装过此产品的前一个版本,则显示"安装类型"面板。选择希望的安装类型(一般选择"典型"安装即可),然后单击"下一步"按钮。
( 13)"WebSphere MQ安装"窗口显示"安装WebSphere MQ就绪"信息。该窗口还显示用户选中的安装摘要,单击"安装"开始正式安装。
( 14)成功安装WebSphere MQ后,"WebSphere MQ安装"窗口显示以下信息:安装向导成功完成。
( 15)单击"完成"按钮启动"准备WebSphere MQ"向导。
该章节主要内容包括MQ服务端的基本配置,如队列管理器,队列以及通道的建立, 且包含JAVA实现的客户端程序。由此构成一个完整的客户端-服务器的流程。
1)点 "开始"->"所有程序"->"IBM WebSphere MQ"->"WebSphere MQ 资源管理器",进入WebSphere MQ 资源管理器界面。如下图所示:
2)创建名为 "QM_JACK"的队列管理器
<1>选中 "队列管理器"->"新建"->"队列管理器",如下图所示:
<2>在队列管理中输入 "QM_JACK",其他选项默认不变,点"下一步":
<3>设置队列日志(本步骤采用系统默认设置),点 "下一步":
<4>启动队列管理器,创建服务器连接通道,允许在 TCP/IP上进行队列管理器的远程管理,点击"下一步":
<5>设置队列管理器 QM_JACK的侦听端口:8927(用户可以根据需要自行更改端口号),点击"完成"。
<6>系统进入等待界面:
<7>队列管理器 QM_JACK创建成功
3)在 QM_JACK下创建名为"QUEUE_RECV"和"QUEUE_REPLY"的本地队列(客户可以根据自己的需求随意更改本地队列的名字和数量,这里创建这两个本地队列只是为之后的MQ_Tuxedo项目作准备):
设置队列名后其他属性全为系统默认值,点击 "确定"。
4)在 QM_JACK下创建名为"CNN_JACK"的服务器通道。
通道名称设为 CNN_JACK,其他选项保留为系统默认设置,点击"确定"。
5)在 MQ服务器端的计算机用户中添加MQ客户端所在计算机的系统用户。比如我的MQ客户端被Suse Linux下的root用户使用,那么,我们就需要在MQ的服务端(也就是Windows xp系统中添加名为"root"的用户)所在的计算机系统中添加名为"root"的用户。具体步骤如下:
<1>点击 "开始"->"控制面板"->"计算机管理"->"系统工具"->"本地用户和组"->"用户",点右键,选"新用户",如下图所示:
<2>创建新用户: root(注意:这里的用户名"root"是MQ客户端所在系统的用户名,用户要根据具体情况进行修改)
随意设置一个有效密码,选中密码永不过期,点击 "创建"。
<3>将新用户加入 mqm组(注意:这里的mqm组是我们安装完MQ Server后系统自动创建的)。具体操作如下:
点击 "属性":
点击 "添加":
输入对象名称: mqm,点击"确定":
root用户被添入 mqm组中:
点击 "应用",点击"确定"。
重启机器。至此, MQ服务端的配置完成。
JAVA客户端源码
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
/**
* @author
* @version 1.0
* 创建时间:2007-7-26 9:40:01
* 类说明
*/
public class MQClient {
    /**
    * Logger for this class
    */
    private static final Log logger = LogFactory.getLog(MQClient.class);
    private String strExtraSendXmlFileName = "D:\\jndi.properties";
    private static Properties props;
    static {
       props = new Properties();
       props.put("mqHostName", "139.31.89.67");
       props.put("mqPort", "4032");
       props.put("mqCCSID", "1381");
       props.put("mqUserName", "liujx");
       props.put("mqPassword", "linux");
       props.put("mqQManager", "Monitor_Queue_Manager");
       props.put("mqChannel", "Monitor_Conn_Chanel");
       props.put("mqLocalOutQueue", "Q_RECEIVE");
       props.put("mqLocalInQueue", "Q_RECEIVE");
    }
    /**
    * 主测试方法
    * @param args
    */
    public static void main(String[] args) {
       MQClient test = new MQClient();
       //发送消息
       test.putMsg();
       //接收消息
       test.getMsg();
    }
    public void putMsg() {
       // MQ发送数据   
       try {
           // 建立MQ客户端应用上下文环境   
           MQEnvironment.hostname = props.getProperty("mqHostName");
           MQEnvironment.port = Integer.parseInt(props.getProperty("mqPort"));
           MQEnvironment.CCSID = Integer
                  .parseInt(props.getProperty("mqCCSID"));
           MQEnvironment.channel = props.getProperty("mqChannel");
           MQEnvironment.userID = props.getProperty("mqUserName");
           MQEnvironment.password = props.getProperty("mqPassword");
           // 连接队列管理器
           MQQueueManager qMgr = new MQQueueManager(props
                  .getProperty("mqQManager"));
           int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
           // 打开队列
           MQQueue q = null;
           try {
              q = qMgr.accessQueue(props.getProperty("mqLocalOutQueue"),
                     openOptions);
           } catch (MQException me) {
              System.out.println("打开队列出现通讯异常" + me.getMessage() + "\n");
              return;
           }
           
           InputStream fins = new FileInputStream(new File(strExtraSendXmlFileName));
          
           byte[] data = new byte[fins.available()];
           fins.read(data);
           fins.close();
           MQMessage msg = new MQMessage();
           msg.write(data);
           // 放入消息   
           q.put(msg);
           System.out.println("客户端发送数据包成功..");
           // 关闭队列   
           q.close();
           // 断开队列管理器连接   
           qMgr.disconnect();
       } catch (MQException e) {
           if (logger.isDebugEnabled())
              logger.debug(e.getMessage());
           e.printStackTrace();
       } catch (Exception e) {
           if (logger.isDebugEnabled())
              logger.debug(e.getMessage());
           e.printStackTrace();
       }
    }
    /**
    * 获取数据
    *
    */
    public void getMsg() {
       // MQ接收数据   
       try {
           // 建立用上下文环境   
           MQEnvironment.hostname = props.getProperty("mqHostName");
           MQEnvironment.port = Integer.parseInt(props.getProperty("mqPort"));
           MQEnvironment.CCSID = Integer.parseInt(props.getProperty("mqCCSID"));   
           MQEnvironment.channel = props.getProperty("mqChannel");
           MQEnvironment.userID = props.getProperty("mqUserName");
           MQEnvironment.password = props.getProperty("mqPassword");
           // 建立队列管理器
           MQQueueManager qMgr = new MQQueueManager(props
                  .getProperty("mqQManager"));
           int openOptions = MQC.MQOO_INPUT_AS_Q_DEF
                  | MQC.MQOO_FAIL_IF_QUIESCING;
           // 打开队列
           MQQueue q = qMgr.accessQueue(props.getProperty("mqLocalInQueue"),
                  openOptions);
           MQGetMessageOptions mgo = new MQGetMessageOptions();
           mgo.options |= MQC.MQGMO_NO_WAIT;
           //构造返回消息
           MQMessage msg = new MQMessage();
           if ((msg = fetchOneMsg(q)) != null) {
              byte[] xmlData = new byte[msg.getDataLength()];
              msg.readFully(xmlData);
              logger.info(new String(xmlData));
              System.out.println("接收服务器端返回数据包成功..\n接收数据为:\n"+new String(xmlData));
           }
           // 关闭队列   
           q.close();
           // 断开队列管理器   
           qMgr.disconnect();
       } catch (MQException e) {
           logger.error(e);
           e.printStackTrace();
       } catch (Exception e) {
           logger.error(e);
           e.printStackTrace();
       }
    }
    /**
    * 从队列中取出一个消息
    *  
    * @param q 队列名称
    * @return
    * @throws Exception
    */
    private static MQMessage fetchOneMsg(MQQueue q) throws Exception {
       MQGetMessageOptions mgo = new MQGetMessageOptions();
       mgo.options |= MQC.MQGMO_NO_WAIT;
       MQMessage msg = new MQMessage();
       try {
           // 获取消息   
           q.get(msg, mgo);
       } catch (MQException e) {
           return null;
       }
       return msg;
    }
}
发送文件内容:
name = JACK
age = 23
sex = male
执行结果,控制台输出如下:
客户端发送数据包成功
..
接收服务器端返回数据包成功
..
接收数据为
:
name = JACK
age = 23
sex = male

转载地址:http://lcebi.baihongyu.com/

你可能感兴趣的文章
Redis面试题之持久化和五种部署方式
查看>>
搞定Spring Cloud断路器组件 Hystrix 的舱壁模式
查看>>
mybatis之大于、小于、大于等于和小于等于的写法
查看>>
如何实现 Oracle 的自增序列,两步轻松搞定
查看>>
不要等到Oracle磁盘空间满了,再去查表空间使用情况
查看>>
Hive SQL常用命令总结,大数据开发人员按需收藏
查看>>
Caused by: java.lang.NoClassDefFoundError: org/apache/commons/io/IOUtils
查看>>
使用 Springboot 对 Kettle 进行调度开发
查看>>
Kettle链接MySQL报错:Driver class 'org.gjt.mm.mysql.Driver' could not be found
查看>>
Python的Turtle库非常实用,绘制图形竟然这么简单
查看>>
JVM面试要点:G1 垃圾收集器和如何做到可预测的停顿
查看>>
阿里巴巴倡导的数据中台,到底是什么东东
查看>>
揭底JVM,怎么能不了解G1垃圾收集器
查看>>
如何优雅的编程,lombok你怎么这么好用
查看>>
@RequestParam、@QueryParam等Spring常见参数注解区别,你知道吗
查看>>
玩转远程Debug,两步轻松开启IDEA远程调试
查看>>
Jmeter压测错误,Address already in use: connect
查看>>
Intellij IDEA常用快捷键,最全总结
查看>>
前端干货,超实用的JQuery小技巧
查看>>
Spring Boot 几个常见的核心注解
查看>>