Re: [PR] MINOR: Stop leaking threads in BlockingConnectorTest [kafka]
C0urante merged PR #12290: URL: https://github.com/apache/kafka/pull/12290 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Stop leaking threads in BlockingConnectorTest [kafka]
gharris1727 commented on code in PR #12290:
URL: https://github.com/apache/kafka/pull/12290#discussion_r1450789672

## connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java: ##
@@ -434,23 +484,31 @@ public Block(Map props) {
         public Block(String block) {
             this.block = block;
-            synchronized (Block.class) {
-                if (blockLatch != null) {
-                    blockLatch.countDown();
+            if (block != null) {
+                synchronized (Block.class) {
+                    resetAwaitBlockLatch();
+                    awaitBlockLatch = new CountDownLatch(1);
+                    Block.class.notify();
                 }
-                blockLatch = new CountDownLatch(1);
             }
         }

         public void maybeBlockOn(String block) {
             if (block.equals(this.block)) {
                 log.info("Will block on {}", block);
-                blockLatch.countDown();
+                CountDownLatch blockLatch;
+                synchronized (Block.class) {
+                    awaitBlockLatch.countDown();

Review Comment:
nit: small NPE here under this sequence of calls:

1. new Block(s)
2. Block.reset()
3. block.maybeBlockOn(s)
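The call sequence in the nit above can be reproduced in isolation. The following is a minimal sketch, not the PR's code: `BlockNpeSketch`, `signalUnguarded` and `signalGuarded` are hypothetical names, and it assumes that `reset()` sets the shared latch back to null, which is what the reported NPE implies.

```java
import java.util.concurrent.CountDownLatch;

final class BlockNpeSketch {
    // Hypothetical stand-in for the shared latch; guarded by the class lock.
    private static CountDownLatch awaitBlockLatch;

    private final String block;

    BlockNpeSketch(String block) {
        this.block = block;
        if (block != null) {
            synchronized (BlockNpeSketch.class) {
                awaitBlockLatch = new CountDownLatch(1);
            }
        }
    }

    // Assumption: reset clears the shared latch, as the reported NPE implies.
    static void reset() {
        synchronized (BlockNpeSketch.class) {
            awaitBlockLatch = null;
        }
    }

    // Mirrors the unguarded countDown(): throws NullPointerException after reset().
    boolean signalUnguarded(String block) {
        if (!block.equals(this.block)) {
            return false;
        }
        synchronized (BlockNpeSketch.class) {
            awaitBlockLatch.countDown();
        }
        return true;
    }

    // One possible guard: skip the signal when the latch has already been cleared.
    boolean signalGuarded(String block) {
        if (!block.equals(this.block)) {
            return false;
        }
        synchronized (BlockNpeSketch.class) {
            if (awaitBlockLatch == null) {
                return false;
            }
            awaitBlockLatch.countDown();
        }
        return true;
    }

    public static void main(String[] args) {
        BlockNpeSketch b = new BlockNpeSketch("start"); // 1. new Block(s)
        reset();                                        // 2. Block.reset()
        try {
            b.signalUnguarded("start");                 // 3. block.maybeBlockOn(s)
        } catch (NullPointerException e) {
            System.out.println("NPE reproduced");
        }
        System.out.println("guarded signal sent: " + b.signalGuarded("start"));
    }
}
```

Whether the guarded path should return quietly or still block is a design choice for the PR author; the sketch only shows that a null check under the same lock avoids the exception.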
Re: [PR] MINOR: Stop leaking threads in BlockingConnectorTest [kafka]
C0urante commented on code in PR #12290:
URL: https://github.com/apache/kafka/pull/12290#discussion_r1428127814

## connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java: ##
@@ -368,31 +374,54 @@ private static ConfigDef config() {
         );
     }

+    /**
+     * {@link CountDownLatch#await() Wait} for the connector/task to reach the point in its lifecycle where
+     * it will block.
+     */
     public static void waitForBlock() throws InterruptedException, TimeoutException {
+        CountDownLatch awaitBlockLatch;
         synchronized (Block.class) {
-            if (blockLatch == null) {
-                throw new IllegalArgumentException("No connector has been created yet");
-            }
+            awaitBlockLatch = Block.awaitBlockLatch;
+        }
+
+        if (awaitBlockLatch == null) {
+            throw new IllegalArgumentException("No connector has been created yet");
         }

         log.debug("Waiting for connector to block");
-        if (!blockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
+        if (!awaitBlockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
             throw new TimeoutException("Timed out waiting for connector to block.");

Review Comment:
I don't believe these changes are necessary here since the portions they address are not affected by the PR. If you would like to do this cleanup in a separate PR, I'd be happy to review.
Re: [PR] MINOR: Stop leaking threads in BlockingConnectorTest [kafka]
gharris1727 commented on code in PR #12290:
URL: https://github.com/apache/kafka/pull/12290#discussion_r1425906944

## connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java: ##
@@ -368,31 +374,54 @@ private static ConfigDef config() {
         );
     }

+    /**
+     * {@link CountDownLatch#await() Wait} for the connector/task to reach the point in its lifecycle where
+     * it will block.
+     */
     public static void waitForBlock() throws InterruptedException, TimeoutException {
+        CountDownLatch awaitBlockLatch;
         synchronized (Block.class) {
-            if (blockLatch == null) {
-                throw new IllegalArgumentException("No connector has been created yet");
-            }
+            awaitBlockLatch = Block.awaitBlockLatch;
+        }
+
+        if (awaitBlockLatch == null) {
+            throw new IllegalArgumentException("No connector has been created yet");
         }

         log.debug("Waiting for connector to block");
-        if (!blockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
+        if (!awaitBlockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
             throw new TimeoutException("Timed out waiting for connector to block.");

Review Comment:
WDYT about a `Block.prepare()` called before the test starts that creates the awaitBlockLatch, instead of having the Block constructor initialize it? That could eliminate the wait-notify mechanism, since only one thread (the test thread) would be responsible for setting/clearing the awaitBlockLatch.

edit: Would this also allow you to block in methods used during plugin scanning, if you only started blocking if the asyncBlockLatch had been prepared first?
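The `Block.prepare()` idea can be sketched as follows. This is an illustration under stated assumptions rather than the PR's implementation: `PreparedBlockSketch`, `arriveAtBlock` and the explicit timeout parameter are hypothetical, and the sketch assumes only the test thread ever calls `prepare()` and `reset()`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class PreparedBlockSketch {
    // Written only by the test thread; volatile so connector/task threads see the update.
    private static volatile CountDownLatch awaitBlockLatch;

    // Test thread: arm the latch before starting any connector.
    static void prepare() {
        awaitBlockLatch = new CountDownLatch(1);
    }

    // Test thread: disarm after the test.
    static void reset() {
        awaitBlockLatch = null;
    }

    // Connector/task thread: report arrival at the blocking point.
    // Returns false when nothing was prepared (for example during plugin scanning),
    // in which case the caller could simply choose not to block.
    static boolean arriveAtBlock() {
        CountDownLatch latch = awaitBlockLatch;
        if (latch == null) {
            return false;
        }
        latch.countDown();
        return true;
    }

    // Test thread: wait until some instance has arrived at its blocking point.
    static void waitForBlock(long timeoutMs) throws InterruptedException, TimeoutException {
        CountDownLatch latch = awaitBlockLatch;
        if (latch == null) {
            throw new IllegalStateException("prepare() was not called");
        }
        if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("Timed out waiting for connector to block.");
        }
    }

    public static void main(String[] args) throws Exception {
        prepare();
        Thread connector = new Thread(PreparedBlockSketch::arriveAtBlock);
        connector.start();
        waitForBlock(5_000);
        connector.join();
        reset();
        System.out.println("connector reached its blocking point");
    }
}
```

Because the constructor no longer creates the latch, no wait-notify handshake is needed between the test thread and the connector threads; a single volatile field is enough in this sketch.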
Re: [PR] MINOR: Stop leaking threads in BlockingConnectorTest [kafka]
gharris1727 commented on code in PR #12290:
URL: https://github.com/apache/kafka/pull/12290#discussion_r1425904694

## connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java: ##
@@ -368,31 +374,54 @@ private static ConfigDef config() {
         );
     }

+    /**
+     * {@link CountDownLatch#await() Wait} for the connector/task to reach the point in its lifecycle where
+     * it will block.
+     */
     public static void waitForBlock() throws InterruptedException, TimeoutException {
+        CountDownLatch awaitBlockLatch;
         synchronized (Block.class) {
-            if (blockLatch == null) {
-                throw new IllegalArgumentException("No connector has been created yet");
-            }
+            awaitBlockLatch = Block.awaitBlockLatch;
+        }
+
+        if (awaitBlockLatch == null) {
+            throw new IllegalArgumentException("No connector has been created yet");
         }

         log.debug("Waiting for connector to block");
-        if (!blockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
+        if (!awaitBlockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
             throw new TimeoutException("Timed out waiting for connector to block.");

Review Comment:
If you add this stanza before the `log.debug("Connector should now be blocked")` the tests still pass:

```
boolean retry;
synchronized (Block.class) {
    retry = Block.awaitBlockLatch != null && Block.awaitBlockLatch != awaitBlockLatch;
}
if (retry) {
    log.debug("New blocking instance was created, retrying wait");
    waitForBlock();
}
```

For me, I see this being printed in:

* testBlockInSinkTaskStart
* testBlockInConnectorStart
* testWorkerRestartWithBlockInConnectorStart
* testBlockInSourceTaskStart
* testBlockInConnectorInitialize

This leads me to believe that this function is normally exiting before the blocking method of the last-instantiated instance happens. I don't immediately see how this could cause flakiness, but it's at least an instance of the method not doing what it says it does.
Re: [PR] MINOR: Stop leaking threads in BlockingConnectorTest [kafka]
gharris1727 commented on code in PR #12290:
URL: https://github.com/apache/kafka/pull/12290#discussion_r1425879028

## connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java: ##
@@ -139,7 +141,8 @@ public void setup() throws Exception {
     public void close() {
         // stop all Connect, Kafka and Zk threads.
         connect.stop();
-        Block.resetBlockLatch();
+        // unblock everything so that we don't leak threads after each test run
+        Block.reset();

Review Comment:
> It's easier to handle in a single method rather than copy over to 11 test cases

Oh I see, I thought it would be sufficient to add one test case that called stop() to verify that one type of blocked thread still allows shutdown to complete, rather than verifying it for all of the different ways of blocking threads. That would have less coverage than the current tests on trunk.

> I've pushed a tweak that adds logic to wait for the blocked threads to complete in Block::reset.

I think this is probably the better solution. The leak tester can separately verify that resources were closed properly now that the test ensures the threads stop. :+1:
Re: [PR] MINOR: Stop leaking threads in BlockingConnectorTest [kafka]
C0urante commented on code in PR #12290:
URL: https://github.com/apache/kafka/pull/12290#discussion_r1425798851

## connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java: ##
@@ -139,7 +141,8 @@ public void setup() throws Exception {
     public void close() {
         // stop all Connect, Kafka and Zk threads.
         connect.stop();
-        Block.resetBlockLatch();
+        // unblock everything so that we don't leak threads after each test run
+        Block.reset();

Review Comment:
> I think that should take place in a test then, not in the cleanup.

It's easier to handle in a single method rather than copy over to 11 test cases, though. And I also don't necessarily see why `@After`-annotated methods need to be used exclusively for cleanup.

The concern about threads leaking (even if for a short period) beyond the scope of the test definitely seems valid. I've pushed a tweak that adds logic to wait for the blocked threads to complete in `Block::reset`. LMK if this seems clean enough; if not, I can bite the bullet and reverse the order of operations in `BlockingConnectorTest::close` and then see about adding more explicit checks for graceful worker shutdown in other places.
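One way a reset could both release blocked threads and wait for them to finish is sketched below. This is not the logic that was pushed to the PR, which is not shown in this thread: `ResetAndWaitSketch`, `blockUntilReset`, `inFlight` and the timeout parameter are hypothetical, and "complete" is taken here to mean that the blocking call has returned, not that the thread has exited.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class ResetAndWaitSketch {
    // Both fields are guarded by the class lock.
    private static CountDownLatch releaseLatch = new CountDownLatch(1);
    private static int inFlight = 0;

    // Connector/task thread: block until the test calls reset().
    static void blockUntilReset() throws InterruptedException {
        CountDownLatch latch;
        synchronized (ResetAndWaitSketch.class) {
            inFlight++;
            latch = releaseLatch;
        }
        try {
            latch.await();
        } finally {
            synchronized (ResetAndWaitSketch.class) {
                inFlight--;
                ResetAndWaitSketch.class.notifyAll();
            }
        }
    }

    // Test thread: release every blocked call, then wait (bounded) until all have returned.
    // Returns false if some call is still blocked when the timeout expires.
    static synchronized boolean reset(long timeoutMs) throws InterruptedException {
        releaseLatch.countDown();
        releaseLatch = new CountDownLatch(1);
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (inFlight > 0) {
            long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            if (remainingMs <= 0) {
                return false;
            }
            // wait() releases the class lock so blocked callers can decrement inFlight.
            ResetAndWaitSketch.class.wait(remainingMs);
        }
        return true;
    }

    static synchronized int inFlight() {
        return inFlight;
    }

    public static void main(String[] args) throws Exception {
        Thread task = new Thread(() -> {
            try {
                blockUntilReset();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        task.start();
        while (inFlight() == 0) {
            Thread.sleep(5);
        }
        System.out.println("all blocked calls finished: " + reset(5_000));
        task.join();
    }
}
```

With a synchronous reset like this, an assertion placed right after it cannot race with the cleanup, which is the concern raised about asserting on leaked threads immediately after `Block.reset()`.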
Re: [PR] MINOR: Stop leaking threads in BlockingConnectorTest [kafka]
gharris1727 commented on code in PR #12290:
URL: https://github.com/apache/kafka/pull/12290#discussion_r1425661550

## connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java: ##
@@ -350,13 +353,16 @@ private void assertRequestTimesOut(String requestDescription, ThrowingRunnable r
     }

     private static class Block {

Review Comment:
> Do you think it's alright to merge this as-is without blocking on that?

Yep, not a blocker.
Re: [PR] MINOR: Stop leaking threads in BlockingConnectorTest [kafka]
gharris1727 commented on code in PR #12290:
URL: https://github.com/apache/kafka/pull/12290#discussion_r1425661185

## connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java: ##
@@ -139,7 +141,8 @@ public void setup() throws Exception {
     public void close() {
         // stop all Connect, Kafka and Zk threads.
         connect.stop();
-        Block.resetBlockLatch();
+        // unblock everything so that we don't leak threads after each test run
+        Block.reset();

Review Comment:
I think that should take place in a test then, not in the cleanup.

The reason I bring this up is that if I were to assert that the clients/threads are all stopped immediately after Block.reset() (as implemented in #14783) there's no synchronization to ensure that cleanup takes place before the assertion fires. The "asynchronous cleanup" initiated by Block.reset could exceed the lifetime of the test, still leaking the threads but only temporarily.
Re: [PR] MINOR: Stop leaking threads in BlockingConnectorTest [kafka]
C0urante commented on code in PR #12290:
URL: https://github.com/apache/kafka/pull/12290#discussion_r1425475580

## connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java: ##
@@ -368,31 +374,54 @@ private static ConfigDef config() {
         );
     }

+    /**
+     * {@link CountDownLatch#await() Wait} for the connector/task to reach the point in its lifecycle where
+     * it will block.
+     */
     public static void waitForBlock() throws InterruptedException, TimeoutException {
+        CountDownLatch awaitBlockLatch;
         synchronized (Block.class) {
-            if (blockLatch == null) {
-                throw new IllegalArgumentException("No connector has been created yet");
-            }
+            awaitBlockLatch = Block.awaitBlockLatch;
+        }
+
+        if (awaitBlockLatch == null) {
+            throw new IllegalArgumentException("No connector has been created yet");

Review Comment:
Good point--given that we're not guaranteed that, e.g., `Connector::start` has been invoked after a REST request to create a connector has returned, this does seem like a chance for a flaky failure. I've tweaked this part to handle the case when `awaitBlockLatch` is null gracefully, without risking blocking forever.

## connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java: ##
@@ -350,13 +353,16 @@ private void assertRequestTimesOut(String requestDescription, ThrowingRunnable r
     }

     private static class Block {

Review Comment:
> can you make this public to allow OffsetsApiIntegrationTest to use the latch?

Yep, done 👍

> and do you think that maybe these connectors should be moved out of this test to a common reusable class?

I do think this would be cleaner, but it'd be fairly involved. Do you think it's alright to merge this as-is without blocking on that?

## connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java: ##
@@ -139,7 +141,8 @@ public void setup() throws Exception {
     public void close() {
         // stop all Connect, Kafka and Zk threads.
         connect.stop();
-        Block.resetBlockLatch();
+        // unblock everything so that we don't leak threads after each test run
+        Block.reset();

Review Comment:
It may be valuable to ensure that workers can shut down gracefully under these circumstances. Thoughts?

## connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java: ##
@@ -368,31 +374,54 @@ private static ConfigDef config() {
         );
     }

+    /**
+     * {@link CountDownLatch#await() Wait} for the connector/task to reach the point in its lifecycle where
+     * it will block.
+     */
     public static void waitForBlock() throws InterruptedException, TimeoutException {
+        CountDownLatch awaitBlockLatch;
         synchronized (Block.class) {
-            if (blockLatch == null) {
-                throw new IllegalArgumentException("No connector has been created yet");
-            }
+            awaitBlockLatch = Block.awaitBlockLatch;
+        }
+
+        if (awaitBlockLatch == null) {
+            throw new IllegalArgumentException("No connector has been created yet");
         }

         log.debug("Waiting for connector to block");
-        if (!blockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
+        if (!awaitBlockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
             throw new TimeoutException("Timed out waiting for connector to block.");

Review Comment:
Right now it's a matter of writing test code carefully, with the assumption that if any connector or task instance has hit the block, it's the one we're interested in. So far I believe this holds for all the tests; let me know if you've found any exceptions, though.
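Handling a null `awaitBlockLatch` without failing immediately and without blocking forever could look roughly like the sketch below. It is an assumption about the shape of such a fix, not the code pushed to the PR: `AwaitLatchCreationSketch`, `createLatch`, `reachBlock` and the timeout parameter are hypothetical names, and the sketch spends a single deadline across both the wait for latch creation and the wait for the block itself.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class AwaitLatchCreationSketch {
    // Guarded by the class lock; null until a blocking instance has been created.
    private static CountDownLatch awaitBlockLatch;

    // Connector/task side: publish a fresh latch and wake any waiting test thread.
    static void createLatch() {
        synchronized (AwaitLatchCreationSketch.class) {
            awaitBlockLatch = new CountDownLatch(1);
            AwaitLatchCreationSketch.class.notifyAll();
        }
    }

    // Connector/task side: signal that the blocking point has been reached.
    static void reachBlock() {
        synchronized (AwaitLatchCreationSketch.class) {
            if (awaitBlockLatch != null) {
                awaitBlockLatch.countDown();
            }
        }
    }

    // Test side: wait for the latch to exist, then for it to be counted down,
    // never exceeding timeoutMs in total.
    static void waitForBlock(long timeoutMs) throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        CountDownLatch latch;
        synchronized (AwaitLatchCreationSketch.class) {
            while (awaitBlockLatch == null) {
                long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remainingMs <= 0) {
                    throw new TimeoutException("Timed out waiting for a connector to be created.");
                }
                AwaitLatchCreationSketch.class.wait(remainingMs);
            }
            latch = awaitBlockLatch;
        }
        long remainingNs = Math.max(0L, deadline - System.nanoTime());
        if (!latch.await(remainingNs, TimeUnit.NANOSECONDS)) {
            throw new TimeoutException("Timed out waiting for connector to block.");
        }
    }

    public static void main(String[] args) throws Exception {
        Thread connector = new Thread(() -> {
            createLatch();
            reachBlock();
        });
        connector.start();
        waitForBlock(5_000);
        connector.join();
        System.out.println("connector created and blocked");
    }
}
```

The `notifyAll()` in `createLatch` plays the role of the `Block.class.notify()` call visible in the constructor diff elsewhere in this thread; the loop around `wait` protects against spurious wakeups.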
Re: [PR] MINOR: Stop leaking threads in BlockingConnectorTest [kafka]
gharris1727 commented on code in PR #12290:
URL: https://github.com/apache/kafka/pull/12290#discussion_r1424413335

## connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java: ##
@@ -350,13 +353,16 @@ private void assertRequestTimesOut(String requestDescription, ThrowingRunnable r
     }

     private static class Block {

Review Comment:
Can you make this public to allow OffsetsApiIntegrationTest to use the latch? And do you think that maybe these connectors should be moved out of this test to a common reusable class?

## connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java: ##
@@ -368,31 +374,54 @@ private static ConfigDef config() {
         );
     }

+    /**
+     * {@link CountDownLatch#await() Wait} for the connector/task to reach the point in its lifecycle where
+     * it will block.
+     */
     public static void waitForBlock() throws InterruptedException, TimeoutException {
+        CountDownLatch awaitBlockLatch;
         synchronized (Block.class) {
-            if (blockLatch == null) {
-                throw new IllegalArgumentException("No connector has been created yet");
-            }
+            awaitBlockLatch = Block.awaitBlockLatch;
+        }
+
+        if (awaitBlockLatch == null) {
+            throw new IllegalArgumentException("No connector has been created yet");

Review Comment:
Is this an opportunity for a flaky failure if the test thread advances before the connector is created? It seems very rare; I don't see any instances on the Gradle dashboard.

## connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java: ##
@@ -368,31 +374,54 @@ private static ConfigDef config() {
         );
     }

+    /**
+     * {@link CountDownLatch#await() Wait} for the connector/task to reach the point in its lifecycle where
+     * it will block.
+     */
     public static void waitForBlock() throws InterruptedException, TimeoutException {
+        CountDownLatch awaitBlockLatch;
         synchronized (Block.class) {
-            if (blockLatch == null) {
-                throw new IllegalArgumentException("No connector has been created yet");
-            }
+            awaitBlockLatch = Block.awaitBlockLatch;
+        }
+
+        if (awaitBlockLatch == null) {
+            throw new IllegalArgumentException("No connector has been created yet");
         }

         log.debug("Waiting for connector to block");
-        if (!blockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
+        if (!awaitBlockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
             throw new TimeoutException("Timed out waiting for connector to block.");

Review Comment:
Since scanning creates connector instances, and validation caches the connector instance, how do you ensure that the right awaitBlockLatch is being waited on here?

## connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java: ##
@@ -139,7 +141,8 @@ public void setup() throws Exception {
     public void close() {
         // stop all Connect, Kafka and Zk threads.
         connect.stop();
-        Block.resetBlockLatch();
+        // unblock everything so that we don't leak threads after each test run
+        Block.reset();

Review Comment:
WDYT about resetting before stopping the workers, to allow a normal shutdown to happen?