This is an automated email from the ASF dual-hosted git repository. mattrpav pushed a commit to branch activemq-5.19.x in repository https://gitbox.apache.org/repos/asf/activemq.git
commit 808d22bc0203c5f04d06f7187545046c5a05e19d Author: Matt Pavlovich <[email protected]> AuthorDate: Mon Jun 22 19:22:11 2026 -0500 [#2137] Fix maxUncommittedCount usage across a network of brokers (cherry picked from commit cda48d5d2fa6509194202231775814d38b5bdd01) --- .../apache/activemq/broker/TransactionBroker.java | 4 +- .../usecases/MaxUncommittedCountNetworkTest.java | 154 +++++++++++++++++++++ 2 files changed, 156 insertions(+), 2 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java index dcee33602a..ac7b119e64 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java @@ -311,9 +311,9 @@ public class TransactionBroker extends BrokerFilter { } protected void verifyUncommittedCount(ProducerBrokerExchange producerExchange, Transaction transaction, Message message) throws Exception { - // maxUncommittedCount <= 0 disables + // maxUncommittedCount <= 0 disables; null transaction means non-transactional send int maxUncommittedCount = this.getBrokerService().getMaxUncommittedCount(); - if (maxUncommittedCount > 0 && transaction.size() >= maxUncommittedCount) { + if (maxUncommittedCount > 0 && transaction != null && transaction.size() >= maxUncommittedCount) { try { // Rollback as we are throwing an error the client as throwing the error will cause diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MaxUncommittedCountNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MaxUncommittedCountNetworkTest.java new file mode 100644 index 0000000000..5e2fd208f9 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MaxUncommittedCountNetworkTest.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; + +import jakarta.jms.JMSContext; +import jakarta.jms.Message; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.apache.activemq.test.annotations.ParallelTest; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * Verifies that maxUncommittedCount does not break non-transactional + * message forwarding across network connectors. + * + * Prior to the fix, network-forwarded messages (which have a null + * transactionId) caused a NullPointerException in + * TransactionBroker.verifyUncommittedCount() when maxUncommittedCount + * was set to a positive value. + */ +@RunWith(value = Parameterized.class) +@Category(ParallelTest.class) +public class MaxUncommittedCountNetworkTest { + + private BrokerService brokerA; + private BrokerService brokerB; + + @Parameterized.Parameters(name="transacted={0}") + public static Collection<Object> data() { + return Arrays.asList(new Object[] { true, false }); + } + + private final boolean transacted; + + public MaxUncommittedCountNetworkTest(boolean transacted) { + this.transacted = transacted; + } + + @Before + public void setUp() throws Exception { + brokerA = new BrokerService(); + brokerA.setBrokerName("brokerA"); + brokerA.setUseJmx(false); + brokerA.setPersistent(false); + brokerA.addConnector("tcp://localhost:0"); + brokerA.setMaxUncommittedCount(10); + brokerA.start(); + brokerA.waitUntilStarted(); + + var brokerAUri = brokerA.getTransportConnectorByScheme("tcp").getPublishableConnectString(); + + brokerB = new BrokerService(); + brokerB.setBrokerName("brokerB"); + brokerB.setUseJmx(false); + brokerB.setPersistent(false); + brokerB.addConnector("tcp://localhost:0"); + brokerB.setMaxUncommittedCount(10); + + var nc = brokerB.addNetworkConnector("static:(" + brokerAUri + ")"); + nc.setDuplex(true); + + brokerB.start(); + brokerB.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + if (brokerB != null) { + brokerB.stop(); + brokerB.waitUntilStopped(); + } + if (brokerA != null) { + brokerA.stop(); + brokerA.waitUntilStopped(); + } + } + + @Test + public void testSendAcrossNetworkConnector() throws Exception { + var queueName = "test.max.uncommited.network.tx." + transacted; + int messageCount = transacted ? 5 : 20; + + var brokerAUri = brokerA.getTransportConnectorByScheme("tcp").getPublishableConnectString(); + var brokerBUri = brokerB.getTransportConnectorByScheme("tcp").getPublishableConnectString(); + var consumerFactory = new ActiveMQConnectionFactory(brokerBUri); + var producerFactory = new ActiveMQConnectionFactory(brokerAUri); + + try (final var consumerContext = consumerFactory.createContext(transacted ? JMSContext.SESSION_TRANSACTED : JMSContext.AUTO_ACKNOWLEDGE); + final var consumer = consumerContext.createConsumer(consumerContext.createQueue(queueName)); + final var producerContext = producerFactory.createContext(transacted ? JMSContext.SESSION_TRANSACTED : JMSContext.AUTO_ACKNOWLEDGE)) { + + assertTrue("Remote consumer should come online", + Wait.waitFor(() -> { + return !brokerA.getDestination(new ActiveMQQueue(queueName)).getConsumers().isEmpty(); + }, 2_000L, 10L)); + + + for (var i = 0; i < messageCount; i++) { + var msg = producerContext + .createProducer() + .send(producerContext.createQueue(queueName), + producerContext.createTextMessage("msg-" + i)); + } + + if (transacted) { + producerContext.commit(); + } + + final var count = new AtomicInteger(0); + assertTrue("All messages should arrive across the network connector", + Wait.waitFor(() -> { + Message msg; + while ((msg = consumer.receive(1_000L)) != null) { + if (transacted) { + consumerContext.commit(); + } + count.incrementAndGet(); + } + return count.get() >= messageCount; + }, 15_000L, 10L)); + + assertEquals(messageCount, count.get()); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected] For further information, visit: https://activemq.apache.org/contact
