Repository: activemq-artemis Updated Branches: refs/heads/master d09aba4cb -> 1548a4e21
NO-JIRA Removing Thread usage for Pings on Stomp Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1548a4e2 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1548a4e2 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1548a4e2 Branch: refs/heads/master Commit: 1548a4e21772ff70690136e5d7653630cef7a851 Parents: d09aba4 Author: Clebert Suconic <clebertsuco...@apache.org> Authored: Mon Sep 26 18:32:17 2016 -0400 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Mon Sep 26 18:32:25 2016 -0400 ---------------------------------------------------------------------- .../core/protocol/stomp/StompConnection.java | 20 ++++++-- .../protocol/stomp/StompProtocolManager.java | 2 +- .../stomp/VersionedStompFrameHandler.java | 17 +++++-- .../stomp/v10/StompFrameHandlerV10.java | 6 ++- .../stomp/v11/StompFrameHandlerV11.java | 53 ++++++++------------ .../stomp/v12/StompFrameHandlerV12.java | 7 ++- 6 files changed, 59 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1548a4e2/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index b918b75..f1605cb 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Set; import java.util.StringTokenizer; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ScheduledExecutorService; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; @@ -44,6 +45,7 @@ import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.ConfigurationHelper; +import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.VersionLoader; import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE; @@ -96,6 +98,10 @@ public final class StompConnection implements RemotingConnection { private final int minLargeMessageSize; + private final ScheduledExecutorService scheduledExecutorService; + + private final ExecutorFactory factory; + @Override public boolean isSupportReconnect() { return false; @@ -111,7 +117,7 @@ public final class StompConnection implements RemotingConnection { case ActiveMQStompException.INVALID_EOL_V10: if (version != null) throw e; - frameHandler = new StompFrameHandlerV12(this); + frameHandler = new StompFrameHandlerV12(this, scheduledExecutorService, factory); buffer.resetReaderIndex(); frame = decode(buffer); break; @@ -136,12 +142,18 @@ public final class StompConnection implements RemotingConnection { StompConnection(final Acceptor acceptorUsed, final Connection transportConnection, - final StompProtocolManager manager) { + final StompProtocolManager manager, + final ScheduledExecutorService scheduledExecutorService, + final ExecutorFactory factory) { + this.scheduledExecutorService = scheduledExecutorService; + + this.factory = factory; + this.transportConnection = transportConnection; this.manager = manager; - this.frameHandler = new StompFrameHandlerV10(this); + this.frameHandler = new StompFrameHandlerV10(this, scheduledExecutorService, factory); this.creationTime = System.currentTimeMillis(); @@ -452,7 +464,7 @@ public final class StompConnection implements RemotingConnection { } if (this.version != (StompVersions.V1_0)) { - VersionedStompFrameHandler newHandler = VersionedStompFrameHandler.getHandler(this, this.version); + VersionedStompFrameHandler newHandler = VersionedStompFrameHandler.getHandler(this, this.version, scheduledExecutorService, factory); newHandler.initDecoder(this.frameHandler); this.frameHandler = newHandler; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1548a4e2/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java index 5de63d3..235cddd 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java @@ -117,7 +117,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame,Sto @Override public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final Connection connection) { - StompConnection conn = new StompConnection(acceptorUsed, connection, this); + StompConnection conn = new StompConnection(acceptorUsed, connection, this, server.getScheduledPool(), server.getExecutorFactory()); // Note that STOMP 1.0 has no heartbeat, so if connection ttl is non zero, data must continue to be sent or connection // will be timed out and closed! http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1548a4e2/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java index 185f81f..25d32be 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.protocol.stomp; import java.nio.charset.StandardCharsets; +import java.util.concurrent.ScheduledExecutorService; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.Message; @@ -29,6 +30,7 @@ import org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.utils.DataConstants; +import org.apache.activemq.artemis.utils.ExecutorFactory; import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE; @@ -37,21 +39,26 @@ public abstract class VersionedStompFrameHandler { protected StompConnection connection; protected StompDecoder decoder; - public static VersionedStompFrameHandler getHandler(StompConnection connection, StompVersions version) { + protected final ScheduledExecutorService scheduledExecutorService; + protected final ExecutorFactory executorFactory; + + public static VersionedStompFrameHandler getHandler(StompConnection connection, StompVersions version, ScheduledExecutorService scheduledExecutorService, ExecutorFactory executorFactory) { if (version == StompVersions.V1_0) { - return new StompFrameHandlerV10(connection); + return new StompFrameHandlerV10(connection, scheduledExecutorService, executorFactory); } if (version == StompVersions.V1_1) { - return new StompFrameHandlerV11(connection); + return new StompFrameHandlerV11(connection, scheduledExecutorService, executorFactory); } if (version == StompVersions.V1_2) { - return new StompFrameHandlerV12(connection); + return new StompFrameHandlerV12(connection, scheduledExecutorService, executorFactory); } return null; } - protected VersionedStompFrameHandler(StompConnection connection) { + protected VersionedStompFrameHandler(StompConnection connection, ScheduledExecutorService scheduledExecutorService, ExecutorFactory executorFactory) { this.connection = connection; + this.scheduledExecutorService = scheduledExecutorService; + this.executorFactory = executorFactory; } public StompFrame decode(ActiveMQBuffer buffer) throws ActiveMQStompException { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1548a4e2/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java index 25db3b0..0e32881 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.stomp.v10; import javax.security.cert.X509Certificate; import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; import org.apache.activemq.artemis.core.protocol.stomp.FrameEventListener; import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException; @@ -30,13 +31,14 @@ import org.apache.activemq.artemis.core.protocol.stomp.VersionedStompFrameHandle import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.utils.CertificateUtil; +import org.apache.activemq.artemis.utils.ExecutorFactory; import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE; public class StompFrameHandlerV10 extends VersionedStompFrameHandler implements FrameEventListener { - public StompFrameHandlerV10(StompConnection connection) { - super(connection); + public StompFrameHandlerV10(StompConnection connection, ScheduledExecutorService scheduledExecutorService, ExecutorFactory factory) { + super(connection, scheduledExecutorService, factory); decoder = new StompDecoder(this); decoder.init(); connection.addStompEventListener(this); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1548a4e2/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java index 974a889..35de63c 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java @@ -18,6 +18,9 @@ package org.apache.activemq.artemis.core.protocol.stomp.v11; import javax.security.cert.X509Certificate; import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException; @@ -31,9 +34,11 @@ import org.apache.activemq.artemis.core.protocol.stomp.VersionedStompFrameHandle import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl; +import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; import org.apache.activemq.artemis.utils.CertificateUtil; +import org.apache.activemq.artemis.utils.ExecutorFactory; import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE; @@ -43,8 +48,8 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements private HeartBeater heartBeater; - public StompFrameHandlerV11(StompConnection connection) { - super(connection); + public StompFrameHandlerV11(StompConnection connection, ScheduledExecutorService scheduledExecutorService, ExecutorFactory executorFactory) { + super(connection, scheduledExecutorService, executorFactory); connection.addStompEventListener(this); decoder = new StompDecoderV11(this); decoder.init(); @@ -127,19 +132,13 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements //client receive ping long minAcceptInterval = Long.valueOf(params[1]); - heartBeater = new HeartBeater(minPingInterval, minAcceptInterval); + heartBeater = new HeartBeater(scheduledExecutorService, executorFactory.getExecutor(), minPingInterval, minAcceptInterval); } @Override public StompFrame onDisconnect(StompFrame frame) { if (this.heartBeater != null) { heartBeater.shutdown(); - try { - heartBeater.join(); - } - catch (InterruptedException e) { - ActiveMQServerLogger.LOGGER.errorOnStompHeartBeat(e); - } } return null; } @@ -250,7 +249,7 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements * (b) configure connection ttl so that org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl.FailureCheckAndFlushThread * can deal with closing connections which go stale */ - private class HeartBeater extends Thread { + private class HeartBeater extends ActiveMQScheduledComponent { private static final int MIN_SERVER_PING = 500; @@ -260,7 +259,13 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements AtomicLong lastPingTimestamp = new AtomicLong(0); ConnectionEntry connectionEntry; - private HeartBeater(final long clientPing, final long clientAcceptPing) { + private HeartBeater(ScheduledExecutorService scheduledExecutorService, Executor executor, final long clientPing, final long clientAcceptPing) { + super(scheduledExecutorService, executor, clientAcceptPing > MIN_SERVER_PING ? clientAcceptPing : MIN_SERVER_PING, TimeUnit.MILLISECONDS, false); + + if (clientAcceptPing != 0) { + serverPingPeriod = super.getPeriod(); + } + connectionEntry = ((RemotingServiceImpl)connection.getManager().getServer().getRemotingService()).getConnectionEntry(connection.getID()); if (connectionEntry != null) { @@ -299,14 +304,11 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements } } - if (clientAcceptPing != 0) { - serverPingPeriod = clientAcceptPing > MIN_SERVER_PING ? clientAcceptPing : MIN_SERVER_PING; - } } - public synchronized void shutdown() { - shutdown = true; - this.notify(); + public void shutdown() { + this.stop(); + } public void pinged() { @@ -315,21 +317,8 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements @Override public void run() { - synchronized (this) { - while (!shutdown) { - long lastPingPeriod = System.currentTimeMillis() - lastPingTimestamp.get(); - if (lastPingPeriod >= serverPingPeriod) { - lastPingTimestamp.set(System.currentTimeMillis()); - connection.ping(createPingFrame()); - lastPingPeriod = 0; - } - try { - this.wait(serverPingPeriod - lastPingPeriod); - } - catch (InterruptedException e) { - } - } - } + lastPingTimestamp.set(System.currentTimeMillis()); + connection.ping(createPingFrame()); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1548a4e2/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java index 5127dd0..19149bf 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.core.protocol.stomp.v12; +import java.util.concurrent.ScheduledExecutorService; + import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException; import org.apache.activemq.artemis.core.protocol.stomp.Stomp; import org.apache.activemq.artemis.core.protocol.stomp.StompConnection; @@ -26,13 +28,14 @@ import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11; import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameV11; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.utils.ExecutorFactory; import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE; public class StompFrameHandlerV12 extends StompFrameHandlerV11 { - public StompFrameHandlerV12(StompConnection connection) { - super(connection); + public StompFrameHandlerV12(StompConnection connection, ScheduledExecutorService scheduledExecutorService, ExecutorFactory factory) { + super(connection, scheduledExecutorService, factory); decoder = new StompDecoderV12(this); decoder.init(); }