This is an automated email from the ASF dual-hosted git repository.
kezhuw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git
The following commit(s) were added to refs/heads/master by this push:
new 57cf5135a FIx PersistentTTLNode thread leak (#1264)
57cf5135a is described below
commit 57cf5135aa7cf2e5e37f3147b3d702f06b15ccfc
Author: chevaris <[email protected]>
AuthorDate: Mon Apr 21 12:24:30 2025 +0200
FIx PersistentTTLNode thread leak (#1264)
Fixes #1263.
---
.../utils/CloseableScheduledExecutorService.java | 24 ++++
.../TestCloseableScheduledExecutorService.java | 83 ++++++++++++++
.../framework/recipes/nodes/PersistentTtlNode.java | 54 ++++++---
.../recipes/nodes/TestPersistentTtlNode.java | 127 ++++++++++++++++++---
4 files changed, 260 insertions(+), 28 deletions(-)
diff --git
a/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
b/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
index 8fbc420ee..3161b2e44 100644
---
a/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
+++
b/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
@@ -95,4 +95,28 @@ public class CloseableScheduledExecutorService extends
CloseableExecutorService
scheduledExecutorService.scheduleWithFixedDelay(task,
initialDelay, delay, unit);
return new InternalScheduledFutureTask(scheduledFuture);
}
+
+ /**
+ * Creates and executes a periodic action that becomes enabled first after
the given initial
+ * delay, and subsequently with the given period; that is, executions will
commence after {@code
+ * initialDelay} then {@code initialDelay+period}, then {@code
initialDelay + 2 * period}, and so
+ * on. If any execution of the task encounters an exception, subsequent
executions are suppressed.
+ * Otherwise, the task will only terminate via cancellation or termination
of the executor. If any
+ * execution of this task takes longer than its period, then subsequent
executions may start late,
+ * but will not concurrently execute.
+ *
+ * @param task the task to execute
+ * @param initialDelay the time to delay first execution
+ * @param period the period between successive executions
+ * @param unit the time unit of the initialDelay and period parameters
+ * @return a Future representing pending completion of the task, and whose
{@code get()} method
+ * will throw an exception upon cancellation
+ */
+ public Future<?> scheduleAtFixedRate(Runnable task, long initialDelay,
long period, TimeUnit unit) {
+ Preconditions.checkState(isOpen.get(), "CloseableExecutorService is
closed");
+
+ ScheduledFuture<?> scheduledFuture =
+ scheduledExecutorService.scheduleAtFixedRate(task,
initialDelay, period, unit);
+ return new InternalScheduledFutureTask(scheduledFuture);
+ }
}
diff --git
a/curator-client/src/test/java/org/apache/curator/utils/TestCloseableScheduledExecutorService.java
b/curator-client/src/test/java/org/apache/curator/utils/TestCloseableScheduledExecutorService.java
index 42ad7a6bb..290635680 100644
---
a/curator-client/src/test/java/org/apache/curator/utils/TestCloseableScheduledExecutorService.java
+++
b/curator-client/src/test/java/org/apache/curator/utils/TestCloseableScheduledExecutorService.java
@@ -20,9 +20,12 @@
package org.apache.curator.utils;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -110,4 +113,84 @@ public class TestCloseableScheduledExecutorService {
assertTrue(newValue > value);
assertEquals(innerValue, innerCounter.get());
}
+
+ @Test
+ public void testCloseableScheduleAtFixedRate() throws InterruptedException
{
+ CloseableScheduledExecutorService service = new
CloseableScheduledExecutorService(executorService);
+
+ final CountDownLatch latch = new CountDownLatch(QTY);
+ final AtomicInteger fixedDelayCounter = new AtomicInteger();
+ service.scheduleAtFixedRate(
+ new Runnable() {
+ @Override
+ public void run() {
+ latch.countDown();
+ // This sleep barely delays later runs when scheduled via
scheduleAtFixedRate
+ try {
+ Thread.sleep(DELAY_MS);
+ } catch (InterruptedException e) {
+ // Ignored: an interrupt only cuts this sleep short
+ }
+ }
+ },
+ DELAY_MS,
+ DELAY_MS,
+ TimeUnit.MILLISECONDS);
+ service.scheduleWithFixedDelay(
+ new Runnable() {
+ @Override
+ public void run() {
+ fixedDelayCounter.incrementAndGet();
+ // This sleep pushes back every later run when scheduled via
scheduleWithFixedDelay
+ try {
+ Thread.sleep(DELAY_MS);
+ } catch (InterruptedException e) {
+ // Ignored: an interrupt only cuts this sleep short
+ }
+ }
+ },
+ DELAY_MS,
+ DELAY_MS,
+ TimeUnit.MILLISECONDS);
+ assertTrue(latch.await((QTY * 2) * DELAY_MS, TimeUnit.MILLISECONDS));
+ assertTrue(fixedDelayCounter.get() <= (QTY / 2 + 1));
+ }
+
+ @Test
+ public void testCloseWithoutShutdown() throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+ try (CloseableScheduledExecutorService service = new
CloseableScheduledExecutorService(executorService)) {
+ service.submit(latch::countDown);
+ assertTrue(latch.await(1, TimeUnit.SECONDS));
+ }
+ assertFalse(executorService.isShutdown());
+ }
+
+ @Test
+ public void testCloseWithShutdown() throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+ try (CloseableScheduledExecutorService service = new
CloseableScheduledExecutorService(executorService, true)) {
+ service.submit(latch::countDown);
+ assertTrue(latch.await(1, TimeUnit.SECONDS));
+ }
+ assertTrue(executorService.isShutdown());
+ assertThrows(RejectedExecutionException.class, () ->
executorService.submit(() -> System.out.println("Hello")));
+ }
+
+ @Test
+ public void testCloseMultipleTimes() throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+ CloseableScheduledExecutorService service = null;
+ try {
+ service = new CloseableScheduledExecutorService(executorService,
true);
+ service.submit(latch::countDown);
+ assertTrue(latch.await(1, TimeUnit.SECONDS));
+ } finally {
+ if (service != null) {
+ service.close();
+ service.close();
+ service.close();
+ }
+ }
+ }
}
diff --git
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentTtlNode.java
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentTtlNode.java
index 33e1608a2..1bcff4c3f 100644
---
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentTtlNode.java
+++
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentTtlNode.java
@@ -24,11 +24,10 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.CloseableScheduledExecutorService;
import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
@@ -61,13 +60,15 @@ public class PersistentTtlNode implements Closeable {
public static final int DEFAULT_TOUCH_SCHEDULE_FACTOR = 2;
public static final boolean DEFAULT_USE_PARENT_CREATION = true;
+ @VisibleForTesting
+ static final String TOUCH_THREAD_NAME = "PersistentTtlNode";
+
private final Logger log = LoggerFactory.getLogger(getClass());
private final PersistentNode node;
private final CuratorFramework client;
private final long ttlMs;
private final int touchScheduleFactor;
- private final ScheduledExecutorService executorService;
- private final AtomicReference<Future<?>> futureRef = new
AtomicReference<>();
+ private final CloseableScheduledExecutorService closeableExecutorService;
private final String childPath;
/**
@@ -79,7 +80,9 @@ public class PersistentTtlNode implements Closeable {
public PersistentTtlNode(CuratorFramework client, String path, long ttlMs,
byte[] initData) {
this(
client,
-
Executors.newSingleThreadScheduledExecutor(ThreadUtils.newThreadFactory("PersistentTtlNode")),
+ new CloseableScheduledExecutorService(
+
Executors.newSingleThreadScheduledExecutor(ThreadUtils.newThreadFactory(TOUCH_THREAD_NAME)),
+ true),
path,
ttlMs,
initData,
@@ -99,7 +102,9 @@ public class PersistentTtlNode implements Closeable {
CuratorFramework client, String path, long ttlMs, byte[] initData,
boolean useParentCreation) {
this(
client,
-
Executors.newSingleThreadScheduledExecutor(ThreadUtils.newThreadFactory("PersistentTtlNode")),
+ new CloseableScheduledExecutorService(
+
Executors.newSingleThreadScheduledExecutor(ThreadUtils.newThreadFactory(TOUCH_THREAD_NAME)),
+ true),
path,
ttlMs,
initData,
@@ -128,7 +133,7 @@ public class PersistentTtlNode implements Closeable {
int touchScheduleFactor) {
this(
client,
- executorService,
+ new CloseableScheduledExecutorService(executorService),
path,
ttlMs,
initData,
@@ -157,6 +162,26 @@ public class PersistentTtlNode implements Closeable {
String childNodeName,
int touchScheduleFactor,
boolean useParentCreation) {
+ this(
+ client,
+ new CloseableScheduledExecutorService(executorService, false),
+ path,
+ ttlMs,
+ initData,
+ childNodeName,
+ touchScheduleFactor,
+ useParentCreation);
+ }
+
+ private PersistentTtlNode(
+ CuratorFramework client,
+ CloseableScheduledExecutorService closeableExecutorService,
+ String path,
+ long ttlMs,
+ byte[] initData,
+ String childNodeName,
+ int touchScheduleFactor,
+ boolean useParentCreation) {
this.client = Objects.requireNonNull(client, "client cannot be null");
this.ttlMs = ttlMs;
this.touchScheduleFactor = touchScheduleFactor;
@@ -168,7 +193,7 @@ public class PersistentTtlNode implements Closeable {
// NOP
}
};
- this.executorService = Objects.requireNonNull(executorService,
"executorService cannot be null");
+ this.closeableExecutorService = closeableExecutorService;
childPath = ZKPaths.makePath(Objects.requireNonNull(path, "path cannot
be null"), childNodeName);
}
@@ -198,9 +223,8 @@ public class PersistentTtlNode implements Closeable {
*/
public void start() {
node.start();
- Future<?> future = executorService.scheduleAtFixedRate(
+ closeableExecutorService.scheduleAtFixedRate(
this::touch, ttlMs / touchScheduleFactor, ttlMs /
touchScheduleFactor, TimeUnit.MILLISECONDS);
- futureRef.set(future);
}
/**
@@ -238,6 +262,11 @@ public class PersistentTtlNode implements Closeable {
return node.getData();
}
+ @VisibleForTesting
+ CloseableScheduledExecutorService getCloseableScheduledExecutorService() {
+ return closeableExecutorService;
+ }
+
/**
* Call when you are done with the PersistentTtlNode. Note: the ZNode is
<em>not</em> immediately
* deleted. However, if no other PersistentTtlNode with the same path is
running the node will get deleted
@@ -245,10 +274,7 @@ public class PersistentTtlNode implements Closeable {
*/
@Override
public void close() {
- Future<?> future = futureRef.getAndSet(null);
- if (future != null) {
- future.cancel(true);
- }
+ closeableExecutorService.close();
try {
node.close();
} catch (IOException e) {
diff --git
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java
index b98ada445..562075522 100644
---
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java
+++
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java
@@ -28,9 +28,11 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
@@ -41,6 +43,7 @@ import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.Timing;
import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.test.compatibility.Timing2;
+import org.apache.curator.utils.CloseableScheduledExecutorService;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
@@ -201,23 +204,17 @@ public class TestPersistentTtlNode extends
CuratorTestBase {
final long testTtlMs = 500L;
final CountDownLatch mainCreatedLatch = new CountDownLatch(1);
final CountDownLatch mainDeletedLatch = new CountDownLatch(1);
- final AtomicBoolean touchCreated = new AtomicBoolean();
try (CuratorFramework client =
CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1))) {
client.start();
assertTrue(client.blockUntilConnected(1, TimeUnit.SECONDS));
- try (PersistentWatcher watcher = new PersistentWatcher(client,
mainPath, true)) {
+ try (PersistentWatcher watcher = new PersistentWatcher(client,
mainPath, false)) {
final Watcher listener = event -> {
- final String path = event.getPath();
- if (mainPath.equals(path)) {
- final EventType type = event.getType();
- if (EventType.NodeCreated.equals(type)) {
- mainCreatedLatch.countDown();
- } else if (EventType.NodeDeleted.equals(type)) {
- mainDeletedLatch.countDown();
- }
- } else if (touchPath.equals(path)) {
- touchCreated.set(true);
+ final EventType type = event.getType();
+ if (EventType.NodeCreated.equals(type)) {
+ mainCreatedLatch.countDown();
+ } else if (EventType.NodeDeleted.equals(type)) {
+ mainDeletedLatch.countDown();
}
};
watcher.getListenable().addListener(listener);
@@ -233,8 +230,110 @@ public class TestPersistentTtlNode extends
CuratorTestBase {
}
assertNull(client.checkExists().forPath(touchPath));
assertTrue(mainDeletedLatch.await(3L * testTtlMs,
TimeUnit.MILLISECONDS));
- assertFalse(touchCreated.get()); // Just to control that touch
ZNode never created
}
}
}
+
+ @Test
+ public void testInternalExecutorClose() throws Exception {
+ final String mainPath = "/parent/main";
+ final String touchPath = ZKPaths.makePath(mainPath,
PersistentTtlNode.DEFAULT_CHILD_NODE_NAME);
+ final CountDownLatch touchCreatedLatch = new CountDownLatch(1);
+ try (CuratorFramework client =
+ CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1))) {
+ client.start();
+ assertTrue(client.blockUntilConnected(1, TimeUnit.SECONDS));
+ try (PersistentWatcher watcher = new PersistentWatcher(client,
touchPath, false)) {
+ final Watcher listener = event -> {
+ if (EventType.NodeCreated.equals(event.getType())) {
+ touchCreatedLatch.countDown();
+ }
+ };
+ watcher.getListenable().addListener(listener);
+ watcher.start();
+ final AtomicLong executorThreadId = new AtomicLong();
+ CloseableScheduledExecutorService executor;
+ try (PersistentTtlNode node = new PersistentTtlNode(client,
mainPath, ttlMs, new byte[0])) {
+ node.start();
+ assertTrue(touchCreatedLatch.await(5 * ttlMs,
TimeUnit.MILLISECONDS));
+ executor = node.getCloseableScheduledExecutorService();
+ assertNotNull(executor);
+ executor.submit(() ->
+
executorThreadId.set(Thread.currentThread().getId()))
+ .get();
+ assertNotNull(getThreadWithIdAndName(executorThreadId,
PersistentTtlNode.TOUCH_THREAD_NAME));
+ }
+ assertTrue(executor.isShutdown());
+ // Here we know the executor has been shutdown, so the thread
should NOT be there eventually
+ boolean executorThreadDestroyed = false;
+ final long maxTimeExecutorThreadDestroyedMs = 1_000L;
+ final long checkIntervalMs = 10L;
+ for (long i = 0; i <= maxTimeExecutorThreadDestroyedMs /
checkIntervalMs; i++) {
+ Thread.sleep(checkIntervalMs);
+ if (getThreadWithIdAndName(executorThreadId,
PersistentTtlNode.TOUCH_THREAD_NAME) == null) {
+ executorThreadDestroyed = true;
+ break;
+ }
+ }
+ assertTrue(executorThreadDestroyed);
+ }
+ }
+ }
+
+ /**
+ * @return the live thread whose id equals {@code executorThreadId} and whose name contains {@code name}, or {@code null} if there is none
+ */
+ private Thread getThreadWithIdAndName(final AtomicLong executorThreadId,
final String name) {
+ return Thread.getAllStackTraces().keySet().stream()
+ .filter(t -> t.getId() == executorThreadId.get() &&
t.getName().contains(name))
+ .findFirst()
+ .orElse(null);
+ }
+
+ @Test
+ public void testExternalExecutorClose() throws Exception {
+ final String mainPath = "/parent/main";
+ final String touchPath = ZKPaths.makePath(mainPath,
PersistentTtlNode.DEFAULT_CHILD_NODE_NAME);
+ final CountDownLatch touchCreatedLatch = new CountDownLatch(1);
+ final String threadName = "testThreadName";
+ final ScheduledExecutorService executor =
+ Executors.newSingleThreadScheduledExecutor(task -> new
Thread(task, threadName));
+ try (CuratorFramework client =
+ CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1))) {
+ client.start();
+ assertTrue(client.blockUntilConnected(1, TimeUnit.SECONDS));
+ try (PersistentWatcher watcher = new PersistentWatcher(client,
touchPath, false)) {
+ final Watcher listener = event -> {
+ if (EventType.NodeCreated.equals(event.getType())) {
+ touchCreatedLatch.countDown();
+ }
+ };
+ watcher.getListenable().addListener(listener);
+ watcher.start();
+ final AtomicLong executorThreadId = new AtomicLong();
+ try (PersistentTtlNode node = new PersistentTtlNode(
+ client,
+ executor,
+ mainPath,
+ ttlMs,
+ new byte[0],
+ PersistentTtlNode.DEFAULT_CHILD_NODE_NAME,
+ PersistentTtlNode.DEFAULT_TOUCH_SCHEDULE_FACTOR)) {
+ node.start();
+ assertTrue(touchCreatedLatch.await(5 * ttlMs,
TimeUnit.MILLISECONDS));
+ node.getCloseableScheduledExecutorService()
+ .submit(() ->
+
executorThreadId.set(Thread.currentThread().getId()))
+ .get();
+ assertNotNull(getThreadWithIdAndName(executorThreadId,
threadName));
+ }
+ // Here PersistentTtlNode tasks have been cancelled (BUT
executor NOT shutdown)
+ Thread.sleep(100L);
+ assertFalse(executor.isShutdown());
+ assertNotNull(getThreadWithIdAndName(executorThreadId,
threadName));
+ }
+ } finally {
+ executor.shutdownNow();
+ }
+ }
}