This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 8d29742d40 ARTEMIS-3862 Adding Topic Durable Subscription to RemoveSubscriptionRaceTest
8d29742d40 is described below

commit 8d29742d406500bd86e50e57928e7e9ac8598440
Author: Clebert Suconic <[email protected]>
AuthorDate: Tue Jun 21 13:24:30 2022 -0400

    ARTEMIS-3862 Adding Topic Durable Subscription to RemoveSubscriptionRaceTest
    
    This commit just contains test additions
---
 .../client/RemoveSubscriptionRaceTest.java         | 37 ++++++++++++++++++----
 1 file changed, 31 insertions(+), 6 deletions(-)

diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RemoveSubscriptionRaceTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RemoveSubscriptionRaceTest.java
index 0911d69e5b..6517219425 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RemoveSubscriptionRaceTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RemoveSubscriptionRaceTest.java
@@ -62,25 +62,35 @@ public class RemoveSubscriptionRaceTest extends ActiveMQTestBase {
 
    @Test
    public void testCreateSubscriptionCoreNoFiles() throws Exception {
-      internalTest("core", false, 5, 1000);
+      internalTest("core", false, 5, 1000, false);
    }
 
    @Test
    public void testCreateSubscriptionAMQPNoFiles() throws Exception {
-      internalTest("amqp", false, 5, 1000);
+      internalTest("amqp", false, 5, 1000, false);
    }
 
    @Test
    public void testCreateSubscriptionCoreRealFiles() throws Exception {
-      internalTest("core", true, 2, 200);
+      internalTest("core", true, 2, 200, false);
    }
 
    @Test
    public void testCreateSubscriptionAMQPRealFiles() throws Exception {
-      internalTest("amqp", true, 2, 200);
+      internalTest("amqp", true, 2, 200, false);
    }
 
-   public void internalTest(String protocol, boolean realFiles, int threads, int numberOfMessages) throws Exception {
+   @Test
+   public void testCreateSubscriptionCoreRealFilesDurable() throws Exception {
+      internalTest("core", true, 2, 200, true);
+   }
+
+   @Test
+   public void testCreateSubscriptionAMQPRealFilesDurable() throws Exception {
+      internalTest("amqp", true, 2, 200, true);
+   }
+
+   public void internalTest(String protocol, boolean realFiles, int threads, int numberOfMessages, boolean durableSub) throws Exception {
       server = createServer(realFiles, true);
       server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(SUB_NAME).addRoutingType(RoutingType.MULTICAST));
       server.getConfiguration().addQueueConfiguration(new QueueConfiguration().setName("Sub_1").setAddress(SUB_NAME).setRoutingType(RoutingType.MULTICAST));
@@ -99,19 +109,32 @@ public class RemoveSubscriptionRaceTest extends ActiveMQTestBase {
       CyclicBarrier flagStart = new CyclicBarrier(threads + 1);
 
       for (int i = 0; i < threads; i++) {
+         final int threadNumber = i;
          executorService.execute(() -> {
             try {
                flagStart.await(10, TimeUnit.SECONDS);
                for (int n = 0; n < numberOfMessages && running.get(); n++) {
                   Connection connection = factory.createConnection();
+                  if (durableSub) {
+                     connection.setClientID("t" + threadNumber);
+                  }
                   connection.start();
                   Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                   Topic topic = session.createTopic(SUB_NAME);
-                  MessageConsumer consumer = session.createConsumer(topic);
+                  MessageConsumer consumer;
+                  if (durableSub) {
+                     consumer = session.createDurableSubscriber(topic, "t" + threadNumber);
+                  } else {
+                     consumer = session.createConsumer(topic);
+                  }
                   Message message = consumer.receiveNoWait();
                   if (message != null) {
                      message.acknowledge();
                   }
+                  consumer.close();
+                  if (durableSub) {
+                     session.unsubscribe("t" + threadNumber);
+                  }
                   connection.close();
                }
             } catch (Throwable e) {
@@ -150,6 +173,8 @@ public class RemoveSubscriptionRaceTest extends ActiveMQTestBase {
       Wait.assertEquals(0, this::countAddMessage, 5000, 100);
 
       Wait.assertEquals(0L, queue.getPagingStore()::getAddressSize, 2000, 100);
+
+      Assert.assertEquals(0, errors.get());
    }
 
    int countAddMessage() throws Exception {

Reply via email to