This is an automated email from the ASF dual-hosted git repository.

mattrpav pushed a commit to branch activemq-6.2.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-6.2.x by this push:
     new e0527433da [#2137] Fix maxUncommittedCount usage across a network of 
brokers
e0527433da is described below

commit e0527433da3ff57b6e7f175a795c6bc14b7f322e
Author: Matt Pavlovich <[email protected]>
AuthorDate: Mon Jun 22 19:22:11 2026 -0500

    [#2137] Fix maxUncommittedCount usage across a network of brokers
    
    (cherry picked from commit cda48d5d2fa6509194202231775814d38b5bdd01)
---
 .../apache/activemq/broker/TransactionBroker.java  |   4 +-
 .../usecases/MaxUncommittedCountNetworkTest.java   | 154 +++++++++++++++++++++
 2 files changed, 156 insertions(+), 2 deletions(-)

diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java
index c8af4a6144..1b1e50f5f8 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java
@@ -312,9 +312,9 @@ public class TransactionBroker extends BrokerFilter {
     }
 
     protected void verifyUncommittedCount(ProducerBrokerExchange 
producerExchange, Transaction transaction, Message message) throws Exception {
-        // maxUncommittedCount <= 0 disables
+        // maxUncommittedCount <= 0 disables; null transaction means 
non-transactional send
         int maxUncommittedCount = 
this.getBrokerService().getMaxUncommittedCount();
-        if (maxUncommittedCount > 0 && transaction.size() >= 
maxUncommittedCount) {
+        if (maxUncommittedCount > 0 && transaction != null && 
transaction.size() >= maxUncommittedCount) {
 
             try {
                 // Rollback as we are throwing an error the client as throwing 
the error will cause
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MaxUncommittedCountNetworkTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MaxUncommittedCountNetworkTest.java
new file mode 100644
index 0000000000..5e2fd208f9
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MaxUncommittedCountNetworkTest.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import jakarta.jms.JMSContext;
+import jakarta.jms.Message;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.apache.activemq.test.annotations.ParallelTest;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Verifies that maxUncommittedCount does not break non-transactional
+ * message forwarding across network connectors.
+ *
+ * Prior to the fix, network-forwarded messages (which have a null
+ * transactionId) caused a NullPointerException in
+ * TransactionBroker.verifyUncommittedCount() when maxUncommittedCount
+ * was set to a positive value.
+ */
+@RunWith(value = Parameterized.class)
+@Category(ParallelTest.class)
+public class MaxUncommittedCountNetworkTest {
+
+    private BrokerService brokerA;
+    private BrokerService brokerB;
+
+    @Parameterized.Parameters(name="transacted={0}")
+    public static Collection<Object> data() {
+        return Arrays.asList(new Object[] { true, false });
+    }
+
+    private final boolean transacted;
+
+    public MaxUncommittedCountNetworkTest(boolean transacted) {
+        this.transacted = transacted;
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        brokerA = new BrokerService();
+        brokerA.setBrokerName("brokerA");
+        brokerA.setUseJmx(false);
+        brokerA.setPersistent(false);
+        brokerA.addConnector("tcp://localhost:0");
+        brokerA.setMaxUncommittedCount(10);
+        brokerA.start();
+        brokerA.waitUntilStarted();
+
+        var brokerAUri = 
brokerA.getTransportConnectorByScheme("tcp").getPublishableConnectString();
+
+        brokerB = new BrokerService();
+        brokerB.setBrokerName("brokerB");
+        brokerB.setUseJmx(false);
+        brokerB.setPersistent(false);
+        brokerB.addConnector("tcp://localhost:0");
+        brokerB.setMaxUncommittedCount(10);
+
+        var nc = brokerB.addNetworkConnector("static:(" + brokerAUri + ")");
+        nc.setDuplex(true);
+
+        brokerB.start();
+        brokerB.waitUntilStarted();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (brokerB != null) {
+            brokerB.stop();
+            brokerB.waitUntilStopped();
+        }
+        if (brokerA != null) {
+            brokerA.stop();
+            brokerA.waitUntilStopped();
+        }
+    }
+
+    @Test
+    public void testSendAcrossNetworkConnector() throws Exception {
+        var queueName = "test.max.uncommited.network.tx." + transacted;
+        int messageCount = transacted ? 5 : 20;
+
+        var brokerAUri = 
brokerA.getTransportConnectorByScheme("tcp").getPublishableConnectString();
+        var brokerBUri = 
brokerB.getTransportConnectorByScheme("tcp").getPublishableConnectString();
+        var consumerFactory = new ActiveMQConnectionFactory(brokerBUri);
+        var producerFactory = new ActiveMQConnectionFactory(brokerAUri);
+
+        try (final var consumerContext = 
consumerFactory.createContext(transacted ? JMSContext.SESSION_TRANSACTED : 
JMSContext.AUTO_ACKNOWLEDGE);
+             final var consumer = 
consumerContext.createConsumer(consumerContext.createQueue(queueName));
+             final var producerContext = 
producerFactory.createContext(transacted ? JMSContext.SESSION_TRANSACTED : 
JMSContext.AUTO_ACKNOWLEDGE)) {
+
+            assertTrue("Remote consumer should come online",
+                    Wait.waitFor(() -> {
+                        return !brokerA.getDestination(new 
ActiveMQQueue(queueName)).getConsumers().isEmpty();
+                    }, 2_000L, 10L));
+
+
+            for (var i = 0; i < messageCount; i++) {
+                var msg = producerContext
+                        .createProducer()
+                            .send(producerContext.createQueue(queueName),
+                                  producerContext.createTextMessage("msg-" + 
i));
+            }
+
+            if (transacted) {
+                producerContext.commit();
+            }
+
+            final var count = new AtomicInteger(0);
+            assertTrue("All messages should arrive across the network 
connector",
+                    Wait.waitFor(() -> {
+                        Message msg;
+                        while ((msg = consumer.receive(1_000L)) != null) {
+                            if (transacted) {
+                                consumerContext.commit();
+                            }
+                            count.incrementAndGet();
+                        }
+                        return count.get() >= messageCount;
+                    }, 15_000L, 10L));
+
+            assertEquals(messageCount, count.get());
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to