viktorsomogyi commented on code in PR #12290: URL: https://github.com/apache/kafka/pull/12290#discussion_r1234081443
########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java: ########## @@ -350,13 +353,16 @@ private void assertRequestTimesOut(String requestDescription, ThrowingRunnable r } private static class Block { - private static CountDownLatch blockLatch; + // All latches that blocking connectors/tasks are or will be waiting on during a test case + private static final Set<CountDownLatch> BLOCK_LATCHES = new HashSet<>(); + // The latch that can be used to wait for a connector/task to reach the most-recently-registered blocking point + private static CountDownLatch awaitBlockLatch; private final String block; public static final String BLOCK_CONFIG = "block"; - private static ConfigDef config() { + public static ConfigDef config() { Review Comment: nit: The default (package-private, i.e. no modifier) access should be sufficient. Is there a particular reason for making this public? ########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java: ########## @@ -368,31 +374,54 @@ private static ConfigDef config() { ); } + /** + * {@link CountDownLatch#await() Wait} for the connector/task to reach the point in its lifecycle where + * it will block. 
+ */ public static void waitForBlock() throws InterruptedException, TimeoutException { + CountDownLatch awaitBlockLatch; synchronized (Block.class) { - if (blockLatch == null) { - throw new IllegalArgumentException("No connector has been created yet"); - } + awaitBlockLatch = Block.awaitBlockLatch; + } + + if (awaitBlockLatch == null) { + throw new IllegalArgumentException("No connector has been created yet"); } log.debug("Waiting for connector to block"); - if (!blockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) { + if (!awaitBlockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) { throw new TimeoutException("Timed out waiting for connector to block."); } log.debug("Connector should now be blocked"); } - // Note that there is only ever at most one global block latch at a time, which makes tests that + /** + * {@link CountDownLatch#countDown() Release} any latches allocated over the course of a test + * to either await a connector/task reaching a blocking point, or cause a connector/task to block. 
+ */ public static synchronized void reset() { Review Comment: nit: no need for `public` ########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java: ########## @@ -401,23 +430,29 @@ public Block(Map<String, String> props) { public Block(String block) { this.block = block; - synchronized (Block.class) { - if (blockLatch != null) { - blockLatch.countDown(); + if (block != null) { + synchronized (Block.class) { + resetAwaitBlockLatch(); + awaitBlockLatch = new CountDownLatch(1); } - blockLatch = new CountDownLatch(1); } } public void maybeBlockOn(String block) { if (block.equals(this.block)) { log.info("Will block on {}", block); - blockLatch.countDown(); + CountDownLatch blockLatch; + synchronized (Block.class) { + awaitBlockLatch.countDown(); + blockLatch = newBlockLatch(); + } while (true) { try { - Thread.sleep(Long.MAX_VALUE); + blockLatch.await(); + log.debug("Instructed to stop blocking; will resume normal execution"); + return; } catch (InterruptedException e) { - // No-op. Just keep blocking. + log.debug("Interrupted while blocking; will continue blocking until instructed to stop"); } } Review Comment: Wouldn't this `while` loop prevent the normal shutdown of Connect, given the order of calls in `BlockingConnectorTest` (you call `connect.stop` and then `Block.reset`)? For instance, the way the Worker shuts down, it expects WorkerConnectors to respond to an interrupt. Code reference: https://github.com/apache/kafka/blob/6f7682d2f4ecc8110f80cb6301de02f512d36a53/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L267 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org