[ 
https://issues.apache.org/jira/browse/ARTEMIS-4247?focusedWorklogId=858505&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-858505
 ]

ASF GitHub Bot logged work on ARTEMIS-4247:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Apr/23 20:08
            Start Date: 21/Apr/23 20:08
    Worklog Time Spent: 10m 
      Work Description: clebertsuconic commented on code in PR #4443:
URL: https://github.com/apache/activemq-artemis/pull/4443#discussion_r1174126520


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java:
##########
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MirrorOption;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
+import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQPRedistributeClusterTest extends AmqpTestSupport {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private static final String QUEUE_NAME = "REDIST_QUEUE";
+   private static final String TOPIC_NAME = "REDIST_TOPIC";
+   private static final SimpleString TOPIC_NAME_SIMPLE_STRING = 
SimpleString.toSimpleString("REDIST_TOPIC");
+
+   protected static final int A_1_PORT = 5673;
+   protected static final int A_2_PORT = 5674;
+
+   ActiveMQServer a1;
+   ActiveMQServer a2;
+
+   protected static final int B_1_PORT = 5773;
+   protected static final int B_2_PORT = 5774;
+
+   ActiveMQServer b1;
+   ActiveMQServer b2;
+
+   @Before
+   public void setCluster() throws Exception {
+      a1 = createClusteredServer("A_1", A_1_PORT, A_2_PORT, B_1_PORT);
+      a2 = createClusteredServer("A_2", A_2_PORT, A_1_PORT, B_2_PORT);
+
+      a1.start();
+      a2.start();
+
+      b1 = createClusteredServer("B_1", B_1_PORT, B_2_PORT, -1);
+      b2 = createClusteredServer("B_2", B_2_PORT, B_1_PORT, -1);
+
+      b1.start();
+      b2.start();
+   }
+
+   private ActiveMQServer createClusteredServer(String name, int thisPort, int 
clusterPort, int mirrorPort) throws Exception {
+      ActiveMQServer server = createServer(thisPort, false);
+      server.getConfiguration().addAddressConfiguration(new 
CoreAddressConfiguration().setName(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST).addQueueConfig(new
 
QueueConfiguration(QUEUE_NAME).setDurable(true).setRoutingType(RoutingType.ANYCAST)));
+      server.getConfiguration().addAddressConfiguration(new 
CoreAddressConfiguration().setName(TOPIC_NAME).addRoutingType(RoutingType.MULTICAST));
+      server.getConfiguration().clearAddressSettings();
+      server.getConfiguration().addAddressSetting("#", new 
AddressSettings().setRedistributionDelay(0));
+
+
+      server.setIdentity(name);
+      server.getConfiguration().setName("node_1").setHAPolicyConfiguration(new 
LiveOnlyPolicyConfiguration()).addConnectorConfiguration("thisNode", 
"tcp://localhost:" + thisPort).addConnectorConfiguration("otherNode", 
"tcp://localhost:" + clusterPort);
+
+      ClusterConnectionConfiguration clusterConfiguration = new 
ClusterConnectionConfiguration().setName("cluster").setConnectorName("thisNode").setMessageLoadBalancingType(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION).setStaticConnectors(Collections.singletonList("otherNode"));
+      server.getConfiguration().addClusterConfiguration(clusterConfiguration);
+
+      if (mirrorPort > 0) {
+         server.getConfiguration().addAMQPConnection(new 
AMQPBrokerConnectConfiguration("myMirror" + mirrorPort, "tcp://localhost:" + 
mirrorPort).setReconnectAttempts(-1).setRetryInterval(100).addConnectionElement(new
 AMQPMirrorBrokerConnectionElement().setDurable(true).setMirrorSNF(new 
SimpleString("$ACTIVEMQ_ARTEMIS_MIRROR_MirrorTowards_" + mirrorPort))));
+      }
+
+      return server;
+   }
+
+   @Test
+   public void testQueueRedistributionAMQP() throws Exception {
+      internalQueueRedistribution("AMQP");
+   }
+
+   @Test
+   public void testQueueRedistributionCORE() throws Exception {
+      internalQueueRedistribution("CORE");
+   }
+
+   public void internalQueueRedistribution(String protocol) throws Exception {
+      AssertionLoggerHandler.startCapture();
+      runAfter((AssertionLoggerHandler::stopCapture));
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_2_PORT);
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = 
session.createProducer(session.createQueue(QUEUE_NAME));
+         for (int i = 0; i < 100; i++) {
+            producer.send(session.createTextMessage("Hello" + i));
+         }
+      }
+
+      try (Connection connA1 = cfA1.createConnection();
+           Connection connA2 = cfA2.createConnection()) {
+
+         connA1.start();
+         connA2.start();
+
+         Session sessionA1 = connA1.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         Session sessionA2 = connA2.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+         for (int i = 0; i < 100; i++) {
+            MessageConsumer consumer;
+            String place;
+            if (i % 2 == 0) {
+               place = "A1";
+               consumer = 
sessionA1.createConsumer(sessionA1.createQueue(QUEUE_NAME));
+            } else {
+               place = "A2";
+               consumer = 
sessionA2.createConsumer(sessionA1.createQueue(QUEUE_NAME));
+            }
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull(message);
+            logger.debug("Received message {} from {}", message, place);
+            consumer.close();
+         }
+      }
+
+      assertEmptyQueue(a1.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(a2.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(b1.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(b2.locateQueue(QUEUE_NAME));
+
+      // if you see this message, most likely the notifications are being 
copied to the mirror
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196"));
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037"));
+   }
+
+   @Test
+   public void testTopicRedistributionAMQP() throws Exception {
+      internalTopicRedistribution("AMQP");
+   }
+
+   @Test
+   public void testTopicRedistributionCORE() throws Exception {
+      internalTopicRedistribution("CORE");
+   }
+
+   public void internalTopicRedistribution(String protocol) throws Exception {
+
+      AssertionLoggerHandler.startCapture();
+      runAfter((AssertionLoggerHandler::stopCapture));
+
+      final int numMessages = 100;
+
+      String subscriptionName = "my-topic-shared-subscription";
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_2_PORT);
+
+      Topic topic;
+
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         MessageConsumer consumer = session.createSharedDurableConsumer(topic, 
subscriptionName);
+         consumer.close();
+      }
+
+      try (Connection conn = cfA2.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         MessageConsumer consumer = session.createSharedDurableConsumer(topic, 
subscriptionName);
+         consumer.close();
+      }
+
+      Wait.assertTrue(() -> 
a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> 
a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> 
b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> 
b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+
+      // naming convention is different between the protocols, I'm navigating 
through the bindings to find the actual queue name
+      String subscriptionQueueName;
+
+      {
+         HashSet<String> subscriptionSet = new HashSet<>();
+         // making sure the queues created on a1 are propagated into b1
+         
a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, 
b) -> {
+            logger.debug("{} = {}", n, b);
+            if (b instanceof LocalQueueBinding) {
+               QueueBinding qb = (QueueBinding) b;
+               subscriptionSet.add(qb.getUniqueName().toString());
+               Wait.assertTrue(() -> b1.locateQueue(qb.getUniqueName()) != 
null);
+            }
+         });
+         Assert.assertEquals(1, subscriptionSet.size());
+         subscriptionQueueName = subscriptionSet.iterator().next();
+      }
+
+      // making sure the queues created on a2 are propagated into b2
+      
a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, 
b) -> {
+         logger.debug("{} = {}", n, b);
+         if (b instanceof LocalQueueBinding) {
+            QueueBinding qb = (QueueBinding) b;
+            Wait.assertTrue(() -> b2.locateQueue(qb.getUniqueName()) != null);
+         }
+      });

Review Comment:
   a1.locateQueue
   a2.locateQueue
   b1.locateQueue
   b2.locateQueue
   
   
   These calls are already doing that; they would return null if that had not 
succeeded.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 858505)
    Time Spent: 2h 20m  (was: 2h 10m)

> Inconsistencies between AMQP Mirror and Artemis Clustering
> ----------------------------------------------------------
>
>                 Key: ARTEMIS-4247
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4247
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>            Reporter: Clebert Suconic
>            Priority: Major
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> - activemq.notifications are being transferred to the target node, unless an 
> ignore is set up
> - topics are being duplicated after redistribution
> - topic sends are being duplicated when a 2-node cluster mirrors to another 
> 2-node cluster, and both nodes are mirrored.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to