This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new c0b12b1  ARTEMIS-2969 / ARTEMIS-2937 Controlling connecting state on 
AMQP Broker Connection
c0b12b1 is described below

commit c0b12b14c88995a1ca2d3e2c2220aeca220d94b7
Author: Clebert Suconic <clebertsuco...@apache.org>
AuthorDate: Mon Nov 2 07:34:35 2020 -0500

    ARTEMIS-2969 / ARTEMIS-2937 Controlling connecting state on AMQP Broker 
Connection
    
    - Fixed an issue where I needed to set connection to null after closing it
    - Added more tests on QpidDispatchPeerTest (tests i would have done 
manually, and reproduced a few issues along the way)
---
 .../apache/activemq/artemis/utils/ExecuteUtil.java |  7 +++++
 .../amqp/connect/AMQPBrokerConnection.java         | 33 +++++++++++++++-----
 .../amqp/connect/QpidDispatchPeerTest.java         | 35 +++++++++++++++++-----
 3 files changed, 61 insertions(+), 14 deletions(-)

diff --git 
a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ExecuteUtil.java
 
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ExecuteUtil.java
index 3a36425..19a046a 100644
--- 
a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ExecuteUtil.java
+++ 
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ExecuteUtil.java
@@ -22,6 +22,7 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.lang.reflect.Field;
 import java.util.concurrent.TimeUnit;
 
 public class ExecuteUtil {
@@ -38,6 +39,12 @@ public class ExecuteUtil {
          inputStreamReader.join();
       }
 
+      public int pid() throws Exception {
+         Field pidField = process.getClass().getDeclaredField("pid");
+         pidField.setAccessible(true);
+         return (int)pidField.get(process);
+      }
+
       public int waitFor(long timeout, TimeUnit unit) throws 
InterruptedException {
          if (!process.waitFor(timeout, unit)) {
             logger.warn("could not complete execution in time");
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
index 181d8ed..1e6d52b 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
@@ -92,6 +92,7 @@ public class AMQPBrokerConnection implements 
ClientConnectionLifeCycleListener,
    private volatile boolean started = false;
    private final AMQPBrokerConnectionManager bridgeManager;
    private int retryCounter = 0;
+   private boolean connecting = false;
    private volatile ScheduledFuture reconnectFuture;
    private Set<Queue> senders = new HashSet<>();
    private Set<Queue> receivers = new HashSet<>();
@@ -209,6 +210,8 @@ public class AMQPBrokerConnection implements 
ClientConnectionLifeCycleListener,
 
    private void doConnect() {
       try {
+         connecting = true;
+
          List<TransportConfiguration> configurationList = 
brokerConnectConfiguration.getTransportConfigurations();
 
          TransportConfiguration tpConfig = configurationList.get(0);
@@ -283,6 +286,8 @@ public class AMQPBrokerConnection implements 
ClientConnectionLifeCycleListener,
          protonRemotingConnection.getAmqpConnection().flush();
 
          bridgeManager.connected(connection, this);
+
+         connecting = false;
       } catch (Throwable e) {
          error(e);
       }
@@ -464,6 +469,7 @@ public class AMQPBrokerConnection implements 
ClientConnectionLifeCycleListener,
    }
 
    protected void error(Throwable e) {
+      connecting = false;
       logger.warn(e.getMessage(), e);
       redoConnection();
    }
@@ -511,16 +517,29 @@ public class AMQPBrokerConnection implements 
ClientConnectionLifeCycleListener,
    }
 
    private void redoConnection() {
-      try {
-         if (connection != null) {
-            connection.close();
+
+      // we need to use the connectExecutor to initiate a redoConnection
+      // otherwise we would need to add synchronized blocks along this class
+      // to control when connecting becomes true and when it becomes false
+      // keeping a single executor thread to this purpose would simplify things
+      connectExecutor.execute(() -> {
+         if (connecting) {
+            logger.debug("Broker connection " + this.getName() + " was already 
in retry mode, exception or retry no captured");
+            return;
          }
-      } catch (Throwable e) {
-         logger.warn(e.getMessage(), e);
-      }
+         connecting = true;
 
-      retryConnection();
+         try {
+            if (connection != null) {
+               connection.close();
+               connection = null;
+            }
+         } catch (Throwable e) {
+            logger.warn(e.getMessage(), e);
+         }
 
+         retryConnection();
+      });
    }
 
    @Override
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java
index 369f030..0ab1e45 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java
@@ -83,32 +83,49 @@ public class QpidDispatchPeerTest extends 
AmqpClientTestSupport {
       qpidProcess.kill();
    }
 
+   public void pauseThenKill(int timeToWait) throws Exception {
+      int pid = qpidProcess.pid();
+      int result = ExecuteUtil.runCommand(true, "kill", "-STOP", 
Long.toString(pid));
+      Assert.assertEquals(0, result);
+      
logger.info("\n*******************************************************************************************************************************\n"
 +
+                   "Paused" +
+                   
"\n*******************************************************************************************************************************");
+      Thread.sleep(timeToWait);
+      result = ExecuteUtil.runCommand(true, "kill", "-9", Long.toString(pid));
+      Assert.assertEquals(0, result);
+   }
+
    @Test(timeout = 60_000)
    public void testWithMatchingDifferentNamesOnQueueKill() throws Exception {
-      internalMultipleQueues(true, true, true);
+      internalMultipleQueues(true, true, true, false);
+   }
+
+   @Test
+   public void testWithMatchingDifferentNamesOnQueuePause() throws Exception {
+      internalMultipleQueues(true, true, false, true);
    }
 
    @Test(timeout = 60_000)
    public void testWithMatchingDifferentNamesOnQueue() throws Exception {
-      internalMultipleQueues(true, true, false);
+      internalMultipleQueues(true, true, false, false);
    }
 
    @Test(timeout = 60_000)
    public void testWithMatching() throws Exception {
-      internalMultipleQueues(true, false, false);
+      internalMultipleQueues(true, false, false, false);
    }
 
    @Test(timeout = 60_000)
    public void testwithQueueName() throws Exception {
-      internalMultipleQueues(false, false, false);
+      internalMultipleQueues(false, false, false, false);
    }
 
    @Test(timeout = 60_000)
    public void testwithQueueNameDistinctName() throws Exception {
-      internalMultipleQueues(false, true, false);
+      internalMultipleQueues(false, true, false, false);
    }
 
-   private void internalMultipleQueues(boolean useMatching, boolean 
distinctNaming, boolean kill) throws Exception {
+   private void internalMultipleQueues(boolean useMatching, boolean 
distinctNaming, boolean kill, boolean pause) throws Exception {
       final int numberOfMessages = 100;
       final int numberOfQueues = 10;
       AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("test", 
"tcp://localhost:24622?amqpIdleTimeout=1000").setRetryInterval(10).setReconnectAttempts(-1);
@@ -148,10 +165,14 @@ public class QpidDispatchPeerTest extends 
AmqpClientTestSupport {
       }
 
       if (kill) {
-         stopQpidRouter();
+         qpidProcess.kill();
+         startQpidRouter();
+      } else if (pause) {
+         pauseThenKill(3_000);
          startQpidRouter();
       }
 
+
       for (int dest = 0; dest < numberOfQueues; dest++) {
          ConnectionFactory factoryConsumer = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:24622");
          Connection connectionConsumer = 
createConnectionDumbRetry(factoryConsumer);

Reply via email to