This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new c3bef84be AMQ-9343 - Reduce memory used for in flight transactions
     new 961067ec1 Merge pull request #1075 from cshannon/AMQ-9343
c3bef84be is described below

commit c3bef84be5717d91e81b196fac23800097737c5a
Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
AuthorDate: Wed Oct 18 10:35:20 2023 -0400

    AMQ-9343 - Reduce memory used for in flight transactions
    
    This commit will reduce the memory required in KahaDB for long running
    transactions and transactions with a lot of pending message sends by
    clearing out the message memory when no longer needed instead of keeping
    it tracked in the pending map
---
 .../apache/activemq/store/kahadb/KahaDBStore.java  |  23 ++++
 .../kahadb/KahaDBInFlightTxMemoryUsageTest.java    | 142 +++++++++++++++++++++
 2 files changed, 165 insertions(+)

diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 06dd0966b..bde01d040 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -572,6 +572,29 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter,
                     }
                 }
             }, null);
+
+            /*
+             * After we store the command in the journal we no longer need to 
keep the message
+             * on the command, and we can clear the field here.
+             *
+             * The reason to clear the message is that for messages added as 
part of a transaction the command
+             * will be added to the inflightTransactions map as a pending add 
operation.
+             * For long-running transactions and/or transactions with a lot of 
pending messages
+             * (or large messages) this can use up a decent amount of memory 
which can increase GC pressure.
+             *
+             * The commands are only tracked in the map so that the KahaDB 
index can be updated later
+             * on transaction commit, but updating the index only requires 
metadata from the command
+             * such as message id or destination and not the message itself, 
so we can safely clear the field.
+             *
+             * Note that on broker restart and recovery of the KahaDB journal 
the pending message
+             * adds for transactions will be loaded again and the memory won't 
be cleared in that case.
+             * This could be revisited in the future if an issue but that 
should not be a large
+             * issue because that's only done on first startup and during 
recovery and then
+             * after the broker is recovered the memory footprint will drop. 
Also, as of now, recovering
+             * XA transactions in the transaction broker requires loading the 
messages and acks anyway
+             * for processing, so we need to load the full message and keep it 
in the pending operation.
+             */
+            command.clearMessage();
         }
 
         @Override
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBInFlightTxMemoryUsageTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBInFlightTxMemoryUsageTest.java
new file mode 100644
index 000000000..ba9ecc5b7
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBInFlightTxMemoryUsageTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store.kahadb;
+
+import static junit.framework.TestCase.assertNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import jakarta.jms.JMSException;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.Session;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.store.kahadb.MessageDatabase.AddOperation;
+import org.apache.activemq.store.kahadb.MessageDatabase.Operation;
+import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KahaDBInFlightTxMemoryUsageTest {
+    static final Logger LOG = 
LoggerFactory.getLogger(KahaDBInFlightTxMemoryUsageTest.class);
+
+    @Rule
+    public TemporaryFolder dataFileDir = new TemporaryFolder(new 
File("target"));
+
+    private BrokerService broker;
+    private URI brokerConnectURI;
+
+    private Map<TransactionId, List<Operation<?>>> inflightTransactions;
+
+    @SuppressWarnings("unchecked")
+    @Before
+    public void startBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setPersistent(true);
+        broker.setDataDirectoryFile(dataFileDir.getRoot());
+        broker.setDeleteAllMessagesOnStartup(true);
+
+        //set up a transport
+        TransportConnector connector = broker
+            .addConnector(new TransportConnector());
+        connector.setUri(new URI("tcp://0.0.0.0:0"));
+        connector.setName("tcp");
+
+        broker.start();
+        broker.waitUntilStarted();
+        brokerConnectURI = broker.getConnectorByName("tcp").getConnectUri();
+
+        KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) 
broker.getPersistenceAdapter();
+        Field inflightField = 
MessageDatabase.class.getDeclaredField("inflightTransactions");
+        inflightField.setAccessible(true);
+
+        inflightTransactions = (LinkedHashMap<TransactionId, 
List<Operation<?>>>)
+            inflightField.get(adapter.getStore());
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+
+    @Test
+    public void testKahaDBInFlightTxMessagesClearedFromMemory() throws 
JMSException {
+        final String queueName = "test.queue";
+
+        final ActiveMQConnection connection = (ActiveMQConnection) new 
ActiveMQConnectionFactory(brokerConnectURI)
+            .createConnection();
+        // sync send so messages are immediately sent for testing, normally 
transacted sessions async send
+        connection.setAlwaysSyncSend(true);
+        connection.start();
+        Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+        Queue queue = session.createQueue(queueName);
+
+        try {
+            // Should be empty before any sends
+            assertTrue(inflightTransactions.isEmpty());
+
+            // Send 10 transacted messages but don't commit so they are pending
+            MessageProducer prod = session.createProducer(queue);
+            for (int i = 0; i < 10; i++) {
+                prod.send(session.createTextMessage("test"));
+            }
+
+            // Check the inflight transactions map to verify the pending 
operations had messages cleared
+            assertEquals(inflightTransactions.size(), 1);
+            List<Operation<?>> pendingOps = 
inflightTransactions.values().stream().findFirst().orElseThrow();
+            assertEquals(10, pendingOps.size());
+
+            for (Operation<?> pendingOp : pendingOps) {
+                // all 10 ops should be AddOperation
+                assertTrue(pendingOp instanceof AddOperation);
+                KahaAddMessageCommand command = 
((AddOperation)pendingOp).getCommand();
+                assertNotNull(pendingOp.getLocation());
+                assertNotNull(command);
+                assertNotNull(command.getMessageId());
+                assertNotNull(command.getDestination());
+
+                // Message should now be null when in the pending list
+                assertNull(command.getMessage());
+            }
+
+            // assert cleared after successful commit
+            session.commit();
+            assertTrue(inflightTransactions.isEmpty());
+        } finally {
+            connection.close();
+        }
+    }
+}

Reply via email to