This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d5e1d7e06dd [improve][broker] Cache last publish timestamp for idle
topics to reduce storage reads (#24825)
d5e1d7e06dd is described below
commit d5e1d7e06dd27c15c549e80d30b86da73bad4197
Author: Penghui Li <[email protected]>
AuthorDate: Wed Oct 8 20:10:30 2025 -0700
[improve][broker] Cache last publish timestamp for idle topics to reduce
storage reads (#24825)
---
.../broker/service/persistent/PersistentTopic.java | 41 ++++++++---
.../apache/pulsar/broker/admin/AdminApi2Test.java | 6 ++
.../pulsar/broker/service/PersistentTopicTest.java | 79 ++++++++++++++++++++++
3 files changed, 118 insertions(+), 8 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 92a93a6b974..58ca93a1b3c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -323,6 +323,12 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
// The last position that can be dispatched to consumers
private volatile Position lastDispatchablePosition;
+ // Cache the latest publish timestamp for idle topics to avoid repeated
storage reads.
+ // This cache is only used when the ledger doesn't have the timestamp
(ledgerLastAddTime <= 0),
+ // which typically happens for topics with no recent write activity.
+ // For active topics, the ledger provides the timestamp directly, so this
cache is cleared.
+ private volatile long cachedLastPublishTimestamp;
+
/***
* We use 3 futures to prevent a new closing if there is an in-progress
deletion or closing. We make Pulsar return
* the in-progress one when it is called the second time.
@@ -2857,21 +2863,33 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
? null
: oldestPositionInfo.getCursorName();
- // Set the last publish timestamp using a hybrid approach:
- // 1. First try ledger.getLastAddEntryTime() if available
- // 2. If needed, read the last message to get actual publish time
+ // Set the last publish timestamp using a hybrid approach to minimize
storage reads:
+ // 1. If ledger has the timestamp (ledgerLastAddTime > 0), use it
directly - this means
+ // the topic has recent write activity and the ledger's cached
value is fresh
+ // 2. If ledger doesn't have it (topic is idle), try to use our cached
value first
+ // 3. If no cache available, read from storage and cache the result
for future calls
long ledgerLastAddTime = ledger.getLastAddEntryTime();
CompletableFuture<Long> lastPublishTimeFuture;
+ boolean usedLedgerValue = false;
if (ledgerLastAddTime > 0) {
- // Use ledger's last add time as a good approximation
+ // Topic has recent activity - use ledger's value and clear the
cache since
+ // we don't need it for active topics (ledger will keep providing
the value)
+ cachedLastPublishTimestamp = 0;
stats.lastPublishTimeStamp = ledgerLastAddTime;
lastPublishTimeFuture =
CompletableFuture.completedFuture(ledgerLastAddTime);
+ usedLedgerValue = true;
} else {
- // Fallback to reading the last message to get actual publish time
- stats.lastPublishTimeStamp = 0; // Will be updated below if we can
read the message
- lastPublishTimeFuture = getLastMessagePublishTime();
+ // Topic is idle - try to use cached value to avoid storage read
+ if (cachedLastPublishTimestamp > 0) {
+ lastPublishTimeFuture =
CompletableFuture.completedFuture(cachedLastPublishTimestamp);
+ } else {
+ // No cache available - read from storage and cache the result
+ stats.lastPublishTimeStamp = 0; // Will be updated below if we
can read the message
+ lastPublishTimeFuture = getLastMessagePublishTime();
+ }
}
+ boolean finalUsedLedgerValue = usedLedgerValue;
// Set the topic creation timestamp - get it directly since it's
synchronous
stats.topicCreationTimeStamp = getTopicCreationTimeStamp();
@@ -2893,7 +2911,14 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
// Combine all async operations: last publish time and subscription
stats
CompletableFuture<Void> combinedFutures = CompletableFuture.allOf(
- lastPublishTimeFuture.thenAccept(time ->
stats.lastPublishTimeStamp = time)
+ lastPublishTimeFuture.thenAccept(time -> {
+ // Only update the cache if we didn't use the ledger value
+ // (i.e., we used the cached value or read from storage)
+ if (!finalUsedLedgerValue) {
+ cachedLastPublishTimestamp = time;
+ }
+ stats.lastPublishTimeStamp = time;
+ })
);
return combinedFutures.thenCompose(ignore ->
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index db1f679404a..f889f1375a2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -4185,6 +4185,12 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
admin.topics().truncate(topic);
admin.topics().truncate(partitionedTopic);
+ assertTrue(statsAfterReload.getLastPublishTimeStamp() > 0);
+ assertTrue(partitionedStatsAfterReload.getLastPublishTimeStamp() > 0);
+
+ admin.topics().unload(topic);
+ admin.topics().unload(partitionedTopic);
+
statsAfterReload = admin.topics().getStats(topic);
partitionedStatsAfterReload =
admin.topics().getPartitionedStats(partitionedTopic, true);
assertTrue(statsAfterReload.getTopicCreationTimeStamp() > 0);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 3741c4093fe..d4306a11859 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -2308,4 +2308,83 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
assertTrue(latch.await(5, TimeUnit.SECONDS));
}
+ @Test
+ public void testLastPublishTimestampCaching() throws Exception {
+ // This test verifies that the lastPublishTimestamp is cached for idle
topics
+ // to avoid repeated storage reads when stats are polled frequently.
+
+ // Mock broker ID which is required for stats
+
doReturn("test-broker-id").when(pulsarTestContext.getPulsarService()).getBrokerId();
+
+ // Setup: Create topic with mocked ledger
+ PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
+ topic.initialize().join();
+
+ // Mock ledger to return 0 for getLastAddEntryTime (simulating idle
topic)
+ when(ledgerMock.getLastAddEntryTime()).thenReturn(0L);
+
+ // Mock a timestamp that will be returned when reading from storage
+ long timestampFromStorage = 1000000L;
+
+ // Mock the last confirmed entry position
+ Position lastPosition = PositionFactory.create(1, 0);
+ when(ledgerMock.getLastConfirmedEntry()).thenReturn(lastPosition);
+ when(ledgerMock.getLedgersInfo()).thenReturn(new
java.util.TreeMap<>(Map.of(1L,
+
mock(org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo.class))));
+
+ // Mock the last entry to return a timestamp
+ Entry entryMock = mock(Entry.class);
+ MessageMetadata metadata = new MessageMetadata();
+ metadata.setPublishTime(timestampFromStorage);
+ when(entryMock.getMessageMetadata()).thenReturn(metadata);
+
+ // Mock asyncReadEntry to invoke callback with the mocked entry
+ doAnswer(invocation -> {
+ AsyncCallbacks.ReadEntryCallback callback =
invocation.getArgument(1);
+ callback.readEntryComplete(entryMock, null);
+ return null;
+ }).when(ledgerMock).asyncReadEntry(any(Position.class),
any(AsyncCallbacks.ReadEntryCallback.class), any());
+
+ GetStatsOptions options = new GetStatsOptions(false, false, false,
false, false);
+
+ // First call: Should read from storage since cache is empty and
ledger has no value
+ var stats1 = topic.asyncGetStats(options).get();
+ assertEquals(stats1.lastPublishTimeStamp, timestampFromStorage);
+
+ // Verify asyncReadEntry was called to read from storage
+ verify(ledgerMock, times(1)).asyncReadEntry(any(Position.class),
+ any(AsyncCallbacks.ReadEntryCallback.class), any());
+
+ // Second call: Should use cached value, not read from storage again
+ var stats2 = topic.asyncGetStats(options).get();
+ assertEquals(stats2.lastPublishTimeStamp, timestampFromStorage);
+
+ // Verify asyncReadEntry was NOT called again (cache is working)
+ verify(ledgerMock, times(1)).asyncReadEntry(any(Position.class),
+ any(AsyncCallbacks.ReadEntryCallback.class), any()); // Still
only 1 call
+
+ // Simulate topic becoming active: ledger now has a new timestamp
+ long newTimestamp = 2000000L;
+ when(ledgerMock.getLastAddEntryTime()).thenReturn(newTimestamp);
+
+ // Third call: Should use ledger's value and clear the cache
+ var stats3 = topic.asyncGetStats(options).get();
+ assertEquals(stats3.lastPublishTimeStamp, newTimestamp);
+
+ // Verify asyncReadEntry was still NOT called (used ledger value)
+ verify(ledgerMock, times(1)).asyncReadEntry(any(Position.class),
+ any(AsyncCallbacks.ReadEntryCallback.class), any()); // Still
only 1 call
+
+ // Simulate topic becoming idle again: ledger no longer has the
timestamp
+ when(ledgerMock.getLastAddEntryTime()).thenReturn(0L);
+
+ // Fourth call: Cache was cleared when topic was active, so should
read from storage again
+ var stats4 = topic.asyncGetStats(options).get();
+ assertEquals(stats4.lastPublishTimeStamp, timestampFromStorage);
+
+ // Verify asyncReadEntry was called again (cache was cleared)
+ verify(ledgerMock, times(2)).asyncReadEntry(any(Position.class),
+ any(AsyncCallbacks.ReadEntryCallback.class), any()); // Now 2
calls
+ }
+
}