[ 
https://issues.apache.org/jira/browse/KAFKA-17044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17867047#comment-17867047
 ] 

Bhagyashree edited comment on KAFKA-17044 at 7/18/24 4:12 PM:
--------------------------------------------------------------

[~ChrisEgerton], I agree there are ways to make the connector's start method 
finish faster. But what I wanted to convey in this JIRA is that shutdown 
depends on connector startup finishing. If a DELETE call is made, it is not 
honoured until the connector's start completes. 
Take the connector where this was seen as an example: it is the JDBC source 
connector ([https://github.com/confluentinc/kafka-connect-jdbc/tree/master]). 
If you check the 
[stop|https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java#L219C9-L219C33]
 method of the connector, it tries to update a [static 
member|https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/util/CachedConnectionProvider.java#L106]
 of the class, which in turn is expected to [stop the 
retries|https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/util/CachedConnectionProvider.java#L80-L101].
 As I see it, the connector is written to honour shutdown attempts made in 
the middle of the retries, but the connector's 
[stop()|https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java#L210]
 is not being called by the runtime.
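
As an illustration of that pattern, here is a minimal sketch of a retry loop that re-checks a shared flag, so that a stop() call from another thread can cut the retries short. It is not the JDBC connector's actual code: the class RetryingConnectorSketch, its fields, the retry count and the back-off are all hypothetical, and every simulated connection attempt is assumed to fail.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a connector whose start() retries a connection.
// None of these names come from kafka-connect-jdbc or the Connect runtime.
final class RetryingConnectorSketch {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final AtomicInteger attempts = new AtomicInteger();
    private final int maxAttempts;
    private final long backoffMs;

    RetryingConnectorSketch(int maxAttempts, long backoffMs) {
        this.maxAttempts = maxAttempts;
        this.backoffMs = backoffMs;
    }

    // Every simulated attempt fails; the flag is re-checked before each retry.
    void start() {
        while (running.get() && attempts.get() < maxAttempts) {
            attempts.incrementAndGet();
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Only helps if another thread calls it while start() is still looping.
    void stop() {
        running.set(false);
    }

    // Runs start() on a worker thread and returns how many attempts were made.
    static int run(boolean stopDuringStart) {
        RetryingConnectorSketch connector = new RetryingConnectorSketch(20, 20);
        Thread worker = new Thread(connector::start);
        worker.start();
        try {
            if (stopDuringStart) {
                Thread.sleep(50);
                connector.stop();
            }
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return connector.attempts.get();
    }

    public static void main(String[] args) {
        System.out.println("stop() called mid-start, attempts: " + run(true));
        System.out.println("stop() never called, attempts:     " + run(false));
    }
}
```

In this sketch the retries end early only when stop() is invoked while start() is still running. If the caller waits for start() to return before calling stop(), all 20 attempts are made regardless.
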
I found it similar to https://issues.apache.org/jira/browse/KAFKA-14725, but 
for connectors rather than tasks. 
Let me know your thoughts. 



> Connector deletion can lead to resource leak during a long running connector 
> startup
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-17044
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17044
>             Project: Kafka
>          Issue Type: Bug
>          Components: connect
>            Reporter: Bhagyashree
>            Priority: Major
>
> We have identified a gap in the shutdown flow for the connector worker. If 
> the connector is in 
> [INIT|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L403-L404]
>  state and still executing the 
> [WorkerConnector::doStart|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L207-L218]
>  method, a DELETE API call would invoke the 
> [WorkerConnector::shutdown|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L294-L298]
>  and 
> [notify()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L297],
>  but the connector worker would not shut down immediately. This happens because 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  is a blocking call, and control reaches 
> [wait()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L176]
>  in 
> [doRun()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L151]
>  after the 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  call has completed. This results in a gap in the delete flow where the 
> connector is not shut down immediately, leaving its resources running. 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  keeps running, and only when the execution of 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  completes do we reach the point of 
> [wait()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L176]
>  and then 
> [doShutdown()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L183]
>  of the connector worker is invoked.
> This seems similar to what has been identified for connector tasks as part of 
> https://issues.apache.org/jira/browse/KAFKA-14725.
> *Steps to repro*
> 1. Start a connector with a time-consuming operation in its 
> [connector.start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  call
> 2. Call the DELETE API to delete this connector
> 3. The connector would be deleted only after the 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  completes.
> The issue was observed when a connector was configured to retry a database 
> connection for some time. 
> {*}Current Behaviour{*}: The connector did not shut down until the 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  method completed.
> {*}Expected Behaviour{*}: The connector should abort what it is doing and 
> shut down as requested by the DELETE call.
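
The ordering described in the quoted report can be reproduced with a small, simplified model of the worker loop. This is only a sketch under stated assumptions and not the real WorkerConnector code: the class WorkerLoopSketch, its latches and its event log are hypothetical, and the blocking start() is simulated by a latch that is released only after shutdown has been requested.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Simplified, hypothetical model of a worker loop; not the real WorkerConnector.
final class WorkerLoopSketch {
    private final List<String> events = Collections.synchronizedList(new ArrayList<>());
    private final CountDownLatch startEntered = new CountDownLatch(1);
    private final CountDownLatch dependencyReady = new CountDownLatch(1);
    private boolean stopping = false; // guarded by "this"

    // Blocking start first; the stop request is only looked at afterwards.
    void doRun() {
        try {
            events.add("start() begin");
            startEntered.countDown();
            dependencyReady.await(); // stands in for a slow connector start()
            events.add("start() end");
            synchronized (this) {
                while (!stopping) {
                    wait();
                }
            }
            events.add("doShutdown()");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Sets the flag and notifies, but cannot interrupt the blocking start.
    synchronized void shutdown() {
        events.add("shutdown() requested");
        stopping = true;
        notify();
    }

    // Requests shutdown while start() is still blocked; returns the event order.
    static List<String> simulate() {
        WorkerLoopSketch worker = new WorkerLoopSketch();
        Thread thread = new Thread(worker::doRun);
        thread.start();
        try {
            worker.startEntered.await();        // start() is now in progress
            worker.shutdown();                  // the DELETE arrives here
            worker.dependencyReady.countDown(); // only now can start() return
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(worker.events);
    }

    public static void main(String[] args) {
        simulate().forEach(System.out::println);
    }
}
```

Running main prints "start() begin", "shutdown() requested", "start() end" and "doShutdown()" in that order: in this model the shutdown request is recorded right away but has no effect until the blocking start has returned.
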



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
