This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new ddac006161 ARTEMIS-4585 Previously installed mirror queues with 
metrics plugin would make mirror non usable
ddac006161 is described below

commit ddac006161587a75b6907bf9ca78b8006ccf8b74
Author: Clebert Suconic <clebertsuco...@apache.org>
AuthorDate: Fri Jan 26 12:38:22 2024 -0500

    ARTEMIS-4585 Previously installed mirror queues with metrics plugin would 
make mirror non usable
---
 .../amqp/connect/AMQPBrokerConnection.java         |   6 +-
 .../mirror/InterruptedLargeMessageTest.java        | 128 ++++++++++++++++++---
 2 files changed, 118 insertions(+), 16 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
index 97a6287f26..2a08d0ec5e 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
@@ -507,7 +507,11 @@ public class AMQPBrokerConnection implements 
ClientConnectionLifeCycleListener,
          mirrorControlQueue = server.createQueue(new 
QueueConfiguration(getMirrorSNF(replicaConfig)).setAddress(getMirrorSNF(replicaConfig)).setRoutingType(RoutingType.ANYCAST).setDurable(replicaConfig.isDurable()).setInternal(true),
 true);
       }
 
-      server.registerQueueOnManagement(mirrorControlQueue, true);
+      try {
+         server.registerQueueOnManagement(mirrorControlQueue, true);
+      } catch (Throwable ignored) {
+         logger.debug(ignored.getMessage(), ignored);
+      }
 
       logger.debug("Mirror queue {}", mirrorControlQueue.getName());
 
diff --git 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/InterruptedLargeMessageTest.java
 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/InterruptedLargeMessageTest.java
index eb58f1a065..25f060e5ad 100644
--- 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/InterruptedLargeMessageTest.java
+++ 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/InterruptedLargeMessageTest.java
@@ -27,11 +27,19 @@ import javax.jms.TextMessage;
 import java.io.File;
 import java.io.StringWriter;
 import java.lang.invoke.MethodHandles;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.management.SimpleManagement;
+import org.apache.activemq.artemis.core.config.Configuration;
 import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.tests.soak.SoakTestBase;
 import org.apache.activemq.artemis.tests.util.CFUtil;
 import org.apache.activemq.artemis.tests.util.Wait;
