[ 
https://issues.apache.org/jira/browse/KAFKA-13631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17499337#comment-17499337
 ] 

Chris Egerton edited comment on KAFKA-13631 at 3/1/22, 5:26 AM:
----------------------------------------------------------------

Interestingly enough, it seemed initially like this was a duplicate of 
KAFKA-10873, but after taking a look at the provided log file, the root cause 
here appears to be completely different.

In KAFKA-10873, the warning message is logged because connectors/tasks that 
fail during startup are tracked by the {{DistributedHerder}} class, but not by 
the {{Worker}} class, so that when those connectors/tasks are revoked, the 
herder tells the worker to shut them down, and the worker responds by saying "I 
don't know what you're talking about, I don't own any of these".

In this issue, there's no evidence in the provided log file that the connector 
or its task failed to start. In fact, there's plenty of evidence that they were 
both healthy ("Source task finished initialization and start", "Sink task 
finished initialization and start", "Finished creating connector 
local-file-sink", "Finished creating connector local-file-source3", etc.).

I haven't had time to verify yet, but I wonder if there's redundant logic in 
the Connect framework for handling connector deletions that causes it to try 
to stop deleted connectors and their tasks twice.

When a tombstone is picked up from the config topic, the herder makes note of 
that 
[here|https://github.com/apache/kafka/blob/2ccc834faa3fffcd5d15d2463aeef3ee6f5cea13/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1670]
 by adding the name of the connector to its {{connectorConfigUpdates}} field. 
Later on, the herder uses the contents of that field to 
[stop|https://github.com/apache/kafka/blob/2ccc834faa3fffcd5d15d2463aeef3ee6f5cea13/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L633-L635]
 and then (if and only if the connector was reconfigured instead of deleted) 
restart the connector. We see evidence of this in the provided log file with 
this line:
{quote}[2022-01-28 10:52:42,222] INFO [Worker clientId=connect-1, 
groupId=connect-cluster] Handling connector-only config update by stopping 
connector local-file-sink 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:620)
{quote}
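
The stop-then-maybe-restart behavior described above can be sketched roughly as follows. This is a minimal illustration, not the {{DistributedHerder}} code: the class {{ConfigUpdateSketch}}, the method {{process}}, and the action strings are all hypothetical, and the only behavior taken from the description is "always stop, restart only if a config still exists".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; names and structure are assumptions, not Connect APIs.
class ConfigUpdateSketch {
    // For every connector with a pending config update: stop it first,
    // then restart it only if a config for it still exists (i.e. it was
    // reconfigured rather than deleted via a tombstone).
    static List<String> process(Set<String> pendingUpdates,
                                Map<String, Map<String, String>> configs) {
        List<String> actions = new ArrayList<>();
        for (String name : pendingUpdates) {
            actions.add("stop " + name);
            if (configs.containsKey(name)) {
                actions.add("start " + name);
            }
        }
        return actions;
    }

    public static void main(String[] args) {
        // A deleted connector has no remaining config, so it is only stopped.
        System.out.println(process(Set.of("local-file-sink"), Map.of()));
    }
}
```

With an empty config map, the deleted connector is stopped and never restarted, which matches the "connector-only config update by stopping" log line quoted above.
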
Additionally, when a connector or task is revoked from the herder (which 
usually happens as a result of a rebalance), it is also stopped 
([connector|https://github.com/apache/kafka/blob/2ccc834faa3fffcd5d15d2463aeef3ee6f5cea13/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1917]
 , 
[task|https://github.com/apache/kafka/blob/2ccc834faa3fffcd5d15d2463aeef3ee6f5cea13/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1925]).
 We see evidence that, shortly after the warning log message is emitted, the 
herder concluded this work:
{quote}[2022-01-28 10:52:42,247] WARN Ignoring stop request for unowned 
connector local-file-sink (org.apache.kafka.connect.runtime.Worker:385)
[2022-01-28 10:52:42,247] WARN Ignoring await stop request for non-present 
connector local-file-sink (org.apache.kafka.connect.runtime.Worker:410)
{quote}
{quote}(seven lines omitted)
{quote}
{quote}[2022-01-28 10:52:42,283] INFO [Worker clientId=connect-1, 
groupId=connect-cluster] Finished stopping tasks in preparation for rebalance
{quote}
Finally, it appears that deleted connectors and tasks are explicitly revoked 
during rebalance if the incremental cooperative protocol is used. The set of 
deleted connectors and tasks is calculated 
[here|https://github.com/apache/kafka/blob/2ccc834faa3fffcd5d15d2463aeef3ee6f5cea13/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L209],
 and then added to the list of connectors and tasks to explicitly revoke from 
workers in the cluster 
[here|https://github.com/apache/kafka/blob/2ccc834faa3fffcd5d15d2463aeef3ee6f5cea13/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L244],
 
[here|https://github.com/apache/kafka/blob/2ccc834faa3fffcd5d15d2463aeef3ee6f5cea13/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L317-L340],
 and finally 
[here|https://github.com/apache/kafka/blob/2ccc834faa3fffcd5d15d2463aeef3ee6f5cea13/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L307-L309].
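
As a rough illustration of how a set of deleted connectors might be derived, the sketch below assumes it is a set difference between what was previously assigned and what is still configured. That assumption, the class {{DeletedConnectors}}, and the method {{deleted}} are hypothetical and are not copied from {{IncrementalCooperativeAssignor}}.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch; not the assignor's actual implementation.
class DeletedConnectors {
    // Anything that was previously assigned but no longer has a config
    // is treated as deleted and becomes a candidate for explicit revocation.
    static Set<String> deleted(Set<String> previouslyAssigned, Set<String> configured) {
        Set<String> result = new TreeSet<>(previouslyAssigned);
        result.removeAll(configured);
        return result;
    }

    public static void main(String[] args) {
        Set<String> previous = Set.of("local-file-sink", "local-file-source3");
        Set<String> configured = Set.of("local-file-source3");
        System.out.println(deleted(previous, configured)); // prints [local-file-sink]
    }
}
```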

 

In summary: I haven't verified this with unit or integration tests yet, but it looks 
like this is happening because workers try to stop deleted connectors and their 
tasks twice: once in response to reading a tombstone from the config topic for 
that connector, and a second time in response to explicit revocation performed 
by the leader during the ensuing rebalance.
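
The suspected double stop can be reproduced in miniature with a toy model. The class {{ToyWorker}} below is a hypothetical stand-in and not the real {{Worker}} class; it only assumes that a worker tracks the connectors it is running and warns when asked to stop one it does not own. The log strings are borrowed from the attached log for readability.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a worker; not org.apache.kafka.connect.runtime.Worker.
class ToyWorker {
    private final Set<String> running = new HashSet<>();
    final List<String> log = new ArrayList<>();

    void startConnector(String name) {
        running.add(name);
        log.add("INFO Finished creating connector " + name);
    }

    // Returns true if the connector was running and has now been stopped,
    // false if the request was ignored because the connector is not owned.
    boolean stopConnector(String name) {
        log.add("INFO Stopping connector " + name);
        if (!running.remove(name)) {
            log.add("WARN Ignoring stop request for unowned connector " + name);
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        ToyWorker worker = new ToyWorker();
        worker.startConnector("local-file-sink");
        worker.stopConnector("local-file-sink"); // path 1: tombstone read from the config topic
        worker.stopConnector("local-file-sink"); // path 2: explicit revocation during the rebalance
        worker.log.forEach(System.out::println);
    }
}
```

In this model the second call is harmless but noisy, which is consistent with the warning being logged even though the connector shut down cleanly.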



> Warning "Ignoring await stop request for non-present connector..." when 
> shutting down connector via API in distributed mode
> ---------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13631
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13631
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.8.1
>            Reporter: Simon Schall
>            Priority: Minor
>         Attachments: connect-1.log
>
>
> * {color:#172b4d}Setup: Kafka version 2.8.1 with Scala version 
> 2.13{color}
>  * {color:#172b4d}Start zookeeper, kafka server and 
> kafka-connect-distributed{color}
>  * {color:#172b4d}Create a FileStreamSinkConnector (e.g. via the following 
> request):{color}
> {code:java}
>  curl --location --request PUT 
> 'localhost:8083/connectors/local-file-sink/config'  \
> --header 'Content-Type: application/json' \
> --data-raw '{
> "name":"local-file-sink",
> "connector.class":"FileStreamSink",
> "tasks.max":1,
> "file":"test.sink.txt",
> "topics":"connect-test"
> }'{code}
>  * {color:#172b4d}Shut down the connector via the API as follows:{color}
> {code:java}
> curl --location --request DELETE 'localhost:8083/connectors/local-file-sink/'
> {code}
>  
> The following warnings then appear in the connect-distributed log (for the 
> complete log see the attachment; the warnings appear at l. 2103 ff.):
> {color:#172b4d}[2022-01-28 10:52:42,246] INFO Stopping connector 
> local-file-sink (org.apache.kafka.connect.runtime.Worker:382)
> [2022-01-28 10:52:42,247] WARN Ignoring stop request for unowned connector 
> local-file-sink (org.apache.kafka.connect.runtime.Worker:385)
> [2022-01-28 10:52:42,247] WARN Ignoring await stop request for non-present 
> connector local-file-sink (org.apache.kafka.connect.runtime.Worker:410){color}
>  
> {color:#172b4d}From the log, it looks like the connector is stopped twice. 
> The first attempt succeeds (log l. 2096 ff.), but the second attempt 
> triggers the warning because the connector is already stopped. We cannot 
> tell why the connector is stopped twice, but it seems to be 
> wrong.{color}
> {color:#172b4d}Furthermore, the problem also occurs with any other connector 
> we tried so far, so it is not specific to the FileStreamSinkConnector.{color}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
