Repository: activemq-artemis Updated Branches: refs/heads/2.6.x a7dbd5711 -> f4734868a
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 37564b5..f5756f2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueu import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage; @@ -76,11 +77,13 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAG import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAJoinMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAPrepareMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResumeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXARollbackMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage_V2; import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; @@ -313,7 +316,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = message.isRequiresResponse(); sendContinuations(message.getPacketSize(), message.getMessageBodySize(), message.getBody(), message.isContinues()); if (requiresResponse) { - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); } break; } @@ -342,7 +345,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = request.isRequiresResponse(); session.createAddress(request.getAddress(), request.getRoutingTypes(), request.isAutoCreated()); if (requiresResponse) { - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); } break; } @@ -351,7 +354,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = request.isRequiresResponse(); session.createQueue(request.getAddress(), request.getQueueName(), RoutingType.MULTICAST, request.getFilterString(), request.isTemporary(), request.isDurable()); if (requiresResponse) { - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); } break; } @@ -361,7 +364,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { session.createQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isTemporary(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(), request.isExclusive(), request.isLastValue(), request.isAutoCreated()); if (requiresResponse) { - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); } break; } @@ -373,7 +376,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { session.createSharedQueue(request.getAddress(), request.getQueueName(), request.isDurable(), request.getFilterString()); } if (requiresResponse) { - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); } break; } @@ -385,7 +388,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { session.createSharedQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(), request.isExclusive(), request.isLastValue()); } if (requiresResponse) { - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); } break; } @@ -393,7 +396,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = true; SessionDeleteQueueMessage request = (SessionDeleteQueueMessage) packet; session.deleteQueue(request.getQueueName()); - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); break; } case SESS_QUEUEQUERY: { @@ -453,62 +456,62 @@ public class ServerSessionPacketHandler implements ChannelHandler { case SESS_COMMIT: { requiresResponse = true; session.commit(); - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); break; } case SESS_ROLLBACK: { requiresResponse = true; session.rollback(((RollbackMessage) packet).isConsiderLastMessageAsDelivered()); - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); break; } case SESS_XA_COMMIT: { requiresResponse = true; SessionXACommitMessage message = (SessionXACommitMessage) packet; session.xaCommit(message.getXid(), message.isOnePhase()); - response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); + response = createSessionXAResponseMessage(packet); break; } case SESS_XA_END: { requiresResponse = true; SessionXAEndMessage message = (SessionXAEndMessage) packet; session.xaEnd(message.getXid()); - response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); + response = createSessionXAResponseMessage(packet); break; } case SESS_XA_FORGET: { requiresResponse = true; SessionXAForgetMessage message = (SessionXAForgetMessage) packet; session.xaForget(message.getXid()); - response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); + response = createSessionXAResponseMessage(packet); break; } case SESS_XA_JOIN: { requiresResponse = true; SessionXAJoinMessage message = (SessionXAJoinMessage) packet; session.xaJoin(message.getXid()); - response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); + response = createSessionXAResponseMessage(packet); break; } case SESS_XA_RESUME: { requiresResponse = true; SessionXAResumeMessage message = (SessionXAResumeMessage) packet; session.xaResume(message.getXid()); - response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); + response = createSessionXAResponseMessage(packet); break; } case SESS_XA_ROLLBACK: { requiresResponse = true; SessionXARollbackMessage message = (SessionXARollbackMessage) packet; session.xaRollback(message.getXid()); - response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); + response = createSessionXAResponseMessage(packet); break; } case SESS_XA_START: { requiresResponse = true; SessionXAStartMessage message = (SessionXAStartMessage) packet; session.xaStart(message.getXid()); - response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); + response = createSessionXAResponseMessage(packet); break; } case SESS_XA_FAILED: { @@ -521,14 +524,14 @@ public class ServerSessionPacketHandler implements ChannelHandler { case SESS_XA_SUSPEND: { requiresResponse = true; session.xaSuspend(); - response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); + response = createSessionXAResponseMessage(packet); break; } case SESS_XA_PREPARE: { requiresResponse = true; SessionXAPrepareMessage message = (SessionXAPrepareMessage) packet; session.xaPrepare(message.getXid()); - response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); + response = createSessionXAResponseMessage(packet); break; } case SESS_XA_INDOUBT_XIDS: { @@ -557,14 +560,14 @@ public class ServerSessionPacketHandler implements ChannelHandler { case SESS_STOP: { requiresResponse = true; session.stop(); - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); break; } case SESS_CLOSE: { requiresResponse = true; session.close(false); // removeConnectionListeners(); - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); flush = true; closeChannel = true; break; @@ -574,7 +577,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = message.isRequiresResponse(); session.individualAcknowledge(message.getConsumerID(), message.getMessageID()); if (requiresResponse) { - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); } break; } @@ -582,7 +585,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = true; SessionConsumerCloseMessage message = (SessionConsumerCloseMessage) packet; session.closeConsumer(message.getConsumerID()); - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); break; } case SESS_FORCE_CONSUMER_DELIVERY: { @@ -591,7 +594,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { break; } case PacketImpl.SESS_ADD_METADATA: { - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); SessionAddMetaDataMessage message = (SessionAddMetaDataMessage) packet; session.addMetaData(message.getKey(), message.getData()); break; @@ -600,7 +603,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = true; SessionAddMetaDataMessageV2 message = (SessionAddMetaDataMessageV2) packet; if (message.isRequiresConfirmations()) { - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); } session.addMetaData(message.getKey(), message.getData()); break; @@ -609,7 +612,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = true; SessionUniqueAddMetaDataMessage message = (SessionUniqueAddMetaDataMessage) packet; if (session.addUniqueMetaData(message.getKey(), message.getData())) { - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); } else { response = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.duplicateMetadata(message.getKey(), message.getData())); } @@ -617,15 +620,15 @@ public class ServerSessionPacketHandler implements ChannelHandler { } } } catch (ActiveMQIOErrorException e) { - response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session); + response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session); } catch (ActiveMQXAException e) { - response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response); + response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response); } catch (ActiveMQQueueMaxConsumerLimitReached e) { - response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response); + response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response); } catch (ActiveMQException e) { - response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response); + response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response); } catch (Throwable t) { - response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session); + response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session); } sendResponse(packet, response, flush, closeChannel); } finally { @@ -633,6 +636,26 @@ public class ServerSessionPacketHandler implements ChannelHandler { } } + private Packet createNullResponseMessage(Packet packet) { + final Packet response; + if (!packet.isResponseAsync() || channel.getConnection().isVersionBeforeAsyncResponseChange()) { + response = new NullResponseMessage(); + } else { + response = new NullResponseMessage_V2(packet.getCorrelationID()); + } + return response; + } + + private Packet createSessionXAResponseMessage(Packet packet) { + Packet response; + if (packet.isResponseAsync()) { + response = new SessionXAResponseMessage_V2(packet.getCorrelationID(), false, XAResource.XA_OK, null); + } else { + response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); + } + return response; + } + private void onSessionAcknowledge(Packet packet) { this.storageManager.setContext(session.getSessionContext()); try { @@ -643,18 +666,18 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = message.isRequiresResponse(); this.session.acknowledge(message.getConsumerID(), message.getMessageID()); if (requiresResponse) { - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); } } catch (ActiveMQIOErrorException e) { - response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session); + response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session); } catch (ActiveMQXAException e) { - response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response); + response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response); } catch (ActiveMQQueueMaxConsumerLimitReached e) { - response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response); + response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response); } catch (ActiveMQException e) { - response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response); + response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response); } catch (Throwable t) { - response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session); + response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session); } sendResponse(packet, response, false, false); } finally { @@ -672,18 +695,18 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = message.isRequiresResponse(); this.session.send(EmbedMessageUtil.extractEmbedded(message.getMessage()), this.direct); if (requiresResponse) { - response = new NullResponseMessage(); + response = createNullResponseMessage(packet); } } catch (ActiveMQIOErrorException e) { - response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session); + response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session); } catch (ActiveMQXAException e) { - response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response); + response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response); } catch (ActiveMQQueueMaxConsumerLimitReached e) { - response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response); + response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response); } catch (ActiveMQException e) { - response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response); + response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response); } catch (Throwable t) { - response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session); + response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session); } sendResponse(packet, response, false, false); } finally { @@ -700,15 +723,15 @@ public class ServerSessionPacketHandler implements ChannelHandler { SessionRequestProducerCreditsMessage message = (SessionRequestProducerCreditsMessage) packet; session.requestProducerCredits(message.getAddress(), message.getCredits()); } catch (ActiveMQIOErrorException e) { - response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session); + response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session); } catch (ActiveMQXAException e) { - response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response); + response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response); } catch (ActiveMQQueueMaxConsumerLimitReached e) { - response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response); + response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response); } catch (ActiveMQException e) { - response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response); + response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response); } catch (Throwable t) { - response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session); + response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session); } sendResponse(packet, response, false, false); } finally { @@ -725,15 +748,15 @@ public class ServerSessionPacketHandler implements ChannelHandler { SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage) packet; session.receiveConsumerCredits(message.getConsumerID(), message.getCredits()); } catch (ActiveMQIOErrorException e) { - response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session); + response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session); } catch (ActiveMQXAException e) { - response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response); + response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response); } catch (ActiveMQQueueMaxConsumerLimitReached e) { - response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response); + response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response); } catch (ActiveMQException e) { - response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response); + response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response); } catch (Throwable t) { - response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session); + response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session); } sendResponse(packet, response, false, false); } finally { @@ -742,50 +765,68 @@ public class ServerSessionPacketHandler implements ChannelHandler { } - private static Packet onActiveMQIOErrorExceptionWhileHandlePacket(ActiveMQIOErrorException e, + private static Packet onActiveMQIOErrorExceptionWhileHandlePacket(Packet packet, + ActiveMQIOErrorException e, boolean requiresResponse, Packet response, ServerSession session) { session.markTXFailed(e); if (requiresResponse) { logger.debug("Sending exception to client", e); - response = new ActiveMQExceptionMessage(e); + response = convertToExceptionPacket(packet, e); } else { ActiveMQServerLogger.LOGGER.caughtException(e); } return response; } - private static Packet onActiveMQXAExceptionWhileHandlePacket(ActiveMQXAException e, + private static Packet onActiveMQXAExceptionWhileHandlePacket(Packet packet, + ActiveMQXAException e, boolean requiresResponse, Packet response) { if (requiresResponse) { logger.debug("Sending exception to client", e); - response = new SessionXAResponseMessage(true, e.errorCode, e.getMessage()); + if (packet.isResponseAsync()) { + response = new SessionXAResponseMessage_V2(packet.getCorrelationID(), true, e.errorCode, e.getMessage()); + } else { + response = new SessionXAResponseMessage(true, e.errorCode, e.getMessage()); + } } else { ActiveMQServerLogger.LOGGER.caughtXaException(e); } return response; } - private static Packet onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(ActiveMQQueueMaxConsumerLimitReached e, + private static Packet onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(Packet packet, + ActiveMQQueueMaxConsumerLimitReached e, boolean requiresResponse, Packet response) { if (requiresResponse) { logger.debug("Sending exception to client", e); - response = new ActiveMQExceptionMessage(e); + response = convertToExceptionPacket(packet, e); } else { ActiveMQServerLogger.LOGGER.caughtException(e); } return response; } - private static Packet onActiveMQExceptionWhileHandlePacket(ActiveMQException e, + private static Packet convertToExceptionPacket(Packet packet, ActiveMQException e) { + Packet response; + if (packet.isResponseAsync()) { + response = new ActiveMQExceptionMessage_V2(packet.getCorrelationID(), e); + } else { + response = new ActiveMQExceptionMessage(e); + } + return response; + } + + private static Packet onActiveMQExceptionWhileHandlePacket(Packet packet, + ActiveMQException e, boolean requiresResponse, Packet response) { if (requiresResponse) { logger.debug("Sending exception to client", e); - response = new ActiveMQExceptionMessage(e); + response = convertToExceptionPacket(packet, e); } else { if (e.getType() == ActiveMQExceptionType.QUEUE_EXISTS) { logger.debug("Caught exception", e); @@ -796,7 +837,8 @@ public class ServerSessionPacketHandler implements ChannelHandler { return response; } - private static Packet onCatchThrowableWhileHandlePacket(Throwable t, + private static Packet onCatchThrowableWhileHandlePacket(Packet packet, + Throwable t, boolean requiresResponse, Packet response, ServerSession session) { @@ -805,7 +847,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { ActiveMQServerLogger.LOGGER.sendingUnexpectedExceptionToClient(t); ActiveMQException activeMQInternalErrorException = new ActiveMQInternalErrorException(); activeMQInternalErrorException.initCause(t); - response = new ActiveMQExceptionMessage(activeMQInternalErrorException); + response = convertToExceptionPacket(packet, activeMQInternalErrorException); } else { ActiveMQServerLogger.LOGGER.caughtException(t); } @@ -827,12 +869,11 @@ public class ServerSessionPacketHandler implements ChannelHandler { public void onError(final int errorCode, final String errorMessage) { ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage); - ActiveMQExceptionMessage exceptionMessage = new ActiveMQExceptionMessage(ActiveMQExceptionType.createException(errorCode, errorMessage)); - - doConfirmAndResponse(confirmPacket, exceptionMessage, flush, closeChannel); + Packet exceptionPacket = convertToExceptionPacket(confirmPacket, ActiveMQExceptionType.createException(errorCode, errorMessage)); + doConfirmAndResponse(confirmPacket, exceptionPacket, flush, closeChannel); if (logger.isTraceEnabled()) { - logger.trace("ServerSessionPacketHandler::exception response sent::" + exceptionMessage); + logger.trace("ServerSessionPacketHandler::exception response sent::" + exceptionPacket); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 6e26fa6..1f376d4 100644 --- a/pom.xml +++ b/pom.xml @@ -125,7 +125,7 @@ <activemq.version.majorVersion>1</activemq.version.majorVersion> <activemq.version.minorVersion>0</activemq.version.minorVersion> <activemq.version.microVersion>0</activemq.version.microVersion> - <activemq.version.incrementingVersion>129,128,127,126,125,124,123,122</activemq.version.incrementingVersion> + <activemq.version.incrementingVersion>130,129,128,127,126,125,124,123,122</activemq.version.incrementingVersion> <activemq.version.versionTag>${project.version}</activemq.version.versionTag> <ActiveMQ-Version>${project.version}(${activemq.version.incrementingVersion})</ActiveMQ-Version> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java index e4afb5b..c7ed869 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java @@ -25,6 +25,7 @@ import org.apache.activemq.artemis.core.protocol.core.ChannelHandler; import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.ResponseHandler; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2; @@ -315,6 +316,11 @@ public class BackupSyncDelay implements Interceptor { } @Override + public void setResponseHandler(ResponseHandler handler) { + throw new UnsupportedOperationException(); + } + + @Override public void flushConfirmations() { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java index d3951f2..3020310 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java @@ -167,7 +167,24 @@ public class JmsProducerCompletionListenerTest extends JMSTestBase { @Override public void onException(Message message, Exception exception) { - // TODO Auto-generated method stub + latch.countDown(); + try { + switch (call) { + case 0: + context.rollback(); + break; + case 1: + context.commit(); + break; + case 2: + context.close(); + break; + default: + throw new IllegalArgumentException("call code " + call); + } + } catch (Exception error1) { + this.error = error1; + } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java ---------------------------------------------------------------------- diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java index d7137ae..7e121f3 100644 --- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java +++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java @@ -16,12 +16,23 @@ */ package org.apache.activemq.artemis.jms.tests; +import static org.junit.Assert.fail; + +import javax.jms.CompletionListener; import javax.jms.Connection; import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Destination; import javax.jms.IllegalStateException; import javax.jms.JMSSecurityException; +import javax.jms.Message; +import javax.jms.MessageProducer; import javax.jms.Session; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.DefaultConnectionProperties; import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport; @@ -67,9 +78,9 @@ public class SecurityTest extends JMSTestCase { } - /** - * Login with no user, no password Should allow login (equivalent to guest) - */ + /** + * Login with no user, no password Should allow login (equivalent to guest) + */ @Test public void testLoginNoUserNoPassword() throws Exception { createConnection(); @@ -169,6 +180,71 @@ public class SecurityTest extends JMSTestCase { } } + /** + * Login with valid user and password + * But try send to address not authorised - Persistent + * Should not allow and should throw exception + */ + @Test + public void testLoginValidUserAndPasswordButNotAuthorisedToSend() throws Exception { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + Connection connection = connectionFactory.createConnection("guest", "guest"); + Session session = connection.createSession(); + Destination destination = session.createQueue("guest.cannot.send"); + MessageProducer messageProducer = session.createProducer(destination); + try { + messageProducer.send(session.createTextMessage("hello")); + fail("JMSSecurityException expected as guest is not allowed to send"); + } catch (JMSSecurityException activeMQSecurityException) { + //pass + } + connection.close(); + } + + /** + * Login with valid user and password + * But try send to address not authorised - Non Persistent. + * Should have same behaviour as Persistent with exception on send. + */ + @Test + public void testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent() throws Exception { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + connectionFactory.setConfirmationWindowSize(100); + connectionFactory.setBlockOnDurableSend(false); + connectionFactory.setBlockOnNonDurableSend(false); + Connection connection = connectionFactory.createConnection("guest", "guest"); + Session session = connection.createSession(); + Destination destination = session.createQueue("guest.cannot.send"); + MessageProducer messageProducer = session.createProducer(destination); + messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + try { + AtomicReference<Exception> e = new AtomicReference<>(); + // messageProducer.send(session.createTextMessage("hello")); + + CountDownLatch countDownLatch = new CountDownLatch(1); + messageProducer.send(session.createTextMessage("hello"), new CompletionListener() { + @Override + public void onCompletion(Message message) { + countDownLatch.countDown(); + } + + @Override + public void onException(Message message, Exception exception) { + e.set(exception); + countDownLatch.countDown(); + } + }); + countDownLatch.await(10, TimeUnit.SECONDS); + if (e.get() != null) { + throw e.get(); + } + fail("JMSSecurityException expected as guest is not allowed to send"); + } catch (JMSSecurityException activeMQSecurityException) { + activeMQSecurityException.printStackTrace(); + } + connection.close(); + } + /* Now some client id tests */ /** http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c9d8697a/tests/jms-tests/src/test/resources/broker.xml ---------------------------------------------------------------------- diff --git a/tests/jms-tests/src/test/resources/broker.xml b/tests/jms-tests/src/test/resources/broker.xml index 733e8c3..644ce83 100644 --- a/tests/jms-tests/src/test/resources/broker.xml +++ b/tests/jms-tests/src/test/resources/broker.xml @@ -54,6 +54,16 @@ <permission type="browse" roles="guest,def"/> <permission type="send" roles="guest,def"/> </security-setting> + + <security-setting match="guest.cannot.send"> + <permission type="createDurableQueue" roles="guest,def"/> + <permission type="deleteDurableQueue" roles="guest,def"/> + <permission type="createNonDurableQueue" roles="guest,def"/> + <permission type="deleteNonDurableQueue" roles="guest,def"/> + <permission type="consume" roles="guest,def"/> + <permission type="browse" roles="guest,def"/> + <permission type="send" roles="def"/> + </security-setting> </security-settings> </core> </configuration> \ No newline at end of file
