This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 18692ec3c4ba2662050def03471c7cb25d93b4b1
Author: Clebert Suconic <clebertsuco...@apache.org>
AuthorDate: Sun Nov 5 12:19:53 2023 -0500

    ARTEMIS-4476 Fixing Ghost consumer situation with AMQP
---
 .../apache/activemq/artemis/logs/AuditLogger.java  |   7 +
 .../broker/ActiveMQProtonRemotingConnection.java   |  35 +-
 .../amqp/logger/ActiveMQAMQPProtocolLogger.java    |   3 +
 .../logger/ActiveMQAMQPProtocolMessageBundle.java  |   3 +
 .../amqp/proton/ProtonServerSenderContext.java     |  26 +-
 .../amqp/proton/ProtonServerSenderContextTest.java |  10 +
 .../core/protocol/openwire/OpenWireConnection.java |   4 +-
 .../management/impl/ActiveMQServerControlImpl.java |   2 +-
 .../core/management/impl/QueueControlImpl.java     |   5 +
 .../core/management/impl/view/ConsumerField.java   |   4 +-
 .../core/remoting/server/RemotingService.java      |   2 +
 .../remoting/server/impl/RemotingServiceImpl.java  |   3 +-
 .../artemis/core/server/ActiveMQMessageBundle.java |   3 +
 .../artemis/core/server/impl/QueueImpl.java        |   2 +-
 .../core/server/impl/ServerConsumerImpl.java       |  20 +-
 .../management/impl/ManagementServiceImpl.java     |   2 +
 tests/integration-tests-isolated/pom.xml           |  33 ++
 .../isolated/client/ConnectionDroppedTest.java     | 502 +++++++++++++++++++++
 .../amqp/connect/AMQPFederationConnectTest.java    |   1 +
 .../connect/AMQPFederationQueuePolicyTest.java     |   1 +
 .../amqp/connect/AMQPMirrorConnectionTest.java     |  15 -
 .../consumer/DetectOrphanedConsumerTest.java       | 115 +++++
 .../consumer/OrphanedConsumerDefenseTest.java      | 169 +++++++
 .../tests/leak/ConnectionDroppedLeakTest.java      | 202 +++++++++
 .../artemis/tests/leak/ConnectionLeakTest.java     |  14 +-
 .../artemis/tests/leak/MemoryAssertions.java       |   6 +
 26 files changed, 1153 insertions(+), 36 deletions(-)

diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
index 62fa7ca62e..ee42aa5c14 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
@@ -2675,4 +2675,11 @@ public interface AuditLogger {
    @LogMessage(id = 601771, value = "User {} is getting name on target 
resource: {}", level = LogMessage.Level.INFO)
    void getCurrentTimeMillis(Object source);
 
+   static void verifyConnections(Object source) {
+      BASE_LOGGER.verifyConnections(getCaller(), source);
+   }
+
+   @LogMessage(id = 601772, value = "User {} is calling verifyConnections on 
resource: {}", level = LogMessage.Level.INFO)
+   void verifyConnections(String user, Object queue);
+
 }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
index 9ceea15655..bb6ad2e4ba 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java
@@ -17,11 +17,11 @@
 package org.apache.activemq.artemis.protocol.amqp.broker;
 
 import javax.security.auth.Subject;
+import java.lang.invoke.MethodHandles;
 import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException;
 import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
@@ -29,12 +29,17 @@ import 
org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
 import 
org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This is a Server's Connection representation used by ActiveMQ Artemis.
  */
 public class ActiveMQProtonRemotingConnection extends 
AbstractRemotingConnection {
 
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
    private final AMQPConnectionContext amqpConnection;
 
    private final ProtonProtocolManager manager;
@@ -73,17 +78,31 @@ public class ActiveMQProtonRemotingConnection extends 
AbstractRemotingConnection
 
       destroyed = true;
 
-      //filter it like the other protocols
-      if (!(me instanceof ActiveMQRemoteDisconnectException)) {
-         
ActiveMQClientLogger.LOGGER.connectionFailureDetected(amqpConnection.getConnectionCallback().getTransportConnection().getRemoteAddress(),
 me.getMessage(), me.getType());
+      if (logger.isDebugEnabled()) {
+         try {
+            logger.debug("Connection failure detected. 
amqpConnection.getHandler().getConnection().getRemoteState() = {}, 
remoteIP={}", amqpConnection.getHandler().getConnection().getRemoteState(), 
amqpConnection.getConnectionCallback().getTransportConnection().getRemoteAddress());
+         } catch (Throwable e) { // just to avoid a possible NPE from the 
debug statement itself
+            logger.debug(e.getMessage(), e);
+         }
       }
 
-      // Then call the listeners
-      callFailureListeners(me, scaleDownTargetNodeID);
+      try {
+         if (amqpConnection.getHandler().getConnection().getRemoteState() != 
EndpointState.CLOSED) {
+            // A remote close was received on the client, in that case it's 
just a normal operation and we don't need to log this.
+            
ActiveMQClientLogger.LOGGER.connectionFailureDetected(amqpConnection.getConnectionCallback().getTransportConnection().getRemoteAddress(),
 me.getMessage(), me.getType());
+         }
+      } catch (Throwable e) { // avoiding NPEs from the logging statement. I 
don't think this would happen, but just in case
+         logger.warn(e.getMessage(), e);
+      }
 
-      callClosingListeners();
+      amqpConnection.runNow(() -> {
+         // Then call the listeners
+         callFailureListeners(me, scaleDownTargetNodeID);
 
-      internalClose();
+         callClosingListeners();
+
+         internalClose();
+      });
    }
 
    @Override
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java
index 7d29be97ac..4ab69e6a55 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java
@@ -55,4 +55,7 @@ public interface ActiveMQAMQPProtocolLogger {
 
    @LogMessage(id = 111006, value = "Unable to send message {} to Dead Letter 
Address.", level = LogMessage.Level.WARN)
    void unableToSendMessageToDLA(MessageReference ref, Throwable t);
+
+   @LogMessage(id = 111007, value = "Invalid Connection State: {} for remote 
IP {}", level = LogMessage.Level.WARN)
+   void invalidAMQPConnectionState(Object state, Object remoteIP);
 }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java
index 561122f02c..110b1d5662 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java
@@ -108,4 +108,7 @@ public interface ActiveMQAMQPProtocolMessageBundle {
 
    @Message(id = 119026, value = "Malformed Federation control message: {}")
    ActiveMQException malformedFederationControlMessage(String address);
+
+   @Message(id = 119027, value = "Invalid AMQPConnection Remote State: {}")
+   ActiveMQException invalidAMQPConnectionState(Object state);
 }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index fe92213930..12e905b656 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -60,6 +60,7 @@ import 
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFound
 import 
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
 import 
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
 import 
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
+import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler;
 import 
org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
@@ -92,6 +93,7 @@ import 
org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.codec.WritableBuffer;
+import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.Link;
@@ -295,7 +297,9 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
       } catch (ActiveMQException e) {
          throw e;
       } catch (Exception e) {
-         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage());
+         ActiveMQAMQPInternalErrorException internalErrorException = 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage());
+         internalErrorException.initCause(e);
+         throw internalErrorException;
       }
    }
 
@@ -980,6 +984,8 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
 
       @Override
       public Consumer init(ProtonServerSenderContext senderContext) throws 
Exception {
+         validateConnectionState();
+
          Source source = (Source) sender.getRemoteSource();
          final Map<Symbol, Object> supportedFilters = new HashMap<>();
 
@@ -1334,4 +1340,22 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
          }
       }
    }
