yashmayya commented on code in PR #13818:
URL: https://github.com/apache/kafka/pull/13818#discussion_r1221937078


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1587,10 +1595,11 @@ private boolean alterConnectorOffsetsChecks(String 
connName, Callback<Message> c
         // If the target state for the connector is stopped, its task count is 
0, and there is no rebalance pending (checked above),
         // we can be sure that the tasks have at least been attempted to be 
stopped (or cancelled if they took too long to stop).
         // Zombie tasks are handled by a round of zombie fencing for exactly 
once source connectors. Zombie sink tasks are handled
-        // naturally because requests to alter consumer group offsets will 
fail if there are still active members in the group.
+        // naturally because requests to alter consumer group offsets / delete 
consumer groups will fail if there are still active members
+        // in the group.
         if (configState.targetState(connName) != TargetState.STOPPED || 
configState.taskCount(connName) != 0) {
-            callback.onCompletion(new BadRequestException("Connectors must be 
in the STOPPED state before their offsets can be altered. This " +
-                    "can be done for the specified connector by issuing a PUT 
request to the /connectors/" + connName + "/stop endpoint"), null);
+            callback.onCompletion(new BadRequestException("Connectors must be 
in the STOPPED state before their offsets can be modified externally. " +

Review Comment:
   I think just "modified" should be clear enough to users since this is 
synchronously surfaced to the user via the alter / reset offsets REST API 
response 👍 



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, 
Connector connector, Map<String,
                 Admin admin = adminFactory.apply(adminConfig);
 
                 try {
-                    List<KafkaFuture<Void>> adminFutures = new ArrayList<>();
-
-                    Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = 
parsedOffsets.entrySet()
-                            .stream()
-                            .filter(entry -> entry.getValue() != null)
-                            .collect(Collectors.toMap(Map.Entry::getKey, e -> 
new OffsetAndMetadata(e.getValue())));
-
-                    if (!offsetsToAlter.isEmpty()) {
-                        log.debug("Committing the following consumer group 
offsets using an admin client for sink connector {}: {}.",
-                                connName, offsetsToAlter);
-                        AlterConsumerGroupOffsetsOptions 
alterConsumerGroupOffsetsOptions = new 
AlterConsumerGroupOffsetsOptions().timeoutMs(
+                    Map<TopicPartition, Long> offsetsToWrite;
+                    if (isReset) {
+                        offsetsToWrite = new HashMap<>();
+                        ListConsumerGroupOffsetsOptions 
listConsumerGroupOffsetsOptions = new 
ListConsumerGroupOffsetsOptions().timeoutMs(
                                 (int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                        AlterConsumerGroupOffsetsResult 
alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, 
offsetsToAlter,
-                                alterConsumerGroupOffsetsOptions);
-
-                        
adminFutures.add(alterConsumerGroupOffsetsResult.all());
+                        try {
+                            admin.listConsumerGroupOffsets(groupId, 
listConsumerGroupOffsetsOptions)
+                                    .partitionsToOffsetAndMetadata()
+                                    
.get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                                    .forEach((topicPartition, 
offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null));
+
+                            log.debug("Found the following topic partitions 
(to reset offsets) for sink connector {} and consumer group ID {}: {}",
+                                    connName, groupId, 
offsetsToWrite.keySet());
+                        } catch (Exception e) {
+                            Utils.closeQuietly(admin, "Offset reset admin for 
sink connector " + connName);
+                            log.error("Failed to list offsets prior to 
resetting sink connector offsets", e);
+                            cb.onCompletion(new ConnectException("Failed to 
list offsets prior to resetting sink connector offsets", e), null);
+                            return;
+                        }
+                    } else {
+                        offsetsToWrite = 
SinkUtils.parseSinkConnectorOffsets(offsets);
                     }
 
-                    Set<TopicPartition> partitionsToReset = 
parsedOffsets.entrySet()
-                            .stream()
-                            .filter(entry -> entry.getValue() == null)
-                            .map(Map.Entry::getKey)
-                            .collect(Collectors.toSet());
-
-                    if (!partitionsToReset.isEmpty()) {
-                        log.debug("Deleting the consumer group offsets for the 
following topic partitions using an admin client for sink connector {}: {}.",
-                                connName, partitionsToReset);
-                        DeleteConsumerGroupOffsetsOptions 
deleteConsumerGroupOffsetsOptions = new 
DeleteConsumerGroupOffsetsOptions().timeoutMs(
-                                (int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                        DeleteConsumerGroupOffsetsResult 
deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, 
partitionsToReset,
-                                deleteConsumerGroupOffsetsOptions);
+                    boolean alterOffsetsResult;
+                    try {
+                        alterOffsetsResult = ((SinkConnector) 
connector).alterOffsets(connectorConfig, offsetsToWrite);
+                    } catch (UnsupportedOperationException e) {
+                        throw new ConnectException("Failed to modify offsets 
for connector " + connName + " because it doesn't support external " +
+                                "modification of offsets", e);
+                    }
 
-                        
adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+                    // This should only occur for an offset reset request when:
+                    // 1. There was a prior attempt to reset offsets
+                    // OR
+                    // 2. No offsets have been committed yet
+                    if (offsetsToWrite.isEmpty()) {
+                        completeAlterOffsetsCallback(alterOffsetsResult, 
isReset, cb);
+                        return;
                     }
 
-                    @SuppressWarnings("rawtypes")
-                    KafkaFuture<Void> compositeAdminFuture = 
KafkaFuture.allOf(adminFutures.toArray(new KafkaFuture[0]));
-
-                    compositeAdminFuture.whenComplete((ignored, error) -> {
-                        if (error != null) {
-                            // When a consumer group is non-empty, only group 
members can commit offsets. An attempt to alter offsets via the admin client
-                            // will result in an UnknownMemberIdException if 
the consumer group is non-empty (i.e. if the sink tasks haven't stopped
-                            // completely or if the connector is resumed while 
the alter offsets request is being processed). Similarly, an attempt to
-                            // delete consumer group offsets for a non-empty 
consumer group will result in a GroupSubscribedToTopicException
-                            if (error instanceof UnknownMemberIdException || 
error instanceof GroupSubscribedToTopicException) {
-                                cb.onCompletion(new ConnectException("Failed 
to alter consumer group offsets for connector " + connName + " either because 
its tasks " +
-                                                "haven't stopped completely 
yet or the connector was resumed before the request to alter its offsets could 
be successfully " +
-                                                "completed. If the connector 
is in a stopped state, this operation can be safely retried. If it doesn't 
eventually succeed, the " +
-                                                "Connect cluster may need to 
be restarted to get rid of the zombie sink tasks."),
-                                        null);
-                            } else {
-                                cb.onCompletion(new ConnectException("Failed 
to alter consumer group offsets for connector " + connName, error), null);
-                            }
-                        } else {
-                            completeAlterOffsetsCallback(alterOffsetsResult, 
cb);
-                        }
-                    }).whenComplete((ignored, ignoredError) -> {
-                        // errors originating from the original future are 
handled in the prior whenComplete invocation which isn't expected to throw
-                        // an exception itself, and we can thus ignore the 
error here
-                        Utils.closeQuietly(admin, "Offset alter admin for sink 
connector " + connName);
-                    });
+                    if (isReset) {
+                        resetSinkConnectorOffsets(connName, groupId, admin, 
cb, alterOffsetsResult);
+                    } else {
+                        alterSinkConnectorOffsets(connName, groupId, admin, 
offsetsToWrite, cb, alterOffsetsResult);
+                    }
                 } catch (Throwable t) {
-                    Utils.closeQuietly(admin, "Offset alter admin for sink 
connector " + connName);
+                    Utils.closeQuietly(admin, "Offset modification admin for 
sink connector " + connName);
                     throw t;
                 }
             } catch (Throwable t) {
-                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter 
offsets for sink connector " + connName), null);
+                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to modify 
offsets for sink connector " + connName), null);
             }
         }));
     }
 
     /**
-     * Alter a source connector's offsets.
+     * Alter a sink connector's consumer group offsets. This is done via calls 
to {@link Admin#alterConsumerGroupOffsets}
+     * and / or {@link Admin#deleteConsumerGroupOffsets}.
      *
-     * @param connName the name of the source connector whose offsets are to 
be altered
+     * @param connName the name of the sink connector whose offsets are to be 
altered
+     * @param groupId the sink connector's consumer group ID
+     * @param admin the {@link Admin admin client} to be used for altering the 
consumer group offsets; should be closed after use

Review Comment:
   Good catch, thanks. I think this was leftover from a previous refactor 
(earlier it was the caller's responsibility to close the admin client).



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, 
Connector connector, Map<String,
                 Admin admin = adminFactory.apply(adminConfig);
 
                 try {
-                    List<KafkaFuture<Void>> adminFutures = new ArrayList<>();
-
-                    Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = 
parsedOffsets.entrySet()
-                            .stream()
-                            .filter(entry -> entry.getValue() != null)
-                            .collect(Collectors.toMap(Map.Entry::getKey, e -> 
new OffsetAndMetadata(e.getValue())));
-
-                    if (!offsetsToAlter.isEmpty()) {
-                        log.debug("Committing the following consumer group 
offsets using an admin client for sink connector {}: {}.",
-                                connName, offsetsToAlter);
-                        AlterConsumerGroupOffsetsOptions 
alterConsumerGroupOffsetsOptions = new 
AlterConsumerGroupOffsetsOptions().timeoutMs(
+                    Map<TopicPartition, Long> offsetsToWrite;
+                    if (isReset) {
+                        offsetsToWrite = new HashMap<>();
+                        ListConsumerGroupOffsetsOptions 
listConsumerGroupOffsetsOptions = new 
ListConsumerGroupOffsetsOptions().timeoutMs(
                                 (int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                        AlterConsumerGroupOffsetsResult 
alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, 
offsetsToAlter,
-                                alterConsumerGroupOffsetsOptions);
-
-                        
adminFutures.add(alterConsumerGroupOffsetsResult.all());
+                        try {
+                            admin.listConsumerGroupOffsets(groupId, 
listConsumerGroupOffsetsOptions)
+                                    .partitionsToOffsetAndMetadata()
+                                    
.get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                                    .forEach((topicPartition, 
offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null));
+
+                            log.debug("Found the following topic partitions 
(to reset offsets) for sink connector {} and consumer group ID {}: {}",
+                                    connName, groupId, 
offsetsToWrite.keySet());
+                        } catch (Exception e) {
+                            Utils.closeQuietly(admin, "Offset reset admin for 
sink connector " + connName);
+                            log.error("Failed to list offsets prior to 
resetting sink connector offsets", e);
+                            cb.onCompletion(new ConnectException("Failed to 
list offsets prior to resetting sink connector offsets", e), null);
+                            return;
+                        }
+                    } else {
+                        offsetsToWrite = 
SinkUtils.parseSinkConnectorOffsets(offsets);
                     }
 
-                    Set<TopicPartition> partitionsToReset = 
parsedOffsets.entrySet()
-                            .stream()
-                            .filter(entry -> entry.getValue() == null)
-                            .map(Map.Entry::getKey)
-                            .collect(Collectors.toSet());
-
-                    if (!partitionsToReset.isEmpty()) {
-                        log.debug("Deleting the consumer group offsets for the 
following topic partitions using an admin client for sink connector {}: {}.",
-                                connName, partitionsToReset);
-                        DeleteConsumerGroupOffsetsOptions 
deleteConsumerGroupOffsetsOptions = new 
DeleteConsumerGroupOffsetsOptions().timeoutMs(
-                                (int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                        DeleteConsumerGroupOffsetsResult 
deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, 
partitionsToReset,
-                                deleteConsumerGroupOffsetsOptions);
+                    boolean alterOffsetsResult;
+                    try {
+                        alterOffsetsResult = ((SinkConnector) 
connector).alterOffsets(connectorConfig, offsetsToWrite);
+                    } catch (UnsupportedOperationException e) {
+                        throw new ConnectException("Failed to modify offsets 
for connector " + connName + " because it doesn't support external " +
+                                "modification of offsets", e);
+                    }
 
-                        
adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+                    // This should only occur for an offset reset request when:
+                    // 1. There was a prior attempt to reset offsets
+                    // OR
+                    // 2. No offsets have been committed yet
+                    if (offsetsToWrite.isEmpty()) {
+                        completeAlterOffsetsCallback(alterOffsetsResult, 
isReset, cb);
+                        return;
                     }
 
-                    @SuppressWarnings("rawtypes")
-                    KafkaFuture<Void> compositeAdminFuture = 
KafkaFuture.allOf(adminFutures.toArray(new KafkaFuture[0]));
-
-                    compositeAdminFuture.whenComplete((ignored, error) -> {
-                        if (error != null) {
-                            // When a consumer group is non-empty, only group 
members can commit offsets. An attempt to alter offsets via the admin client
-                            // will result in an UnknownMemberIdException if 
the consumer group is non-empty (i.e. if the sink tasks haven't stopped
-                            // completely or if the connector is resumed while 
the alter offsets request is being processed). Similarly, an attempt to
-                            // delete consumer group offsets for a non-empty 
consumer group will result in a GroupSubscribedToTopicException
-                            if (error instanceof UnknownMemberIdException || 
error instanceof GroupSubscribedToTopicException) {
-                                cb.onCompletion(new ConnectException("Failed 
to alter consumer group offsets for connector " + connName + " either because 
its tasks " +
-                                                "haven't stopped completely 
yet or the connector was resumed before the request to alter its offsets could 
be successfully " +
-                                                "completed. If the connector 
is in a stopped state, this operation can be safely retried. If it doesn't 
eventually succeed, the " +
-                                                "Connect cluster may need to 
be restarted to get rid of the zombie sink tasks."),
-                                        null);
-                            } else {
-                                cb.onCompletion(new ConnectException("Failed 
to alter consumer group offsets for connector " + connName, error), null);
-                            }
-                        } else {
-                            completeAlterOffsetsCallback(alterOffsetsResult, 
cb);
-                        }
-                    }).whenComplete((ignored, ignoredError) -> {
-                        // errors originating from the original future are 
handled in the prior whenComplete invocation which isn't expected to throw
-                        // an exception itself, and we can thus ignore the 
error here
-                        Utils.closeQuietly(admin, "Offset alter admin for sink 
connector " + connName);
-                    });
+                    if (isReset) {
+                        resetSinkConnectorOffsets(connName, groupId, admin, 
cb, alterOffsetsResult);
+                    } else {
+                        alterSinkConnectorOffsets(connName, groupId, admin, 
offsetsToWrite, cb, alterOffsetsResult);
+                    }
                 } catch (Throwable t) {
-                    Utils.closeQuietly(admin, "Offset alter admin for sink 
connector " + connName);
+                    Utils.closeQuietly(admin, "Offset modification admin for 
sink connector " + connName);
                     throw t;
                 }
             } catch (Throwable t) {
-                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter 
offsets for sink connector " + connName), null);
+                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to modify 
offsets for sink connector " + connName), null);
             }
         }));
     }
 
     /**
-     * Alter a source connector's offsets.
+     * Alter a sink connector's consumer group offsets. This is done via calls 
to {@link Admin#alterConsumerGroupOffsets}
+     * and / or {@link Admin#deleteConsumerGroupOffsets}.
      *
-     * @param connName the name of the source connector whose offsets are to 
be altered
+     * @param connName the name of the sink connector whose offsets are to be 
altered
+     * @param groupId the sink connector's consumer group ID
+     * @param admin the {@link Admin admin client} to be used for altering the 
consumer group offsets; should be closed after use
+     * @param offsetsToWrite a mapping from topic partitions to offsets that 
need to be written; may not be null or empty
+     * @param cb callback to invoke upon completion
+     * @param alterOffsetsResult the result of the call to {@link 
SinkConnector#alterOffsets} for the connector
+     */
+    private void alterSinkConnectorOffsets(String connName, String groupId, 
Admin admin, Map<TopicPartition, Long> offsetsToWrite,
+                                           Callback<Message> cb, boolean 
alterOffsetsResult) {
+        List<KafkaFuture<Void>> adminFutures = new ArrayList<>();
+
+        Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = 
offsetsToWrite.entrySet()
+                .stream()
+                .filter(entry -> entry.getValue() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> new 
OffsetAndMetadata(e.getValue())));
+
+        if (!offsetsToAlter.isEmpty()) {
+            log.debug("Committing the following consumer group offsets using 
an admin client for sink connector {}: {}.",
+                    connName, offsetsToAlter);
+            AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions 
= new AlterConsumerGroupOffsetsOptions().timeoutMs(
+                    (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+            AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = 
admin.alterConsumerGroupOffsets(groupId, offsetsToAlter,
+                    alterConsumerGroupOffsetsOptions);
+
+            adminFutures.add(alterConsumerGroupOffsetsResult.all());
+        }
+
+        Set<TopicPartition> partitionsToReset = offsetsToWrite.entrySet()
+                .stream()
+                .filter(entry -> entry.getValue() == null)
+                .map(Map.Entry::getKey)
+                .collect(Collectors.toSet());
+
+        if (!partitionsToReset.isEmpty()) {
+            log.debug("Deleting the consumer group offsets for the following 
topic partitions using an admin client for sink connector {}: {}.",
+                    connName, partitionsToReset);
+            DeleteConsumerGroupOffsetsOptions 
deleteConsumerGroupOffsetsOptions = new 
DeleteConsumerGroupOffsetsOptions().timeoutMs(
+                    (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+            DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult 
= admin.deleteConsumerGroupOffsets(groupId, partitionsToReset,
+                    deleteConsumerGroupOffsetsOptions);
+
+            adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+        }
+
+        @SuppressWarnings("rawtypes")
+        KafkaFuture<Void> compositeAdminFuture = 
KafkaFuture.allOf(adminFutures.toArray(new KafkaFuture[0]));
+
+        compositeAdminFuture.whenComplete((ignored, error) -> {
+            if (error != null) {
+                // When a consumer group is non-empty, only group members can 
commit offsets. An attempt to alter offsets via the admin client
+                // will result in an UnknownMemberIdException if the consumer 
group is non-empty (i.e. if the sink tasks haven't stopped
+                // completely or if the connector is resumed while the alter 
offsets request is being processed). Similarly, an attempt to
+                // delete consumer group offsets for a non-empty consumer 
group will result in a GroupSubscribedToTopicException
+                if (error instanceof UnknownMemberIdException || error 
instanceof GroupSubscribedToTopicException) {
+                    cb.onCompletion(new ConnectException("Failed to alter 
consumer group offsets for connector " + connName + " either because its tasks 
" +
+                                    "haven't stopped completely yet or the 
connector was resumed before the request to alter its offsets could be 
successfully " +
+                                    "completed. If the connector is in a 
stopped state, this operation can be safely retried. If it doesn't eventually 
succeed, the " +
+                                    "Connect cluster may need to be restarted 
to get rid of the zombie sink tasks."),
+                            null);
+                } else {
+                    cb.onCompletion(new ConnectException("Failed to alter 
consumer group offsets for connector " + connName, error), null);
+                }
+            } else {
+                completeAlterOffsetsCallback(alterOffsetsResult, false, cb);
+            }
+        }).whenComplete((ignored, ignoredError) -> {
+            // errors originating from the original future are handled in the 
prior whenComplete invocation which isn't expected to throw
+            // an exception itself, and we can thus ignore the error here
+            Utils.closeQuietly(admin, "Offset alter admin for sink connector " 
+ connName);
+        });

Review Comment:
   Yeah, no changes here. I'm surprised that the GitHub diff UI isn't smarter 
about things like this 🤷‍♂️ 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to