C0urante commented on code in PR #12290:
URL: https://github.com/apache/kafka/pull/12290#discussion_r1237612378


##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java:
##########
@@ -401,23 +430,29 @@ public Block(Map<String, String> props) {
 
         public Block(String block) {
             this.block = block;
-            synchronized (Block.class) {
-                if (blockLatch != null) {
-                    blockLatch.countDown();
+            if (block != null) {
+                synchronized (Block.class) {
+                    resetAwaitBlockLatch();
+                    awaitBlockLatch = new CountDownLatch(1);
                 }
-                blockLatch = new CountDownLatch(1);
             }
         }
 
         public void maybeBlockOn(String block) {
             if (block.equals(this.block)) {
                 log.info("Will block on {}", block);
-                blockLatch.countDown();
+                CountDownLatch blockLatch;
+                synchronized (Block.class) {
+                    awaitBlockLatch.countDown();
+                    blockLatch = newBlockLatch();
+                }
                 while (true) {
                     try {
-                        Thread.sleep(Long.MAX_VALUE);
+                        blockLatch.await();
+                        log.debug("Instructed to stop blocking; will resume 
normal execution");
+                        return;
                     } catch (InterruptedException e) {
-                        // No-op. Just keep blocking.
+                        log.debug("Interrupted while blocking; will continue 
blocking until instructed to stop");
                     }
                 }

Review Comment:
   Sort of--this prevents the `Worker` class's `executor` field from shutting 
down gracefully (i.e., when we invoke `awaitTermination` on it in 
`ThreadUtils::shutdownExecutorServiceQuietly`), but it doesn't prevent the 
Connect worker from shutting down, since we put a bound on how long we wait for 
the executor to shut down before moving on.
   
   This is why the tests on trunk (which also have this kind of `while (true)` 
loop to simulate connector/task blocks) don't hang.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to