This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new ccc4c006d9 ARTEMIS-5801 Add a subscription name option to the consumer CLI tool
ccc4c006d9 is described below

commit ccc4c006d9567d5e2767052957ce7d3edb5e6c89
Author: Timothy Bish <[email protected]>
AuthorDate: Tue Dec 9 13:52:15 2025 -0500

    ARTEMIS-5801 Add a subscription name option to the consumer CLI tool
    
    Adds option 'subscriptionName' to the consumer CLI options to allow
    consuming from an existing durable subscription not created by the CLI
---
 .../artemis/cli/commands/messages/Consumer.java    | 27 +++++++++++-
 .../cli/commands/messages/ConsumerThread.java      |  7 ++-
 .../apache/activemq/cli/test/CliConsumerTest.java  | 50 +++++++++++++++++++++-
 3 files changed, 81 insertions(+), 3 deletions(-)

diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java
index ef81f1215b..1965d893f6 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java
@@ -50,6 +50,9 @@ public class Consumer extends DestAbstract {
    @Option(names = "--data", description = "Serialize the messages to the specified file as they are consumed.")
    String file;
 
+   @Option(names = "--subscriptionName", description = "The subscription name to use for a durable consumer.")
+   String subscriptionName;
+
    @Override
    public Object execute(ActionContext context) throws Exception {
       super.execute(context);
@@ -78,6 +81,15 @@ public class Consumer extends DestAbstract {
          serializer.start();
       }
 
+      if (subscriptionName != null) {
+         if (threads != 1) {
+            context.err.println("Error: Cannot assign a subscription name when multiple threads are also configured.");
+            return null;
+         } else {
+            context.out.println("Consumer:: subscription name = " + subscriptionName);
+         }
+      }
+
       ConnectionFactory factory = createConnectionFactory();
 
       try (Connection connection = factory.createConnection()) {
@@ -92,7 +104,11 @@ public class Consumer extends DestAbstract {
             }
 
             Destination dest = getDestination(session);
-            threadsArray[i] = new ConsumerThread(session, dest, i, context);
+            if (subscriptionName == null) {
+               threadsArray[i] = new ConsumerThread(session, dest, i, context);
+            } else {
+               threadsArray[i] = new ConsumerThread(session, dest, subscriptionName, context);
+            }
 
             threadsArray[i]
                .setVerbose(verbose)
@@ -191,4 +207,13 @@ public class Consumer extends DestAbstract {
       this.file = file;
       return this;
    }
+
+   public String getSubscriptionName() {
+      return subscriptionName;
+   }
+
+   public Consumer setSubscriptionName(String subscriptionName) {
+      this.subscriptionName = subscriptionName;
+      return this;
+   }
 }
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java
index 391cded2b1..2bdac84d76 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java
@@ -57,7 +57,12 @@ public class ConsumerThread extends Thread {
    MessageListener listener;
 
    public ConsumerThread(Session session, Destination destination, int threadNr, ActionContext context) {
-      super("Consumer " + destination.toString() + ", thread=" + threadNr);
+      this(session, destination, "Consumer " + destination.toString() + ", thread=" + threadNr, context);
+   }
+
+   public ConsumerThread(Session session, Destination destination, String subscriptionName, ActionContext context) {
+      super(subscriptionName);
+
       this.destination = destination;
       this.session = session;
       this.context = context;
diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliConsumerTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliConsumerTest.java
index f43e1f5d2b..4484acec28 100644
--- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliConsumerTest.java
+++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliConsumerTest.java
@@ -17,8 +17,9 @@
 package org.apache.activemq.cli.test;
 
 import javax.jms.Connection;
-
+import javax.jms.Session;
 import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.cli.commands.messages.Consumer;
 import org.apache.activemq.artemis.cli.commands.messages.Producer;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -29,6 +30,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class CliConsumerTest extends CliTestBase {
@@ -113,4 +115,50 @@ public class CliConsumerTest extends CliTestBase {
 
       Wait.assertEquals(0L, () -> server.locateQueue(address).getMessageCount(), 2000, 50);
    }
+
+   @Test
+   public void testConsumeFromExistingDurableSubscription() throws Exception {
+      final String address = "test-topic";
+      final String addressPrefix = "topic://";
+      final String clientID = "test-client";
+      final String subscriptionName = "test-sub";
+      final String credentials = "admin";
+      final TestActionContext context = new TestActionContext();
+
+      // Creates the durable subscription to consume from using the CLI tool.
+      try (Connection connection = cf.createConnection(credentials, credentials)) {
+
+         connection.setClientID(clientID);
+         connection.start();
+
+         final Session session = createSession(connection);
+
+         session.createDurableConsumer(session.createTopic(address), subscriptionName);
+      }
+
+      produceMessages(addressPrefix + address, TEST_MESSAGE_COUNT);
+
+      server.addressQuery(SimpleString.of(address));
+
+      final String subscriptionQueueName = server.bindingQuery(SimpleString.of(address)).getQueueNames().get(0).toString();
+      assertNotNull(subscriptionQueueName);
+      final org.apache.activemq.artemis.core.server.Queue subscriptionQueue = server.locateQueue(subscriptionQueueName);
+      Wait.assertEquals((long) TEST_MESSAGE_COUNT, () -> subscriptionQueue.getMessageCount(), 2000, 50);
+
+      // Consume from the durable subscription with messages added.
+      new Consumer()
+         .setSubscriptionName(subscriptionName)
+         .setReceiveTimeout(100)
+         .setBreakOnNull(true)
+         .setDurable(true)
+         .setMessageCount(TEST_MESSAGE_COUNT)
+         .setDestination(addressPrefix + address)
+         .setClientID(clientID)
+         .setUser(credentials)
+         .setPassword(credentials)
+         .execute(context);
+
+      Wait.assertTrue(() -> context.getStdout().contains("subscription name"), 2000, 100);
+      Wait.assertEquals(0L, () -> subscriptionQueue.getMessageCount(), 2000, 50);
+   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to