kezhuw commented on code in PR #1264:
URL: https://github.com/apache/curator/pull/1264#discussion_r2045865526
##########
curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java:
##########
@@ -237,4 +241,88 @@ void touch() {
}
}
}
+
+ @Test
+ public void testInternalExecutorClose() throws Exception {
+ final String mainPath = "/parent/main";
+ final String touchPath = ZKPaths.makePath(mainPath,
PersistentTtlNode.DEFAULT_CHILD_NODE_NAME);
+ final CountDownLatch touchCreatedLatch = new CountDownLatch(1);
+ try (CuratorFramework client =
+ CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1))) {
+ client.start();
+ assertTrue(client.blockUntilConnected(1, TimeUnit.SECONDS));
+ try (PersistentWatcher watcher = new PersistentWatcher(client,
mainPath, true)) {
+ final Watcher listener = event -> {
+ final String path = event.getPath();
+ if (touchPath.equals(path)) {
+ touchCreatedLatch.countDown();
+ }
+ };
+ watcher.getListenable().addListener(listener);
+ watcher.start();
+ final AtomicLong executorThreadId = new AtomicLong();
+ try (PersistentTtlNode node = new PersistentTtlNode(client,
mainPath, ttlMs, new byte[0])) {
+ node.start();
+ assertTrue(touchCreatedLatch.await(5 * ttlMs,
TimeUnit.MILLISECONDS));
+ node.getCloseableScheduledExecutorService()
+ .submit(() ->
+
executorThreadId.set(Thread.currentThread().getId()));
+ assertFalse(getThreadsWithIdAndName(executorThreadId,
PersistentTtlNode.TOUCH_THREAD_NAME)
+ .isEmpty());
+ }
+ Thread.sleep(10L);
+ assertTrue(getThreadsWithIdAndName(executorThreadId,
PersistentTtlNode.TOUCH_THREAD_NAME)
+ .isEmpty());
+ }
+ }
+ }
+
+ private List<Thread> getThreadsWithIdAndName(final AtomicLong
executorThreadId, final String name) {
Review Comment:
There will be at most one thread. And I think "assertNotNull/assertNull" is
easier to read than "assertFalse(...isEmpty())".
##########
curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java:
##########
@@ -237,4 +241,88 @@ void touch() {
}
}
}
+
+ @Test
+ public void testInternalExecutorClose() throws Exception {
+ final String mainPath = "/parent/main";
+ final String touchPath = ZKPaths.makePath(mainPath,
PersistentTtlNode.DEFAULT_CHILD_NODE_NAME);
+ final CountDownLatch touchCreatedLatch = new CountDownLatch(1);
+ try (CuratorFramework client =
+ CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1))) {
+ client.start();
+ assertTrue(client.blockUntilConnected(1, TimeUnit.SECONDS));
+ try (PersistentWatcher watcher = new PersistentWatcher(client,
mainPath, true)) {
+ final Watcher listener = event -> {
+ final String path = event.getPath();
+ if (touchPath.equals(path)) {
+ touchCreatedLatch.countDown();
+ }
+ };
+ watcher.getListenable().addListener(listener);
+ watcher.start();
+ final AtomicLong executorThreadId = new AtomicLong();
+ try (PersistentTtlNode node = new PersistentTtlNode(client,
mainPath, ttlMs, new byte[0])) {
+ node.start();
+ assertTrue(touchCreatedLatch.await(5 * ttlMs,
TimeUnit.MILLISECONDS));
+ node.getCloseableScheduledExecutorService()
+ .submit(() ->
+
executorThreadId.set(Thread.currentThread().getId()));
+ assertFalse(getThreadsWithIdAndName(executorThreadId,
PersistentTtlNode.TOUCH_THREAD_NAME)
+ .isEmpty());
+ }
+ Thread.sleep(10L);
+ assertTrue(getThreadsWithIdAndName(executorThreadId,
PersistentTtlNode.TOUCH_THREAD_NAME)
+ .isEmpty());
+ }
+ }
+ }
+
+ private List<Thread> getThreadsWithIdAndName(final AtomicLong
executorThreadId, final String name) {
+ return Thread.getAllStackTraces().keySet().stream()
+ .filter(t -> t.getId() == executorThreadId.get() &&
t.getName().contains(name))
+ .collect(Collectors.toList());
+ }
+
+ @Test
+ public void testExternalExecutorClose() throws Exception {
+ final String mainPath = "/parent/main";
+ final String touchPath = ZKPaths.makePath(mainPath,
PersistentTtlNode.DEFAULT_CHILD_NODE_NAME);
+ final CountDownLatch touchCreatedLatch = new CountDownLatch(1);
+ try (CuratorFramework client =
+ CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1))) {
+ client.start();
+ assertTrue(client.blockUntilConnected(1, TimeUnit.SECONDS));
+ try (PersistentWatcher watcher = new PersistentWatcher(client,
mainPath, true)) {
+ final Watcher listener = event -> {
+ final String path = event.getPath();
+ if (touchPath.equals(path)) {
+ touchCreatedLatch.countDown();
+ }
+ };
+ watcher.getListenable().addListener(listener);
+ watcher.start();
+ final AtomicLong executorThreadId = new AtomicLong();
+ final String threadName = "testThreadName";
+ try (PersistentTtlNode node = new PersistentTtlNode(
+ client,
+ Executors.newSingleThreadScheduledExecutor(task -> new
Thread(task, threadName)),
+ mainPath,
+ ttlMs,
+ new byte[0],
+ PersistentTtlNode.DEFAULT_CHILD_NODE_NAME,
+ PersistentTtlNode.DEFAULT_TOUCH_SCHEDULE_FACTOR)) {
+ node.start();
+ assertTrue(touchCreatedLatch.await(5 * ttlMs,
TimeUnit.MILLISECONDS));
+ node.getCloseableScheduledExecutorService()
+ .submit(() ->
+
executorThreadId.set(Thread.currentThread().getId()));
Review Comment:
```suggestion
node.getCloseableScheduledExecutorService()
.submit(() ->
executorThreadId.set(Thread.currentThread().getId())).get();
```
##########
curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java:
##########
@@ -237,4 +241,88 @@ void touch() {
}
}
}
+
+ @Test
+ public void testInternalExecutorClose() throws Exception {
+ final String mainPath = "/parent/main";
+ final String touchPath = ZKPaths.makePath(mainPath,
PersistentTtlNode.DEFAULT_CHILD_NODE_NAME);
+ final CountDownLatch touchCreatedLatch = new CountDownLatch(1);
+ try (CuratorFramework client =
+ CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1))) {
+ client.start();
+ assertTrue(client.blockUntilConnected(1, TimeUnit.SECONDS));
+ try (PersistentWatcher watcher = new PersistentWatcher(client,
mainPath, true)) {
+ final Watcher listener = event -> {
+ final String path = event.getPath();
+ if (touchPath.equals(path)) {
+ touchCreatedLatch.countDown();
+ }
+ };
+ watcher.getListenable().addListener(listener);
+ watcher.start();
+ final AtomicLong executorThreadId = new AtomicLong();
+ try (PersistentTtlNode node = new PersistentTtlNode(client,
mainPath, ttlMs, new byte[0])) {
+ node.start();
+ assertTrue(touchCreatedLatch.await(5 * ttlMs,
TimeUnit.MILLISECONDS));
+ node.getCloseableScheduledExecutorService()
+ .submit(() ->
+
executorThreadId.set(Thread.currentThread().getId()));
+ assertFalse(getThreadsWithIdAndName(executorThreadId,
PersistentTtlNode.TOUCH_THREAD_NAME)
+ .isEmpty());
+ }
+ Thread.sleep(10L);
+ assertTrue(getThreadsWithIdAndName(executorThreadId,
PersistentTtlNode.TOUCH_THREAD_NAME)
+ .isEmpty());
+ }
+ }
+ }
+
+ private List<Thread> getThreadsWithIdAndName(final AtomicLong
executorThreadId, final String name) {
+ return Thread.getAllStackTraces().keySet().stream()
+ .filter(t -> t.getId() == executorThreadId.get() &&
t.getName().contains(name))
+ .collect(Collectors.toList());
+ }
+
+ @Test
+ public void testExternalExecutorClose() throws Exception {
+ final String mainPath = "/parent/main";
+ final String touchPath = ZKPaths.makePath(mainPath,
PersistentTtlNode.DEFAULT_CHILD_NODE_NAME);
+ final CountDownLatch touchCreatedLatch = new CountDownLatch(1);
+ try (CuratorFramework client =
+ CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1))) {
+ client.start();
+ assertTrue(client.blockUntilConnected(1, TimeUnit.SECONDS));
+ try (PersistentWatcher watcher = new PersistentWatcher(client,
mainPath, true)) {
+ final Watcher listener = event -> {
+ final String path = event.getPath();
+ if (touchPath.equals(path)) {
+ touchCreatedLatch.countDown();
+ }
+ };
+ watcher.getListenable().addListener(listener);
+ watcher.start();
+ final AtomicLong executorThreadId = new AtomicLong();
+ final String threadName = "testThreadName";
+ try (PersistentTtlNode node = new PersistentTtlNode(
+ client,
+ Executors.newSingleThreadScheduledExecutor(task -> new
Thread(task, threadName)),
Review Comment:
Shut down this executor after the assertions?
##########
curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java:
##########
@@ -237,4 +241,88 @@ void touch() {
}
}
}
+
+ @Test
+ public void testInternalExecutorClose() throws Exception {
+ final String mainPath = "/parent/main";
+ final String touchPath = ZKPaths.makePath(mainPath,
PersistentTtlNode.DEFAULT_CHILD_NODE_NAME);
+ final CountDownLatch touchCreatedLatch = new CountDownLatch(1);
+ try (CuratorFramework client =
+ CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1))) {
+ client.start();
+ assertTrue(client.blockUntilConnected(1, TimeUnit.SECONDS));
+ try (PersistentWatcher watcher = new PersistentWatcher(client,
mainPath, true)) {
+ final Watcher listener = event -> {
+ final String path = event.getPath();
+ if (touchPath.equals(path)) {
+ touchCreatedLatch.countDown();
+ }
+ };
+ watcher.getListenable().addListener(listener);
+ watcher.start();
+ final AtomicLong executorThreadId = new AtomicLong();
+ try (PersistentTtlNode node = new PersistentTtlNode(client,
mainPath, ttlMs, new byte[0])) {
+ node.start();
+ assertTrue(touchCreatedLatch.await(5 * ttlMs,
TimeUnit.MILLISECONDS));
+ node.getCloseableScheduledExecutorService()
+ .submit(() ->
+
executorThreadId.set(Thread.currentThread().getId()));
Review Comment:
```suggestion
node.getCloseableScheduledExecutorService()
.submit(() ->
executorThreadId.set(Thread.currentThread().getId())).get();
```
##########
curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java:
##########
@@ -237,4 +241,88 @@ void touch() {
}
}
}
+
+ @Test
+ public void testInternalExecutorClose() throws Exception {
+ final String mainPath = "/parent/main";
+ final String touchPath = ZKPaths.makePath(mainPath,
PersistentTtlNode.DEFAULT_CHILD_NODE_NAME);
+ final CountDownLatch touchCreatedLatch = new CountDownLatch(1);
+ try (CuratorFramework client =
+ CuratorFrameworkFactory.newClient(server.getConnectString(),
new RetryOneTime(1))) {
+ client.start();
+ assertTrue(client.blockUntilConnected(1, TimeUnit.SECONDS));
+ try (PersistentWatcher watcher = new PersistentWatcher(client,
mainPath, true)) {
+ final Watcher listener = event -> {
+ final String path = event.getPath();
+ if (touchPath.equals(path)) {
+ touchCreatedLatch.countDown();
+ }
+ };
+ watcher.getListenable().addListener(listener);
+ watcher.start();
+ final AtomicLong executorThreadId = new AtomicLong();
+ try (PersistentTtlNode node = new PersistentTtlNode(client,
mainPath, ttlMs, new byte[0])) {
+ node.start();
+ assertTrue(touchCreatedLatch.await(5 * ttlMs,
TimeUnit.MILLISECONDS));
+ node.getCloseableScheduledExecutorService()
+ .submit(() ->
+
executorThreadId.set(Thread.currentThread().getId()));
+ assertFalse(getThreadsWithIdAndName(executorThreadId,
PersistentTtlNode.TOUCH_THREAD_NAME)
+ .isEmpty());
+ }
+ Thread.sleep(10L);
Review Comment:
Not sure 10ms is enough in a testing env. How about exposing the underlying
executor so tests can call `awaitTermination`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]