C0urante commented on code in PR #12290:
URL: https://github.com/apache/kafka/pull/12290#discussion_r1425475580


##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##########
@@ -368,31 +374,54 @@ private static ConfigDef config() {
                 );
         }
 
+        /**
+         * {@link CountDownLatch#await() Wait} for the connector/task to reach the point in its lifecycle where
+         * it will block.
+         */
         public static void waitForBlock() throws InterruptedException, TimeoutException {
+            CountDownLatch awaitBlockLatch;
             synchronized (Block.class) {
-                if (blockLatch == null) {
-                    throw new IllegalArgumentException("No connector has been created yet");
-                }
+                awaitBlockLatch = Block.awaitBlockLatch;
+            }
+
+            if (awaitBlockLatch == null) {
+                throw new IllegalArgumentException("No connector has been created yet");

Review Comment:
   Good point--given that we're not guaranteed that, e.g., `Connector::start` 
has been invoked after a REST request to create a connector has returned, this 
does seem like a chance for a flaky failure.
   
   I've tweaked this part to handle the case when `awaitBlockLatch` is null 
gracefully, without risking blocking forever.
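   
   A minimal sketch of one way a null latch could be handled with a bounded wait. This is not the code from this PR: the class `AwaitBlockSketch` and the methods `create`, `markBlocked`, and `reset` are hypothetical stand-ins, and the 10 ms polling interval is an arbitrary assumption. The idea is to treat a null latch as "not created yet" and keep polling until a single overall deadline expires, so the caller neither fails immediately nor waits forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the test's Block class; all names here are assumptions.
class AwaitBlockSketch {
    // Null until a connector/task instance has been created.
    private static CountDownLatch awaitBlockLatch;

    // Simulates creation of a connector/task instance.
    static synchronized void create() {
        awaitBlockLatch = new CountDownLatch(1);
    }

    // Simulates the instance reaching the point where it blocks.
    static synchronized void markBlocked() {
        if (awaitBlockLatch != null) {
            awaitBlockLatch.countDown();
        }
    }

    // Clears the latch, as a test teardown might.
    static synchronized void reset() {
        awaitBlockLatch = null;
    }

    // Returns true if the block was reached before the deadline, false otherwise.
    // A null latch is polled for instead of being treated as an immediate error.
    static boolean waitForBlock(long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        try {
            while (true) {
                CountDownLatch latch;
                synchronized (AwaitBlockSketch.class) {
                    latch = awaitBlockLatch;
                }
                long remainingNanos = deadline - System.nanoTime();
                if (latch != null) {
                    // Spend whatever is left of the budget waiting on the latch itself.
                    return latch.await(Math.max(0, remainingNanos), TimeUnit.NANOSECONDS);
                }
                if (remainingNanos <= 0) {
                    return false; // no instance was created within the timeout
                }
                Thread.sleep(10); // arbitrary polling interval
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Create and block the instance shortly after the waiter has started.
        Thread creator = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            create();
            markBlocked();
        });
        creator.start();
        System.out.println("blocked in time: " + waitForBlock(2000));
    }
}
```

   Returning a boolean instead of throwing keeps the sketch short; a test helper could just as well convert `false` into a `TimeoutException`.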



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##########
@@ -350,13 +353,16 @@ private void assertRequestTimesOut(String requestDescription, ThrowingRunnable r
     }
 
     private static class Block {

Review Comment:
   > can you make this public to allow OffsetsApiIntegrationTest to use the 
latch?
   
   Yep, done 👍 
   
   > and do you think that maybe these connectors should be moved out of this 
test to a common reusable class?
   
   I do think this would be cleaner, but it'd be fairly involved. Do you think 
it's alright to merge this as-is without blocking on that?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##########
@@ -139,7 +141,8 @@ public void setup() throws Exception {
     public void close() {
         // stop all Connect, Kafka and Zk threads.
         connect.stop();
-        Block.resetBlockLatch();
+        // unblock everything so that we don't leak threads after each test run
+        Block.reset();

Review Comment:
   It may be valuable to ensure that workers can shut down gracefully under 
these circumstances. Thoughts?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##########
@@ -368,31 +374,54 @@ private static ConfigDef config() {
                 );
         }
 
+        /**
+         * {@link CountDownLatch#await() Wait} for the connector/task to reach the point in its lifecycle where
+         * it will block.
+         */
         public static void waitForBlock() throws InterruptedException, TimeoutException {
+            CountDownLatch awaitBlockLatch;
             synchronized (Block.class) {
-                if (blockLatch == null) {
-                    throw new IllegalArgumentException("No connector has been created yet");
-                }
+                awaitBlockLatch = Block.awaitBlockLatch;
+            }
+
+            if (awaitBlockLatch == null) {
+                throw new IllegalArgumentException("No connector has been created yet");
             }
 
             log.debug("Waiting for connector to block");
-            if (!blockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
+            if (!awaitBlockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                 throw new TimeoutException("Timed out waiting for connector to block.");

Review Comment:
   Right now it's a matter of writing test code carefully, with the assumption 
that if any connector or task instance has hit the block, it's the one we're 
interested in. So far I believe this holds for all the tests; let me know if 
you've found any exceptions, though.
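   
   To illustrate the assumption described above, here is a small sketch (hypothetical names throughout, not code from this PR) of a single latch shared by every instance. The waiter learns only that some instance reached its block, not which one, so tests have to be written so that this distinction does not matter.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; SharedLatchSketch and its methods are assumptions.
class SharedLatchSketch {
    // One latch shared by every connector/task instance in the JVM.
    private static final CountDownLatch SHARED = new CountDownLatch(1);

    // Any instance that reaches its blocking point releases the shared latch.
    static void hitBlock(String instanceName) {
        System.out.println(instanceName + " reached its block");
        SHARED.countDown();
    }

    // Returns true once at least one instance has blocked; it cannot tell which.
    static boolean anyBlocked(long timeoutMs) {
        try {
            return SHARED.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // An instance the test is not interested in still satisfies the waiter.
        hitBlock("unrelated-task");
        System.out.println("some instance blocked: " + anyBlocked(100));
    }
}
```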



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
