This is an automated email from the ASF dual-hosted git repository.

Technoboy- pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.2 by this push:
     new 44cc0133a75 [improve][cli] Add client side looping in "pulsar-admin topics analyze-backlog" cli to avoid potential HTTP call timeout (#25126)
44cc0133a75 is described below

commit 44cc0133a75cd91688cfa0ac25cd1fbd6c950884
Author: Oneby Wang <[email protected]>
AuthorDate: Mon Apr 27 20:10:37 2026 +0800

    [improve][cli] Add client side looping in "pulsar-admin topics analyze-backlog" cli to avoid potential HTTP call timeout (#25126)
---
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  35 +++++
 .../org/apache/pulsar/admin/cli/CliCommand.java    |  16 ++-
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  41 +++++-
 .../integration/cli/topic/AnalyzeBacklogTest.java  | 151 +++++++++++++++++++++
 .../integration/src/test/resources/pulsar-cli.xml  |   1 +
 5 files changed, 237 insertions(+), 7 deletions(-)

diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index a06f5d44665..bee1527b955 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -120,6 +120,7 @@ import 
org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
+import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
@@ -2317,6 +2318,40 @@ public class PulsarAdminToolTest {
         verify(mockNonPersistentTopics).getListInBundle("myprop/ns1", 
"0x23d70a30_0x26666658");
     }
 
+    @Test
+    public void topicsAnalyzeBacklogParameterParsing() throws Exception {
+        PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
+        Topics mockTopics = mock(Topics.class);
+        when(admin.topics()).thenReturn(mockTopics);
+
+        AnalyzeSubscriptionBacklogResult backlogResult = new 
AnalyzeSubscriptionBacklogResult();
+        doReturn(backlogResult).when(mockTopics)
+                .analyzeSubscriptionBacklog(eq("persistent://myprop/ns1/ds1"), 
eq("sub1"), Mockito.any());
+        doReturn(backlogResult).when(mockTopics)
+                .analyzeSubscriptionBacklog(eq("persistent://myprop/ns1/ds1"), 
eq("sub1"), Mockito.any(),
+                        Mockito.any());
+
+        CmdTopics cmdTopics = new CmdTopics(() -> admin);
+        cmdTopics.run(split("analyze-backlog persistent://myprop/ns1/ds1 -s 
sub1 --position 1:1"));
+        
verify(mockTopics).analyzeSubscriptionBacklog(eq("persistent://myprop/ns1/ds1"),
 eq("sub1"),
+                eq(Optional.of(new MessageIdImpl(1, 1, -1))));
+
+        cmdTopics = new CmdTopics(() -> admin);
+        cmdTopics.run(split("analyze-backlog persistent://myprop/ns1/ds1 -s 
sub1 -b 100 --plain --quiet"));
+        
verify(mockTopics).analyzeSubscriptionBacklog(eq("persistent://myprop/ns1/ds1"),
 eq("sub1"),
+                eq(Optional.empty()), Mockito.any());
+    }
+
+    @Test
+    public void topicsAnalyzeBacklogRejectsNonPositiveBacklogScanMaxEntries() {
+        PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
+        Topics mockTopics = mock(Topics.class);
+        when(admin.topics()).thenReturn(mockTopics);
+
+        CmdTopics cmdTopics = new CmdTopics(() -> admin);
+        assertFalse(cmdTopics.run(split("analyze-backlog 
persistent://myprop/ns1/ds1 -s sub1 -b 0")));
+    }
+
     @Test
     public void bookies() throws Exception {
         PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
index 41593eb9e23..be314386349 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
@@ -110,11 +110,17 @@ public abstract class CliCommand implements 
Callable<Integer> {
     }
 
     <T> void print(T item) {
+        print(item, true);
+    }
+
+    <T> void print(T item, boolean prettyPrint) {
         try {
             if (item instanceof String) {
                 commandSpec.commandLine().getOut().println(item);
-            } else {
+            } else if (prettyPrint) {
                 prettyPrint(item);
+            } else {
+                plainPrint(item);
             }
         } catch (Exception e) {
             throw new RuntimeException(e);
@@ -129,6 +135,14 @@ public abstract class CliCommand implements 
Callable<Integer> {
         }
     }
 
+    <T> void plainPrint(T item) {
+        try {
+            
commandSpec.commandLine().getOut().println(MAPPER.writeValueAsString(item));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     private static final ObjectMapper MAPPER = ObjectMapperFactory.create();
     private static final ObjectWriter WRITER = 
MAPPER.writerWithDefaultPrettyPrinter();
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index d0bdb4ddaea..8dc7da0febe 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -83,6 +83,7 @@ import 
org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
+import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import picocli.CommandLine.Command;
@@ -3001,24 +3002,52 @@ public class CmdTopics extends CmdBase {
         @Parameters(description = "persistent://tenant/namespace/topic", arity 
= "1")
         private String topicName;
 
-        @Option(names = { "-s", "--subscription" }, description = 
"Subscription to be analyzed", required = true)
+        @Option(names = {"-s", "--subscription"}, description = "Subscription 
to be analyzed", required = true)
         private String subName;
 
-        @Option(names = { "--position",
-                "-p" }, description = "message position to start the scan from 
(ledgerId:entryId)", required = false)
+        @Option(names = {"--position",
+                "-p"}, description = "Message position to start the scan from 
(ledgerId:entryId)", required = false)
         private String messagePosition;
 
+        @Option(names = {"--backlog-scan-max-entries",
+                "-b"}, description = "The maximum number of backlog entries 
the client will scan before terminating "
+                + "its loop", required = false)
+        private Long backlogScanMaxEntries;
+
+        @Option(names = {"--quiet", "-q"}, description = "Disable 
analyze-backlog progress reporting", required = false)
+        private boolean quiet = false;
+
+        @Option(names = {"--plain"}, description = "Plain(Non-pretty) print 
backlog results as NDJSON",
+                required = false)
+        private boolean plainPrint = false;
+
         @Override
-        void run() throws PulsarAdminException {
+        void run() throws Exception {
             String persistentTopic = validatePersistentTopic(topicName);
             Optional<MessageId> startPosition = Optional.empty();
+            int partitionIndex = 
TopicName.get(persistentTopic).getPartitionIndex();
             if (isNotBlank(messagePosition)) {
-                int partitionIndex = 
TopicName.get(persistentTopic).getPartitionIndex();
                 MessageId messageId = validateMessageIdString(messagePosition, 
partitionIndex);
                 startPosition = Optional.of(messageId);
             }
-            print(getTopics().analyzeSubscriptionBacklog(persistentTopic, 
subName, startPosition));
 
+            AnalyzeSubscriptionBacklogResult backlogResult;
+            if (backlogScanMaxEntries == null) {
+                backlogResult = 
getTopics().analyzeSubscriptionBacklog(persistentTopic, subName, startPosition);
+            } else {
+                if (backlogScanMaxEntries <= 0) {
+                    throw new ParameterException("--backlog-scan-max-entries 
must be greater than 0");
+                }
+                backlogResult = 
getTopics().analyzeSubscriptionBacklog(persistentTopic, subName, startPosition,
+                        result -> {
+                            boolean terminate = result.getEntries() >= 
backlogScanMaxEntries;
+                            if (!quiet && !terminate) {
+                                print(result, !plainPrint);
+                            }
+                            return terminate;
+                        });
+            }
+            print(backlogResult, !plainPrint);
         }
     }
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/topic/AnalyzeBacklogTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/topic/AnalyzeBacklogTest.java
new file mode 100644
index 00000000000..55220a6355e
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/topic/AnalyzeBacklogTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.cli.topic;
+
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
+import org.testng.annotations.Test;
+
+public class AnalyzeBacklogTest extends PulsarTestSuite {
+
+    private static final String PREFIX = "PULSAR_PREFIX_";
+    private static final String ANALYZE_BACKLOG_TOPIC_NAME = 
"public/default/analyze-backlog-topic";
+    private static final String ANALYZE_BACKLOG_SUBSCRIPTION_NAME = "sub1";
+    private static final int SUBSCRIPTION_BACKLOG_SCAN_MAX_ENTRIES = 10;
+    private static final String LINE_SEPARATOR_REGEX = "\\r?\\n";
+    private static final String TOPICS_CMD = "topics";
+
+    @Override
+    public void setupCluster() throws Exception {
+        brokerEnvs.put(PREFIX + "subscriptionBacklogScanMaxEntries",
+                String.valueOf(SUBSCRIPTION_BACKLOG_SCAN_MAX_ENTRIES));
+        super.setupCluster();
+    }
+
+    @Test
+    public void testAnalyzeBacklogUsingDefaultConfig() throws Exception {
+        prepareSubscriptionBacklog(SUBSCRIPTION_BACKLOG_SCAN_MAX_ENTRIES + 1);
+
+        ContainerExecResult result =
+                pulsarCluster.runAdminCommandOnAnyBroker(TOPICS_CMD, 
"analyze-backlog", ANALYZE_BACKLOG_TOPIC_NAME,
+                        "-s", ANALYZE_BACKLOG_SUBSCRIPTION_NAME);
+
+        String stdout = result.getStdout();
+        AnalyzeSubscriptionBacklogResult backlogResult =
+                jsonMapper().readValue(stdout, 
AnalyzeSubscriptionBacklogResult.class);
+        assertEquals(SUBSCRIPTION_BACKLOG_SCAN_MAX_ENTRIES, 
backlogResult.getEntries());
+
+        String[] lines = stdout.split(LINE_SEPARATOR_REGEX);
+        assertTrue(lines.length > 1);
+    }
+
+    @Test
+    public void testAnalyzeBacklogUsingPlainPrint() throws Exception {
+        prepareSubscriptionBacklog(SUBSCRIPTION_BACKLOG_SCAN_MAX_ENTRIES + 1);
+
+        ContainerExecResult result =
+                pulsarCluster.runAdminCommandOnAnyBroker(TOPICS_CMD, 
"analyze-backlog", ANALYZE_BACKLOG_TOPIC_NAME,
+                        "-s", ANALYZE_BACKLOG_SUBSCRIPTION_NAME, "--plain");
+
+        String stdout = result.getStdout();
+        AnalyzeSubscriptionBacklogResult backlogResult =
+                jsonMapper().readValue(stdout, 
AnalyzeSubscriptionBacklogResult.class);
+        assertEquals(SUBSCRIPTION_BACKLOG_SCAN_MAX_ENTRIES, 
backlogResult.getEntries());
+
+        String[] lines = stdout.split(LINE_SEPARATOR_REGEX);
+        assertEquals(1, lines.length);
+    }
+
+    @Test
+    public void testAnalyzeBacklogClientSideLoopUsingPlainPrint() throws 
Exception {
+        int backlogNum = 50;
+        prepareSubscriptionBacklog(backlogNum);
+
+        int backlogScanMaxEntries = 40;
+        ContainerExecResult result =
+                pulsarCluster.runAdminCommandOnAnyBroker(TOPICS_CMD, 
"analyze-backlog", ANALYZE_BACKLOG_TOPIC_NAME,
+                        "-s", ANALYZE_BACKLOG_SUBSCRIPTION_NAME, "-b", 
String.valueOf(backlogScanMaxEntries),
+                        "--plain");
+
+        int expectedResultLines = 4;
+        String stdout = result.getStdout();
+        String[] lines = stdout.split(LINE_SEPARATOR_REGEX);
+        assertEquals(expectedResultLines, lines.length);
+
+        for (int i = 0; i < expectedResultLines; i++) {
+            AnalyzeSubscriptionBacklogResult backlogResult =
+                    jsonMapper().readValue(lines[i], 
AnalyzeSubscriptionBacklogResult.class);
+            assertEquals((long) SUBSCRIPTION_BACKLOG_SCAN_MAX_ENTRIES * (i + 
1), backlogResult.getEntries());
+        }
+    }
+
+    @Test
+    public void testAnalyzeBacklogClientSideLoopUsingQuietPlainPrint() throws 
Exception {
+        int backlogNum = 50;
+        prepareSubscriptionBacklog(backlogNum);
+
+        int backlogScanMaxEntries = 35;
+        ContainerExecResult result =
+                pulsarCluster.runAdminCommandOnAnyBroker(TOPICS_CMD, 
"analyze-backlog", ANALYZE_BACKLOG_TOPIC_NAME,
+                        "-s", ANALYZE_BACKLOG_SUBSCRIPTION_NAME, "-b", 
String.valueOf(backlogScanMaxEntries), "-q",
+                        "--plain");
+
+        String stdout = result.getStdout();
+        String[] lines = stdout.split(LINE_SEPARATOR_REGEX);
+        assertEquals(1, lines.length);
+
+        int expectedEntries = 40;
+        AnalyzeSubscriptionBacklogResult backlogResult =
+                jsonMapper().readValue(stdout, 
AnalyzeSubscriptionBacklogResult.class);
+        assertEquals(expectedEntries, backlogResult.getEntries());
+    }
+
+    private void prepareSubscriptionBacklog(int backlogNum) throws Exception {
+        @Cleanup
+        PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
+        @Cleanup
+        Producer<byte[]> producer =
+                
client.newProducer().topic(ANALYZE_BACKLOG_TOPIC_NAME).enableBatching(false).create();
+        @Cleanup
+        Consumer<byte[]> consumer = 
client.newConsumer().topic(ANALYZE_BACKLOG_TOPIC_NAME)
+                
.subscriptionName(ANALYZE_BACKLOG_SUBSCRIPTION_NAME).subscribe();
+
+        List<CompletableFuture<MessageId>> futures = new 
ArrayList<>(backlogNum);
+        for (int i = 0; i < backlogNum; i++) {
+            byte[] msgBytes = ("test" + i).getBytes(StandardCharsets.UTF_8);
+            CompletableFuture<MessageId> future = producer.sendAsync(msgBytes);
+            futures.add(future);
+        }
+        FutureUtil.waitForAll(futures).get();
+    }
+
+}
diff --git a/tests/integration/src/test/resources/pulsar-cli.xml b/tests/integration/src/test/resources/pulsar-cli.xml
index af55aca8a00..e32c4649d24 100644
--- a/tests/integration/src/test/resources/pulsar-cli.xml
+++ b/tests/integration/src/test/resources/pulsar-cli.xml
@@ -34,6 +34,7 @@
             <class 
name="org.apache.pulsar.tests.integration.cli.tenant.TenantTest"/>
             <class 
name="org.apache.pulsar.tests.integration.cli.PerfToolTest"/>
             <class 
name="org.apache.pulsar.tests.integration.cli.topicpolicies.SchemaCompatibilityStrategyTest"/>
+            <class 
name="org.apache.pulsar.tests.integration.cli.topic.AnalyzeBacklogTest"/>
         </classes>
     </test>
 </suite>

Reply via email to