C0urante commented on code in PR #12802: URL: https://github.com/apache/kafka/pull/12802#discussion_r1047862099
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1645,6 +1646,8 @@ private void startAndStop(Collection<Callable<Void>> callables) {
             startAndStopExecutor.invokeAll(callables);
         } catch (InterruptedException e) {
             // ignore
+        } catch (RejectedExecutionException re) {
+            log.error("startAndStopExecutor already shutdown or full. Not invoking explicit connector/task shutdown");

Review Comment:
Thank you for looking into this. It's these kinds of details that make maintaining this part of the code base tricky, and I really appreciate the time you've put into diving deep on this one!

I've revisited some of the logic here, and the assumption that the cleanup in `halt` (where we invoke, among other things, `member::stop`) will be allowed to proceed cleanly if we just handle a `RejectedExecutionException` thrown in `startAndStop` is not quite accurate.

The only way to externally trigger worker shutdown is to begin to terminate the JVM (usually done via `SIGINT`, or ctrl+C). At this point, threads are not given time to gracefully terminate (even if they are non-daemon threads); instead, the only thing that's guaranteed to take place before the JVM exits is the execution of any shutdown hooks that have been added. In this case, we add a [shutdown hook in the `Connect` class](https://github.com/apache/kafka/blob/67c72596afe58363eceeb32084c5c04637a33831/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java#L95-L105) that [shuts down the REST server and invokes `Herder::stop`](https://github.com/apache/kafka/blob/67c72596afe58363eceeb32084c5c04637a33831/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java#L68-L69). So, even if we catch a `RejectedExecutionException` and handle it gracefully, there's still a good chance that the JVM will terminate before we can successfully invoke (among other things) `Worker::stop`.

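To make the shutdown-hook point concrete, here's a rough standalone sketch. It is not the actual `Connect` code: `ShutdownHookSketch`, `FakeHerder`, and `registerShutdownHook` are hypothetical names used only for illustration, and the only real API involved is `Runtime::addShutdownHook`.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownHookSketch {

    // Hypothetical stand-in for the herder; not the real Connect class.
    public static final class FakeHerder {
        public final AtomicBoolean stopped = new AtomicBoolean(false);

        public void stop() {
            // In the real worker this is where cleanup would have to happen.
            stopped.set(true);
        }
    }

    // Registers a JVM shutdown hook that stops the herder.
    // Returns the hook thread so callers can deregister it if needed.
    public static Thread registerShutdownHook(FakeHerder herder) {
        Thread hook = new Thread(herder::stop, "sketch-shutdown-hook");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) {
        FakeHerder herder = new FakeHerder();
        registerShutdownHook(herder);
        // Once JVM shutdown begins, registered hooks are run; other threads
        // are not waited on, so any cleanup that must finish has to be
        // driven from (and awaited inside) the hook itself.
        System.out.println("Hook registered; it runs when the JVM begins to shut down");
    }
}
```

The takeaway for this PR is that whatever `Herder::stop` needs to finish has to complete within the hook, which is why the timeouts discussed in this thread matter.
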
With that in mind, here's my proposal:

1. Modify the shutdown logic in `halt` to use `Worker::stopAndAwaitConnectors` and `Worker::stopAndAwaitTasks` (the no-args variants of each), instead of using `startAndStop` at all. This will make it easier to reason about how long graceful shutdown should take, and reduce the risk of a `RejectedExecutionException` being thrown.
2. Increase the timeout for `herderExecutor::awaitTermination` to a value large enough to accommodate a read to the end of the config topic (which is currently bounded by `workerSyncTimeoutMs`), plus successfully attempting to stop all connectors and tasks (which should now be bounded by 5 seconds, the hardcoded graceful shutdown timeout for connectors, plus `workerTasksShutdownTimeoutMs`).
3. Add a `catch` clause to the body of `startAndStop` that swallows a `RejectedExecutionException` and just logs a `DEBUG`-level message, but only if we're already in the process of shutting down (i.e., `stopping.get()` is `true`), and rethrows the exception otherwise.

The idea here is to still attempt to gracefully shut down the worker, but also handle the edge case where we time out while trying to do so and the herder tick thread attempts to access a now-closed resource.

Do you think this addresses the issue sufficiently?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
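For reference, items 2 and 3 of the proposal could look roughly like the standalone sketch below. This is an illustration under stated assumptions, not the actual `DistributedHerder` change: the class `StartAndStopSketch`, the methods `beginShutdown` and `herderTerminationTimeoutMs`, and the constant `CONNECTOR_SHUTDOWN_TIMEOUT_MS` are hypothetical names, and `System.out.println` stands in for a `DEBUG`-level log call.

```java
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

public class StartAndStopSketch {

    // The hardcoded 5-second graceful shutdown timeout for connectors
    // mentioned in the proposal (constant name is hypothetical).
    public static final long CONNECTOR_SHUTDOWN_TIMEOUT_MS = 5_000L;

    private final ExecutorService startAndStopExecutor;
    private final AtomicBoolean stopping = new AtomicBoolean(false);

    public StartAndStopSketch(ExecutorService startAndStopExecutor) {
        this.startAndStopExecutor = startAndStopExecutor;
    }

    // Hypothetical helper: marks the herder as stopping and shuts the executor down.
    public void beginShutdown() {
        stopping.set(true);
        startAndStopExecutor.shutdown();
    }

    // Item 3: swallow RejectedExecutionException only while shutting down.
    public void startAndStop(Collection<Callable<Void>> callables) {
        try {
            startAndStopExecutor.invokeAll(callables);
        } catch (InterruptedException e) {
            // ignore, matching the existing behavior shown in the diff
        } catch (RejectedExecutionException e) {
            if (stopping.get()) {
                // Stand-in for a DEBUG-level log message.
                System.out.println("DEBUG: executor rejected tasks during shutdown; ignoring");
            } else {
                // Not shutting down, so this is unexpected: propagate it.
                throw e;
            }
        }
    }

    // Item 2: config topic read bound + connector stop bound + task stop bound.
    public static long herderTerminationTimeoutMs(long workerSyncTimeoutMs,
                                                  long workerTasksShutdownTimeoutMs) {
        return workerSyncTimeoutMs
                + CONNECTOR_SHUTDOWN_TIMEOUT_MS
                + workerTasksShutdownTimeoutMs;
    }
}
```

The rethrow in the non-stopping branch is deliberate: outside of shutdown, a rejected task means something is genuinely wrong with the executor, and hiding that would make the failure harder to diagnose.
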