Repository: activemq
Updated Branches:
  refs/heads/master d020af203 -> 8cc0c5ad6


AMQ-7106 - fix pending stop support: avoid synchronizing in delayedStop()/stopAsync()
by tracking start/stop state in a single shared atomic status var - fix and test


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8cc0c5ad
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8cc0c5ad
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8cc0c5ad

Branch: refs/heads/master
Commit: 8cc0c5ad6c85381cf6bbeaf179086d451d96650e
Parents: d020af2
Author: gtully <gary.tu...@gmail.com>
Authored: Wed Nov 21 10:23:13 2018 +0000
Committer: gtully <gary.tu...@gmail.com>
Committed: Wed Nov 21 10:23:13 2018 +0000

----------------------------------------------------------------------
 .../activemq/broker/TransportConnection.java    | 107 +++++-----
 .../transport/mqtt/MQTTInactivityMonitor.java   |   2 +-
 ...TcpTransportInactiveDuringHandshakeTest.java | 203 +++++++++++++++++++
 3 files changed, 254 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/8cc0c5ad/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index ba1f1eb..c064b18 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -138,8 +138,14 @@ public class TransportConnection implements Connection, 
Task, CommandVisitor {
     private boolean blocked;
     private boolean connected;
     private boolean active;
-    private final AtomicBoolean starting = new AtomicBoolean();
-    private final AtomicBoolean pendingStop = new AtomicBoolean();
+
+    // state management around pending stop
+    private static final int NEW           = 0;
+    private static final int STARTING      = 1;
+    private static final int STARTED       = 2;
+    private static final int PENDING_STOP  = 3;
+    private final AtomicInteger status = new AtomicInteger(NEW);
+
     private long timeStamp;
     private final AtomicBoolean stopping = new AtomicBoolean(false);
     private final CountDownLatch stopped = new CountDownLatch(1);
@@ -229,7 +235,7 @@ public class TransportConnection implements Connection, 
Task, CommandVisitor {
     }
 
     public void serviceTransportException(IOException e) {
-        if (!stopping.get() && !pendingStop.get()) {
+        if (!stopping.get() && status.get() != PENDING_STOP) {
             transportException.set(e);
             if (TRANSPORTLOG.isDebugEnabled()) {
                 TRANSPORTLOG.debug(this + " failed: " + e, e);
@@ -303,7 +309,7 @@ public class TransportConnection implements Connection, 
Task, CommandVisitor {
                 }
                 ConnectionError ce = new ConnectionError();
                 ce.setException(e);
-                if (pendingStop.get()) {
+                if (status.get() == PENDING_STOP) {
                     dispatchSync(ce);
                 } else {
                     dispatchAsync(ce);
@@ -321,7 +327,7 @@ public class TransportConnection implements Connection, 
Task, CommandVisitor {
         boolean responseRequired = command.isResponseRequired();
         int commandId = command.getCommandId();
         try {
-            if (!pendingStop.get()) {
+            if (status.get() != PENDING_STOP) {
                 response = command.visit(this);
             } else {
                 response = new ExceptionResponse(transportException.get());
@@ -993,7 +999,7 @@ public class TransportConnection implements Connection, 
Task, CommandVisitor {
     @Override
     public boolean iterate() {
         try {
-            if (pendingStop.get() || stopping.get()) {
+            if (status.get() == PENDING_STOP || stopping.get()) {
                 if (dispatchStopped.compareAndSet(false, true)) {
                     if (transportException.get() == null) {
                         try {
@@ -1049,39 +1055,39 @@ public class TransportConnection implements Connection, 
Task, CommandVisitor {
 
     @Override
     public void start() throws Exception {
-        try {
-            synchronized (this) {
-                starting.set(true);
-                if (taskRunnerFactory != null) {
-                    taskRunner = taskRunnerFactory.createTaskRunner(this, 
"ActiveMQ Connection Dispatcher: "
-                            + getRemoteAddress());
-                } else {
-                    taskRunner = null;
+        if (status.compareAndSet(NEW, STARTING)) {
+            try {
+                synchronized (this) {
+                    if (taskRunnerFactory != null) {
+                        taskRunner = taskRunnerFactory.createTaskRunner(this, 
"ActiveMQ Connection Dispatcher: "
+                                + getRemoteAddress());
+                    } else {
+                        taskRunner = null;
+                    }
+                    transport.start();
+                    active = true;
+                    BrokerInfo info = connector.getBrokerInfo().copy();
+                    if (connector.isUpdateClusterClients()) {
+                        
info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos());
+                    } else {
+                        info.setPeerBrokerInfos(null);
+                    }
+                    dispatchAsync(info);
+
+                    connector.onStarted(this);
                 }
-                transport.start();
-                active = true;
-                BrokerInfo info = connector.getBrokerInfo().copy();
-                if (connector.isUpdateClusterClients()) {
-                    info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos());
-                } else {
-                    info.setPeerBrokerInfos(null);
+            } catch (Exception e) {
+                // Force clean up on an error starting up.
+                status.set(PENDING_STOP);
+                throw e;
+            } finally {
+                // stop() can be called from within the above block,
+                // but we want to be sure start() completes before
+                // stop() runs, so queue the stop until right now:
+                if (!status.compareAndSet(STARTING, STARTED)) {
+                    LOG.debug("Calling the delayed stop() after start() {}", 
this);
+                    stop();
                 }
-                dispatchAsync(info);
-
-                connector.onStarted(this);
-            }
-        } catch (Exception e) {
-            // Force clean up on an error starting up.
-            pendingStop.set(true);
-            throw e;
-        } finally {
-            // stop() can be called from within the above block,
-            // but we want to be sure start() completes before
-            // stop() runs, so queue the stop until right now:
-            setStarting(false);
-            if (isPendingStop()) {
-                LOG.debug("Calling the delayed stop() after start() {}", this);
-                stop();
             }
         }
     }
@@ -1099,10 +1105,8 @@ public class TransportConnection implements Connection, 
Task, CommandVisitor {
 
     public void delayedStop(final int waitTime, final String reason, Throwable 
cause) {
         if (waitTime > 0) {
-            synchronized (this) {
-                pendingStop.set(true);
-                transportException.set(cause);
-            }
+            status.compareAndSet(STARTING, PENDING_STOP);
+            transportException.set(cause);
             try {
                 stopTaskRunnerFactory.execute(new Runnable() {
                     @Override
@@ -1128,12 +1132,9 @@ public class TransportConnection implements Connection, 
Task, CommandVisitor {
 
     public void stopAsync() {
         // If we're in the middle of starting then go no further... for now.
-        synchronized (this) {
-            pendingStop.set(true);
-            if (starting.get()) {
-                LOG.debug("stopAsync() called in the middle of start(). 
Delaying till start completes..");
-                return;
-            }
+        if (status.compareAndSet(STARTING, PENDING_STOP)) {
+            LOG.debug("stopAsync() called in the middle of start(). Delaying 
till start completes..");
+            return;
         }
         if (stopping.compareAndSet(false, true)) {
             // Let all the connection contexts know we are shutting down
@@ -1342,7 +1343,7 @@ public class TransportConnection implements Connection, 
Task, CommandVisitor {
      * @return true if the Connection is starting
      */
     public boolean isStarting() {
-        return starting.get();
+        return status.get() == STARTING;
     }
 
     @Override
@@ -1355,19 +1356,11 @@ public class TransportConnection implements Connection, 
Task, CommandVisitor {
         return this.faultTolerantConnection;
     }
 
-    protected void setStarting(boolean starting) {
-        this.starting.set(starting);
-    }
-
     /**
      * @return true if the Connection needs to stop
      */
     public boolean isPendingStop() {
-        return pendingStop.get();
-    }
-
-    protected void setPendingStop(boolean pendingStop) {
-        this.pendingStop.set(pendingStop);
+        return status.get() == PENDING_STOP;
     }
 
     private NetworkBridgeConfiguration getNetworkConfiguration(final 
BrokerInfo info) throws IOException {

http://git-wip-us.apache.org/repos/asf/activemq/blob/8cc0c5ad/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
----------------------------------------------------------------------
diff --git 
a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
 
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
index 8c56a24..b3d8fba 100644
--- 
a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
+++ 
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
@@ -78,7 +78,7 @@ public class MQTTInactivityMonitor extends TransportFilter {
                     ASYNC_TASKS.execute(new Runnable() {
                         @Override
                         public void run() {
-                            onException(new InactivityIOException("Channel was 
inactive for too (>" + (readKeepAliveTime + readGraceTime) + ") long: "
+                            onException(new InactivityIOException("CONNECT 
frame not received with in connectionTimeout (>" + connectionTimeout + "): "
                                 + next.getRemoteAddress()));
                         }
                     });

http://git-wip-us.apache.org/repos/asf/activemq/blob/8cc0c5ad/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TcpTransportInactiveDuringHandshakeTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TcpTransportInactiveDuringHandshakeTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TcpTransportInactiveDuringHandshakeTest.java
new file mode 100644
index 0000000..d01511f
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TcpTransportInactiveDuringHandshakeTest.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.tcp;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.util.DefaultTestAppender;
+import org.apache.activemq.util.Wait;
+import org.apache.log4j.Level;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.*;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.security.SecureRandom;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+
+public class TcpTransportInactiveDuringHandshakeTest {
+
+    private static final org.slf4j.Logger LOG = 
LoggerFactory.getLogger(TcpTransportInactiveDuringHandshakeTest.class);
+
+    public static final String KEYSTORE_TYPE = "jks";
+    public static final String PASSWORD = "password";
+    public static final String SERVER_KEYSTORE = 
"src/test/resources/server.keystore";
+    public static final String TRUST_KEYSTORE = 
"src/test/resources/client.keystore";
+
+    static {
+        System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
+        System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
+        System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
+        System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
+        System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
+        System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
+    }
+
+    private BrokerService brokerService;
+    private DefaultTestAppender appender;
+    CountDownLatch inactivityMonitorFired = new CountDownLatch(1);
+    CountDownLatch handShakeComplete = new CountDownLatch(1);
+
+    @Before
+    public void before() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(false);
+
+        appender = new DefaultTestAppender() {
+            @Override
+            public void doAppend(LoggingEvent event) {
+                if (event.getLevel().equals(Level.WARN) && 
event.getRenderedMessage().contains("InactivityIOException")) {
+                    inactivityMonitorFired.countDown();
+                }
+            }
+        };
+        org.apache.log4j.Logger rootLogger = 
org.apache.log4j.Logger.getRootLogger();
+        rootLogger.addAppender(appender);
+
+    }
+
+    @After
+    public void after() throws Exception {
+        org.apache.log4j.Logger rootLogger = 
org.apache.log4j.Logger.getRootLogger();
+        rootLogger.removeAppender(appender);
+
+        if (brokerService != null) {
+            brokerService.stop();
+            brokerService.waitUntilStopped();
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testInactivityMonitorThreadCompletesWhenFiringDuringStart() 
throws Exception {
+        
brokerService.addConnector("mqtt+nio+ssl://localhost:0?transport.connectAttemptTimeout=1000&transport.closeAsync=false");
+        brokerService.start();
+        brokerService.waitUntilStarted();
+
+        TransportConnector transportConnector = 
brokerService.getTransportConnectors().get(0);
+        URI uri = transportConnector.getPublishableConnectURI();
+
+
+        CountDownLatch blockHandShakeCompletion = new CountDownLatch(1);
+
+        TrustManager[] trustManagers = new TrustManager[]{new 
X509TrustManager() {
+            @Override
+            public void checkClientTrusted(X509Certificate[] x509Certificates, 
String s) throws CertificateException {
+            }
+
+            @Override
+            public void checkServerTrusted(X509Certificate[] x509Certificates, 
String s) throws CertificateException {
+                LOG.info("Check Server Trusted: " + s, new Throwable("HERE"));
+                try {
+                    blockHandShakeCompletion.await(20, TimeUnit.SECONDS);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+                LOG.info("Check Server Trusted done!");
+            }
+
+            @Override
+            public X509Certificate[] getAcceptedIssuers() {
+                return new X509Certificate[0];
+            }
+        }};
+
+
+        SSLContext sslContext = SSLContext.getInstance("TLS");
+        sslContext.init(null, trustManagers, new SecureRandom());
+
+        final SSLSocket sslSocket = (SSLSocket) 
sslContext.getSocketFactory().createSocket("127.0.0.1", uri.getPort());
+
+        sslSocket.addHandshakeCompletedListener(new 
HandshakeCompletedListener() {
+            @Override
+            public void handshakeCompleted(HandshakeCompletedEvent 
handshakeCompletedEvent) {
+                handShakeComplete.countDown();
+            }
+        });
+
+        Executors.newCachedThreadPool().submit(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    sslSocket.startHandshake();
+                    assertTrue("Socket connected", sslSocket.isConnected());
+                } catch (IOException oops) {
+                    oops.printStackTrace();
+                }
+
+            }
+        });
+
+        assertTrue("inactivity fired", inactivityMonitorFired.await(10, 
TimeUnit.SECONDS));
+
+        assertTrue("Found non blocked inactivity monitor thread - done its 
work", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                // verify no InactivityMonitor Task blocked
+                Thread[] threads = new Thread[20];
+                int activeCount = 
Thread.currentThread().getThreadGroup().enumerate(threads);
+                for (int i = 0; i<activeCount; i++) {
+                    Thread thread = threads[i];
+                    LOG.info("T[" + i++ + "]: " + thread);
+                    if (thread.getName().contains("InactivityMonitor") && 
thread.getState().equals(Thread.State.TIMED_WAITING)) {
+                        LOG.info("Found inactivity monitor in timed-wait");
+                        // good
+                        return true;
+                    }
+                }
+                return false;
+            }
+        }));
+
+        // allow handshake to complete
+        blockHandShakeCompletion.countDown();
+
+        final OutputStream socketOutPutStream = sslSocket.getOutputStream();
+
+        assertTrue("Handshake complete", handShakeComplete.await(10, 
TimeUnit.SECONDS));
+
+        // wait for socket to be closed via Inactivity monitor
+
+        assertTrue("socket error", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("Expecting socket to error from remote close: " + 
sslSocket);
+                try {
+                    socketOutPutStream.write(2);
+                    socketOutPutStream.flush();
+                } catch (IOException expected) {
+                    return true;
+                }
+                return false;
+            }
+        }));
+
+        LOG.info("Socket at end: " + sslSocket);
+        sslSocket.close();
+    }
+}

Reply via email to