Repository: activemq-artemis Updated Branches: refs/heads/2.6.x da7fb8903 -> 2242d2447
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java index ee4223c..ae1d270 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java @@ -34,8 +34,6 @@ import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicPublisher; -import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; @@ -566,14 +564,6 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To private final ActiveMQMessageProducer producer; /** - * It's possible that this SendAcknowledgementHandler might be called twice due to subsequent - * packet confirmations on the same connection. Using this boolean avoids that possibility. - * A new CompletionListenerWrapper is created for each message sent so once it's called once - * it will never be called again. - */ - private AtomicBoolean active = new AtomicBoolean(true); - - /** * @param jmsMessage * @param producer */ @@ -587,63 +577,27 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To @Override public void sendAcknowledged(org.apache.activemq.artemis.api.core.Message clientMessage) { - if (active.get()) { - if (jmsMessage instanceof StreamMessage) { - try { - ((StreamMessage) jmsMessage).reset(); - } catch (JMSException e) { - // HORNETQ-1209 XXX ignore? - } - } - if (jmsMessage instanceof BytesMessage) { - try { - ((BytesMessage) jmsMessage).reset(); - } catch (JMSException e) { - // HORNETQ-1209 XXX ignore? - } - } - + if (jmsMessage instanceof StreamMessage) { try { - producer.connection.getThreadAwareContext().setCurrentThread(true); - completionListener.onCompletion(jmsMessage); - } finally { - producer.connection.getThreadAwareContext().clearCurrentThread(true); - active.set(false); + ((StreamMessage) jmsMessage).reset(); + } catch (JMSException e) { + // HORNETQ-1209 XXX ignore? } } - } - - @Override - public void sendFailed(org.apache.activemq.artemis.api.core.Message clientMessage, Exception exception) { - if (active.get()) { - if (jmsMessage instanceof StreamMessage) { - try { - ((StreamMessage) jmsMessage).reset(); - } catch (JMSException e) { - // HORNETQ-1209 XXX ignore? - } - } - if (jmsMessage instanceof BytesMessage) { - try { - ((BytesMessage) jmsMessage).reset(); - } catch (JMSException e) { - // HORNETQ-1209 XXX ignore? - } - } - + if (jmsMessage instanceof BytesMessage) { try { - producer.connection.getThreadAwareContext().setCurrentThread(true); - if (exception instanceof ActiveMQException) { - exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQException) exception); - } else if (exception instanceof ActiveMQInterruptedException) { - exception = JMSExceptionHelper.convertFromActiveMQException((ActiveMQInterruptedException) exception); - } - completionListener.onException(jmsMessage, exception); - } finally { - producer.connection.getThreadAwareContext().clearCurrentThread(true); - active.set(false); + ((BytesMessage) jmsMessage).reset(); + } catch (JMSException e) { + // HORNETQ-1209 XXX ignore? } } + + try { + producer.connection.getThreadAwareContext().setCurrentThread(true); + completionListener.onCompletion(jmsMessage); + } finally { + producer.connection.getThreadAwareContext().clearCurrentThread(true); + } } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java index 0428abe..d38f45f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java @@ -53,7 +53,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReq import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_V2; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE; @@ -91,10 +90,8 @@ public class ServerPacketDecoder extends ClientPacketDecoder { if (connection.isVersionBeforeAddressChange()) { sendMessage = new SessionSendMessage_1X(new CoreMessage(this.coreMessageObjectPools)); - } else if (connection.isVersionBeforeAsyncResponseChange()) { - sendMessage = new SessionSendMessage(new CoreMessage(this.coreMessageObjectPools)); } else { - sendMessage = new SessionSendMessage_V2(new CoreMessage(this.coreMessageObjectPools)); + sendMessage = new SessionSendMessage(new CoreMessage(this.coreMessageObjectPools)); } sendMessage.decode(in); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 16a87d8..37564b5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -41,7 +41,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueu import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage; @@ -77,13 +76,11 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAG import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAJoinMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAPrepareMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResponseMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAResumeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXARollbackMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage_V2; import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; @@ -316,7 +313,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = message.isRequiresResponse(); sendContinuations(message.getPacketSize(), message.getMessageBodySize(), message.getBody(), message.isContinues()); if (requiresResponse) { - response = createNullResponseMessage(packet); + response = new NullResponseMessage(); } break; } @@ -345,7 +342,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = request.isRequiresResponse(); session.createAddress(request.getAddress(), request.getRoutingTypes(), request.isAutoCreated()); if (requiresResponse) { - response = createNullResponseMessage(packet); + response = new NullResponseMessage(); } break; } @@ -354,7 +351,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = request.isRequiresResponse(); session.createQueue(request.getAddress(), request.getQueueName(), RoutingType.MULTICAST, request.getFilterString(), request.isTemporary(), request.isDurable()); if (requiresResponse) { - response = createNullResponseMessage(packet); + response = new NullResponseMessage(); } break; } @@ -364,7 +361,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { session.createQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isTemporary(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(), request.isExclusive(), request.isLastValue(), request.isAutoCreated()); if (requiresResponse) { - response = createNullResponseMessage(packet); + response = new NullResponseMessage(); } break; } @@ -376,7 +373,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { session.createSharedQueue(request.getAddress(), request.getQueueName(), request.isDurable(), request.getFilterString()); } if (requiresResponse) { - response = createNullResponseMessage(packet); + response = new NullResponseMessage(); } break; } @@ -388,7 +385,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { session.createSharedQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(), request.isExclusive(), request.isLastValue()); } if (requiresResponse) { - response = createNullResponseMessage(packet); + response = new NullResponseMessage(); } break; } @@ -396,7 +393,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = true; SessionDeleteQueueMessage request = (SessionDeleteQueueMessage) packet; session.deleteQueue(request.getQueueName()); - response = createNullResponseMessage(packet); + response = new NullResponseMessage(); break; } case SESS_QUEUEQUERY: { @@ -456,62 +453,62 @@ public class ServerSessionPacketHandler implements ChannelHandler { case SESS_COMMIT: { requiresResponse = true; session.commit(); - response = createNullResponseMessage(packet); + response = new NullResponseMessage(); break; } case SESS_ROLLBACK: { requiresResponse = true; session.rollback(((RollbackMessage) packet).isConsiderLastMessageAsDelivered()); - response = createNullResponseMessage(packet); + response = new NullResponseMessage(); break; } case SESS_XA_COMMIT: { requiresResponse = true; SessionXACommitMessage message = (SessionXACommitMessage) packet; session.xaCommit(message.getXid(), message.isOnePhase()); - response = createSessionXAResponseMessage(packet); + response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); break; } case SESS_XA_END: { requiresResponse = true; SessionXAEndMessage message = (SessionXAEndMessage) packet; session.xaEnd(message.getXid()); - response = createSessionXAResponseMessage(packet); + response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); break; } case SESS_XA_FORGET: { requiresResponse = true; SessionXAForgetMessage message = (SessionXAForgetMessage) packet; session.xaForget(message.getXid()); - response = createSessionXAResponseMessage(packet); + response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); break; } case SESS_XA_JOIN: { requiresResponse = true; SessionXAJoinMessage message = (SessionXAJoinMessage) packet; session.xaJoin(message.getXid()); - response = createSessionXAResponseMessage(packet); + response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); break; } case SESS_XA_RESUME: { requiresResponse = true; SessionXAResumeMessage message = (SessionXAResumeMessage) packet; session.xaResume(message.getXid()); - response = createSessionXAResponseMessage(packet); + response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); break; } case SESS_XA_ROLLBACK: { requiresResponse = true; SessionXARollbackMessage message = (SessionXARollbackMessage) packet; session.xaRollback(message.getXid()); - response = createSessionXAResponseMessage(packet); + response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); break; } case SESS_XA_START: { requiresResponse = true; SessionXAStartMessage message = (SessionXAStartMessage) packet; session.xaStart(message.getXid()); - response = createSessionXAResponseMessage(packet); + response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); break; } case SESS_XA_FAILED: { @@ -524,14 +521,14 @@ public class ServerSessionPacketHandler implements ChannelHandler { case SESS_XA_SUSPEND: { requiresResponse = true; session.xaSuspend(); - response = createSessionXAResponseMessage(packet); + response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); break; } case SESS_XA_PREPARE: { requiresResponse = true; SessionXAPrepareMessage message = (SessionXAPrepareMessage) packet; session.xaPrepare(message.getXid()); - response = createSessionXAResponseMessage(packet); + response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); break; } case SESS_XA_INDOUBT_XIDS: { @@ -560,14 +557,14 @@ public class ServerSessionPacketHandler implements ChannelHandler { case SESS_STOP: { requiresResponse = true; session.stop(); - response = createNullResponseMessage(packet); + response = new NullResponseMessage(); break; } case SESS_CLOSE: { requiresResponse = true; session.close(false); // removeConnectionListeners(); - response = createNullResponseMessage(packet); + response = new NullResponseMessage(); flush = true; closeChannel = true; break; @@ -577,7 +574,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = message.isRequiresResponse(); session.individualAcknowledge(message.getConsumerID(), message.getMessageID()); if (requiresResponse) { - response = createNullResponseMessage(packet); + response = new NullResponseMessage(); } break; } @@ -585,7 +582,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = true; SessionConsumerCloseMessage message = (SessionConsumerCloseMessage) packet; session.closeConsumer(message.getConsumerID()); - response = createNullResponseMessage(packet); + response = new NullResponseMessage(); break; } case SESS_FORCE_CONSUMER_DELIVERY: { @@ -594,7 +591,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { break; } case PacketImpl.SESS_ADD_METADATA: { - response = createNullResponseMessage(packet); + response = new NullResponseMessage(); SessionAddMetaDataMessage message = (SessionAddMetaDataMessage) packet; session.addMetaData(message.getKey(), message.getData()); break; @@ -603,7 +600,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = true; SessionAddMetaDataMessageV2 message = (SessionAddMetaDataMessageV2) packet; if (message.isRequiresConfirmations()) { - response = createNullResponseMessage(packet); + response = new NullResponseMessage(); } session.addMetaData(message.getKey(), message.getData()); break; @@ -612,7 +609,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = true; SessionUniqueAddMetaDataMessage message = (SessionUniqueAddMetaDataMessage) packet; if (session.addUniqueMetaData(message.getKey(), message.getData())) { - response = createNullResponseMessage(packet); + response = new NullResponseMessage(); } else { response = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.duplicateMetadata(message.getKey(), message.getData())); } @@ -620,15 +617,15 @@ public class ServerSessionPacketHandler implements ChannelHandler { } } } catch (ActiveMQIOErrorException e) { - response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session); + response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session); } catch (ActiveMQXAException e) { - response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response); + response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response); } catch (ActiveMQQueueMaxConsumerLimitReached e) { - response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response); + response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response); } catch (ActiveMQException e) { - response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response); + response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response); } catch (Throwable t) { - response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session); + response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session); } sendResponse(packet, response, flush, closeChannel); } finally { @@ -636,26 +633,6 @@ public class ServerSessionPacketHandler implements ChannelHandler { } } - private Packet createNullResponseMessage(Packet packet) { - final Packet response; - if (!packet.isResponseAsync() || channel.getConnection().isVersionBeforeAsyncResponseChange()) { - response = new NullResponseMessage(); - } else { - response = new NullResponseMessage_V2(packet.getCorrelationID()); - } - return response; - } - - private Packet createSessionXAResponseMessage(Packet packet) { - Packet response; - if (packet.isResponseAsync()) { - response = new SessionXAResponseMessage_V2(packet.getCorrelationID(), false, XAResource.XA_OK, null); - } else { - response = new SessionXAResponseMessage(false, XAResource.XA_OK, null); - } - return response; - } - private void onSessionAcknowledge(Packet packet) { this.storageManager.setContext(session.getSessionContext()); try { @@ -666,18 +643,18 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = message.isRequiresResponse(); this.session.acknowledge(message.getConsumerID(), message.getMessageID()); if (requiresResponse) { - response = createNullResponseMessage(packet); + response = new NullResponseMessage(); } } catch (ActiveMQIOErrorException e) { - response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session); + response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session); } catch (ActiveMQXAException e) { - response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response); + response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response); } catch (ActiveMQQueueMaxConsumerLimitReached e) { - response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response); + response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response); } catch (ActiveMQException e) { - response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response); + response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response); } catch (Throwable t) { - response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session); + response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session); } sendResponse(packet, response, false, false); } finally { @@ -695,18 +672,18 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = message.isRequiresResponse(); this.session.send(EmbedMessageUtil.extractEmbedded(message.getMessage()), this.direct); if (requiresResponse) { - response = createNullResponseMessage(packet); + response = new NullResponseMessage(); } } catch (ActiveMQIOErrorException e) { - response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session); + response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session); } catch (ActiveMQXAException e) { - response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response); + response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response); } catch (ActiveMQQueueMaxConsumerLimitReached e) { - response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response); + response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response); } catch (ActiveMQException e) { - response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response); + response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response); } catch (Throwable t) { - response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session); + response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session); } sendResponse(packet, response, false, false); } finally { @@ -723,15 +700,15 @@ public class ServerSessionPacketHandler implements ChannelHandler { SessionRequestProducerCreditsMessage message = (SessionRequestProducerCreditsMessage) packet; session.requestProducerCredits(message.getAddress(), message.getCredits()); } catch (ActiveMQIOErrorException e) { - response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session); + response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session); } catch (ActiveMQXAException e) { - response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response); + response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response); } catch (ActiveMQQueueMaxConsumerLimitReached e) { - response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response); + response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response); } catch (ActiveMQException e) { - response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response); + response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response); } catch (Throwable t) { - response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session); + response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session); } sendResponse(packet, response, false, false); } finally { @@ -748,15 +725,15 @@ public class ServerSessionPacketHandler implements ChannelHandler { SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage) packet; session.receiveConsumerCredits(message.getConsumerID(), message.getCredits()); } catch (ActiveMQIOErrorException e) { - response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session); + response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session); } catch (ActiveMQXAException e) { - response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response); + response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response); } catch (ActiveMQQueueMaxConsumerLimitReached e) { - response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response); + response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response); } catch (ActiveMQException e) { - response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response); + response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response); } catch (Throwable t) { - response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session); + response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session); } sendResponse(packet, response, false, false); } finally { @@ -765,68 +742,50 @@ public class ServerSessionPacketHandler implements ChannelHandler { } - private static Packet onActiveMQIOErrorExceptionWhileHandlePacket(Packet packet, - ActiveMQIOErrorException e, + private static Packet onActiveMQIOErrorExceptionWhileHandlePacket(ActiveMQIOErrorException e, boolean requiresResponse, Packet response, ServerSession session) { session.markTXFailed(e); if (requiresResponse) { logger.debug("Sending exception to client", e); - response = convertToExceptionPacket(packet, e); + response = new ActiveMQExceptionMessage(e); } else { ActiveMQServerLogger.LOGGER.caughtException(e); } return response; } - private static Packet onActiveMQXAExceptionWhileHandlePacket(Packet packet, - ActiveMQXAException e, + private static Packet onActiveMQXAExceptionWhileHandlePacket(ActiveMQXAException e, boolean requiresResponse, Packet response) { if (requiresResponse) { logger.debug("Sending exception to client", e); - if (packet.isResponseAsync()) { - response = new SessionXAResponseMessage_V2(packet.getCorrelationID(), true, e.errorCode, e.getMessage()); - } else { - response = new SessionXAResponseMessage(true, e.errorCode, e.getMessage()); - } + response = new SessionXAResponseMessage(true, e.errorCode, e.getMessage()); } else { ActiveMQServerLogger.LOGGER.caughtXaException(e); } return response; } - private static Packet onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(Packet packet, - ActiveMQQueueMaxConsumerLimitReached e, + private static Packet onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(ActiveMQQueueMaxConsumerLimitReached e, boolean requiresResponse, Packet response) { if (requiresResponse) { logger.debug("Sending exception to client", e); - response = convertToExceptionPacket(packet, e); + response = new ActiveMQExceptionMessage(e); } else { ActiveMQServerLogger.LOGGER.caughtException(e); } return response; } - private static Packet convertToExceptionPacket(Packet packet, ActiveMQException e) { - Packet response; - if (packet.isResponseAsync()) { - response = new ActiveMQExceptionMessage_V2(packet.getCorrelationID(), e); - } else { - response = new ActiveMQExceptionMessage(e); - } - return response; - } - - private static Packet onActiveMQExceptionWhileHandlePacket(Packet packet, - ActiveMQException e, + private static Packet onActiveMQExceptionWhileHandlePacket(ActiveMQException e, boolean requiresResponse, Packet response) { if (requiresResponse) { logger.debug("Sending exception to client", e); - response = convertToExceptionPacket(packet, e); + response = new ActiveMQExceptionMessage(e); } else { if (e.getType() == ActiveMQExceptionType.QUEUE_EXISTS) { logger.debug("Caught exception", e); @@ -837,8 +796,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { return response; } - private static Packet onCatchThrowableWhileHandlePacket(Packet packet, - Throwable t, + private static Packet onCatchThrowableWhileHandlePacket(Throwable t, boolean requiresResponse, Packet response, ServerSession session) { @@ -847,7 +805,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { ActiveMQServerLogger.LOGGER.sendingUnexpectedExceptionToClient(t); ActiveMQException activeMQInternalErrorException = new ActiveMQInternalErrorException(); activeMQInternalErrorException.initCause(t); - response = convertToExceptionPacket(packet, activeMQInternalErrorException); + response = new ActiveMQExceptionMessage(activeMQInternalErrorException); } else { ActiveMQServerLogger.LOGGER.caughtException(t); } @@ -869,11 +827,12 @@ public class ServerSessionPacketHandler implements ChannelHandler { public void onError(final int errorCode, final String errorMessage) { ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage); - Packet exceptionPacket = convertToExceptionPacket(confirmPacket, ActiveMQExceptionType.createException(errorCode, errorMessage)); - doConfirmAndResponse(confirmPacket, exceptionPacket, flush, closeChannel); + ActiveMQExceptionMessage exceptionMessage = new ActiveMQExceptionMessage(ActiveMQExceptionType.createException(errorCode, errorMessage)); + + doConfirmAndResponse(confirmPacket, exceptionMessage, flush, closeChannel); if (logger.isTraceEnabled()) { - logger.trace("ServerSessionPacketHandler::exception response sent::" + exceptionPacket); + logger.trace("ServerSessionPacketHandler::exception response sent::" + exceptionMessage); } } @@ -893,8 +852,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { final Packet response, final boolean flush, final boolean closeChannel) { - // don't confirm if the response is an exception - if (confirmPacket != null && (response == null || (response != null && response.getType() != PacketImpl.EXCEPTION))) { + if (confirmPacket != null) { channel.confirm(confirmPacket); if (flush) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 1f376d4..6e26fa6 100644 --- a/pom.xml +++ b/pom.xml @@ -125,7 +125,7 @@ <activemq.version.majorVersion>1</activemq.version.majorVersion> <activemq.version.minorVersion>0</activemq.version.minorVersion> <activemq.version.microVersion>0</activemq.version.microVersion> - <activemq.version.incrementingVersion>130,129,128,127,126,125,124,123,122</activemq.version.incrementingVersion> + <activemq.version.incrementingVersion>129,128,127,126,125,124,123,122</activemq.version.incrementingVersion> <activemq.version.versionTag>${project.version}</activemq.version.versionTag> <ActiveMQ-Version>${project.version}(${activemq.version.incrementingVersion})</ActiveMQ-Version> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java index c7ed869..e4afb5b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java @@ -25,7 +25,6 @@ import org.apache.activemq.artemis.core.protocol.core.ChannelHandler; import org.apache.activemq.artemis.core.protocol.core.CommandConfirmationHandler; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.Packet; -import org.apache.activemq.artemis.core.protocol.core.ResponseHandler; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2; @@ -316,11 +315,6 @@ public class BackupSyncDelay implements Interceptor { } @Override - public void setResponseHandler(ResponseHandler handler) { - throw new UnsupportedOperationException(); - } - - @Override public void flushConfirmations() { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java index 3020310..d3951f2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/JmsProducerCompletionListenerTest.java @@ -167,24 +167,7 @@ public class JmsProducerCompletionListenerTest extends JMSTestBase { @Override public void onException(Message message, Exception exception) { - latch.countDown(); - try { - switch (call) { - case 0: - context.rollback(); - break; - case 1: - context.commit(); - break; - case 2: - context.close(); - break; - default: - throw new IllegalArgumentException("call code " + call); - } - } catch (Exception error1) { - this.error = error1; - } + // TODO Auto-generated method stub } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java ---------------------------------------------------------------------- diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java index 851dbe0..d7137ae 100644 --- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java +++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/SecurityTest.java @@ -16,28 +16,12 @@ */ package org.apache.activemq.artemis.jms.tests; -import static org.junit.Assert.fail; - -import javax.jms.BytesMessage; -import javax.jms.CompletionListener; import javax.jms.Connection; import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.Destination; import javax.jms.IllegalStateException; -import javax.jms.JMSContext; -import javax.jms.JMSProducer; import javax.jms.JMSSecurityException; -import javax.jms.Message; -import javax.jms.MessageProducer; import javax.jms.Session; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.DefaultConnectionProperties; import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport; @@ -83,9 +67,9 @@ public class SecurityTest extends JMSTestCase { } - /** - * Login with no user, no password Should allow login (equivalent to guest) - */ + /** + * Login with no user, no password Should allow login (equivalent to guest) + */ @Test public void testLoginNoUserNoPassword() throws Exception { createConnection(); @@ -185,173 +169,6 @@ public class SecurityTest extends JMSTestCase { } } - /** - * Login with valid user and password - * But try send to address not authorised - Persistent - * Should not allow and should throw exception - */ - @Test - public void testLoginValidUserAndPasswordButNotAuthorisedToSend() throws Exception { - SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send"); - if (getJmsServer().locateQueue(queueName) == null) { - getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false); - } - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); - Connection connection = connectionFactory.createConnection("guest", "guest"); - Session session = connection.createSession(); - Destination destination = session.createQueue(queueName.toString()); - MessageProducer messageProducer = session.createProducer(destination); - try { - messageProducer.send(session.createTextMessage("hello")); - fail("JMSSecurityException expected as guest is not allowed to send"); - } catch (JMSSecurityException activeMQSecurityException) { - //pass - } - connection.close(); - } - - /** - * Login with valid user and password - * But try send to address not authorised - Non Persistent. - * Should have same behaviour as Persistent with exception on send. - */ - @Test - public void testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent() throws Exception { - SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send"); - if (getJmsServer().locateQueue(queueName) == null) { - getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false); - } - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); - connectionFactory.setConfirmationWindowSize(100); - connectionFactory.setBlockOnDurableSend(false); - connectionFactory.setBlockOnNonDurableSend(false); - Connection connection = connectionFactory.createConnection("guest", "guest"); - Session session = connection.createSession(); - Destination destination = session.createQueue(queueName.toString()); - MessageProducer messageProducer = session.createProducer(destination); - messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - try { - AtomicReference<Exception> e = new AtomicReference<>(); - - CountDownLatch countDownLatch = new CountDownLatch(1); - messageProducer.send(session.createTextMessage("hello"), new CompletionListener() { - @Override - public void onCompletion(Message message) { - countDownLatch.countDown(); - } - - @Override - public void onException(Message message, Exception exception) { - e.set(exception); - countDownLatch.countDown(); - } - }); - countDownLatch.await(10, TimeUnit.SECONDS); - if (e.get() != null) { - throw e.get(); - } - fail("JMSSecurityException expected as guest is not allowed to send"); - } catch (JMSSecurityException activeMQSecurityException) { - activeMQSecurityException.printStackTrace(); - } finally { - connection.close(); - } - } - - /** - * Same as testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent, but using JMS 2 API. - */ - @Test - public void testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistentJMS2() throws Exception { - SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send"); - if (getJmsServer().locateQueue(queueName) == null) { - getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false); - } - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); - connectionFactory.setConfirmationWindowSize(100); - connectionFactory.setBlockOnDurableSend(false); - connectionFactory.setBlockOnNonDurableSend(false); - JMSContext context = connectionFactory.createContext("guest", "guest"); - Destination destination = context.createQueue(queueName.toString()); - JMSProducer messageProducer = context.createProducer(); - messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - try { - AtomicReference<Exception> e = new AtomicReference<>(); - - CountDownLatch countDownLatch = new CountDownLatch(1); - messageProducer.setAsync(new CompletionListener() { - @Override - public void onCompletion(Message message) { - countDownLatch.countDown(); - } - - @Override - public void onException(Message message, Exception exception) { - e.set(exception); - countDownLatch.countDown(); - } - }); - messageProducer.send(destination, context.createTextMessage("hello")); - countDownLatch.await(10, TimeUnit.SECONDS); - if (e.get() != null) { - throw e.get(); - } - fail("JMSSecurityException expected as guest is not allowed to send"); - } catch (JMSSecurityException activeMQSecurityException) { - activeMQSecurityException.printStackTrace(); - } finally { - context.close(); - } - } - - /** - * Same as testLoginValidUserAndPasswordButNotAuthorisedToSendNonPersistent, but using a large message. - */ - @Test - public void testLoginValidUserAndPasswordButNotAuthorisedToSendLargeNonPersistent() throws Exception { - SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send"); - if (getJmsServer().locateQueue(queueName) == null) { - getJmsServer().createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false); - } - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); - connectionFactory.setConfirmationWindowSize(100); - connectionFactory.setBlockOnDurableSend(false); - connectionFactory.setBlockOnNonDurableSend(false); - connectionFactory.setMinLargeMessageSize(1024); - Connection connection = connectionFactory.createConnection("guest", "guest"); - Session session = connection.createSession(); - Destination destination = session.createQueue(queueName.toString()); - MessageProducer messageProducer = session.createProducer(destination); - messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - try { - AtomicReference<Exception> e = new AtomicReference<>(); - - CountDownLatch countDownLatch = new CountDownLatch(1); - BytesMessage message = session.createBytesMessage(); - message.writeBytes(new byte[10 * 1024]); - messageProducer.send(message, new CompletionListener() { - @Override - public void onCompletion(Message message) { - countDownLatch.countDown(); - } - - @Override - public void onException(Message message, Exception exception) { - e.set(exception); - countDownLatch.countDown(); - } - }); - countDownLatch.await(10, TimeUnit.SECONDS); - if (e.get() != null) { - throw e.get(); - } - fail("JMSSecurityException expected as guest is not allowed to send"); - } catch (JMSSecurityException activeMQSecurityException) { - activeMQSecurityException.printStackTrace(); - } - connection.close(); - } - /* Now some client id tests */ /** http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2242d244/tests/jms-tests/src/test/resources/broker.xml ---------------------------------------------------------------------- diff --git a/tests/jms-tests/src/test/resources/broker.xml b/tests/jms-tests/src/test/resources/broker.xml index 644ce83..733e8c3 100644 --- a/tests/jms-tests/src/test/resources/broker.xml +++ b/tests/jms-tests/src/test/resources/broker.xml @@ -54,16 +54,6 @@ <permission type="browse" roles="guest,def"/> <permission type="send" roles="guest,def"/> </security-setting> - - <security-setting match="guest.cannot.send"> - <permission type="createDurableQueue" roles="guest,def"/> - <permission type="deleteDurableQueue" roles="guest,def"/> - <permission type="createNonDurableQueue" roles="guest,def"/> - <permission type="deleteNonDurableQueue" roles="guest,def"/> - <permission type="consume" roles="guest,def"/> - <permission type="browse" roles="guest,def"/> - <permission type="send" roles="def"/> - </security-setting> </security-settings> </core> </configuration> \ No newline at end of file