+
+   private void validateConnectionState() throws ActiveMQException {
+      ProtonHandler handler = null;
+      Connection qpidConnection = null;
+
+      if (connection == null || (handler = connection.getHandler()) == null || 
(qpidConnection = handler.getConnection()) == null) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("validateConnectionState:: connection={}, handler={}, 
qpidConnection={}", connection, handler, qpidConnection);
+         }
+
+         ActiveMQAMQPProtocolLogger.LOGGER.invalidAMQPConnectionState("null", 
"null");
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.invalidAMQPConnectionState("null");
+      }
+      if (qpidConnection.getRemoteState() == EndpointState.CLOSED) {
+         
ActiveMQAMQPProtocolLogger.LOGGER.invalidAMQPConnectionState(qpidConnection.getRemoteState(),
 connection.getRemoteAddress());
+         throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.invalidAMQPConnectionState(qpidConnection.getRemoteState());
+      }
+   }
 }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContextTest.java
 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContextTest.java
index bd8070ad29..0f9ed45af0 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContextTest.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContextTest.java
@@ -21,7 +21,10 @@ import 
org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
 import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
 import 
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
+import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler;
 import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.Sender;
 
 import org.junit.Test;
@@ -41,6 +44,13 @@ public class ProtonServerSenderContextTest {
       when(mock.getServer()).thenReturn(mock(ActiveMQServer.class));
       Sender mockSender = mock(Sender.class);
       AMQPConnectionContext mockConnContext = 
mock(AMQPConnectionContext.class);
+
+      ProtonHandler handler = mock(ProtonHandler.class);
+      Connection connection = mock(Connection.class);
+      when(connection.getRemoteState()).thenReturn(EndpointState.ACTIVE);
+      when(mockConnContext.getHandler()).thenReturn(handler);
+      when(handler.getConnection()).thenReturn(connection);
+
       when(mockConnContext.getProtocolManager()).thenReturn(mock);
 
       AMQPSessionCallback mockSessionCallback = 
mock(AMQPSessionCallback.class);
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 0956c2adca..77a095ae63 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -148,8 +148,6 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
 
    private final OpenWireProtocolManager protocolManager;
 
-   private boolean destroyed = false;
-
    private volatile ScheduledFuture ttlCheck;
 
    //separated in/out wireFormats allow deliveries (eg async and consumers) to 
not slow down bufferReceived
@@ -1233,6 +1231,8 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
             }
             dispatchAsync(command);
          }
+         // During a changeClientID a disconnect could have been sent by the 
client, and the client will then re-issue a connect packet
+         destroyed = false;
          return null;
 
       }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index b433f8140a..0cd2e229f2 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -2297,7 +2297,7 @@ public class ActiveMQServerControlImpl extends 
