C0urante commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1684858189


##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##########
@@ -694,12 +701,29 @@ public void testTasksFailOnInabilityToFence() throws Exception {
             )).all().get();
         }
 
-        StartAndStopLatch connectorStart = connectorAndTaskStart(tasksMax);
-
         log.info("Bringing up connector with fresh slate; fencing should not be necessary");
         connect.configureConnector(CONNECTOR_NAME, props);
-        assertConnectorStarted(connectorStart);
-        // Verify that the connector and its tasks have been able to start successfully
+
+        // Hack: There is a small chance that our recent ACL updates for the connector have
+        // not yet been propagated across the entire Kafka cluster, and that our connector
+        // will fail on startup when it tries to list the end offsets of the worker's offsets topic
+        // So, we implement some retry logic here to add a layer of resiliency in that case
+        waitForCondition(
+                () -> {
+                    ConnectorStateInfo status = connect.connectorStatus(CONNECTOR_NAME);
+                    if ("RUNNING".equals(status.connector().state())) {
+                        return true;
+                    } else if ("FAILED".equals(status.connector().state())) {
+                        log.debug("Restarting failed connector {}", CONNECTOR_NAME);
+                        connect.restartConnector(CONNECTOR_NAME);
+                    }
+                    return false;
+                },
+                30_000,

Review Comment:
   They're the same value, but we're retrying for slightly different reasons; 
delays in ACL propagation won't necessarily be caught during pre-flight 
connector config validation.
   
   I've pulled this out into a separate `ACL_PROPAGATION_TIMEOUT_MS` value; 
LMKWYT.
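
For readers following along, here is a minimal, self-contained sketch of the pattern under discussion: poll until the connector reports `RUNNING`, restart it whenever it reports `FAILED`, and give up after a dedicated timeout. Only `ACL_PROPAGATION_TIMEOUT_MS` and the `RUNNING`/`FAILED` states come from this thread. The `ConnectorHandle` interface, `FlakyConnector`, `waitUntilRunning`, and `POLL_INTERVAL_MS` are hypothetical stand-ins for illustration and are not the Connect test utilities used in the PR.

```java
public class AclPropagationRetrySketch {
    // Kept separate from any config-validation timeout, even though the values
    // may match, because the two retries guard against different failure modes.
    static final long ACL_PROPAGATION_TIMEOUT_MS = 30_000L;
    // Hypothetical poll interval, chosen only for this sketch.
    static final long POLL_INTERVAL_MS = 100L;

    /** Hypothetical stand-in for whatever exposes connector status and restart. */
    interface ConnectorHandle {
        String state();
        void restart();
    }

    /**
     * Polls until the connector is RUNNING, restarting it each time it is
     * observed in the FAILED state. Returns false if the timeout elapses first.
     */
    static boolean waitUntilRunning(ConnectorHandle connector, long timeoutMs, long pollMs)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            String state = connector.state();
            if ("RUNNING".equals(state)) {
                return true;
            }
            if ("FAILED".equals(state)) {
                connector.restart();
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(pollMs);
        }
    }

    /** Fake connector that reports FAILED until it has been restarted enough times. */
    static final class FlakyConnector implements ConnectorHandle {
        private int failuresLeft;
        int restarts = 0;

        FlakyConnector(int failures) {
            this.failuresLeft = failures;
        }

        @Override
        public String state() {
            return failuresLeft > 0 ? "FAILED" : "RUNNING";
        }

        @Override
        public void restart() {
            restarts++;
            failuresLeft--;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulates two startup failures, e.g. while ACLs are still propagating.
        FlakyConnector connector = new FlakyConnector(2);
        boolean running = waitUntilRunning(connector, ACL_PROPAGATION_TIMEOUT_MS, POLL_INTERVAL_MS);
        System.out.println("running=" + running + ", restarts=" + connector.restarts);
    }
}
```

Naming the timeout after its purpose keeps the intent visible at the call site and lets it be tuned independently of the validation timeout if ACL propagation turns out to need a different budget.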


