This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a95a382  Cancel scheduled tasks when deleting ManagedLedgerImpl (#12565)
a95a382 is described below

commit a95a3824e0b1c207ea844fbce724734764f0be62
Author: Masahiro Sakamoto <massa...@yahoo-corp.jp>
AuthorDate: Tue Nov 2 02:13:47 2021 +0900

    Cancel scheduled tasks when deleting ManagedLedgerImpl (#12565)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 20 +++++++------
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 33 ++++++++++++++++++++++
 2 files changed, 45 insertions(+), 8 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 0bc88c5..10ccde2 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1324,14 +1324,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
         factory.close(this);
         STATE_UPDATER.set(this, State.Closed);
-
-        if (this.timeoutTask != null) {
-            this.timeoutTask.cancel(false);
-        }
-
-        if (this.checkLedgerRollTask != null) {
-            this.checkLedgerRollTask.cancel(false);
-        }
+        cancelScheduledTasks();
 
         LedgerHandle lh = currentLedger;
 
@@ -2606,6 +2599,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         // Delete the managed ledger without closing, since we are not interested in gracefully closing cursors and
         // ledgers
         STATE_UPDATER.set(this, State.Fenced);
+        cancelScheduledTasks();
 
         List<ManagedCursor> cursors = Lists.newArrayList(this.cursors);
         if (cursors.isEmpty()) {
@@ -4003,4 +3997,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         }
     }
 
+    private void cancelScheduledTasks() {
+        if (this.timeoutTask != null) {
+            this.timeoutTask.cancel(false);
+        }
+
+        if (this.checkLedgerRollTask != null) {
+            this.checkLedgerRollTask.cancel(false);
+        }
+    }
+
 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 4358c2f..d10fcdd 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -61,6 +61,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -3360,4 +3361,36 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         managedLedgerB.close();
 
     }
+
+    @Test
+    public void testCancellationOfScheduledTasks() throws Exception {
+        Field timeoutTaskField = ManagedLedgerImpl.class.getDeclaredField("timeoutTask");
+        timeoutTaskField.setAccessible(true);
+        Field checkLedgerRollTaskField = ManagedLedgerImpl.class.getDeclaredField("checkLedgerRollTask");
+        checkLedgerRollTaskField.setAccessible(true);
+
+        ManagedLedgerImpl ledger1 = (ManagedLedgerImpl) factory.open("my_test_ledger_1");
+        ledger1.addEntry("dummy-entry-1".getBytes(Encoding));
+        ScheduledFuture<?> timeoutTask1 = (ScheduledFuture<?>) timeoutTaskField.get(ledger1);
+        assertNotNull(timeoutTask1);
+        assertFalse(timeoutTask1.isDone());
+        ScheduledFuture<?> checkLedgerRollTask1 = (ScheduledFuture<?>) checkLedgerRollTaskField.get(ledger1);
+        assertNotNull(checkLedgerRollTask1);
+        assertFalse(checkLedgerRollTask1.isDone());
+        ledger1.close();
+        assertTrue(timeoutTask1.isCancelled());
+        assertTrue(checkLedgerRollTask1.isCancelled());
+
+        ManagedLedgerImpl ledger2 = (ManagedLedgerImpl) factory.open("my_test_ledger_2");
+        ledger2.addEntry("dummy-entry-2".getBytes(Encoding));
+        ScheduledFuture<?> timeoutTask2 = (ScheduledFuture<?>) timeoutTaskField.get(ledger2);
+        assertNotNull(timeoutTask2);
+        assertFalse(timeoutTask2.isDone());
+        ScheduledFuture<?> checkLedgerRollTask2 = (ScheduledFuture<?>) checkLedgerRollTaskField.get(ledger2);
+        assertNotNull(checkLedgerRollTask2);
+        assertFalse(checkLedgerRollTask2.isDone());
+        ledger2.delete();
+        assertTrue(timeoutTask2.isCancelled());
+        assertTrue(checkLedgerRollTask2.isCancelled());
+    }
 }

Reply via email to