AbstractControl implements Active
                         RemotingConnection connection = null;
 
                         for (RemotingConnection potentialConnection : 
remotingService.getConnections()) {
-                           if 
(potentialConnection.getID().toString().equals(serverConsumer.getConnectionID()))
 {
+                           if 
(potentialConnection.getID().toString().equals(String.valueOf(serverConsumer.getConnectionID())))
 {
                               connection = potentialConnection;
                            }
                         }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 5b37b6570d..e7be97e596 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -1841,6 +1841,11 @@ public class QueueControlImpl extends AbstractControl 
implements QueueControl {
                        
.add(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName(), 
serverConsumer.getMessagesAcknowledgedAwaitingCommit())
                        .add(ConsumerField.LAST_DELIVERED_TIME.getName(), 
serverConsumer.getLastDeliveredTime())
                        .add(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName(), 
serverConsumer.getLastAcknowledgedTime());
+
+               if (server.getRemotingService().getConnection(((ServerConsumer) 
consumer).getConnectionID()) == null) {
+                  obj.add(ConsumerField.ORPHANED.getName(), true);
+               }
+
                jsonArray.add(obj);
             }
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerField.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerField.java
index c4066a19a5..5c7c90b921 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerField.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerField.java
@@ -42,7 +42,9 @@ public enum ConsumerField {
    MESSAGES_ACKNOWLEDGED("messagesAcknowledged"),
    MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT("messagesAcknowledgedAwaitingCommit"),
    LAST_DELIVERED_TIME("lastDeliveredTime"),
-   LAST_ACKNOWLEDGED_TIME("lastAcknowledgedTime");
+   LAST_ACKNOWLEDGED_TIME("lastAcknowledgedTime"),
+   ORPHANED("orphaned");
+
 
    private static final Map<String, ConsumerField> lookup = new 
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
index 5014d114d7..478146d733 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
@@ -44,6 +44,8 @@ public interface RemotingService {
     */
    RemotingConnection removeConnection(Object remotingConnectionID);
 
+   RemotingConnection getConnection(Object remotingConnectionID);
+
    Set<RemotingConnection> getConnections();
 
    /**
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index 20d777bd43..b0fb4efd00 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -460,7 +460,8 @@ public class RemotingServiceImpl implements 
RemotingService, ServerConnectionLif
       return started;
    }
 
-   private RemotingConnection getConnection(final Object remotingConnectionID) 
{
+   @Override
+   public RemotingConnection getConnection(final Object remotingConnectionID) {
       ConnectionEntry entry = connections.get(remotingConnectionID);
 
       if (entry != null) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index 5de85adb7f..36477dd974 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -539,4 +539,7 @@ public interface ActiveMQMessageBundle {
 
    @Message(id = 229249, value = "Invalid Store property, only DATABASE 
property is supported")
    RuntimeException unsupportedStorePropertyType();
+
+   @Message(id = 229250, value = "Connection has been marked as destroyed for 
remote connection {}.")
+   ActiveMQException connectionDestroyed(String remoteAddress);
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index a50938ffe7..7f7cae750e 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -4762,7 +4762,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
                   RemotingService remotingService = 
server.getRemotingService();
 
                   for (RemotingConnection potentialConnection : 
remotingService.getConnections()) {
-                     if 
(potentialConnection.getID().toString().equals(serverConsumer.getConnectionID()))
 {
+                     if 
(potentialConnection.getID().toString().equals(String.valueOf(serverConsumer.getConnectionID())))
 {
                         connection = potentialConnection;
                      }
                   }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index c9d1215ae1..f534795c5d 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -219,6 +219,16 @@ public class ServerConsumerImpl implements ServerConsumer, 
ReadyListener {
                              final boolean supportLargeMessage,
                              final Integer credits,
                              final ActiveMQServer server) throws Exception {
+
+
+      if (session == null || session.getRemotingConnection() == null) {
+         throw new NullPointerException("session = " + session);
+      }
+
+      if (session != null && session.getRemotingConnection() != null && 
session.getRemotingConnection().isDestroyed()) {
+         throw 
ActiveMQMessageBundle.BUNDLE.connectionDestroyed(session.getRemotingConnection().getRemoteAddress());
+      }
+
       this.id = id;
 
       this.sequentialID = server.getStorageManager().generateID();
@@ -356,8 +366,8 @@ public class ServerConsumerImpl implements ServerConsumer, 
ReadyListener {
    }
 
    @Override
-   public String getConnectionID() {
-      return this.session.getConnectionID().toString();
+   public Object getConnectionID() {
+      return this.session.getConnectionID();
    }
 
    @Override
@@ -1575,7 +1585,11 @@ public class ServerConsumerImpl implements 
ServerConsumer, ReadyListener {
 
    @Override
    public String getConnectionRemoteAddress() {
-      return 
this.session.getRemotingConnection().getTransportConnection().getRemoteAddress();
+      if (this.session == null || this.session.getRemotingConnection() == null 
|| this.session.getRemotingConnection().getTransportConnection() == null) {
+         return null;
+      } else {
+         return 
this.session.getRemotingConnection().getTransportConnection().getRemoteAddress();
+      }
    }
 
    @Override
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index 92c523d1c5..6a1785d793 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -619,6 +619,8 @@ public class ManagementServiceImpl implements 
ManagementService {
    public synchronized void registerInRegistry(final String resourceName, 
final Object managedResource) {
       unregisterFromRegistry(resourceName);
 
+      logger.debug("Registering {} as {}", resourceName, managedResource);
+
       registry.put(resourceName, managedResource);
    }
 
diff --git a/tests/integration-tests-isolated/pom.xml 
b/tests/integration-tests-isolated/pom.xml
index 7317fb4e40..d0db972418 100644
--- a/tests/integration-tests-isolated/pom.xml
+++ b/tests/integration-tests-isolated/pom.xml
@@ -70,6 +70,12 @@
          <version>${project.version}</version>
          <scope>test</scope>
       </dependency>
+      <dependency>
+         <groupId>org.apache.activemq</groupId>
+         <artifactId>artemis-openwire-protocol</artifactId>
+         <version>${project.version}</version>
+         <scope>test</scope>
+      </dependency>
       <dependency>
          <groupId>org.apache.activemq</groupId>
          <artifactId>artemis-unit-test-support</artifactId>
@@ -194,6 +200,33 @@
          <artifactId>jakarta.json-api</artifactId>
          <scope>test</scope>
       </dependency>
+      <dependency>
+         <groupId>org.apache.qpid</groupId>
+         <artifactId>protonj2-test-driver</artifactId>
+         <scope>test</scope>
+      </dependency>
+      <dependency>
+         <groupId>jakarta.jms</groupId>
+         <artifactId>jakarta.jms-api</artifactId>
+      </dependency>
+      <dependency>
+         <groupId>jakarta.management.j2ee</groupId>
+         <artifactId>jakarta.management.j2ee-api</artifactId>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.activemq</groupId>
+         <artifactId>activemq-client</artifactId>
+         <exclusions>
+            <exclusion>
+               <groupId>org.apache.geronimo.specs</groupId>
+               <artifactId>geronimo-jms_1.1_spec</artifactId>
+            </exclusion>
+            <exclusion>
+               <groupId>org.apache.geronimo.specs</groupId>
+               <artifactId>geronimo-j2ee-management_1.1_spec</artifactId>
+            </exclusion>
+         </exclusions>
+      </dependency>
    </dependencies>
 
    <build>
diff --git 
a/tests/integration-tests-isolated/src/test/java/org/apache/activemq/artemis/tests/integration/isolated/client/ConnectionDroppedTest.java
 
b/tests/integration-tests-isolated/src/test/java/org/apache/activemq/artemis/tests/integration/isolated/client/ConnectionDroppedTest.java
new file mode 100644
index 0000000000..b65cf47c0e
--- /dev/null
+++ 
b/tests/integration-tests-isolated/src/test/java/org/apache/activemq/artemis/tests/integration/isolated/client/ConnectionDroppedTest.java
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.isolated.client;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import 
org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.ThreadDumpUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.qpid.protonj2.test.driver.ProtonTestClient;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConnectionDroppedTest extends ActiveMQTestBase {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   /**
+    * Calls {@code disableCheckThread()} once for every test in this class.
+    * NOTE(review): presumably this turns off the base class's leaked-thread check; confirm in ActiveMQTestBase.
+    */
+   public ConnectionDroppedTest() {
+      // this is the reason why I'm putting this test on the "isolated" package.
+      disableCheckThread();
+   }
+
+   /**
+    * Starts NUMBER_OF_CONNECTIONS scripted AMQP peers in parallel. Each one attaches a
+    * receiver link to "test-queue" and has its connection dropped after the attach is
+    * written. The test passes when no peer reported an error and the queue ends up with
+    * zero consumers.
+    *
+    * @throws Exception if the server cannot be started or an assertion fails
+    */
+   @Test(timeout = 20_000)
+   public void testConsumerDroppedWithProtonTestClient() throws Exception {
+      int NUMBER_OF_CONNECTIONS = 100;
+      ActiveMQServer server = createServer(true, createDefaultConfig(true));
+      server.start();
+      Queue serverQueue = server.createQueue(new 
QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST).setAddress("test-queue").setAutoCreated(false));
+
+      // one worker thread per peer so that all the peers can run at the same time
+      ExecutorService executorService = 
Executors.newFixedThreadPool(NUMBER_OF_CONNECTIONS);
+      runAfter(executorService::shutdownNow);
+
+      // "done" is counted down once per worker, "errors" once per worker that threw
+      CountDownLatch done = new CountDownLatch(NUMBER_OF_CONNECTIONS);
+      AtomicInteger errors = new AtomicInteger(0);
+
+      for (int i = 0; i < NUMBER_OF_CONNECTIONS; i++) {
+         executorService.execute(() -> {
+            try (ProtonTestClient peer = new ProtonTestClient()) {
+               // the whole exchange is scripted before connecting
+               peer.queueClientSaslAnonymousConnect();
+               peer.remoteOpen().queue();
+               peer.expectOpen();
+               peer.remoteBegin().queue();
+               peer.expectBegin();
+               
peer.remoteAttach().ofReceiver().withName(RandomUtil.randomString()).withSenderSettleModeUnsettled().withReceivervSettlesFirst().withTarget().also().withSource().withAddress("test-queue").withExpiryPolicyOnLinkDetach().withDurabilityOfNone().withCapabilities("queue").withOutcomes("amqp:accepted:list",
 "amqp:rejected:list").also().queue();
+               peer.dropAfterLastHandler(1000); // This closes the netty connection after the attach is written
+               peer.connect("localhost", 61616);
+
+               // Waits for all the commands to fire and the drop action to be run.
+               peer.waitForScriptToComplete();
+            } catch (Throwable e) {
+               errors.incrementAndGet();
+               logger.warn(e.getMessage(), e);
+            } finally {
+               done.countDown();
+            }
+         });
+      }
+
+      Assert.assertTrue(done.await(10, TimeUnit.SECONDS));
+
+      Assert.assertEquals(0, errors.get());
+
+      // the broker must have removed every consumer that belonged to a dropped connection
+      Wait.assertEquals(0, () -> serverQueue.getConsumers().size(), 5000, 100);
+   }
+
+   /**
+    * Control case for the dropped-connection tests: NUMBER_OF_CONNECTIONS workers each run
+    * REPEATS scripted AMQP sessions that attach a receiver to "test-queue" and then close
+    * the connection normally. The test passes when no worker reported an error, the log
+    * does not contain AMQ212037, and the broker ends with zero consumers and zero connections.
+    *
+    * @throws Exception if the server cannot be started or an assertion fails
+    */
+   @Test(timeout = 20_000)
+   public void testRegularClose() throws Exception {
+      int NUMBER_OF_CONNECTIONS = 100;
+      int REPEATS = 10;
+      ActiveMQServer server = createServer(true, createDefaultConfig(true));
+      server.start();
+      Queue serverQueue = server.createQueue(new 
QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST).setAddress("test-queue").setAutoCreated(false));
+
+      ExecutorService executorService = 
Executors.newFixedThreadPool(NUMBER_OF_CONNECTIONS);
+      runAfter(executorService::shutdownNow);
+
+      CountDownLatch done = new CountDownLatch(NUMBER_OF_CONNECTIONS);
+      AtomicInteger errors = new AtomicInteger(0);
+      // the handler is created before any client runs; its captured text is inspected below
+      AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler();
+      runAfter(loggerHandler::stop);
+
+      for (int i = 0; i < NUMBER_OF_CONNECTIONS; i++) {
+         executorService.execute(() -> {
+            for (int r = 0; r < REPEATS; r++) {
+               try (ProtonTestClient peer = new ProtonTestClient()) {
+                  peer.queueClientSaslAnonymousConnect();
+                  peer.remoteOpen().queue();
+                  peer.expectOpen();
+                  peer.remoteBegin().queue();
+                  peer.expectBegin();
+                  
peer.remoteAttach().ofReceiver().withName(RandomUtil.randomString()).withSenderSettleModeUnsettled().withReceivervSettlesFirst().withTarget().also().withSource().withAddress("test-queue").withExpiryPolicyOnLinkDetach().withDurabilityOfNone().withCapabilities("queue").withOutcomes("amqp:accepted:list",
 "amqp:rejected:list").also().queue();
+                  peer.expectAttach();
+                  // unlike the dropped-connection tests, the connection is closed by the script itself
+                  peer.remoteClose().queue();
+                  peer.expectClose();
+
+                  peer.connect("localhost", 61616);
+
+                  peer.waitForScriptToComplete();
+               } catch (Throwable e) {
+                  errors.incrementAndGet();
+                  logger.warn(e.getMessage(), e);
+                  // the first failure ends this worker; it still counts down "done" below
+                  break;
+               }
+            }
+            done.countDown();
+         });
+      }
+
+      Assert.assertTrue(done.await(10, TimeUnit.SECONDS));
+
+      Assert.assertEquals(0, errors.get());
+
+      Assert.assertFalse(loggerHandler.findText("AMQ212037"));
+
+      // TODO: Fix these as part of ARTEMIS-4483
+      /*Assert.assertFalse(loggerHandler.findText("Connection failure"));
+      Assert.assertFalse(loggerHandler.findText("REMOTE_DISCONNECT"));
+      Assert.assertFalse(loggerHandler.findText("AMQ222061"));
+      Assert.assertFalse(loggerHandler.findText("AMQ222107")); */
+
+      Wait.assertEquals(0, () -> serverQueue.getConsumers().size(), 5000, 100);
+      Wait.assertEquals(0, server::getConnectionCount, 5000);
+
+   }
+
+   /** Runs {@link #testConsumerDroppedWithRegularClient(String)} with "AMQP" as the protocol. */
+   @Test
+   public void testConsumerDroppedAMQP() throws Throwable {
+      testConsumerDroppedWithRegularClient("AMQP");
+
+   }
+
+   /** Runs {@link #testConsumerDroppedWithRegularClient(String)} with "CORE" as the protocol. */
+   @Test
+   public void testConsumerDroppedCORE() throws Throwable {
+      testConsumerDroppedWithRegularClient("CORE");
+   }
+
+   /** Runs {@link #testConsumerDroppedWithRegularClient(String)} with "OPENWIRE" as the protocol. */
+   @Test
+   public void testConsumerDroppedOpenWire() throws Throwable {
+      testConsumerDroppedWithRegularClient("OPENWIRE");
+   }
+
+   /**
+    * Shared scenario behind the testConsumerDropped* tests. NUMBER_OF_CONNECTIONS JMS clients
+    * keep connecting and creating consumers on "test-queue" while the main thread fails every
+    * broker-side connection REPEATS times. Once the clients have stopped, the queue must
+    * have no consumers left.
+    *
+    * @param protocol protocol name passed to {@code CFUtil.createConnectionFactory}
+    *                 (the callers in this class use "AMQP", "CORE" and "OPENWIRE")
+    * @throws Throwable if the clients cannot be aligned on the barrier or an assertion fails
+    */
+   public void testConsumerDroppedWithRegularClient(final String protocol) throws Throwable {
+      int NUMBER_OF_CONNECTIONS = 25;
+      int REPEATS = 10;
+      ActiveMQServer server = createServer(true, createDefaultConfig(true));
+      server.start();
+      Queue serverQueue = server.createQueue(new QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST).setAddress("test-queue").setAutoCreated(false));
+
+      ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_CONNECTIONS);
+      runAfter(executorService::shutdownNow);
+
+      CountDownLatch done = new CountDownLatch(NUMBER_OF_CONNECTIONS);
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+
+      final AtomicBoolean running = new AtomicBoolean(true);
+
+      runAfter(() -> running.set(false));
+
+      // every client plus the main thread meet on this barrier before each round of failures
+      CyclicBarrier flagStart = new CyclicBarrier(NUMBER_OF_CONNECTIONS + 1);
+
+      for (int i = 0; i < NUMBER_OF_CONNECTIONS; i++) {
+         executorService.execute(() -> {
+            try {
+               while (running.get()) {
+                  try {
+                     // do not be tempted to use try (connection = factory.createConnection())
+                     // this is because we don't need to close the connection after a network failure on this test.
+                     Connection connection = factory.createConnection();
+
+                     synchronized (ConnectionDroppedTest.this) {
+                        runAfter(connection::close);
+                     }
+
+                     // One flag per connection, starting as "not failed". The flag used to be shared
+                     // across iterations and initialised to true, which meant the consumer loop
+                     // below never ran and the test never created a single consumer.
+                     final AtomicBoolean connectionFailed = new AtomicBoolean(false);
+                     connection.setExceptionListener(new ExceptionListener() {
+                        @Override
+                        public void onException(JMSException exception) {
+                           connectionFailed.set(true);
+                        }
+                     });
+                     flagStart.await(60, TimeUnit.SECONDS);
+
+                     connection.start();
+
+                     Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+                     javax.jms.Queue jmsQueue = session.createQueue("test-queue");
+
+                     // keep adding consumers until the broker-side failure reaches this client
+                     while (running.get() && !connectionFailed.get()) {
+                        logger.debug("Consumer");
+                        session.createConsumer(jmsQueue);
+                        Thread.sleep(500);
+                     }
+
+                     if (!protocol.equals("CORE")) {
+                        connection.close();
+                     }
+                  } catch (Exception e) {
+                     // failures are expected here: the broker is killing the connections on purpose
+                     logger.debug(e.getMessage(), e);
+                  }
+               }
+            } finally {
+               done.countDown();
+            }
+         });
+      }
+
+      for (int i = 0; i < REPEATS; i++) {
+         try {
+            flagStart.await(60, TimeUnit.SECONDS); // align all the clients at the same spot
+         } catch (Throwable throwable) {
+            logger.info(ThreadDumpUtil.threadDump("timed out flagstart"));
+            throw throwable;
+         }
+
+         logger.info("*******************************************************************************************************************************\nloop kill {}" + "\n*******************************************************************************************************************************", i);
+         server.getRemotingService().getConnections().forEach(r -> {
+            r.fail(new ActiveMQException("it's a simulation"));
+         });
+
+      }
+
+      running.set(false);
+      try {
+         // releases the clients that are already parked on the barrier for another round
+         flagStart.await(1, TimeUnit.SECONDS);
+      } catch (Exception ignored) {
+         // a timeout or broken barrier only means that not every client came back; "done" decides below
+      }
+      if (!done.await(10, TimeUnit.SECONDS)) {
+         logger.warn(ThreadDumpUtil.threadDump("Threads are still running"));
+         Assert.fail("Threads are still running");
+      }
+
+      Wait.assertEquals(0, () -> serverQueue.getConsumers().size(), 5000, 100);
+
+   }
+
+   /**
+    * Fails the broker-side connection while the creation of the AMQP session is being held
+    * inside a broker plugin, then lets the creation continue. After each of the TEST_REPEATS
+    * rounds the broker must report zero connections and zero consumers on the queue.
+    *
+    * @throws Throwable if the server cannot be started or an assertion fails
+    */
+   @Test
+   public void testDropConsumerProtonJ2TestClient() throws Throwable {
+      ReusableLatch latchCreating = new ReusableLatch(1);
+      ReusableLatch blockCreate = new ReusableLatch(1);
+      ReusableLatch done = new ReusableLatch(1);
+      ActiveMQServer server = createServer(true, createDefaultConfig(true));
+      server.start();
+
+      int TEST_REPEATS = 4;
+
+      server.registerBrokerPlugin(new ActiveMQServerSessionPlugin() {
+         @Override
+         public void beforeCreateSession(String name,
+                                         String username,
+                                         int minLargeMessageSize,
+                                         RemotingConnection connection,
+                                         boolean autoCommitSends,
+                                         boolean autoCommitAcks,
+                                         boolean preAcknowledge,
+                                         boolean xa,
+                                         String defaultAddress,
+                                         SessionCallback callback,
+                                         boolean autoCreateQueues,
+                                         OperationContext context,
+                                         Map<SimpleString, RoutingType> prefixes) throws ActiveMQException {
+            // tell the test the session is about to be created, then hold it until released
+            latchCreating.countDown();
+            try {
+               blockCreate.await(10, TimeUnit.HOURS);
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+            } finally {
+               done.countDown();
+            }
+         }
+      });
+
+      ExecutorService executorService = Executors.newFixedThreadPool(1);
+      runAfter(executorService::shutdownNow);
+      Queue serverQueue = server.createQueue(new QueueConfiguration(getName()).setRoutingType(RoutingType.ANYCAST).setAddress(getName()).setAutoCreated(false));
+
+      for (int i = 0; i < TEST_REPEATS; i++) {
+         Assert.assertEquals(0, serverQueue.getConsumerCount());
+         latchCreating.setCount(1);
+         blockCreate.setCount(1);
+         done.setCount(1);
+
+         ProtonTestClient peer = new ProtonTestClient();
+         // The peer used to be left open on every iteration. It is closed at tear down rather
+         // than here so that the timing of the scenario below stays exactly as it was.
+         runAfter(peer::close);
+
+         executorService.execute(() -> {
+
+            try {
+               peer.queueClientSaslAnonymousConnect();
+               peer.remoteOpen().queue();
+               peer.remoteBegin().queue();
+               peer.remoteAttach().ofReceiver().withName(RandomUtil.randomString()).withSenderSettleModeUnsettled().withReceivervSettlesFirst().withTarget().also().withSource().withAddress(getName()).withExpiryPolicyOnLinkDetach().withDurabilityOfNone().withCapabilities("queue").withOutcomes("amqp:accepted:list", "amqp:rejected:list").also().queue();
+
+               peer.connect("localhost", 61616);
+
+               peer.waitForScriptToCompleteIgnoreErrors();
+            } catch (Throwable e) {
+               logger.warn(e.getMessage(), e);
+            }
+         });
+
+         // wait until the plugin is holding the session creation, then kill the connection under it
+         Assert.assertTrue(latchCreating.await(10, TimeUnit.SECONDS));
+         server.getRemotingService().getConnections().forEach(r -> {
+            r.fail(new ActiveMQException("it's a simulation"));
+         });
+         blockCreate.countDown();
+         Assert.assertTrue(done.await(10, TimeUnit.SECONDS));
+
+         { // double checking the executor is done
+            CountDownLatch check = new CountDownLatch(1);
+            executorService.execute(check::countDown);
+            Assert.assertTrue(check.await(1, TimeUnit.SECONDS));
+         }
+
+         Thread.sleep(100); // I need some time for the error condition to kick in
+
+         Wait.assertEquals(0, server::getConnectionCount, 5000);
+
+         Wait.assertEquals(0, serverQueue::getConsumerCount, 5000);
+      }
+
+   }
+
+   /** Runs {@link #testDropCreateConsumer(String)} with "AMQP" as the protocol. */
+   @Test
+   public void testDropCreateConsumerAMQP() throws Throwable {
+      testDropCreateConsumer("AMQP");
+   }
+
+   /** Runs {@link #testDropCreateConsumer(String)} with "OPENWIRE" as the protocol. */
+   @Test
+   public void testDropCreateConsumerOPENWIRE() throws Throwable {
+      testDropCreateConsumer("OPENWIRE");
+   }
+
+   /** Runs {@link #testDropCreateConsumer(String)} with "CORE" as the protocol. */
+   @Test
+   public void testDropCreateConsumerCORE() throws Throwable {
+      testDropCreateConsumer("CORE");
+   }
+
+   /**
+    * Shared scenario behind the testDropCreateConsumer* tests. A JMS consumer is started on a
+    * background thread using a connection factory configured with reconnect attempts. Once it
+    * is receiving, a broker plugin that holds every session creation is registered and all
+    * connections are failed. While the next session creation is being held, the connections
+    * are failed a second time, and only then is the creation released. After the client
+    * thread has finished, the broker must report zero connections and zero consumers.
+    *
+    * @param protocol "AMQP", "OPENWIRE" or "CORE"; any other value leaves the factory null
+    * @throws Throwable if the server cannot be started or an assertion fails
+    */
+   private void testDropCreateConsumer(String protocol) throws Throwable {
+      // latchCreating: a session creation reached the plugin
+      // blockCreate:   released by the test to let that creation continue
+      // done:          the plugin callback has returned
+      // doneConsumer:  the client thread has finished
+      CountDownLatch latchCreating = new CountDownLatch(1);
+      CountDownLatch blockCreate = new CountDownLatch(1);
+      CountDownLatch done = new CountDownLatch(1);
+      CountDownLatch doneConsumer = new CountDownLatch(1);
+      ActiveMQServer server = createServer(true, createDefaultConfig(true));
+      server.start();
+
+      Queue serverQueue = server.createQueue(new 
QueueConfiguration(getName()).setRoutingType(RoutingType.ANYCAST).setAddress(getName()).setAutoCreated(false));
+
+      AtomicBoolean running = new AtomicBoolean(true);
+      ExecutorService executorService = Executors.newFixedThreadPool(1);
+      runAfter(executorService::shutdownNow);
+      runAfter(() -> running.set(false));
+
+      // counted down by the client right before its first receive call
+      CountDownLatch received = new CountDownLatch(1);
+
+      executorService.execute(() -> {
+         ConnectionFactory connectionFactory;
+
+         // each protocol gets a URI that asks its client for 20 reconnect attempts
+         switch (protocol) {
+            case "AMQP":
+               connectionFactory = CFUtil.createConnectionFactory(protocol, 
"failover:(amqp://localhost:61616)?failover.maxReconnectAttempts=20");
+               break;
+            case "OPENWIRE":
+               connectionFactory = new 
org.apache.activemq.ActiveMQConnectionFactory( 
"failover:(tcp://localhost:61616)?maxReconnectAttempts=20");
+               break;
+            case "CORE":
+               connectionFactory = new 
org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:61616?ha=true;reconnectAttempts=20;callTimeout=1000");
+               break;
+            default:
+               // NOTE(review): a null factory makes createConnection() below throw a NullPointerException,
+               // which is caught and logged; the test then fails on the "received" latch.
+               logger.warn("Invalid protocol {}", protocol);
+               connectionFactory = null;
+         }
+
+         try (Connection connection = connectionFactory.createConnection()) {
+            logger.info("Connected on thread..");
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = 
session.createConsumer(session.createQueue(getName()));
+            connection.start();
+            // keep polling until the test clears "running"; receive errors are logged and retried
+            while (running.get()) {
+               try {
+                  logger.info("Receiving");
+                  received.countDown();
+                  Message message = consumer.receive(100);
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               }
+            }
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+         } finally {
+            doneConsumer.countDown();
+         }
+      });
+
+      Assert.assertTrue(received.await(10, TimeUnit.SECONDS));
+
+      // registered only after the first session exists, so it holds the next session creation
+      server.registerBrokerPlugin(new ActiveMQServerSessionPlugin() {
+         @Override
+         public void beforeCreateSession(String name,
+                                         String username,
+                                         int minLargeMessageSize,
+                                         RemotingConnection connection,
+                                         boolean autoCommitSends,
+                                         boolean autoCommitAcks,
+                                         boolean preAcknowledge,
+                                         boolean xa,
+                                         String defaultAddress,
+                                         SessionCallback callback,
+                                         boolean autoCreateQueues,
+                                         OperationContext context,
+                                         Map<SimpleString, RoutingType> 
prefixes) throws ActiveMQException {
+            latchCreating.countDown();
+            try {
+               blockCreate.await(10, TimeUnit.HOURS);
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+            } finally {
+               done.countDown();
+            }
+         }
+      });
+
+
+      // first failure: drops the established connection
+      server.getRemotingService().getConnections().forEach(r -> {
+         r.fail(new ActiveMQException("it's a simulation"));
+      });
+      // second failure: issued while the plugin is holding a session creation
+      Assert.assertTrue(latchCreating.await(10, TimeUnit.SECONDS));
+      server.getRemotingService().getConnections().forEach(r -> {
+         r.fail(new ActiveMQException("it's a simulation 2nd time"));
+      });
+      blockCreate.countDown();
+      Assert.assertTrue(done.await(10, TimeUnit.SECONDS));
+
+      running.set(false);
+      Assert.assertTrue(doneConsumer.await(40, TimeUnit.SECONDS));
+
+      Thread.sleep(100);
+
+      { // double checking the executor is done
+         CountDownLatch check = new CountDownLatch(1);
+         executorService.execute(check::countDown);
+         Assert.assertTrue(check.await(1, TimeUnit.SECONDS));
+      }
+
+      Wait.assertEquals(0, server::getConnectionCount, 5000);
+      Wait.assertEquals(0, serverQueue::getConsumerCount, 5000);
+
+   }
+
+}
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
index 7ab6126907..1cdac26a12 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
@@ -219,6 +219,7 @@ public class AMQPFederationConnectTest extends 
AmqpClientTestSupport {
                                        .withNullTarget();
          peer.remoteDetach().withErrorCondition("amqp:unauthorized-access", 
"Not authroized").queue();
          peer.expectDetach().optional();
+         peer.expectClose().optional();
          // Broker reconnect and allow it to attach this time.
          peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS");
          peer.expectOpen().respond();
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
index 32ccaec405..fe982c6c9d 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
@@ -937,6 +937,7 @@ public class AMQPFederationQueuePolicyTest extends 
AmqpClientTestSupport {
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
 
             peer.expectDetach().optional(); // Broker is not consistent on 
sending the detach
+            peer.expectClose().optional();
             peer.expectSASLAnonymousConnect();
             peer.expectOpen().respond();
             peer.expectBegin().respond();
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java
index 770c40db2e..de10d6b6f7 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java
@@ -90,9 +90,6 @@ public class AMQPMirrorConnectionTest extends 
AmqpClientTestSupport {
          peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
 
          server.stop();
-
-         // should be no more interactions
-         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
       }
    }
 
@@ -128,9 +125,6 @@ public class AMQPMirrorConnectionTest extends 
AmqpClientTestSupport {
          peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
 
          server.stop();
-
-         // should be no more interactions
-         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
       }
    }
 
