Moving the send method from OpenWireProtocolManager to OpenWireConnection (as sendCommand)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c4cd469e Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c4cd469e Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c4cd469e Branch: refs/heads/refactor-openwire Commit: c4cd469e233a756d6d01944e040b40db39b4c011 Parents: cbb0904 Author: Clebert Suconic <clebertsuco...@apache.org> Authored: Wed Feb 24 14:26:55 2016 -0500 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Wed Mar 9 14:41:41 2016 -0500 ---------------------------------------------------------------------- .../protocol/openwire/OpenWireConnection.java | 39 ++++++++++++++------ .../openwire/OpenWireProtocolManager.java | 36 ------------------ 2 files changed, 27 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c4cd469e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 7c1c094..991f24b 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -22,9 +22,7 @@ import javax.jms.JMSSecurityException; import javax.jms.ResourceAllocationException; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import 
java.util.HashMap; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -36,13 +34,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; -import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager; -import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil; -import org.apache.activemq.artemis.core.protocol.openwire.SendingResult; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQCompositeConsumerBrokerExchange; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; @@ -58,7 +52,6 @@ import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Connection; -import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerInfo; @@ -104,7 +97,6 @@ import org.apache.activemq.wireformat.WireFormat; /** * Represents an activemq connection. 
- * ToDo: extends AbstractRemotingConnection */ public class OpenWireConnection extends AbstractRemotingConnection implements SecurityAuth { @@ -214,7 +206,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se info.setResponseRequired(false); // if we don't respond to KeepAlive commands then the client will think the server is dead and timeout // for some reason KeepAliveInfo.isResponseRequired() is always false - protocolManager.sendReply(this, info); + sendCommand(info); } else { Response response = null; @@ -333,7 +325,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se // throw a WireFormatInfo to the peer public void init() { WireFormatInfo info = wireFormat.getPreferedWireFormatInfo(); - protocolManager.send(this, info); + sendCommand(info); } public ConnectionState getState() { @@ -536,7 +528,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se m.setBrokerOutTime(endTime); } - protocolManager.send(this, dispatch); + sendCommand(dispatch); } public WireFormat getMarshaller() { @@ -577,7 +569,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se destroyed = true; - //before closing transport, send the last response if any + //before closing transport, sendCommand the last response if any Command command = context.getLastCommand(); if (command != null && command.isResponseRequired()) { Response lastResponse = new Response(); @@ -689,6 +681,29 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se context.incRefCount(); } + /** This will answer with commands to the client */ + public boolean sendCommand(final Command command) { + if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) { + ActiveMQServerLogger.LOGGER.trace("sending " + command); + } + synchronized (this) { + if (isDestroyed()) { + return false; + } + + try { + physicalSend(command); + } + catch (Exception e) { + return false; + } + catch 
(Throwable t) { + return false; + } + return true; + } + } + // This will listen for commands throught the protocolmanager public class CommandProcessor implements CommandVisitor { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c4cd469e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 440fcce..add1455 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -280,42 +280,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No } - public void sendReply(final OpenWireConnection connection, final Command command) { - server.getStorageManager().afterCompleteOperations(new IOCallback() { - @Override - public void onError(final int errorCode, final String errorMessage) { - ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage); - } - - @Override - public void done() { - send(connection, command); - } - }); - } - - public boolean send(final OpenWireConnection connection, final Command command) { - if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) { - ActiveMQServerLogger.LOGGER.trace("sending " + command); - } - synchronized (connection) { - if (connection.isDestroyed()) { - return false; - } - - try { - connection.physicalSend(command); - } - catch (Exception e) { - return false; - } - catch (Throwable t) { - return 
false; - } - return true; - } - } - public void addConnection(OpenWireConnection connection, ConnectionInfo info) throws Exception { String username = info.getUserName(); String password = info.getPassword();