This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.19.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.19.x by this push:
     new 708c90f2c0 Add flag to optionally enable temp destination stealing 
(#2122) (#2126)
708c90f2c0 is described below

commit 708c90f2c0f0d874649ea0f0454760668a8f90fc
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Wed Jun 17 17:06:48 2026 -0400

    Add flag to optionally enable temp destination stealing (#2122) (#2126)
    
    This adds a flag that allows other connections to create subscriptions
    on a temp destination. The flag defaults to off. Setting it to true
    enables some use cases, such as failover, to work, but most of the time
    it is not needed and should stay disabled to be JMS client spec
    compatible.
    
    The Java client already performed this check; this change adds a
    broker-side check as well.
    
    (cherry picked from commit d8c5672e224883a16465f8550d0e6d900bfa383c)
---
 .../broker/region/DestinationFactoryImpl.java      |  10 +-
 .../activemq/broker/region/TempDestination.java    |  24 ++++
 .../apache/activemq/broker/region/TempQueue.java   |  22 ++-
 .../apache/activemq/broker/region/TempTopic.java   |  21 ++-
 .../activemq/broker/region/policy/PolicyEntry.java |  16 +++
 .../apache/activemq/JmsTempDestinationTest.java    | 160 ++++++++++++++++++---
 6 files changed, 227 insertions(+), 26 deletions(-)

diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
index 6c146f0c8b..3c7737f0fb 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
@@ -76,6 +76,9 @@ public class DestinationFactoryImpl extends 
DestinationFactory {
         if (destination.isQueue()) {
             if (destination.isTemporary()) {
                 final ActiveMQTempDestination tempDest = 
(ActiveMQTempDestination)destination;
+                if (tempDest.getConnectionId() == null) {
+                    throw new IllegalArgumentException("Temporary queue must 
have a connectionId");
+                }
                 Queue queue = new TempQueue(brokerService, destination, null, 
destinationStatistics, taskRunnerFactory);
                 configureQueue(queue, destination);
                 queue.initialize();
@@ -88,8 +91,11 @@ public class DestinationFactoryImpl extends 
DestinationFactory {
                 return queue;
             }
         } else if (destination.isTemporary()) {
-            
-            Topic topic = new Topic(brokerService, destination, null, 
destinationStatistics, taskRunnerFactory);
+            final ActiveMQTempDestination tempDest = 
(ActiveMQTempDestination)destination;
+            if (tempDest.getConnectionId() == null) {
+                throw new IllegalArgumentException("Temporary topic must have 
a connectionId");
+            }
+            Topic topic = new TempTopic(brokerService, destination, null, 
destinationStatistics, taskRunnerFactory);
             configureTopic(topic, destination);
             topic.initialize();
             return topic;
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempDestination.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempDestination.java
new file mode 100644
index 0000000000..715323ca95
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempDestination.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region;
+
+public interface TempDestination extends Destination {
+
+    void setAllowTempDestinationStealing(boolean allowTempDestinationStealing);
+
+    boolean isAllowTempDestinationStealing();
+}
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java
index 17eeb05647..be1daf76b1 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.region;
 
+import javax.jms.InvalidDestinationException;
 import java.io.IOException;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
@@ -33,10 +34,12 @@ import org.slf4j.LoggerFactory;
  * 
  * 
  */
-public class TempQueue extends Queue{
+public class TempQueue extends Queue implements TempDestination{
+
     private static final Logger LOG = LoggerFactory.getLogger(TempQueue.class);
+
     private final ActiveMQTempDestination tempDest;
-   
+    private boolean allowTempDestinationStealing = false;
     
     /**
      * @param brokerService
@@ -65,6 +68,11 @@ public class TempQueue extends Queue{
     
     @Override
     public void addSubscription(ConnectionContext context, Subscription sub) 
throws Exception {
+        final String connectionId = 
sub.getConsumerInfo().getConsumerId().getConnectionId();
+        if (!isAllowTempDestinationStealing() && 
!tempDest.getConnectionId().equals(connectionId)) {
+            throw new InvalidDestinationException("Subscribing to a temporary 
queue created by another connection is not permitted");
+        }
+
         // Only consumers on the same connection can consume from
         // the temporary destination
         // However, we could have failed over - and we do this
@@ -97,4 +105,14 @@ public class TempQueue extends Queue{
         }
         super.dispose(context);
     }
+
+    @Override
+    public boolean isAllowTempDestinationStealing() {
+        return allowTempDestinationStealing;
+    }
+
+    @Override
+    public void setAllowTempDestinationStealing(boolean 
allowTempDestinationStealing) {
+        this.allowTempDestinationStealing = allowTempDestinationStealing;
+    }
 }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempTopic.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempTopic.java
index 9bf4658e93..854b17183e 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempTopic.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempTopic.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.region;
 
+import javax.jms.InvalidDestinationException;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -30,8 +31,11 @@ import org.apache.activemq.thread.TaskRunnerFactory;
  * 
  * 
  */
-public class TempTopic  extends Topic  implements Task{
+public class TempTopic extends Topic implements Task, TempDestination {
+
     private final ActiveMQTempDestination tempDest;
+    private boolean allowTempDestinationStealing = false;
+
     /**
      * @param brokerService
      * @param destination
@@ -50,6 +54,11 @@ public class TempTopic  extends Topic  implements Task{
     }
     
     public void addSubscription(ConnectionContext context, Subscription sub) 
throws Exception {
+        final String connectionId = 
sub.getConsumerInfo().getConsumerId().getConnectionId();
+        if (!isAllowTempDestinationStealing() && 
!tempDest.getConnectionId().equals(connectionId)) {
+            throw new InvalidDestinationException("Subscribing to a temporary 
topic created by another connection is not permitted");
+        }
+
         // Only consumers on the same connection can consume from
         // the temporary destination
         // However, we could have failed over - and we do this
@@ -70,4 +79,14 @@ public class TempTopic  extends Topic  implements Task{
     
     public void initialize() {
     }
+
+    @Override
+    public boolean isAllowTempDestinationStealing() {
+        return allowTempDestinationStealing;
+    }
+
+    @Override
+    public void setAllowTempDestinationStealing(boolean 
allowTempDestinationStealing) {
+        this.allowTempDestinationStealing = allowTempDestinationStealing;
+    }
 }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
index 8ae052574b..235ab7d5dc 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
@@ -27,6 +27,7 @@ import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.broker.region.QueueBrowserSubscription;
 import org.apache.activemq.broker.region.QueueSubscription;
 import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.TempDestination;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.broker.region.TopicSubscription;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@@ -108,6 +109,7 @@ public class PolicyEntry extends DestinationMapEntry {
     private boolean useTopicSubscriptionInflightStats = true;
     private boolean advancedNetworkStatisticsEnabled = false; // [AMQ-9437]
     private boolean advancedMessageStatisticsEnabled = false; // [AMQ-8463]
+    private boolean allowTempDestinationStealing = false;
 
     /*
      * percentage of in-flight messages above which optimize message store is 
disabled
@@ -314,6 +316,12 @@ public class PolicyEntry extends DestinationMapEntry {
         if (isUpdate("advancedMessageStatisticsEnabled", includedProperties)) {
             
destination.setAdvancedMessageStatisticsEnabled(isAdvancedMessageStatisticsEnabled());
         }
+        if (destination instanceof TempDestination) {
+            if (isUpdate("allowTempDestinationStealing", includedProperties)) {
+                ((TempDestination) 
destination).setAllowTempDestinationStealing(
+                        isAllowTempDestinationStealing());
+            }
+        }
     }
 
     public void baseConfiguration(Broker broker, BaseDestination destination) {
@@ -1200,4 +1208,12 @@ public class PolicyEntry extends DestinationMapEntry {
     public void setAdvancedMessageStatisticsEnabled(boolean 
advancedMessageStatisticsEnabled) {
         this.advancedMessageStatisticsEnabled = 
advancedMessageStatisticsEnabled;
     }
+
+    public boolean isAllowTempDestinationStealing() {
+        return allowTempDestinationStealing;
+    }
+
+    public void setAllowTempDestinationStealing(boolean 
allowTempDestinationStealing) {
+        this.allowTempDestinationStealing = allowTempDestinationStealing;
+    }
 }
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsTempDestinationTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsTempDestinationTest.java
index 0ad37675a6..e962042d97 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsTempDestinationTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsTempDestinationTest.java
@@ -16,6 +16,29 @@
  */
 package org.apache.activemq;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.Collection;
+import javax.jms.Destination;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.Response;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -39,8 +62,6 @@ import javax.jms.Session;
 import javax.jms.TemporaryQueue;
 import javax.jms.TextMessage;
 
-import junit.framework.TestCase;
-
 import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.transport.vm.VMTransport;
 import org.apache.activemq.util.Wait;
@@ -50,26 +71,55 @@ import org.slf4j.LoggerFactory;
 /**
  * @version
  */
-public class JmsTempDestinationTest extends TestCase {
+@RunWith(Parameterized.class)
+public class JmsTempDestinationTest {
+
+    @Parameters(name="allowTempDestinationStealing={0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                {false},
+                {true},
+        });
+    }
 
     private static final Logger LOG = 
LoggerFactory.getLogger(JmsTempDestinationTest.class);
     private Connection connection;
     private ActiveMQConnectionFactory factory;
-    protected List<Connection> connections = Collections.synchronizedList(new 
ArrayList<Connection>());
+    protected List<Connection> connections = Collections.synchronizedList(new 
ArrayList<>());
+    private BrokerService brokerService;
+    private final boolean allowTempDestinationStealing;
+
+    public JmsTempDestinationTest(boolean allowTempDestinationStealing) {
+        this.allowTempDestinationStealing = allowTempDestinationStealing;
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+
+        PolicyEntry tempQueueEntry = new PolicyEntry();
+        tempQueueEntry.setTempQueue(true);
+        
tempQueueEntry.setAllowTempDestinationStealing(allowTempDestinationStealing);
+        PolicyEntry tempTopicEntry = new PolicyEntry();
+        tempTopicEntry.setTempTopic(true);
+        
tempTopicEntry.setAllowTempDestinationStealing(allowTempDestinationStealing);
 
-    @Override
-    protected void setUp() throws Exception {
-        factory = new 
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+        PolicyMap pMap = new PolicyMap();
+        pMap.setPolicyEntries(List.of(tempQueueEntry, tempTopicEntry));
+
+        brokerService.setDestinationPolicy(pMap);
+        brokerService.start();
+        brokerService.waitUntilStarted();
+
+        factory = new ActiveMQConnectionFactory("vm://localhost");
         factory.setAlwaysSyncSend(true);
         connection = factory.createConnection();
         connections.add(connection);
     }
 
-    /**
-     * @see junit.framework.TestCase#tearDown()
-     */
-    @Override
-    protected void tearDown() throws Exception {
+    @After
+    public void tearDown() throws Exception {
         for (Iterator<Connection> iter = connections.iterator(); 
iter.hasNext();) {
             Connection conn = iter.next();
             try {
@@ -78,6 +128,8 @@ public class JmsTempDestinationTest extends TestCase {
             }
             iter.remove();
         }
+        brokerService.stop();
+        brokerService.waitUntilStopped();
     }
 
     /**
@@ -85,6 +137,7 @@ public class JmsTempDestinationTest extends TestCase {
      *
      * @throws JMSException
      */
+    @Test
     public void testTempDestOnlyConsumedByLocalConn() throws JMSException {
         connection.start();
 
@@ -101,11 +154,12 @@ public class JmsTempDestinationTest extends TestCase {
         Session otherSession = otherConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
         TemporaryQueue otherQueue = otherSession.createTemporaryQueue();
         MessageConsumer consumer = otherSession.createConsumer(otherQueue);
-        Message msg = consumer.receive(3000);
+        Message msg = consumer.receive(2000);
         assertNull(msg);
 
         // should throw InvalidDestinationException when consuming a temp
-        // destination from another connection
+        // destination from another connection.
+        // Note that this check is done in the client side
         try {
             consumer = otherSession.createConsumer(queue);
             fail("Send should fail since temp destination should be used from 
another connection");
@@ -115,17 +169,66 @@ public class JmsTempDestinationTest extends TestCase {
 
         // should be able to consume temp destination from the same connection
         consumer = tempSession.createConsumer(queue);
-        msg = consumer.receive(3000);
+        msg = consumer.receive(2000);
         assertNotNull(msg);
+    }
 
+    // Test broker checks and enforces allowTempDestinationStealing flag
+    @Test
+    public void testAllowTempDestStealingQueue() throws Exception {
+        testAllowTempDestStealing(false, allowTempDestinationStealing);
     }
 
+    // Test broker checks and enforces allowTempDestinationStealing flag
+    @Test
+    public void testAllowTempDestStealingTopic() throws Exception {
+        testAllowTempDestStealing(true, allowTempDestinationStealing);
+    }
+
+    // Test broker checks and enforces allowTempDestinationStealing flag
+    private void testAllowTempDestStealing(boolean topic, boolean 
tempDestStealing) throws Exception {
+        connection.start();
+
+        // create a temporary queue on the first connection
+        Session tempSession = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        Destination tempDest = topic ? tempSession.createTemporaryTopic() :
+                tempSession.createTemporaryQueue();
+
+        // Create another connection/session
+        ActiveMQConnection otherConnection = (ActiveMQConnection) 
factory.createConnection();
+        connections.add(otherConnection);
+        ActiveMQSession otherSession = (ActiveMQSession) 
otherConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // Send a direct ConsumerInfo to bypass the client check that would 
normally block this
+        // This will try and subscribe the second connection to the first 
connections
+        // temporary dest
+        ConsumerInfo info = new ConsumerInfo(otherSession.getNextConsumerId());
+        info.setClientId(otherSession.connection.getClientID());
+        
info.setDestination(ActiveMQMessageTransformation.transformDestination(tempDest));
+        Object result = otherConnection.getTransport().request(info, 1000);
+
+        // The broker should allow because allowTempDestinationStealing = true
+        if (tempDestStealing) {
+            assertTrue(result instanceof Response);
+            assertFalse(((Response) result).isException());
+        } else {
+            // The broker should throw an error because 
allowTempDestinationStealing = false
+            assertTrue(result instanceof ExceptionResponse);
+            assertTrue(((Response) result).isException());
+            assertTrue(((ExceptionResponse) result).getException() instanceof 
InvalidDestinationException);
+            assertTrue(((ExceptionResponse) result).getException().getMessage()
+                    .contains("created by another connection is not 
permitted"));
+        }
+    }
+
+
     /**
      * Make sure that a temp queue does not drop message if there is an active
      * consumers.
      *
      * @throws JMSException
      */
+    @Test
     public void testTempQueueHoldsMessagesWithConsumers() throws JMSException {
         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
         Queue queue = session.createTemporaryQueue();
@@ -140,7 +243,8 @@ public class JmsTempDestinationTest extends TestCase {
         Message message2 = consumer.receive(1000);
         assertNotNull(message2);
         assertTrue("Expected message to be a TextMessage", message2 instanceof 
TextMessage);
-        assertTrue("Expected message to be a '" + message.getText() + "'", 
((TextMessage)message2).getText().equals(message.getText()));
+        assertEquals("Expected message to be a '" + message.getText() + "'",
+                ((TextMessage) message2).getText(), message.getText());
     }
 
     /**
@@ -149,6 +253,7 @@ public class JmsTempDestinationTest extends TestCase {
      *
      * @throws JMSException
      */
+    @Test
     public void testTempQueueHoldsMessagesWithoutConsumers() throws 
JMSException {
 
         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
@@ -160,10 +265,11 @@ public class JmsTempDestinationTest extends TestCase {
 
         connection.start();
         MessageConsumer consumer = session.createConsumer(queue);
-        Message message2 = consumer.receive(3000);
+        Message message2 = consumer.receive(1000);
         assertNotNull(message2);
         assertTrue("Expected message to be a TextMessage", message2 instanceof 
TextMessage);
-        assertTrue("Expected message to be a '" + message.getText() + "'", 
((TextMessage)message2).getText().equals(message.getText()));
+        assertEquals("Expected message to be a '" + message.getText() + "'",
+                ((TextMessage) message2).getText(), message.getText());
 
     }
 
@@ -172,11 +278,12 @@ public class JmsTempDestinationTest extends TestCase {
      *
      * @throws JMSException
      */
+    @Test
     public void testTmpQueueWorksUnderLoad() throws JMSException {
         int count = 500;
         int dataSize = 1024;
 
-        ArrayList<BytesMessage> list = new ArrayList<BytesMessage>(count);
+        ArrayList<BytesMessage> list = new ArrayList<>(count);
         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
         Queue queue = session.createTemporaryQueue();
         MessageProducer producer = session.createProducer(queue);
@@ -195,9 +302,9 @@ public class JmsTempDestinationTest extends TestCase {
         MessageConsumer consumer = session.createConsumer(queue);
         for (int i = 0; i < count; i++) {
             Message message2 = consumer.receive(2000);
-            assertTrue(message2 != null);
+            assertNotNull(message2);
             assertEquals(i, message2.getIntProperty("c"));
-            assertTrue(message2.equals(list.get(i)));
+            assertEquals(message2, list.get(i));
         }
     }
 
@@ -209,7 +316,10 @@ public class JmsTempDestinationTest extends TestCase {
      * @throws InterruptedException
      * @throws URISyntaxException
      */
+    @Test
     public void testPublishFailsForClosedConnection() throws Exception {
+        // This test is slow and we only need to run this test with the default
+        Assume.assumeFalse(allowTempDestinationStealing);
 
         Connection tempConnection = factory.createConnection();
         connections.add(tempConnection);
@@ -252,7 +362,10 @@ public class JmsTempDestinationTest extends TestCase {
      * @throws JMSException
      * @throws InterruptedException
      */
+    @Test
     public void testPublishFailsForDestroyedTempDestination() throws Exception 
{
+        // This test is slow and we only need to run this test with the default
+        Assume.assumeFalse(allowTempDestinationStealing);
 
         Connection tempConnection = factory.createConnection();
         connections.add(tempConnection);
@@ -293,6 +406,7 @@ public class JmsTempDestinationTest extends TestCase {
      *
      * @throws JMSException
      */
+    @Test
     public void testDeleteDestinationWithSubscribersFails() throws 
JMSException {
         Connection connection = factory.createConnection();
         connections.add(connection);
@@ -313,7 +427,11 @@ public class JmsTempDestinationTest extends TestCase {
         }
     }
 
+    @Test
     public void testSlowConsumerDoesNotBlockFastTempUsers() throws Exception {
+        // This test is slow and we only need to run this test with the default
+        Assume.assumeFalse(allowTempDestinationStealing);
+
         ActiveMQConnectionFactory advisoryConnFactory = new 
ActiveMQConnectionFactory("vm://localhost?asyncQueueDepth=20");
         Connection connection = advisoryConnFactory.createConnection();
         connections.add(connection);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to