Repository: activemq-artemis Updated Branches: refs/heads/master 558bb477f -> 6a7c549b9
ARTEMIS-2155 disconnect on failure to handle packet Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2a894cbe Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2a894cbe Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2a894cbe Branch: refs/heads/master Commit: 2a894cbe276b7f3c65b599ee4a262435ba1e687a Parents: 558bb47 Author: Justin Bertram <jbert...@apache.org> Authored: Tue Oct 23 13:06:22 2018 -0500 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Tue Oct 30 16:17:08 2018 -0400 ---------------------------------------------------------------------- .../client/ActiveMQClientMessageBundle.java | 6 ++ .../core/impl/ActiveMQSessionContext.java | 4 +- .../tests/extras/byteman/PacketFailureTest.java | 95 ++++++++++++++++++++ 3 files changed, 103 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2a894cbe/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java index 66b8184..8468e84 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java @@ -231,4 +231,10 @@ public interface ActiveMQClientMessageBundle { @Message(id = 219063, value = "Cannot send a packet while response cache is full.") IllegalStateException cannotSendPacketWhilstResponseCacheFull(); + + @Message(id = 219064, value = "Invalide packet: {0}", format = Message.Format.MESSAGE_FORMAT) + IllegalStateException invalidPacket(byte type); + + @Message(id = 219065, value = "Failed to handle packet.") + RuntimeException failedToHandlePacket(@Cause Exception e); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2a894cbe/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index 3307b4d..ccf10ab 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -1082,11 +1082,11 @@ public class ActiveMQSessionContext extends SessionContext { break; } default: { - throw new IllegalStateException("Invalid packet: " + type); + throw ActiveMQClientMessageBundle.BUNDLE.invalidPacket(type); } } } catch (Exception e) { - ActiveMQClientLogger.LOGGER.failedToHandlePacket(e); + throw ActiveMQClientMessageBundle.BUNDLE.failedToHandlePacket(e); } sessionChannel.confirm(packet); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2a894cbe/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/PacketFailureTest.java ---------------------------------------------------------------------- diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/PacketFailureTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/PacketFailureTest.java new file mode 100644 index 0000000..8cf5c42 --- /dev/null +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/PacketFailureTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.extras.byteman; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.jms.client.ActiveMQMessageConsumer; +import org.apache.activemq.artemis.tests.util.JMSTestBase; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(BMUnitRunner.class) +public class PacketFailureTest extends JMSTestBase { + + private Queue queue; + static boolean pause = false; + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + + queue = createQueue("TestQueue"); + } + + @Test(timeout = 20000) + @BMRule( + name = "blow-up", + targetClass = "org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext", + targetMethod = "handleReceivedMessagePacket(org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage)", + targetLocation = "ENTRY", + action = "throw new Exception()") + public void testFailureToHandlePacket() throws Exception { + final int MESSAGE_COUNT = 20; + Connection sendConnection = null; + Connection connection = null; + try { + ((ActiveMQConnectionFactory)cf).setReconnectAttempts(0); + sendConnection = cf.createConnection(); + final Session sendSession = sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageProducer producer = sendSession.createProducer(queue); + + for (int j = 0; j < MESSAGE_COUNT; j++) { + TextMessage message = sendSession.createTextMessage(); + + message.setText("Message" + j); + + producer.send(message); + } + + producer.close(); + sendSession.close(); + + connection = cf.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + + Message message = consumer.receive(1000); + assertNull(message); + assertTrue(((ActiveMQMessageConsumer) consumer).isClosed()); + } finally { + if (connection != null) { + connection.close(); + } + if (sendConnection != null) { + sendConnection.close(); + } + } + } +}