This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push: new ddac006161 ARTEMIS-4585 Previously installed mirror queues with metrics plugin would make mirror non usable ddac006161 is described below commit ddac006161587a75b6907bf9ca78b8006ccf8b74 Author: Clebert Suconic <clebertsuco...@apache.org> AuthorDate: Fri Jan 26 12:38:22 2024 -0500 ARTEMIS-4585 Previously installed mirror queues with metrics plugin would make mirror non usable --- .../amqp/connect/AMQPBrokerConnection.java | 6 +- .../mirror/InterruptedLargeMessageTest.java | 128 ++++++++++++++++++--- 2 files changed, 118 insertions(+), 16 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java index 97a6287f26..2a08d0ec5e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java @@ -507,7 +507,11 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, mirrorControlQueue = server.createQueue(new QueueConfiguration(getMirrorSNF(replicaConfig)).setAddress(getMirrorSNF(replicaConfig)).setRoutingType(RoutingType.ANYCAST).setDurable(replicaConfig.isDurable()).setInternal(true), true); } - server.registerQueueOnManagement(mirrorControlQueue, true); + try { + server.registerQueueOnManagement(mirrorControlQueue, true); + } catch (Throwable ignored) { + logger.debug(ignored.getMessage(), ignored); + } logger.debug("Mirror queue {}", mirrorControlQueue.getName()); diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/InterruptedLargeMessageTest.java 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/InterruptedLargeMessageTest.java index eb58f1a065..25f060e5ad 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/InterruptedLargeMessageTest.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/InterruptedLargeMessageTest.java @@ -27,11 +27,19 @@ import javax.jms.TextMessage; import java.io.File; import java.io.StringWriter; import java.lang.invoke.MethodHandles; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.Properties; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.management.SimpleManagement; +import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.tests.soak.SoakTestBase; import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.tests.util.Wait; @@ -63,6 +71,8 @@ public class InterruptedLargeMessageTest extends SoakTestBase { public static final String DC1_NODE_A = "interruptLarge/DC1"; public static final String DC2_NODE_A = "interruptLarge/DC2"; + private static final String SNF_QUEUE = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror"; + Process processDC1_node_A; Process processDC2_node_A; @@ -94,6 +104,36 @@ public class InterruptedLargeMessageTest extends SoakTestBase { brokerProperties.put("largeMessageSync", "false"); File brokerPropertiesFile = new File(serverLocation, "broker.properties"); 
saveProperties(brokerProperties, brokerPropertiesFile); + + Path configPath = new File(getServerLocation(serverName), "./etc/broker.xml").toPath(); + + String brokerXML = Files.readString(configPath); + + // the SimpleMetricsPlugin needs to be added through the XML + String insert; + { + StringWriter insertWriter = new StringWriter(); + + insertWriter.write("\n"); + insertWriter.write(" <metrics>\n"); + insertWriter.write(" <jvm-memory>false</jvm-memory>\n"); + insertWriter.write(" <jvm-gc>true</jvm-gc>\n"); + insertWriter.write(" <jvm-threads>true</jvm-threads>\n"); + insertWriter.write(" <netty-pool>true</netty-pool>\n"); + insertWriter.write(" <plugin class-name=\"org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlugin\">\n"); + insertWriter.write(" <property key=\"foo\" value=\"x\"/>\n"); + insertWriter.write(" <property key=\"bar\" value=\"y\"/>\n"); + insertWriter.write(" <property key=\"baz\" value=\"z\"/>\n"); + insertWriter.write(" </plugin>\n"); + insertWriter.write(" </metrics>\n"); + insertWriter.write(" </core>\n"); + insert = insertWriter.toString(); + } + + brokerXML = brokerXML.replace("</core>", insert); + Assert.assertTrue(brokerXML.contains("SimpleMetricsPlugin")); + + Files.writeString(configPath, brokerXML); } @BeforeClass @@ -102,34 +142,50 @@ public class InterruptedLargeMessageTest extends SoakTestBase { createServer(DC2_NODE_A, "mirror", DC1_NODEA_URI, 2); } - private void startDC1() throws Exception { - processDC1_node_A = startServer(DC1_NODE_A, -1, -1, new File(getServerLocation(DC1_NODE_A), "broker.properties")); - ServerUtil.waitForServerToStart(0, 10_000); - } - @Before public void cleanupServers() { cleanupData(DC1_NODE_A); cleanupData(DC2_NODE_A); } - @Test + @Test(timeout = 240_000L) public void testAMQP() throws Exception { testInterrupt("AMQP"); } - @Test + @Test(timeout = 240_000L) public void testCORE() throws Exception { testInterrupt("CORE"); } + private void preCreateInternalQueues(String serverLocation) 
throws Exception { + Configuration configuration = createDefaultConfig(0, false); + configuration.setJournalDirectory(getServerLocation(serverLocation) + "/data/journal"); + configuration.setJournalFileSize(ActiveMQDefaultConfiguration.getDefaultJournalFileSize()); + configuration.setBindingsDirectory(getServerLocation(serverLocation) + "/data/bindings"); + configuration.setLargeMessagesDirectory(getServerLocation(serverLocation) + "/data/large-messages"); + + ActiveMQServer server = createServer(true, configuration); + server.start(); + try { + server.addAddressInfo(new AddressInfo(SNF_QUEUE).addRoutingType(RoutingType.ANYCAST).setInternal(false)); + server.createQueue(new QueueConfiguration(SNF_QUEUE).setRoutingType(RoutingType.ANYCAST).setAddress(SNF_QUEUE).setDurable(true).setInternal(false)); + } catch (Throwable error) { + logger.warn(error.getMessage(), error); + } + server.stop(); + } + private void testInterrupt(final String protocol) throws Exception { + // This will force internal queues as "non internal" + // this is in an attempt to create issues between versions of the broker + preCreateInternalQueues(DC1_NODE_A); + preCreateInternalQueues(DC2_NODE_A); + startDC1(); final int numberOfMessages = 400; - String snfQueue = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror"; - ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory(protocol, DC1_NODEA_URI); ConnectionFactory connectionFactoryDC2A = CFUtil.createConnectionFactory(protocol, DC2_NODEA_URI); @@ -154,11 +210,43 @@ public class InterruptedLargeMessageTest extends SoakTestBase { startDC2(); - // Waiting for at least one large message file in the target server - Wait.assertTrue(() -> getNumberOfLargeMessages(DC2_NODE_A) > 0, 5000, 100); + // We will keep interrupting the servers alternatively until all messages were transferred + boolean interruptSource = true; + while (getNumberOfLargeMessages(DC2_NODE_A) < numberOfMessages) { + if (interruptSource) { + stopDC1(); + } else { + stopDC2(); + } + + 
long messagesBeforeStart = getNumberOfLargeMessages(DC2_NODE_A); + + if (interruptSource) { + startDC1(); + } else { + startDC2(); + } + + interruptSource = !interruptSource; // switch which side we are interrupting next time + + long currentMessages = messagesBeforeStart; + + // Waiting some progress + while (currentMessages == messagesBeforeStart && currentMessages < numberOfMessages) { + currentMessages = getNumberOfLargeMessages(DC2_NODE_A); + Thread.sleep(100); + } + + Thread.sleep(2000); + + currentMessages = getNumberOfLargeMessages(DC2_NODE_A); + if (logger.isDebugEnabled()) { + logger.debug("*******************************************************************************************************************************"); + logger.debug("There are currently {} in the broker", currentMessages); + logger.debug("*******************************************************************************************************************************"); + } + } - stopDC2(); - startDC2(); } try (Connection connection = connectionFactoryDC2A.createConnection()) { @@ -177,8 +265,8 @@ public class InterruptedLargeMessageTest extends SoakTestBase { session.commit(); } - Wait.assertEquals(0, () -> getCount(simpleManagementDC1A, snfQueue)); - Wait.assertEquals(0, () -> getCount(simpleManagementDC2A, snfQueue)); + Wait.assertEquals(0, () -> getCount(simpleManagementDC1A, SNF_QUEUE)); + Wait.assertEquals(0, () -> getCount(simpleManagementDC2A, SNF_QUEUE)); Wait.assertEquals(0, () -> getCount(simpleManagementDC2A, QUEUE_NAME)); Wait.assertEquals(0, () -> getCount(simpleManagementDC1A, QUEUE_NAME)); @@ -192,6 +280,16 @@ public class InterruptedLargeMessageTest extends SoakTestBase { return lmFolder.list().length; } + private void startDC1() throws Exception { + processDC1_node_A = startServer(DC1_NODE_A, -1, -1, new File(getServerLocation(DC1_NODE_A), "broker.properties")); + ServerUtil.waitForServerToStart(0, 10_000); + } + + private void stopDC1() throws Exception { + 
processDC1_node_A.destroyForcibly(); + Assert.assertTrue(processDC1_node_A.waitFor(10, TimeUnit.SECONDS)); + } + private void stopDC2() throws Exception { processDC2_node_A.destroyForcibly(); Assert.assertTrue(processDC2_node_A.waitFor(10, TimeUnit.SECONDS));