This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push: new c0b12b1 ARTEMIS-2969 / ARTEMIS-2937 Controlling connecting state on AMQP Broker Connection c0b12b1 is described below commit c0b12b14c88995a1ca2d3e2c2220aeca220d94b7 Author: Clebert Suconic <clebertsuco...@apache.org> AuthorDate: Mon Nov 2 07:34:35 2020 -0500 ARTEMIS-2969 / ARTEMIS-2937 Controlling connecting state on AMQP Broker Connection - Fixed an issue where I needed to set connection to null after closing it - Added more tests on QpidDispatchPeerTest (tests i would have done manually, and reproduced a few issues along the way) --- .../apache/activemq/artemis/utils/ExecuteUtil.java | 7 +++++ .../amqp/connect/AMQPBrokerConnection.java | 33 +++++++++++++++----- .../amqp/connect/QpidDispatchPeerTest.java | 35 +++++++++++++++++----- 3 files changed, 61 insertions(+), 14 deletions(-) diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ExecuteUtil.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ExecuteUtil.java index 3a36425..19a046a 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ExecuteUtil.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ExecuteUtil.java @@ -22,6 +22,7 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.lang.reflect.Field; import java.util.concurrent.TimeUnit; public class ExecuteUtil { @@ -38,6 +39,12 @@ public class ExecuteUtil { inputStreamReader.join(); } + public int pid() throws Exception { + Field pidField = process.getClass().getDeclaredField("pid"); + pidField.setAccessible(true); + return (int)pidField.get(process); + } + public int waitFor(long timeout, TimeUnit unit) throws InterruptedException { if (!process.waitFor(timeout, unit)) { logger.warn("could not complete execution in time"); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java index 181d8ed..1e6d52b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java @@ -92,6 +92,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, private volatile boolean started = false; private final AMQPBrokerConnectionManager bridgeManager; private int retryCounter = 0; + private boolean connecting = false; private volatile ScheduledFuture reconnectFuture; private Set<Queue> senders = new HashSet<>(); private Set<Queue> receivers = new HashSet<>(); @@ -209,6 +210,8 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, private void doConnect() { try { + connecting = true; + List<TransportConfiguration> configurationList = brokerConnectConfiguration.getTransportConfigurations(); TransportConfiguration tpConfig = configurationList.get(0); @@ -283,6 +286,8 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, protonRemotingConnection.getAmqpConnection().flush(); bridgeManager.connected(connection, this); + + connecting = false; } catch (Throwable e) { error(e); } @@ -464,6 +469,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, } protected void error(Throwable e) { + connecting = false; logger.warn(e.getMessage(), e); redoConnection(); } @@ -511,16 +517,29 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, } private void redoConnection() { - try { - if (connection != null) { - connection.close(); + + // we need to use the connectExecutor to initiate a redoConnection + // otherwise we would need to add synchronized blocks along this class + // to control when connecting becomes true and when it becomes false + // keeping a single executor thread to this purpose would simplify things + connectExecutor.execute(() -> { + if (connecting) { + logger.debug("Broker connection " + this.getName() + " was already in retry mode, exception or retry no captured"); + return; } - } catch (Throwable e) { - logger.warn(e.getMessage(), e); - } + connecting = true; - retryConnection(); + try { + if (connection != null) { + connection.close(); + connection = null; + } + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + } + retryConnection(); + }); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java index 369f030..0ab1e45 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java @@ -83,32 +83,49 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport { qpidProcess.kill(); } + public void pauseThenKill(int timeToWait) throws Exception { + int pid = qpidProcess.pid(); + int result = ExecuteUtil.runCommand(true, "kill", "-STOP", Long.toString(pid)); + Assert.assertEquals(0, result); + logger.info("\n*******************************************************************************************************************************\n" + + "Paused" + + "\n*******************************************************************************************************************************"); + Thread.sleep(timeToWait); + result = ExecuteUtil.runCommand(true, "kill", "-9", Long.toString(pid)); + Assert.assertEquals(0, result); + } + @Test(timeout = 60_000) public void testWithMatchingDifferentNamesOnQueueKill() throws Exception { - internalMultipleQueues(true, true, true); + internalMultipleQueues(true, true, true, false); + } + + @Test + public void testWithMatchingDifferentNamesOnQueuePause() throws Exception { + internalMultipleQueues(true, true, false, true); } @Test(timeout = 60_000) public void testWithMatchingDifferentNamesOnQueue() throws Exception { - internalMultipleQueues(true, true, false); + internalMultipleQueues(true, true, false, false); } @Test(timeout = 60_000) public void testWithMatching() throws Exception { - internalMultipleQueues(true, false, false); + internalMultipleQueues(true, false, false, false); } @Test(timeout = 60_000) public void testwithQueueName() throws Exception { - internalMultipleQueues(false, false, false); + internalMultipleQueues(false, false, false, false); } @Test(timeout = 60_000) public void testwithQueueNameDistinctName() throws Exception { - internalMultipleQueues(false, true, false); + internalMultipleQueues(false, true, false, false); } - private void internalMultipleQueues(boolean useMatching, boolean distinctNaming, boolean kill) throws Exception { + private void internalMultipleQueues(boolean useMatching, boolean distinctNaming, boolean kill, boolean pause) throws Exception { final int numberOfMessages = 100; final int numberOfQueues = 10; AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:24622?amqpIdleTimeout=1000").setRetryInterval(10).setReconnectAttempts(-1); @@ -148,10 +165,14 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport { } if (kill) { - stopQpidRouter(); + qpidProcess.kill(); + startQpidRouter(); + } else if (pause) { + pauseThenKill(3_000); startQpidRouter(); } + for (int dest = 0; dest < numberOfQueues; dest++) { ConnectionFactory factoryConsumer = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:24622"); Connection connectionConsumer = createConnectionDumbRetry(factoryConsumer);