This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a5fd55f3a039e0ab96942caccf40001a54e229c9 Author: Lari Hotari <[email protected]> AuthorDate: Tue Nov 4 13:05:08 2025 +0200 [fix][broker] Fix bug in PersistentMessageExpiryMonitor which blocked further expirations (#24941) (cherry picked from commit 6cdd110f6cc6577d3f4a5a80bee7affc504ac8c0) --- .../persistent/PersistentMessageExpiryMonitor.java | 1 + .../PersistentMessageExpiryMonitorMockTest.java | 129 +++++++++++++++++++++ 2 files changed, 130 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index 98b30bb7b9f..658e1024f93 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -233,6 +233,7 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag if (position != null) { var markDeletedPosition = cursor.getMarkDeletedPosition(); if (markDeletedPosition != null && markDeletedPosition.compareTo(position) >= 0) { + expirationCheckInProgress = FALSE; return; } log.info("[{}][{}] Expiring all messages until position {}", topicName, subName, position); diff --git a/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/PersistentMessageExpiryMonitorMockTest.java b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/PersistentMessageExpiryMonitorMockTest.java new file mode 100644 index 00000000000..3b969255f3a --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/PersistentMessageExpiryMonitorMockTest.java @@ -0,0 +1,129 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertTrue; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.bookkeeper.mledger.proto.MLDataFormats; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Test class to demonstrate bugs in PersistentMessageExpiryMonitor using mocks. + */ +public class PersistentMessageExpiryMonitorMockTest { + + private PersistentTopic mockTopic; + private ManagedCursor mockCursor; + private ManagedLedger mockManagedLedger; + private BrokerService mockBrokerService; + + @BeforeMethod + public void setup() { + mockTopic = mock(PersistentTopic.class); + mockCursor = mock(ManagedCursor.class); + mockManagedLedger = mock(ManagedLedger.class); + mockBrokerService = mock(BrokerService.class); + + when(mockTopic.getName()).thenReturn("test-topic"); + when(mockTopic.getBrokerService()).thenReturn(mockBrokerService); + when(mockCursor.getManagedLedger()).thenReturn(mockManagedLedger); + + PulsarService mockPulsarService = mock(PulsarService.class); + ServiceConfiguration config = new ServiceConfiguration(); + when(mockBrokerService.pulsar()).thenReturn(mockPulsarService); + when(mockPulsarService.getConfig()).thenReturn(config); + } + + /** + * Ensure that mark delete short circuit resets expirationCheckInProgress flag. + */ + @Test + public void testExpireMessagesWithMarkDeleteShortCircuitResetsExpirationCheckInProgressFlag() throws Exception { + // Setup: Create a scenario where mark delete position is already ahead + Position markDeletedPosition = PositionFactory.create(2, 100); + Position positionToExpire = PositionFactory.create(2, 50); // Earlier than markDeletedPosition + + when(mockCursor.getMarkDeletedPosition()).thenReturn(markDeletedPosition); + when(mockCursor.getManagedLedger()).thenReturn(mockManagedLedger); + when(mockCursor.getName()).thenReturn("test-cursor"); + + // Mock the asyncFindNewestMatching call to return positionToExpire + doAnswer(invocation -> { + AsyncCallbacks.FindEntryCallback callback = invocation.getArgument(4); + Object ctx = invocation.getArgument(5); + callback.findEntryComplete(positionToExpire, ctx); + return null; + }).when(mockCursor).asyncFindNewestMatching( + any(ManagedCursor.FindPositionConstraint.class), + any(), // Predicate<Entry> + any(), // startPosition + any(), // endPosition + any(AsyncCallbacks.FindEntryCallback.class), + any(), // ctx + anyBoolean() // isFindFromLedger + ); + + // Setup ledger info with expired ledger + NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgerInfo = new TreeMap<>(); + MLDataFormats.ManagedLedgerInfo.LedgerInfo expiredLedger = + MLDataFormats.ManagedLedgerInfo.LedgerInfo.newBuilder() + .setLedgerId(2) + .setEntries(60) + .setTimestamp(System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(10)) // 10 seconds old + .build(); + ledgerInfo.put(2L, expiredLedger); + + when(mockManagedLedger.getLedgersInfo()).thenReturn(ledgerInfo); + when(mockManagedLedger.getLastConfirmedEntry()).thenReturn(PositionFactory.create(2, 200)); + + PersistentMessageExpiryMonitor monitor = + new PersistentMessageExpiryMonitor(mockTopic, "test-subscription", mockCursor, null); + + // First call should return true + boolean firstCallResult = monitor.expireMessages(5); + assertTrue(firstCallResult, "First expireMessages call should return true"); + + // Second call should return true since false would be returned if expirationCheckInProgress was not reset + boolean secondCallResult = monitor.expireMessages(5); + assertTrue(secondCallResult, "Second expireMessages call should also return true"); + + // All subsequent calls will also return true + boolean thirdCallResult = monitor.expireMessages(5); + assertTrue(thirdCallResult, "Third expireMessages call should also return true"); + } +} \ No newline at end of file
