[TOC]

##前言##
本博客介绍Java中间件的一些知识,仅仅是一些知识储备。

##中间件##

###中间件概念###
中间件:中间件是一种介于操作系统和应用软件之间的一种软件,它使用系统软件所提供的基础服务(功能),衔接网络上应用系统的各个部分或不同的应用,能够达到资源共享、功能共享的目的。
若是以新一代的中间件系列产品来组合应用,同时配合以可复用的商务对象构件,则应用开发费用可节省至80%。

###中间件分类###

  1. 消息中间件
    消息中间件适用与进行网络通讯的系统,建立网络通讯的通道,进行数据和文件的传送
    产品:ActiveMQ、ZeroMQ、RabbitMQ、IBM webSphere MQ…
  2. 交易中间件
    交易中间件管理分布与不同操作系统的数据,实现数据一致性,保证系统的负载均衡
    产品:IBM CICS,Bea tuxedo…
  3. 对象中间件
    保证不同厂家的软件之间的交互访问
    产品:IBM componentbroker, iona orbix,borland visibroker…
  4. 应用服务器
    用来构造internet/intranet应用和其它分布式构件应用
    产品:IBM Websphere,Bea weblogic…
  5. 安全中间件
    以公钥基础设施(pki)为核心的、建立在一系列相关国际安全标准之上的一个开放式应用开发平台
    产品:entrust entrust…
  6. 应用集成服务器
    把工作流和应用开发技术如消息及分布式构件结合在一起,使处理能方便自动地和构件、script 应用、工作流行为结合在一起,同时集成文档和电子邮件
    产品:lss flowman、ibm flowmark、vitria businessagiliti

##ESB##
ESB,即企业服务总线
松散耦合一直是企业软件开发中的一个很重要的内容,而面向服务的SOA编程在随着ESB的应用得到了进一步的发展,ESB就像服务提供者和服务使用者之间的中间层
这里写图片描述

##JMS##
JMS,即Java Message Service
ESB仅仅是作为一个中间层,所以应用程序之间的消息通讯必须借助JMS,即通过JMS从服务使用者接收消息,并将其转发到相应的服务提供者。
而且,JMS 还定义了可发送的若干不同类型的消息。例如,Text 消息包含消息的字符串表示形式;Object 消息包含序列化的 Java 对象;Map 消息包含键/值对的映射,等等。

