This is an automated email from the ASF dual-hosted git repository.
mattrpav pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new cda48d5d2f [#2137] Fix maxUncommittedCount usage across a network of brokers
cda48d5d2f is described below
commit cda48d5d2fa6509194202231775814d38b5bdd01
Author: Matt Pavlovich <[email protected]>
AuthorDate: Mon Jun 22 19:22:11 2026 -0500
[#2137] Fix maxUncommittedCount usage across a network of brokers
---
.../apache/activemq/broker/TransactionBroker.java | 4 +-
.../usecases/MaxUncommittedCountNetworkTest.java | 154 +++++++++++++++++++++
2 files changed, 156 insertions(+), 2 deletions(-)
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java
index c8af4a6144..1b1e50f5f8 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java
@@ -312,9 +312,9 @@ public class TransactionBroker extends BrokerFilter {
}
protected void verifyUncommittedCount(ProducerBrokerExchange
producerExchange, Transaction transaction, Message message) throws Exception {
- // maxUncommittedCount <= 0 disables
+ // maxUncommittedCount <= 0 disables; null transaction means
non-transactional send
int maxUncommittedCount =
this.getBrokerService().getMaxUncommittedCount();
- if (maxUncommittedCount > 0 && transaction.size() >=
maxUncommittedCount) {
+ if (maxUncommittedCount > 0 && transaction != null &&
transaction.size() >= maxUncommittedCount) {
try {
// Rollback as we are throwing an error the client as throwing
the error will cause
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MaxUncommittedCountNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MaxUncommittedCountNetworkTest.java
new file mode 100644
index 0000000000..5e2fd208f9
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MaxUncommittedCountNetworkTest.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import jakarta.jms.JMSContext;
+import jakarta.jms.Message;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.apache.activemq.test.annotations.ParallelTest;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Verifies that maxUncommittedCount does not break non-transactional
+ * message forwarding across network connectors.
+ *
+ * Prior to the fix, network-forwarded messages (which have a null
+ * transactionId) caused a NullPointerException in
+ * TransactionBroker.verifyUncommittedCount() when maxUncommittedCount
+ * was set to a positive value.
+ */
+@RunWith(value = Parameterized.class)
+@Category(ParallelTest.class)
+public class MaxUncommittedCountNetworkTest {
+
+ private BrokerService brokerA;
+ private BrokerService brokerB;
+
+ @Parameterized.Parameters(name="transacted={0}")
+ public static Collection<Object> data() {
+ return Arrays.asList(new Object[] { true, false });
+ }
+
+ private final boolean transacted;
+
+ public MaxUncommittedCountNetworkTest(boolean transacted) {
+ this.transacted = transacted;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ brokerA = new BrokerService();
+ brokerA.setBrokerName("brokerA");
+ brokerA.setUseJmx(false);
+ brokerA.setPersistent(false);
+ brokerA.addConnector("tcp://localhost:0");
+ brokerA.setMaxUncommittedCount(10);
+ brokerA.start();
+ brokerA.waitUntilStarted();
+
+ var brokerAUri =
brokerA.getTransportConnectorByScheme("tcp").getPublishableConnectString();
+
+ brokerB = new BrokerService();
+ brokerB.setBrokerName("brokerB");
+ brokerB.setUseJmx(false);
+ brokerB.setPersistent(false);
+ brokerB.addConnector("tcp://localhost:0");
+ brokerB.setMaxUncommittedCount(10);
+
+ var nc = brokerB.addNetworkConnector("static:(" + brokerAUri + ")");
+ nc.setDuplex(true);
+
+ brokerB.start();
+ brokerB.waitUntilStarted();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (brokerB != null) {
+ brokerB.stop();
+ brokerB.waitUntilStopped();
+ }
+ if (brokerA != null) {
+ brokerA.stop();
+ brokerA.waitUntilStopped();
+ }
+ }
+
+ @Test
+ public void testSendAcrossNetworkConnector() throws Exception {
+ var queueName = "test.max.uncommited.network.tx." + transacted;
+ int messageCount = transacted ? 5 : 20;
+
+ var brokerAUri =
brokerA.getTransportConnectorByScheme("tcp").getPublishableConnectString();
+ var brokerBUri =
brokerB.getTransportConnectorByScheme("tcp").getPublishableConnectString();
+ var consumerFactory = new ActiveMQConnectionFactory(brokerBUri);
+ var producerFactory = new ActiveMQConnectionFactory(brokerAUri);
+
+ try (final var consumerContext =
consumerFactory.createContext(transacted ? JMSContext.SESSION_TRANSACTED :
JMSContext.AUTO_ACKNOWLEDGE);
+ final var consumer =
consumerContext.createConsumer(consumerContext.createQueue(queueName));
+ final var producerContext =
producerFactory.createContext(transacted ? JMSContext.SESSION_TRANSACTED :
JMSContext.AUTO_ACKNOWLEDGE)) {
+
+ assertTrue("Remote consumer should come online",
+ Wait.waitFor(() -> {
+ return !brokerA.getDestination(new
ActiveMQQueue(queueName)).getConsumers().isEmpty();
+ }, 2_000L, 10L));
+
+
+ for (var i = 0; i < messageCount; i++) {
+ var msg = producerContext
+ .createProducer()
+ .send(producerContext.createQueue(queueName),
+ producerContext.createTextMessage("msg-" +
i));
+ }
+
+ if (transacted) {
+ producerContext.commit();
+ }
+
+ final var count = new AtomicInteger(0);
+ assertTrue("All messages should arrive across the network
connector",
+ Wait.waitFor(() -> {
+ Message msg;
+ while ((msg = consumer.receive(1_000L)) != null) {
+ if (transacted) {
+ consumerContext.commit();
+ }
+ count.incrementAndGet();
+ }
+ return count.get() >= messageCount;
+ }, 15_000L, 10L));
+
+ assertEquals(messageCount, count.get());
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact