[ https://issues.apache.org/jira/browse/KAFKA-13631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17499337#comment-17499337 ]
Chris Egerton commented on KAFKA-13631:
---------------------------------------

Interestingly enough, this initially looked like a duplicate of KAFKA-10873, but after taking a look at the provided log file, the root cause here appears to be completely different. In KAFKA-10873, the warning message is logged because connectors/tasks that fail during startup are tracked by the {{DistributedHerder}} class but not by the {{Worker}} class, so that when those connectors/tasks are revoked, the herder tells the worker to shut them down, and the worker responds by saying "I don't know what you're talking about, I don't own any of these."

In this issue, there's no evidence in the provided log file that the connector or its task failed to start. In fact, there's plenty of evidence that they were both healthy ("Source task finished initialization and start", "Sink task finished initialization and start", "Finished creating connector local-file-sink", "Finished creating connector local-file-source3", etc.).

I haven't had time to verify yet, but I wonder if there's redundant logic in the Connect framework for handling connector deletions that causes it to try to stop deleted connectors and their tasks twice. When a tombstone is picked up from the config topic, the herder makes note of that [here|https://github.com/apache/kafka/blob/2ccc834faa3fffcd5d15d2463aeef3ee6f5cea13/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1670] by adding the name of the connector to its {{connectorConfigUpdates}} field. Later on, the herder uses the contents of that field to [stop|https://github.com/apache/kafka/blob/2ccc834faa3fffcd5d15d2463aeef3ee6f5cea13/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L633-L635] and then (if and only if the connector was reconfigured instead of deleted) restart the connector.
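To make the tombstone path concrete, here is a minimal sketch of that handling. All names and structure here are illustrative simplifications, not the actual {{DistributedHerder}} code; only the {{connectorConfigUpdates}} field name comes from the real class.

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified, hypothetical model of the herder's config-update handling.
public class HerderSketch {
    private final Map<String, Map<String, String>> configs = new HashMap<>();
    private final Set<String> connectorConfigUpdates = new HashSet<>();
    final Set<String> running = new HashSet<>();

    // Called when a record is read from the config topic; a null value
    // is a tombstone, i.e. the connector was deleted.
    void onConnectorConfig(String name, Map<String, String> config) {
        if (config == null) {
            configs.remove(name);
        } else {
            configs.put(name, config);
        }
        connectorConfigUpdates.add(name); // noted for a later tick
    }

    // Later, in the tick loop: stop every updated connector, and restart
    // it only if it still has a config (i.e. it was not deleted).
    void processConnectorConfigUpdates() {
        for (String name : connectorConfigUpdates) {
            running.remove(name); // "stop"
            if (configs.containsKey(name)) {
                running.add(name); // "restart" after reconfiguration
            }
        }
        connectorConfigUpdates.clear();
    }

    public static void main(String[] args) {
        HerderSketch herder = new HerderSketch();
        herder.onConnectorConfig("local-file-sink", new HashMap<>());
        herder.processConnectorConfigUpdates(); // connector now running
        herder.onConnectorConfig("local-file-sink", null); // tombstone
        herder.processConnectorConfigUpdates(); // stopped, not restarted
        System.out.println(herder.running);
    }
}
{code}

The key point the sketch captures: a deletion and a reconfiguration both flow through the same "stop, then maybe restart" path, so a deletion already triggers one stop on its own.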
We see evidence of this in the provided log file with this line:
{quote}[2022-01-28 10:52:42,222] INFO [Worker clientId=connect-1, groupId=connect-cluster] Handling connector-only config update by stopping connector local-file-sink (org.apache.kafka.connect.runtime.distributed.DistributedHerder:620){quote}
Additionally, when a connector or task is revoked from the herder (which usually happens as a result of a rebalance), it is also stopped ([connector|https://github.com/apache/kafka/blob/2ccc834faa3fffcd5d15d2463aeef3ee6f5cea13/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1917], [task|https://github.com/apache/kafka/blob/2ccc834faa3fffcd5d15d2463aeef3ee6f5cea13/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1925]). We see evidence that, shortly after the warning message is emitted, the herder completed this work:
{quote}[2022-01-28 10:52:42,247] WARN Ignoring stop request for unowned connector local-file-sink (org.apache.kafka.connect.runtime.Worker:385)
[2022-01-28 10:52:42,247] WARN Ignoring await stop request for non-present connector local-file-sink (org.apache.kafka.connect.runtime.Worker:410)
(seven lines omitted)
[2022-01-28 10:52:42,283] INFO [Worker clientId=connect-1, groupId=connect-cluster] Finished stopping tasks in preparation for rebalance{quote}
Finally, it appears that deleted connectors and tasks are explicitly revoked during a rebalance if the incremental cooperative protocol is used.
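On the worker side, the effect of that second stop attempt can be modeled with a minimal sketch of the ownership guard. This is a hypothetical simplification, not the actual {{Worker}} implementation; only the warning text is taken from the real log output.

{code:java}
import java.util.HashSet;
import java.util.Set;

// Simplified, hypothetical model of the Worker-side ownership guard
// that produces the "Ignoring stop request" warning.
public class WorkerSketch {
    private final Set<String> connectors = new HashSet<>();

    void startConnector(String name) {
        connectors.add(name);
    }

    // Returns false (and logs a warning) when asked to stop a connector
    // it no longer owns -- which is what the second stop request hits.
    boolean stopConnector(String name) {
        if (!connectors.remove(name)) {
            System.out.println(
                "WARN Ignoring stop request for unowned connector " + name);
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        WorkerSketch worker = new WorkerSketch();
        worker.startConnector("local-file-sink");
        // First stop: driven by the tombstone read from the config topic.
        worker.stopConnector("local-file-sink");
        // Second stop: driven by the explicit revocation during the
        // ensuing rebalance; finds nothing to remove and only warns.
        worker.stopConnector("local-file-sink");
    }
}
{code}

Under this model the first stop succeeds and removes the connector from the worker's tracking, while the second finds nothing to remove and only logs, which would make the warning noisy but harmless.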
The set of deleted connectors and tasks is calculated [here|https://github.com/apache/kafka/blob/2ccc834faa3fffcd5d15d2463aeef3ee6f5cea13/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L209], and then added to the list of connectors and tasks to explicitly revoke from workers in the cluster [here|https://github.com/apache/kafka/blob/2ccc834faa3fffcd5d15d2463aeef3ee6f5cea13/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L244], [here|https://github.com/apache/kafka/blob/2ccc834faa3fffcd5d15d2463aeef3ee6f5cea13/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L317-L340], and finally [here|https://github.com/apache/kafka/blob/2ccc834faa3fffcd5d15d2463aeef3ee6f5cea13/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L307-L309].

In summary: I haven't verified this with unit or integration tests yet, but it looks like this is happening because workers try to stop deleted connectors and their tasks twice: once in response to reading a tombstone from the config topic for that connector, and a second time in response to the explicit revocation performed by the leader during the ensuing rebalance.

> Warning "Ignoring await stop request for non-present connector..." when shutting down connector via API in distributed mode
> ---------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13631
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13631
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.8.1
>            Reporter: Simon Schall
>            Priority: Minor
>         Attachments: connect-1.log
>
> * Setup: Kafka version 2.8.1 with Scala version 2.13
> * Start ZooKeeper, the Kafka server, and kafka-connect-distributed
> * Create a FileStreamSinkConnector, e.g. via the following request:
> {code:java}
> curl --location --request PUT 'localhost:8083/connectors/local-file-sink/config' \
> --header 'Content-Type: application/json' \
> --data-raw '{
>     "name": "local-file-sink",
>     "connector.class": "FileStreamSink",
>     "tasks.max": 1,
>     "file": "test.sink.txt",
>     "topics": "connect-test"
> }'{code}
> * Shut down the connector via the API as follows:
> {code:java}
> curl --location --request DELETE 'localhost:8083/connectors/local-file-sink/'
> {code}
>
> The following warnings appear in the connect-distributed log (for the complete log see the attachment; the warnings appear at l. 2103ff.):
> [2022-01-28 10:52:42,246] INFO Stopping connector local-file-sink (org.apache.kafka.connect.runtime.Worker:382)
> [2022-01-28 10:52:42,247] WARN Ignoring stop request for unowned connector local-file-sink (org.apache.kafka.connect.runtime.Worker:385)
> [2022-01-28 10:52:42,247] WARN Ignoring await stop request for non-present connector local-file-sink (org.apache.kafka.connect.runtime.Worker:410)
>
> From the log it looks like the connector stop is attempted twice. The first attempt succeeds (log l. 2096ff.), but the second attempt triggers the warnings, as the connector is already stopped. We cannot tell why the stop is attempted twice, but it seems to be wrong.
> Furthermore, the problem also occurs with every other connector we have tried so far, so it is not specific to the FileStreamSinkConnector.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)