C0urante commented on code in PR #13465: URL: https://github.com/apache/kafka/pull/13465#discussion_r1204604996
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {
         );
     }
 
+    @Override
+    public void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> callback) {
+        log.trace("Submitting alter offsets request for connector '{}'", connName);
+
+        addRequest(() -> {
+            refreshConfigSnapshot(workerSyncTimeoutMs);
+            if (!alterConnectorOffsetsChecks(connName, callback)) {
+                return null;
+            }
+            // At this point, we should be the leader (the call to alterConnectorOffsetsChecks makes sure of that) and can safely run
+            // a zombie fencing request
+            if (isSourceConnector(connName) && config.exactlyOnceSourceEnabled()) {
+                log.debug("Performing a round of zombie fencing before altering offsets for source connector {} with exactly-once semantics enabled.", connName);
+                getFenceZombieSourceTasksCallable(connName, (error, ignored) -> {
+                    if (error != null) {
+                        log.error("Failed to perform zombie fencing for exactly-once source connector prior to altering offsets", error);
+                        callback.onCompletion(new ConnectException("Failed to perform zombie fencing for exactly-once source connector prior to altering offsets",
+                                error), null);
+                    } else {
+                        log.debug("Successfully completed zombie fencing for source connector {}; proceeding to alter offsets.", connName);
+                        // We need to ensure that we perform the necessary checks again inside alterConnectorOffsetsHerderRequest
+                        // since it is being run in a separate herder request and the conditions could have changed since the
+                        // previous check
+                        addRequest(getAlterConnectorOffsetsCallable(connName, offsets, callback), forwardErrorCallback(callback));
+                    }
+                }).call();
+            } else {
+                getAlterConnectorOffsetsCallable(connName, offsets, callback).call();

Review Comment:
   Nice, LGTM 👍

-- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org