This is an automated email from the ASF dual-hosted git repository. daojun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 7be22eb2b23 [improve][broker] Optimize message TTL check (#24271) 7be22eb2b23 is described below commit 7be22eb2b23057bd5e09c361a43d6ccdcc0c8afd Author: 道君- Tao Jiuming <dao...@apache.org> AuthorDate: Wed May 14 12:58:43 2025 +0800 [improve][broker] Optimize message TTL check (#24271) --- .../persistent/PersistentMessageExpiryMonitor.java | 35 +++++++++------------ .../broker/service/MessageCumulativeAckTest.java | 8 +++-- .../service/PersistentMessageFinderTest.java | 36 +++++++++++++--------- .../service/TransactionMarkerDeleteTest.java | 1 + .../pulsar/broker/transaction/TransactionTest.java | 2 ++ 5 files changed, 46 insertions(+), 36 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index 6404e56d59b..c0d54e7eb34 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import java.util.Objects; import java.util.Optional; import java.util.SortedMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.LongAdder; import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback; @@ -37,7 +38,6 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.pulsar.broker.service.MessageExpirer; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; -import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.stats.Rate; import org.jspecify.annotations.Nullable; import org.slf4j.Logger; @@ -52,6 +52,7 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag private final Rate msgExpired; private final LongAdder totalMsgExpired; private final PersistentSubscription subscription; + private final PersistentMessageFinder finder; private static final int FALSE = 0; private static final int TRUE = 1; @@ -70,6 +71,10 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag this.subscription = subscription; this.msgExpired = new Rate(); this.totalMsgExpired = new LongAdder(); + int managedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis = topic.getBrokerService().pulsar() + .getConfig().getManagedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis(); + this.finder = new PersistentMessageFinder(topicName, cursor, + managedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis); } @VisibleForTesting @@ -81,31 +86,21 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag @Override public boolean expireMessages(int messageTTLInSeconds) { - if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) { - log.info("[{}][{}] Starting message expiry check, ttl= {} seconds", topicName, subName, - messageTTLInSeconds); - // First filter the entire Ledger reached TTL based on the Ledger closing time to avoid client clock skew - checkExpiryByLedgerClosureTime(cursor, messageTTLInSeconds); - // Some part of entries in active Ledger may have reached TTL, so we need to continue searching. - cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> { - try { - long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer()); - return MessageImpl.isEntryExpired(messageTTLInSeconds, entryTimestamp); - } catch (Exception e) { - log.error("[{}][{}] Error deserializing message for expiry check", topicName, subName, e); - } finally { - entry.release(); - } - return false; - }, this, null); - return true; - } else { + if (!expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) { if (log.isDebugEnabled()) { log.debug("[{}][{}] Ignore expire-message scheduled task, last check is still running", topicName, subName); } return false; } + log.info("[{}][{}] Starting message expiry check, ttl= {} seconds", topicName, subName, + messageTTLInSeconds); + // First filter the entire Ledger reached TTL based on the Ledger closing time to avoid client clock skew + checkExpiryByLedgerClosureTime(cursor, messageTTLInSeconds); + // Some part of entries in active Ledger may have reached TTL, so we need to continue searching. + long expiredMessageTimestamp = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(messageTTLInSeconds); + finder.findMessages(expiredMessageTimestamp, this); + return true; } private void checkExpiryByLedgerClosureTime(ManagedCursor cursor, int messageTTLInSeconds) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java index cc4fe229624..31a9b7f95d6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java @@ -36,9 +36,10 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import io.netty.channel.ChannelHandlerContext; import java.net.InetSocketAddress; + +import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; -import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -47,6 +48,7 @@ import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.api.proto.ProtocolVersion; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.Codec; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; @@ -78,8 +80,10 @@ public class MessageCumulativeAckTest { var mockManagedLedger = mock(ManagedLedger.class); when(mockManagedLedger.getConfig()).thenReturn(new ManagedLedgerConfig()); var persistentTopic = new PersistentTopic(topicName, mockManagedLedger, pulsarTestContext.getBrokerService()); + ManagedCursor cursor = mock(ManagedCursor.class); + doReturn(Codec.encode("sub-1")).when(cursor).getName(); sub = spy(new PersistentSubscription(persistentTopic, "sub-1", - mock(ManagedCursorImpl.class), false)); + cursor, false)); doNothing().when(sub).acknowledgeMessage(any(), any(), any()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index f69eb1be432..d59fc5f3d2b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -60,6 +61,8 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.Ledge import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor; import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; @@ -236,9 +239,7 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { }); assertTrue(ex.get()); - PersistentTopic mock = mock(PersistentTopic.class); - when(mock.getName()).thenReturn("topicname"); - when(mock.getLastPosition()).thenReturn(PositionFactory.EARLIEST); + PersistentTopic mock = mockPersistentTopic("topicname"); PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null); monitor.findEntryFailed(new ManagedLedgerException.ConcurrentFindCursorPositionException("failed"), @@ -421,9 +422,7 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { bkc.deleteLedger(ledgers.get(1).getLedgerId()); bkc.deleteLedger(ledgers.get(2).getLedgerId()); - PersistentTopic mock = mock(PersistentTopic.class); - when(mock.getName()).thenReturn("topicname"); - when(mock.getLastPosition()).thenReturn(PositionFactory.EARLIEST); + PersistentTopic mock = mockPersistentTopic("topicname"); PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null); assertTrue(monitor.expireMessages(ttlSeconds)); @@ -459,15 +458,26 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { // The number of ledgers should be (entriesNum / MaxEntriesPerLedger) + 1 // Please refer to: https://github.com/apache/pulsar/pull/22034 assertEquals(ledger.getLedgersInfoAsList().size(), entriesNum + 1); - PersistentTopic mock = mock(PersistentTopic.class); - when(mock.getName()).thenReturn("topicname"); - when(mock.getLastPosition()).thenReturn(PositionFactory.EARLIEST); + PersistentTopic mock = mockPersistentTopic("topicname"); PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null); Thread.sleep(TimeUnit.SECONDS.toMillis(maxTTLSeconds)); monitor.expireMessages(maxTTLSeconds); assertEquals(c1.getNumberOfEntriesInBacklog(true), 0); } + private PersistentTopic mockPersistentTopic(String topicName) throws Exception { + PersistentTopic mock = mock(PersistentTopic.class); + when(mock.getName()).thenReturn("topicname"); + when(mock.getLastPosition()).thenReturn(PositionFactory.EARLIEST); + BrokerService brokerService = mock(BrokerService.class); + doReturn(brokerService).when(mock).getBrokerService(); + PulsarService pulsarService = mock(PulsarService.class); + doReturn(pulsarService).when(brokerService).pulsar(); + ServiceConfiguration serviceConfiguration = new ServiceConfiguration(); + doReturn(serviceConfiguration).when(pulsarService).getConfig(); + return mock; + } + @Test public void testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger() throws Throwable { final String ledgerAndCursorName = "testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger"; @@ -482,9 +492,7 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { ledger.addEntry(createMessageWrittenToLedger("msg" + i, incorrectPublishTimestamp)); } assertEquals(ledger.getLedgersInfoAsList().size(), 2); - PersistentTopic mock = mock(PersistentTopic.class); - when(mock.getName()).thenReturn("topicname"); - when(mock.getLastPosition()).thenReturn(PositionFactory.EARLIEST); + PersistentTopic mock = mockPersistentTopic("topicname"); PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null); AsyncCallbacks.MarkDeleteCallback markDeleteCallback = (AsyncCallbacks.MarkDeleteCallback) spy( @@ -523,9 +531,8 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName); PersistentSubscription subscription = mock(PersistentSubscription.class); - PersistentTopic topic = mock(PersistentTopic.class); + PersistentTopic topic = mockPersistentTopic("topicname"); when(subscription.getTopic()).thenReturn(topic); - when(topic.getName()).thenReturn("topicname"); for (int i = 0; i < totalEntries; i++) { positions.add(ledger.addEntry(createMessageWrittenToLedger("msg" + i))); @@ -571,6 +578,7 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { clearInvocations(monitor); ManagedCursorImpl mockCursor = mock(ManagedCursorImpl.class); + doReturn("cursor").when(mockCursor).getName(); PersistentMessageExpiryMonitor mockMonitor = spy(new PersistentMessageExpiryMonitor(topic, cursor.getName(), mockCursor, subscription)); // Not calling findEntryComplete to clear expirationCheckInProgress condition, so following call to diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java index fc10d315cb1..b945f5abcbc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java @@ -77,6 +77,7 @@ public class TransactionMarkerDeleteTest extends TransactionTestBase { ServiceConfiguration configuration = mock(ServiceConfiguration.class); doReturn(brokerService).when(topic).getBrokerService(); doReturn(pulsarService).when(brokerService).getPulsar(); + doReturn(pulsarService).when(brokerService).pulsar(); doReturn(configuration).when(pulsarService).getConfig(); doReturn(false).when(configuration).isTransactionCoordinatorEnabled(); doReturn(managedLedger).when(topic).getManagedLedger(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 5972c1cc190..09f2b494725 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -145,6 +145,7 @@ import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.compaction.CompactionServiceFactory; import org.apache.pulsar.compaction.PulsarCompactionServiceFactory; import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; @@ -1549,6 +1550,7 @@ public class TransactionTest extends TransactionTestBase { when(topic.getName()).thenReturn("topic-a"); // Mock cursor for subscription. ManagedCursor cursor_subscription = mock(ManagedCursor.class); + doReturn(Codec.encode("sub-a")).when(cursor_subscription).getName(); doThrow(new RuntimeException("1")).when(cursor_subscription).updateLastActive(); // Create subscription. String subscriptionName = "sub-a";