Re: [PR] KAFKA-16943: Synchronously verify Connect worker startup failure in InternalTopicsIntegrationTest [kafka]
zzzk1 commented on PR #16451:
URL: https://github.com/apache/kafka/pull/16451#issuecomment-2226903656

   @C0urante Improved code based on your suggestion, ptal.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] KAFKA-16943: Synchronously verify Connect worker startup failure in InternalTopicsIntegrationTest [kafka]
C0urante commented on code in PR #16451:
URL: https://github.com/apache/kafka/pull/16451#discussion_r1676172863

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java:
##
@@ -47,6 +50,11 @@ public Connect(H herder, ConnectRestServer rest) {
         shutdownHook = new ShutdownHook();
     }
 
+    // public for testing
+    public Future getDistributedHerderFuture() {

Review Comment:
   Nit: We don't use the `get` prefix in this codebase. Also, this method can technically be called for more than just distributed workers. Maybe we could call it `herderTask()` and note in the Javadocs that it will return null if the herder type doesn't have a separate work thread?

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -232,6 +233,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     private final DistributedConfig config;
 
+    // visible for testing
+    public Future herderFuture;

Review Comment:
   This field should still be `private`, and only exposed via a getter:

   ```java
   private Future herderFuture;

   ...

   public Future herderFuture() {
       return herderFuture;
   }
   ```

   Otherwise, it could be modified by other classes, which we almost certainly do not want.

## connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java:
##
@@ -152,11 +156,20 @@ public void testFailToCreateInternalTopicsWithMoreReplicasThanBrokers() throws I
                 "Body did not contain expected message detailing the worker's in-progress operation: " + body
             );
         }
+        connect.resetRequestTimeout();
+        Future future = worker.getDistributedHerderFuture();
+
+        try {
+            future.get(5000, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException | ExecutionException exception) {
+            log.error("Failed to start a worker:", exception);
+            future.cancel(true);
+        }

Review Comment:
   A few things:

   1. We don't need or want to cancel the future; resource cleanup should take place when we shut down the cluster at the end of the test (see [here](https://github.com/apache/kafka/blob/0ada8fac6869cad8ac33a79032cf5d57bfa2a3ea/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java#L61-L65))
   2. We should fail the test on a `TimeoutException`, since it indicates that the worker hasn't failed to start up yet
   3. We don't need to log anything, especially not at `ERROR` level, if we get an `ExecutionException`
   4. We should probably go with a higher timeout; our CI can be really slow sometimes, and a timeout that is too low might cause flakiness

   ```suggestion
           assertThrows(
                   ExecutionException.class,
                   () -> future.get(1, TimeUnit.MINUTES)
           );
   ```
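The four points in the review comment above can be illustrated without JUnit. The following is a minimal, self-contained sketch, not code from the PR: `StartupFailureCheck`, `awaitFailure`, `failedTask`, and `pendingTask` are hypothetical names, and `CompletableFuture` stands in for the future returned by the herder's executor. It treats an `ExecutionException` as the expected outcome, turns a timeout into a test failure, logs nothing, and never cancels the future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of Kafka: mirrors what
// assertThrows(ExecutionException.class, () -> future.get(...)) checks.
public class StartupFailureCheck {

    // Returns the cause of the task's failure. Throws AssertionError if the
    // task times out or completes normally. The future is never cancelled;
    // cleanup is left to whoever owns the task.
    public static Throwable awaitFailure(Future<?> future, long timeoutMs) {
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            return e.getCause(); // expected: the task failed
        } catch (TimeoutException e) {
            // The task has not failed yet, so the check itself must fail.
            throw new AssertionError("Task has not failed within " + timeoutMs + " ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for the task", e);
        }
        throw new AssertionError("Task completed normally but was expected to fail");
    }

    // Stand-in for a worker whose startup has already failed (assumption).
    public static Future<?> failedTask() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("simulated startup failure"));
        return future;
    }

    // Stand-in for a worker that is still starting up (assumption).
    public static Future<?> pendingTask() {
        return new CompletableFuture<Void>();
    }
}
```

In a real JUnit test the `assertThrows` form from the suggestion is shorter and behaves the same way: a `TimeoutException` is not an `ExecutionException`, so it fails the assertion.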
Re: [PR] KAFKA-16943: Synchronously verify Connect worker startup failure in InternalTopicsIntegrationTest [kafka]
zzzk1 commented on code in PR #16451:
URL: https://github.com/apache/kafka/pull/16451#discussion_r1672424490

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -231,6 +232,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     Thread herderThread;
     private final DistributedConfig config;
+
+    public static Future herderFuture;

Review Comment:
   I have improved this, PTAL.
Re: [PR] KAFKA-16943: Synchronously verify Connect worker startup failure in InternalTopicsIntegrationTest [kafka]
C0urante commented on code in PR #16451:
URL: https://github.com/apache/kafka/pull/16451#discussion_r1667658373

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -231,6 +232,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     Thread herderThread;
     private final DistributedConfig config;
+
+    public static Future herderFuture;

Review Comment:
   I don't think this can be `static`; if it is, we won't be able to use it with multiple workers in a single integration test (which we do very frequently).
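To make the objection to `static` concrete, here is a small hedged sketch. The classes `StaticWorker` and `InstanceWorker` are hypothetical and are not Kafka code. With a static field, the second worker created in the same JVM overwrites the first worker's future, so a test can no longer observe the first worker's task. An instance field keeps one future per worker.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo, not Kafka code: static vs. instance field for a task future.
public class StaticFutureDemo {

    // One field shared by every worker in the JVM.
    static class StaticWorker {
        static CompletableFuture<String> task;

        StaticWorker(String name) {
            task = CompletableFuture.completedFuture(name); // overwrites any earlier value
        }
    }

    // One field per worker, private and exposed read-only.
    static class InstanceWorker {
        private final CompletableFuture<String> task;

        InstanceWorker(String name) {
            this.task = CompletableFuture.completedFuture(name);
        }

        CompletableFuture<String> task() {
            return task;
        }
    }

    // Result seen when asking for "the first worker's" task via the static field.
    public static String firstWorkerTaskWithStaticField() {
        new StaticWorker("worker-1");
        new StaticWorker("worker-2");
        return StaticWorker.task.join(); // only the last assignment survives
    }

    // Result seen when each worker owns its future.
    public static String firstWorkerTaskWithInstanceField() {
        InstanceWorker first = new InstanceWorker("worker-1");
        new InstanceWorker("worker-2");
        return first.task().join();
    }
}
```

With the static field the first worker's result is lost (`worker-2` is returned); with the instance field the first worker still reports `worker-1`.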
Re: [PR] KAFKA-16943: Synchronously verify Connect worker startup failure in InternalTopicsIntegrationTest [kafka]
C0urante commented on code in PR #16451:
URL: https://github.com/apache/kafka/pull/16451#discussion_r1664292432

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -363,7 +364,17 @@ public DistributedHerder(DistributedConfig config,
     @Override
     public void start() {
-        this.herderExecutor.submit(this);
+        Future future = this.herderExecutor.submit(this);
+        try {
+            future.get(6, TimeUnit.SECONDS);
+        } catch (TimeoutException timeoutException) {
+            log.error("herder work thread timeout:", timeoutException);
+            future.cancel(true);
+        } catch (InterruptedException interruptedException) {
+            Thread.currentThread().interrupt();
+        } catch (ExecutionException executionException) {
+            log.error("herder work thread execution exception:", executionException);

Review Comment:
   We don't want to change `start` to block. Can we save the future in a field in this class, then expose something like a `Future herderTask()` method that allows testing code to access that future and, if desired, block on it?
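The shape requested in the comment above (a non-blocking `start`, the future kept in a field, and a `herderTask()` accessor) could look roughly like the sketch below. This is an illustration under stated assumptions, not the actual `DistributedHerder` change: `NonBlockingHerderSketch`, its constructor flag, `stop()`, and `startupFailed` are hypothetical, and a plain single-thread executor stands in for `herderExecutor`.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for a herder with its own work thread; not Kafka code.
public class NonBlockingHerderSketch implements Runnable {
    private final ExecutorService herderExecutor = Executors.newSingleThreadExecutor();
    private final boolean failOnStartup; // assumption: lets the sketch simulate a failure

    // Private so other classes cannot reassign it; read access goes through herderTask().
    private volatile Future<?> herderTask;

    public NonBlockingHerderSketch(boolean failOnStartup) {
        this.failOnStartup = failOnStartup;
    }

    @Override
    public void run() {
        if (failOnStartup) {
            throw new IllegalStateException("simulated startup failure");
        }
    }

    // Submits the work and returns immediately; it never waits on the future.
    public void start() {
        herderTask = herderExecutor.submit(this);
    }

    // Returns the future for the work thread's task, or null before start().
    public Future<?> herderTask() {
        return herderTask;
    }

    public void stop() {
        herderExecutor.shutdownNow();
    }

    // Test-side helper: the caller, not start(), decides whether to block.
    public static boolean startupFailed(NonBlockingHerderSketch herder, long timeoutMs) {
        try {
            herder.herderTask().get(timeoutMs, TimeUnit.MILLISECONDS);
            return false;
        } catch (ExecutionException e) {
            return true; // run() threw, so the submitted task failed
        } catch (TimeoutException e) {
            throw new IllegalStateException("Task neither finished nor failed in time", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
    }
}
```

Keeping the wait out of `start` means production callers are unaffected, while a test can still block on `herderTask()` for as long as it chooses.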
[PR] KAFKA-16943: Synchronously verify Connect worker startup failure in InternalTopicsIntegrationTest [kafka]
zzzk1 opened a new pull request, #16451:
URL: https://github.com/apache/kafka/pull/16451

   Synchronously verify worker status when created

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)