This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 1149a9ff1e AMQ-9829 Track prefetched messages for duplicate 
suppression during failover (#1616)
1149a9ff1e is described below

commit 1149a9ff1e74148b8c7c46bccca52a92c813afc6
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Thu Jan 22 07:28:53 2026 +0100

    AMQ-9829 Track prefetched messages for duplicate suppression during 
failover (#1616)
    
    During failover in transacted sessions with async dispatch 
(MessageListener),
      prefetched messages sitting in the unconsumedMessages buffer were not 
being
      tracked in previouslyDeliveredMessages. This caused them to be incorrectly
      identified as duplicates on redelivery and poison-acked to the DLQ.
---
 .../apache/activemq/ActiveMQMessageConsumer.java   |  59 +++-
 .../ActiveMQMessageConsumerClearMessagesTest.java  | 342 +++++++++++++++++++++
 .../FailoverDurableSubTransactionTest.java         |  10 +-
 3 files changed, 409 insertions(+), 2 deletions(-)

diff --git 
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
 
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index 298ce38ddc..a6bf1b2095 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -113,6 +113,7 @@ public class ActiveMQMessageConsumer implements 
MessageAvailableConsumer, StatsC
     class PreviouslyDelivered {
         org.apache.activemq.command.Message message;
         boolean redelivered;
+        boolean prefetchedOnly;  // true if message was only prefetched, not 
delivered to application
 
         PreviouslyDelivered(MessageDispatch messageDispatch) {
             message = messageDispatch.getMessage();
@@ -122,6 +123,12 @@ public class ActiveMQMessageConsumer implements 
MessageAvailableConsumer, StatsC
             message = messageDispatch.getMessage();
             this.redelivered = redelivered;
         }
+
+        PreviouslyDelivered(MessageDispatch messageDispatch, boolean 
redelivered, boolean prefetchedOnly) {
+            message = messageDispatch.getMessage();
+            this.redelivered = redelivered;
+            this.prefetchedOnly = prefetchedOnly;
+        }
     }
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ActiveMQMessageConsumer.class);
@@ -770,8 +777,12 @@ public class ActiveMQMessageConsumer implements 
MessageAvailableConsumer, StatsC
                     LOG.debug("{} clearing unconsumed list ({}) on transport 
interrupt", getConsumerId(), unconsumedMessages.size());
                     // ensure unconsumed are rolledback up front as they may 
get redelivered to another consumer
                     List<MessageDispatch> list = 
unconsumedMessages.removeAll();
+                    final boolean isTransacted = session.isTransacted();
                     if (!this.info.isBrowser()) {
                         for (MessageDispatch old : list) {
+                            if (isTransacted) {
+                                
capturePrefetchedMessagesForDuplicateSuppression(old);
+                            }
                             session.connection.rollbackDuplicate(this, 
old.getMessage());
                         }
                     }
@@ -933,6 +944,16 @@ public class ActiveMQMessageConsumer implements 
MessageAvailableConsumer, StatsC
         if (!isAutoAcknowledgeBatch()) {
             synchronized(deliveredMessages) {
                 deliveredMessages.addFirst(md);
+                if (session.isTransacted()) {
+                    PreviouslyDelivered entry = null;
+                    if (previouslyDeliveredMessages != null) {
+                        entry = 
previouslyDeliveredMessages.get(md.getMessage().getMessageId());
+                    }
+                    if (entry != null && entry.prefetchedOnly) {
+                        entry.prefetchedOnly = false;
+                        entry.redelivered = true;
+                    }
+                }
             }
             if (session.getTransacted()) {
                 if (transactedIndividualAck) {
@@ -1382,6 +1403,7 @@ public class ActiveMQMessageConsumer implements 
MessageAvailableConsumer, StatsC
                     removeFromDeliveredMessages(entry.message.getMessageId());
                 }
             }
+            // Clear everything on rollback - prefetched messages will be 
redelivered by broker
             clearPreviouslyDelivered();
         }
     }
@@ -1420,7 +1442,8 @@ public class ActiveMQMessageConsumer implements 
MessageAvailableConsumer, StatsC
             synchronized (unconsumedMessages.getMutex()) {
                 if (!unconsumedMessages.isClosed()) {
                     // deliverySequenceId non zero means previously queued 
dispatch
-                    if (this.info.isBrowser() || md.getDeliverySequenceId() != 
0l || !session.connection.isDuplicate(this, md.getMessage())) {
+                    if (this.info.isBrowser() || md.getDeliverySequenceId() != 
0l || isPrefetchedRedelivery(md)
+                        || !session.connection.isDuplicate(this, 
md.getMessage())) {
                         if (listener != null && 
unconsumedMessages.isRunning()) {
                             if (redeliveryExceeded(md)) {
                                 poisonAck(md, "listener dispatch[" + 
md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery 
policy limit:" + redeliveryPolicy);
@@ -1570,6 +1593,33 @@ public class ActiveMQMessageConsumer implements 
MessageAvailableConsumer, StatsC
         LOG.trace("{} tracking existing transacted {} delivered list({})", 
getConsumerId(), previouslyDeliveredMessages.transactionId, 
deliveredMessages.size());
     }
 
+    private void capturePrefetchedMessagesForDuplicateSuppression(final 
MessageDispatch pending) {
+        if (pending.getMessage() == null) {
+            return; // nothing to track
+        }
+        if (previouslyDeliveredMessages == null) {
+            previouslyDeliveredMessages = new 
PreviouslyDeliveredMap<>(session.getTransactionContext().getTransactionId());
+        }
+        previouslyDeliveredMessages.put(pending.getMessage().getMessageId(), 
new PreviouslyDelivered(pending, false, true));
+        LOG.trace("{} tracking existing transacted {} prefetched ({})", 
getConsumerId(), previouslyDeliveredMessages.transactionId, pending);
+    }
+
+    private boolean isPrefetchedRedelivery(final MessageDispatch md) {
+        if (!session.isTransacted()) {
+            return false;
+        }
+        if (md.getMessage() == null) {
+            return false;
+        }
+        synchronized (deliveredMessages) {
+            if (previouslyDeliveredMessages != null) {
+                PreviouslyDelivered entry = 
previouslyDeliveredMessages.get(md.getMessage().getMessageId());
+                return entry != null && entry.prefetchedOnly;
+            }
+        }
+        return false;
+    }
+
     public int getMessageSize() {
         return unconsumedMessages.size();
     }
@@ -1689,4 +1739,11 @@ public class ActiveMQMessageConsumer implements 
MessageAvailableConsumer, StatsC
     public void setConsumerExpiryCheckEnabled(boolean 
consumerExpiryCheckEnabled) {
         this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
     }
+
+    // Protected method for testing
+    protected int getPreviouslyDeliveredMessagesSize() {
+        synchronized(deliveredMessages) {
+            return previouslyDeliveredMessages != null ? 
previouslyDeliveredMessages.size() : 0;
+        }
+    }
 }
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQMessageConsumerClearMessagesTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQMessageConsumerClearMessagesTest.java
new file mode 100644
index 0000000000..410dbce67c
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQMessageConsumerClearMessagesTest.java
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import jakarta.jms.Connection;
+import jakarta.jms.Destination;
+import jakarta.jms.MessageListener;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Session;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.test.annotations.ParallelTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * AMQ-9829 Track prefetched messages for duplicate suppression during failover
+ *
+ * Test for clearMessagesInProgress() behavior in transacted sessions. Bug 
revealed with FailoverDurableSubTransactionTest
+ *
+ * This test verifies that when clearMessagesInProgress() is called (during 
failover),
+ * BOTH delivered messages AND prefetched (unconsumed) messages are tracked in
+ * previouslyDeliveredMessages for duplicate suppression.
+ */
+@Category(ParallelTest.class)
+public class ActiveMQMessageConsumerClearMessagesTest {
+
+    private BrokerService brokerService;
+    private String brokerURI;
+
+    @Rule
+    public TestName name = new TestName();
+
+    @Before
+    public void startBroker() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(false);
+        brokerService.start();
+        brokerService.waitUntilStarted();
+
+        brokerURI = "vm://localhost?create=false";
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+        }
+    }
+
+    /**
+     * Helper to create a TestableConsumer for the given session and 
destination.
+     */
+    private TestableConsumer createTestableConsumer(final ActiveMQSession 
session, final Destination destination) throws Exception {
+        final ConsumerId consumerId = session.getNextConsumerId();
+
+        return new TestableConsumer(
+            session,
+            consumerId,
+            ActiveMQMessageTransformation.transformDestination(destination),
+            null,  // name (not durable)
+            null,  // selector
+            1000,  // prefetch
+            -1,    // maximumPendingMessageCount
+            false, // noLocal
+            false, // browser
+            true,  // dispatchAsync
+            null   // messageListener
+        );
+    }
+
+    /**
+     * Test that clearMessagesInProgress() captures both delivered AND 
unconsumed messages
+     * into previouslyDeliveredMessages for transacted sessions.
+     *
+     * Scenario:
+     * 1. Create transacted consumer
+     * 2. Receive 3 messages (they go to deliveredMessages since session is 
transacted)
+     * 3. Manually add 5 messages to unconsumedMessages (simulating prefetched 
but not yet dispatched)
+     * 4. Trigger clearMessagesInProgress() (simulating transport 
interrupt/failover)
+     * 5. WITHOUT FIX: Only 3 messages tracked in previouslyDeliveredMessages
+     * 6. WITH FIX: All 8 messages (3 delivered + 5 prefetched) tracked in 
previouslyDeliveredMessages
+     */
+    @Test
+    public void testClearMessagesInProgressCapturesPrefetchedMessages() throws 
Exception {
+        final ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(brokerURI);
+
+        final Connection connection = factory.createConnection();
+        connection.start();
+
+        // Create TRANSACTED session - this is critical for the bug
+        final Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+        final Destination destination = 
session.createQueue(name.getMethodName());
+
+        // Create testable consumer that exposes protected fields
+        final TestableConsumer consumer = 
createTestableConsumer((ActiveMQSession) session, destination);
+
+        // Produce 3 messages that will be received and go to deliveredMessages
+        final MessageProducer producer = session.createProducer(destination);
+        for (int i = 0; i < 3; i++) {
+            producer.send(session.createTextMessage("Delivered message " + i));
+        }
+        session.commit();
+
+        // Receive the 3 messages (they stay in deliveredMessages because 
session is transacted)
+        for (int i = 0; i < 3; i++) {
+            consumer.receive(1000);
+        }
+
+        // Now manually add 5 messages to unconsumedMessages to simulate 
prefetched messages
+        // that haven't been dispatched to the MessageListener yet
+        for (int i = 0; i < 5; i++) {
+            final ActiveMQTextMessage msg = new ActiveMQTextMessage();
+            msg.setMessageId(new MessageId("TEST:1:1:1:" + (100 + i)));
+            msg.setText("Prefetched message " + i);
+
+            final MessageDispatch dispatch = new MessageDispatch();
+            dispatch.setConsumerId(consumer.getConsumerId());
+            dispatch.setMessage(msg);
+
+            // Add to unconsumedMessages buffer (simulating prefetch)
+            consumer.addToUnconsumedMessages(dispatch);
+        }
+
+        // Verify setup: should have 3 delivered, 5 unconsumed, 0 previously 
delivered
+        assertEquals("Should have 3 delivered messages", 3, 
consumer.getDeliveredMessagesSize());
+        assertEquals("Should have 5 unconsumed (prefetched) messages", 5, 
consumer.getUnconsumedMessagesSize());
+        assertEquals("Should have 0 previously delivered before 
clearMessagesInProgress",
+                     0, consumer.getPreviouslyDeliveredMessagesSize());
+
+        // Now simulate transport interrupt / failover
+        consumer.triggerTransportInterrupt();
+
+        // The key assertion: without the fix for AMQ-9829, it would contain 
all delivered but the prefetched
+        final int previouslyDeliveredCount = 
consumer.getPreviouslyDeliveredMessagesSize();
+
+        assertEquals("previouslyDeliveredMessages should contain BOTH 
delivered (3) and prefetched (5) messages",
+                     8, previouslyDeliveredCount);
+
+        // Verify unconsumed buffer was cleared
+        assertEquals("unconsumedMessages should be cleared after 
clearMessagesInProgress",
+                     0, consumer.getUnconsumedMessagesSize());
+
+        connection.close();
+    }
+
+    /**
+     * Test that clearMessagesInProgress() works correctly for NON-transacted 
sessions.
+     * In non-transacted sessions, previouslyDeliveredMessages should not be 
populated.
+     */
+    @Test
+    public void testClearMessagesInProgressNonTransacted() throws Exception {
+        final ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(brokerURI);
+
+        final Connection connection = factory.createConnection();
+        connection.start();
+
+        // Create NON-transacted session
+        final Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        final Destination destination = 
session.createQueue(name.getMethodName());
+
+        final TestableConsumer consumer = 
createTestableConsumer((ActiveMQSession) session, destination);
+
+        // Simulate 5 prefetched messages (no broker messages needed for this 
test)
+        for (int i = 0; i < 5; i++) {
+            final ActiveMQTextMessage msg = new ActiveMQTextMessage();
+            msg.setMessageId(new MessageId("TEST:1:1:1:" + (200 + i)));
+            msg.setText("Prefetched message " + i);
+
+            final MessageDispatch dispatch = new MessageDispatch();
+            dispatch.setConsumerId(consumer.getConsumerId());
+            dispatch.setMessage(msg);
+
+            consumer.addToUnconsumedMessages(dispatch);
+        }
+
+        assertEquals("Should have 5 unconsumed messages", 5, 
consumer.getUnconsumedMessagesSize());
+
+        // Trigger clearMessagesInProgress
+        consumer.triggerTransportInterrupt();
+
+        // For non-transacted sessions, previouslyDeliveredMessages should 
remain 0
+        // (no duplicate suppression needed for non-transacted)
+        assertEquals("previouslyDeliveredMessages should be 0 for 
non-transacted session",
+                     0, consumer.getPreviouslyDeliveredMessagesSize());
+
+        connection.close();
+    }
+
+    /**
+     * Test edge case: clearMessagesInProgress() with only delivered messages 
(no prefetch).
+     */
+    @Test
+    public void testClearMessagesInProgressOnlyDeliveredMessages() throws 
Exception {
+        final ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(brokerURI);
+
+        final Connection connection = factory.createConnection();
+        connection.start();
+
+        final Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+        final Destination destination = 
session.createQueue(name.getMethodName());
+
+        final TestableConsumer consumer = 
createTestableConsumer((ActiveMQSession) session, destination);
+
+        // Produce and receive 3 messages - they go to deliveredMessages
+        final MessageProducer producer = session.createProducer(destination);
+        for (int i = 0; i < 3; i++) {
+            producer.send(session.createTextMessage("Delivered message " + i));
+        }
+        session.commit();
+
+        // Receive all 3 messages (they stay in deliveredMessages)
+        for (int i = 0; i < 3; i++) {
+            consumer.receive(1000);
+        }
+
+        // Don't add any to unconsumedMessages - testing only delivered 
messages case
+        assertEquals("Should have 3 delivered messages", 3, 
consumer.getDeliveredMessagesSize());
+        assertEquals("Should have 0 unconsumed messages", 0, 
consumer.getUnconsumedMessagesSize());
+
+        // Trigger clearMessagesInProgress
+        consumer.triggerTransportInterrupt();
+
+        // Should have captured the 3 delivered messages
+        assertEquals("Should have captured 3 delivered messages",
+                     3, consumer.getPreviouslyDeliveredMessagesSize());
+
+        connection.close();
+    }
+
+    /**
+     * Test edge case: clearMessagesInProgress() with only prefetched messages 
(none delivered yet).
+     * This is the most direct test for the bug!
+     */
+    @Test
+    public void testClearMessagesInProgressOnlyPrefetchedMessages() throws 
Exception {
+        final ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(brokerURI);
+
+        final Connection connection = factory.createConnection();
+        connection.start();
+
+        final Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+        final Destination destination = 
session.createQueue(name.getMethodName());
+
+        final TestableConsumer consumer = 
createTestableConsumer((ActiveMQSession) session, destination);
+
+        // Simulate only prefetched messages (none delivered/dispatched yet)
+        // Don't produce any messages to the broker - we're only testing the 
prefetch scenario
+        for (int i = 0; i < 7; i++) {
+            final ActiveMQTextMessage msg = new ActiveMQTextMessage();
+            msg.setMessageId(new MessageId("TEST:1:1:1:" + (300 + i)));
+            msg.setText("Prefetched message " + i);
+
+            final MessageDispatch dispatch = new MessageDispatch();
+            dispatch.setConsumerId(consumer.getConsumerId());
+            dispatch.setMessage(msg);
+
+            consumer.addToUnconsumedMessages(dispatch);
+        }
+
+        assertEquals("Should have 0 delivered messages", 0, 
consumer.getDeliveredMessagesSize());
+        assertEquals("Should have 7 unconsumed messages", 7, 
consumer.getUnconsumedMessagesSize());
+
+        // Trigger clearMessagesInProgress
+        consumer.triggerTransportInterrupt();
+
+        // The key assertion, without the fix for AMQ-9829 it would be 0
+        assertEquals("Should have captured all 7 prefetched messages",
+                     7, consumer.getPreviouslyDeliveredMessagesSize());
+
+        connection.close();
+    }
+
+    /**
+     * Test subclass that exposes protected fields for testing 
clearMessagesInProgress().
+     * This avoids adding too many test methods in ActiveMQMessageCOnsumer
+     */
+    static class TestableConsumer extends ActiveMQMessageConsumer {
+
+        TestableConsumer(final ActiveMQSession session, final ConsumerId 
consumerId,
+                        final ActiveMQDestination destination, final String 
name,
+                        final String selector, final int prefetch, final int 
maximumPendingMessageCount,
+                        final boolean noLocal, final boolean browser, final 
boolean dispatchAsync,
+                        final MessageListener messageListener) throws 
Exception {
+            super(session, consumerId, destination, name, selector, prefetch,
+                  maximumPendingMessageCount, noLocal, browser, dispatchAsync, 
messageListener);
+        }
+
+        public int getUnconsumedMessagesSize() {
+            synchronized(unconsumedMessages.getMutex()) {
+                return unconsumedMessages.size();
+            }
+        }
+
+        public int getDeliveredMessagesSize() {
+            synchronized(deliveredMessages) {
+                return deliveredMessages.size();
+            }
+        }
+
+        public void addToUnconsumedMessages(final MessageDispatch dispatch) {
+            synchronized(unconsumedMessages.getMutex()) {
+                unconsumedMessages.enqueue(dispatch);
+            }
+        }
+
+        public void triggerTransportInterrupt() {
+            inProgressClearRequired();
+            clearMessagesInProgress();
+        }
+
+        @Override
+        public int getPreviouslyDeliveredMessagesSize() {
+            return super.getPreviouslyDeliveredMessagesSize();
+        }
+    }
+}
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDurableSubTransactionTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDurableSubTransactionTest.java
index 2d1fcb8528..6b57c6ae19 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDurableSubTransactionTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDurableSubTransactionTest.java
@@ -86,11 +86,15 @@ public class FailoverDurableSubTransactionTest {
     public void startBroker(boolean deleteAllMessagesOnStartup) throws 
Exception {
         broker = createBroker(deleteAllMessagesOnStartup);
         broker.start();
+        // Get the actual bound URI after broker starts (important for 
ephemeral ports)
+        url = 
broker.getTransportConnectors().get(0).getPublishableConnectString();
     }
 
     public void startBroker(boolean deleteAllMessagesOnStartup, String 
bindAddress) throws Exception {
         broker = createBroker(deleteAllMessagesOnStartup, bindAddress);
         broker.start();
+        // Get the actual bound URI after broker starts (important for 
ephemeral ports)
+        url = 
broker.getTransportConnectors().get(0).getPublishableConnectString();
     }
 
     public BrokerService createBroker(boolean deleteAllMessagesOnStartup) 
throws Exception {
@@ -112,7 +116,7 @@ public class FailoverDurableSubTransactionTest {
         // faster redispatch
         broker.setKeepDurableSubsActive(true);
 
-        url = 
broker.getTransportConnectors().get(0).getConnectUri().toString();
+        // Do not set url here - need to get it after broker starts when using 
ephemeral ports
 
         return broker;
     }
@@ -174,6 +178,8 @@ public class FailoverDurableSubTransactionTest {
                 }
         });
         broker.start();
+        // Get the actual bound URI after broker starts (important for 
ephemeral ports)
+        url = 
broker.getTransportConnectors().get(0).getPublishableConnectString();
 
         ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("failover:(" + url + ")");
         cf.setAlwaysSyncSend(true);
@@ -279,6 +285,8 @@ public class FailoverDurableSubTransactionTest {
 
         });
         broker.start();
+        // Get the actual bound URI after broker starts (important for 
ephemeral ports)
+        url = 
broker.getTransportConnectors().get(0).getPublishableConnectString();
 
         ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("failover:(" + url + ")");
         cf.setAlwaysSyncSend(true);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to