C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1204604996


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, 
Callback<ConnectorOffsets> cb) {
         );
     }
 
+    @Override
+    public void alterConnectorOffsets(String connName, Map<Map<String, ?>, 
Map<String, ?>> offsets, Callback<Message> callback) {
+        log.trace("Submitting alter offsets request for connector '{}'", 
connName);
+
+        addRequest(() -> {
+            refreshConfigSnapshot(workerSyncTimeoutMs);
+            if (!alterConnectorOffsetsChecks(connName, callback)) {
+                return null;
+            }
+            // At this point, we should be the leader (the call to 
alterConnectorOffsetsChecks makes sure of that) and can safely run
+            // a zombie fencing request
+            if (isSourceConnector(connName) && 
config.exactlyOnceSourceEnabled()) {
+                log.debug("Performing a round of zombie fencing before 
altering offsets for source connector {} with exactly-once semantics enabled.", 
connName);
+                getFenceZombieSourceTasksCallable(connName, (error, ignored) 
-> {
+                    if (error != null) {
+                        log.error("Failed to perform zombie fencing for 
exactly-once source connector prior to altering offsets", error);
+                        callback.onCompletion(new ConnectException("Failed to 
perform zombie fencing for exactly-once source connector prior to altering 
offsets",
+                                error), null);
+                    } else {
+                        log.debug("Successfully completed zombie fencing for 
source connector {}; proceeding to alter offsets.", connName);
+                        // We need to ensure that we perform the necessary 
checks again inside alterConnectorOffsetsHerderRequest
+                        // since it is being run in a separate herder request 
and the conditions could have changed since the
+                        // previous check
+                        addRequest(getAlterConnectorOffsetsCallable(connName, 
offsets, callback), forwardErrorCallback(callback));
+                    }
+                }).call();
+            } else {
+                getAlterConnectorOffsetsCallable(connName, offsets, 
callback).call();

Review Comment:
   Nice, LGTM 👍



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to