[ https://issues.apache.org/jira/browse/KAFKA-15675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17810057#comment-17810057 ]

Chris Egerton commented on KAFKA-15675:
---------------------------------------

I've done some analysis on this one and believe I've found the root cause. It's 
a confluence of a few different issues, but the TL;DR is: *the request to 
{{POST /connectors/<connector>/restart?onlyFailed=false&includeTasks=false}} 
fails with a 409 error, this does not cause the test to (immediately) fail, but 
the connector is never restarted, which causes the test to time out while 
[waiting for the connector to be 
stopped|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java#L272-L275].*

 

This kind of scenario probably raises several questions. Here's my best attempt 
to anticipate and address them:

 

*Why does the 409 response not cause the test to immediately fail?*

The original rationale for this is unclear, but the code structure 
[here|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java#L374-L383]
 is fairly clear: issue the request and, if the status code is less than 400, 
attempt to deserialize the body; then, unconditionally, return either null or 
the deserialized response body. A 409 therefore skips deserialization without 
ever being surfaced as an error.
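
As a toy illustration of that pattern (all names here are hypothetical, not the actual EmbeddedConnect code), note how the error status is swallowed:

{code:java}
import java.util.Optional;

public class SwallowedError {
    // Toy model of the pattern described above: a status code below 400
    // triggers deserialization, anything else silently yields an empty result.
    static Optional<String> restartConnector(int status, String body) {
        String result = null;
        if (status < 400) {
            result = body; // stand-in for deserializing the response body
        }
        // Returned unconditionally: the caller cannot distinguish a 409 from a
        // success with no body, so the failed restart goes unnoticed.
        return Optional.ofNullable(result);
    }

    public static void main(String[] args) {
        System.out.println(restartConnector(200, "{\"state\":\"RESTARTING\"}").isPresent());
        System.out.println(restartConnector(409, "rebalance pending").isPresent());
    }
}
{code}

The second call prints {{false}} without any exception, which is exactly why the test only fails much later, on the stop-condition timeout.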

 

*Why is the 409 response occurring?*

The cluster (or, to be more specific, either the worker that received the 
initial REST request or, if the request was forwarded, the leader) detected 
that a rebalance due to an added/removed connector or new task configs was 
about to take place, and rejected the request. See the {{DistributedHerder}} 
class's 
[restartConnectorAndTasks|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1467]
 and 
[checkRebalanceNeeded|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2302-L2307]
 methods for the logic to check for pending rebalances, and its logic for 
detecting pending rebalances 
[here|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2385],
 
[here|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2400],
 and 
[here|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2419].
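
Condensed into a sketch (hypothetical names, not the actual herder code), the gate works roughly like this: any unresolved pending rebalance short-circuits the request before restart work begins.

{code:java}
public class RebalanceGate {
    // Hypothetical condensation of the checks linked above: a worker that has
    // detected added/removed connectors or new task configs since its last
    // rebalance treats a rebalance as pending and rejects the request.
    static int handleRestartRequest(boolean rebalancePending) {
        if (rebalancePending) {
            return 409; // Conflict: a rebalance must complete before the request can be served
        }
        return 202; // Accepted: the restart request is recorded for processing
    }

    public static void main(String[] args) {
        System.out.println(handleRestartRequest(false));
        System.out.println(handleRestartRequest(true));
    }
}
{code}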

 

*Why is a rebalance pending by the time we try to restart the connector? 
Shouldn't the cluster and the set of connectors and tasks on it be stable by 
this point?*

Yes, the cluster and set of connectors and tasks on it should be stable by the 
time we issue our restart request. We check to make sure that [every worker in 
the cluster is up and 
running|https://github.com/apache/kafka/blob/0ef89a7cc059b0ba9b2536e018303c9004ac9142/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java#L116-L117]
 before proceeding with the rest of the test, and that the [connector and 
expected number of tasks are 
running|https://github.com/apache/kafka/blob/0ef89a7cc059b0ba9b2536e018303c9004ac9142/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java#L252-L253]
 before issuing the restart request. Unfortunately, the former check (for worker 
liveness across the cluster) does not guarantee that every worker has joined the 
cluster. This check is [performed by issuing a request to the root 
resource|https://github.com/apache/kafka/blob/0ef89a7cc059b0ba9b2536e018303c9004ac9142/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java#L956-L975]
 ({{GET /}}) for each worker: if the response is valid (i.e., its body 
matches the expected format), then the worker is considered up and running. 
However, this does not guarantee that the worker has actually completed 
startup: it may not have finished reading to the end of internal topics, or had 
a chance to contact the group coordinator and join the cluster yet.
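
The gap can be modeled in a few lines (a toy sketch with hypothetical names, not the real EmbeddedConnect check):

{code:java}
public class LivenessVsReadiness {
    // Toy model of the gap described above: a worker whose REST server
    // already answers GET / may still not have joined the Connect cluster.
    static class Worker {
        boolean restServerStarted = true; // GET / already returns a valid body
        boolean joinedGroup = false;      // but the group join has not happened yet
    }

    // What the current liveness check effectively verifies:
    static boolean looksAlive(Worker w) {
        return w.restServerStarted;
    }

    // What the test actually needs before issuing a restart request:
    static boolean fullyStarted(Worker w) {
        return w.restServerStarted && w.joinedGroup;
    }

    public static void main(String[] args) {
        Worker w = new Worker();
        System.out.println(looksAlive(w));   // passes the liveness check
        System.out.println(fullyStarted(w)); // yet a restart can still hit a 409
    }
}
{code}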

 

After examining the logs of one test case, it appeared that the following 
sequence of events took place:
 # A single worker completes startup (creates and reads to the end of internal 
topics, then joins the cluster)
 # The connector is created (by chance, the REST request to create the 
connector happens to be sent to the only worker that has completed startup so 
far)
 # The connector is assigned to the only worker currently in the cluster
 # The connector generates task configs
 # The tasks for that connector are assigned to the only worker currently in 
the cluster
 # The other, more sluggish, workers in the cluster detect the new connector 
and/or task configs, and realize that a rebalance is pending
 # An attempt is made to restart the connector (by chance, the REST request 
happens to be sent to a worker that knows a rebalance is pending, but has not 
yet completed that rebalance)
 # The restart request is rejected with a 409 response
 # The test fails

 

There are a few action items that come to mind based on this analysis:
 # Unconditionally log an ERROR-level message in our integration testing 
framework whenever a REST request is met with a response whose status code is 
300 or higher
 # Improve our worker liveness checks to guarantee not only that a worker's 
REST server has started, but that it has had a chance to join the cluster
 # Add retry logic when 409 responses are encountered during our integration 
tests (this one is debatable, but our CI infrastructure is so miraculously 
sluggish that rebalances from failure to read to the end of the config topic 
may not be out of the realm of possibility). One possible approach could be to 
re-perform a worker liveness check (one that guarantees that a worker is caught 
up on the config topic and has had a chance to (re-)join the cluster) and then 
re-issue the request, but only once.
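
The third action item could look something like the following sketch (hypothetical helper names; the readiness check is elided):

{code:java}
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

public class RetryOn409 {
    // Hypothetical sketch of action item 3: on a 409 Conflict, re-verify worker
    // readiness once, then re-issue the restart request a single time.
    static int restartWithOneRetry(IntSupplier restartRequest, Runnable awaitWorkersReady) {
        int status = restartRequest.getAsInt();
        if (status == 409) {
            awaitWorkersReady.run();            // would block until workers have (re-)joined
            status = restartRequest.getAsInt(); // retried exactly once, never in a loop
        }
        return status;
    }

    public static void main(String[] args) {
        // Toy request: the first attempt conflicts, the second succeeds.
        AtomicInteger calls = new AtomicInteger();
        int status = restartWithOneRetry(
            () -> calls.getAndIncrement() == 0 ? 409 : 202,
            () -> { /* readiness check elided in this sketch */ });
        System.out.println(status);
    }
}
{code}

Capping the retry at one attempt keeps the test from masking a genuinely stuck rebalance behind an unbounded retry loop.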

 

> Fix flaky 
> ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector() test
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15675
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15675
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Kirk True
>            Assignee: Chris Egerton
>            Priority: Major
>              Labels: flaky-test
>         Attachments: error.stacktrace.txt, error.stdout.txt
>
>
> This integration test is flaky around 9% of test runs. Source: [Gradle 
> Enterprise test 
> trends|https://ge.apache.org/scans/tests?search.relativeStartTime=P28D&search.rootProjectNames=KAFKA&tests.container=org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest&tests.test=testMultiWorkerRestartOnlyConnector].
> One failure had this message:
> {code:java}
> java.lang.AssertionError: Failed to stop connector and tasks within 120000ms 
> {code}
> Please see the attachments for the stack trace and stdout log.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
