[ https://issues.apache.org/jira/browse/KAFKA-15675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17810057#comment-17810057 ]
Chris Egerton commented on KAFKA-15675:
---------------------------------------

I've done some analysis on this one and believe I've found the root cause. It's a confluence of a few different issues, but the TL;DR is: *the request to {{POST /connectors/<connector>/restart?onlyFailed=false&includeTasks=false}} fails with a 409 error; this does not cause the test to (immediately) fail, but the connector is never restarted, which causes the test to time out while [waiting for the connector to be stopped|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java#L272-L275].*

This kind of scenario probably raises several questions. Here's my best attempt to anticipate and address them:

*Why does the 409 response not cause the test to immediately fail?*

The original rationale for this is unclear, but the code structure [here|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java#L374-L383] is fairly clear: issue the request and, if the status code is less than 400, attempt to deserialize the body. Then, unconditionally, return either null or the deserialized response body.

*Why is the 409 response occurring?*

The cluster (or, to be more specific, either the worker that received the initial REST request or, if the request was forwarded, the leader) detected that a rebalance due to an added/removed connector or new task configs was about to take place, and rejected the request.
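For illustration, a minimal sketch of the response-handling pattern described above. The class and method names here are hypothetical stand-ins, not the actual {{EmbeddedConnect}} API; the point is only the control flow: a failed request (including a 409) yields null rather than an exception.

{code:java}
// Illustrative only: mimics the pattern where deserialization happens solely
// when the status code is below 400, and the result (possibly null) is then
// returned unconditionally. A 409 response silently becomes a null return.
class RestResponseHandling {

    static String handleResponse(int statusCode, String body) {
        String deserialized = null;
        if (statusCode < 400) {
            deserialized = body; // stand-in for JSON deserialization of the body
        }
        // Unconditionally return either null or the deserialized body;
        // error responses are never surfaced to the caller
        return deserialized;
    }
}
{code}

A test that only inspects the returned object therefore cannot distinguish "restart accepted" from "restart rejected with 409" at this layer.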
See the {{DistributedHerder}} class's [restartConnectorAndTasks|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1467] and [checkRebalanceNeeded|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2302-L2307] methods for the logic that checks for pending rebalances, and its logic for detecting pending rebalances [here|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2385], [here|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2400], and [here|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2419].

*Why is a rebalance pending by the time we try to restart the connector? Shouldn't the cluster and the set of connectors and tasks on it be stable by this point?*

Yes, the cluster and the set of connectors and tasks on it should be stable by the time we issue our restart request.
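In simplified form, the guard behaves roughly like the sketch below. This is a hedged approximation, not the real {{DistributedHerder}} code: the exception type, field names, and message are illustrative stand-ins, and the real check consults assignment state and config-topic offsets rather than a single boolean.

{code:java}
// Illustrative sketch of a pending-rebalance guard that rejects requests
// with a 409 when the worker knows a rebalance has not yet completed.
class RebalanceGuard {

    static class ConflictException extends RuntimeException {
        final int statusCode;
        ConflictException(int statusCode, String message) {
            super(message);
            this.statusCode = statusCode;
        }
    }

    // Returns normally when no rebalance is pending; otherwise rejects the
    // request, mirroring the effect of checkRebalanceNeeded()
    static void checkRebalanceNeeded(boolean rebalanceResolved) {
        if (!rebalanceResolved) {
            throw new ConflictException(409,
                "Cannot complete request momentarily due to stale configuration");
        }
    }
}
{code}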
We check to make sure that [every worker in the cluster is up and running|https://github.com/apache/kafka/blob/0ef89a7cc059b0ba9b2536e018303c9004ac9142/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java#L116-L117] before proceeding with the rest of the test, and that the [connector and expected number of tasks are running|https://github.com/apache/kafka/blob/0ef89a7cc059b0ba9b2536e018303c9004ac9142/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java#L252-L253] before issuing the restart request.

Unfortunately, the former check (for worker liveness across the cluster) does not guarantee that every worker has joined the cluster. This check is [performed by issuing a request to the root resource|https://github.com/apache/kafka/blob/0ef89a7cc059b0ba9b2536e018303c9004ac9142/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java#L956-L975] ({{GET /}}) for each worker: if the response is valid (i.e., its body matches the expected format), then the worker is considered up and running. However, this does not guarantee that the worker has actually completed startup: it may not have finished reading to the end of the internal topics, or may not yet have had a chance to contact the group coordinator and join the cluster.
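The gap can be illustrated with a small sketch. This is not the actual {{EmbeddedConnect}} code; the field checks below are a loose approximation of "the root-resource body matches the expected format" (the real root resource returns version, commit, and Kafka cluster ID fields), and the stronger readiness signal is hypothetical, since {{GET /}} exposes nothing about group membership today.

{code:java}
// Illustrative only: the current-style check validates just the shape of
// the GET / response, which says nothing about whether the worker has read
// to the end of its internal topics or joined the group.
class WorkerLiveness {

    // Current-style check: worker is "up" once its REST server responds
    // with a well-formed root-resource body
    static boolean restServerStarted(String rootResponseBody) {
        return rootResponseBody != null
            && rootResponseBody.contains("\"version\"")
            && rootResponseBody.contains("\"kafka_cluster_id\"");
    }

    // Stronger hypothetical check: also require that the worker has joined
    // the group (a signal the test framework would need to obtain some
    // other way, since the root resource does not expose it)
    static boolean workerReady(String rootResponseBody, boolean hasJoinedGroup) {
        return restServerStarted(rootResponseBody) && hasJoinedGroup;
    }
}
{code}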
After examining the logs of one test case, it appeared that the following sequence of events took place:
# A single worker completes startup (creates and reads to the end of the internal topics, then joins the cluster)
# The connector is created (by chance, the REST request to create the connector happens to be sent to the only worker that has completed startup so far)
# The connector is assigned to the only worker currently in the cluster
# The connector generates task configs
# The tasks for that connector are assigned to the only worker currently in the cluster
# The other, more sluggish workers in the cluster detect the new connector and/or task configs, and realize that a rebalance is pending
# An attempt is made to restart the connector (by chance, the REST request happens to be sent to a worker that knows a rebalance is pending but has not yet completed that rebalance)
# The restart request is rejected with a 409 response
# The test fails

There are a few action items that come to mind based on this analysis:
# Unconditionally log an ERROR-level message in our integration testing framework whenever a REST request is met with a response whose status code is 300 or higher
# Improve our worker liveness checks to guarantee not only that a worker's REST server has started, but also that the worker has had a chance to join the cluster
# Add retry logic for 409 responses encountered during our integration tests (this one is debatable, but our CI infrastructure is so miraculously sluggish that rebalances caused by a failure to read to the end of the config topic may not be out of the realm of possibility). One possible approach could be to re-perform a worker liveness check (one that guarantees the worker is caught up on the config topic and has had a chance to (re-)join the cluster) and then re-issue the request, but only once.
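The third action item could be sketched roughly as follows. This is a hedged illustration, not a proposed patch: the request is modeled as an {{IntSupplier}} returning a status code, and the readiness re-check is elided behind a {{Runnable}}; none of these names correspond to actual Kafka Connect test-framework APIs.

{code:java}
import java.util.function.IntSupplier;

// Illustrative "retry once on 409" helper: re-check worker readiness, then
// re-issue the request exactly once, rather than retrying in a loop.
class RetryOnce {

    static int requestWithSingleRetry(IntSupplier request, Runnable awaitWorkerReady) {
        int status = request.getAsInt();
        if (status == 409) {
            // e.g. wait for every worker to catch up on the config topic
            // and (re-)join the group before the single retry
            awaitWorkerReady.run();
            status = request.getAsInt();
        }
        return status;
    }
}
{code}

Capping the retry at one attempt keeps the test from masking a genuinely stuck rebalance behind unbounded retries.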
> Fix flaky ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector() test
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15675
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15675
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Kirk True
>            Assignee: Chris Egerton
>            Priority: Major
>              Labels: flaky-test
>         Attachments: error.stacktrace.txt, error.stdout.txt
>
> This integration test is flaky in around 9% of test runs. Source: [Gradle Enterprise test trends|https://ge.apache.org/scans/tests?search.relativeStartTime=P28D&search.rootProjectNames=KAFKA&tests.container=org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest&tests.test=testMultiWorkerRestartOnlyConnector].
> One failure had this message:
> {code:java}
> java.lang.AssertionError: Failed to stop connector and tasks within 120000ms
> {code}
> Please see the attachments for the stack trace and stdout log.

--
This message was sent by Atlassian Jira (v8.20.10#820010)