Repository: activemq Updated Branches: refs/heads/master 30ff378a3 -> 96494f74c
https://issues.apache.org/jira/browse/AMQ-6346 Prevent concurrent access to the MQTT protocol handlers which can lead to a transport level deadlock Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/96494f74 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/96494f74 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/96494f74 Branch: refs/heads/master Commit: 96494f74c7142c3396f17696f345c2355c16a61c Parents: 30ff378 Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Authored: Tue Jul 5 17:47:49 2016 +0000 Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Committed: Tue Jul 5 17:50:03 2016 +0000 ---------------------------------------------------------------------- .../activemq/transport/ws/AbstractMQTTSocket.java | 12 +++++++++++- .../activemq/transport/ws/jetty9/MQTTSocket.java | 17 ++++++++++++++--- 2 files changed, 25 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/96494f74/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java index fe0bd32..48282d9 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.security.cert.X509Certificate; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.locks.ReentrantLock; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; 
@@ -36,6 +37,7 @@ import org.fusesource.mqtt.codec.MQTTFrame; public abstract class AbstractMQTTSocket extends TransportSupport implements MQTTTransport, BrokerServiceAware { + protected ReentrantLock protocolLock = new ReentrantLock(); protected volatile MQTTProtocolConverter protocolConverter = null; protected MQTTWireFormat wireFormat = new MQTTWireFormat(); protected final MQTTInactivityMonitor mqttInactivityMonitor = new MQTTInactivityMonitor(this, wireFormat); @@ -53,16 +55,24 @@ public abstract class AbstractMQTTSocket extends TransportSupport implements MQT @Override public void oneway(Object command) throws IOException { + protocolLock.lock(); try { getProtocolConverter().onActiveMQCommand((Command)command); } catch (Exception e) { onException(IOExceptionSupport.create(e)); + } finally { + protocolLock.unlock(); } } @Override public void sendToActiveMQ(Command command) { - doConsume(command); + protocolLock.lock(); + try { + doConsume(command); + } finally { + protocolLock.unlock(); + } } @Override http://git-wip-us.apache.org/repos/asf/activemq/blob/96494f74/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java index 8e0c416..53bad07 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java @@ -18,6 +18,7 @@ package org.apache.activemq.transport.ws.jetty9; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; import org.apache.activemq.transport.ws.AbstractMQTTSocket; import org.apache.activemq.util.ByteSequence; @@ -33,6 +34,7 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener 
private static final Logger LOG = LoggerFactory.getLogger(MQTTSocket.class); + private final int ORDERLY_CLOSE_TIMEOUT = 10; private Session session; public MQTTSocket(String remoteAddress) { @@ -65,22 +67,31 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener } } - receiveCounter += length; - + protocolLock.lock(); try { + receiveCounter += length; MQTTFrame frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(bytes, offset, length)); getProtocolConverter().onMQTTCommand(frame); } catch (Exception e) { onException(IOExceptionSupport.create(e)); + } finally { + protocolLock.unlock(); } } @Override public void onWebSocketClose(int arg0, String arg1) { try { - getProtocolConverter().onMQTTCommand(new DISCONNECT().encode()); + if (protocolLock.tryLock() || protocolLock.tryLock(ORDERLY_CLOSE_TIMEOUT, TimeUnit.SECONDS)) { + LOG.debug("MQTT WebSocket closed: code[{}] message[{}]", arg0, arg1); + getProtocolConverter().onMQTTCommand(new DISCONNECT().encode()); + } } catch (Exception e) { LOG.debug("Failed to close MQTT WebSocket cleanly", e); + } finally { + if (protocolLock.isHeldByCurrentThread()) { + protocolLock.unlock(); + } } }