@@ -160,9 +154,6 @@ public class AMQPMirrorConnectionTest extends 
AmqpClientTestSupport {
          peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
 
          server.stop();
-
-         // should be no more interactions
-         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
       }
    }
 
@@ -205,9 +196,6 @@ public class AMQPMirrorConnectionTest extends 
AmqpClientTestSupport {
          peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
 
          server.stop();
-
-         // should be no more interactions
-         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
       }
    }
 
@@ -258,9 +246,6 @@ public class AMQPMirrorConnectionTest extends 
AmqpClientTestSupport {
          peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
 
          server.stop();
-
-         // should be no more interactions
-         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
       }
    }
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/consumer/DetectOrphanedConsumerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/consumer/DetectOrphanedConsumerTest.java
new file mode 100644
index 0000000000..707d87c2e4
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/consumer/DetectOrphanedConsumerTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.consumer;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import java.lang.invoke.MethodHandles;
+
+import org.apache.activemq.artemis.api.core.JsonUtil;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.core.management.impl.view.ConsumerField;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
+import org.apache.activemq.artemis.json.JsonArray;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This test simulates the orphaned consumer situation that was fixed in ARTEMIS-4476.
+ * The method QueueControl::listConsumersAsJSON should add the field orphaned=true when a consumer is orphaned.
+ */
+public class DetectOrphanedConsumerTest extends ActiveMQTestBase {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   /** Runs {@link #testOrphanedConsumer(String)} with "CORE" as the protocol. */
+   @Test
+   public void testOrphanedConsumerCORE() throws Exception {
+      testOrphanedConsumer("CORE");
+   }
+
+   /** Runs {@link #testOrphanedConsumer(String)} with "AMQP" as the protocol. */
+   @Test
+   public void testOrphanedConsumerAMQP() throws Exception {
+      testOrphanedConsumer("AMQP");
+   }
+
+   /** Runs {@link #testOrphanedConsumer(String)} with "OPENWIRE" as the protocol. */
+   @Test
+   public void testOrphanedConsumerOpenWire() throws Exception {
+      testOrphanedConsumer("OPENWIRE");
+   }
+
+   /**
+    * Creates one consumer on a queue, checks that the consumer listing carries no "orphaned"
+    * field, then removes the consumer's connection from the remoting service behind its back
+    * and checks that the listing now reports the same consumer with orphaned=true.
+    *
+    * @param protocol protocol name passed to {@code CFUtil.createConnectionFactory}
+    * @throws Exception if the server or the client cannot be set up, or an assertion fails
+    */
+   private void testOrphanedConsumer(String protocol) throws Exception {
+      final String queueName = getName();
+      final String orphanedField = ConsumerField.ORPHANED.getName();
+
+      ActiveMQServer server = createServer(false, createDefaultConfig(true));
+      server.start();
+
+      QueueConfiguration queueConfiguration = new QueueConfiguration(queueName);
+      queueConfiguration.setDurable(true).setName(queueName).setRoutingType(RoutingType.ANYCAST);
+      Queue queue = server.createQueue(queueConfiguration);
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+      Connection connection = factory.createConnection();
+
+      // the connection is closed at tear down only, never during the test itself
+      runAfter(connection::close);
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      // never used to receive; it only has to exist on the broker side
+      MessageConsumer willBeOrphaned = session.createConsumer(session.createQueue(queueName));
+
+      Wait.assertEquals(1, queue::getConsumerCount, 5000);
+
+      Object resource = server.getManagementService().getResource("queue." + queue.getName());
+      QueueControl queueControl = (QueueControl) resource;
+      Assert.assertNotNull(queueControl);
+
+      // before: a single consumer, and no orphaned field on it
+      String json = queueControl.listConsumersAsJSON();
+      logger.debug("json: {}", json);
+
+      JsonArray consumers = JsonUtil.readJsonArray(json);
+      Assert.assertEquals(1, consumers.size());
+      Assert.assertFalse(consumers.getJsonObject(0).containsKey(orphanedField));
+
+      // drop the connection from the remoting service while the consumer stays on the queue
+      queue.getConsumers().forEach(consumer -> {
+         ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;
+         Object connectionID = serverConsumer.getConnectionID();
+         logger.debug("Removing connection for {} on connectionID {}", serverConsumer, connectionID);
+         Object removed = server.getRemotingService().removeConnection(serverConsumer.getConnectionID());
+         logger.debug("removed {}", removed);
+      });
+
+      // after: still a single consumer, now flagged as orphaned
+      json = queueControl.listConsumersAsJSON();
+      logger.debug("json: {}", json);
+
+      consumers = JsonUtil.readJsonArray(json);
+      Assert.assertEquals(1, consumers.size());
+      Assert.assertTrue(consumers.getJsonObject(0).containsKey(orphanedField));
+      Assert.assertTrue(consumers.getJsonObject(0).getBoolean(orphanedField));
+   }
+}
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/consumer/OrphanedConsumerDefenseTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/consumer/OrphanedConsumerDefenseTest.java
new file mode 100644
index 0000000000..2852dde1b9
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/consumer/OrphanedConsumerDefenseTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.consumer;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.paging.PagingManager;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import 
org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
+import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.protocol.ServerPacketDecoder;
+import 
org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
+import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnection;
+import org.apache.activemq.artemis.core.security.SecurityStore;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
+import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
+import org.apache.activemq.artemis.core.server.management.ManagementService;
+import org.apache.activemq.artemis.core.transaction.ResourceManager;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
+import 
org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
+import 
org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
+import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+import 
org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener;
+import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Sender;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * While working through ARTEMIS-4476 we fixed the possibility of a ghost consumer (a term coined by a user),
+ * where the connection is gone but the consumer is still in memory.
+ * <p>
+ * The fix involved calling the disconnect from the proper threads.
+ * <p>
+ * As a further line of defense, the ServerConsumer and the AMQP handler also validate the connection state.
+ * If a connection in the destroyed state tries to create a consumer, the system should throw an error.
+ */
+public class OrphanedConsumerDefenseTest extends ActiveMQTestBase {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   /**
+    * Creating a {@link ServerConsumerImpl} with a {@code null} session is expected to fail
+    * with a {@link NullPointerException}.
+    */
+   @Test
+   public void testDefendNPESession() throws Exception {
+      ActiveMQServerImpl server = Mockito.mock(ActiveMQServerImpl.class);
+      SessionCallback sessionCallback = Mockito.mock(SessionCallback.class);
+      ManagementService managementService = Mockito.mock(ManagementService.class);
+
+      try {
+         new ServerConsumerImpl(1, null, null, null, 1, true, false, new NullStorageManager(), sessionCallback, true, true, managementService, false, 0, server);
+         Assert.fail("Exception was expected");
+      } catch (NullPointerException e) {
+         logger.debug("Expected exception", e);
+      }
+   }
+
+   /**
+    * Creating a {@link ServerConsumerImpl} on a session whose remoting connection was already
+    * destroyed is expected to fail with an {@link ActiveMQException} carrying the code AMQ229250.
+    */
+   @Test
+   public void testDefendDestroyedConnection() throws Exception {
+      ActiveMQServerImpl server = Mockito.mock(ActiveMQServerImpl.class);
+
+      SessionCallback sessionCallback = Mockito.mock(SessionCallback.class);
+
+      ManagementService managementService = Mockito.mock(ManagementService.class);
+
+      StorageManager storageManager = new NullStorageManager();
+
+      ArtemisExecutor artemisExecutor = Mockito.mock(ArtemisExecutor.class);
+      ExecutorFactory executorFactory = Mockito.mock(ExecutorFactory.class);
+      Mockito.when(executorFactory.getExecutor()).thenReturn(artemisExecutor);
+
+      Configuration configuration = new ConfigurationImpl();
+
+      ActiveMQServer mockServer = Mockito.mock(ActiveMQServer.class);
+      Mockito.when(mockServer.getExecutorFactory()).thenReturn(executorFactory);
+      Mockito.when(mockServer.getConfiguration()).thenReturn(configuration);
+
+      BufferHandler bufferHandler = Mockito.mock(BufferHandler.class);
+      InVMConnection inVMConnection = new InVMConnection(1, bufferHandler, Mockito.mock(BaseConnectionLifeCycleListener.class), artemisExecutor);
+
+      RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(new ServerPacketDecoder(storageManager), inVMConnection, new ArrayList<>(), new ArrayList<>(), RandomUtil.randomSimpleString(), artemisExecutor);
+      // the connection is destroyed before the session (and later the consumer) is created on top of it
+      remotingConnection.destroy();
+      ServerSessionImpl session = new ServerSessionImpl(RandomUtil.randomString(), RandomUtil.randomString(), RandomUtil.randomString(),
+                                                        RandomUtil.randomString(), 1000, true, true, true, true, true,
+                                                        remotingConnection, storageManager, Mockito.mock(PostOffice.class), Mockito.mock(ResourceManager.class), Mockito.mock(SecurityStore.class), Mockito.mock(ManagementService.class), mockServer, RandomUtil.randomSimpleString(), RandomUtil.randomSimpleString(), Mockito.mock(SessionCallback.class), Mockito.mock(OperationContext.class), Mockito.mock(PagingManager.class), new HashMap<>(), "securityDomain", false);
+
+      try {
+         new ServerConsumerImpl(1, session, null, null, 1, true, false, new NullStorageManager(), sessionCallback, true, true, managementService, false, 0, server);
+         Assert.fail("Exception was expected");
+      } catch (ActiveMQException activeMQException) {
+         logger.info("Expected exception", activeMQException);
+         Assert.assertTrue(activeMQException.getMessage().contains("AMQ229250"));
+      }
+   }
+
+   /** Runs the AMQP defense check with a connection context that has no handler. */
+   @Test
+   public void testDefendAMQPConsumerNullConnection() throws Exception {
+      testDefendAMQPConsumer(true);
+   }
+
+   /** Runs the AMQP defense check with a proton connection whose remote state is CLOSED. */
+   @Test
+   public void testDefendAMQPConsumer() throws Exception {
+      testDefendAMQPConsumer(false);
+   }
+
+   /**
+    * Initializes a {@link ProtonServerSenderContext} on top of an unusable connection and expects
+    * an {@link ActiveMQException} carrying the code AMQ119027.
+    *
+    * @param returnNullConnection when {@code true} the mocked connection context returns a {@code null}
+    *                             handler; otherwise it returns a handler whose proton connection reports
+    *                             {@link EndpointState#CLOSED} as its remote state
+    */
+   private void testDefendAMQPConsumer(boolean returnNullConnection) throws Exception {
+      ActiveMQServer server = Mockito.mock(ActiveMQServer.class);
+      ProtonProtocolManagerFactory protonProtocolManagerFactory = new ProtonProtocolManagerFactory();
+      ProtonProtocolManager protonProtocolManager = new ProtonProtocolManager(protonProtocolManagerFactory, server, new ArrayList<>(), new ArrayList<>());
+      AMQPConnectionContext connectionContext = Mockito.mock(AMQPConnectionContext.class);
+
+      if (returnNullConnection) {
+         Mockito.when(connectionContext.getHandler()).thenReturn(null);
+      } else {
+         ProtonHandler protonHandler = Mockito.mock(ProtonHandler.class);
+         Mockito.when(connectionContext.getHandler()).thenReturn(protonHandler);
+         Connection qpidConnection = Mockito.mock(Connection.class);
+         Mockito.when(qpidConnection.getRemoteState()).thenReturn(EndpointState.CLOSED);
+         Mockito.when(protonHandler.getConnection()).thenReturn(qpidConnection);
+      }
+
+      Mockito.when(connectionContext.getProtocolManager()).thenReturn(protonProtocolManager);
+      Sender sender = Mockito.mock(Sender.class);
+      AMQPSessionContext sessionContext = Mockito.mock(AMQPSessionContext.class);
+      AMQPSessionCallback sessionCallback = Mockito.mock(AMQPSessionCallback.class);
+
+      try {
+         ProtonServerSenderContext serverSenderContext = new ProtonServerSenderContext(connectionContext, sender, sessionContext, sessionCallback);
+         serverSenderContext.initialize();
+         // without this the test would silently pass when the defense is missing and nothing is thrown
+         Assert.fail("Exception was expected");
+      } catch (ActiveMQException e) {
+         Assert.assertTrue(e.getMessage().contains("AMQ119027"));
+         logger.warn(e.getMessage(), e);
+      }
+   }
+}
\ No newline at end of file
diff --git 
a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ConnectionDroppedLeakTest.java
 
