gharris1727 commented on code in PR #16095: URL: https://github.com/apache/kafka/pull/16095#discussion_r1621349710
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java: ########## @@ -231,6 +231,12 @@ private synchronized void onFailure(Throwable t) { if (this.state == State.FAILED) return; + // Call stop() on the connector to release its resources. Connector + // could fail in the start() method, which is why we call stop() on + // INIT state as well. + if (this.state == State.STARTED || this.state == State.INIT) + connector.stop(); Review Comment: This is a potentially blocking call to the connector, and I don't think that's a good fit for this onFailure handler. This call would delay the statusListener call, which delays notifying the REST API of the FAILED status and updating the metrics. If it blocks indefinitely, the status and metrics are never updated. There is a connector.stop() call in doShutdown that could be changed to execute for the INIT and FAILED states. That would leave the resources allocated while the connector is waiting in the FAILED state, but would at least ensure they don't leak long-term. We may also change the control flow to make the transition to the FAILED state trigger doShutdown early, rather than having it wait() with all the resources still allocated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org