This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new a95a382 Cancel scheduled tasks when deleting ManagedLedgerImpl (#12565) a95a382 is described below commit a95a3824e0b1c207ea844fbce724734764f0be62 Author: Masahiro Sakamoto <massa...@yahoo-corp.jp> AuthorDate: Tue Nov 2 02:13:47 2021 +0900 Cancel scheduled tasks when deleting ManagedLedgerImpl (#12565) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 20 +++++++------ .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 33 ++++++++++++++++++++++ 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 0bc88c5..10ccde2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1324,14 +1324,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { factory.close(this); STATE_UPDATER.set(this, State.Closed); - - if (this.timeoutTask != null) { - this.timeoutTask.cancel(false); - } - - if (this.checkLedgerRollTask != null) { - this.checkLedgerRollTask.cancel(false); - } + cancelScheduledTasks(); LedgerHandle lh = currentLedger; @@ -2606,6 +2599,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { // Delete the managed ledger without closing, since we are not interested in gracefully closing cursors and // ledgers STATE_UPDATER.set(this, State.Fenced); + cancelScheduledTasks(); List<ManagedCursor> cursors = Lists.newArrayList(this.cursors); if (cursors.isEmpty()) { @@ -4003,4 +3997,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } } + private void cancelScheduledTasks() { + if (this.timeoutTask != null) { + this.timeoutTask.cancel(false); + } + + if (this.checkLedgerRollTask != null) { + this.checkLedgerRollTask.cancel(false); + } + } + } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 4358c2f..d10fcdd 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -61,6 +61,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -3360,4 +3361,36 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { managedLedgerB.close(); } + + @Test + public void testCancellationOfScheduledTasks() throws Exception { + Field timeoutTaskField = ManagedLedgerImpl.class.getDeclaredField("timeoutTask"); + timeoutTaskField.setAccessible(true); + Field checkLedgerRollTaskField = ManagedLedgerImpl.class.getDeclaredField("checkLedgerRollTask"); + checkLedgerRollTaskField.setAccessible(true); + + ManagedLedgerImpl ledger1 = (ManagedLedgerImpl) factory.open("my_test_ledger_1"); + ledger1.addEntry("dummy-entry-1".getBytes(Encoding)); + ScheduledFuture<?> timeoutTask1 = (ScheduledFuture<?>) timeoutTaskField.get(ledger1); + assertNotNull(timeoutTask1); + assertFalse(timeoutTask1.isDone()); + ScheduledFuture<?> checkLedgerRollTask1 = (ScheduledFuture<?>) checkLedgerRollTaskField.get(ledger1); + assertNotNull(checkLedgerRollTask1); + assertFalse(checkLedgerRollTask1.isDone()); + ledger1.close(); + assertTrue(timeoutTask1.isCancelled()); + assertTrue(checkLedgerRollTask1.isCancelled()); + + ManagedLedgerImpl ledger2 = (ManagedLedgerImpl) factory.open("my_test_ledger_2"); + ledger2.addEntry("dummy-entry-2".getBytes(Encoding)); + ScheduledFuture<?> timeoutTask2 = (ScheduledFuture<?>) timeoutTaskField.get(ledger2); + assertNotNull(timeoutTask2); + assertFalse(timeoutTask2.isDone()); + ScheduledFuture<?> checkLedgerRollTask2 = (ScheduledFuture<?>) checkLedgerRollTaskField.get(ledger2); + assertNotNull(checkLedgerRollTask2); + assertFalse(checkLedgerRollTask2.isDone()); + ledger2.delete(); + assertTrue(timeoutTask2.isCancelled()); + assertTrue(checkLedgerRollTask2.isCancelled()); + } }