This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 4db39520f253eabf3eae408fdf5a00a6944f4bd9
Author: a181321 <anton.roskv...@volvo.com>
AuthorDate: Wed Oct 4 13:44:37 2023 +0200

    ARTEMIS-4450 - Auto-deleted clustered destinations can cause message loss
---
 .../core/postoffice/impl/PostOfficeImpl.java       |   2 +-
 .../AutoDeleteClusteredDestinationTest.java        | 150 +++++++++++++++++++++
 2 files changed, 151 insertions(+), 1 deletion(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index bb6502605d..f5d6f0ad23 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1269,7 +1269,7 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
 
    Bindings simpleRoute(SimpleString address, RoutingContext context, Message 
message, AddressInfo addressInfo) throws Exception {
       Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
-      if (bindings == null && context.getServerSession() != null) {
+      if ((bindings == null || !bindings.contains(LocalQueueBinding.class)) && 
context.getServerSession() != null) {
          AutoCreateResult autoCreateResult = 
context.getServerSession().checkAutoCreate(new 
QueueConfiguration(address).setRoutingType(context.getRoutingType()));
          if (autoCreateResult == AutoCreateResult.NOT_FOUND) {
             ActiveMQException e = 
ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/AutoDeleteClusteredDestinationTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/AutoDeleteClusteredDestinationTest.java
new file mode 100644
index 0000000000..8c6898cffa
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/AutoDeleteClusteredDestinationTest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.cluster.distribution;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
+import org.apache.activemq.artemis.core.server.impl.QueueManagerImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+public class AutoDeleteClusteredDestinationTest extends ClusterTestBase {
+
+   @Parameterized.Parameter(0)
+   public MessageLoadBalancingType loadBalancingType = 
MessageLoadBalancingType.OFF;
+
+   @Parameterized.Parameters(name = "loadBalancingType = {0}")
+   public static Iterable<? extends Object> loadBalancingType() {
+      return Arrays.asList(new Object[][]{
+         {MessageLoadBalancingType.OFF},
+         {MessageLoadBalancingType.STRICT},
+         {MessageLoadBalancingType.ON_DEMAND},
+         {MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION}});
+   }
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      setupServer(0, isFileStorage(), isNetty());
+      setupServer(1, isFileStorage(), isNetty());
+   }
+
+   @Test
+   public void testConnectionLoadBalancingAnonRun() throws Exception {
+      final String queueName = "queue";
+      final String url0 = 
"tcp://localhost:61616?useTopologyForLoadBalancing=false";
+      final String url1 = 
"tcp://localhost:61617?useTopologyForLoadBalancing=false";
+      final SimpleString simpleName = SimpleString.toSimpleString(queueName);
+      final int messageCount = 10;
+      final int TIMEOUT = 5000;
+
+      CountDownLatch latch = new CountDownLatch(messageCount);
+
+      MessageListener listener0 = message -> {
+         latch.countDown();
+      };
+
+      MessageListener listener1 = message -> {
+         latch.countDown();
+      };
+
+      setupClusterConnection("cluster0", queueName, loadBalancingType, 1, 
isNetty(), 0, 1);
+      setupClusterConnection("cluster1", queueName, loadBalancingType, 1, 
isNetty(), 1, 0);
+
+      startServers(0, 1);
+      waitForServerToStart(servers[0]);
+      waitForServerToStart(servers[1]);
+
+      AddressSettings settings = new 
AddressSettings().setRedistributionDelay(0).setAutoCreateAddresses(true).setAutoCreateQueues(true).setAutoDeleteQueues(true);
+
+      servers[0].getAddressSettingsRepository().addMatch(queueName, settings);
+      servers[1].getAddressSettingsRepository().addMatch(queueName, settings);
+
+      try (
+         ActiveMQConnectionFactory connectionFactory0 = new 
ActiveMQConnectionFactory(url0);
+         ActiveMQConnectionFactory connectionFactory1 = new 
ActiveMQConnectionFactory(url1);
+         Connection connection0 = connectionFactory0.createConnection();
+         Connection connection1 = connectionFactory1.createConnection();
+         Session consumerSession0 = connection0.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         Session consumerSession1 = connection1.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         Session producerSession0 = connection0.createSession(false, 
Session.AUTO_ACKNOWLEDGE)) {
+
+         Queue queue = consumerSession0.createQueue(queueName);
+         MessageProducer producer0 = producerSession0.createProducer(queue);
+         MessageConsumer consumer0 = consumerSession0.createConsumer(queue);
+         MessageConsumer consumer1 = consumerSession1.createConsumer(queue);
+
+         consumer0.setMessageListener(listener0);
+         consumer1.setMessageListener(listener1);
+         connection0.start();
+         connection1.start();
+
+         for (int i = 0; i < messageCount; i++) {
+            producer0.send(producerSession0.createTextMessage("Message"));
+
+            if (i == 2) {
+               QueueImpl serverQueue = (QueueImpl) 
servers[0].locateQueue(simpleName);
+               Wait.assertTrue(() -> serverQueue.getMessageCount() == 0, 
TIMEOUT, 100);
+               consumer0.close();
+               //Trigger an auto-delete to not have to wait
+               QueueManagerImpl.performAutoDeleteQueue(servers[0], 
serverQueue);
+               Wait.assertTrue(() -> 
servers[0].getPostOffice().getAddressInfo(simpleName).getBindingRemovedTimestamp()
 != -1, TIMEOUT, 100);
+            }
+
+            if (i == 6) {
+               consumer0 = consumerSession0.createConsumer(queue);
+               consumer0.setMessageListener(listener0);
+               QueueImpl serverQueue = (QueueImpl) 
servers[0].locateQueue(simpleName);
+               Wait.assertTrue(() -> serverQueue.getConsumerCount() == 1, 
TIMEOUT, 100);
+            }
+         }
+
+         Assert.assertTrue(latch.await(TIMEOUT, TimeUnit.MILLISECONDS));
+
+      }
+   }
+
+   protected boolean isNetty() {
+      return true;
+   }
+
+   @Override
+   protected boolean isFileStorage() {
+      return false;
+   }
+
+}

Reply via email to