This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit c1fd16d66a506d7878f0da6b71dfeaa096221d8c Author: Clebert Suconic <[email protected]> AuthorDate: Fri Jun 17 15:33:14 2022 -0400 ARTEMIS-3864 StompTransactions leaking on ActiveMQServer.getSessions() After a TX in stomp is committed, a session will never be cleared from ActiveMQServer --- .../core/protocol/stomp/StompProtocolManager.java | 67 ++++++++++++++-------- .../artemis/core/protocol/stomp/StompSession.java | 14 +++++ .../artemis/tests/integration/stomp/StompTest.java | 33 +++++++++++ 3 files changed, 89 insertions(+), 25 deletions(-) diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java index 87d5c28ed7..1f9005a1c4 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java @@ -19,10 +19,9 @@ package org.apache.activemq.artemis.core.protocol.stomp; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import io.netty.channel.ChannelPipeline; @@ -63,7 +62,8 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St private final Executor executor; - private final Map<Object, StompSession> transactedSessions = new HashMap<>(); + // connectionID / Map<SessionId, StompSession> + private final Map<Object, Map<Object, StompSession>> transactedSessions = new ConcurrentHashMap<>(); // key => connection ID, value => Stomp session private final Map<Object, StompSession> sessions = new HashMap<>(); @@ -218,10 +218,29 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St } public StompSession getTransactedSession(StompConnection connection, String txID) throws Exception { - return internalGetSession(connection, transactedSessions, txID, true); + return internalGetSession(connection, getTXMap(connection.getID()), txID, true); } + public Map<Object, Map<Object, StompSession>> getTransactedSessions() { + return transactedSessions; + } + + private Map<Object, StompSession> getTXMap(Object objectID) { + Map<Object, StompSession> sessions = transactedSessions.get(objectID); + if (sessions == null) { + sessions = new HashMap<>(); + Map<Object, StompSession> oldValue = transactedSessions.putIfAbsent(objectID, sessions); + if (oldValue != null) { + sessions = oldValue; + } + } + + return sessions; + } + + private StompSession internalGetSession(StompConnection connection, Map<Object, StompSession> sessions, Object id, boolean transacted) throws Exception { + System.out.println("Looking for sessionID " + id); StompSession stompSession = sessions.get(id); if (stompSession == null) { stompSession = new StompSession(connection, this, server.getStorageManager().newContext(server.getExecutorFactory().getExecutor())); @@ -253,21 +272,18 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St } } - // removed the transacted session belonging to the connection - Iterator<Entry<Object, StompSession>> iterator = transactedSessions.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry<Object, StompSession> entry = iterator.next(); - if (entry.getValue().getConnection() == connection) { - ServerSession serverSession = entry.getValue().getCoreSession(); - try { - serverSession.rollback(true); - serverSession.close(false); - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.errorCleaningStompConn(e); - } - iterator.remove(); + Map<Object, StompSession> sessionMap = getTXMap(connection.getID()); + sessionMap.values().forEach(ss -> { + try { + ss.getCoreSession().rollback(false); + ss.getCoreSession().close(false); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.errorCleaningStompConn(e); } - } + }); + sessionMap.clear(); + + transactedSessions.remove(connection.getID()); } }); } @@ -323,20 +339,20 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St public void commitTransaction(StompConnection connection, String txID) throws Exception { StompSession session = getTransactedSession(connection, txID); - if (session == null) { + if (session == null || !session.isTxPending()) { throw new ActiveMQStompException(connection, "No transaction started: " + txID); } - transactedSessions.remove(txID); session.getCoreSession().commit(); + session.end(); } public void abortTransaction(StompConnection connection, String txID) throws Exception { StompSession session = getTransactedSession(connection, txID); - if (session == null) { + if (session == null || !session.isTxPending()) { throw new ActiveMQStompException(connection, "No transaction started: " + txID); } - transactedSessions.remove(txID); session.getCoreSession().rollback(false); + session.end(); } public StompPostReceiptFunction subscribe(StompConnection connection, @@ -374,12 +390,13 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St public void beginTransaction(StompConnection connection, String txID) throws Exception { ActiveMQServerLogger.LOGGER.debugf("-------------------------------Stomp begin tx: %s", txID); - if (transactedSessions.containsKey(txID)) { + // create the transacted session + StompSession session = getTransactedSession(connection, txID); + if (session.isTxPending()) { ActiveMQServerLogger.LOGGER.stompErrorTXExists(txID); throw new ActiveMQStompException(connection, "Transaction already started: " + txID); } - // create the transacted session - getTransactedSession(connection, txID); + session.begin(); } public boolean destinationExists(String destination) { diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java index 8cdc9de769..10240f2993 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java @@ -72,6 +72,20 @@ public class StompSession implements SessionCallback { private volatile boolean noLocal = false; + private boolean txPending = false; + + public synchronized void begin() { + txPending = true; + } + + public synchronized boolean isTxPending() { + return txPending; + } + + public synchronized void end() { + txPending = false; + } + StompSession(final StompConnection connection, final StompProtocolManager manager, OperationContext sessionContext) { this.connection = connection; this.manager = manager; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java index 30c37b090d..0075990605 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java @@ -51,6 +51,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.protocol.stomp.Stomp; +import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManager; import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; @@ -60,6 +61,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.client.ActiveMQMessage; import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.reader.MessageUtil; +import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.tests.integration.mqtt.FuseMQTTClientProvider; import org.apache.activemq.artemis.tests.integration.mqtt.MQTTClientProvider; import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; @@ -662,6 +664,37 @@ public class StompTest extends StompTestBase { } + @Test + public void testTransactedSessionLeak() throws Exception { + for (int i = 0; i < 10; i++) { + conn = StompClientConnectionFactory.createClientConnection(uri); + conn.connect(defUser, defPass); + + + for (int s = 0; s < 10; s++) { + String txId = "tx" + i + "_" + s; + beginTransaction(conn, txId); + send(conn, getQueuePrefix() + getQueueName(), null, "Hello World", true, null, txId); + commitTransaction(conn, txId, true); + } + + Wait.assertEquals(13, () -> server.getSessions().size(), 1000, 100); + conn.disconnect(); + } + + if (connection != null) { + connection.close(); + } + + Wait.assertEquals(0, () -> server.getSessions().size(), 1000, 100); + + Acceptor stompAcceptor = server.getRemotingService().getAcceptors().get("stomp"); + StompProtocolManager stompProtocolManager = (StompProtocolManager) stompAcceptor.getProtocolHandler().getProtocolMap().get("STOMP"); + Assert.assertNotNull(stompProtocolManager); + + Assert.assertEquals(0, stompProtocolManager.getTransactedSessions().size()); + } + @Test public void testIngressTimestamp() throws Exception { server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setEnableIngressTimestamp(true));
