Re: [PR] MINOR: Stop leaking threads in BlockingConnectorTest [kafka]

2024-01-18 Thread via GitHub


C0urante merged PR #12290:
URL: https://github.com/apache/kafka/pull/12290


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Stop leaking threads in BlockingConnectorTest [kafka]

2024-01-12 Thread via GitHub


gharris1727 commented on code in PR #12290:
URL: https://github.com/apache/kafka/pull/12290#discussion_r1450789672


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##
@@ -434,23 +484,31 @@ public Block(Map props) {
 
 public Block(String block) {
 this.block = block;
-synchronized (Block.class) {
-if (blockLatch != null) {
-blockLatch.countDown();
+if (block != null) {
+synchronized (Block.class) {
+resetAwaitBlockLatch();
+awaitBlockLatch = new CountDownLatch(1);
+Block.class.notify();
 }
-blockLatch = new CountDownLatch(1);
 }
 }
 
 public void maybeBlockOn(String block) {
 if (block.equals(this.block)) {
 log.info("Will block on {}", block);
-blockLatch.countDown();
+CountDownLatch blockLatch;
+synchronized (Block.class) {
+awaitBlockLatch.countDown();

Review Comment:
   nit: small NPE here under this sequence of calls:
   
   1. new Block(s)
   2. Block.reset()
   3. block.maybeBlockOn(s)
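
One way to guard against that sequence, sketched under assumptions: snapshot the shared latch while holding the class lock and skip the count-down if a reset has already cleared it. `NullSafeBlock` and `signalBlockReached` are hypothetical names for illustration and are not taken from the PR; only `awaitBlockLatch` mirrors the field in the diff.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical, simplified stand-in for the test's Block class; not the PR's code.
class NullSafeBlock {
    // Shared latch that the test thread waits on; cleared by reset().
    private static CountDownLatch awaitBlockLatch;

    private final String block;

    NullSafeBlock(String block) {
        this.block = block;
        if (block != null) {
            synchronized (NullSafeBlock.class) {
                awaitBlockLatch = new CountDownLatch(1);
            }
        }
    }

    static void reset() {
        synchronized (NullSafeBlock.class) {
            awaitBlockLatch = null;
        }
    }

    // Returns true if a latch was present and counted down,
    // false if this is not the configured block point or reset() already ran.
    boolean signalBlockReached(String block) {
        if (!block.equals(this.block)) {
            return false;
        }
        CountDownLatch latch;
        synchronized (NullSafeBlock.class) {
            latch = awaitBlockLatch; // snapshot under the lock
        }
        if (latch == null) {
            return false; // reset() cleared the latch; nothing to signal
        }
        latch.countDown();
        return true;
    }
}
```

With this shape, the three-step sequence above returns `false` from the last call instead of throwing.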






Re: [PR] MINOR: Stop leaking threads in BlockingConnectorTest [kafka]

2023-12-15 Thread via GitHub


C0urante commented on code in PR #12290:
URL: https://github.com/apache/kafka/pull/12290#discussion_r1428127814


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##
@@ -368,31 +374,54 @@ private static ConfigDef config() {
 );
 }
 
+/**
+ * {@link CountDownLatch#await() Wait} for the connector/task to reach 
the point in its lifecycle where
+ * it will block.
+ */
 public static void waitForBlock() throws InterruptedException, 
TimeoutException {
+CountDownLatch awaitBlockLatch;
 synchronized (Block.class) {
-if (blockLatch == null) {
-throw new IllegalArgumentException("No connector has been 
created yet");
-}
+awaitBlockLatch = Block.awaitBlockLatch;
+}
+
+if (awaitBlockLatch == null) {
+throw new IllegalArgumentException("No connector has been 
created yet");
 }
 
 log.debug("Waiting for connector to block");
-if (!blockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, 
TimeUnit.MILLISECONDS)) {
+if (!awaitBlockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, 
TimeUnit.MILLISECONDS)) {
 throw new TimeoutException("Timed out waiting for connector to 
block.");

Review Comment:
   I don't believe these changes are necessary here since the portions they 
address are not affected by the PR. If you would like to do this cleanup in a 
separate PR, I'd be happy to review.






Re: [PR] MINOR: Stop leaking threads in BlockingConnectorTest [kafka]

2023-12-13 Thread via GitHub


gharris1727 commented on code in PR #12290:
URL: https://github.com/apache/kafka/pull/12290#discussion_r1425906944


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##
@@ -368,31 +374,54 @@ private static ConfigDef config() {
 );
 }
 
+/**
+ * {@link CountDownLatch#await() Wait} for the connector/task to reach 
the point in its lifecycle where
+ * it will block.
+ */
 public static void waitForBlock() throws InterruptedException, 
TimeoutException {
+CountDownLatch awaitBlockLatch;
 synchronized (Block.class) {
-if (blockLatch == null) {
-throw new IllegalArgumentException("No connector has been 
created yet");
-}
+awaitBlockLatch = Block.awaitBlockLatch;
+}
+
+if (awaitBlockLatch == null) {
+throw new IllegalArgumentException("No connector has been 
created yet");
 }
 
 log.debug("Waiting for connector to block");
-if (!blockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, 
TimeUnit.MILLISECONDS)) {
+if (!awaitBlockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, 
TimeUnit.MILLISECONDS)) {
 throw new TimeoutException("Timed out waiting for connector to 
block.");

Review Comment:
   WDYT about a `Block.prepare()` called before the test starts that creates 
the awaitBlockLatch, instead of having the Block constructor initialize it? 
That could eliminate the wait-notify mechanism, since only one thread (the test 
thread) would be responsible for setting/clearing the awaitBlockLatch.
   
   edit: Would this also allow you to block in methods used during plugin 
scanning, if you only started blocking if the awaitBlockLatch had been prepared 
first?
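
A minimal sketch of what such a `prepare()` approach might look like, assuming the test thread is the only writer of the latch field. `PreparedBlock`, `reachedBlockPoint`, and the timeout parameter are hypothetical names for illustration; nothing here is taken from the PR.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the suggested prepare() approach; not the PR's code.
class PreparedBlock {
    // Written only by the test thread; volatile so connector threads see updates.
    private static volatile CountDownLatch awaitBlockLatch;

    // Called by the test thread before the test starts.
    static void prepare() {
        awaitBlockLatch = new CountDownLatch(1);
    }

    // Called by the test thread during cleanup.
    static void reset() {
        awaitBlockLatch = null;
    }

    // Called from connector/task threads; returns whether the caller should block.
    static boolean reachedBlockPoint() {
        CountDownLatch latch = awaitBlockLatch;
        if (latch == null) {
            return false; // not prepared (for example, during plugin scanning)
        }
        latch.countDown();
        return true;
    }

    // Called by the test thread; returns false if unprepared or on timeout.
    static boolean waitForBlock(long timeoutMs) {
        CountDownLatch latch = awaitBlockLatch;
        if (latch == null) {
            return false;
        }
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Because no connector thread ever creates the latch, no wait-notify handshake is needed to learn that it exists.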






Re: [PR] MINOR: Stop leaking threads in BlockingConnectorTest [kafka]

2023-12-13 Thread via GitHub


gharris1727 commented on code in PR #12290:
URL: https://github.com/apache/kafka/pull/12290#discussion_r1425904694


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##
@@ -368,31 +374,54 @@ private static ConfigDef config() {
 );
 }
 
+/**
+ * {@link CountDownLatch#await() Wait} for the connector/task to reach 
the point in its lifecycle where
+ * it will block.
+ */
 public static void waitForBlock() throws InterruptedException, 
TimeoutException {
+CountDownLatch awaitBlockLatch;
 synchronized (Block.class) {
-if (blockLatch == null) {
-throw new IllegalArgumentException("No connector has been 
created yet");
-}
+awaitBlockLatch = Block.awaitBlockLatch;
+}
+
+if (awaitBlockLatch == null) {
+throw new IllegalArgumentException("No connector has been 
created yet");
 }
 
 log.debug("Waiting for connector to block");
-if (!blockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, 
TimeUnit.MILLISECONDS)) {
+if (!awaitBlockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, 
TimeUnit.MILLISECONDS)) {
 throw new TimeoutException("Timed out waiting for connector to 
block.");

Review Comment:
   If you add this stanza before the `log.debug("Connector should now be 
blocked")` the tests still pass:
   
   ```
   boolean retry;
   synchronized (Block.class) {
   retry = Block.awaitBlockLatch != null && Block.awaitBlockLatch != 
awaitBlockLatch;
   }
   if (retry) {
   log.debug("New blocking instance was created, retrying wait");
   waitForBlock();
   }
   ```
   For me, I see this being printed in:
   * testBlockInSinkTaskStart
   * testBlockInConnectorStart
   * testWorkerRestartWithBlockInConnectorStart
   * testBlockInSourceTaskStart
   * testBlockInConnectorInitialize
   
   This leads me to believe that this function is normally exiting before the 
blocking method of the last-instantiated instance happens.
   
   I don't immediately see how this could cause flakiness, but it's at least an 
instance of the method not doing what it says it does.






Re: [PR] MINOR: Stop leaking threads in BlockingConnectorTest [kafka]

2023-12-13 Thread via GitHub


gharris1727 commented on code in PR #12290:
URL: https://github.com/apache/kafka/pull/12290#discussion_r1425879028


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##
@@ -139,7 +141,8 @@ public void setup() throws Exception {
 public void close() {
 // stop all Connect, Kafka and Zk threads.
 connect.stop();
-Block.resetBlockLatch();
+// unblock everything so that we don't leak threads after each test run
+Block.reset();

Review Comment:
   > It's easier to handle in a single method rather than copy over to 11 test 
cases
   
   Oh I see, I thought it would be sufficient to add one test case that called 
stop() to verify that one type of blocked thread still allows shutdown to 
complete, rather than verifying it for all of the different ways of blocking 
threads. That would have less coverage than the current tests on trunk.
   
   > I've pushed a tweak that adds logic to wait for the blocked threads to 
complete in Block::reset.
   
   I think this is probably the better solution. The leak tester can separately 
verify that resources were closed properly now that the test ensures the 
threads stop. :+1:
   






Re: [PR] MINOR: Stop leaking threads in BlockingConnectorTest [kafka]

2023-12-13 Thread via GitHub


C0urante commented on code in PR #12290:
URL: https://github.com/apache/kafka/pull/12290#discussion_r1425798851


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##
@@ -139,7 +141,8 @@ public void setup() throws Exception {
 public void close() {
 // stop all Connect, Kafka and Zk threads.
 connect.stop();
-Block.resetBlockLatch();
+// unblock everything so that we don't leak threads after each test run
+Block.reset();

Review Comment:
   > I think that should take place in a test then, not in the cleanup.
   
   It's easier to handle in a single method rather than copy over to 11 test 
cases, though. And I also don't necessarily see why `@After`-annotated methods 
need to be used exclusively for cleanup.
   
   The concern about threads leaking (even if for a short period) beyond the 
scope of the test definitely seems valid. I've pushed a tweak that adds logic 
to wait for the blocked threads to complete in `Block::reset`. LMK if this 
seems clean enough; if not, I can bite the bullet and reverse the order of 
operations in `BlockingConnectorTest::close` and then see about adding more 
explicit checks for graceful worker shutdown in other places.
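
For illustration, a sketch of one way a reset could wait for blocked calls to return before the test finishes. This is an assumption about the general shape, not the logic actually pushed to the PR; `JoiningBlock`, `blockedCount`, and the timeout parameter are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: reset() releases blocked callers and waits for them to return.
class JoiningBlock {
    private static CountDownLatch release = new CountDownLatch(1);
    private static final List<CountDownLatch> exits = new ArrayList<>();

    // Called from a connector/task thread: park until reset() releases it.
    static void block() {
        CountDownLatch releaseLatch;
        CountDownLatch exit = new CountDownLatch(1);
        synchronized (JoiningBlock.class) {
            releaseLatch = release;
            exits.add(exit); // register so reset() can wait for this call
        }
        try {
            releaseLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            exit.countDown(); // signal that this call has left the block point
        }
    }

    // Number of block() calls registered since the last reset.
    static int blockedCount() {
        synchronized (JoiningBlock.class) {
            return exits.size();
        }
    }

    // Releases all registered callers, then waits (bounded) for each to return.
    static boolean reset(long timeoutMs) {
        List<CountDownLatch> pending;
        synchronized (JoiningBlock.class) {
            release.countDown();
            release = new CountDownLatch(1); // fresh latch for the next test
            pending = new ArrayList<>(exits);
            exits.clear();
        }
        long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        try {
            for (CountDownLatch exit : pending) {
                long remainingNs = deadlineNs - System.nanoTime();
                if (!exit.await(remainingNs, TimeUnit.NANOSECONDS)) {
                    return false; // a blocked call did not return in time
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Note that this only confirms each caller has left the blocking point; it does not prove that the calling thread has finished any work it does afterwards.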






Re: [PR] MINOR: Stop leaking threads in BlockingConnectorTest [kafka]

2023-12-13 Thread via GitHub


gharris1727 commented on code in PR #12290:
URL: https://github.com/apache/kafka/pull/12290#discussion_r1425661550


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##
@@ -350,13 +353,16 @@ private void assertRequestTimesOut(String 
requestDescription, ThrowingRunnable r
 }
 
 private static class Block {

Review Comment:
   > Do you think it's alright to merge this as-is without blocking on that?
   
   Yep, not a blocker.






Re: [PR] MINOR: Stop leaking threads in BlockingConnectorTest [kafka]

2023-12-13 Thread via GitHub


gharris1727 commented on code in PR #12290:
URL: https://github.com/apache/kafka/pull/12290#discussion_r1425661185


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##
@@ -139,7 +141,8 @@ public void setup() throws Exception {
 public void close() {
 // stop all Connect, Kafka and Zk threads.
 connect.stop();
-Block.resetBlockLatch();
+// unblock everything so that we don't leak threads after each test run
+Block.reset();

Review Comment:
   I think that should take place in a test then, not in the cleanup.
   
   The reason I bring this up is that if I were to assert that the 
clients/threads are all stopped immediately after Block.reset() (as implemented 
in #14783) there's no synchronization to ensure that cleanup takes place before 
the assertion fires. The "asynchronous cleanup" initiated by Block.reset could 
exceed the lifetime of the test, still leaking the threads but only temporarily.






Re: [PR] MINOR: Stop leaking threads in BlockingConnectorTest [kafka]

2023-12-13 Thread via GitHub


C0urante commented on code in PR #12290:
URL: https://github.com/apache/kafka/pull/12290#discussion_r1425475580


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##
@@ -368,31 +374,54 @@ private static ConfigDef config() {
 );
 }
 
+/**
+ * {@link CountDownLatch#await() Wait} for the connector/task to reach 
the point in its lifecycle where
+ * it will block.
+ */
 public static void waitForBlock() throws InterruptedException, 
TimeoutException {
+CountDownLatch awaitBlockLatch;
 synchronized (Block.class) {
-if (blockLatch == null) {
-throw new IllegalArgumentException("No connector has been 
created yet");
-}
+awaitBlockLatch = Block.awaitBlockLatch;
+}
+
+if (awaitBlockLatch == null) {
+throw new IllegalArgumentException("No connector has been 
created yet");

Review Comment:
   Good point--given that we're not guaranteed that, e.g., `Connector::start` 
has been invoked after a REST request to create a connector has returned, this 
does seem like a chance for a flaky failure.
   
   I've tweaked this part to handle the case when `awaitBlockLatch` is null 
gracefully, without risking blocking forever.
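
A rough sketch of how a null latch could be handled with a bounded wait, assuming the connector thread publishes the latch and notifies on a shared monitor (the diff elsewhere in this thread shows a `Block.class.notify()` call, but the rest is a guess). `LatchWaiter`, `publish`, and `awaitLatch` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: wait a bounded time for another thread to create the latch.
class LatchWaiter {
    private static CountDownLatch awaitBlockLatch;

    // Called from the connector/task thread once it creates its latch.
    static void publish() {
        synchronized (LatchWaiter.class) {
            awaitBlockLatch = new CountDownLatch(1);
            LatchWaiter.class.notifyAll(); // wake any test thread waiting for the latch
        }
    }

    // Called by the test thread; returns the latch, or null on timeout or interrupt.
    static CountDownLatch awaitLatch(long timeoutMs) {
        long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        synchronized (LatchWaiter.class) {
            try {
                // Loop to tolerate spurious wakeups.
                while (awaitBlockLatch == null) {
                    long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadlineNs - System.nanoTime());
                    if (remainingMs <= 0) {
                        return null; // give up instead of blocking forever
                    }
                    LatchWaiter.class.wait(remainingMs);
                }
                return awaitBlockLatch;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }
}
```

The explicit `remainingMs <= 0` check matters because `Object.wait(0)` waits indefinitely.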



##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##
@@ -350,13 +353,16 @@ private void assertRequestTimesOut(String 
requestDescription, ThrowingRunnable r
 }
 
 private static class Block {

Review Comment:
   > can you make this public to allow OffsetsApiIntegrationTest to use the 
latch?
   
   Yep, done 👍 
   
   > and do you think that maybe these connectors should be moved out of this 
test to a common reusable class?
   
   I do think this would be cleaner, but it'd be fairly involved. Do you think 
it's alright to merge this as-is without blocking on that?



##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##
@@ -139,7 +141,8 @@ public void setup() throws Exception {
 public void close() {
 // stop all Connect, Kafka and Zk threads.
 connect.stop();
-Block.resetBlockLatch();
+// unblock everything so that we don't leak threads after each test run
+Block.reset();

Review Comment:
   It may be valuable to ensure that workers can shut down gracefully under 
these circumstances. Thoughts?



##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##
@@ -368,31 +374,54 @@ private static ConfigDef config() {
 );
 }
 
+/**
+ * {@link CountDownLatch#await() Wait} for the connector/task to reach 
the point in its lifecycle where
+ * it will block.
+ */
 public static void waitForBlock() throws InterruptedException, 
TimeoutException {
+CountDownLatch awaitBlockLatch;
 synchronized (Block.class) {
-if (blockLatch == null) {
-throw new IllegalArgumentException("No connector has been 
created yet");
-}
+awaitBlockLatch = Block.awaitBlockLatch;
+}
+
+if (awaitBlockLatch == null) {
+throw new IllegalArgumentException("No connector has been 
created yet");
 }
 
 log.debug("Waiting for connector to block");
-if (!blockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, 
TimeUnit.MILLISECONDS)) {
+if (!awaitBlockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, 
TimeUnit.MILLISECONDS)) {
 throw new TimeoutException("Timed out waiting for connector to 
block.");

Review Comment:
   Right now it's a matter of writing test code carefully, with the assumption 
that if any connector or task instance has hit the block, it's the one we're 
interested in. So far I believe this holds for all the tests; let me know if 
you've found any exceptions, though.




Re: [PR] MINOR: Stop leaking threads in BlockingConnectorTest [kafka]

2023-12-12 Thread via GitHub


gharris1727 commented on code in PR #12290:
URL: https://github.com/apache/kafka/pull/12290#discussion_r1424413335


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##
@@ -350,13 +353,16 @@ private void assertRequestTimesOut(String 
requestDescription, ThrowingRunnable r
 }
 
 private static class Block {

Review Comment:
   can you make this public to allow OffsetsApiIntegrationTest to use the latch?
   
   and do you think that maybe these connectors should be moved out of this 
test to a common reusable class?



##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##
@@ -368,31 +374,54 @@ private static ConfigDef config() {
 );
 }
 
+/**
+ * {@link CountDownLatch#await() Wait} for the connector/task to reach 
the point in its lifecycle where
+ * it will block.
+ */
 public static void waitForBlock() throws InterruptedException, 
TimeoutException {
+CountDownLatch awaitBlockLatch;
 synchronized (Block.class) {
-if (blockLatch == null) {
-throw new IllegalArgumentException("No connector has been 
created yet");
-}
+awaitBlockLatch = Block.awaitBlockLatch;
+}
+
+if (awaitBlockLatch == null) {
+throw new IllegalArgumentException("No connector has been 
created yet");

Review Comment:
   Is this an opportunity for a flaky failure if the test thread advances 
before the connector is created? It seems very rare; I don't see any instances 
on the Gradle dashboard.



##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##
@@ -368,31 +374,54 @@ private static ConfigDef config() {
 );
 }
 
+/**
+ * {@link CountDownLatch#await() Wait} for the connector/task to reach 
the point in its lifecycle where
+ * it will block.
+ */
 public static void waitForBlock() throws InterruptedException, 
TimeoutException {
+CountDownLatch awaitBlockLatch;
 synchronized (Block.class) {
-if (blockLatch == null) {
-throw new IllegalArgumentException("No connector has been 
created yet");
-}
+awaitBlockLatch = Block.awaitBlockLatch;
+}
+
+if (awaitBlockLatch == null) {
+throw new IllegalArgumentException("No connector has been 
created yet");
 }
 
 log.debug("Waiting for connector to block");
-if (!blockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, 
TimeUnit.MILLISECONDS)) {
+if (!awaitBlockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, 
TimeUnit.MILLISECONDS)) {
 throw new TimeoutException("Timed out waiting for connector to 
block.");

Review Comment:
   
   Since scanning creates connector instances, and validation caches the 
connector instance, how do you ensure that the right awaitBlockLatch is being 
waited on here?



##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##
@@ -139,7 +141,8 @@ public void setup() throws Exception {
 public void close() {
 // stop all Connect, Kafka and Zk threads.
 connect.stop();
-Block.resetBlockLatch();
+// unblock everything so that we don't leak threads after each test run
+Block.reset();

Review Comment:
   WDYT about resetting before stopping the workers, to allow a normal shutdown 
to happen?


