This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d5e1d7e06dd [improve][broker] Cache last publish timestamp for idle topics to reduce storage reads (#24825)
d5e1d7e06dd is described below

commit d5e1d7e06dd27c15c549e80d30b86da73bad4197
Author: Penghui Li <[email protected]>
AuthorDate: Wed Oct 8 20:10:30 2025 -0700

    [improve][broker] Cache last publish timestamp for idle topics to reduce storage reads (#24825)
---
 .../broker/service/persistent/PersistentTopic.java | 41 ++++++++---
 .../apache/pulsar/broker/admin/AdminApi2Test.java  |  6 ++
 .../pulsar/broker/service/PersistentTopicTest.java | 79 ++++++++++++++++++++++
 3 files changed, 118 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 92a93a6b974..58ca93a1b3c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -323,6 +323,12 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     // The last position that can be dispatched to consumers
     private volatile Position lastDispatchablePosition;
 
+    // Cache the latest publish timestamp for idle topics to avoid repeated storage reads.
+    // This cache is only used when the ledger doesn't have the timestamp (ledgerLastAddTime <= 0),
+    // which typically happens for topics with no recent write activity.
+    // For active topics, the ledger provides the timestamp directly, so this cache is cleared.
+    private volatile long cachedLastPublishTimestamp;
+
     /***
      * We use 3 futures to prevent a new closing if there is an in-progress deletion or closing.  We make Pulsar return
      * the in-progress one when it is called the second time.
@@ -2857,21 +2863,33 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             ? null
             : oldestPositionInfo.getCursorName();
 
-        // Set the last publish timestamp using a hybrid approach:
-        // 1. First try ledger.getLastAddEntryTime() if available
-        // 2. If needed, read the last message to get actual publish time
+        // Set the last publish timestamp using a hybrid approach to minimize storage reads:
+        // 1. If ledger has the timestamp (ledgerLastAddTime > 0), use it directly - this means
+        //    the topic has recent write activity and the ledger's cached value is fresh
+        // 2. If ledger doesn't have it (topic is idle), try to use our cached value first
+        // 3. If no cache available, read from storage and cache the result for future calls
         long ledgerLastAddTime = ledger.getLastAddEntryTime();
         CompletableFuture<Long> lastPublishTimeFuture;
 
+        boolean usedLedgerValue = false;
         if (ledgerLastAddTime > 0) {
-            // Use ledger's last add time as a good approximation
+            // Topic has recent activity - use ledger's value and clear the cache since
+            // we don't need it for active topics (ledger will keep providing the value)
+            cachedLastPublishTimestamp = 0;
             stats.lastPublishTimeStamp = ledgerLastAddTime;
             lastPublishTimeFuture = CompletableFuture.completedFuture(ledgerLastAddTime);
+            usedLedgerValue = true;
         } else {
-            // Fallback to reading the last message to get actual publish time
-            stats.lastPublishTimeStamp = 0; // Will be updated below if we can read the message
-            lastPublishTimeFuture = getLastMessagePublishTime();
+            // Topic is idle - try to use cached value to avoid storage read
+            if (cachedLastPublishTimestamp > 0) {
+                lastPublishTimeFuture = CompletableFuture.completedFuture(cachedLastPublishTimestamp);
+            } else {
+                // No cache available - read from storage and cache the result
+                stats.lastPublishTimeStamp = 0; // Will be updated below if we can read the message
+                lastPublishTimeFuture = getLastMessagePublishTime();
+            }
         }
+        boolean finalUsedLedgerValue = usedLedgerValue;
 
         // Set the topic creation timestamp - get it directly since it's synchronous
         stats.topicCreationTimeStamp = getTopicCreationTimeStamp();
@@ -2893,7 +2911,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
         // Combine all async operations: last publish time and subscription stats
         CompletableFuture<Void> combinedFutures = CompletableFuture.allOf(
-            lastPublishTimeFuture.thenAccept(time -> stats.lastPublishTimeStamp = time)
+            lastPublishTimeFuture.thenAccept(time -> {
+                // Only update the cache if we didn't use the ledger value
+                // (i.e., we used the cached value or read from storage)
+                if (!finalUsedLedgerValue) {
+                    cachedLastPublishTimestamp = time;
+                }
+                stats.lastPublishTimeStamp = time;
+            })
         );
 
         return combinedFutures.thenCompose(ignore ->
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index db1f679404a..f889f1375a2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -4185,6 +4185,12 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
         admin.topics().truncate(topic);
         admin.topics().truncate(partitionedTopic);
 
+        assertTrue(statsAfterReload.getLastPublishTimeStamp() > 0);
+        assertTrue(partitionedStatsAfterReload.getLastPublishTimeStamp() > 0);
+
+        admin.topics().unload(topic);
+        admin.topics().unload(partitionedTopic);
+
         statsAfterReload = admin.topics().getStats(topic);
         partitionedStatsAfterReload = admin.topics().getPartitionedStats(partitionedTopic, true);
         assertTrue(statsAfterReload.getTopicCreationTimeStamp() > 0);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 3741c4093fe..d4306a11859 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -2308,4 +2308,83 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         assertTrue(latch.await(5, TimeUnit.SECONDS));
     }
 
+    @Test
+    public void testLastPublishTimestampCaching() throws Exception {
+        // This test verifies that the lastPublishTimestamp is cached for idle topics
+        // to avoid repeated storage reads when stats are polled frequently.
+
+        // Mock broker ID which is required for stats
+        doReturn("test-broker-id").when(pulsarTestContext.getPulsarService()).getBrokerId();
+
+        // Setup: Create topic with mocked ledger
+        PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
+        topic.initialize().join();
+
+        // Mock ledger to return 0 for getLastAddEntryTime (simulating idle topic)
+        when(ledgerMock.getLastAddEntryTime()).thenReturn(0L);
+
+        // Mock a timestamp that will be returned when reading from storage
+        long timestampFromStorage = 1000000L;
+
+        // Mock the last confirmed entry position
+        Position lastPosition = PositionFactory.create(1, 0);
+        when(ledgerMock.getLastConfirmedEntry()).thenReturn(lastPosition);
+        when(ledgerMock.getLedgersInfo()).thenReturn(new java.util.TreeMap<>(Map.of(1L,
+                mock(org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo.class))));
+
+        // Mock the last entry to return a timestamp
+        Entry entryMock = mock(Entry.class);
+        MessageMetadata metadata = new MessageMetadata();
+        metadata.setPublishTime(timestampFromStorage);
+        when(entryMock.getMessageMetadata()).thenReturn(metadata);
+
+        // Mock asyncReadEntry to invoke callback with the mocked entry
+        doAnswer(invocation -> {
+            AsyncCallbacks.ReadEntryCallback callback = invocation.getArgument(1);
+            callback.readEntryComplete(entryMock, null);
+            return null;
+        }).when(ledgerMock).asyncReadEntry(any(Position.class), any(AsyncCallbacks.ReadEntryCallback.class), any());
+
+        GetStatsOptions options = new GetStatsOptions(false, false, false, false, false);
+
+        // First call: Should read from storage since cache is empty and ledger has no value
+        var stats1 = topic.asyncGetStats(options).get();
+        assertEquals(stats1.lastPublishTimeStamp, timestampFromStorage);
+
+        // Verify asyncReadEntry was called to read from storage
+        verify(ledgerMock, times(1)).asyncReadEntry(any(Position.class),
+                any(AsyncCallbacks.ReadEntryCallback.class), any());
+
+        // Second call: Should use cached value, not read from storage again
+        var stats2 = topic.asyncGetStats(options).get();
+        assertEquals(stats2.lastPublishTimeStamp, timestampFromStorage);
+
+        // Verify asyncReadEntry was NOT called again (cache is working)
+        verify(ledgerMock, times(1)).asyncReadEntry(any(Position.class),
+                any(AsyncCallbacks.ReadEntryCallback.class), any()); // Still only 1 call
+
+        // Simulate topic becoming active: ledger now has a new timestamp
+        long newTimestamp = 2000000L;
+        when(ledgerMock.getLastAddEntryTime()).thenReturn(newTimestamp);
+
+        // Third call: Should use ledger's value and clear the cache
+        var stats3 = topic.asyncGetStats(options).get();
+        assertEquals(stats3.lastPublishTimeStamp, newTimestamp);
+
+        // Verify asyncReadEntry was still NOT called (used ledger value)
+        verify(ledgerMock, times(1)).asyncReadEntry(any(Position.class),
+                any(AsyncCallbacks.ReadEntryCallback.class), any()); // Still only 1 call
+
+        // Simulate topic becoming idle again: ledger no longer has the timestamp
+        when(ledgerMock.getLastAddEntryTime()).thenReturn(0L);
+
+        // Fourth call: Cache was cleared when topic was active, so should read from storage again
+        var stats4 = topic.asyncGetStats(options).get();
+        assertEquals(stats4.lastPublishTimeStamp, timestampFromStorage);
+
+        // Verify asyncReadEntry was called again (cache was cleared)
+        verify(ledgerMock, times(2)).asyncReadEntry(any(Position.class),
+                any(AsyncCallbacks.ReadEntryCallback.class), any()); // Now 2 calls
+    }
+
 }

Reply via email to