C0urante commented on code in PR #12802:
URL: https://github.com/apache/kafka/pull/12802#discussion_r1047862099


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1645,6 +1646,8 @@ private void startAndStop(Collection<Callable<Void>> callables) {
             startAndStopExecutor.invokeAll(callables);
         } catch (InterruptedException e) {
             // ignore
+        } catch (RejectedExecutionException re) {
+            log.error("startAndStopExecutor already shutdown or full. Not invoking explicit connector/task shutdown");

Review Comment:
   Thank you for looking into this. It's these kinds of details that make 
maintaining this part of the code base tricky, and I really appreciate the time 
you've put into diving deep on this one!
   
   I've revisited some of the logic here and it seems like the assumption that 
the cleanup in `halt` (where we invoke, among other things, `member::stop`) 
will be allowed to proceed cleanly if we just handle a 
`RejectedExecutionException` being thrown in `startAndStop` is not quite 
accurate.
   
   The only way to externally trigger worker shutdown is to begin to terminate 
the JVM (usually done via `SIGINT`, i.e., Ctrl+C). At this point, threads are not 
given time to gracefully terminate (even if they are non-daemon threads); 
instead, the only thing that's guaranteed to take place before the JVM exits 
is the execution of any shutdown hooks that have been added. In this case, we 
add a [shutdown hook in the `Connect` 
class](https://github.com/apache/kafka/blob/67c72596afe58363eceeb32084c5c04637a33831/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java#L95-L105)
 that [shuts down the REST server and invokes 
`Herder::stop`](https://github.com/apache/kafka/blob/67c72596afe58363eceeb32084c5c04637a33831/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java#L68-L69).
   
   So, even if we catch a `RejectedExecutionException` and handle that 
gracefully, there's still a good chance that the JVM will terminate before we 
can successfully invoke (among other things) `Worker::stop`.
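   
   To make the shutdown-hook mechanism concrete, here is a minimal sketch of the pattern. It is not the actual `Connect` class: `ShutdownHookSketch`, `registerShutdownHook`, and the body of `stop` are hypothetical stand-ins, and only `Runtime::addShutdownHook` is the real JDK API.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the Connect class; names are illustrative only.
class ShutdownHookSketch {
    final AtomicBoolean stopped = new AtomicBoolean(false);

    // Placeholder for "shut down the REST server and invoke Herder::stop".
    void stop() {
        if (stopped.compareAndSet(false, true)) {
            System.out.println("Stopping REST server and herder");
        }
    }

    // Registers stop() to run when the JVM begins shutting down.
    // Work done here is the only cleanup the JVM promises to attempt before exit.
    Thread registerShutdownHook() {
        Thread hook = new Thread(this::stop, "shutdown-hook-sketch");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }
}
```

   Anything that `stop` merely schedules on another thread, rather than waiting for, gets no such guarantee, which is why the timeouts in the hook matter.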
   
   
   With that in mind, here's my proposal:
   
   1. Modify the shutdown logic in `halt` to use 
`Worker::stopAndAwaitConnectors` and `Worker::stopAndAwaitTasks` (the no-args 
variants of each), instead of using `startAndStop` at all. This will make it 
easier to reason about how long graceful shutdown should take, and reduce the 
risk of a `RejectedExecutionException` being thrown.
   2. Increase the timeout for `herderExecutor::awaitTermination` to a value 
large enough to accommodate a read to the end of the config topic (which is 
currently bounded by `workerSyncTimeoutMs`), plus successfully attempting to 
stop all connectors and tasks (which should now be bounded by 5 seconds, the 
hardcoded graceful shutdown timeout for connectors, plus 
`workerTasksShutdownTimeoutMs`).
   3. Add a `catch` clause to the body of `startAndStop` that swallows a 
`RejectedExecutionException` and just logs a `DEBUG`-level message, but only 
if we're already in the process of shutting down (i.e., `stopping.get()` is 
`true`), and rethrows the exception otherwise.
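   
   As a rough illustration of the bound in item 2, the new timeout would be the sum of the three components named there. The class and method names below are hypothetical, and the values in the usage note are arbitrary rather than Connect defaults.

```java
// Hypothetical helper; not part of DistributedHerder.
class HerderShutdownTimeoutSketch {
    // The hardcoded 5-second graceful shutdown timeout for connectors mentioned in item 2.
    static final long CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS = 5_000L;

    // Upper bound on a clean halt: read to the end of the config topic,
    // then stop all connectors, then stop all tasks.
    static long awaitTerminationTimeoutMs(long workerSyncTimeoutMs,
                                          long workerTasksShutdownTimeoutMs) {
        return workerSyncTimeoutMs
                + CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS
                + workerTasksShutdownTimeoutMs;
    }
}
```

   For example, with an assumed `workerSyncTimeoutMs` of 3000 and `workerTasksShutdownTimeoutMs` of 5000, the herder executor would be given 13000 ms to terminate.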
   
   The idea here is to still attempt to gracefully shut down the worker, but 
also handle the edge case where we time out while trying to do so and the 
herder tick thread attempts to access a now-closed resource.
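   
   A minimal sketch of what the `catch` clause from item 3 might look like, assuming `stopping` is an `AtomicBoolean` as the `stopping.get()` call suggests. `StartAndStopSketch` is a hypothetical stand-in for the herder, and the `System.out` call stands in for a `DEBUG`-level log statement.

```java
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the relevant part of DistributedHerder.
class StartAndStopSketch {
    final ExecutorService startAndStopExecutor = Executors.newFixedThreadPool(2);
    final AtomicBoolean stopping = new AtomicBoolean(false);

    void startAndStop(Collection<Callable<Void>> callables) {
        try {
            startAndStopExecutor.invokeAll(callables);
        } catch (InterruptedException e) {
            // ignore, as in the existing code
        } catch (RejectedExecutionException e) {
            if (stopping.get()) {
                // Expected during shutdown: the executor may already be closed.
                System.out.println("DEBUG: start/stop rejected during shutdown; ignoring");
            } else {
                // Outside of shutdown a rejection is a real bug, so surface it.
                throw e;
            }
        }
    }
}
```

   Rethrowing when `stopping` is `false` keeps unexpected rejections visible instead of hiding them behind a log line.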
   
   Do you think this addresses the issue sufficiently?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
