[ https://issues.apache.org/jira/browse/ARTEMIS-4247?focusedWorklogId=858505&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-858505 ]
ASF GitHub Bot logged work on ARTEMIS-4247: ------------------------------------------- Author: ASF GitHub Bot Created on: 21/Apr/23 20:08 Start Date: 21/Apr/23 20:08 Worklog Time Spent: 10m Work Description: clebertsuconic commented on code in PR #4443: URL: https://github.com/apache/activemq-artemis/pull/4443#discussion_r1174126520 ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java: ########## @@ -0,0 +1,454 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.tests.integration.amqp.connect; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; +import org.apache.activemq.artemis.core.config.CoreAddressConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; +import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.core.postoffice.QueueBinding; +import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.MirrorOption; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl; +import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import 
org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQPRedistributeClusterTest extends AmqpTestSupport { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final String QUEUE_NAME = "REDIST_QUEUE"; + private static final String TOPIC_NAME = "REDIST_TOPIC"; + private static final SimpleString TOPIC_NAME_SIMPLE_STRING = SimpleString.toSimpleString("REDIST_TOPIC"); + + protected static final int A_1_PORT = 5673; + protected static final int A_2_PORT = 5674; + + ActiveMQServer a1; + ActiveMQServer a2; + + protected static final int B_1_PORT = 5773; + protected static final int B_2_PORT = 5774; + + ActiveMQServer b1; + ActiveMQServer b2; + + @Before + public void setCluster() throws Exception { + a1 = createClusteredServer("A_1", A_1_PORT, A_2_PORT, B_1_PORT); + a2 = createClusteredServer("A_2", A_2_PORT, A_1_PORT, B_2_PORT); + + a1.start(); + a2.start(); + + b1 = createClusteredServer("B_1", B_1_PORT, B_2_PORT, -1); + b2 = createClusteredServer("B_2", B_2_PORT, B_1_PORT, -1); + + b1.start(); + b2.start(); + } + + private ActiveMQServer createClusteredServer(String name, int thisPort, int clusterPort, int mirrorPort) throws Exception { + ActiveMQServer server = createServer(thisPort, false); + server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST).addQueueConfig(new QueueConfiguration(QUEUE_NAME).setDurable(true).setRoutingType(RoutingType.ANYCAST))); + server.getConfiguration().addAddressConfiguration(new 
CoreAddressConfiguration().setName(TOPIC_NAME).addRoutingType(RoutingType.MULTICAST)); + server.getConfiguration().clearAddressSettings(); + server.getConfiguration().addAddressSetting("#", new AddressSettings().setRedistributionDelay(0)); + + + server.setIdentity(name); + server.getConfiguration().setName("node_1").setHAPolicyConfiguration(new LiveOnlyPolicyConfiguration()).addConnectorConfiguration("thisNode", "tcp://localhost:" + thisPort).addConnectorConfiguration("otherNode", "tcp://localhost:" + clusterPort); + + ClusterConnectionConfiguration clusterConfiguration = new ClusterConnectionConfiguration().setName("cluster").setConnectorName("thisNode").setMessageLoadBalancingType(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION).setStaticConnectors(Collections.singletonList("otherNode")); + server.getConfiguration().addClusterConfiguration(clusterConfiguration); + + if (mirrorPort > 0) { + server.getConfiguration().addAMQPConnection(new AMQPBrokerConnectConfiguration("myMirror" + mirrorPort, "tcp://localhost:" + mirrorPort).setReconnectAttempts(-1).setRetryInterval(100).addConnectionElement(new AMQPMirrorBrokerConnectionElement().setDurable(true).setMirrorSNF(new SimpleString("$ACTIVEMQ_ARTEMIS_MIRROR_MirrorTowards_" + mirrorPort)))); + } + + return server; + } + + @Test + public void testQueueRedistributionAMQP() throws Exception { + internalQueueRedistribution("AMQP"); + } + + @Test + public void testQueueRedistributionCORE() throws Exception { + internalQueueRedistribution("CORE"); + } + + public void internalQueueRedistribution(String protocol) throws Exception { + AssertionLoggerHandler.startCapture(); + runAfter((AssertionLoggerHandler::stopCapture)); + + ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT); + ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT); + try (Connection conn = cfA1.createConnection()) { + Session session = conn.createSession(false, 
Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME)); + for (int i = 0; i < 100; i++) { + producer.send(session.createTextMessage("Hello" + i)); + } + } + + try (Connection connA1 = cfA1.createConnection(); + Connection connA2 = cfA2.createConnection()) { + + connA1.start(); + connA2.start(); + + Session sessionA1 = connA1.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session sessionA2 = connA2.createSession(false, Session.AUTO_ACKNOWLEDGE); + + for (int i = 0; i < 100; i++) { + MessageConsumer consumer; + String place; + if (i % 2 == 0) { + place = "A1"; + consumer = sessionA1.createConsumer(sessionA1.createQueue(QUEUE_NAME)); + } else { + place = "A2"; + consumer = sessionA2.createConsumer(sessionA1.createQueue(QUEUE_NAME)); + } + TextMessage message = (TextMessage) consumer.receive(5000); + Assert.assertNotNull(message); + logger.debug("Received message {} from {}", message, place); + consumer.close(); + } + } + + assertEmptyQueue(a1.locateQueue(QUEUE_NAME)); + assertEmptyQueue(a2.locateQueue(QUEUE_NAME)); + assertEmptyQueue(b1.locateQueue(QUEUE_NAME)); + assertEmptyQueue(b2.locateQueue(QUEUE_NAME)); + + // if you see this message, most likely the notifications are being copied to the mirror + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196")); + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037")); + } + + @Test + public void testTopicRedistributionAMQP() throws Exception { + internalTopicRedistribution("AMQP"); + } + + @Test + public void testTopicRedistributionCORE() throws Exception { + internalTopicRedistribution("CORE"); + } + + public void internalTopicRedistribution(String protocol) throws Exception { + + AssertionLoggerHandler.startCapture(); + runAfter((AssertionLoggerHandler::stopCapture)); + + final int numMessages = 100; + + String subscriptionName = "my-topic-shared-subscription"; + + ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_1_PORT); + ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT); + + Topic topic; + + try (Connection conn = cfA1.createConnection()) { + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + topic = session.createTopic(TOPIC_NAME); + MessageConsumer consumer = session.createSharedDurableConsumer(topic, subscriptionName); + consumer.close(); + } + + try (Connection conn = cfA2.createConnection()) { + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + topic = session.createTopic(TOPIC_NAME); + MessageConsumer consumer = session.createSharedDurableConsumer(topic, subscriptionName); + consumer.close(); + } + + Wait.assertTrue(() -> a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2); + Wait.assertTrue(() -> a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2); + Wait.assertTrue(() -> b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2); + Wait.assertTrue(() -> b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2); + + // naming convention is different between the protocols, I'm navigating through the bindings to find the actual queue name + String subscriptionQueueName; + + { + HashSet<String> subscriptionSet = new HashSet<>(); + // making sure the queues created on a1 are propaged into b1 + a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, b) -> { + logger.debug("{} = {}", n, b); + if (b instanceof LocalQueueBinding) { + QueueBinding qb = (QueueBinding) b; + subscriptionSet.add(qb.getUniqueName().toString()); + Wait.assertTrue(() -> b1.locateQueue(qb.getUniqueName()) != null); + } + }); + Assert.assertEquals(1, subscriptionSet.size()); + subscriptionQueueName = subscriptionSet.iterator().next(); + } + + // making sure the queues created on a2 are propaged into b2 + 
a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, b) -> { + logger.debug("{} = {}", n, b); + if (b instanceof LocalQueueBinding) { + QueueBinding qb = (QueueBinding) b; + Wait.assertTrue(() -> b2.locateQueue(qb.getUniqueName()) != null); + } + }); Review Comment: The a1.locateQueue, a2.locateQueue, b1.locateQueue and b2.locateQueue calls are already doing that... they would return null if that didn't succeed. Issue Time Tracking ------------------- Worklog Id: (was: 858505) Time Spent: 2h 20m (was: 2h 10m) > Inconsistencies between AMQP Mirror and Artemis Clustering > ---------------------------------------------------------- > > Key: ARTEMIS-4247 > URL: https://issues.apache.org/jira/browse/ARTEMIS-4247 > Project: ActiveMQ Artemis > Issue Type: Bug > Reporter: Clebert Suconic > Priority: Major > Time Spent: 2h 20m > Remaining Estimate: 0h > > - activemq.notifications are being transferred to the target node, unless an > ignore is set up > - topics are being duplicated after redistribution > - topic sends are being duplicated when a 2-node cluster mirrors to another > 2-node cluster, and both nodes are mirrored. -- This message was sent by Atlassian Jira (v8.20.10#820010)