C0urante commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1684858189
##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##########
@@ -694,12 +701,29 @@ public void testTasksFailOnInabilityToFence() throws Exception {
             )).all().get();
         }
-        StartAndStopLatch connectorStart = connectorAndTaskStart(tasksMax);
-
         log.info("Bringing up connector with fresh slate; fencing should not be necessary");
         connect.configureConnector(CONNECTOR_NAME, props);
-        assertConnectorStarted(connectorStart);
-        // Verify that the connector and its tasks have been able to start successfully
+
+        // Hack: There is a small chance that our recent ACL updates for the connector have
+        // not yet been propagated across the entire Kafka cluster, and that our connector
+        // will fail on startup when it tries to list the end offsets of the worker's offsets topic
+        // So, we implement some retry logic here to add a layer of resiliency in that case
+        waitForCondition(
+                () -> {
+                    ConnectorStateInfo status = connect.connectorStatus(CONNECTOR_NAME);
+                    if ("RUNNING".equals(status.connector().state())) {
+                        return true;
+                    } else if ("FAILED".equals(status.connector().state())) {
+                        log.debug("Restarting failed connector {}", CONNECTOR_NAME);
+                        connect.restartConnector(CONNECTOR_NAME);
+                    }
+                    return false;
+                },
+                30_000,

Review Comment:
   They're the same value, but we're retrying for slightly different reasons; delays in ACL propagation won't necessarily be caught during pre-flight connector config validation. I've pulled this out into a separate `ACL_PROPAGATION_TIMEOUT_MS` value; LMKWYT.
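
For illustration, the restart-on-`FAILED` polling pattern from the hunk above can be sketched outside the Connect test harness. This is a minimal, self-contained sketch and not the code in the PR: `FakeConnector`, `pollUntil`, and `awaitRunning` are hypothetical stand-ins for the embedded Connect cluster and `waitForCondition`, the 10 ms poll interval is an arbitrary choice, and the value of `ACL_PROPAGATION_TIMEOUT_MS` is assumed to be the `30_000` shown in the diff.

```java
import java.util.function.BooleanSupplier;

public class AclPropagationRetrySketch {
    // Dedicated timeout for the ACL-propagation retry; assumed to match the 30_000 in the diff.
    static final long ACL_PROPAGATION_TIMEOUT_MS = 30_000L;

    /** Hypothetical stand-in for a connector whose first few starts fail, as if ACLs had not propagated yet. */
    static final class FakeConnector {
        private int failuresLeft;
        private String state;
        int restarts = 0;

        FakeConnector(int failuresBeforeSuccess) {
            this.failuresLeft = failuresBeforeSuccess;
            start();
        }

        private void start() {
            if (failuresLeft > 0) {
                failuresLeft--;
                state = "FAILED";
            } else {
                state = "RUNNING";
            }
        }

        String state() {
            return state;
        }

        void restart() {
            restarts++;
            start();
        }
    }

    /** Polls the condition until it returns true or the timeout elapses; returns whether it succeeded. */
    static boolean pollUntil(BooleanSupplier condition, long timeoutMs, long pollIntervalMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and give up early.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /** Succeeds once the connector is RUNNING; restarts it whenever it is observed as FAILED. */
    static boolean awaitRunning(FakeConnector connector, long timeoutMs) {
        return pollUntil(() -> {
            String state = connector.state();
            if ("RUNNING".equals(state)) {
                return true;
            }
            if ("FAILED".equals(state)) {
                connector.restart();
            }
            return false;
        }, timeoutMs, 10L);
    }

    public static void main(String[] args) {
        FakeConnector connector = new FakeConnector(2);
        boolean started = awaitRunning(connector, ACL_PROPAGATION_TIMEOUT_MS);
        System.out.println("started=" + started + ", restarts=" + connector.restarts);
    }
}
```

Keeping the timeout in its own named constant, even when it currently equals another timeout, lets the two be tuned independently, since they guard against different failure modes.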