附录:
MQ DEMO:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
package com.wms.batchMsg;
import java.io.File;
import java.io.IOException;
import java.sql.Timestamp;
import java.text.ParseException;
import java.util.Date;
import org.apache.log4j.Logger;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.MQConstants;
public class MQUtil {
private static String qmName;
private static MQQueueManager qMgr;
private static Logger logger = Logger.getLogger(MQUtil.class);
static{
try{
MQEnvironment.hostname=ConfigManager.getValue("MQ_MQHost");
MQEnvironment.channel=ConfigManager.getValue("MQ_Server_Channel");
MQEnvironment.CCSID=Integer.parseInt(ConfigManager.getValue("MQ_CCSID"));
MQEnvironment.port=Integer.parseInt(ConfigManager.getValue("MQ_port"));
//MQEnvironment.userID = ConfigManager.getValue("MQ_UserId");
//MQEnvironment.password = ConfigManager.getValue("MQ_pass");
qmName = ConfigManager.getValue("MQ_QMname");
MQEnvironment.properties.put(MQConstants.TRANSPORT_PROPERTY,MQConstants.TRANSPORT_MQSERIES_CLIENT);
qMgr = new MQQueueManager(qmName);
}catch(MQException e){
e.printStackTrace();
logger.info("qManager failed: Completion code " + e.completionCode + " Reason Code is "
+ e.reasonCode);
}
}
public static MQQueue getSendQueue(String queueName) {
MQQueue sQueue;
int openSendOptions = MQConstants.MQOO_OUTPUT | MQConstants.MQOO_FAIL_IF_QUIESCING
| MQConstants.MQOO_SET_IDENTITY_CONTEXT;
try {
sQueue = qMgr.accessQueue(queueName, openSendOptions);
} catch (MQException e) {
e.printStackTrace();
return null;
}
return sQueue;
}
public static MQQueue getReceiveQueue(String revQueueName){
MQQueue rQueue ;
int openRcvOptions = MQConstants.MQOO_INPUT_AS_Q_DEF | MQConstants.MQOO_FAIL_IF_QUIESCING;
try{
rQueue = qMgr.accessQueue(revQueueName, openRcvOptions);
}catch(MQException e){
e.printStackTrace();
return null;
}
return rQueue;
}
public static void sendMsg(MQMsgEntity entity,String queueName) {
MQQueue sendQ = null;
try {
MQMessage qMsg = new MQMessage();
byte[] qByte = entity.getMsgStr().getBytes("UTF-8");
// String message = entity.getMsgStr();
qMsg.messageId = MQConstants.MQMI_NONE;
//TODO send and receive
if(entity.getCorrelId()!=null){
qMsg.correlationId = entity.getCorrelId();
}
qMsg.format = MQConstants.MQFMT_STRING;
qMsg.write(qByte);
MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options = pmo.options + MQConstants.MQPMO_NEW_MSG_ID;
pmo.options = pmo.options + MQConstants.MQPMO_NO_SYNCPOINT;
pmo.options = pmo.options + MQConstants.MQPMO_SET_IDENTITY_CONTEXT;
sendQ = getSendQueue(queueName);
sendQ.put(qMsg, pmo);
qMgr.commit();
//logger.info("The send message is: " +new String(qByte,"UTF-8"));
} catch (MQException e) {
logger.info("A WebSphere MQ error occurred : Completion code "
+ e.completionCode + " Reason Code is "
+ e.reasonCode);
} catch (java.io.IOException e) {
logger.info("An error occurred whilst to the message buffer "
+ e);
}finally{
try{
if(sendQ!=null){
sendQ.close();
}
}catch(MQException e){
// TODO Auto-generated catch block
e.printStackTrace();
logger.info("Error for MQ connection:"+e.getMessage());
}
}
}
// public static void messageHandlerByQueueName(MQMsgEntity entity,String queueName) {
// try {
// if(queueName.equalsIgnoreCase("sap_OrdersQueue")){
// ECOrder order = new ECOrder();
// order.CallOrderCURFC(entity, "ZECI001");
// }else if(queueName.equalsIgnoreCase("sap_OrderPendReqQueue")){
// ECOrderPending orderPending = new ECOrderPending();
// orderPending.CallOrderPendRFC(entity, "ZECI005");
// }else if(queueName.equalsIgnoreCase("sap_OrderPendCancelQueue")){
// ECOrderPending orderPending = new ECOrderPending();
// orderPending.CallCancelOrderPendRFC(entity, "ZECI006");
// }else if(queueName.equalsIgnoreCase("sap_ECReturnsQueue")){
// ECOrder order = new ECOrder();
// order.callOrderCancelRFC(entity, "ZECI001");
// }else if(queueName.equalsIgnoreCase("sap_downpaymentQueue")){
// ECDownPayment downPayment = new ECDownPayment();
// downPayment.callDownPaymentRFC(entity, "ZECI007");
// }else if(queueName.equalsIgnoreCase("sap_360LBPQueue")){
// EC360LBP lbp = new EC360LBP();
// lbp.generateHtmlFromQueue(entity.getMsgStr());
// }
// } catch (Exception e) {
// e.printStackTrace();
// logger.error(e.getMessage());
// }
//
// }
public MQQueueManager generateNewMQQM(){
MQQueueManager qMgr = null;
try{
MQEnvironment.hostname=ConfigManager.getValue("MQ_MQHost");
MQEnvironment.channel=ConfigManager.getValue("MQ_Server_Channel");
MQEnvironment.CCSID=Integer.parseInt(ConfigManager.getValue("MQ_CCSID"));
MQEnvironment.port=Integer.parseInt(ConfigManager.getValue("MQ_port"));
String qmName = ConfigManager.getValue("MQ_QMname");
MQEnvironment.properties.put(MQConstants.TRANSPORT_PROPERTY,MQConstants.TRANSPORT_MQSERIES_CLIENT);
qMgr = new MQQueueManager(qmName);
}catch(MQException e){
e.printStackTrace();
logger.info("qManager failed: Completion code " + e.completionCode + " Reason Code is "
+ e.reasonCode);
}
return qMgr;
}
public void MultiThreadGetMqMessage(MQQueueManager qMgr,String queueName){
MQQueue revQ = null;
String mqString = null;
MQMsgEntity entity = new MQMsgEntity();
int openRcvOptions = MQConstants.MQOO_INPUT_AS_Q_DEF | MQConstants.MQOO_FAIL_IF_QUIESCING;
try {
MQMessage retrievedMessage = new MQMessage();
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options += MQConstants.MQPMO_NO_SYNCPOINT;//
gmo.options = gmo.options + MQConstants.MQGMO_WAIT;//
gmo.options = gmo.options + MQConstants.MQGMO_FAIL_IF_QUIESCING;//
gmo.waitInterval = MQConstants.MQWI_UNLIMITED;//
gmo.matchOptions = MQConstants.MQMO_MATCH_MSG_ID;
retrievedMessage.format=MQConstants.MQFMT_STRING;
// MQC.MQWI_UNLIMITED;
revQ = qMgr.accessQueue(queueName, openRcvOptions);
revQ.get(retrievedMessage, gmo);
qMgr.commit();
int length = retrievedMessage.getDataLength();
if(length >0){
long startTime = System.currentTimeMillis();
byte[] msg = new byte[length];
retrievedMessage.readFully(msg);
mqString = new String(msg, "UTF-8");
if(queueName.equalsIgnoreCase("sap_360LBPQueue")){
mqString = mqString.replace("'", "\"");
}
long timeuse = System.currentTimeMillis() - startTime;
Date currentDate = new Date();
Timestamp receiveTimestamp = new Timestamp(currentDate.getTime());
logger.info("=========mqString from "+queueName+" :"+mqString);
DBUtil.insertIntoMQLog("Receive",queueName, mqString, timeuse, "success", "", null, receiveTimestamp);
entity.setMsgStr(mqString);
//messageHandlerByQueueName(entity,queueName);
}else{
logger.info("Error MQ string Sent!");
}
}
catch (MQException e) {
e.printStackTrace();
if (e.reasonCode != 2033)
{
logger.info(e.getMessage());
logger.info("Completion code "
+ e.completionCode + " Reason Code is " + e.reasonCode);
}
} catch (IOException e) {
logger.info("IO error:" + e.getMessage());
} finally{
try{
if(revQ!=null){
revQ.close();
}
}catch(MQException mqEx){
int rc = mqEx.reasonCode;
if (rc != MQException.MQRC_NO_MSG_AVAILABLE)
{
logger.info(" PUT Message failed with rc = "
+ rc);
}
}
}
}
public static String getMQMessage(String queueName) throws ParseException {
MQQueue revQ = null;
String mqString = null;
MQMsgEntity entity = new MQMsgEntity();
try {
MQMessage retrievedMessage = new MQMessage();
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options += MQConstants.MQPMO_NO_SYNCPOINT;//
gmo.options = gmo.options + MQConstants.MQGMO_WAIT;//
gmo.options = gmo.options + MQConstants.MQGMO_FAIL_IF_QUIESCING;//
gmo.waitInterval = MQConstants.MQWI_UNLIMITED;//
gmo.matchOptions = MQConstants.MQMO_MATCH_MSG_ID;
retrievedMessage.format=MQConstants.MQFMT_STRING;
// MQC.MQWI_UNLIMITED;
revQ = getReceiveQueue(queueName);
revQ.get(retrievedMessage, gmo);
qMgr.commit();
int length = retrievedMessage.getDataLength();
if(length >0){
byte[] msg = new byte[length];
retrievedMessage.readFully(msg);
mqString = new String(msg, "UTF-8");
logger.info("=========getMQMessage===mqString from "+queueName+" :"+mqString);
entity.setMsgStr(mqString);
//messageHandlerByQueueName(entity,queueName);
}else{
logger.info("Error MQ string Sent!");
}
}
catch (MQException e) {
e.printStackTrace();
if (e.reasonCode != 2033)
{
e.printStackTrace();
logger.info("Completion code "
+ e.completionCode + " Reason Code is " + e.reasonCode);
}
} catch (java.io.IOException e) {
System.out.println("error" + e.getMessage());
}finally{
try{
if(revQ!=null){
revQ.close();
}
}catch(MQException mqEx){
int rc = mqEx.reasonCode;
if (rc != MQException.MQRC_NO_MSG_AVAILABLE)
{
System.out.println(" PUT Message failed with rc = "
+ rc);
}
}
}
return mqString;
}
public void revAndSend(MQMsgEntity entity,String queueName){
//
sendMsg(entity,queueName);
}
public void subscribeMessage() throws ParseException{
while(true){
logger.info("waiting to get message.....");
getMQMessage("sap_OrdersQueue");
}
}
public void subscribeOrderPendMessage() throws ParseException{
while(true){
logger.info("waiting to get message.....");
getMQMessage("sap_ECReturnsQueue");
}
}
public static void main(String[] args) throws IOException, ParseException {
MQMsgEntity entity = new MQMsgEntity();
String sendMsg = XMLBeanUtil.readFileToString(new File("D://batchXML0108.txt"));
int intPktCtlNbr = 1;
String StrPkt = null;
String newPktCtlNbr =null;
for (int i = 0; i < 20000; i++) {
newPktCtlNbr = String.format("%09d", intPktCtlNbr+i);
StrPkt="<PktCtlNbr>"+"V"+newPktCtlNbr+"</PktCtlNbr>";
String changeSendMsg = sendMsg.replaceAll("<PktCtlNbr>6001996171</PktCtlNbr>", StrPkt);
System.out.println(StrPkt);
entity.setMsgStr(changeSendMsg);
sendMsg(entity,"wms_SAPOrderQueue");
}
// MQUtil util = new MQUtil();
// util.subscribeMessage();
// util.subscribeOrderPendMessage();
// util.messageHandlerByQueueName(entity, "sap_360LBPQueue");
// getMQMessage("sap_OrderPendCancelQueue");
// System.out.println("rev message is:"+message);
}
}
文章目录
|