GitHub user pmouawad commented on a diff in the pull request:
https://github.com/apache/jmeter/pull/325#discussion_r150328441
--- Diff: src/protocol/jms/org/apache/jmeter/protocol/jms/sampler/JMSSampler.java ---
@@ -142,48 +173,244 @@ public SampleResult sample(Entry entry) {
res.sampleStart();
try {
- TextMessage msg = createMessage();
- if (isOneway()) {
- int deliveryMode = isNonPersistent() ?
-
DeliveryMode.NON_PERSISTENT:DeliveryMode.PERSISTENT;
- producer.send(msg, deliveryMode,
Integer.parseInt(getPriority()),
- Long.parseLong(getExpiration()));
- res.setRequestHeaders(Utils.messageProperties(msg));
- res.setResponseOK();
- res.setResponseData("Oneway request has no response data",
null);
+ LOGGER.debug("Point-to-point mode: " +
getCommunicationstyle());
+ if (isBrowse()) {
+ handleBrowse(res);
+ } else if (isClearQueue()) {
+ handleClearQueue(res);
+ } else if (isOneway()) {
+ handleOneWay(res);
+ } else if (isRead()) {
+ handleRead(context, res);
} else {
- if (!useTemporyQueue()) {
- msg.setJMSReplyTo(receiveQueue);
- }
- Message replyMsg = executor.sendAndReceive(msg,
- isNonPersistent() ? DeliveryMode.NON_PERSISTENT :
DeliveryMode.PERSISTENT,
- Integer.parseInt(getPriority()),
- Long.parseLong(getExpiration()));
- res.setRequestHeaders(Utils.messageProperties(msg));
- if (replyMsg == null) {
- res.setResponseMessage("No reply message received");
- } else {
- if (replyMsg instanceof TextMessage) {
- res.setResponseData(((TextMessage)
replyMsg).getText(), null);
- } else {
- res.setResponseData(replyMsg.toString(), null);
- }
-
res.setResponseHeaders(Utils.messageProperties(replyMsg));
- res.setResponseOK();
- }
+ handleRequestResponse(res);
}
} catch (Exception e) {
LOGGER.warn(e.getLocalizedMessage(), e);
- if (thrown != null){
+ if (thrown != null) {
res.setResponseMessage(thrown.toString());
- } else {
+ } else {
res.setResponseMessage(e.getLocalizedMessage());
}
}
res.sampleEnd();
return res;
}
+ private void handleBrowse(SampleResult res) throws JMSException {
+ LOGGER.debug("isBrowseOnly");
+ StringBuffer sb = new StringBuffer("");
+ res.setSuccessful(true);
+ sb.append("\n \n Browse message on Send Queue " +
sendQueue.getQueueName());
+ sb.append(browseQueueDetails(sendQueue, res));
+ res.setResponseData(sb.toString().getBytes());
+ }
+
+ private void handleClearQueue(SampleResult res) throws JMSException {
+ LOGGER.debug("isClearQueue");
+ StringBuffer sb = new StringBuffer("");
+ res.setSuccessful(true);
+ sb.append("\n \n Clear messages on Send Queue " +
sendQueue.getQueueName());
+ sb.append(clearQueue(sendQueue, res));
+ res.setResponseData(sb.toString().getBytes());
+ }
+
+ private void handleOneWay(SampleResult res) throws JMSException {
+ LOGGER.debug("isOneWay");
+ TextMessage msg = createMessage();
+ int deliveryMode = isNonPersistent() ? DeliveryMode.NON_PERSISTENT
: DeliveryMode.PERSISTENT;
+ producer.send(msg, deliveryMode, Integer.parseInt(getPriority()),
Long.parseLong(getExpiration()));
+ res.setRequestHeaders(Utils.messageProperties(msg));
+ res.setResponseOK();
+ res.setResponseData("Oneway request has no response data", null);
+ }
+
+ private void handleRead(JMeterContext context, SampleResult res) {
+ LOGGER.debug("isRead");
+ StringBuffer sb = new StringBuffer("");
+ res.setSuccessful(true);
+ Sampler sampler = context.getPreviousSampler();
+ SampleResult sr = context.getPreviousResult();
+ String jmsSelector = getJMSSelector();
+ if (jmsSelector.equals("_PREV_SAMPLER_")) {
+ if (sampler instanceof JMSSampler) {
+ jmsSelector = sr.getResponseMessage();
+ }
+ }
+ int sampleCounter = 0;
+ int sampleTries = 0;
+ String result = null;
+
+ StringBuilder buffer = new StringBuilder();
+ StringBuilder propBuffer = new StringBuilder();
+
+ do {
+ result = browseQueueForConsumption(sendQueue, jmsSelector,
res, buffer, propBuffer);
+ if (result != null) {
+ sb.append(result);
+ sb.append('\n');
+ sampleCounter++;
+ }
+ sampleTries++;
+ } while ((result != null) && (sampleTries <
getNumberOfSamplesToAggregateAsInt()));
+
+ res.setResponseMessage(sampleCounter + " samples messages received");
+ res.setResponseData(buffer.toString().getBytes()); // TODO - charset?
+ res.setResponseHeaders(propBuffer.toString());
+ if (sampleCounter == 0) {
+ res.setResponseCode("404");
+ res.setSuccessful(false);
+ } else {
+ res.setResponseCodeOK();
+ res.setSuccessful(true);
+ }
+ res.setResponseMessage(sampleCounter + " message(s) received successfully");
+ res.setSamplerData(getNumberOfSamplesToAggregateAsInt() + " messages expected");
+ res.setSampleCount(sampleCounter);
+ }
+
+ private void handleRequestResponse(SampleResult res) throws
JMSException {
+ TextMessage msg = createMessage();
+ if (!useTemporyQueue()) {
+ LOGGER.debug("NO TEMP QUEUE");
+ msg.setJMSReplyTo(receiveQueue);
+ }
+ LOGGER.debug("Create temp message");
+ Message replyMsg = executor.sendAndReceive(msg,
+ isNonPersistent() ? DeliveryMode.NON_PERSISTENT :
DeliveryMode.PERSISTENT,
+ Integer.parseInt(getPriority()),
Long.parseLong(getExpiration()));
+ res.setRequestHeaders(Utils.messageProperties(msg));
+ if (replyMsg == null) {
+ res.setResponseMessage("No reply message received");
+ } else {
+ if (replyMsg instanceof TextMessage) {
+ res.setResponseData(((TextMessage) replyMsg).getText(),
null);
+ } else {
+ res.setResponseData(replyMsg.toString(), null);
+ }
+ res.setResponseHeaders(Utils.messageProperties(replyMsg));
+ res.setResponseOK();
+ }
+ }
+
+ private String browseQueueForConsumption(Queue queue, String
jmsSelector, SampleResult res, StringBuilder buffer,
+ StringBuilder propBuffer) {
+ String retVal = null;
+ try {
+ QueueReceiver consumer = session.createReceiver(queue,
jmsSelector);
+ Message reply = consumer.receive(Long.valueOf(getTimeout()));
+ LOGGER.debug("Message: " + reply);
+ consumer.close();
+ if (reply != null) {
+ res.setResponseMessage("1 message(s) received successfully");
+ res.setResponseHeaders(reply.toString());
+ TextMessage msg = (TextMessage) reply;
+ retVal = msg.getText();
+ extractContent(buffer, propBuffer, msg);
+ } else {
+ res.setResponseMessage("No message received");
+ }
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ LOGGER.error(ex.getMessage());
+ }
+ return retVal;
+ }
+
+ private void extractContent(StringBuilder buffer, StringBuilder
propBuffer, Message msg) {
+ if (msg != null) {
+ try {
+ if (msg instanceof TextMessage) {
+ buffer.append(((TextMessage) msg).getText());
+ } else if (msg instanceof ObjectMessage) {
+ ObjectMessage objectMessage = (ObjectMessage) msg;
+ if (objectMessage.getObject() != null) {
+
buffer.append(objectMessage.getObject().getClass());
+ } else {
+ buffer.append("object is null");
+ }
+ } else if (msg instanceof BytesMessage) {
+ BytesMessage bytesMessage = (BytesMessage) msg;
+ buffer.append(bytesMessage.getBodyLength() + " bytes received in BytesMessage");
+ } else if (msg instanceof MapMessage) {
+ MapMessage mapm = (MapMessage) msg;
+ @SuppressWarnings("unchecked") // MapNames are Strings
+ Enumeration<String> enumb = mapm.getMapNames();
+ while (enumb.hasMoreElements()) {
+ String name = enumb.nextElement();
+ Object obj = mapm.getObject(name);
+ buffer.append(name);
+ buffer.append(",");
+ buffer.append(obj.getClass().getCanonicalName());
+ buffer.append(",");
+ buffer.append(obj);
+ buffer.append("\n");
+ }
+ }
+ Utils.messageProperties(propBuffer, msg);
+ } catch (JMSException e) {
+ LOGGER.error(e.getMessage());
+ }
+ }
+ }
+
+ private String browseQueueDetails(Queue queue, SampleResult res) {
+ try {
+ String messageBodies = new String("\n==== Browsing Messages === \n");
+ // get some queue details
+ QueueBrowser qBrowser = session.createBrowser(queue);
+ // browse the messages
+ Enumeration<?> e = qBrowser.getEnumeration();
+ int numMsgs = 0;
+ // count number of messages
+ String corrID = "";
+ while (e.hasMoreElements()) {
+ TextMessage message = (TextMessage) e.nextElement();
+ corrID = message.getJMSCorrelationID();
+ if (corrID == null) {
+ corrID = message.getJMSMessageID();
+ messageBodies = messageBodies + numMsgs + " - MessageID: " + corrID + ": " + message.getText()
+ + "\n";
+ } else {
+ messageBodies = messageBodies + numMsgs + " - CorrelationID: " + corrID + ": " + message.getText()
+ + "\n";
+ }
+ numMsgs++;
+ }
+ res.setResponseMessage(numMsgs + " messages available on the queue");
+ res.setResponseHeaders(qBrowser.toString());
+ return (messageBodies + queue.getQueueName() + " has " +
numMsgs + " messages");
+ } catch (Exception e) {
+ res.setResponseMessage("Error counting message on the queue");
+ e.printStackTrace();
--- End diff --
Same remarks as above
---