gharris1727 commented on code in PR #16095:
URL: https://github.com/apache/kafka/pull/16095#discussion_r1621349710


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##########
@@ -231,6 +231,12 @@ private synchronized void onFailure(Throwable t) {
         if (this.state == State.FAILED)
             return;
 
+        // Call stop() on the connector to release its resources. Connector
+        // could fail in the start() method, which is why we call stop() on
+        // INIT state as well.
+        if (this.state == State.STARTED || this.state == State.INIT)
+            connector.stop();

Review Comment:
   This is a potentially blocking call to the connector, and I don't think 
that's a good fit for this onFailure handler. Because it runs before the 
statusListener call, it delays notifying the REST API of the FAILED status 
and updating the metrics. If it blocks indefinitely, the status and metrics 
are never updated.
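
   A minimal sketch of a failure path that leaves stop() out, so the status notification is never held up by the connector. This uses hypothetical, simplified types (FailureOnlyHandler, SimpleConnector, FailureListener), not the real WorkerConnector, Connector, or status-listener classes:

```java
// Hypothetical, simplified model of a connector's failure path; these are
// not the real Kafka Connect types.
class FailureOnlyHandler {
    enum State { INIT, STARTED, STOPPED, FAILED }

    // Stand-in for the connector; only stop() matters here.
    interface SimpleConnector {
        void stop();
    }

    // Stand-in for the status listener that feeds the REST API and metrics.
    interface FailureListener {
        void onFailure(String connName, Throwable cause);
    }

    private final String connName;
    private final SimpleConnector connector;
    private final FailureListener listener;
    private State state = State.INIT;

    FailureOnlyHandler(String connName, SimpleConnector connector, FailureListener listener) {
        this.connName = connName;
        this.connector = connector;
        this.listener = listener;
    }

    synchronized void onFailure(Throwable t) {
        // Report each failure only once.
        if (state == State.FAILED)
            return;
        // connector.stop() is deliberately not called here: a slow or hung
        // stop() would otherwise hold up the status update below.
        listener.onFailure(connName, t);
        state = State.FAILED;
    }

    synchronized State state() {
        return state;
    }
}
```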
   
   There is a connector.stop() call in doShutdown that could be changed to 
also execute for the INIT and FAILED states. That would leave the resources 
allocated while the connector is waiting in the FAILED state, but would at 
least ensure they don't leak long-term.
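
   A sketch of that doShutdown change under the same kind of simplification. ShutdownSketch and its State enum are hypothetical stand-ins, and the sketch assumes the existing condition only covers STARTED:

```java
// Hypothetical, simplified model of the shutdown path; not the real
// WorkerConnector code.
class ShutdownSketch {
    enum State { INIT, STARTED, STOPPED, FAILED }

    interface SimpleConnector {
        void stop();
    }

    private final SimpleConnector connector;
    private State state;

    ShutdownSketch(SimpleConnector connector, State initialState) {
        this.connector = connector;
        this.state = initialState;
    }

    synchronized void doShutdown() {
        // Besides STARTED, also stop() connectors in INIT (start() may have
        // thrown) and FAILED, so whatever they allocated is released.
        // STOPPED is excluded so a repeated shutdown does not stop() twice.
        if (state == State.STARTED || state == State.INIT || state == State.FAILED)
            connector.stop();
        state = State.STOPPED;
    }

    synchronized State state() {
        return state;
    }
}
```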
   
   We could also change the control flow so that the transition to the 
FAILED state triggers doShutdown early, rather than having the thread wait() 
with all the resources still allocated.
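
   One way the early-shutdown control flow might look, again as a hypothetical model (EarlyShutdownSketch is not real Connect code): the thread waits for either a shutdown request or a failure, then calls stop() outside the lock and keeps the FAILED state so the failure remains visible in status reporting.

```java
// Hypothetical, simplified model of a connector thread that releases
// resources as soon as the connector fails; not the real WorkerConnector.
class EarlyShutdownSketch implements Runnable {
    enum State { STARTED, STOPPED, FAILED }

    interface SimpleConnector {
        void stop();
    }

    private final SimpleConnector connector;
    private State state = State.STARTED;
    private boolean stopping = false;

    EarlyShutdownSketch(SimpleConnector connector) {
        this.connector = connector;
    }

    @Override
    public void run() {
        synchronized (this) {
            // Wake up for either an explicit shutdown or a failure, instead of
            // waiting only for shutdown while still holding the resources.
            while (!stopping && state != State.FAILED) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        // stop() runs outside the lock, so onFailure()/shutdown() callers are
        // never blocked behind a slow connector.
        connector.stop();
        synchronized (this) {
            // Keep FAILED so the failure stays visible in status reporting.
            if (state != State.FAILED)
                state = State.STOPPED;
        }
    }

    synchronized void onFailure(Throwable t) {
        if (state == State.FAILED)
            return;
        state = State.FAILED; // status listener notification omitted here
        notifyAll();          // wake run() so it can release resources now
    }

    synchronized void shutdown() {
        stopping = true;
        notifyAll();
    }

    synchronized State state() {
        return state;
    }
}
```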



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
