Repository: activemq-artemis
Updated Branches:
  refs/heads/master 558bb477f -> 6a7c549b9


ARTEMIS-2155 disconnect on failure to handle packet


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2a894cbe
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2a894cbe
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2a894cbe

Branch: refs/heads/master
Commit: 2a894cbe276b7f3c65b599ee4a262435ba1e687a
Parents: 558bb47
Author: Justin Bertram <jbert...@apache.org>
Authored: Tue Oct 23 13:06:22 2018 -0500
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Tue Oct 30 16:17:08 2018 -0400

----------------------------------------------------------------------
 .../client/ActiveMQClientMessageBundle.java     |  6 ++
 .../core/impl/ActiveMQSessionContext.java       |  4 +-
 .../tests/extras/byteman/PacketFailureTest.java | 95 ++++++++++++++++++++
 3 files changed, 103 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2a894cbe/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java
index 66b8184..8468e84 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientMessageBundle.java
@@ -231,4 +231,10 @@ public interface ActiveMQClientMessageBundle {
 
    @Message(id = 219063, value = "Cannot send a packet while response cache is 
full.")
    IllegalStateException cannotSendPacketWhilstResponseCacheFull();
+
+   @Message(id = 219064, value = "Invalide packet: {0}", format = 
Message.Format.MESSAGE_FORMAT)
+   IllegalStateException invalidPacket(byte type);
+
+   @Message(id = 219065, value = "Failed to handle packet.")
+   RuntimeException failedToHandlePacket(@Cause Exception e);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2a894cbe/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index 3307b4d..ccf10ab 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -1082,11 +1082,11 @@ public class ActiveMQSessionContext extends 
SessionContext {
                   break;
                }
                default: {
-                  throw new IllegalStateException("Invalid packet: " + type);
+                  throw ActiveMQClientMessageBundle.BUNDLE.invalidPacket(type);
                }
             }
          } catch (Exception e) {
-            ActiveMQClientLogger.LOGGER.failedToHandlePacket(e);
+            throw ActiveMQClientMessageBundle.BUNDLE.failedToHandlePacket(e);
          }
 
          sessionChannel.confirm(packet);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2a894cbe/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/PacketFailureTest.java
----------------------------------------------------------------------
diff --git 
a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/PacketFailureTest.java
 
b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/PacketFailureTest.java
new file mode 100644
index 0000000..8cf5c42
--- /dev/null
+++ 
b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/PacketFailureTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.extras.byteman;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.jms.client.ActiveMQMessageConsumer;
+import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(BMUnitRunner.class)
+public class PacketFailureTest extends JMSTestBase {
+
+   private Queue queue;
+   static boolean pause = false;
+
+   @Before
+   @Override
+   public void setUp() throws Exception {
+      super.setUp();
+
+      queue = createQueue("TestQueue");
+   }
+
+   @Test(timeout = 20000)
+   @BMRule(
+      name = "blow-up",
+      targetClass = 
"org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext",
+      targetMethod = 
"handleReceivedMessagePacket(org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage)",
+      targetLocation = "ENTRY",
+      action = "throw new Exception()")
+   public void testFailureToHandlePacket() throws Exception {
+      final int MESSAGE_COUNT = 20;
+      Connection sendConnection = null;
+      Connection connection = null;
+      try {
+         ((ActiveMQConnectionFactory)cf).setReconnectAttempts(0);
+         sendConnection = cf.createConnection();
+         final Session sendSession = sendConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         final MessageProducer producer = sendSession.createProducer(queue);
+
+         for (int j = 0; j < MESSAGE_COUNT; j++) {
+            TextMessage message = sendSession.createTextMessage();
+
+            message.setText("Message" + j);
+
+            producer.send(message);
+         }
+
+         producer.close();
+         sendSession.close();
+
+         connection = cf.createConnection();
+         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer consumer = session.createConsumer(queue);
+         connection.start();
+
+         Message message = consumer.receive(1000);
+         assertNull(message);
+         assertTrue(((ActiveMQMessageConsumer) consumer).isClosed());
+      } finally {
+         if (connection != null) {
+            connection.close();
+         }
+         if (sendConnection != null) {
+            sendConnection.close();
+         }
+      }
+   }
+}

Reply via email to