b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ConnectionDroppedLeakTest.java
new file mode 100644
index 0000000000..a180c20166
--- /dev/null
+++ 
b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ConnectionDroppedLeakTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.leak;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.github.checkleak.core.CheckLeak;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import 
org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.impl.ServerStatus;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.activemq.artemis.tests.leak.MemoryAssertions.assertMemory;
+import static 
org.apache.activemq.artemis.tests.leak.MemoryAssertions.basicMemoryAsserts;
+
+public class ConnectionDroppedLeakTest extends ActiveMQTestBase {
+
+   /**
+    * Builds the client connection factory for the given protocol against localhost:61616.
+    *
+    * @param protocol the protocol name; "AMQP" gets a dedicated URI with extra options,
+    *                 any other value is passed through with a plain tcp URI
+    */
+   private ConnectionFactory createConnectionFactory(String protocol) {
+      if (protocol.equals("AMQP")) {
+         return CFUtil.createConnectionFactory("AMQP", "amqp://localhost:61616?amqp.idleTimeout=120000&failover.maxReconnectAttempts=1&jms.prefetchPolicy.all=10&jms.forceAsyncAcks=true");
+      } else {
+         return CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+      }
+   }
+
+   private static final String QUEUE_NAME = "QUEUE_DROP";
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   ActiveMQServer server;
+
+   Queue serverQueue;
+
+   /** Skips the whole class unless the CheckLeak agent is loaded. */
+   @BeforeClass
+   public static void beforeClass() throws Exception {
+      Assume.assumeTrue(CheckLeak.isLoaded());
+   }
+
+   /**
+    * Verifies that no RemotingConnectionImpl is left behind by the test, stops the server,
+    * and then verifies that no ActiveMQServerImpl is left in memory either.
+    */
+   @After
+   public void validateServer() throws Exception {
+      CheckLeak checkLeak = new CheckLeak();
+
+      // I am doing this check here because the test method might hold a client connection
+      // so this check has to be done after the test, and before the server is stopped
+      assertMemory(checkLeak, 0, RemotingConnectionImpl.class.getName());
+
+      server.stop();
+
+      server = null;
+      serverQueue = null;
+
+      clearServers();
+      ServerStatus.clear();
+
+      assertMemory(checkLeak, 0, ActiveMQServerImpl.class.getName());
+   }
+
+   /** Starts a server and creates the durable ANYCAST queue used by the tests. */
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      server = createServer(true, createDefaultConfig(1, true));
+      server.getConfiguration().setJournalPoolFiles(4).setJournalMinFiles(2);
+      server.start();
+      server.addAddressInfo(new AddressInfo(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST));
+
+      serverQueue = server.createQueue(new QueueConfiguration().setAddress(QUEUE_NAME).setName(QUEUE_NAME).setRoutingType(RoutingType.ANYCAST).setDurable(true));
+   }
+
+   @Test
+   public void testDropConnectionsAMQP() throws Exception {
+      doDropConnections("AMQP");
+   }
+
+   @Test
+   public void testDropConnectionsCORE() throws Exception {
+      doDropConnections("CORE");
+   }
+
+   @Test
+   public void testDropConnectionsOPENWIRE() throws Exception {
+      doDropConnections("OPENWIRE");
+   }
+
+   /**
+    * Runs one transacted consumer and one transacted producer on background threads, waits until
+    * 50 messages were received, fails every server-side connection and then checks that both
+    * client threads terminate and that the basic memory assertions still hold.
+    *
+    * @param protocol the client protocol to use ("AMQP", "CORE" or "OPENWIRE")
+    */
+   private void doDropConnections(String protocol) throws Exception {
+      basicMemoryAsserts();
+
+      CountDownLatch latchDone = new CountDownLatch(2);
+      CountDownLatch latchReceived = new CountDownLatch(50);
+      // number of client threads that ended with an exception; only reported for diagnostics
+      AtomicInteger errors = new AtomicInteger(0);
+      AtomicBoolean running = new AtomicBoolean(true);
+
+      ExecutorService executorService = Executors.newFixedThreadPool(2);
+      runAfter(executorService::shutdownNow);
+      runAfter(() -> running.set(false));
+
+      executorService.execute(() -> {
+         ConnectionFactory cf = createConnectionFactory(protocol);
+         Connection connection = null;
+         try {
+            connection = cf.createConnection(); // I will leave this open on purpose
+            connection.start();
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
+            while (running.get()) {
+               Message message = consumer.receive(100);
+               if (message != null) {
+                  latchReceived.countDown();
+                  session.commit();
+               }
+            }
+         } catch (Exception e) {
+            logger.debug("Consumer thread finished with an exception", e);
+            errors.incrementAndGet();
+         } finally {
+            closeIfOpenWire(protocol, connection);
+            latchDone.countDown();
+         }
+      });
+
+      executorService.execute(() -> {
+         ConnectionFactory cf = createConnectionFactory(protocol);
+         Connection connection = null;
+         try {
+            connection = cf.createConnection(); // I will leave this open on purpose
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
+            while (running.get()) {
+               producer.send(session.createTextMessage("hello"));
+               session.commit();
+            }
+         } catch (Exception e) {
+            logger.debug("Producer thread finished with an exception", e);
+            errors.incrementAndGet();
+         } finally {
+            closeIfOpenWire(protocol, connection);
+            latchDone.countDown();
+         }
+      });
+
+      Assert.assertTrue(latchReceived.await(10, TimeUnit.SECONDS));
+
+      server.getRemotingService().getConnections().forEach(r -> {
+         r.fail(new ActiveMQException("it's a simulation"));
+      });
+
+      Assert.assertTrue(latchDone.await(30, TimeUnit.SECONDS));
+      running.set(false);
+      logger.debug("{} client thread(s) finished with an exception", errors.get());
+
+      serverQueue.deleteAllReferences();
+
+      basicMemoryAsserts();
+   }
+
+   /**
+    * Closes the client connection for OPENWIRE only, as an unclosed OpenWire connection may leave
+    * a hanging thread. Connections of other protocols are left open on purpose.
+    * A {@code null} connection (creation failed) is ignored.
+    */
+   private static void closeIfOpenWire(String protocol, Connection connection) {
+      if (connection != null && protocol.equals("OPENWIRE")) {
+         try {
+            connection.close();
+         } catch (Throwable ignored) {
+            // best effort: the connection was most likely already failed by the server
+         }
+      }
+   }
+}
\ No newline at end of file
diff --git 
a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ConnectionLeakTest.java
 
