This is an automated email from the ASF dual-hosted git repository.
Technoboy- pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new 44cc0133a75 [improve][cli] Add client side looping in "pulsar-admin
topics analyze-backlog" cli to avoid potential HTTP call timeout (#25126)
44cc0133a75 is described below
commit 44cc0133a75cd91688cfa0ac25cd1fbd6c950884
Author: Oneby Wang <[email protected]>
AuthorDate: Mon Apr 27 20:10:37 2026 +0800
[improve][cli] Add client side looping in "pulsar-admin topics
analyze-backlog" cli to avoid potential HTTP call timeout (#25126)
---
.../pulsar/admin/cli/PulsarAdminToolTest.java | 35 +++++
.../org/apache/pulsar/admin/cli/CliCommand.java | 16 ++-
.../org/apache/pulsar/admin/cli/CmdTopics.java | 41 +++++-
.../integration/cli/topic/AnalyzeBacklogTest.java | 151 +++++++++++++++++++++
.../integration/src/test/resources/pulsar-cli.xml | 1 +
5 files changed, 237 insertions(+), 7 deletions(-)
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index a06f5d44665..bee1527b955 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -120,6 +120,7 @@ import
org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
+import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
@@ -2317,6 +2318,40 @@ public class PulsarAdminToolTest {
verify(mockNonPersistentTopics).getListInBundle("myprop/ns1",
"0x23d70a30_0x26666658");
}
+ @Test
+ public void topicsAnalyzeBacklogParameterParsing() throws Exception {
+ PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
+ Topics mockTopics = mock(Topics.class);
+ when(admin.topics()).thenReturn(mockTopics);
+
+ AnalyzeSubscriptionBacklogResult backlogResult = new
AnalyzeSubscriptionBacklogResult();
+ doReturn(backlogResult).when(mockTopics)
+ .analyzeSubscriptionBacklog(eq("persistent://myprop/ns1/ds1"),
eq("sub1"), Mockito.any());
+ doReturn(backlogResult).when(mockTopics)
+ .analyzeSubscriptionBacklog(eq("persistent://myprop/ns1/ds1"),
eq("sub1"), Mockito.any(),
+ Mockito.any());
+
+ CmdTopics cmdTopics = new CmdTopics(() -> admin);
+ cmdTopics.run(split("analyze-backlog persistent://myprop/ns1/ds1 -s
sub1 --position 1:1"));
+
verify(mockTopics).analyzeSubscriptionBacklog(eq("persistent://myprop/ns1/ds1"),
eq("sub1"),
+ eq(Optional.of(new MessageIdImpl(1, 1, -1))));
+
+ cmdTopics = new CmdTopics(() -> admin);
+ cmdTopics.run(split("analyze-backlog persistent://myprop/ns1/ds1 -s
sub1 -b 100 --plain --quiet"));
+
verify(mockTopics).analyzeSubscriptionBacklog(eq("persistent://myprop/ns1/ds1"),
eq("sub1"),
+ eq(Optional.empty()), Mockito.any());
+ }
+
+ @Test
+ public void topicsAnalyzeBacklogRejectsNonPositiveBacklogScanMaxEntries() {
+ PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
+ Topics mockTopics = mock(Topics.class);
+ when(admin.topics()).thenReturn(mockTopics);
+
+ CmdTopics cmdTopics = new CmdTopics(() -> admin);
+ assertFalse(cmdTopics.run(split("analyze-backlog
persistent://myprop/ns1/ds1 -s sub1 -b 0")));
+ }
+
@Test
public void bookies() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
index 41593eb9e23..be314386349 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
@@ -110,11 +110,17 @@ public abstract class CliCommand implements
Callable<Integer> {
}
<T> void print(T item) {
+ print(item, true);
+ }
+
+ <T> void print(T item, boolean prettyPrint) {
try {
if (item instanceof String) {
commandSpec.commandLine().getOut().println(item);
- } else {
+ } else if (prettyPrint) {
prettyPrint(item);
+ } else {
+ plainPrint(item);
}
} catch (Exception e) {
throw new RuntimeException(e);
@@ -129,6 +135,14 @@ public abstract class CliCommand implements
Callable<Integer> {
}
}
+ <T> void plainPrint(T item) {
+ try {
+
commandSpec.commandLine().getOut().println(MAPPER.writeValueAsString(item));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private static final ObjectMapper MAPPER = ObjectMapperFactory.create();
private static final ObjectWriter WRITER =
MAPPER.writerWithDefaultPrettyPrinter();
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index d0bdb4ddaea..8dc7da0febe 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -83,6 +83,7 @@ import
org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
+import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import picocli.CommandLine.Command;
@@ -3001,24 +3002,52 @@ public class CmdTopics extends CmdBase {
@Parameters(description = "persistent://tenant/namespace/topic", arity
= "1")
private String topicName;
- @Option(names = { "-s", "--subscription" }, description =
"Subscription to be analyzed", required = true)
+ @Option(names = {"-s", "--subscription"}, description = "Subscription
to be analyzed", required = true)
private String subName;
- @Option(names = { "--position",
- "-p" }, description = "message position to start the scan from
(ledgerId:entryId)", required = false)
+ @Option(names = {"--position",
+ "-p"}, description = "Message position to start the scan from
(ledgerId:entryId)", required = false)
private String messagePosition;
+ @Option(names = {"--backlog-scan-max-entries",
+ "-b"}, description = "The maximum number of backlog entries
the client will scan before terminating "
+ + "its loop", required = false)
+ private Long backlogScanMaxEntries;
+
+ @Option(names = {"--quiet", "-q"}, description = "Disable
analyze-backlog progress reporting", required = false)
+ private boolean quiet = false;
+
+ @Option(names = {"--plain"}, description = "Plain(Non-pretty) print
backlog results as NDJSON",
+ required = false)
+ private boolean plainPrint = false;
+
@Override
- void run() throws PulsarAdminException {
+ void run() throws Exception {
String persistentTopic = validatePersistentTopic(topicName);
Optional<MessageId> startPosition = Optional.empty();
+ int partitionIndex =
TopicName.get(persistentTopic).getPartitionIndex();
if (isNotBlank(messagePosition)) {
- int partitionIndex =
TopicName.get(persistentTopic).getPartitionIndex();
MessageId messageId = validateMessageIdString(messagePosition,
partitionIndex);
startPosition = Optional.of(messageId);
}
- print(getTopics().analyzeSubscriptionBacklog(persistentTopic,
subName, startPosition));
+ AnalyzeSubscriptionBacklogResult backlogResult;
+ if (backlogScanMaxEntries == null) {
+ backlogResult =
getTopics().analyzeSubscriptionBacklog(persistentTopic, subName, startPosition);
+ } else {
+ if (backlogScanMaxEntries <= 0) {
+ throw new ParameterException("--backlog-scan-max-entries
must be greater than 0");
+ }
+ backlogResult =
getTopics().analyzeSubscriptionBacklog(persistentTopic, subName, startPosition,
+ result -> {
+ boolean terminate = result.getEntries() >=
backlogScanMaxEntries;
+ if (!quiet && !terminate) {
+ print(result, !plainPrint);
+ }
+ return terminate;
+ });
+ }
+ print(backlogResult, !plainPrint);
}
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/topic/AnalyzeBacklogTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/topic/AnalyzeBacklogTest.java
new file mode 100644
index 00000000000..55220a6355e
--- /dev/null
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/topic/AnalyzeBacklogTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.cli.topic;
+
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
+import org.testng.annotations.Test;
+
+public class AnalyzeBacklogTest extends PulsarTestSuite {
+
+ private static final String PREFIX = "PULSAR_PREFIX_";
+ private static final String ANALYZE_BACKLOG_TOPIC_NAME =
"public/default/analyze-backlog-topic";
+ private static final String ANALYZE_BACKLOG_SUBSCRIPTION_NAME = "sub1";
+ private static final int SUBSCRIPTION_BACKLOG_SCAN_MAX_ENTRIES = 10;
+ private static final String LINE_SEPARATOR_REGEX = "\\r?\\n";
+ private static final String TOPICS_CMD = "topics";
+
+ @Override
+ public void setupCluster() throws Exception {
+ brokerEnvs.put(PREFIX + "subscriptionBacklogScanMaxEntries",
+ String.valueOf(SUBSCRIPTION_BACKLOG_SCAN_MAX_ENTRIES));
+ super.setupCluster();
+ }
+
+ @Test
+ public void testAnalyzeBacklogUsingDefaultConfig() throws Exception {
+ prepareSubscriptionBacklog(SUBSCRIPTION_BACKLOG_SCAN_MAX_ENTRIES + 1);
+
+ ContainerExecResult result =
+ pulsarCluster.runAdminCommandOnAnyBroker(TOPICS_CMD,
"analyze-backlog", ANALYZE_BACKLOG_TOPIC_NAME,
+ "-s", ANALYZE_BACKLOG_SUBSCRIPTION_NAME);
+
+ String stdout = result.getStdout();
+ AnalyzeSubscriptionBacklogResult backlogResult =
+ jsonMapper().readValue(stdout,
AnalyzeSubscriptionBacklogResult.class);
+ assertEquals(SUBSCRIPTION_BACKLOG_SCAN_MAX_ENTRIES,
backlogResult.getEntries());
+
+ String[] lines = stdout.split(LINE_SEPARATOR_REGEX);
+ assertTrue(lines.length > 1);
+ }
+
+ @Test
+ public void testAnalyzeBacklogUsingPlainPrint() throws Exception {
+ prepareSubscriptionBacklog(SUBSCRIPTION_BACKLOG_SCAN_MAX_ENTRIES + 1);
+
+ ContainerExecResult result =
+ pulsarCluster.runAdminCommandOnAnyBroker(TOPICS_CMD,
"analyze-backlog", ANALYZE_BACKLOG_TOPIC_NAME,
+ "-s", ANALYZE_BACKLOG_SUBSCRIPTION_NAME, "--plain");
+
+ String stdout = result.getStdout();
+ AnalyzeSubscriptionBacklogResult backlogResult =
+ jsonMapper().readValue(stdout,
AnalyzeSubscriptionBacklogResult.class);
+ assertEquals(SUBSCRIPTION_BACKLOG_SCAN_MAX_ENTRIES,
backlogResult.getEntries());
+
+ String[] lines = stdout.split(LINE_SEPARATOR_REGEX);
+ assertEquals(1, lines.length);
+ }
+
+ @Test
+ public void testAnalyzeBacklogClientSideLoopUsingPlainPrint() throws
Exception {
+ int backlogNum = 50;
+ prepareSubscriptionBacklog(backlogNum);
+
+ int backlogScanMaxEntries = 40;
+ ContainerExecResult result =
+ pulsarCluster.runAdminCommandOnAnyBroker(TOPICS_CMD,
"analyze-backlog", ANALYZE_BACKLOG_TOPIC_NAME,
+ "-s", ANALYZE_BACKLOG_SUBSCRIPTION_NAME, "-b",
String.valueOf(backlogScanMaxEntries),
+ "--plain");
+
+ int expectedResultLines = 4;
+ String stdout = result.getStdout();
+ String[] lines = stdout.split(LINE_SEPARATOR_REGEX);
+ assertEquals(expectedResultLines, lines.length);
+
+ for (int i = 0; i < expectedResultLines; i++) {
+ AnalyzeSubscriptionBacklogResult backlogResult =
+ jsonMapper().readValue(lines[i],
AnalyzeSubscriptionBacklogResult.class);
+ assertEquals((long) SUBSCRIPTION_BACKLOG_SCAN_MAX_ENTRIES * (i +
1), backlogResult.getEntries());
+ }
+ }
+
+ @Test
+ public void testAnalyzeBacklogClientSideLoopUsingQuietPlainPrint() throws
Exception {
+ int backlogNum = 50;
+ prepareSubscriptionBacklog(backlogNum);
+
+ int backlogScanMaxEntries = 35;
+ ContainerExecResult result =
+ pulsarCluster.runAdminCommandOnAnyBroker(TOPICS_CMD,
"analyze-backlog", ANALYZE_BACKLOG_TOPIC_NAME,
+ "-s", ANALYZE_BACKLOG_SUBSCRIPTION_NAME, "-b",
String.valueOf(backlogScanMaxEntries), "-q",
+ "--plain");
+
+ String stdout = result.getStdout();
+ String[] lines = stdout.split(LINE_SEPARATOR_REGEX);
+ assertEquals(1, lines.length);
+
+ int expectedEntries = 40;
+ AnalyzeSubscriptionBacklogResult backlogResult =
+ jsonMapper().readValue(stdout,
AnalyzeSubscriptionBacklogResult.class);
+ assertEquals(expectedEntries, backlogResult.getEntries());
+ }
+
+ private void prepareSubscriptionBacklog(int backlogNum) throws Exception {
+ @Cleanup
+ PulsarClient client =
PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
+ @Cleanup
+ Producer<byte[]> producer =
+
client.newProducer().topic(ANALYZE_BACKLOG_TOPIC_NAME).enableBatching(false).create();
+ @Cleanup
+ Consumer<byte[]> consumer =
client.newConsumer().topic(ANALYZE_BACKLOG_TOPIC_NAME)
+
.subscriptionName(ANALYZE_BACKLOG_SUBSCRIPTION_NAME).subscribe();
+
+ List<CompletableFuture<MessageId>> futures = new
ArrayList<>(backlogNum);
+ for (int i = 0; i < backlogNum; i++) {
+ byte[] msgBytes = ("test" + i).getBytes(StandardCharsets.UTF_8);
+ CompletableFuture<MessageId> future = producer.sendAsync(msgBytes);
+ futures.add(future);
+ }
+ FutureUtil.waitForAll(futures).get();
+ }
+
+}
diff --git a/tests/integration/src/test/resources/pulsar-cli.xml
b/tests/integration/src/test/resources/pulsar-cli.xml
index af55aca8a00..e32c4649d24 100644
--- a/tests/integration/src/test/resources/pulsar-cli.xml
+++ b/tests/integration/src/test/resources/pulsar-cli.xml
@@ -34,6 +34,7 @@
<class
name="org.apache.pulsar.tests.integration.cli.tenant.TenantTest"/>
<class
name="org.apache.pulsar.tests.integration.cli.PerfToolTest"/>
<class
name="org.apache.pulsar.tests.integration.cli.topicpolicies.SchemaCompatibilityStrategyTest"/>
+ <class
name="org.apache.pulsar.tests.integration.cli.topic.AnalyzeBacklogTest"/>
</classes>
</test>
</suite>