This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 0a10ed399e8 [improve][admin] Add counter for marker messages in
PersistentTopics.analyzeSubscriptionBacklog() rest api (#25091)
0a10ed399e8 is described below
commit 0a10ed399e83e162a886501f071a4509d9ec6711
Author: Oneby Wang <[email protected]>
AuthorDate: Fri Jan 2 21:27:55 2026 +0800
[improve][admin] Add counter for marker messages in
PersistentTopics.analyzeSubscriptionBacklog() rest api (#25091)
---
.../broker/admin/impl/PersistentTopicsBase.java | 1 +
.../broker/service/AnalyzeBacklogResult.java | 1 +
.../service/persistent/PersistentSubscription.java | 6 +++
.../admin/AnalyzeBacklogSubscriptionTest.java | 1 +
.../broker/admin/v3/AdminApiTransactionTest.java | 59 ++++++++++++++++++++++
.../broker/service/ReplicatedSubscriptionTest.java | 8 +++
.../broker/service/plugin/FilterEntryTest.java | 11 ++--
.../stats/AnalyzeSubscriptionBacklogResult.java | 1 +
8 files changed, 83 insertions(+), 5 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 85387ccc267..be40132cfbf 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1693,6 +1693,7 @@ public class PersistentTopicsBase extends AdminResource {
result.setEntries(rawResult.getEntries());
result.setMessages(rawResult.getMessages());
+
result.setMarkerMessages(rawResult.getMarkerMessages());
result.setFilterAcceptedEntries(rawResult.getFilterAcceptedEntries());
result.setFilterRejectedEntries(rawResult.getFilterRejectedEntries());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AnalyzeBacklogResult.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AnalyzeBacklogResult.java
index e227acf4e8f..b9c279f97ce 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AnalyzeBacklogResult.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AnalyzeBacklogResult.java
@@ -29,6 +29,7 @@ public final class AnalyzeBacklogResult {
private long entries;
private long messages;
+ private long markerMessages;
private long filterRejectedEntries;
private long filterAcceptedEntries;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 457bae5e69c..786b902b3ba 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -635,6 +635,7 @@ public class PersistentSubscription extends
AbstractSubscription {
AtomicLong rejected = new AtomicLong();
AtomicLong rescheduled = new AtomicLong();
AtomicLong messages = new AtomicLong();
+ AtomicLong markerMessages = new AtomicLong();
AtomicLong acceptedMessages = new AtomicLong();
AtomicLong rejectedMessages = new AtomicLong();
AtomicLong rescheduledMessages = new AtomicLong();
@@ -667,6 +668,10 @@ public class PersistentSubscription extends
AbstractSubscription {
} else {
messageMetadata =
Commands.peekMessageMetadata(metadataAndPayload, "", -1);
}
+ if (messageMetadata.hasMarkerType()) {
+ markerMessages.incrementAndGet();
+ return true;
+ }
int numMessages = 1;
if (messageMetadata.hasNumMessagesInBatch()) {
numMessages = messageMetadata.getNumMessagesInBatch();
@@ -716,6 +721,7 @@ public class PersistentSubscription extends
AbstractSubscription {
result.setLastPosition(lastPosition.get());
result.setEntries(entries.get());
result.setMessages(messages.get());
+ result.setMarkerMessages(markerMessages.get());
result.setFilterAcceptedEntries(accepted.get());
result.setFilterAcceptedMessages(acceptedMessages.get());
result.setFilterRejectedEntries(rejected.get());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
index dbc632c6cf3..acea9132049 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
@@ -161,6 +161,7 @@ public class AnalyzeBacklogSubscriptionTest extends
ProducerConsumerBase {
assertEquals(analyzeSubscriptionBacklogResult.getFilterRescheduledEntries(), 0);
assertEquals(analyzeSubscriptionBacklogResult.getMessages(),
numMessages);
+ assertEquals(analyzeSubscriptionBacklogResult.getMarkerMessages(), 0);
assertEquals(analyzeSubscriptionBacklogResult.getFilterAcceptedMessages(),
numMessages);
assertEquals(analyzeSubscriptionBacklogResult.getFilterRejectedMessages(), 0);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index b3c4b804286..11ef18b8fd0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -31,6 +31,7 @@ import static org.testng.Assert.fail;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -79,6 +80,7 @@ import
org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionMetadata;
import
org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
+import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
import org.apache.pulsar.common.stats.PositionInPendingAckStats;
import org.apache.pulsar.common.util.FutureUtil;
import
org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider;
@@ -1053,6 +1055,63 @@ public class AdminApiTransactionTest extends
MockedPulsarServiceBaseTest {
}
}
+    /**
+     * Verifies that analyzeSubscriptionBacklog reports transaction markers separately from regular messages.
+     *
+     * <p>Each committed or aborted single-message transaction is expected to add one message and one marker
+     * to the backlog, while messages of a still-open transaction are expected to add messages only.
+     */
+    @Test
+    public void testAnalyzeSubscriptionBacklogWithTransactionMarker() throws Exception {
+        initTransaction(1);
+        final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/analyze-subscription-backlog");
+        String transactionSubName = "analyze-subscription-backlog-topic-sub";
+
+        // Init subscription and then close the consumer. If consumer is connected and has available permits,
+        // AbstractBaseDispatcher#filterEntriesForConsumer will auto ack marker messages
+        pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName(transactionSubName).subscribe().close();
+        @Cleanup Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+
+        int numMessages = 10;
+        List<MessageId> committedMsgIds = new ArrayList<>();
+        for (int i = 0; i < numMessages; i++) {
+            Transaction txn = pulsarClient.newTransaction().build().get();
+            MessageId messageId = producer.newMessage(txn).value("commited-msg" + i).send();
+            committedMsgIds.add(messageId);
+            txn.commit().get();
+        }
+
+        // Whole backlog: one message and one marker per committed transaction.
+        AnalyzeSubscriptionBacklogResult backlogResult =
+                admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.empty());
+        assertEquals(backlogResult.getMessages(), numMessages);
+        assertEquals(backlogResult.getMarkerMessages(), numMessages);
+
+        // Start the analysis from the middle committed message.
+        MessageId committedMiddleMsgId = committedMsgIds.get(numMessages / 2);
+        backlogResult =
+                admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.of(committedMiddleMsgId));
+        assertEquals(backlogResult.getMessages(), numMessages / 2);
+        assertEquals(backlogResult.getMarkerMessages(), numMessages / 2);
+
+        List<MessageId> abortedMsgIds = new ArrayList<>();
+        for (int i = 0; i < numMessages; i++) {
+            Transaction txn = pulsarClient.newTransaction().build().get();
+            MessageId messageId = producer.newMessage(txn).value("aborted-msg" + i).send();
+            abortedMsgIds.add(messageId);
+            // Wait for the abort to complete, as is done for commit above; otherwise the assertions
+            // below may run before the abort has finished.
+            txn.abort().get();
+        }
+        backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.empty());
+        assertEquals(backlogResult.getMessages(), numMessages * 2);
+        assertEquals(backlogResult.getMarkerMessages(), numMessages * 2);
+
+        // Start the analysis from the middle aborted message.
+        MessageId abortedMiddleMsgId = abortedMsgIds.get(numMessages / 2);
+        backlogResult =
+                admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.of(abortedMiddleMsgId));
+        assertEquals(backlogResult.getMessages(), numMessages / 2);
+        assertEquals(backlogResult.getMarkerMessages(), numMessages / 2);
+
+        // Messages of a transaction that is neither committed nor aborted: the message count grows,
+        // the marker count is expected to stay unchanged.
+        Transaction txn = pulsarClient.newTransaction().build().get();
+        for (int i = 0; i < numMessages; i++) {
+            producer.newMessage(txn).value("uncommitted-msg-" + i).send();
+        }
+        backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.empty());
+        assertEquals(backlogResult.getMessages(), numMessages * 3);
+        assertEquals(backlogResult.getMarkerMessages(), numMessages * 2);
+    }
+
private static void verifyCoordinatorStats(String state,
long sequenceId, long
lowWaterMark) {
assertEquals(state, "Ready");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
index 74dbc5e5291..c538207584d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
@@ -39,6 +39,7 @@ import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
@@ -70,6 +71,7 @@ import
org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1090,6 +1092,12 @@ public class ReplicatedSubscriptionTest extends
ReplicatorTestBase {
}
Assert.assertEquals(numSnapshotRequest, 1);
+ // Assert analyze backlog total messages and marker messages.
+ AnalyzeSubscriptionBacklogResult backlogResult =
+ admin4.topics().analyzeSubscriptionBacklog(topicName,
subscriptionName, Optional.empty());
+ assertEquals(backlogResult.getMessages(), numMessages);
+ assertEquals(backlogResult.getMarkerMessages(), numSnapshotRequest);
+
// Wait pending snapshot timeout
Thread.sleep(config1.getReplicatedSubscriptionsSnapshotTimeoutSeconds() * 1000);
numSnapshotRequest = 0;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
index 89b409ae581..a70c2f3cf8d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
@@ -253,7 +253,7 @@ public class FilterEntryTest extends BrokerTestBase {
producer.send("test");
}
- verifyBacklog(topic, subName, 10, 10, 10, 10, 0, 0, 0, 0);
+ verifyBacklog(topic, subName, 10, 10, 0, 10, 10, 0, 0, 0, 0);
int counter = 0;
while (true) {
@@ -268,7 +268,7 @@ public class FilterEntryTest extends BrokerTestBase {
// All normal messages can be received
assertEquals(10, counter);
- verifyBacklog(topic, subName, 0, 0, 0, 0, 0, 0, 0, 0);
+ verifyBacklog(topic, subName, 0, 0, 0, 0, 0, 0, 0, 0, 0);
// stop the consumer
consumer.close();
@@ -280,7 +280,7 @@ public class FilterEntryTest extends BrokerTestBase {
// analyze the subscription and predict that
// 10 messages will be rejected by the filter
- verifyBacklog(topic, subName, 10, 10, 0, 0, 10, 10, 0, 0);
+ verifyBacklog(topic, subName, 10, 10, 0, 0, 0, 10, 10, 0, 0);
consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
@@ -304,7 +304,7 @@ public class FilterEntryTest extends BrokerTestBase {
// now the Filter acknoledged the messages on behalf of the Consumer
// backlog is now zero again
- verifyBacklog(topic, subName, 0, 0, 0, 0, 0, 0, 0, 0);
+ verifyBacklog(topic, subName, 0, 0, 0, 0, 0, 0, 0, 0, 0);
// All messages should be acked, check the MarkDeletedPosition
assertNotNull(lastMsgId);
@@ -545,7 +545,7 @@ public class FilterEntryTest extends BrokerTestBase {
private void verifyBacklog(String topic, String subscription,
- int numEntries, int numMessages,
+ int numEntries, int numMessages, int
numMarkerMessages,
int numEntriesAccepted, int numMessagesAccepted,
int numEntriesRejected, int numMessagesRejected,
int numEntriesRescheduled, int
numMessagesRescheduled
@@ -559,6 +559,7 @@ public class FilterEntryTest extends BrokerTestBase {
Assert.assertEquals(numEntriesRescheduled,
a1.getFilterRescheduledEntries());
Assert.assertEquals(numMessages, a1.getMessages());
+ Assert.assertEquals(numMarkerMessages, a1.getMarkerMessages());
Assert.assertEquals(numMessagesAccepted,
a1.getFilterAcceptedMessages());
Assert.assertEquals(numMessagesRejected,
a1.getFilterRejectedMessages());
Assert.assertEquals(numMessagesRescheduled,
a1.getFilterRescheduledMessages());
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/AnalyzeSubscriptionBacklogResult.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/AnalyzeSubscriptionBacklogResult.java
index 059026b80c5..622d91e4e98 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/AnalyzeSubscriptionBacklogResult.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/AnalyzeSubscriptionBacklogResult.java
@@ -26,6 +26,7 @@ import lombok.ToString;
public class AnalyzeSubscriptionBacklogResult {
private long entries;
private long messages;
+ private long markerMessages;
private long filterRejectedEntries;
private long filterAcceptedEntries;