This is an automated email from the ASF dual-hosted git repository.

kezhuw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git


The following commit(s) were added to refs/heads/master by this push:
     new 57cf5135a Fix PersistentTtlNode thread leak (#1264)
57cf5135a is described below

commit 57cf5135aa7cf2e5e37f3147b3d702f06b15ccfc
Author: chevaris <[email protected]>
AuthorDate: Mon Apr 21 12:24:30 2025 +0200

    Fix PersistentTtlNode thread leak (#1264)
    
    Fixes #1263.
---
 .../utils/CloseableScheduledExecutorService.java   |  24 ++++
 .../TestCloseableScheduledExecutorService.java     |  83 ++++++++++++++
 .../framework/recipes/nodes/PersistentTtlNode.java |  54 ++++++---
 .../recipes/nodes/TestPersistentTtlNode.java       | 127 ++++++++++++++++++---
 4 files changed, 260 insertions(+), 28 deletions(-)

diff --git 
a/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
 
b/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
index 8fbc420ee..3161b2e44 100644
--- 
a/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
+++ 
b/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
@@ -95,4 +95,28 @@ public class CloseableScheduledExecutorService extends 
CloseableExecutorService
                 scheduledExecutorService.scheduleWithFixedDelay(task, 
initialDelay, delay, unit);
         return new InternalScheduledFutureTask(scheduledFuture);
     }
+
+    /**
+     * Creates and executes a periodic action that becomes enabled first after 
the given initial
+     * delay, and subsequently with the given period; that is, executions will 
commence after {@code
+     * initialDelay}, then {@code initialDelay + period}, then {@code 
initialDelay + 2 * period}, and so
+     * on. If any execution of the task encounters an exception, subsequent 
executions are suppressed.
+     * Otherwise, the task will only terminate via cancellation or termination 
of the executor. If any
+     * execution of this task takes longer than its period, then subsequent 
executions may start late,
+     * but will not concurrently execute.
+     *
+     * @param task the task to execute
+     * @param initialDelay the time to delay first execution
+     * @param period the period between successive executions
+     * @param unit the time unit of the initialDelay and period parameters
+     * @return a Future representing pending completion of the task, and whose 
{@code get()} method
+     *     will throw an exception upon cancellation
+     */
+    public Future<?> scheduleAtFixedRate(Runnable task, long initialDelay, 
long period, TimeUnit unit) {
+        Preconditions.checkState(isOpen.get(), "CloseableExecutorService is 
closed"); // rejects new tasks once this service has been closed
+
+        ScheduledFuture<?> scheduledFuture =
+                scheduledExecutorService.scheduleAtFixedRate(task, 
initialDelay, period, unit);
+        return new InternalScheduledFutureTask(scheduledFuture);
+    }
 }
diff --git 
a/curator-client/src/test/java/org/apache/curator/utils/TestCloseableScheduledExecutorService.java
 
b/curator-client/src/test/java/org/apache/curator/utils/TestCloseableScheduledExecutorService.java
index 42ad7a6bb..290635680 100644
--- 
a/curator-client/src/test/java/org/apache/curator/utils/TestCloseableScheduledExecutorService.java
+++ 
b/curator-client/src/test/java/org/apache/curator/utils/TestCloseableScheduledExecutorService.java
@@ -20,9 +20,12 @@
 package org.apache.curator.utils;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -110,4 +113,84 @@ public class TestCloseableScheduledExecutorService {
         assertTrue(newValue > value);
         assertEquals(innerValue, innerCounter.get());
     }
+
+    @Test
+    public void testCloseableScheduleAtFixedRate() throws InterruptedException 
{ // a slow task run at a fixed rate should complete more runs than the same task run with a fixed delay
+        CloseableScheduledExecutorService service = new 
CloseableScheduledExecutorService(executorService);
+
+        final CountDownLatch latch = new CountDownLatch(QTY);
+        final AtomicInteger fixedDelayCounter = new AtomicInteger();
+        service.scheduleAtFixedRate(
+                new Runnable() {
+                    @Override
+                    public void run() {
+                        latch.countDown();
+                        // Sleeping for a full period: fixed-rate runs still start 
back-to-back, roughly every DELAY_MS
+                        try {
+                            Thread.sleep(DELAY_MS);
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt(); // restore the flag instead of swallowing it
+                        }
+                    }
+                },
+                DELAY_MS,
+                DELAY_MS,
+                TimeUnit.MILLISECONDS);
+        service.scheduleWithFixedDelay( // NOTE(review): assumes executorService has >= 2 threads so both tasks run concurrently
+                new Runnable() {
+                    @Override
+                    public void run() {
+                        fixedDelayCounter.incrementAndGet();
+                        // Sleeping for a full period: fixed-delay then waits another 
DELAY_MS after each run ends, so each cycle takes ~2 * DELAY_MS
+                        try {
+                            Thread.sleep(DELAY_MS);
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt(); // restore the flag instead of swallowing it
+                        }
+                    }
+                },
+                DELAY_MS,
+                DELAY_MS,
+                TimeUnit.MILLISECONDS);
+        assertTrue(latch.await((QTY * 2) * DELAY_MS, TimeUnit.MILLISECONDS)); // fixed-rate needs only ~QTY * DELAY_MS
+        assertTrue(fixedDelayCounter.get() <= (QTY / 2 + 1)); // fixed-delay managed about half as many runs
+    }
+
+    @Test
+    public void testCloseWithoutShutdown() throws InterruptedException { // default ctor: close() must leave the wrapped executor running
+        final CountDownLatch latch = new CountDownLatch(1);
+        try (CloseableScheduledExecutorService service = new 
CloseableScheduledExecutorService(executorService)) {
+            service.submit(latch::countDown);
+            assertTrue(latch.await(1, TimeUnit.SECONDS));
+        }
+        assertFalse(executorService.isShutdown()); // wrapped executor was not shut down by close()
+    }
+
+    @Test
+    public void testCloseWithShutdown() throws InterruptedException { // shutdownOnClose=true: close() must shut down the wrapped executor
+        final CountDownLatch latch = new CountDownLatch(1);
+        try (CloseableScheduledExecutorService service = new 
CloseableScheduledExecutorService(executorService, true)) {
+            service.submit(latch::countDown);
+            assertTrue(latch.await(1, TimeUnit.SECONDS));
+        }
+        assertTrue(executorService.isShutdown());
+        assertThrows(RejectedExecutionException.class, () -> 
executorService.submit(() -> System.out.println("Hello"))); // a shut-down executor rejects new work
+    }
+
+    @Test
+    public void testCloseMultipleTimes() throws InterruptedException { // close() must be safe to call repeatedly; passes if no call throws
+        final CountDownLatch latch = new CountDownLatch(1);
+        CloseableScheduledExecutorService service = null;
+        try {
+            service = new CloseableScheduledExecutorService(executorService, 
true);
+            service.submit(latch::countDown);
+            assertTrue(latch.await(1, TimeUnit.SECONDS));
+        } finally {
+            if (service != null) {
+                service.close();
+                service.close();
+                service.close();
+            }
+        }
+    }
 }
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentTtlNode.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentTtlNode.java
index 33e1608a2..1bcff4c3f 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentTtlNode.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentTtlNode.java
@@ -24,11 +24,10 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Objects;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.CloseableScheduledExecutorService;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
@@ -61,13 +60,15 @@ public class PersistentTtlNode implements Closeable {
     public static final int DEFAULT_TOUCH_SCHEDULE_FACTOR = 2;
     public static final boolean DEFAULT_USE_PARENT_CREATION = true;
 
+    @VisibleForTesting
+    static final String TOUCH_THREAD_NAME = "PersistentTtlNode";
+
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final PersistentNode node;
     private final CuratorFramework client;
     private final long ttlMs;
     private final int touchScheduleFactor;
-    private final ScheduledExecutorService executorService;
-    private final AtomicReference<Future<?>> futureRef = new 
AtomicReference<>();
+    private final CloseableScheduledExecutorService closeableExecutorService;
     private final String childPath;
 
     /**
@@ -79,7 +80,9 @@ public class PersistentTtlNode implements Closeable {
     public PersistentTtlNode(CuratorFramework client, String path, long ttlMs, 
byte[] initData) {
         this(
                 client,
-                
Executors.newSingleThreadScheduledExecutor(ThreadUtils.newThreadFactory("PersistentTtlNode")),
+                new CloseableScheduledExecutorService(
+                        
Executors.newSingleThreadScheduledExecutor(ThreadUtils.newThreadFactory(TOUCH_THREAD_NAME)),
+                        true),
                 path,
                 ttlMs,
                 initData,
@@ -99,7 +102,9 @@ public class PersistentTtlNode implements Closeable {
             CuratorFramework client, String path, long ttlMs, byte[] initData, 
boolean useParentCreation) {
         this(
                 client,
-                
Executors.newSingleThreadScheduledExecutor(ThreadUtils.newThreadFactory("PersistentTtlNode")),
+                new CloseableScheduledExecutorService(
+                        
Executors.newSingleThreadScheduledExecutor(ThreadUtils.newThreadFactory(TOUCH_THREAD_NAME)),
+                        true),
                 path,
                 ttlMs,
                 initData,
@@ -128,7 +133,7 @@ public class PersistentTtlNode implements Closeable {
             int touchScheduleFactor) {
         this(
                 client,
-                executorService,
+                new CloseableScheduledExecutorService(executorService),
                 path,
                 ttlMs,
                 initData,
@@ -157,6 +162,26 @@ public class PersistentTtlNode implements Closeable {
             String childNodeName,
             int touchScheduleFactor,
             boolean useParentCreation) {
+        this(
+                client,
+                new CloseableScheduledExecutorService(executorService, false),
+                path,
+                ttlMs,
+                initData,
+                childNodeName,
+                touchScheduleFactor,
+                useParentCreation);
+    }
+
+    private PersistentTtlNode(
+            CuratorFramework client,
+            CloseableScheduledExecutorService closeableExecutorService,
+            String path,
+            long ttlMs,
+            byte[] initData,
+            String childNodeName,
+            int touchScheduleFactor,
+            boolean useParentCreation) {
         this.client = Objects.requireNonNull(client, "client cannot be null");
         this.ttlMs = ttlMs;
         this.touchScheduleFactor = touchScheduleFactor;
@@ -168,7 +193,7 @@ public class PersistentTtlNode implements Closeable {
                         // NOP
                     }
                 };
-        this.executorService = Objects.requireNonNull(executorService, 
"executorService cannot be null");
+        this.closeableExecutorService = closeableExecutorService;
         childPath = ZKPaths.makePath(Objects.requireNonNull(path, "path cannot 
be null"), childNodeName);
     }
 
@@ -198,9 +223,8 @@ public class PersistentTtlNode implements Closeable {
      */
     public void start() {
         node.start();
-        Future<?> future = executorService.scheduleAtFixedRate(
+        closeableExecutorService.scheduleAtFixedRate( // no Future kept: close() now closes closeableExecutorService instead
                 this::touch, ttlMs / touchScheduleFactor, ttlMs / 
touchScheduleFactor, TimeUnit.MILLISECONDS);
-        futureRef.set(future);
     }
 
     /**
@@ -238,6 +262,11 @@ public class PersistentTtlNode implements Closeable {
         return node.getData();
     }
 
+    @VisibleForTesting
+    CloseableScheduledExecutorService getCloseableScheduledExecutorService() { // test hook for inspecting the touch executor
+        return closeableExecutorService;
+    }
+
     /**
      * Call when you are done with the PersistentTtlNode. Note: the ZNode is 
<em>not</em> immediately
      * deleted. However, if no other PersistentTtlNode with the same path is 
running the node will get deleted
@@ -245,10 +274,7 @@ public class PersistentTtlNode implements Closeable {
      */
     @Override
     public void close() {
-        Future<?> future = futureRef.getAndSet(null);
-        if (future != null) {
-            future.cancel(true);
-        }
+        closeableExecutorService.close();
         try {
             node.close();
         } catch (IOException e) {
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java
index b98ada445..562075522 100644
--- 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java
@@ -28,9 +28,11 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
@@ -41,6 +43,7 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.Timing;
 import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.utils.CloseableScheduledExecutorService;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
@@ -201,23 +204,17 @@ public class TestPersistentTtlNode extends 
CuratorTestBase {
         final long testTtlMs = 500L;
         final CountDownLatch mainCreatedLatch = new CountDownLatch(1);
         final CountDownLatch mainDeletedLatch = new CountDownLatch(1);
-        final AtomicBoolean touchCreated = new AtomicBoolean();
         try (CuratorFramework client =
                 CuratorFrameworkFactory.newClient(server.getConnectString(), 
new RetryOneTime(1))) {
             client.start();
             assertTrue(client.blockUntilConnected(1, TimeUnit.SECONDS));
-            try (PersistentWatcher watcher = new PersistentWatcher(client, 
mainPath, true)) {
+            try (PersistentWatcher watcher = new PersistentWatcher(client, 
mainPath, false)) {
                 final Watcher listener = event -> {
-                    final String path = event.getPath();
-                    if (mainPath.equals(path)) {
-                        final EventType type = event.getType();
-                        if (EventType.NodeCreated.equals(type)) {
-                            mainCreatedLatch.countDown();
-                        } else if (EventType.NodeDeleted.equals(type)) {
-                            mainDeletedLatch.countDown();
-                        }
-                    } else if (touchPath.equals(path)) {
-                        touchCreated.set(true);
+                    final EventType type = event.getType();
+                    if (EventType.NodeCreated.equals(type)) {
+                        mainCreatedLatch.countDown();
+                    } else if (EventType.NodeDeleted.equals(type)) {
+                        mainDeletedLatch.countDown();
                     }
                 };
                 watcher.getListenable().addListener(listener);
@@ -233,8 +230,110 @@ public class TestPersistentTtlNode extends 
CuratorTestBase {
                 }
                 assertNull(client.checkExists().forPath(touchPath));
                 assertTrue(mainDeletedLatch.await(3L * testTtlMs, 
TimeUnit.MILLISECONDS));
-                assertFalse(touchCreated.get()); // Just to control that touch 
ZNode never created
             }
         }
     }
+
+    @Test
+    public void testInternalExecutorClose() throws Exception { // a node-created executor must be shut down, and its thread must exit, on close()
+        final String mainPath = "/parent/main";
+        final String touchPath = ZKPaths.makePath(mainPath, 
PersistentTtlNode.DEFAULT_CHILD_NODE_NAME);
+        final CountDownLatch touchCreatedLatch = new CountDownLatch(1);
+        try (CuratorFramework client =
+                CuratorFrameworkFactory.newClient(server.getConnectString(), 
new RetryOneTime(1))) {
+            client.start();
+            assertTrue(client.blockUntilConnected(1, TimeUnit.SECONDS));
+            try (PersistentWatcher watcher = new PersistentWatcher(client, 
touchPath, false)) {
+                final Watcher listener = event -> {
+                    if (EventType.NodeCreated.equals(event.getType())) {
+                        touchCreatedLatch.countDown();
+                    }
+                };
+                watcher.getListenable().addListener(listener);
+                watcher.start();
+                final AtomicLong executorThreadId = new AtomicLong();
+                CloseableScheduledExecutorService executor;
+                try (PersistentTtlNode node = new PersistentTtlNode(client, 
mainPath, ttlMs, new byte[0])) {
+                    node.start();
+                    assertTrue(touchCreatedLatch.await(5 * ttlMs, 
TimeUnit.MILLISECONDS));
+                    executor = node.getCloseableScheduledExecutorService();
+                    assertNotNull(executor);
+                    executor.submit(() ->
+                                    
executorThreadId.set(Thread.currentThread().getId()))
+                            .get();
+                    assertNotNull(getThreadWithIdAndName(executorThreadId, 
PersistentTtlNode.TOUCH_THREAD_NAME));
+                }
+                assertTrue(executor.isShutdown()); // the node created this executor, so close() shut it down
+                // The executor is shut down, so its worker thread should exit 
soon; poll for up to maxTimeExecutorThreadDestroyedMs
+                boolean executorThreadDestroyed = false;
+                final long maxTimeExecutorThreadDestroyedMs = 1_000L;
+                final long checkIntervalMs = 10L;
+                for (long i = 0; i <= maxTimeExecutorThreadDestroyedMs / 
checkIntervalMs; i++) {
+                    Thread.sleep(checkIntervalMs);
+                    if (getThreadWithIdAndName(executorThreadId, 
PersistentTtlNode.TOUCH_THREAD_NAME) == null) {
+                        executorThreadDestroyed = true;
+                        break;
+                    }
+                }
+                assertTrue(executorThreadDestroyed);
+            }
+        }
+    }
+
+    /**
+     * @return the live thread whose id equals {@code executorThreadId} and whose name contains {@code name}, or {@code null} if there is none
+     */
+    private Thread getThreadWithIdAndName(final AtomicLong executorThreadId, 
final String name) {
+        return Thread.getAllStackTraces().keySet().stream()
+                .filter(t -> t.getId() == executorThreadId.get() && 
t.getName().contains(name))
+                .findFirst()
+                .orElse(null);
+    }
+
+    @Test
+    public void testExternalExecutorClose() throws Exception { // a caller-supplied executor must survive the node's close()
+        final String mainPath = "/parent/main";
+        final String touchPath = ZKPaths.makePath(mainPath, 
PersistentTtlNode.DEFAULT_CHILD_NODE_NAME);
+        final CountDownLatch touchCreatedLatch = new CountDownLatch(1);
+        final String threadName = "testThreadName";
+        final ScheduledExecutorService executor =
+                Executors.newSingleThreadScheduledExecutor(task -> new 
Thread(task, threadName));
+        try (CuratorFramework client =
+                CuratorFrameworkFactory.newClient(server.getConnectString(), 
new RetryOneTime(1))) {
+            client.start();
+            assertTrue(client.blockUntilConnected(1, TimeUnit.SECONDS));
+            try (PersistentWatcher watcher = new PersistentWatcher(client, 
touchPath, false)) {
+                final Watcher listener = event -> {
+                    if (EventType.NodeCreated.equals(event.getType())) {
+                        touchCreatedLatch.countDown();
+                    }
+                };
+                watcher.getListenable().addListener(listener);
+                watcher.start();
+                final AtomicLong executorThreadId = new AtomicLong();
+                try (PersistentTtlNode node = new PersistentTtlNode(
+                        client,
+                        executor,
+                        mainPath,
+                        ttlMs,
+                        new byte[0],
+                        PersistentTtlNode.DEFAULT_CHILD_NODE_NAME,
+                        PersistentTtlNode.DEFAULT_TOUCH_SCHEDULE_FACTOR)) {
+                    node.start();
+                    assertTrue(touchCreatedLatch.await(5 * ttlMs, 
TimeUnit.MILLISECONDS));
+                    node.getCloseableScheduledExecutorService()
+                            .submit(() ->
+                                    
executorThreadId.set(Thread.currentThread().getId()))
+                            .get();
+                    assertNotNull(getThreadWithIdAndName(executorThreadId, 
threadName));
+                }
+                // close() has cancelled the node's touch task but must NOT shut 
down the caller-supplied executor
+                Thread.sleep(100L); // presumably gives a wrongly shut-down worker thread time to exit before the checks below
+                assertFalse(executor.isShutdown());
+                assertNotNull(getThreadWithIdAndName(executorThreadId, 
threadName));
+            }
+        } finally {
+            executor.shutdownNow(); // this test owns the executor, so it must shut it down
+        }
+    }
 }

Reply via email to