C0urante commented on code in PR #13818:
URL: https://github.com/apache/kafka/pull/13818#discussion_r1221803692


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##########
@@ -374,19 +374,38 @@ public synchronized void connectorOffsets(String connName, Callback<ConnectorOff
     }
 
     @Override
-    public synchronized void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
+    protected synchronized void modifyConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
+        if (!modifyConnectorOffsetsChecks(connName, cb)) {
+            return;
+        }
+
+        if (offsets == null) {
+            worker.resetConnectorOffsets(connName, configState.connectorConfig(connName), cb);
+        } else {
+            worker.alterConnectorOffsets(connName, configState.connectorConfig(connName), offsets, cb);
+        }
+    }
+
+    /**
+     * This method performs a few checks for external requests to modify (alter or reset) connector offsets and
+     * completes the callback exceptionally if any check fails.
+     * @param connName the name of the connector whose offsets are to be modified
+     * @param cb callback to invoke upon completion
+     * @return true if all the checks passed, false otherwise
+     */
+    private boolean modifyConnectorOffsetsChecks(String connName, Callback<Message> cb) {
         if (!configState.contains(connName)) {
             cb.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null);
-            return;
+            return false;
         }
 
         if (configState.targetState(connName) != TargetState.STOPPED || configState.taskCount(connName) != 0) {
-            cb.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be altered. " +
-                    "This can be done for the specified connector by issuing a PUT request to the /connectors/" + connName + "/stop endpoint"), null);
-            return;
+            cb.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be modified externally. " +

Review Comment:
   Nit: "modified externally" may not be very clear to users. I think "altered" 
captures the general concept well enough even if they're doing a reset, but if 
you'd like to be more precise, maybe we can say "altered or reset"?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1587,10 +1595,11 @@ private boolean alterConnectorOffsetsChecks(String connName, Callback<Message> c
         // If the target state for the connector is stopped, its task count is 0, and there is no rebalance pending (checked above),
         // we can be sure that the tasks have at least been attempted to be stopped (or cancelled if they took too long to stop).
         // Zombie tasks are handled by a round of zombie fencing for exactly once source connectors. Zombie sink tasks are handled
-        // naturally because requests to alter consumer group offsets will fail if there are still active members in the group.
+        // naturally because requests to alter consumer group offsets / delete consumer groups will fail if there are still active members
+        // in the group.
         if (configState.targetState(connName) != TargetState.STOPPED || configState.taskCount(connName) != 0) {
-            callback.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be altered. This " +
-                    "can be done for the specified connector by issuing a PUT request to the /connectors/" + connName + "/stop endpoint"), null);
+            callback.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be modified externally. " +

Review Comment:
   Same thought RE "modified externally" (though seeing how "modified"/"modify" 
is used in some other log and exception messages in this class, I think just 
"modified" could work here too).



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map<String,
                 Admin admin = adminFactory.apply(adminConfig);
 
                 try {
-                    List<KafkaFuture<Void>> adminFutures = new ArrayList<>();
-
-                    Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
-                            .stream()
-                            .filter(entry -> entry.getValue() != null)
-                            .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
-
-                    if (!offsetsToAlter.isEmpty()) {
-                        log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.",
-                                connName, offsetsToAlter);
-                        AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs(
+                    Map<TopicPartition, Long> offsetsToWrite;
+                    if (isReset) {
+                        offsetsToWrite = new HashMap<>();
+                        ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs(
                                 (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                        AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter,
-                                alterConsumerGroupOffsetsOptions);
-
-                        adminFutures.add(alterConsumerGroupOffsetsResult.all());
+                        try {
+                            admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions)
+                                    .partitionsToOffsetAndMetadata()
+                                    .get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                                    .forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null));
+
+                            log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}",
+                                    connName, groupId, offsetsToWrite.keySet());
+                        } catch (Exception e) {
+                            Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName);
+                            log.error("Failed to list offsets prior to resetting sink connector offsets", e);
+                            cb.onCompletion(new ConnectException("Failed to list offsets prior to resetting sink connector offsets", e), null);
+                            return;
+                        }
+                    } else {
+                        offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets);
                     }
 
-                    Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet()
-                            .stream()
-                            .filter(entry -> entry.getValue() == null)
-                            .map(Map.Entry::getKey)
-                            .collect(Collectors.toSet());
-
-                    if (!partitionsToReset.isEmpty()) {
-                        log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.",
-                                connName, partitionsToReset);
-                        DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs(
-                                (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                        DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset,
-                                deleteConsumerGroupOffsetsOptions);
+                    boolean alterOffsetsResult;
+                    try {
+                        alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, offsetsToWrite);
+                    } catch (UnsupportedOperationException e) {
+                        throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " +
+                                "modification of offsets", e);
+                    }
 
-                        adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+                    // This should only occur for an offset reset request when:
+                    // 1. There was a prior attempt to reset offsets
+                    // OR
+                    // 2. No offsets have been committed yet
+                    if (offsetsToWrite.isEmpty()) {
+                        completeAlterOffsetsCallback(alterOffsetsResult, isReset, cb);
+                        return;
                     }
 
-                    @SuppressWarnings("rawtypes")
-                    KafkaFuture<Void> compositeAdminFuture = KafkaFuture.allOf(adminFutures.toArray(new KafkaFuture[0]));
-
-                    compositeAdminFuture.whenComplete((ignored, error) -> {
-                        if (error != null) {
-                            // When a consumer group is non-empty, only group members can commit offsets. An attempt to alter offsets via the admin client
-                            // will result in an UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped
-                            // completely or if the connector is resumed while the alter offsets request is being processed). Similarly, an attempt to
-                            // delete consumer group offsets for a non-empty consumer group will result in a GroupSubscribedToTopicException
-                            if (error instanceof UnknownMemberIdException || error instanceof GroupSubscribedToTopicException) {
-                                cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " +
-                                                "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " +
-                                                "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " +
-                                                "Connect cluster may need to be restarted to get rid of the zombie sink tasks."),
-                                        null);
-                            } else {
-                                cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName, error), null);
-                            }
-                        } else {
-                            completeAlterOffsetsCallback(alterOffsetsResult, cb);
-                        }
-                    }).whenComplete((ignored, ignoredError) -> {
-                        // errors originating from the original future are handled in the prior whenComplete invocation which isn't expected to throw
-                        // an exception itself, and we can thus ignore the error here
-                        Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
-                    });
+                    if (isReset) {
+                        resetSinkConnectorOffsets(connName, groupId, admin, cb, alterOffsetsResult);
+                    } else {
+                        alterSinkConnectorOffsets(connName, groupId, admin, offsetsToWrite, cb, alterOffsetsResult);
+                    }
                 } catch (Throwable t) {
-                    Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                    Utils.closeQuietly(admin, "Offset modification admin for sink connector " + connName);
                     throw t;
                 }
             } catch (Throwable t) {
-                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter offsets for sink connector " + connName), null);
+                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to modify offsets for sink connector " + connName), null);
             }
         }));
     }
 
     /**
-     * Alter a source connector's offsets.
+     * Alter a sink connector's consumer group offsets. This is done via calls to {@link Admin#alterConsumerGroupOffsets}
+     * and / or {@link Admin#deleteConsumerGroupOffsets}.
      *
-     * @param connName the name of the source connector whose offsets are to be altered
+     * @param connName the name of the sink connector whose offsets are to be altered
+     * @param groupId the sink connector's consumer group ID
+     * @param admin the {@link Admin admin client} to be used for altering the consumer group offsets; should be closed after use

Review Comment:
   We close the admin client in this method, right? I think we can be clearer 
about that:
   ```suggestion
     * @param admin the {@link Admin admin client} to be used for altering the consumer group offsets; will be closed after use
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map<String,
                 Admin admin = adminFactory.apply(adminConfig);
 
                 try {
-                    List<KafkaFuture<Void>> adminFutures = new ArrayList<>();
-
-                    Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
-                            .stream()
-                            .filter(entry -> entry.getValue() != null)
-                            .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
-
-                    if (!offsetsToAlter.isEmpty()) {
-                        log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.",
-                                connName, offsetsToAlter);
-                        AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs(
+                    Map<TopicPartition, Long> offsetsToWrite;
+                    if (isReset) {
+                        offsetsToWrite = new HashMap<>();
+                        ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs(
                                 (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
Review Comment:
   Can we use a `Timer` to limit the total runtime of this method to 90 
seconds, instead of using a 90-second timeout for each admin client request?
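   For reference, the pattern being suggested here (Kafka's `org.apache.kafka.common.utils.Timer`, obtained via `Time.SYSTEM.timer(timeoutMs)`, tracks one deadline across calls with `update()` / `remainingMs()`) can be sketched with plain JDK primitives. The `Deadline` class and the 90-second figure below are illustrative stand-ins, not the actual Kafka utility:

```java
import java.util.concurrent.TimeUnit;

// Illustrative stand-in for org.apache.kafka.common.utils.Timer: a single shared
// deadline so each successive blocking call consumes the remaining budget,
// instead of each admin client request getting a fresh 90-second timeout.
class Deadline {
    private final long deadlineNanos;

    Deadline(long timeoutMs) {
        this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    }

    // Remaining budget in milliseconds, floored at zero once the deadline passes.
    long remainingMs() {
        return Math.max(0L, TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime()));
    }
}

public class DeadlineDemo {
    public static void main(String[] args) throws InterruptedException {
        Deadline deadline = new Deadline(90_000L);
        // A first admin request would be given the full remaining budget...
        long first = deadline.remainingMs();
        Thread.sleep(50);
        // ...and a follow-up request only gets what is left of the same budget.
        long second = deadline.remainingMs();
        System.out.println(first >= second && second <= 90_000L);
    }
}
```

   Each `timeoutMs(...)` / `get(...)` call in the method would then be passed `remainingMs()` rather than the full `DEFAULT_REST_REQUEST_TIMEOUT_MS`, bounding the whole method instead of each request.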



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1460,30 +1602,31 @@ void alterSourceConnectorOffsets(String connName, Connector connector, Map<Strin
                 try {
                     offsetWriterCallback.get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                 } catch (ExecutionException e) {
-                    throw new ConnectException("Failed to alter offsets for source connector " + connName, e.getCause());
+                    throw new ConnectException("Failed to modify offsets for source connector " + connName, e.getCause());
                 } catch (TimeoutException e) {
-                    throw new ConnectException("Timed out while attempting to alter offsets for source connector " + connName, e);
+                    throw new ConnectException("Timed out while attempting to modify offsets for source connector " + connName, e);
                 } catch (InterruptedException e) {
-                    throw new ConnectException("Unexpectedly interrupted while attempting to alter offsets for source connector " + connName, e);
+                    throw new ConnectException("Unexpectedly interrupted while attempting to modify offsets for source connector " + connName, e);
                 }
 
-                completeAlterOffsetsCallback(alterOffsetsResult, cb);
+                completeAlterOffsetsCallback(alterOffsetsResult, isReset, cb);
             } catch (Throwable t) {
-                log.error("Failed to alter offsets for source connector {}", connName, t);
-                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter offsets for source connector " + connName), null);
+                log.error("Failed to modify offsets for source connector {}", connName, t);
+                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to modify offsets for source connector " + connName), null);
             } finally {
-                Utils.closeQuietly(offsetStore::stop, "Offset store for offset alter request for connector " + connName);
+                Utils.closeQuietly(offsetStore::stop, "Offset store for offset modification request for connector " + connName);
             }
         }));
     }
 
-    private void completeAlterOffsetsCallback(boolean alterOffsetsResult, Callback<Message> cb) {
+    private void completeAlterOffsetsCallback(boolean alterOffsetsResult, boolean isReset, Callback<Message> cb) {

Review Comment:
   Should we rename this to `completeModifyOffsetsCallback` to match the other 
naming changes we've made in this class?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map<String,
                 Admin admin = adminFactory.apply(adminConfig);
 
                 try {
-                    List<KafkaFuture<Void>> adminFutures = new ArrayList<>();
-
-                    Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
-                            .stream()
-                            .filter(entry -> entry.getValue() != null)
-                            .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
-
-                    if (!offsetsToAlter.isEmpty()) {
-                        log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.",
-                                connName, offsetsToAlter);
-                        AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs(
+                    Map<TopicPartition, Long> offsetsToWrite;
+                    if (isReset) {
+                        offsetsToWrite = new HashMap<>();
+                        ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs(
                                 (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                        AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter,
-                                alterConsumerGroupOffsetsOptions);
-
-                        adminFutures.add(alterConsumerGroupOffsetsResult.all());
+                        try {
+                            admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions)
+                                    .partitionsToOffsetAndMetadata()
+                                    .get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                                    .forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null));
+
+                            log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}",
+                                    connName, groupId, offsetsToWrite.keySet());
+                        } catch (Exception e) {
+                            Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName);
+                            log.error("Failed to list offsets prior to resetting sink connector offsets", e);
+                            cb.onCompletion(new ConnectException("Failed to list offsets prior to resetting sink connector offsets", e), null);
+                            return;
+                        }
+                    } else {
+                        offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets);
                     }
 
-                    Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet()
-                            .stream()
-                            .filter(entry -> entry.getValue() == null)
-                            .map(Map.Entry::getKey)
-                            .collect(Collectors.toSet());
-
-                    if (!partitionsToReset.isEmpty()) {
-                        log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.",
-                                connName, partitionsToReset);
-                        DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs(
-                                (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                        DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset,
-                                deleteConsumerGroupOffsetsOptions);
+                    boolean alterOffsetsResult;
+                    try {
+                        alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, offsetsToWrite);
+                    } catch (UnsupportedOperationException e) {
+                        throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " +
+                                "modification of offsets", e);
+                    }
 
-                        adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+                    // This should only occur for an offset reset request when:
+                    // 1. There was a prior attempt to reset offsets
+                    // OR
+                    // 2. No offsets have been committed yet
+                    if (offsetsToWrite.isEmpty()) {
+                        completeAlterOffsetsCallback(alterOffsetsResult, isReset, cb);
+                        return;
                     }
 
-                    @SuppressWarnings("rawtypes")
-                    KafkaFuture<Void> compositeAdminFuture = KafkaFuture.allOf(adminFutures.toArray(new KafkaFuture[0]));
-
-                    compositeAdminFuture.whenComplete((ignored, error) -> {
-                        if (error != null) {
-                            // When a consumer group is non-empty, only group members can commit offsets. An attempt to alter offsets via the admin client
-                            // will result in an UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped
-                            // completely or if the connector is resumed while the alter offsets request is being processed). Similarly, an attempt to
-                            // delete consumer group offsets for a non-empty consumer group will result in a GroupSubscribedToTopicException
-                            if (error instanceof UnknownMemberIdException || error instanceof GroupSubscribedToTopicException) {
-                                cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " +
-                                                "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " +
-                                                "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " +
-                                                "Connect cluster may need to be restarted to get rid of the zombie sink tasks."),
-                                        null);
-                            } else {
-                                cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName, error), null);
-                            }
-                        } else {
-                            completeAlterOffsetsCallback(alterOffsetsResult, cb);
-                        }
-                    }).whenComplete((ignored, ignoredError) -> {
-                        // errors originating from the original future are handled in the prior whenComplete invocation which isn't expected to throw
-                        // an exception itself, and we can thus ignore the error here
-                        Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
-                    });
+                    if (isReset) {
+                        resetSinkConnectorOffsets(connName, groupId, admin, cb, alterOffsetsResult);
+                    } else {
+                        alterSinkConnectorOffsets(connName, groupId, admin, offsetsToWrite, cb, alterOffsetsResult);
+                    }
                 } catch (Throwable t) {
-                    Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                    Utils.closeQuietly(admin, "Offset modification admin for sink connector " + connName);
                     throw t;
                 }
             } catch (Throwable t) {
-                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter offsets for sink connector " + connName), null);
+                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to modify offsets for sink connector " + connName), null);
             }
         }));
     }
 
     /**
-     * Alter a source connector's offsets.
+     * Alter a sink connector's consumer group offsets. This is done via calls to {@link Admin#alterConsumerGroupOffsets}
+     * and / or {@link Admin#deleteConsumerGroupOffsets}.
      *
-     * @param connName the name of the source connector whose offsets are to be altered
+     * @param connName the name of the sink connector whose offsets are to be altered
+     * @param groupId the sink connector's consumer group ID
+     * @param admin the {@link Admin admin client} to be used for altering the consumer group offsets; should be closed after use
+     * @param offsetsToWrite a mapping from topic partitions to offsets that need to be written; may not be null or empty
+     * @param cb callback to invoke upon completion
+     * @param alterOffsetsResult the result of the call to {@link SinkConnector#alterOffsets} for the connector
+     */
+    private void alterSinkConnectorOffsets(String connName, String groupId, Admin admin, Map<TopicPartition, Long> offsetsToWrite,
+                                           Callback<Message> cb, boolean alterOffsetsResult) {
+        List<KafkaFuture<Void>> adminFutures = new ArrayList<>();
+
+        Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = offsetsToWrite.entrySet()
+                .stream()
+                .filter(entry -> entry.getValue() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
+
+        if (!offsetsToAlter.isEmpty()) {
+            log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.",
+                    connName, offsetsToAlter);
+            AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs(
+                    (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+            AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter,
+                    alterConsumerGroupOffsetsOptions);
+
+            adminFutures.add(alterConsumerGroupOffsetsResult.all());
+        }
+
+        Set<TopicPartition> partitionsToReset = offsetsToWrite.entrySet()
+                .stream()
+                .filter(entry -> entry.getValue() == null)
+                .map(Map.Entry::getKey)
+                .collect(Collectors.toSet());
+
+        if (!partitionsToReset.isEmpty()) {
+            log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.",
+                    connName, partitionsToReset);
+            DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs(
+                    (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+            DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset,
+                    deleteConsumerGroupOffsetsOptions);
+
+            adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+        }
+
+        @SuppressWarnings("rawtypes")
+        KafkaFuture<Void> compositeAdminFuture = KafkaFuture.allOf(adminFutures.toArray(new KafkaFuture[0]));
+
+        compositeAdminFuture.whenComplete((ignored, error) -> {
+            if (error != null) {
+                // When a consumer group is non-empty, only group members can commit offsets. An attempt to alter offsets via the admin client
+                // will result in an UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped
+                // completely or if the connector is resumed while the alter offsets request is being processed). Similarly, an attempt to
+                // delete consumer group offsets for a non-empty consumer group will result in a GroupSubscribedToTopicException
+                if (error instanceof UnknownMemberIdException || error instanceof GroupSubscribedToTopicException) {
+                    cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " +
+                                    "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " +
+                                    "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " +
+                                    "Connect cluster may need to be restarted to get rid of the zombie sink tasks."),
+                            null);
+                } else {
+                    cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName, error), null);
+                }
+            } else {
+                completeAlterOffsetsCallback(alterOffsetsResult, false, cb);
+            }
+        }).whenComplete((ignored, ignoredError) -> {
+            // errors originating from the original future are handled in the prior whenComplete invocation which isn't expected to throw
+            // an exception itself, and we can thus ignore the error here
+            Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+        });

Review Comment:
   It looks like this is just pulled out from the [existing `alterSinkConnectorOffsets` method](https://github.com/apache/kafka/blob/9cfc4b9373bf887ec2fb95bc607f4ebd1c8613c0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1323-L1383) (which is fine); let me know if there are any other changes that I've missed?
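The extracted block composes several admin futures into one, reports success or failure to the caller in a first `whenComplete` stage, and unconditionally closes the client in a second stage. The same pattern can be sketched with plain JDK `CompletableFuture`, which behaves analogously to `KafkaFuture` here; the `AutoCloseable` and the failure message are hypothetical stand-ins for the admin client and broker error, not the actual Kafka API:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class FutureCleanupSketch {

    static String run() throws Exception {
        StringBuilder log = new StringBuilder();
        // Hypothetical stand-in for the Admin client that must be closed after use
        AutoCloseable resource = () -> log.append("closed;");

        List<CompletableFuture<Void>> futures = List.of(
                CompletableFuture.completedFuture(null),
                CompletableFuture.failedFuture(new IllegalStateException("group is not empty")));

        CompletableFuture<Void> composite =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

        composite.whenComplete((ignored, error) -> {
            // First stage: translate the outcome into a result for the caller.
            // allOf wraps the original failure in a CompletionException, hence getCause().
            if (error != null) {
                log.append("failed:").append(error.getCause().getMessage()).append(";");
            } else {
                log.append("succeeded;");
            }
        }).whenComplete((ignored, ignoredError) -> {
            // Second stage: always release the resource, regardless of outcome,
            // mirroring the Utils.closeQuietly call in the diff above.
            try {
                resource.close();
            } catch (Exception e) {
                // swallow, as closeQuietly does
            }
        });
        return log.toString();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // failed:group is not empty;closed;
    }
}
```

Chaining the cleanup as a second `whenComplete` (rather than putting it inside the first) keeps the close guaranteed even if the result-handling callback itself were to throw.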



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java:
##########
@@ -574,6 +574,199 @@ public void alterAndVerifySourceConnectorOffsets(EmbeddedConnectCluster connect,
                 "Source connector offsets should reflect the expected number of records produced");
     }
 
+    @Test
+    public void testResetSinkConnectorOffsets() throws Exception {
+        resetAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), connect.kafka());
+    }
+
+    @Test
+    public void testResetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception {
+        Map<String, String> connectorConfigs = baseSinkConnectorConfigs();
+        connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG,
+                "overridden-group-id");
+        resetAndVerifySinkConnectorOffsets(connectorConfigs, connect.kafka());
+        // Ensure that the overridden consumer group ID was the one actually used
+        try (Admin admin = connect.kafka().createAdminClient()) {
+            Collection<ConsumerGroupListing> consumerGroups = admin.listConsumerGroups().all().get();
+            assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> "overridden-group-id".equals(consumerGroupListing.groupId())));
+            assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(CONNECTOR_NAME).equals(consumerGroupListing.groupId())));
+        }
+    }
+
+    @Test
+    public void testResetSinkConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
+        EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties());
+
+        try (AutoCloseable ignored = kafkaCluster::stop) {
+            kafkaCluster.start();
+
+            Map<String, String> connectorConfigs = baseSinkConnectorConfigs();
+            connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+                    kafkaCluster.bootstrapServers());
+            connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+                    kafkaCluster.bootstrapServers());
+
+            resetAndVerifySinkConnectorOffsets(connectorConfigs, kafkaCluster);
+        }
+    }
+
+    private void resetAndVerifySinkConnectorOffsets(Map<String, String> connectorConfigs, EmbeddedKafkaCluster kafkaCluster) throws Exception {
+        int numPartitions = 3;
+        int numMessages = 10;
+        kafkaCluster.createTopic(TOPIC, numPartitions);
+
+        // Produce numMessages messages to each partition
+        for (int partition = 0; partition < numPartitions; partition++) {
+            for (int message = 0; message < numMessages; message++) {
+                kafkaCluster.produce(TOPIC, partition, "key", "value");
+            }
+        }
+        // Create sink connector
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+                "Connector tasks did not start in time.");
+
+        waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, "test-topic", numPartitions, numMessages,
+                "Sink connector consumer group offsets should catch up to the topic end offsets");
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(
+                CONNECTOR_NAME,
+                "Connector did not stop in time"
+        );
+
+        // Reset the sink connector's offsets
+        String response = connect.resetConnectorOffsets(CONNECTOR_NAME);
+        assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. " +
+                "However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses."));
+
+        waitForEmptySinkConnectorOffsets(CONNECTOR_NAME);
+
+        // Reset the sink connector's offsets again while it is still in a STOPPED state and ensure that there is no error
+        response = connect.resetConnectorOffsets(CONNECTOR_NAME);
+        assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. " +
+                "However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses."));
+
+        waitForEmptySinkConnectorOffsets(CONNECTOR_NAME);
+
+        // Resume the connector and expect its offsets to catch up to the latest offsets
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                NUM_TASKS,
+                "Connector tasks did not resume in time"
+        );
+        waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, "test-topic", numPartitions, 10,
+                "Sink connector consumer group offsets should catch up to the topic end offsets");
+    }
+
+    @Test
+    public void testResetSinkConnectorOffsetsZombieSinkTasks() throws Exception {
+        connect.kafka().createTopic(TOPIC, 1);
+
+        // Produce 10 messages
+        for (int message = 0; message < 10; message++) {
+            connect.kafka().produce(TOPIC, 0, "key", "value");
+        }
+
+        // Configure a sink connector whose sink task blocks in its stop method
+        Map<String, String> connectorConfigs = new HashMap<>();
+        connectorConfigs.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSinkConnector.class.getName());
+        connectorConfigs.put(TOPICS_CONFIG, TOPIC);
+        connectorConfigs.put("block", "Task::stop");
+
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+                "Connector tasks did not start in time.");
+
+        waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, 1, 10,
+                "Sink connector consumer group offsets should catch up to the topic end offsets");
+
+        connect.stopConnector(CONNECTOR_NAME);
+
+        // Try to reset the offsets
+        ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.resetConnectorOffsets(CONNECTOR_NAME));
+        assertThat(e.getMessage(), containsString("zombie sink task"));
+    }
+
+    @Test
+    public void testResetSourceConnectorOffsets() throws Exception {
+        resetAndVerifySourceConnectorOffsets(connect, baseSourceConnectorConfigs());
+    }
+
+    // Note that the following test also implicitly tests the custom offsets topic case since source connectors always use
+    // a separate offsets topic when exactly once support is enabled and the Kafka cluster targeted by the source connector
+    // is different from the Connect cluster's backing Kafka cluster.
+    @Test
+    public void testResetSourceConnectorOffsetsExactlyOnceSupportEnabledAndDifferentKafkaClusterTargeted() throws Exception {
+        Properties brokerProps = new Properties();
+        brokerProps.put("transaction.state.log.replication.factor", "1");
+        brokerProps.put("transaction.state.log.min.isr", "1");
+        EmbeddedKafkaCluster connectorTargetedKafkaCluster = new EmbeddedKafkaCluster(1, new Properties());
+        workerProps.put(DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
+
+        // This embedded Connect cluster will internally spin up its own embedded Kafka cluster
+        EmbeddedConnectCluster exactlyOnceSupportEnabledConnectCluster = new EmbeddedConnectCluster.Builder()

Review Comment:
   Do we need to close this when we're done with it?
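One cleanup idiom already used elsewhere in this test class is adapting a `start()`/`stop()` style lifecycle to try-with-resources via a method reference, as in `try (AutoCloseable ignored = kafkaCluster::stop)`. A minimal sketch of that idiom, with a hypothetical `Cluster` class standing in for `EmbeddedConnectCluster`:

```java
public class CloseViaMethodRefSketch {

    // Hypothetical stand-in for EmbeddedConnectCluster, which exposes
    // start()/stop() rather than implementing AutoCloseable itself.
    static class Cluster {
        final StringBuilder log;
        Cluster(StringBuilder log) { this.log = log; }
        void start() { log.append("started;"); }
        void stop() { log.append("stopped;"); }
    }

    static String run() throws Exception {
        StringBuilder log = new StringBuilder();
        Cluster cluster = new Cluster(log);
        // cluster::stop matches AutoCloseable.close(), so stop() is guaranteed
        // to run even if the test body throws.
        try (AutoCloseable ignored = cluster::stop) {
            cluster.start();
        }
        return log.toString();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // started;stopped;
    }
}
```

This keeps cleanup deterministic without requiring the cluster class to implement `AutoCloseable` directly.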



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1268,39 +1270,55 @@ public void alterConnectorOffsets(String connName, Map<String, String> connector
             connector = plugins.newConnector(connectorClassOrAlias);
             if (ConnectUtils.isSinkConnector(connector)) {
                 log.debug("Altering consumer group offsets for sink connector: {}", connName);
-                alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+                modifySinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
             } else {
                 log.debug("Altering offsets for source connector: {}", connName);
-                alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+                modifySourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            }
+        }
+    }
+
+    /**
+     * Reset a connector's offsets.
+     *
+     * @param connName the name of the connector whose offsets are to be reset
+     * @param connectorConfig the connector's configurations
+     * @param cb callback to invoke upon completion
+     */
+    public void resetConnectorOffsets(String connName, Map<String, String> connectorConfig, Callback<Message> cb) {

Review Comment:
   Do we need separate methods for resetting and altering offsets, or can we borrow the `modifyConnectorOffsets` strategy from the `AbstractHerder` and accept a possibly-null `offsets` parameter to distinguish between the two operations?
   
   If we're worried about accidentally introducing a nasty bug where an empty-bodied alter request causes an unintentional reset, we can add an integration test for that case.
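The single-entry-point approach being suggested can be sketched as follows. All names here are hypothetical stand-ins for the actual `Worker`/`AbstractHerder` API: a `null` offsets map signals a reset, while any non-null map (including an empty one) signals an alter request, which is exactly the invariant that guards against the empty-bodied-alter bug:

```java
import java.util.Map;

public class OffsetRequestDispatchSketch {

    // Hypothetical stand-in for modifyConnectorOffsets: null means "reset",
    // any non-null map (even an empty one) means "alter".
    static String modifyConnectorOffsets(String connName, Map<String, Long> offsets) {
        if (offsets == null) {
            return "reset " + connName;
        }
        // An empty-bodied alter request still takes the alter path and can
        // never silently turn into a reset.
        return "alter " + connName + " (" + offsets.size() + " partitions)";
    }

    public static void main(String[] args) {
        System.out.println(modifyConnectorOffsets("my-sink", null));              // reset path
        System.out.println(modifyConnectorOffsets("my-sink", Map.of("t-0", 5L))); // alter path
        System.out.println(modifyConnectorOffsets("my-sink", Map.of()));          // empty alter, NOT a reset
    }
}
```

The null/non-null distinction is made once at the dispatch point, so the rest of the code path is shared and an integration test only needs to pin down the empty-map case.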



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map<String,
                 Admin admin = adminFactory.apply(adminConfig);
 
                 try {
-                    List<KafkaFuture<Void>> adminFutures = new ArrayList<>();
-
-                    Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
-                            .stream()
-                            .filter(entry -> entry.getValue() != null)
-                            .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
-
-                    if (!offsetsToAlter.isEmpty()) {
-                        log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.",
-                                connName, offsetsToAlter);
-                        AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs(
+                    Map<TopicPartition, Long> offsetsToWrite;
+                    if (isReset) {
+                        offsetsToWrite = new HashMap<>();
+                        ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs(
                                 (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                        AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter,
-                                alterConsumerGroupOffsetsOptions);
-
-                        adminFutures.add(alterConsumerGroupOffsetsResult.all());
+                        try {
+                            admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions)
+                                    .partitionsToOffsetAndMetadata()
+                                    .get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                                    .forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null));
+
+                            log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}",
+                                    connName, groupId, offsetsToWrite.keySet());
+                        } catch (Exception e) {
+                            Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName);
+                            log.error("Failed to list offsets prior to resetting sink connector offsets", e);
+                            cb.onCompletion(new ConnectException("Failed to list offsets prior to resetting sink connector offsets", e), null);
+                            return;
+                        }
+                    } else {
+                        offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets);
                     }
 
-                    Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet()
-                            .stream()
-                            .filter(entry -> entry.getValue() == null)
-                            .map(Map.Entry::getKey)
-                            .collect(Collectors.toSet());
-
-                    if (!partitionsToReset.isEmpty()) {
-                        log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.",
-                                connName, partitionsToReset);
-                        DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs(
-                                (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                        DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset,
-                                deleteConsumerGroupOffsetsOptions);
+                    boolean alterOffsetsResult;
+                    try {
+                        alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, offsetsToWrite);
+                    } catch (UnsupportedOperationException e) {
+                        throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " +
+                                "modification of offsets", e);
+                    }
 
-                        adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+                    // This should only occur for an offset reset request when:
+                    // 1. There was a prior attempt to reset offsets
+                    // OR
+                    // 2. No offsets have been committed yet
+                    if (offsetsToWrite.isEmpty()) {
+                        completeAlterOffsetsCallback(alterOffsetsResult, isReset, cb);
+                        return;
                     }
 
-                    @SuppressWarnings("rawtypes")
-                    KafkaFuture<Void> compositeAdminFuture = KafkaFuture.allOf(adminFutures.toArray(new KafkaFuture[0]));
-
-                    compositeAdminFuture.whenComplete((ignored, error) -> {
-                        if (error != null) {
-                            // When a consumer group is non-empty, only group members can commit offsets. An attempt to alter offsets via the admin client
-                            // will result in an UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped
-                            // completely or if the connector is resumed while the alter offsets request is being processed). Similarly, an attempt to
-                            // delete consumer group offsets for a non-empty consumer group will result in a GroupSubscribedToTopicException
-                            if (error instanceof UnknownMemberIdException || error instanceof GroupSubscribedToTopicException) {
-                                cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " +
-                                                "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " +
-                                                "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " +
-                                                "Connect cluster may need to be restarted to get rid of the zombie sink tasks."),
-                                        null);
-                            } else {
-                                cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName, error), null);
-                            }
-                        } else {
-                            completeAlterOffsetsCallback(alterOffsetsResult, cb);
-                        }
-                    }).whenComplete((ignored, ignoredError) -> {
-                        // errors originating from the original future are handled in the prior whenComplete invocation which isn't expected to throw
-                        // an exception itself, and we can thus ignore the error here
-                        Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
-                    });
+                    if (isReset) {
+                        resetSinkConnectorOffsets(connName, groupId, admin, cb, alterOffsetsResult);
+                    } else {
+                        alterSinkConnectorOffsets(connName, groupId, admin, offsetsToWrite, cb, alterOffsetsResult);
+                    }
                 } catch (Throwable t) {
-                    Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+                    Utils.closeQuietly(admin, "Offset modification admin for sink connector " + connName);
                     throw t;
                 }
             } catch (Throwable t) {
-                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter offsets for sink connector " + connName), null);
+                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to modify offsets for sink connector " + connName), null);
             }
         }));
     }
 
     /**
-     * Alter a source connector's offsets.
+     * Alter a sink connector's consumer group offsets. This is done via calls to {@link Admin#alterConsumerGroupOffsets}
+     * and / or {@link Admin#deleteConsumerGroupOffsets}.
      *
-     * @param connName the name of the source connector whose offsets are to be altered
+     * @param connName the name of the sink connector whose offsets are to be altered
+     * @param groupId the sink connector's consumer group ID
+     * @param admin the {@link Admin admin client} to be used for altering the consumer group offsets; should be closed after use
+     * @param offsetsToWrite a mapping from topic partitions to offsets that need to be written; may not be null or empty
+     * @param cb callback to invoke upon completion
+     * @param alterOffsetsResult the result of the call to {@link SinkConnector#alterOffsets} for the connector
+     */
+    private void alterSinkConnectorOffsets(String connName, String groupId, Admin admin, Map<TopicPartition, Long> offsetsToWrite,
+                                           Callback<Message> cb, boolean alterOffsetsResult) {
+        List<KafkaFuture<Void>> adminFutures = new ArrayList<>();
+
+        Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = offsetsToWrite.entrySet()
+                .stream()
+                .filter(entry -> entry.getValue() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
+
+        if (!offsetsToAlter.isEmpty()) {
+            log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.",
+                    connName, offsetsToAlter);
+            AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs(
+                    (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+            AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter,
+                    alterConsumerGroupOffsetsOptions);
+
+            adminFutures.add(alterConsumerGroupOffsetsResult.all());
+        }
+
+        Set<TopicPartition> partitionsToReset = offsetsToWrite.entrySet()
+                .stream()
+                .filter(entry -> entry.getValue() == null)
+                .map(Map.Entry::getKey)
+                .collect(Collectors.toSet());
+
+        if (!partitionsToReset.isEmpty()) {
+            log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.",
+                    connName, partitionsToReset);
+            DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs(
+                    (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+            DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset,
+                    deleteConsumerGroupOffsetsOptions);
+
+            adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+        }
+
+        @SuppressWarnings("rawtypes")
+        KafkaFuture<Void> compositeAdminFuture = KafkaFuture.allOf(adminFutures.toArray(new KafkaFuture[0]));
+
+        compositeAdminFuture.whenComplete((ignored, error) -> {
+            if (error != null) {
+                // When a consumer group is non-empty, only group members can commit offsets. An attempt to alter offsets via the admin client
+                // will result in an UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped
+                // completely or if the connector is resumed while the alter offsets request is being processed). Similarly, an attempt to
+                // delete consumer group offsets for a non-empty consumer group will result in a GroupSubscribedToTopicException
+                if (error instanceof UnknownMemberIdException || error instanceof GroupSubscribedToTopicException) {
+                    cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " +
+                                    "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " +
+                                    "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " +
+                                    "Connect cluster may need to be restarted to get rid of the zombie sink tasks."),
+                            null);
+                } else {
+                    cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName, error), null);
+                }
+            } else {
+                completeAlterOffsetsCallback(alterOffsetsResult, false, cb);
+            }
+        }).whenComplete((ignored, ignoredError) -> {
+            // errors originating from the original future are handled in the prior whenComplete invocation which isn't expected to throw
+            // an exception itself, and we can thus ignore the error here
+            Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName);
+        });
+    }
+
+    /**
+     * Reset a sink connector's consumer group offsets. This is done by deleting the consumer group via a call to
+     * {@link Admin#deleteConsumerGroups}
+     *
+     * @param connName the name of the sink connector whose offsets are to be reset
+     * @param groupId the sink connector's consumer group ID
+     * @param admin the {@link Admin admin client} to be used for resetting the consumer group offsets; should be closed after use
Review Comment:
   ```suggestion
        * @param admin the {@link Admin admin client} to be used for resetting the consumer group offsets; will be closed after use
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java:
##########
@@ -574,6 +574,199 @@ public void alterAndVerifySourceConnectorOffsets(EmbeddedConnectCluster connect,
                 "Source connector offsets should reflect the expected number of records produced");
     }
 
+    @Test
+    public void testResetSinkConnectorOffsets() throws Exception {
+        resetAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), connect.kafka());
+    }
+
+    @Test
+    public void testResetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception {
+        Map<String, String> connectorConfigs = baseSinkConnectorConfigs();
+        connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG,
+                "overridden-group-id");
+        resetAndVerifySinkConnectorOffsets(connectorConfigs, connect.kafka());
+        // Ensure that the overridden consumer group ID was the one actually used
+        try (Admin admin = connect.kafka().createAdminClient()) {
+            Collection<ConsumerGroupListing> consumerGroups = admin.listConsumerGroups().all().get();
+            assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> "overridden-group-id".equals(consumerGroupListing.groupId())));
+            assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(CONNECTOR_NAME).equals(consumerGroupListing.groupId())));
+        }
+    }
+
+    @Test
+    public void testResetSinkConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
+        EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties());
+
+        try (AutoCloseable ignored = kafkaCluster::stop) {
+            kafkaCluster.start();
+
+            Map<String, String> connectorConfigs = baseSinkConnectorConfigs();
+            connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+                    kafkaCluster.bootstrapServers());
+            connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+                    kafkaCluster.bootstrapServers());
+
+            resetAndVerifySinkConnectorOffsets(connectorConfigs, kafkaCluster);
+        }
+    }
+
+    private void resetAndVerifySinkConnectorOffsets(Map<String, String> connectorConfigs, EmbeddedKafkaCluster kafkaCluster) throws Exception {
+        int numPartitions = 3;
+        int numMessages = 10;
+        kafkaCluster.createTopic(TOPIC, numPartitions);
+
+        // Produce numMessages messages to each partition
+        for (int partition = 0; partition < numPartitions; partition++) {
+            for (int message = 0; message < numMessages; message++) {
+                kafkaCluster.produce(TOPIC, partition, "key", "value");
+            }
+        }
+        // Create sink connector
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+                "Connector tasks did not start in time.");
+
+        waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, "test-topic", numPartitions, numMessages,
+                "Sink connector consumer group offsets should catch up to the topic end offsets");
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(
+                CONNECTOR_NAME,
+                "Connector did not stop in time"
+        );
+
+        // Reset the sink connector's offsets
+        String response = connect.resetConnectorOffsets(CONNECTOR_NAME);
+        assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. " +
+                "However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses."));
+
+        waitForEmptySinkConnectorOffsets(CONNECTOR_NAME);
+
+        // Reset the sink connector's offsets again while it is still in a STOPPED state and ensure that there is no error
+        response = connect.resetConnectorOffsets(CONNECTOR_NAME);
+        assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. " +
+                "However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses."));
+
+        waitForEmptySinkConnectorOffsets(CONNECTOR_NAME);
+
+        // Resume the connector and expect its offsets to catch up to the latest offsets
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                NUM_TASKS,
+                "Connector tasks did not resume in time"
+        );
+        waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, "test-topic", numPartitions, 10,
+                "Sink connector consumer group offsets should catch up to the topic end offsets");
+    }
+
+    @Test
+    public void testResetSinkConnectorOffsetsZombieSinkTasks() throws Exception {
+        connect.kafka().createTopic(TOPIC, 1);
+
+        // Produce 10 messages
+        for (int message = 0; message < 10; message++) {
+            connect.kafka().produce(TOPIC, 0, "key", "value");
+        }
+
+        // Configure a sink connector whose sink task blocks in its stop method
+        Map<String, String> connectorConfigs = new HashMap<>();
+        connectorConfigs.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSinkConnector.class.getName());
+        connectorConfigs.put(TOPICS_CONFIG, TOPIC);
+        connectorConfigs.put("block", "Task::stop");
+
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+                "Connector tasks did not start in time.");
+
+        waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, 1, 10,
+                "Sink connector consumer group offsets should catch up to the topic end offsets");
+
+        connect.stopConnector(CONNECTOR_NAME);
+
+        // Try to reset the offsets
+        ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.resetConnectorOffsets(CONNECTOR_NAME));
+        assertThat(e.getMessage(), containsString("zombie sink task"));
+    }
+
+    @Test
+    public void testResetSourceConnectorOffsets() throws Exception {
+        resetAndVerifySourceConnectorOffsets(connect, baseSourceConnectorConfigs());
+    }
+
+    // Note that the following test also implicitly tests the custom offsets topic case since source connectors always use
+    // a separate offsets topic when exactly once support is enabled and the Kafka cluster targeted by the source connector
+    // is different from the Connect cluster's backing Kafka cluster.
+    @Test
+    public void testResetSourceConnectorOffsetsExactlyOnceSupportEnabledAndDifferentKafkaClusterTargeted() throws Exception {
+        Properties brokerProps = new Properties();
+        brokerProps.put("transaction.state.log.replication.factor", "1");
+        brokerProps.put("transaction.state.log.min.isr", "1");
+        EmbeddedKafkaCluster connectorTargetedKafkaCluster = new EmbeddedKafkaCluster(1, new Properties());
+        workerProps.put(DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
+
+        // This embedded Connect cluster will internally spin up its own embedded Kafka cluster
+        EmbeddedConnectCluster exactlyOnceSupportEnabledConnectCluster = new EmbeddedConnectCluster.Builder()
+                .name("connect-cluster")
+                .brokerProps(brokerProps)
+                .numWorkers(NUM_WORKERS)
+                .workerProps(workerProps)
+                .build();
+

Review Comment:
   Nit: extra newline
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
