This is an automated email from the ASF dual-hosted git repository.

daojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7be22eb2b23 [improve][broker] Optimize message TTL check (#24271)
7be22eb2b23 is described below

commit 7be22eb2b23057bd5e09c361a43d6ccdcc0c8afd
Author: 道君- Tao Jiuming <dao...@apache.org>
AuthorDate: Wed May 14 12:58:43 2025 +0800

    [improve][broker] Optimize message TTL check (#24271)
---
 .../persistent/PersistentMessageExpiryMonitor.java | 35 +++++++++------------
 .../broker/service/MessageCumulativeAckTest.java   |  8 +++--
 .../service/PersistentMessageFinderTest.java       | 36 +++++++++++++---------
 .../service/TransactionMarkerDeleteTest.java       |  1 +
 .../pulsar/broker/transaction/TransactionTest.java |  2 ++
 5 files changed, 46 insertions(+), 36 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index 6404e56d59b..c0d54e7eb34 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.SortedMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
@@ -37,7 +38,6 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.pulsar.broker.service.MessageExpirer;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
-import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.stats.Rate;
 import org.jspecify.annotations.Nullable;
 import org.slf4j.Logger;
@@ -52,6 +52,7 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback, Messag
     private final Rate msgExpired;
     private final LongAdder totalMsgExpired;
     private final PersistentSubscription subscription;
+    private final PersistentMessageFinder finder;
 
     private static final int FALSE = 0;
     private static final int TRUE = 1;
@@ -70,6 +71,10 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback, Messag
         this.subscription = subscription;
         this.msgExpired = new Rate();
         this.totalMsgExpired = new LongAdder();
+        int managedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis = 
topic.getBrokerService().pulsar()
+                
.getConfig().getManagedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis();
+        this.finder = new PersistentMessageFinder(topicName, cursor,
+                
managedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis);
     }
 
     @VisibleForTesting
@@ -81,31 +86,21 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback, Messag
 
     @Override
     public boolean expireMessages(int messageTTLInSeconds) {
-        if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) 
{
-            log.info("[{}][{}] Starting message expiry check, ttl= {} 
seconds", topicName, subName,
-                    messageTTLInSeconds);
-            // First filter the entire Ledger reached TTL based on the Ledger 
closing time to avoid client clock skew
-            checkExpiryByLedgerClosureTime(cursor, messageTTLInSeconds);
-            // Some part of entries in active Ledger may have reached TTL, so 
we need to continue searching.
-            
cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries,
 entry -> {
-                try {
-                    long entryTimestamp = 
Commands.getEntryTimestamp(entry.getDataBuffer());
-                    return MessageImpl.isEntryExpired(messageTTLInSeconds, 
entryTimestamp);
-                } catch (Exception e) {
-                    log.error("[{}][{}] Error deserializing message for expiry 
check", topicName, subName, e);
-                } finally {
-                    entry.release();
-                }
-                return false;
-            }, this, null);
-            return true;
-        } else {
+        if (!expirationCheckInProgressUpdater.compareAndSet(this, FALSE, 
TRUE)) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Ignore expire-message scheduled task, last 
check is still running", topicName,
                         subName);
             }
             return false;
         }
+        log.info("[{}][{}] Starting message expiry check, ttl= {} seconds", 
topicName, subName,
+                messageTTLInSeconds);
+        // First filter the entire Ledger reached TTL based on the Ledger 
closing time to avoid client clock skew
+        checkExpiryByLedgerClosureTime(cursor, messageTTLInSeconds);
+        // Some part of entries in active Ledger may have reached TTL, so we 
need to continue searching.
+        long expiredMessageTimestamp = System.currentTimeMillis() - 
TimeUnit.SECONDS.toMillis(messageTTLInSeconds);
+        finder.findMessages(expiredMessageTimestamp, this);
+        return true;
     }
 
     private void checkExpiryByLedgerClosureTime(ManagedCursor cursor, int 
messageTTLInSeconds) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
index cc4fe229624..31a9b7f95d6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
@@ -36,9 +36,10 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import io.netty.channel.ChannelHandlerContext;
 import java.net.InetSocketAddress;
+
+import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
-import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -47,6 +48,7 @@ import org.apache.pulsar.common.api.proto.CommandAck;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.Codec;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
@@ -78,8 +80,10 @@ public class MessageCumulativeAckTest {
         var mockManagedLedger = mock(ManagedLedger.class);
         when(mockManagedLedger.getConfig()).thenReturn(new 
ManagedLedgerConfig());
         var persistentTopic = new PersistentTopic(topicName, 
mockManagedLedger, pulsarTestContext.getBrokerService());
+        ManagedCursor cursor = mock(ManagedCursor.class);
+        doReturn(Codec.encode("sub-1")).when(cursor).getName();
         sub = spy(new PersistentSubscription(persistentTopic, "sub-1",
-            mock(ManagedCursorImpl.class), false));
+            cursor, false));
         doNothing().when(sub).acknowledgeMessage(any(), any(), any());
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index f69eb1be432..d59fc5f3d2b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -60,6 +61,8 @@ import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.Ledge
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import 
org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
 import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -236,9 +239,7 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
                 });
         assertTrue(ex.get());
 
-        PersistentTopic mock = mock(PersistentTopic.class);
-        when(mock.getName()).thenReturn("topicname");
-        when(mock.getLastPosition()).thenReturn(PositionFactory.EARLIEST);
+        PersistentTopic mock = mockPersistentTopic("topicname");
 
         PersistentMessageExpiryMonitor monitor = new 
PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
         monitor.findEntryFailed(new 
ManagedLedgerException.ConcurrentFindCursorPositionException("failed"),
@@ -421,9 +422,7 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
         bkc.deleteLedger(ledgers.get(1).getLedgerId());
         bkc.deleteLedger(ledgers.get(2).getLedgerId());
 
-        PersistentTopic mock = mock(PersistentTopic.class);
-        when(mock.getName()).thenReturn("topicname");
-        when(mock.getLastPosition()).thenReturn(PositionFactory.EARLIEST);
+        PersistentTopic mock = mockPersistentTopic("topicname");
 
         PersistentMessageExpiryMonitor monitor = new 
PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
         assertTrue(monitor.expireMessages(ttlSeconds));
@@ -459,15 +458,26 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
         // The number of ledgers should be (entriesNum / MaxEntriesPerLedger) 
+ 1
         // Please refer to: https://github.com/apache/pulsar/pull/22034
         assertEquals(ledger.getLedgersInfoAsList().size(), entriesNum + 1);
-        PersistentTopic mock = mock(PersistentTopic.class);
-        when(mock.getName()).thenReturn("topicname");
-        when(mock.getLastPosition()).thenReturn(PositionFactory.EARLIEST);
+        PersistentTopic mock = mockPersistentTopic("topicname");
         PersistentMessageExpiryMonitor monitor = new 
PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
         Thread.sleep(TimeUnit.SECONDS.toMillis(maxTTLSeconds));
         monitor.expireMessages(maxTTLSeconds);
         assertEquals(c1.getNumberOfEntriesInBacklog(true), 0);
     }
 
+    private PersistentTopic mockPersistentTopic(String topicName) throws 
Exception {
+        PersistentTopic mock = mock(PersistentTopic.class);
+        when(mock.getName()).thenReturn("topicname");
+        when(mock.getLastPosition()).thenReturn(PositionFactory.EARLIEST);
+        BrokerService brokerService = mock(BrokerService.class);
+        doReturn(brokerService).when(mock).getBrokerService();
+        PulsarService pulsarService = mock(PulsarService.class);
+        doReturn(pulsarService).when(brokerService).pulsar();
+        ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
+        doReturn(serviceConfiguration).when(pulsarService).getConfig();
+        return mock;
+    }
+
     @Test
     public void testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger() 
throws Throwable {
         final String ledgerAndCursorName = 
"testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger";
@@ -482,9 +492,7 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
             ledger.addEntry(createMessageWrittenToLedger("msg" + i, 
incorrectPublishTimestamp));
         }
         assertEquals(ledger.getLedgersInfoAsList().size(), 2);
-        PersistentTopic mock = mock(PersistentTopic.class);
-        when(mock.getName()).thenReturn("topicname");
-        when(mock.getLastPosition()).thenReturn(PositionFactory.EARLIEST);
+        PersistentTopic mock = mockPersistentTopic("topicname");
         PersistentMessageExpiryMonitor monitor = new 
PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
         AsyncCallbacks.MarkDeleteCallback markDeleteCallback =
                 (AsyncCallbacks.MarkDeleteCallback) spy(
@@ -523,9 +531,8 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
         ManagedCursorImpl cursor = (ManagedCursorImpl) 
ledger.openCursor(ledgerAndCursorName);
 
         PersistentSubscription subscription = 
mock(PersistentSubscription.class);
-        PersistentTopic topic = mock(PersistentTopic.class);
+        PersistentTopic topic = mockPersistentTopic("topicname");
         when(subscription.getTopic()).thenReturn(topic);
-        when(topic.getName()).thenReturn("topicname");
 
         for (int i = 0; i < totalEntries; i++) {
             positions.add(ledger.addEntry(createMessageWrittenToLedger("msg" + 
i)));
@@ -571,6 +578,7 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
         clearInvocations(monitor);
 
         ManagedCursorImpl mockCursor = mock(ManagedCursorImpl.class);
+        doReturn("cursor").when(mockCursor).getName();
         PersistentMessageExpiryMonitor mockMonitor = spy(new 
PersistentMessageExpiryMonitor(topic,
                 cursor.getName(), mockCursor, subscription));
         // Not calling findEntryComplete to clear expirationCheckInProgress 
condition, so following call to
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
index fc10d315cb1..b945f5abcbc 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
@@ -77,6 +77,7 @@ public class TransactionMarkerDeleteTest extends 
TransactionTestBase {
         ServiceConfiguration configuration = mock(ServiceConfiguration.class);
         doReturn(brokerService).when(topic).getBrokerService();
         doReturn(pulsarService).when(brokerService).getPulsar();
+        doReturn(pulsarService).when(brokerService).pulsar();
         doReturn(configuration).when(pulsarService).getConfig();
         doReturn(false).when(configuration).isTransactionCoordinatorEnabled();
         doReturn(managedLedger).when(topic).getManagedLedger();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 5972c1cc190..09f2b494725 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -145,6 +145,7 @@ import 
org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.compaction.CompactionServiceFactory;
 import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
 import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
@@ -1549,6 +1550,7 @@ public class TransactionTest extends TransactionTestBase {
         when(topic.getName()).thenReturn("topic-a");
         // Mock cursor for subscription.
         ManagedCursor cursor_subscription = mock(ManagedCursor.class);
+        doReturn(Codec.encode("sub-a")).when(cursor_subscription).getName();
         doThrow(new 
RuntimeException("1")).when(cursor_subscription).updateLastActive();
         // Create subscription.
         String subscriptionName = "sub-a";

Reply via email to