C0urante commented on code in PR #12290: URL: https://github.com/apache/kafka/pull/12290#discussion_r1425475580
########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java: ########## @@ -368,31 +374,54 @@ private static ConfigDef config() { ); } + /** + * {@link CountDownLatch#await() Wait} for the connector/task to reach the point in its lifecycle where + * it will block. + */ public static void waitForBlock() throws InterruptedException, TimeoutException { + CountDownLatch awaitBlockLatch; synchronized (Block.class) { - if (blockLatch == null) { - throw new IllegalArgumentException("No connector has been created yet"); - } + awaitBlockLatch = Block.awaitBlockLatch; + } + + if (awaitBlockLatch == null) { + throw new IllegalArgumentException("No connector has been created yet"); Review Comment: Good point--given that we're not guaranteed that, e.g., `Connector::start` has been invoked after a REST request to create a connector has returned, this does seem like a chance for a flaky failure. I've tweaked this part to handle the case when `awaitBlockLatch` is null gracefully, without risking blocking forever. ########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java: ########## @@ -350,13 +353,16 @@ private void assertRequestTimesOut(String requestDescription, ThrowingRunnable r } private static class Block { Review Comment: > can you make this public to allow OffsetsApiIntegrationTest to use the latch? Yep, done 👍 > and do you think that maybe these connectors should be moved out of this test to a common reusable class? I do think this would be cleaner, but it'd fairly involved. Do you think it's alright to merge this as-is without blocking on that? ########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java: ########## @@ -350,13 +353,16 @@ private void assertRequestTimesOut(String requestDescription, ThrowingRunnable r } private static class Block { Review Comment: > can you make this public to allow OffsetsApiIntegrationTest to use the latch? Yep, done 👍 > and do you think that maybe these connectors should be moved out of this test to a common reusable class? I do think this would be cleaner, but it'd fairly involved. Do you think it's alright to merge this as-is without blocking on that? ########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java: ########## @@ -139,7 +141,8 @@ public void setup() throws Exception { public void close() { // stop all Connect, Kafka and Zk threads. connect.stop(); - Block.resetBlockLatch(); + // unblock everything so that we don't leak threads after each test run + Block.reset(); Review Comment: It may be valuable to ensure that workers can shut down gracefully under these circumstances. Thoughts? ########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java: ########## @@ -368,31 +374,54 @@ private static ConfigDef config() { ); } + /** + * {@link CountDownLatch#await() Wait} for the connector/task to reach the point in its lifecycle where + * it will block. + */ public static void waitForBlock() throws InterruptedException, TimeoutException { + CountDownLatch awaitBlockLatch; synchronized (Block.class) { - if (blockLatch == null) { - throw new IllegalArgumentException("No connector has been created yet"); - } + awaitBlockLatch = Block.awaitBlockLatch; + } + + if (awaitBlockLatch == null) { + throw new IllegalArgumentException("No connector has been created yet"); } log.debug("Waiting for connector to block"); - if (!blockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) { + if (!awaitBlockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) { throw new TimeoutException("Timed out waiting for connector to block."); Review Comment: Right now it's a matter of writing test code carefully, with the assumption that if any connector or task instance has hit the block, it's the one we're interested in. So far I believe this holds for all the tests; let me know if you've found any exceptions, though. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org