b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ConnectionLeakTest.java
index 617933915f..42917495c6 100644
--- 
a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ConnectionLeakTest.java
+++ 
b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ConnectionLeakTest.java
@@ -128,6 +128,17 @@ public class ConnectionLeakTest extends ActiveMQTestBase {
       int MESSAGES = 20;
       basicMemoryAsserts();
 
+      // The separate method here exists to ensure basicMemoryAsserts runs 
correctly.
+      // Since I'm evaluating RemotingConnectionImpl the connectionFactory 
inside the method may still
+      // hold a reference that would only be released at the end of the method,
+      // because of that I need to make these calls on a separate method.
+      // I tried not needing this by creating a context but that was not 
enough to release the references.
+      doManyConsumers(protocol, REPEATS, MESSAGES, checkLeak);
+
+      basicMemoryAsserts();
+   }
+
+   private void doManyConsumers(String protocol, int REPEATS, int MESSAGES, 
CheckLeak checkLeak) throws Exception {
       ConnectionFactory cf = createConnectionFactory(protocol);
 
       try (Connection producerConnection = cf.createConnection(); Connection 
consumerConnection = cf.createConnection()) {
@@ -175,7 +186,6 @@ public class ConnectionLeakTest extends ActiveMQTestBase {
 
       assertMemory(new CheckLeak(), 0, ServerConsumerImpl.class.getName());
 
-
       // this is just to drain the messages
       try (Connection targetConnection = cf.createConnection(); Connection 
consumerConnection = cf.createConnection()) {
          Session targetSession = targetConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
@@ -205,10 +215,8 @@ public class ConnectionLeakTest extends ActiveMQTestBase {
          ((ActiveMQConnectionFactory)cf).close();
       }
 
-      basicMemoryAsserts();
    }
 
-
    @Test
    public void testCancelledDeliveries() throws Exception {
       doTestCancelledDelivery("AMQP");
diff --git 
a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java
 
b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java
index 47e4cab2b9..dc926e517c 100644
--- 
a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java
+++ 
b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java
@@ -20,10 +20,13 @@ package org.apache.activemq.artemis.tests.leak;
 import java.lang.invoke.MethodHandles;
 
 import io.github.checkleak.core.CheckLeak;
+import 
org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
 import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
 import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
 import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
+import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
+import 
org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
 import 
org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
 import 
org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
@@ -42,6 +45,9 @@ public class MemoryAssertions {
       assertMemory(checkLeak, 0, OpenWireConnection.class.getName());
       assertMemory(checkLeak, 0, ProtonServerSenderContext.class.getName());
       assertMemory(checkLeak, 0, ProtonServerReceiverContext.class.getName());
+      assertMemory(checkLeak, 0, 
ActiveMQProtonRemotingConnection.class.getName());
+      assertMemory(checkLeak, 0, RemotingConnectionImpl.class.getName());
+      assertMemory(checkLeak, 0, ServerSessionImpl.class.getName());
       assertMemory(checkLeak, 0, AMQPSessionContext.class.getName());
       assertMemory(checkLeak, 0, ServerConsumerImpl.class.getName());
       assertMemory(checkLeak, 0, RoutingContextImpl.class.getName());

Reply via email to