fixing SlowConsumerDetection
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/62373748 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/62373748 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/62373748 Branch: refs/heads/refactor-openwire Commit: 6237374877d4eb252f1e7ab10365ef6ab066c857 Parents: 255d607 Author: Clebert Suconic <clebertsuco...@apache.org> Authored: Thu Feb 25 14:40:04 2016 -0500 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Thu Mar 17 14:10:46 2016 -0400 ---------------------------------------------------------------------- .../core/protocol/openwire/OpenWireConnection.java | 12 +++++++++--- .../protocol/openwire/OpenWireProtocolManager.java | 17 ++++------------- .../core/server/impl/ServerConsumerImpl.java | 17 +++++++++++++++++ 3 files changed, 30 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62373748/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 6f2e3be..dc2a8a6 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -204,6 +204,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se boolean responseRequired = command.isResponseRequired(); int commandId = command.getCommandId(); + + + // TODO-NOW: the server should send packets to the client based on the requested times + // need to look at what Andy did on AMQP + // the connection handles pings, negotiations directly. // and delegate all other commands to manager. if (command.getClass() == KeepAliveInfo.class) { @@ -1196,12 +1201,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se @Override public Response processMessageDispatch(MessageDispatch arg0) throws Exception { - throw new IllegalStateException("not implemented! "); + return null; } @Override public Response processMessageDispatchNotification(MessageDispatchNotification arg0) throws Exception { - throw new IllegalStateException("not implemented! "); + return null; } @Override @@ -1222,7 +1227,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se @Override public Response processProducerAck(ProducerAck arg0) throws Exception { - throw new IllegalStateException("not implemented! "); + // a broker doesn't do producers.. this shouldn't happen + return null; } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62373748/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index bdf27f8..514a2b9 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -17,14 +17,12 @@ package org.apache.activemq.artemis.core.protocol.openwire; import javax.jms.InvalidClientIDException; -import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ScheduledExecutorService; @@ -44,7 +42,6 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnectio import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ClusterManager; -import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; @@ -60,7 +57,6 @@ import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.Command; import org.apache.activemq.command.ConnectionControl; -import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.MessageDispatch; @@ -91,21 +87,14 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl private OpenWireFormatFactory wireFactory; - private boolean tightEncodingEnabled = true; - private boolean prefixPacketSize = true; private BrokerId brokerId; protected final ProducerId advisoryProducerId = new ProducerId(); - // from broker - protected final Map<ConnectionId, OpenWireConnection> brokerConnectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, OpenWireConnection>()); - private final CopyOnWriteArrayList<OpenWireConnection> connections = new CopyOnWriteArrayList<>(); - protected final ConcurrentMap<ConnectionId, ConnectionInfo> connectionInfos = new ConcurrentHashMap<>(); - - // Clebert TODO: use ConcurrentHashMap, or maybe use the schema that's already available on Artemis upstream (unique-client-id) + // TODO-NOW: this can probably go away private final Map<String, AMQConnectionContext> clientIdSet = new HashMap<String, AMQConnectionContext>(); private String brokerName; @@ -133,11 +122,13 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl // preferred prop, should be done via config wireFactory.setCacheEnabled(false); advisoryProducerId.setConnectionId(ID_GENERATOR.generateId()); - ManagementService service = server.getManagementService(); scheduledPool = server.getScheduledPool(); final ClusterManager clusterManager = this.server.getClusterManager(); + + // TODO-NOW: use a property name for the cluster connection ClusterConnection cc = clusterManager.getDefaultConnection(null); + if (cc != null) { cc.addClusterTopologyListener(this); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62373748/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 545b4dc..b5ea5d9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -89,6 +89,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { private Object protocolContext; + private SlowConsumerDetectionListener slowConsumerListener; + /** * We get a readLock when a message is handled, and return the readLock when the message is finally delivered * When stopping the consumer we need to get a writeLock to make sure we had all delivery finished @@ -223,6 +225,21 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { // ---------------------------------------------------------------------- @Override + public void setlowConsumerDetection(SlowConsumerDetectionListener listener) { + this.slowConsumerListener = listener; + } + + @Override + public SlowConsumerDetectionListener getSlowConsumerDetecion() { + return slowConsumerListener; + } + + @Override + public void fireSlowConsumer() { + slowConsumerListener.onSlowConsumer(this); + } + + @Override public Object getProtocolContext() { return protocolContext; }