viktorsomogyi commented on code in PR #12290:
URL: https://github.com/apache/kafka/pull/12290#discussion_r1234081443


##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##########
@@ -350,13 +353,16 @@ private void assertRequestTimesOut(String requestDescription, ThrowingRunnable r
     }
 
     private static class Block {
-        private static CountDownLatch blockLatch;
+        // All latches that blocking connectors/tasks are or will be waiting on during a test case
+        private static final Set<CountDownLatch> BLOCK_LATCHES = new HashSet<>();
+        // The latch that can be used to wait for a connector/task to reach the most-recently-registered blocking point
+        private static CountDownLatch awaitBlockLatch;
 
         private final String block;
 
         public static final String BLOCK_CONFIG = "block";
 
-        private static ConfigDef config() {
+        public static ConfigDef config() {

Review Comment:
   nit: The default (package-private, no modifier) access should be sufficient 
here. Is there a particular reason for making this public?
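
To illustrate the nit above, here is a minimal sketch, assuming `Block` is only used from within its enclosing test class. The names `EnclosingTest` and `describeConfig` are hypothetical, and `config()` returns a `String` instead of a `ConfigDef` to keep the example free of Kafka dependencies. Members of a private nested class are reachable from the enclosing class regardless of their own modifier, so dropping `public` should not change what compiles.

```java
// Hypothetical stand-in for the test class; not the actual Kafka code.
class EnclosingTest {

    // Private nested class, as in the diff under review.
    private static class Block {
        static final String BLOCK_CONFIG = "block";

        // No access modifier: package-private is enough, because the
        // enclosing class can reach every member of its nested classes.
        static String config() {
            return BLOCK_CONFIG;
        }
    }

    // Code elsewhere in the enclosing class can still call Block.config().
    static String describeConfig() {
        return Block.config();
    }

    public static void main(String[] args) {
        System.out.println(describeConfig());
    }
}
```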



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##########
@@ -368,31 +374,54 @@ private static ConfigDef config() {
                 );
         }
 
+        /**
+         * {@link CountDownLatch#await() Wait} for the connector/task to reach the point in its lifecycle where
+         * it will block.
+         */
         public static void waitForBlock() throws InterruptedException, TimeoutException {
+            CountDownLatch awaitBlockLatch;
             synchronized (Block.class) {
-                if (blockLatch == null) {
-                    throw new IllegalArgumentException("No connector has been created yet");
-                }
+                awaitBlockLatch = Block.awaitBlockLatch;
+            }
+
+            if (awaitBlockLatch == null) {
+                throw new IllegalArgumentException("No connector has been created yet");
             }
 
             log.debug("Waiting for connector to block");
-            if (!blockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
+            if (!awaitBlockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                 throw new TimeoutException("Timed out waiting for connector to block.");
             }
             log.debug("Connector should now be blocked");
         }
 
-        // Note that there is only ever at most one global block latch at a time, which makes tests that
+        /**
+         * {@link CountDownLatch#countDown() Release} any latches allocated over the course of a test
+         * to either await a connector/task reaching a blocking point, or cause a connector/task to block.
+         */
+        public static synchronized void reset() {

Review Comment:
   nit: no need for `public`



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##########
@@ -401,23 +430,29 @@ public Block(Map<String, String> props) {
 
         public Block(String block) {
             this.block = block;
-            synchronized (Block.class) {
-                if (blockLatch != null) {
-                    blockLatch.countDown();
+            if (block != null) {
+                synchronized (Block.class) {
+                    resetAwaitBlockLatch();
+                    awaitBlockLatch = new CountDownLatch(1);
                 }
-                blockLatch = new CountDownLatch(1);
             }
         }
 
         public void maybeBlockOn(String block) {
             if (block.equals(this.block)) {
                 log.info("Will block on {}", block);
-                blockLatch.countDown();
+                CountDownLatch blockLatch;
+                synchronized (Block.class) {
+                    awaitBlockLatch.countDown();
+                    blockLatch = newBlockLatch();
+                }
                 while (true) {
                     try {
-                        Thread.sleep(Long.MAX_VALUE);
+                        blockLatch.await();
+                        log.debug("Instructed to stop blocking; will resume normal execution");
+                        return;
                     } catch (InterruptedException e) {
-                        // No-op. Just keep blocking.
+                        log.debug("Interrupted while blocking; will continue blocking until instructed to stop");
                     }
                 }

Review Comment:
   Wouldn't this `while` loop prevent Connect from shutting down normally, 
given the order of calls in `BlockingConnectorTest` (`connect.stop` first, then 
`Block.reset`)? For instance, the way `Worker` shuts down, it expects 
`WorkerConnector`s to respond to an interrupt.
   
   Code reference:
   
https://github.com/apache/kafka/blob/6f7682d2f4ecc8110f80cb6301de02f512d36a53/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L267
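
The concern above can be reproduced in isolation. The sketch below is not Kafka Connect code: the class and method names are hypothetical, and it assumes that shutdown relies on interrupting the blocked thread, as the comment suggests. It shows that a loop of this shape survives an interrupt and only exits once the latch is counted down.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class; mirrors only the shape of the loop under review.
class InterruptSwallowingBlockDemo {

    // Returns only after the latch is released; interrupts are caught and ignored.
    static void blockUntilReleased(CountDownLatch blockLatch) {
        while (true) {
            try {
                blockLatch.await();
                return;
            } catch (InterruptedException e) {
                // Swallow the interrupt and go back to waiting on the latch.
            }
        }
    }

    // Starts a daemon thread that blocks on the given latch.
    static Thread startBlocked(CountDownLatch blockLatch) {
        Thread blocked = new Thread(() -> blockUntilReleased(blockLatch), "blocked-connector");
        blocked.setDaemon(true);
        blocked.start();
        return blocked;
    }

    // Waits up to the given time for the thread to finish, then reports whether it is still running.
    static boolean aliveAfter(Thread thread, long millis) {
        try {
            thread.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return thread.isAlive();
    }

    public static void main(String[] args) {
        CountDownLatch blockLatch = new CountDownLatch(1);
        Thread blocked = startBlocked(blockLatch);

        // Analogous to a shutdown path that interrupts the thread first.
        blocked.interrupt();
        System.out.println("alive after interrupt: " + aliveAfter(blocked, 500));

        // Analogous to releasing the latches afterwards.
        blockLatch.countDown();
        System.out.println("alive after release: " + aliveAfter(blocked, 5000));
    }
}
```

If this matches how `Worker` stops connectors, the blocked thread would not exit until the latches are released, so the relative order of `connect.stop` and `Block.reset` matters.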