@@ -63,6 +71,8 @@ public class InterruptedLargeMessageTest extends SoakTestBase 
{
    public static final String DC1_NODE_A = "interruptLarge/DC1";
    public static final String DC2_NODE_A = "interruptLarge/DC2";
 
+   private static final String SNF_QUEUE = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror";
+
    Process processDC1_node_A;
    Process processDC2_node_A;
 
@@ -94,6 +104,36 @@ public class InterruptedLargeMessageTest extends 
SoakTestBase {
       brokerProperties.put("largeMessageSync", "false");
       File brokerPropertiesFile = new File(serverLocation, 
"broker.properties");
       saveProperties(brokerProperties, brokerPropertiesFile);
+
+      Path configPath = new File(getServerLocation(serverName), 
"./etc/broker.xml").toPath();
+
+      String brokerXML = Files.readString(configPath);
+
+      // the SimpleMetricsPlugin needs to be added through the XML
+      String insert;
+      {
+         StringWriter insertWriter = new StringWriter();
+
+         insertWriter.write("\n");
+         insertWriter.write("      <metrics>\n");
+         insertWriter.write("         <jvm-memory>false</jvm-memory>\n");
+         insertWriter.write("         <jvm-gc>true</jvm-gc>\n");
+         insertWriter.write("         <jvm-threads>true</jvm-threads>\n");
+         insertWriter.write("         <netty-pool>true</netty-pool>\n");
+         insertWriter.write("         <plugin 
class-name=\"org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlugin\">\n");
+         insertWriter.write("            <property key=\"foo\" 
value=\"x\"/>\n");
+         insertWriter.write("            <property key=\"bar\" 
value=\"y\"/>\n");
+         insertWriter.write("            <property key=\"baz\" 
value=\"z\"/>\n");
+         insertWriter.write("         </plugin>\n");
+         insertWriter.write("      </metrics>\n");
+         insertWriter.write("  </core>\n");
+         insert = insertWriter.toString();
+      }
+
+      brokerXML = brokerXML.replace("</core>", insert);
+      Assert.assertTrue(brokerXML.contains("SimpleMetricsPlugin"));
+
+      Files.writeString(configPath, brokerXML);
    }
 
    @BeforeClass
@@ -102,34 +142,50 @@ public class InterruptedLargeMessageTest extends 
SoakTestBase {
       createServer(DC2_NODE_A, "mirror", DC1_NODEA_URI, 2);
    }
 
-   private void startDC1() throws Exception {
-      processDC1_node_A = startServer(DC1_NODE_A, -1, -1, new 
File(getServerLocation(DC1_NODE_A), "broker.properties"));
-      ServerUtil.waitForServerToStart(0, 10_000);
-   }
-
    @Before
    public void cleanupServers() {
       cleanupData(DC1_NODE_A);
       cleanupData(DC2_NODE_A);
    }
 
-   @Test
+   @Test(timeout = 240_000L)
    public void testAMQP() throws Exception {
       testInterrupt("AMQP");
    }
 
-   @Test
+   @Test(timeout = 240_000L)
    public void testCORE() throws Exception {
       testInterrupt("CORE");
    }
 
+   private void preCreateInternalQueues(String serverLocation) throws 
Exception {
+      Configuration configuration = createDefaultConfig(0, false);
+      configuration.setJournalDirectory(getServerLocation(serverLocation) + 
"/data/journal");
+      
configuration.setJournalFileSize(ActiveMQDefaultConfiguration.getDefaultJournalFileSize());
+      configuration.setBindingsDirectory(getServerLocation(serverLocation) + 
"/data/bindings");
+      
configuration.setLargeMessagesDirectory(getServerLocation(serverLocation) + 
"/data/large-messages");
+
+      ActiveMQServer server = createServer(true, configuration);
+      server.start();
+      try {
+         server.addAddressInfo(new 
AddressInfo(SNF_QUEUE).addRoutingType(RoutingType.ANYCAST).setInternal(false));
+         server.createQueue(new 
QueueConfiguration(SNF_QUEUE).setRoutingType(RoutingType.ANYCAST).setAddress(SNF_QUEUE).setDurable(true).setInternal(false));
+      } catch (Throwable error) {
+         logger.warn(error.getMessage(), error);
+      }
+      server.stop();
+   }
+
    private void testInterrupt(final String protocol) throws Exception {
+      // This will force internal queues as "non internal"
+      // this is in an attempt to create issues between versions of the broker
+      preCreateInternalQueues(DC1_NODE_A);
+      preCreateInternalQueues(DC2_NODE_A);
+
       startDC1();
 
       final int numberOfMessages = 400;
 
-      String snfQueue = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror";
-
       ConnectionFactory connectionFactoryDC1A = 
CFUtil.createConnectionFactory(protocol, DC1_NODEA_URI);
       ConnectionFactory connectionFactoryDC2A = 
CFUtil.createConnectionFactory(protocol, DC2_NODEA_URI);
 
@@ -154,11 +210,43 @@ public class InterruptedLargeMessageTest extends 
SoakTestBase {
 
          startDC2();
 
-         // Waiting for at least one large message file in the target server
-         Wait.assertTrue(() -> getNumberOfLargeMessages(DC2_NODE_A) > 0, 5000, 
100);
+         // We will keep interrupting the servers alternately until all 
messages are transferred
+         boolean interruptSource = true;
+         while (getNumberOfLargeMessages(DC2_NODE_A) < numberOfMessages) {
+            if (interruptSource) {
+               stopDC1();
+            } else {
+               stopDC2();
+            }
+
+            long messagesBeforeStart = getNumberOfLargeMessages(DC2_NODE_A);
+
+            if (interruptSource) {
+               startDC1();
+            } else {
+               startDC2();
+            }
+
+            interruptSource = !interruptSource; // switch which side we are 
interrupting next time
+
+            long currentMessages = messagesBeforeStart;
+
+            // Waiting for some progress
+            while (currentMessages == messagesBeforeStart && currentMessages < 
numberOfMessages) {
+               currentMessages = getNumberOfLargeMessages(DC2_NODE_A);
+               Thread.sleep(100);
+            }
+
+            Thread.sleep(2000);
+
+            currentMessages = getNumberOfLargeMessages(DC2_NODE_A);
+            if (logger.isDebugEnabled()) {
+               
logger.debug("*******************************************************************************************************************************");
+               logger.debug("There are currently {} in the broker", 
currentMessages);
+               
logger.debug("*******************************************************************************************************************************");
+            }
+         }
 
-         stopDC2();
-         startDC2();
       }
 
       try (Connection connection = connectionFactoryDC2A.createConnection()) {
@@ -177,8 +265,8 @@ public class InterruptedLargeMessageTest extends 
SoakTestBase {
          session.commit();
       }
 
-      Wait.assertEquals(0, () -> getCount(simpleManagementDC1A, snfQueue));
-      Wait.assertEquals(0, () -> getCount(simpleManagementDC2A, snfQueue));
+      Wait.assertEquals(0, () -> getCount(simpleManagementDC1A, SNF_QUEUE));
+      Wait.assertEquals(0, () -> getCount(simpleManagementDC2A, SNF_QUEUE));
       Wait.assertEquals(0, () -> getCount(simpleManagementDC2A, QUEUE_NAME));
       Wait.assertEquals(0, () -> getCount(simpleManagementDC1A, QUEUE_NAME));
 
@@ -192,6 +280,16 @@ public class InterruptedLargeMessageTest extends 
SoakTestBase {
       return lmFolder.list().length;
    }
 
+   private void startDC1() throws Exception {
+      processDC1_node_A = startServer(DC1_NODE_A, -1, -1, new 
File(getServerLocation(DC1_NODE_A), "broker.properties"));
+      ServerUtil.waitForServerToStart(0, 10_000);
+   }
+
+   private void stopDC1() throws Exception {
+      processDC1_node_A.destroyForcibly();
+      Assert.assertTrue(processDC1_node_A.waitFor(10, TimeUnit.SECONDS));
+   }
+
    private void stopDC2() throws Exception {
       processDC2_node_A.destroyForcibly();
       Assert.assertTrue(processDC2_node_A.waitFor(10, TimeUnit.SECONDS));

Reply via email to