[ https://issues.apache.org/jira/browse/KAFKA-17044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17867047#comment-17867047 ]
Bhagyashree edited comment on KAFKA-17044 at 7/18/24 4:12 PM:
--------------------------------------------------------------

[~ChrisEgerton], I agree there are ways to make the connector's start method finish faster. But what I wanted to convey in this JIRA is that the shutdown method relies on connector startup finishing first. If a DELETE call is made, it is not honoured until the connector's start completes.

Taking the connector where this was seen as an example: it is the JDBC source connector (https://github.com/confluentinc/kafka-connect-jdbc/tree/master). If you check the [stop|https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java#L219C9-L219C33] method of the connector, it tries to update a [static member|https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/util/CachedConnectionProvider.java#L106] of the class, which in turn is expected to [stop the retries|https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/util/CachedConnectionProvider.java#L80-L101]. As I see it, the connector is written to honour any shutdown attempt made in the middle of the retries, but the connector's [stop()|https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java#L210] is not being called by the runtime.

I found it similar to https://issues.apache.org/jira/browse/KAFKA-14725, but for connectors. Let me know your thoughts.

> Connector deletion can lead to resource leak during a long running connector startup
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-17044
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17044
>             Project: Kafka
>          Issue Type: Bug
>          Components: connect
>            Reporter: Bhagyashree
>            Priority: Major
>
> We have identified a gap in the shutdown flow for the connector worker.
> If the connector is in [INIT|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L403-L404] state and still executing the [WorkerConnector::doStart|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L207-L218] method, a DELETE API call would invoke [WorkerConnector::shutdown|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L294-L298] and [notify()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L297], but the connector worker would not shut down immediately. This happens because [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216] is a blocking call, and control reaches [wait()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L176] in [doRun()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L151] only after the [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216] call has completed. This results in a gap in the delete flow where the connector is not shut down immediately, leaving its resources running. [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216] keeps running, and only when the execution of [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216] completes do we reach [wait()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L176]; only then is [doShutdown()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L183] of the connector worker invoked.
>
> This seems similar to what has been identified for connector tasks as part of https://issues.apache.org/jira/browse/KAFKA-14725.
>
> *Steps to repro*
> 1. Start a connector with a time-consuming operation in its [connector.start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216] call.
> 2. Call the DELETE API to delete this connector.
> 3. The connector is deleted only after [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216] completes.
>
> The issue was observed when a connector was configured to retry a DB connection for some time.
>
> {*}Current Behaviour{*}: The connector did not shut down until the [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216] method completed.
>
> {*}Expected Behaviour{*}: The connector should abort what it is doing and shut down as requested by the DELETE call.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
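
For readers who want to see the ordering problem from the issue description in isolation, here is a minimal sketch. It is not the real WorkerConnector code: the class StartBlocksShutdownSketch and all of its methods are hypothetical stand-ins, and the blocking start() is simulated with a sleep. It assumes only what the description states, namely that the worker thread calls the blocking start() first and reaches wait() afterwards, while shutdown merely sets a flag and calls notify().

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// Hypothetical, simplified stand-in for the worker loop; NOT the real WorkerConnector.
class StartBlocksShutdownSketch {
    private final List<String> events = new CopyOnWriteArrayList<>();
    private final CountDownLatch startEntered = new CountDownLatch(1);
    private boolean stopping = false; // guarded by "this"

    // Stand-in for Connector.start(): blocks, e.g. while retrying a DB connection.
    private void connectorStart(long blockMillis) throws InterruptedException {
        events.add("start() entered");
        startEntered.countDown();
        Thread.sleep(blockMillis);
        events.add("start() returned");
    }

    // Stand-in for the worker thread body: start first, then wait for a shutdown request.
    void doRun(long blockMillis) throws InterruptedException {
        connectorStart(blockMillis);
        synchronized (this) {
            while (!stopping) {
                wait();
            }
        }
        // The connector's stop() can only be reached from here.
        events.add("stop() called");
    }

    // Stand-in for the shutdown request triggered by DELETE: set a flag and notify.
    synchronized void shutdown() {
        events.add("shutdown() requested");
        stopping = true;
        notify();
    }

    // Runs one worker, requests shutdown while start() is still blocked,
    // and returns the events in the order they were recorded.
    static List<String> simulate(long blockMillis) throws InterruptedException {
        StartBlocksShutdownSketch worker = new StartBlocksShutdownSketch();
        Thread thread = new Thread(() -> {
            try {
                worker.doRun(blockMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        thread.start();
        worker.startEntered.await(); // the DELETE arrives while start() is running
        worker.shutdown();
        thread.join();
        return List.copyOf(worker.events);
    }

    public static void main(String[] args) throws InterruptedException {
        simulate(500).forEach(System.out::println);
    }
}
```

In this sketch the shutdown request is recorded while start() is still blocked, yet "stop() called" only appears after "start() returned", because notify() has no effect on a thread that has not yet reached wait(). A connector whose stop() is designed to break its retry loop therefore never gets the chance to do so.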