This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 18692ec3c4ba2662050def03471c7cb25d93b4b1 Author: Clebert Suconic <clebertsuco...@apache.org> AuthorDate: Sun Nov 5 12:19:53 2023 -0500 ARTEMIS-4476 Fixing Ghost consumer situation with AMQP --- .../apache/activemq/artemis/logs/AuditLogger.java | 7 + .../broker/ActiveMQProtonRemotingConnection.java | 35 +- .../amqp/logger/ActiveMQAMQPProtocolLogger.java | 3 + .../logger/ActiveMQAMQPProtocolMessageBundle.java | 3 + .../amqp/proton/ProtonServerSenderContext.java | 26 +- .../amqp/proton/ProtonServerSenderContextTest.java | 10 + .../core/protocol/openwire/OpenWireConnection.java | 4 +- .../management/impl/ActiveMQServerControlImpl.java | 2 +- .../core/management/impl/QueueControlImpl.java | 5 + .../core/management/impl/view/ConsumerField.java | 4 +- .../core/remoting/server/RemotingService.java | 2 + .../remoting/server/impl/RemotingServiceImpl.java | 3 +- .../artemis/core/server/ActiveMQMessageBundle.java | 3 + .../artemis/core/server/impl/QueueImpl.java | 2 +- .../core/server/impl/ServerConsumerImpl.java | 20 +- .../management/impl/ManagementServiceImpl.java | 2 + tests/integration-tests-isolated/pom.xml | 33 ++ .../isolated/client/ConnectionDroppedTest.java | 502 +++++++++++++++++++++ .../amqp/connect/AMQPFederationConnectTest.java | 1 + .../connect/AMQPFederationQueuePolicyTest.java | 1 + .../amqp/connect/AMQPMirrorConnectionTest.java | 15 - .../consumer/DetectOrphanedConsumerTest.java | 115 +++++ .../consumer/OrphanedConsumerDefenseTest.java | 169 +++++++ .../tests/leak/ConnectionDroppedLeakTest.java | 202 +++++++++ .../artemis/tests/leak/ConnectionLeakTest.java | 14 +- .../artemis/tests/leak/MemoryAssertions.java | 6 + 26 files changed, 1153 insertions(+), 36 deletions(-) diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java index 62fa7ca62e..ee42aa5c14 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java @@ -2675,4 +2675,11 @@ public interface AuditLogger { @LogMessage(id = 601771, value = "User {} is getting name on target resource: {}", level = LogMessage.Level.INFO) void getCurrentTimeMillis(Object source); + static void verifyConnections(Object source) { + BASE_LOGGER.verifyConnections(getCaller(), source); + } + + @LogMessage(id = 601772, value = "User {} is calling verifyConnections on resource: {}", level = LogMessage.Level.INFO) + void verifyConnections(String user, Object queue); + } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java index 9ceea15655..bb6ad2e4ba 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java @@ -17,11 +17,11 @@ package org.apache.activemq.artemis.protocol.amqp.broker; import javax.security.auth.Subject; +import java.lang.invoke.MethodHandles; import java.util.concurrent.Executor; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; @@ -29,12 +29,17 @@ import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.engine.EndpointState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This is a Server's Connection representation used by ActiveMQ Artemis. */ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final AMQPConnectionContext amqpConnection; private final ProtonProtocolManager manager; @@ -73,17 +78,31 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection destroyed = true; - //filter it like the other protocols - if (!(me instanceof ActiveMQRemoteDisconnectException)) { - ActiveMQClientLogger.LOGGER.connectionFailureDetected(amqpConnection.getConnectionCallback().getTransportConnection().getRemoteAddress(), me.getMessage(), me.getType()); + if (logger.isDebugEnabled()) { + try { + logger.debug("Connection failure detected. amqpConnection.getHandler().getConnection().getRemoteState() = {}, remoteIP={}", amqpConnection.getHandler().getConnection().getRemoteState(), amqpConnection.getConnectionCallback().getTransportConnection().getRemoteAddress()); + } catch (Throwable e) { // just to avoid a possible NPE from the debug statement itself + logger.debug(e.getMessage(), e); + } } - // Then call the listeners - callFailureListeners(me, scaleDownTargetNodeID); + try { + if (amqpConnection.getHandler().getConnection().getRemoteState() != EndpointState.CLOSED) { + // A remote close was received on the client, on that case it's just a normal operation and we don't need to log this. + ActiveMQClientLogger.LOGGER.connectionFailureDetected(amqpConnection.getConnectionCallback().getTransportConnection().getRemoteAddress(), me.getMessage(), me.getType()); + } + } catch (Throwable e) { // avoiding NPEs from te logging statement. I don't think this would happen, but just in case + logger.warn(e.getMessage(), e); + } - callClosingListeners(); + amqpConnection.runNow(() -> { + // Then call the listeners + callFailureListeners(me, scaleDownTargetNodeID); - internalClose(); + callClosingListeners(); + + internalClose(); + }); } @Override diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java index 7d29be97ac..4ab69e6a55 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java @@ -55,4 +55,7 @@ public interface ActiveMQAMQPProtocolLogger { @LogMessage(id = 111006, value = "Unable to send message {} to Dead Letter Address.", level = LogMessage.Level.WARN) void unableToSendMessageToDLA(MessageReference ref, Throwable t); + + @LogMessage(id = 111007, value = "Invalid Connection State: {} for remote IP {}", level = LogMessage.Level.WARN) + void invalidAMQPConnectionState(Object state, Object remoteIP); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java index 561122f02c..110b1d5662 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java @@ -108,4 +108,7 @@ public interface ActiveMQAMQPProtocolMessageBundle { @Message(id = 119026, value = "Malformed Federation control message: {}") ActiveMQException malformedFederationControlMessage(String address); + + @Message(id = 119027, value = "Invalid AMQPConnection Remote State: {}") + ActiveMQException invalidAMQPConnectionState(Object state); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index fe92213930..12e905b656 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -60,6 +60,7 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFound import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; +import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler; import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl; import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable; import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; @@ -92,6 +93,7 @@ import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.codec.ReadableBuffer; import org.apache.qpid.proton.codec.WritableBuffer; +import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Link; @@ -295,7 +297,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } catch (ActiveMQException e) { throw e; } catch (Exception e) { - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage()); + ActiveMQAMQPInternalErrorException internalErrorException = ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage()); + internalErrorException.initCause(e); + throw internalErrorException; } } @@ -980,6 +984,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr @Override public Consumer init(ProtonServerSenderContext senderContext) throws Exception { + validateConnectionState(); + Source source = (Source) sender.getRemoteSource(); final Map<Symbol, Object> supportedFilters = new HashMap<>(); @@ -1334,4 +1340,22 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } } + + private void validateConnectionState() throws ActiveMQException { + ProtonHandler handler = null; + Connection qpidConnection = null; + + if (connection == null || (handler = connection.getHandler()) == null || (qpidConnection = handler.getConnection()) == null) { + if (logger.isDebugEnabled()) { + logger.debug("validateConnectionState:: connection={}, handler={}, qpidConnection={}", connection, handler, qpidConnection); + } + + ActiveMQAMQPProtocolLogger.LOGGER.invalidAMQPConnectionState("null", "null"); + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.invalidAMQPConnectionState("null"); + } + if (qpidConnection.getRemoteState() == EndpointState.CLOSED) { + ActiveMQAMQPProtocolLogger.LOGGER.invalidAMQPConnectionState(qpidConnection.getRemoteState(), connection.getRemoteAddress()); + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.invalidAMQPConnectionState(qpidConnection.getRemoteState()); + } + } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContextTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContextTest.java index bd8070ad29..0f9ed45af0 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContextTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContextTest.java @@ -21,7 +21,10 @@ import org.apache.activemq.artemis.core.server.AddressQueryResult; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException; +import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler; import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Sender; import org.junit.Test; @@ -41,6 +44,13 @@ public class ProtonServerSenderContextTest { when(mock.getServer()).thenReturn(mock(ActiveMQServer.class)); Sender mockSender = mock(Sender.class); AMQPConnectionContext mockConnContext = mock(AMQPConnectionContext.class); + + ProtonHandler handler = mock(ProtonHandler.class); + Connection connection = mock(Connection.class); + when(connection.getRemoteState()).thenReturn(EndpointState.ACTIVE); + when(mockConnContext.getHandler()).thenReturn(handler); + when(handler.getConnection()).thenReturn(connection); + when(mockConnContext.getProtocolManager()).thenReturn(mock); AMQPSessionCallback mockSessionCallback = mock(AMQPSessionCallback.class); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 0956c2adca..77a095ae63 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -148,8 +148,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se private final OpenWireProtocolManager protocolManager; - private boolean destroyed = false; - private volatile ScheduledFuture ttlCheck; //separated in/out wireFormats allow deliveries (eg async and consumers) to not slow down bufferReceived @@ -1233,6 +1231,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } dispatchAsync(command); } + // During a chanceClientID a disconnect could have been sent by the client, and the client will then re-issue a connect packet + destroyed = false; return null; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index b433f8140a..0cd2e229f2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -2297,7 +2297,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active RemotingConnection connection = null; for (RemotingConnection potentialConnection : remotingService.getConnections()) { - if (potentialConnection.getID().toString().equals(serverConsumer.getConnectionID())) { + if (potentialConnection.getID().toString().equals(String.valueOf(serverConsumer.getConnectionID()))) { connection = potentialConnection; } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index 5b37b6570d..e7be97e596 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -1841,6 +1841,11 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { .add(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName(), serverConsumer.getMessagesAcknowledgedAwaitingCommit()) .add(ConsumerField.LAST_DELIVERED_TIME.getName(), serverConsumer.getLastDeliveredTime()) .add(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName(), serverConsumer.getLastAcknowledgedTime()); + + if (server.getRemotingService().getConnection(((ServerConsumer) consumer).getConnectionID()) == null) { + obj.add(ConsumerField.ORPHANED.getName(), true); + } + jsonArray.add(obj); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerField.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerField.java index c4066a19a5..5c7c90b921 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerField.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerField.java @@ -42,7 +42,9 @@ public enum ConsumerField { MESSAGES_ACKNOWLEDGED("messagesAcknowledged"), MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT("messagesAcknowledgedAwaitingCommit"), LAST_DELIVERED_TIME("lastDeliveredTime"), - LAST_ACKNOWLEDGED_TIME("lastAcknowledgedTime"); + LAST_ACKNOWLEDGED_TIME("lastAcknowledgedTime"), + ORPHANED("orphaned"); + private static final Map<String, ConsumerField> lookup = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java index 5014d114d7..478146d733 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java @@ -44,6 +44,8 @@ public interface RemotingService { */ RemotingConnection removeConnection(Object remotingConnectionID); + RemotingConnection getConnection(Object remotingConnectionID); + Set<RemotingConnection> getConnections(); /** diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java index 20d777bd43..b0fb4efd00 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java @@ -460,7 +460,8 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif return started; } - private RemotingConnection getConnection(final Object remotingConnectionID) { + @Override + public RemotingConnection getConnection(final Object remotingConnectionID) { ConnectionEntry entry = connections.get(remotingConnectionID); if (entry != null) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java index 5de85adb7f..36477dd974 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java @@ -539,4 +539,7 @@ public interface ActiveMQMessageBundle { @Message(id = 229249, value = "Invalid Store property, only DATABASE property is supported") RuntimeException unsupportedStorePropertyType(); + + @Message(id = 229250, value = "Connection has been marked as destroyed for remote connection {}.") + ActiveMQException connectionDestroyed(String remoteAddress); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index a50938ffe7..7f7cae750e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -4762,7 +4762,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { RemotingService remotingService = server.getRemotingService(); for (RemotingConnection potentialConnection : remotingService.getConnections()) { - if (potentialConnection.getID().toString().equals(serverConsumer.getConnectionID())) { + if (potentialConnection.getID().toString().equals(String.valueOf(serverConsumer.getConnectionID()))) { connection = potentialConnection; } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index c9d1215ae1..f534795c5d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -219,6 +219,16 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { final boolean supportLargeMessage, final Integer credits, final ActiveMQServer server) throws Exception { + + + if (session == null || session.getRemotingConnection() == null) { + throw new NullPointerException("session = " + session); + } + + if (session != null && session.getRemotingConnection() != null && session.getRemotingConnection().isDestroyed()) { + throw ActiveMQMessageBundle.BUNDLE.connectionDestroyed(session.getRemotingConnection().getRemoteAddress()); + } + this.id = id; this.sequentialID = server.getStorageManager().generateID(); @@ -356,8 +366,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } @Override - public String getConnectionID() { - return this.session.getConnectionID().toString(); + public Object getConnectionID() { + return this.session.getConnectionID(); } @Override @@ -1575,7 +1585,11 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { @Override public String getConnectionRemoteAddress() { - return this.session.getRemotingConnection().getTransportConnection().getRemoteAddress(); + if (this.session == null || this.session.getRemotingConnection() == null || this.session.getRemotingConnection().getTransportConnection() == null) { + return null; + } else { + return this.session.getRemotingConnection().getTransportConnection().getRemoteAddress(); + } } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java index 92c523d1c5..6a1785d793 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java @@ -619,6 +619,8 @@ public class ManagementServiceImpl implements ManagementService { public synchronized void registerInRegistry(final String resourceName, final Object managedResource) { unregisterFromRegistry(resourceName); + logger.debug("Registering {} as {}", resourceName, managedResource); + registry.put(resourceName, managedResource); } diff --git a/tests/integration-tests-isolated/pom.xml b/tests/integration-tests-isolated/pom.xml index 7317fb4e40..d0db972418 100644 --- a/tests/integration-tests-isolated/pom.xml +++ b/tests/integration-tests-isolated/pom.xml @@ -70,6 +70,12 @@ <version>${project.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>artemis-openwire-protocol</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>artemis-unit-test-support</artifactId> @@ -194,6 +200,33 @@ <artifactId>jakarta.json-api</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>protonj2-test-driver</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>jakarta.jms</groupId> + <artifactId>jakarta.jms-api</artifactId> + </dependency> + <dependency> + <groupId>jakarta.management.j2ee</groupId> + <artifactId>jakarta.management.j2ee-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-client</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.geronimo.specs</groupId> + <artifactId>geronimo-jms_1.1_spec</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.geronimo.specs</groupId> + <artifactId>geronimo-j2ee-management_1.1_spec</artifactId> + </exclusion> + </exclusions> + </dependency> </dependencies> <build> diff --git a/tests/integration-tests-isolated/src/test/java/org/apache/activemq/artemis/tests/integration/isolated/client/ConnectionDroppedTest.java b/tests/integration-tests-isolated/src/test/java/org/apache/activemq/artemis/tests/integration/isolated/client/ConnectionDroppedTest.java new file mode 100644 index 0000000000..b65cf47c0e --- /dev/null +++ b/tests/integration-tests-isolated/src/test/java/org/apache/activemq/artemis/tests/integration/isolated/client/ConnectionDroppedTest.java @@ -0,0 +1,502 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.isolated.client; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import java.lang.invoke.MethodHandles; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.persistence.OperationContext; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.apache.activemq.artemis.utils.ReusableLatch; +import org.apache.activemq.artemis.utils.ThreadDumpUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.apache.qpid.protonj2.test.driver.ProtonTestClient; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConnectionDroppedTest extends ActiveMQTestBase { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public ConnectionDroppedTest() { + // this is the reason why I'm putting this test on the "isolated" package. + disableCheckThread(); + } + + @Test(timeout = 20_000) + public void testConsumerDroppedWithProtonTestClient() throws Exception { + int NUMBER_OF_CONNECTIONS = 100; + ActiveMQServer server = createServer(true, createDefaultConfig(true)); + server.start(); + Queue serverQueue = server.createQueue(new QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST).setAddress("test-queue").setAutoCreated(false)); + + ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_CONNECTIONS); + runAfter(executorService::shutdownNow); + + CountDownLatch done = new CountDownLatch(NUMBER_OF_CONNECTIONS); + AtomicInteger errors = new AtomicInteger(0); + + for (int i = 0; i < NUMBER_OF_CONNECTIONS; i++) { + executorService.execute(() -> { + try (ProtonTestClient peer = new ProtonTestClient()) { + peer.queueClientSaslAnonymousConnect(); + peer.remoteOpen().queue(); + peer.expectOpen(); + peer.remoteBegin().queue(); + peer.expectBegin(); + peer.remoteAttach().ofReceiver().withName(RandomUtil.randomString()).withSenderSettleModeUnsettled().withReceivervSettlesFirst().withTarget().also().withSource().withAddress("test-queue").withExpiryPolicyOnLinkDetach().withDurabilityOfNone().withCapabilities("queue").withOutcomes("amqp:accepted:list", "amqp:rejected:list").also().queue(); + peer.dropAfterLastHandler(1000); // This closes the netty connection after the attach is written + peer.connect("localhost", 61616); + + // Waits for all the commands to fire and the drop action to be run. + peer.waitForScriptToComplete(); + } catch (Throwable e) { + errors.incrementAndGet(); + logger.warn(e.getMessage(), e); + } finally { + done.countDown(); + } + }); + } + + Assert.assertTrue(done.await(10, TimeUnit.SECONDS)); + + Assert.assertEquals(0, errors.get()); + + Wait.assertEquals(0, () -> serverQueue.getConsumers().size(), 5000, 100); + } + + @Test(timeout = 20_000) + public void testRegularClose() throws Exception { + int NUMBER_OF_CONNECTIONS = 100; + int REPEATS = 10; + ActiveMQServer server = createServer(true, createDefaultConfig(true)); + server.start(); + Queue serverQueue = server.createQueue(new QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST).setAddress("test-queue").setAutoCreated(false)); + + ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_CONNECTIONS); + runAfter(executorService::shutdownNow); + + CountDownLatch done = new CountDownLatch(NUMBER_OF_CONNECTIONS); + AtomicInteger errors = new AtomicInteger(0); + AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler(); + runAfter(loggerHandler::stop); + + for (int i = 0; i < NUMBER_OF_CONNECTIONS; i++) { + executorService.execute(() -> { + for (int r = 0; r < REPEATS; r++) { + try (ProtonTestClient peer = new ProtonTestClient()) { + peer.queueClientSaslAnonymousConnect(); + peer.remoteOpen().queue(); + peer.expectOpen(); + peer.remoteBegin().queue(); + peer.expectBegin(); + peer.remoteAttach().ofReceiver().withName(RandomUtil.randomString()).withSenderSettleModeUnsettled().withReceivervSettlesFirst().withTarget().also().withSource().withAddress("test-queue").withExpiryPolicyOnLinkDetach().withDurabilityOfNone().withCapabilities("queue").withOutcomes("amqp:accepted:list", "amqp:rejected:list").also().queue(); + peer.expectAttach(); + peer.remoteClose().queue(); + peer.expectClose(); + + peer.connect("localhost", 61616); + + peer.waitForScriptToComplete(); + } catch (Throwable e) { + errors.incrementAndGet(); + logger.warn(e.getMessage(), e); + break; + } + } + done.countDown(); + }); + } + + Assert.assertTrue(done.await(10, TimeUnit.SECONDS)); + + Assert.assertEquals(0, errors.get()); + + Assert.assertFalse(loggerHandler.findText("AMQ212037")); + + // TODO: Fix these as part of ARTEMIS-4483 + /*Assert.assertFalse(loggerHandler.findText("Connection failure")); + Assert.assertFalse(loggerHandler.findText("REMOTE_DISCONNECT")); + Assert.assertFalse(loggerHandler.findText("AMQ222061")); + Assert.assertFalse(loggerHandler.findText("AMQ222107")); */ + + Wait.assertEquals(0, () -> serverQueue.getConsumers().size(), 5000, 100); + Wait.assertEquals(0, server::getConnectionCount, 5000); + + } + + @Test + public void testConsumerDroppedAMQP() throws Throwable { + testConsumerDroppedWithRegularClient("AMQP"); + + } + + @Test + public void testConsumerDroppedCORE() throws Throwable { + testConsumerDroppedWithRegularClient("CORE"); + } + + @Test + public void testConsumerDroppedOpenWire() throws Throwable { + testConsumerDroppedWithRegularClient("OPENWIRE"); + } + + public void testConsumerDroppedWithRegularClient(final String protocol) throws Throwable { + int NUMBER_OF_CONNECTIONS = 25; + int REPEATS = 10; + ActiveMQServer server = createServer(true, createDefaultConfig(true)); + server.start(); + Queue serverQueue = server.createQueue(new QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST).setAddress("test-queue").setAutoCreated(false)); + + ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_CONNECTIONS); + runAfter(executorService::shutdownNow); + + CountDownLatch done = new CountDownLatch(NUMBER_OF_CONNECTIONS); + + ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + + final AtomicBoolean running = new AtomicBoolean(true); + + runAfter(() -> running.set(false)); + + CyclicBarrier flagStart = new CyclicBarrier(NUMBER_OF_CONNECTIONS + 1); + flagStart.reset(); + + for (int i = 0; i < NUMBER_OF_CONNECTIONS; i++) { + final int t = i; + executorService.execute(() -> { + try { + boolean alreadyStarted = false; + AtomicBoolean ex = new AtomicBoolean(true); + while (running.get()) { + try { + // do not be tempted to use try (connection = factory.createConnection()) + // this is because we don't need to close the connection after a network failure on this test. + Connection connection = factory.createConnection(); + + synchronized (ConnectionDroppedTest.this) { + runAfter(connection::close); + } + connection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException exception) { + ex.set(true); + } + }); + flagStart.await(60, TimeUnit.SECONDS); + + connection.start(); + + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue jmsQueue = session.createQueue("test-queue"); + + while (running.get() && !ex.get()) { + if (!alreadyStarted) { + alreadyStarted = true; + } + System.out.println("Consumer"); + MessageConsumer consumer = session.createConsumer(jmsQueue); + Thread.sleep(500); + } + + if (!protocol.equals("CORE")) { + connection.close(); + } + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } + } + } finally { + done.countDown(); + } + }); + } + + for (int i = 0; i < REPEATS; i++) { + try { + flagStart.await(60, TimeUnit.SECONDS); // align all the clients at the same spot + } catch (Throwable throwable) { + logger.info(ThreadDumpUtil.threadDump("timed out flagstart")); + throw throwable; + } + + logger.info("*******************************************************************************************************************************\nloop kill {}" + "\n*******************************************************************************************************************************", i); + server.getRemotingService().getConnections().forEach(r -> { + r.fail(new ActiveMQException("it's a simulation")); + }); + + } + + running.set(false); + try { + flagStart.await(1, TimeUnit.SECONDS); + } catch (Exception ignored) { + } + if (!done.await(10, TimeUnit.SECONDS)) { + logger.warn(ThreadDumpUtil.threadDump("Threads are still running")); + Assert.fail("Threads are still running"); + } + + Wait.assertEquals(0, () -> serverQueue.getConsumers().size(), 5000, 100); + + } + + @Test + public void testDropConsumerProtonJ2TestClient() throws Throwable { + ReusableLatch latchCreating = new ReusableLatch(1); + ReusableLatch blockCreate = new ReusableLatch(1); + ReusableLatch done = new ReusableLatch(1); + ActiveMQServer server = createServer(true, createDefaultConfig(true)); + server.start(); + + int TEST_REPEATS = 4; + + server.registerBrokerPlugin(new ActiveMQServerSessionPlugin() { + @Override + public void beforeCreateSession(String name, + String username, + int minLargeMessageSize, + RemotingConnection connection, + boolean autoCommitSends, + boolean autoCommitAcks, + boolean preAcknowledge, + boolean xa, + String defaultAddress, + SessionCallback callback, + boolean autoCreateQueues, + OperationContext context, + Map<SimpleString, RoutingType> prefixes) throws ActiveMQException { + latchCreating.countDown(); + try { + blockCreate.await(10, TimeUnit.HOURS); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } finally { + done.countDown(); + } + } + }); + + AtomicBoolean running = new AtomicBoolean(true); + ExecutorService executorService = Executors.newFixedThreadPool(1); + runAfter(executorService::shutdownNow); + runAfter(() -> running.set(false)); + Queue serverQueue = server.createQueue(new QueueConfiguration(getName()).setRoutingType(RoutingType.ANYCAST).setAddress(getName()).setAutoCreated(false)); + + for (int i = 0; i < TEST_REPEATS; i++) { + Assert.assertEquals(0, serverQueue.getConsumerCount()); + latchCreating.setCount(1); + blockCreate.setCount(1); + done.setCount(1); + + ProtonTestClient peer = new ProtonTestClient(); + + executorService.execute(() -> { + + try { + peer.queueClientSaslAnonymousConnect(); + peer.remoteOpen().queue(); + peer.remoteBegin().queue(); + peer.remoteAttach().ofReceiver().withName(RandomUtil.randomString()).withSenderSettleModeUnsettled().withReceivervSettlesFirst().withTarget().also().withSource().withAddress(getName()).withExpiryPolicyOnLinkDetach().withDurabilityOfNone().withCapabilities("queue").withOutcomes("amqp:accepted:list", "amqp:rejected:list").also().queue(); + + peer.connect("localhost", 61616); + + peer.waitForScriptToCompleteIgnoreErrors(); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + } + }); + + Assert.assertTrue(latchCreating.await(10, TimeUnit.SECONDS)); + server.getRemotingService().getConnections().forEach(r -> { + r.fail(new ActiveMQException("it's a simulation")); + }); + blockCreate.countDown(); + Assert.assertTrue(done.await(10, TimeUnit.SECONDS)); + + { // double checking the executor is done + CountDownLatch check = new CountDownLatch(1); + executorService.execute(check::countDown); + Assert.assertTrue(check.await(1, TimeUnit.SECONDS)); + } + + Thread.sleep(100); // I need some time for the error condition to kick in + + Wait.assertEquals(0, server::getConnectionCount, 5000); + + Wait.assertEquals(0, serverQueue::getConsumerCount, 5000); + } + + } + + @Test + public void testDropCreateConsumerAMQP() throws Throwable { + testDropCreateConsumer("AMQP"); + } + + @Test + public void testDropCreateConsumerOPENWIRE() throws Throwable { + testDropCreateConsumer("OPENWIRE"); + } + + @Test + public void testDropCreateConsumerCORE() throws Throwable { + testDropCreateConsumer("CORE"); + } + + private void testDropCreateConsumer(String protocol) throws Throwable { + CountDownLatch latchCreating = new CountDownLatch(1); + CountDownLatch blockCreate = new CountDownLatch(1); + CountDownLatch done = new CountDownLatch(1); + CountDownLatch doneConsumer = new CountDownLatch(1); + ActiveMQServer server = createServer(true, createDefaultConfig(true)); + server.start(); + + Queue serverQueue = server.createQueue(new QueueConfiguration(getName()).setRoutingType(RoutingType.ANYCAST).setAddress(getName()).setAutoCreated(false)); + + AtomicBoolean running = new AtomicBoolean(true); + ExecutorService executorService = Executors.newFixedThreadPool(1); + runAfter(executorService::shutdownNow); + runAfter(() -> running.set(false)); + + CountDownLatch received = new CountDownLatch(1); + + executorService.execute(() -> { + ConnectionFactory connectionFactory; + + switch (protocol) { + case "AMQP": + connectionFactory = CFUtil.createConnectionFactory(protocol, "failover:(amqp://localhost:61616)?failover.maxReconnectAttempts=20"); + break; + case "OPENWIRE": + connectionFactory = new org.apache.activemq.ActiveMQConnectionFactory( "failover:(tcp://localhost:61616)?maxReconnectAttempts=20"); + break; + case "CORE": + connectionFactory = new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory("tcp://localhost:61616?ha=true;reconnectAttempts=20;callTimeout=1000"); + break; + default: + logger.warn("Invalid protocol {}", protocol); + connectionFactory = null; + } + + try (Connection connection = connectionFactory.createConnection()) { + logger.info("Connected on thread.."); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createQueue(getName())); + connection.start(); + while (running.get()) { + try { + logger.info("Receiving"); + received.countDown(); + Message message = consumer.receive(100); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + } + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } finally { + doneConsumer.countDown(); + } + }); + + Assert.assertTrue(received.await(10, TimeUnit.SECONDS)); + + server.registerBrokerPlugin(new ActiveMQServerSessionPlugin() { + @Override + public void beforeCreateSession(String name, + String username, + int minLargeMessageSize, + RemotingConnection connection, + boolean autoCommitSends, + boolean autoCommitAcks, + boolean preAcknowledge, + boolean xa, + String defaultAddress, + SessionCallback callback, + boolean autoCreateQueues, + OperationContext context, + Map<SimpleString, RoutingType> prefixes) throws ActiveMQException { + latchCreating.countDown(); + try { + blockCreate.await(10, TimeUnit.HOURS); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } finally { + done.countDown(); + } + } + }); + + + server.getRemotingService().getConnections().forEach(r -> { + r.fail(new ActiveMQException("it's a simulation")); + }); + Assert.assertTrue(latchCreating.await(10, TimeUnit.SECONDS)); + server.getRemotingService().getConnections().forEach(r -> { + r.fail(new ActiveMQException("it's a simulation 2nd time")); + }); + blockCreate.countDown(); + Assert.assertTrue(done.await(10, TimeUnit.SECONDS)); + + running.set(false); + Assert.assertTrue(doneConsumer.await(40, TimeUnit.SECONDS)); + + Thread.sleep(100); + + { // double checking the executor is done + CountDownLatch check = new CountDownLatch(1); + executorService.execute(check::countDown); + Assert.assertTrue(check.await(1, TimeUnit.SECONDS)); + } + + Wait.assertEquals(0, server::getConnectionCount, 5000); + Wait.assertEquals(0, serverQueue::getConsumerCount, 5000); + + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java index 7ab6126907..1cdac26a12 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java @@ -219,6 +219,7 @@ public class AMQPFederationConnectTest extends AmqpClientTestSupport { .withNullTarget(); peer.remoteDetach().withErrorCondition("amqp:unauthorized-access", "Not authroized").queue(); peer.expectDetach().optional(); + peer.expectClose().optional(); // Broker reconnect and allow it to attach this time. peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS"); peer.expectOpen().respond(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java index 32ccaec405..fe982c6c9d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java @@ -937,6 +937,7 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport { peer.waitForScriptToComplete(5, TimeUnit.SECONDS); peer.expectDetach().optional(); // Broker is not consistent on sending the detach + peer.expectClose().optional(); peer.expectSASLAnonymousConnect(); peer.expectOpen().respond(); peer.expectBegin().respond(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java index 770c40db2e..de10d6b6f7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java @@ -90,9 +90,6 @@ public class AMQPMirrorConnectionTest extends AmqpClientTestSupport { peer.waitForScriptToComplete(5, TimeUnit.SECONDS); server.stop(); - - // should be no more interactions - peer.waitForScriptToComplete(5, TimeUnit.SECONDS); } } @@ -128,9 +125,6 @@ public class AMQPMirrorConnectionTest extends AmqpClientTestSupport { peer.waitForScriptToComplete(5, TimeUnit.SECONDS); server.stop(); - - // should be no more interactions - peer.waitForScriptToComplete(5, TimeUnit.SECONDS); } } @@ -160,9 +154,6 @@ public class AMQPMirrorConnectionTest extends AmqpClientTestSupport { peer.waitForScriptToComplete(5, TimeUnit.SECONDS); server.stop(); - - // should be no more interactions - peer.waitForScriptToComplete(5, TimeUnit.SECONDS); } } @@ -205,9 +196,6 @@ public class AMQPMirrorConnectionTest extends AmqpClientTestSupport { peer.waitForScriptToComplete(5, TimeUnit.SECONDS); server.stop(); - - // should be no more interactions - peer.waitForScriptToComplete(5, TimeUnit.SECONDS); } } @@ -258,9 +246,6 @@ public class AMQPMirrorConnectionTest extends AmqpClientTestSupport { peer.waitForScriptToComplete(5, TimeUnit.SECONDS); server.stop(); - - // should be no more interactions - peer.waitForScriptToComplete(5, TimeUnit.SECONDS); } } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/consumer/DetectOrphanedConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/consumer/DetectOrphanedConsumerTest.java new file mode 100644 index 0000000000..707d87c2e4 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/consumer/DetectOrphanedConsumerTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.consumer; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import java.lang.invoke.MethodHandles; + +import org.apache.activemq.artemis.api.core.JsonUtil; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.management.QueueControl; +import org.apache.activemq.artemis.core.management.impl.view.ConsumerField; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; +import org.apache.activemq.artemis.json.JsonArray; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This test is simulating an orphaned consumer situation that was fixed in ARTEMIS-4476. + * the method QueueControl::listConsumersAsJSON should add a field orphaned=true in case the consumer is orphaned. + */ +public class DetectOrphanedConsumerTest extends ActiveMQTestBase { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @Test + public void testOrphanedConsumerCORE() throws Exception { + testOrphanedConsumer("CORE"); + } + + @Test + public void testOrphanedConsumerAMQP() throws Exception { + testOrphanedConsumer("AMQP"); + } + + @Test + public void testOrphanedConsumerOpenWire() throws Exception { + testOrphanedConsumer("OPENWIRE"); + } + + private void testOrphanedConsumer(String protocol) throws Exception { + + ActiveMQServer server = createServer(false, createDefaultConfig(true)); + server.start(); + + Queue queue = server.createQueue(new QueueConfiguration(getName()).setDurable(true).setName(getName()).setRoutingType(RoutingType.ANYCAST)); + + ConnectionFactory connectionFactory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + Connection connection = connectionFactory.createConnection(); + + ////////////////////////////////////////////////////// + // this close is to be done after the test is done + runAfter(connection::close); + ////////////////////////////////////////////////////// + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // this consumer is never used here. + MessageConsumer willBeOrphaned = session.createConsumer(session.createQueue(getName())); + + Wait.assertEquals(1, queue::getConsumerCount, 5000); + + QueueControl queueControl = (QueueControl) server.getManagementService().getResource("queue." + queue.getName().toString()); + Assert.assertNotNull(queueControl); + + String result = queueControl.listConsumersAsJSON(); + logger.debug("json: {}", result); + + JsonArray resultArray = JsonUtil.readJsonArray(result); + Assert.assertEquals(1, resultArray.size()); + Assert.assertFalse(resultArray.getJsonObject(0).containsKey(ConsumerField.ORPHANED.getName())); + + queue.getConsumers().forEach(c -> { + ServerConsumerImpl serverConsumer = (ServerConsumerImpl) c; + logger.debug("Removing connection for {} on connectionID {}", serverConsumer, serverConsumer.getConnectionID()); + Object removed = server.getRemotingService().removeConnection(serverConsumer.getConnectionID()); + logger.debug("removed {}", removed); + }); + + result = queueControl.listConsumersAsJSON(); + + logger.debug("json: {}", result); + + resultArray = JsonUtil.readJsonArray(result); + Assert.assertEquals(1, resultArray.size()); + Assert.assertTrue(resultArray.getJsonObject(0).containsKey(ConsumerField.ORPHANED.getName())); + Assert.assertTrue(resultArray.getJsonObject(0).getBoolean(ConsumerField.ORPHANED.getName())); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/consumer/OrphanedConsumerDefenseTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/consumer/OrphanedConsumerDefenseTest.java new file mode 100644 index 0000000000..2852dde1b9 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/consumer/OrphanedConsumerDefenseTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.consumer; + +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.HashMap; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.paging.PagingManager; +import org.apache.activemq.artemis.core.persistence.OperationContext; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; +import org.apache.activemq.artemis.core.postoffice.PostOffice; +import org.apache.activemq.artemis.core.protocol.ServerPacketDecoder; +import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl; +import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnection; +import org.apache.activemq.artemis.core.security.SecurityStore; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; +import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; +import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; +import org.apache.activemq.artemis.core.server.management.ManagementService; +import org.apache.activemq.artemis.core.transaction.ResourceManager; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; +import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager; +import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory; +import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; +import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext; +import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext; +import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler; +import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; +import org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener; +import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.utils.ExecutorFactory; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.EndpointState; +import org.apache.qpid.proton.engine.Sender; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * as worked through ARTEMIS-4476 we fixed the possibility of a ghost consumer (a term coined by a user), + * where the connection is gone but the consumer still in memory. + * <p> + * The fix involved on calling the disconnect from the proper threads. + * <p> + * And as a line of defense the ServerConsumer and AMQP handler are also validating the connection states. + * If a connection is in destroyed state tries to create a consumer, the system should throw an error. + */ +public class OrphanedConsumerDefenseTest extends ActiveMQTestBase { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @Test + public void testDefendNPESession() throws Exception { + ActiveMQServerImpl server = Mockito.mock(ActiveMQServerImpl.class); + SessionCallback sessionCallback = Mockito.mock(SessionCallback.class); + ManagementService managementService = Mockito.mock(ManagementService.class); + + try { + new ServerConsumerImpl(1, null, null, null, 1, true, false, new NullStorageManager(), sessionCallback, true, true, managementService, false, 0, server); + Assert.fail("Exception was expected"); + } catch (NullPointerException e) { + logger.debug("Expected exception", e); + } + } + + @Test + public void testDefendDestroyedConnection() throws Exception { + ActiveMQServerImpl server = Mockito.mock(ActiveMQServerImpl.class); + + SessionCallback sessionCallback = Mockito.mock(SessionCallback.class); + + ManagementService managementService = Mockito.mock(ManagementService.class); + + StorageManager storageManager = new NullStorageManager(); + + ArtemisExecutor artemisExecutor = Mockito.mock(ArtemisExecutor.class); + ExecutorFactory executorFactory = Mockito.mock(ExecutorFactory.class); + Mockito.when(executorFactory.getExecutor()).thenReturn(artemisExecutor); + + Configuration configuration = new ConfigurationImpl(); + + ActiveMQServer mockServer = Mockito.mock(ActiveMQServer.class); + Mockito.when(mockServer.getExecutorFactory()).thenReturn(executorFactory); + Mockito.when(mockServer.getConfiguration()).thenReturn(configuration); + + BufferHandler bufferHandler = Mockito.mock(BufferHandler.class); + InVMConnection inVMConnection = new InVMConnection(1, bufferHandler, Mockito.mock(BaseConnectionLifeCycleListener.class), artemisExecutor); + + RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(new ServerPacketDecoder(storageManager), inVMConnection, new ArrayList<>(), new ArrayList<>(), RandomUtil.randomSimpleString(), artemisExecutor); + remotingConnection.destroy(); + ServerSessionImpl session = new ServerSessionImpl(RandomUtil.randomString(), RandomUtil.randomString(), RandomUtil.randomString(), + RandomUtil.randomString(), 1000, true, true, true, true, true, + remotingConnection, storageManager, Mockito.mock(PostOffice.class), Mockito.mock(ResourceManager.class), Mockito.mock(SecurityStore.class), Mockito.mock(ManagementService.class), mockServer, RandomUtil.randomSimpleString(), RandomUtil.randomSimpleString(), Mockito.mock(SessionCallback.class), Mockito.mock(OperationContext.class), Mockito.mock(PagingManager.class), new HashMap<>(), "securityDomain", false); + + try { + new ServerConsumerImpl(1, session, null, null, 1, true, false, new NullStorageManager(), sessionCallback, true, true, managementService, false, 0, server); + Assert.fail("Exception was expected"); + } catch (ActiveMQException activeMQException) { + logger.info("Expected exception", activeMQException); + Assert.assertTrue(activeMQException.getMessage().contains("AMQ229250")); + } + } + + @Test + public void testDefendAMQPConsumerNullConnection() throws Exception { + testDefendAMQPConsumer(true); + } + + @Test + public void testDefendAMQPConsumer() throws Exception { + testDefendAMQPConsumer(false); + } + + private void testDefendAMQPConsumer(boolean returnNullConnection) throws Exception { + ActiveMQServer server = Mockito.mock(ActiveMQServer.class); + ProtonProtocolManagerFactory protonProtocolManagerFactory = new ProtonProtocolManagerFactory(); + ProtonProtocolManager protonProtocolManager = new ProtonProtocolManager(protonProtocolManagerFactory, server, new ArrayList<>(), new ArrayList<>()); + AMQPConnectionContext connectionContext = Mockito.mock(AMQPConnectionContext.class); + + if (returnNullConnection) { + Mockito.when(connectionContext.getHandler()).thenReturn(null); + } else { + ProtonHandler protonHandler = Mockito.mock(ProtonHandler.class); + Mockito.when(connectionContext.getHandler()).thenReturn(protonHandler); + Connection qpidConnection = Mockito.mock(Connection.class); + Mockito.when(qpidConnection.getRemoteState()).thenReturn(EndpointState.CLOSED); + Mockito.when(protonHandler.getConnection()).thenReturn(qpidConnection); + } + + Mockito.when(connectionContext.getProtocolManager()).thenReturn(protonProtocolManager); + Sender sender = Mockito.mock(Sender.class); + AMQPSessionContext sessionContext = Mockito.mock(AMQPSessionContext.class); + AMQPSessionCallback sessionCallback = Mockito.mock(AMQPSessionCallback.class); + + try { + ProtonServerSenderContext serverSenderContext = new ProtonServerSenderContext(connectionContext, sender, sessionContext, sessionCallback); + serverSenderContext.initialize(); + } catch (ActiveMQException e) { + Assert.assertTrue(e.getMessage().contains("AMQ119027")); + logger.warn(e.getMessage(), e); + } + } +} \ No newline at end of file diff --git a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ConnectionDroppedLeakTest.java b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ConnectionDroppedLeakTest.java new file mode 100644 index 0000000000..a180c20166 --- /dev/null +++ b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ConnectionDroppedLeakTest.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.leak; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.lang.invoke.MethodHandles; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import io.github.checkleak.core.CheckLeak; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.server.impl.ServerStatus; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.junit.After; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.activemq.artemis.tests.leak.MemoryAssertions.assertMemory; +import static org.apache.activemq.artemis.tests.leak.MemoryAssertions.basicMemoryAsserts; + +public class ConnectionDroppedLeakTest extends ActiveMQTestBase { + + private ConnectionFactory createConnectionFactory(String protocol) { + if (protocol.equals("AMQP")) { + return CFUtil.createConnectionFactory("AMQP", "amqp://localhost:61616?amqp.idleTimeout=120000&failover.maxReconnectAttempts=1&jms.prefetchPolicy.all=10&jms.forceAsyncAcks=true"); + } else { + return CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + } + } + + private static final String QUEUE_NAME = "QUEUE_DROP"; + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + ActiveMQServer server; + + Queue serverQueue; + + @BeforeClass + public static void beforeClass() throws Exception { + Assume.assumeTrue(CheckLeak.isLoaded()); + } + + @After + public void validateServer() throws Exception { + CheckLeak checkLeak = new CheckLeak(); + + // I am doing this check here because the test method might hold a client connection + // so this check has to be done after the test, and before the server is stopped + assertMemory(checkLeak, 0, RemotingConnectionImpl.class.getName()); + + server.stop(); + + server = null; + serverQueue = null; + + clearServers(); + ServerStatus.clear(); + + assertMemory(checkLeak, 0, ActiveMQServerImpl.class.getName()); + } + + @Override + @Before + public void setUp() throws Exception { + server = createServer(true, createDefaultConfig(1, true)); + server.getConfiguration().setJournalPoolFiles(4).setJournalMinFiles(2); + server.start(); + server.addAddressInfo(new AddressInfo(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST)); + + serverQueue = server.createQueue(new QueueConfiguration().setAddress(QUEUE_NAME).setName(QUEUE_NAME).setRoutingType(RoutingType.ANYCAST).setDurable(true)); + } + + @Test + public void testDropConnectionsAMQP() throws Exception { + doDropConnections("AMQP"); + } + + @Test + public void testDropConnectionsCORE() throws Exception { + doDropConnections("CORE"); + } + + @Test + public void testDropConnectionsOPENWIRE() throws Exception { + doDropConnections("OPENWIRE"); + } + + private void doDropConnections(String protocol) throws Exception { + basicMemoryAsserts(); + + CountDownLatch latchDone = new CountDownLatch(2); + CountDownLatch latchReceived = new CountDownLatch(50); + AtomicInteger errors = new AtomicInteger(2); + AtomicBoolean running = new AtomicBoolean(true); + + ExecutorService executorService = Executors.newFixedThreadPool(2); + runAfter(executorService::shutdownNow); + runAfter(() -> running.set(false)); + + executorService.execute(() -> { + ConnectionFactory cf = createConnectionFactory(protocol); + Connection connection = null; + try { + connection = cf.createConnection(); // I will leave this open on purpose + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); + while (running.get()) { + Message message = consumer.receive(100); + if (message != null) { + latchReceived.countDown(); + session.commit(); + } + } + } catch (Exception e) { + errors.incrementAndGet(); + } finally { + if (protocol.equals("OPENWIRE")) { + try { + connection.close();// only closing the openwire as it would leave a hanging thread + } catch (Throwable ignored) { + } + } + latchDone.countDown(); + } + }); + + executorService.execute(() -> { + ConnectionFactory cf = createConnectionFactory(protocol); + Connection connection = null; + try { + connection = cf.createConnection(); // I will leave this open on purpose + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME)); + while (running.get()) { + producer.send(session.createTextMessage("hello")); + session.commit(); + } + } catch (Exception e) { + errors.incrementAndGet(); + } finally { + if (protocol.equals("OPENWIRE")) { + try { + connection.close();// only closing the openwire as it may leave a hanging thread + } catch (Throwable ignored) { + } + } + latchDone.countDown(); + } + }); + + Assert.assertTrue(latchReceived.await(10, TimeUnit.SECONDS)); + + server.getRemotingService().getConnections().forEach(r -> { + r.fail(new ActiveMQException("it's a simulation")); + }); + + Assert.assertTrue(latchDone.await(30, TimeUnit.SECONDS)); + running.set(false); + + serverQueue.deleteAllReferences(); + + basicMemoryAsserts(); + } +} \ No newline at end of file diff --git a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ConnectionLeakTest.java b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ConnectionLeakTest.java index 617933915f..42917495c6 100644 --- a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ConnectionLeakTest.java +++ b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ConnectionLeakTest.java @@ -128,6 +128,17 @@ public class ConnectionLeakTest extends ActiveMQTestBase { int MESSAGES = 20; basicMemoryAsserts(); + // The separate method here exists to ensure basicMemoryAsserts runs correctly. + // Since I'm evaluating RemotingConnectionImpl the connectionFactory inside the method may still + // hold a reference that would only be released at the end of the method, + // because of that I need to make these calls on a separate method. + // I tried not needing this by creating a context but that was not enough to release the references. + doManyConsumers(protocol, REPEATS, MESSAGES, checkLeak); + + basicMemoryAsserts(); + } + + private void doManyConsumers(String protocol, int REPEATS, int MESSAGES, CheckLeak checkLeak) throws Exception { ConnectionFactory cf = createConnectionFactory(protocol); try (Connection producerConnection = cf.createConnection(); Connection consumerConnection = cf.createConnection()) { @@ -175,7 +186,6 @@ public class ConnectionLeakTest extends ActiveMQTestBase { assertMemory(new CheckLeak(), 0, ServerConsumerImpl.class.getName()); - // this is just to drain the messages try (Connection targetConnection = cf.createConnection(); Connection consumerConnection = cf.createConnection()) { Session targetSession = targetConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -205,10 +215,8 @@ public class ConnectionLeakTest extends ActiveMQTestBase { ((ActiveMQConnectionFactory)cf).close(); } - basicMemoryAsserts(); } - @Test public void testCancelledDeliveries() throws Exception { doTestCancelledDelivery("AMQP"); diff --git a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java index 47e4cab2b9..dc926e517c 100644 --- a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java +++ b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java @@ -20,10 +20,13 @@ package org.apache.activemq.artemis.tests.leak; import java.lang.invoke.MethodHandles; import io.github.checkleak.core.CheckLeak; +import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection; import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl; import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl; import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; +import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; +import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext; @@ -42,6 +45,9 @@ public class MemoryAssertions { assertMemory(checkLeak, 0, OpenWireConnection.class.getName()); assertMemory(checkLeak, 0, ProtonServerSenderContext.class.getName()); assertMemory(checkLeak, 0, ProtonServerReceiverContext.class.getName()); + assertMemory(checkLeak, 0, ActiveMQProtonRemotingConnection.class.getName()); + assertMemory(checkLeak, 0, RemotingConnectionImpl.class.getName()); + assertMemory(checkLeak, 0, ServerSessionImpl.class.getName()); assertMemory(checkLeak, 0, AMQPSessionContext.class.getName()); assertMemory(checkLeak, 0, ServerConsumerImpl.class.getName()); assertMemory(checkLeak, 0, RoutingContextImpl.class.getName());