yashmayya commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1200530069


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, 
Callback<ConnectorOffsets> cb) {
         );
     }
 
+    @Override
+    public void alterConnectorOffsets(String connName, Map<Map<String, ?>, 
Map<String, ?>> offsets, Callback<Message> callback) {
+        log.trace("Submitting alter offsets request for connector '{}'", 
connName);
+
+        addRequest(() -> {
+            refreshConfigSnapshot(workerSyncTimeoutMs);
+            if (!alterConnectorOffsetsChecks(connName, callback)) {
+                return null;
+            }
+            // At this point, we should be the leader (the call to 
alterConnectorOffsetsChecks makes sure of that) and can safely run
+            // a zombie fencing request
+            if (isSourceConnector(connName) && 
config.exactlyOnceSourceEnabled()) {
+                log.debug("Performing a round of zombie fencing before 
altering offsets for source connector {} with exactly-once semantics enabled.", 
connName);
+                getFenceZombieSourceTasksCallable(connName, (error, ignored) 
-> {
+                    if (error != null) {
+                        log.error("Failed to perform zombie fencing for 
exactly-once source connector prior to altering offsets", error);
+                        callback.onCompletion(new ConnectException("Failed to 
perform zombie fencing for exactly-once source connector prior to altering 
offsets",
+                                error), null);
+                    } else {
+                        log.debug("Successfully completed zombie fencing for 
source connector {}; proceeding to alter offsets.", connName);
+                        // We need to ensure that we perform the necessary 
checks again inside alterConnectorOffsetsHerderRequest
+                        // since it is being run in a separate herder request 
and the conditions could have changed since the
+                        // previous check
+                        addRequest(getAlterConnectorOffsetsCallable(connName, 
offsets, callback), forwardErrorCallback(callback));
+                    }
+                }).call();
+            } else {
+                getAlterConnectorOffsetsCallable(connName, offsets, 
callback).call();

Review Comment:
   I think one of the main motivations here (and above) was naming 😅 
   
   We already have `fenceZombieSourceTasks` and `alterConnectorOffsets` 
interface methods but there was a need to break out some of the logic from both 
into separate methods for re-use. I'm happy to take any naming suggestions to 
make them synchronous and wrap them into callables at the relevant call sites.
   
   Maybe something like `fenceZombieSourceTasksSync` / 
`alterConnectorOffsetsSync` or `fenceZombieSourceTasksInternal` / 
`alterConnectorOffsetsInternal`? I don't particularly like either though, 
naming is hard...



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, 
Callback<ConnectorOffsets> cb) {
         );
     }
 
+    @Override
+    public void alterConnectorOffsets(String connName, Map<Map<String, ?>, 
Map<String, ?>> offsets, Callback<Message> callback) {
+        log.trace("Submitting alter offsets request for connector '{}'", 
connName);
+
+        addRequest(() -> {
+            refreshConfigSnapshot(workerSyncTimeoutMs);

Review Comment:
   Thanks, both points make sense, done. Btw shouldn't we also handle the read 
timeout case in the 
[stopConnector](https://github.com/apache/kafka/blob/e96a463561ca8974fca37562b8675ae8ae4aff29/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1107)
 method? The `KafkaConfigBackingStore::putTaskConfigs` method does include a 
read to the end of the config topic at the end but the subsequently called 
`KafkaConfigBackingStore::putTargetState` method doesn't.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, 
ConnectorOffsetBackingStore offsetS
         });
     }
 
+    /**
+     * Alter a connector's offsets.
+     *
+     * @param connName the name of the connector whose offsets are to be 
altered
+     * @param connectorConfig the connector's configurations
+     * @param offsets  a mapping from partitions to offsets that need to be 
overwritten
+     * @param cb callback to invoke upon completion
+     */
+    public void alterConnectorOffsets(String connName, Map<String, String> 
connectorConfig,
+                                      Map<Map<String, ?>, Map<String, ?>> 
offsets, Callback<Message> cb) {
+        String connectorClassOrAlias = 
connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = 
plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) 
{
+            connector = plugins.newConnector(connectorClassOrAlias);
+            if (ConnectUtils.isSinkConnector(connector)) {
+                log.debug("Altering consumer group offsets for sink connector: 
{}", connName);
+                alterSinkConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+            } else {
+                log.debug("Altering offsets for source connector: {}", 
connName);
+                alterSourceConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+            }
+        }
+    }
+
+    /**
+     * Alter a sink connector's consumer group offsets.
+     * <p>
+     * Visible for testing.
+     *
+     * @param connName the name of the sink connector whose offsets are to be 
altered
+     * @param connector an instance of the sink connector
+     * @param connectorConfig the sink connector's configuration
+     * @param offsets a mapping from topic partitions to offsets that need to 
be overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as 
the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    void alterSinkConnectorOffsets(String connName, Connector connector, 
Map<String, String> connectorConfig,
+                                   Map<Map<String, ?>, Map<String, ?>> 
offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                Map<TopicPartition, Long> parsedOffsets = 
SinkUtils.validateAndParseSinkConnectorOffsets(offsets);
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SinkConnector) 
connector).alterOffsets(connectorConfig, parsedOffsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for 
connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                Class<? extends Connector> sinkConnectorClass = 
connector.getClass();
+                Map<String, Object> adminConfig = adminConfigs(
+                        connName,
+                        "connector-worker-adminclient-" + connName,
+                        config,
+                        new SinkConnectorConfig(plugins, connectorConfig),
+                        sinkConnectorClass,
+                        connectorClientConfigOverridePolicy,
+                        kafkaClusterId,
+                        ConnectorType.SINK);
+
+                SinkConnectorConfig sinkConnectorConfig = new 
SinkConnectorConfig(plugins, connectorConfig);
+                String groupId = (String) baseConsumerConfigs(
+                        connName, "connector-consumer-", config, 
sinkConnectorConfig,
+                        sinkConnectorClass, 
connectorClientConfigOverridePolicy, kafkaClusterId, 
ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+                Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = 
parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() != null)
+                        .map(entry -> new 
AbstractMap.SimpleEntry<>(entry.getKey(), new 
OffsetAndMetadata(entry.getValue())))
+                        .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));

Review Comment:
   Wow, thanks 🤦 



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, 
ConnectorOffsetBackingStore offsetS
         });
     }
 
+    /**
+     * Alter a connector's offsets.
+     *
+     * @param connName the name of the connector whose offsets are to be 
altered
+     * @param connectorConfig the connector's configurations
+     * @param offsets  a mapping from partitions to offsets that need to be 
overwritten
+     * @param cb callback to invoke upon completion
+     */
+    public void alterConnectorOffsets(String connName, Map<String, String> 
connectorConfig,
+                                      Map<Map<String, ?>, Map<String, ?>> 
offsets, Callback<Message> cb) {
+        String connectorClassOrAlias = 
connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = 
plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) 
{
+            connector = plugins.newConnector(connectorClassOrAlias);
+            if (ConnectUtils.isSinkConnector(connector)) {
+                log.debug("Altering consumer group offsets for sink connector: 
{}", connName);
+                alterSinkConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+            } else {
+                log.debug("Altering offsets for source connector: {}", 
connName);
+                alterSourceConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+            }
+        }
+    }
+
+    /**
+     * Alter a sink connector's consumer group offsets.
+     * <p>
+     * Visible for testing.
+     *
+     * @param connName the name of the sink connector whose offsets are to be 
altered
+     * @param connector an instance of the sink connector
+     * @param connectorConfig the sink connector's configuration
+     * @param offsets a mapping from topic partitions to offsets that need to 
be overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as 
the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    void alterSinkConnectorOffsets(String connName, Connector connector, 
Map<String, String> connectorConfig,
+                                   Map<Map<String, ?>, Map<String, ?>> 
offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                Map<TopicPartition, Long> parsedOffsets = 
SinkUtils.validateAndParseSinkConnectorOffsets(offsets);
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SinkConnector) 
connector).alterOffsets(connectorConfig, parsedOffsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for 
connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                Class<? extends Connector> sinkConnectorClass = 
connector.getClass();
+                Map<String, Object> adminConfig = adminConfigs(
+                        connName,
+                        "connector-worker-adminclient-" + connName,
+                        config,
+                        new SinkConnectorConfig(plugins, connectorConfig),
+                        sinkConnectorClass,
+                        connectorClientConfigOverridePolicy,
+                        kafkaClusterId,
+                        ConnectorType.SINK);
+
+                SinkConnectorConfig sinkConnectorConfig = new 
SinkConnectorConfig(plugins, connectorConfig);
+                String groupId = (String) baseConsumerConfigs(
+                        connName, "connector-consumer-", config, 
sinkConnectorConfig,
+                        sinkConnectorClass, 
connectorClientConfigOverridePolicy, kafkaClusterId, 
ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+                Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = 
parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() != null)
+                        .map(entry -> new 
AbstractMap.SimpleEntry<>(entry.getKey(), new 
OffsetAndMetadata(entry.getValue())))
+                        .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+                Set<TopicPartition> partitionsToReset = 
parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() == null)
+                        .map(Map.Entry::getKey)
+                        .collect(Collectors.toSet());
+
+                KafkaFuture<Void> adminFuture = 
KafkaFuture.completedFuture(null);
+
+                Admin admin = adminFactory.apply(adminConfig);
+
+                try {
+                    if (!offsetsToAlter.isEmpty()) {
+                        log.debug("Committing the following consumer group 
topic partition offsets using an admin client for sink connector {}: {}.",
+                                connName, offsetsToAlter);
+                        AlterConsumerGroupOffsetsOptions 
alterConsumerGroupOffsetsOptions = new 
AlterConsumerGroupOffsetsOptions().timeoutMs(
+                                (int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+                        AlterConsumerGroupOffsetsResult 
alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, 
offsetsToAlter,
+                                alterConsumerGroupOffsetsOptions);
+
+                        adminFuture = alterConsumerGroupOffsetsResult.all();
+                    }
+
+                    adminFuture.whenComplete((ignored, error) -> {
+                        if (error != null) {
+                            Utils.closeQuietly(admin, "Offset alter admin for 
sink connector " + connName);
+                            // When a consumer group is non-empty, only group 
members can commit offsets. The above attempt to alter offsets via the admin
+                            // client will result in an 
UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink 
tasks haven't stopped
+                            // completely or if the connector is resumed while 
the alter offsets request is being processed)
+                            if (error instanceof UnknownMemberIdException) {
+                                cb.onCompletion(new ConnectException("Failed 
to alter consumer group offsets for connector " + connName + " either because 
its tasks " +
+                                                "haven't stopped completely 
yet or the connector was resumed before the request to alter its offsets could 
be successfully " +
+                                                "completed. If the connector 
is in a stopped state, this operation can be safely retried. If it doesn't 
eventually succeed, the " +
+                                                "Connect cluster may need to 
be restarted to get rid of the zombie sink tasks."),
+                                        null);
+                            } else {
+                                cb.onCompletion(new ConnectException("Failed 
to alter consumer group offsets for topic partitions " + 
offsetsToAlter.keySet() + " for " +
+                                                "connector " + connName, 
error),
+                                        null);
+                            }
+                        } else if (!partitionsToReset.isEmpty()) {
+                            log.debug("Deleting the consumer group offsets for 
the following topic partitions using an admin client for sink connector {}: 
{}.",
+                                    connName, partitionsToReset);
+                            DeleteConsumerGroupOffsetsOptions 
deleteConsumerGroupOffsetsOptions = new 
DeleteConsumerGroupOffsetsOptions().timeoutMs(
+                                    (int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+                            DeleteConsumerGroupOffsetsResult 
deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, 
partitionsToReset,
+                                    deleteConsumerGroupOffsetsOptions);
+                            
deleteConsumerGroupOffsetsResult.all().whenComplete((ignored2, error2) -> {
+                                Utils.closeQuietly(admin, "Offset alter admin 
for sink connector " + connName);
+                                if (error2 != null) {
+                                    // The attempt to delete offsets for 
certain topic partitions via the admin client will result in a
+                                    // GroupSubscribedToTopicException if the 
consumer group is non-empty (i.e. if the sink tasks haven't stopped completely
+                                    // or if the connector is resumed while 
the alter offsets request is being processed).
+                                    if (error2 instanceof 
GroupSubscribedToTopicException) {
+                                        cb.onCompletion(new 
ConnectException("Failed to alter consumer group offsets for connector " + 
connName + " either because its tasks " +
+                                                        "haven't stopped 
completely yet or the connector was resumed before the request to alter its 
offsets could be successfully " +
+                                                        "completed. If the 
connector is in a stopped state, this operation can be safely retried. If it 
doesn't eventually succeed, the " +
+                                                        "Connect cluster may 
need to be restarted to get rid of the zombie sink tasks."),
+                                                null);
+                                    } else {
+                                        cb.onCompletion(new 
ConnectException("Failed to delete consumer group offsets for topic partitions 
" + partitionsToReset + " for connector "
+                                                        + connName, error2),
+                                                null);
+                                    }
+                                } else {
+                                    
completeAlterOffsetsCallback(alterOffsetsResult, cb);
+                                }
+                            });
+                        } else {
+                            Utils.closeQuietly(admin, "Offset alter admin for 
sink connector " + connName);
+                            completeAlterOffsetsCallback(alterOffsetsResult, 
cb);
+                        }
+                    });
+                } catch (Throwable t) {
+                    Utils.closeQuietly(admin, "Offset alter admin for sink 
connector " + connName);
+                    throw t;
+                }
+            } catch (Throwable t) {
+                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter 
offsets for sink connector " + connName), null);
+            }
+        }));
+    }
+
+    /**
+     * Alter a source connector's offsets.
+     *
+     * @param connName the name of the source connector whose offsets are to 
be altered
+     * @param connector an instance of the source connector
+     * @param connectorConfig the source connector's configuration
+     * @param offsets a mapping from partitions to offsets that need to be 
overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as 
the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    private void alterSourceConnectorOffsets(String connName, Connector 
connector, Map<String, String> connectorConfig,
+                                             Map<Map<String, ?>, Map<String, 
?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        SourceConnectorConfig sourceConfig = new 
SourceConnectorConfig(plugins, connectorConfig, config.topicCreationEnable());
+        Map<String, Object> producerProps = config.exactlyOnceSourceEnabled()
+                ? exactlyOnceSourceTaskProducerConfigs(new 
ConnectorTaskId(connName, 0), config, sourceConfig,
+                connector.getClass(), connectorClientConfigOverridePolicy, 
kafkaClusterId)
+                : baseProducerConfigs(connName, "connector-offset-producer-" + 
connName, config, sourceConfig,
+                connector.getClass(), connectorClientConfigOverridePolicy, 
kafkaClusterId);
+        KafkaProducer<byte[], byte[]> producer = new 
KafkaProducer<>(producerProps);
+
+        ConnectorOffsetBackingStore offsetStore = 
config.exactlyOnceSourceEnabled()
+                ? offsetStoreForExactlyOnceSourceConnector(sourceConfig, 
connName, connector, producer)
+                : offsetStoreForRegularSourceConnector(sourceConfig, connName, 
connector, producer);
+
+        OffsetStorageWriter offsetWriter = new 
OffsetStorageWriter(offsetStore, connName, internalKeyConverter, 
internalValueConverter);
+        alterSourceConnectorOffsets(connName, connector, connectorConfig, 
offsets, offsetStore, producer, offsetWriter, connectorLoader, cb);
+    }
+
+    // Visible for testing
+    void alterSourceConnectorOffsets(String connName, Connector connector, 
Map<String, String> connectorConfig,
+                                     Map<Map<String, ?>, Map<String, ?>> 
offsets, ConnectorOffsetBackingStore offsetStore,
+                                     KafkaProducer<byte[], byte[]> producer, 
OffsetStorageWriter offsetWriter,
+                                     ClassLoader connectorLoader, 
Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SourceConnector) 
connector).alterOffsets(connectorConfig, offsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for 
connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                offsetStore.configure(config);
+                // This reads to the end of the offsets topic and can be a 
potentially time-consuming operation
+                offsetStore.start();
+
+                // The alterSourceConnectorOffsets method should only be 
called after all the connector's tasks have been stopped, and it's
+                // safe to write offsets via an offset writer
+                offsets.forEach(offsetWriter::offset);
+
+                // We can call begin flush without a timeout because this 
newly created single-purpose offset writer can't do concurrent
+                // offset writes. We can also ignore the return value since it 
returns false if and only if there is no data to be flushed,
+                // and we've just put some data in the previous statement
+                offsetWriter.beginFlush();
+
+                try {
+                    if (config.exactlyOnceSourceEnabled()) {
+                        producer.initTransactions();
+                        producer.beginTransaction();
+                    }
+                    log.debug("Committing the following partition offsets for 
source connector {}: {}", connName, offsets);
+                    FutureCallback<Void> offsetWriterCallback = new 
FutureCallback<>();
+                    offsetWriter.doFlush(offsetWriterCallback);
+                    if (config.exactlyOnceSourceEnabled()) {
+                        producer.commitTransaction();
+                    }
+                    
offsetWriterCallback.get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
+                } catch (ExecutionException e) {
+                    throw new ConnectException("Failed to alter offsets for 
source connector " + connName, e.getCause());

Review Comment:
   We could, but I'm wondering whether we should (for sink connectors as well) 
- these exceptions will surface synchronously as REST API responses and I don't 
think the partitions add much value since the API requests themselves will 
contain the partitions. I've updated the sink connector error messages.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java:
##########
@@ -52,4 +53,82 @@ public static ConnectorOffsets 
consumerGroupOffsetsToConnectorOffsets(Map<TopicP
 
         return new ConnectorOffsets(connectorOffsets);
     }
+
+    /**
+     * Validate that the provided partitions (keys in the {@code 
partitionOffsets} map) look like:
+     * <pre>
+     *     {
+     *       "kafka_topic": "topic"
+     *       "kafka_partition": 3
+     *     }
+     * </pre>
+     *
+     * and that the provided offsets (values in the {@code partitionOffsets} 
map) look like:
+     * <pre>
+     *     {
+     *       "kafka_offset": 1000
+     *     }
+     * </pre>
+     *
+     * This method then parses them into a mapping from {@link 
TopicPartition}s to their corresponding {@link Long}
+     * valued offsets.
+     *
+     * @param partitionOffsets the partitions to offset map that needs to be 
validated and parsed.
+     * @return the parsed mapping from {@link TopicPartition} to its 
corresponding {@link Long} valued offset.
+     *
+     * @throws BadRequestException if the provided offsets aren't in the 
expected format
+     */
+    public static Map<TopicPartition, Long> 
validateAndParseSinkConnectorOffsets(Map<Map<String, ?>, Map<String, ?>> 
partitionOffsets) {

Review Comment:
   Thanks, that makes sense, I've renamed the method. I've kept the expected 
formats for the partitions and offsets in the Javadoc though, as I feel like 
they'll be handy to reference at the parsing points.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, 
ConnectorOffsetBackingStore offsetS
         });
     }
 
+    /**
+     * Alter a connector's offsets.
+     *
+     * @param connName the name of the connector whose offsets are to be 
altered
+     * @param connectorConfig the connector's configurations
+     * @param offsets  a mapping from partitions to offsets that need to be 
overwritten
+     * @param cb callback to invoke upon completion
+     */
+    public void alterConnectorOffsets(String connName, Map<String, String> 
connectorConfig,
+                                      Map<Map<String, ?>, Map<String, ?>> 
offsets, Callback<Message> cb) {
+        String connectorClassOrAlias = 
connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = 
plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) 
{
+            connector = plugins.newConnector(connectorClassOrAlias);
+            if (ConnectUtils.isSinkConnector(connector)) {
+                log.debug("Altering consumer group offsets for sink connector: 
{}", connName);
+                alterSinkConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+            } else {
+                log.debug("Altering offsets for source connector: {}", 
connName);
+                alterSourceConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+            }
+        }
+    }
+
+    /**
+     * Alter a sink connector's consumer group offsets.
+     * <p>
+     * Visible for testing.
+     *
+     * @param connName the name of the sink connector whose offsets are to be 
altered
+     * @param connector an instance of the sink connector
+     * @param connectorConfig the sink connector's configuration
+     * @param offsets a mapping from topic partitions to offsets that need to 
be overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as 
the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    void alterSinkConnectorOffsets(String connName, Connector connector, 
Map<String, String> connectorConfig,
+                                   Map<Map<String, ?>, Map<String, ?>> 
offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                Map<TopicPartition, Long> parsedOffsets = 
SinkUtils.validateAndParseSinkConnectorOffsets(offsets);
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SinkConnector) 
connector).alterOffsets(connectorConfig, parsedOffsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for 
connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                Class<? extends Connector> sinkConnectorClass = 
connector.getClass();
+                Map<String, Object> adminConfig = adminConfigs(
+                        connName,
+                        "connector-worker-adminclient-" + connName,
+                        config,
+                        new SinkConnectorConfig(plugins, connectorConfig),
+                        sinkConnectorClass,
+                        connectorClientConfigOverridePolicy,
+                        kafkaClusterId,
+                        ConnectorType.SINK);
+
+                SinkConnectorConfig sinkConnectorConfig = new 
SinkConnectorConfig(plugins, connectorConfig);
+                String groupId = (String) baseConsumerConfigs(
+                        connName, "connector-consumer-", config, 
sinkConnectorConfig,
+                        sinkConnectorClass, 
connectorClientConfigOverridePolicy, kafkaClusterId, 
ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+                Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = 
parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() != null)
+                        .map(entry -> new 
AbstractMap.SimpleEntry<>(entry.getKey(), new 
OffsetAndMetadata(entry.getValue())))
+                        .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+                Set<TopicPartition> partitionsToReset = 
parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() == null)
+                        .map(Map.Entry::getKey)
+                        .collect(Collectors.toSet());
+
+                KafkaFuture<Void> adminFuture = 
KafkaFuture.completedFuture(null);
+
+                Admin admin = adminFactory.apply(adminConfig);
+
+                try {
+                    if (!offsetsToAlter.isEmpty()) {
+                        log.debug("Committing the following consumer group 
topic partition offsets using an admin client for sink connector {}: {}.",
+                                connName, offsetsToAlter);
+                        AlterConsumerGroupOffsetsOptions 
alterConsumerGroupOffsetsOptions = new 
AlterConsumerGroupOffsetsOptions().timeoutMs(
+                                (int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+                        AlterConsumerGroupOffsetsResult 
alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, 
offsetsToAlter,
+                                alterConsumerGroupOffsetsOptions);
+
+                        adminFuture = alterConsumerGroupOffsetsResult.all();
+                    }
+
+                    adminFuture.whenComplete((ignored, error) -> {
+                        if (error != null) {
+                            Utils.closeQuietly(admin, "Offset alter admin for 
sink connector " + connName);
+                            // When a consumer group is non-empty, only group 
members can commit offsets. The above attempt to alter offsets via the admin
+                            // client will result in an 
UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink 
tasks haven't stopped
+                            // completely or if the connector is resumed while 
the alter offsets request is being processed)
+                            if (error instanceof UnknownMemberIdException) {
+                                cb.onCompletion(new ConnectException("Failed 
to alter consumer group offsets for connector " + connName + " either because 
its tasks " +
+                                                "haven't stopped completely 
yet or the connector was resumed before the request to alter its offsets could 
be successfully " +
+                                                "completed. If the connector 
is in a stopped state, this operation can be safely retried. If it doesn't 
eventually succeed, the " +
+                                                "Connect cluster may need to 
be restarted to get rid of the zombie sink tasks."),
+                                        null);
+                            } else {
+                                cb.onCompletion(new ConnectException("Failed 
to alter consumer group offsets for topic partitions " + 
offsetsToAlter.keySet() + " for " +
+                                                "connector " + connName, 
error),
+                                        null);
+                            }
+                        } else if (!partitionsToReset.isEmpty()) {
+                            log.debug("Deleting the consumer group offsets for 
the following topic partitions using an admin client for sink connector {}: 
{}.",
+                                    connName, partitionsToReset);
+                            DeleteConsumerGroupOffsetsOptions 
deleteConsumerGroupOffsetsOptions = new 
DeleteConsumerGroupOffsetsOptions().timeoutMs(
+                                    (int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+                            DeleteConsumerGroupOffsetsResult 
deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, 
partitionsToReset,
+                                    deleteConsumerGroupOffsetsOptions);
+                            
deleteConsumerGroupOffsetsResult.all().whenComplete((ignored2, error2) -> {
+                                Utils.closeQuietly(admin, "Offset alter admin 
for sink connector " + connName);
+                                if (error2 != null) {
+                                    // The attempt to delete offsets for 
certain topic partitions via the admin client will result in a
+                                    // GroupSubscribedToTopicException if the 
consumer group is non-empty (i.e. if the sink tasks haven't stopped completely
+                                    // or if the connector is resumed while 
the alter offsets request is being processed).
+                                    if (error2 instanceof 
GroupSubscribedToTopicException) {
+                                        cb.onCompletion(new 
ConnectException("Failed to alter consumer group offsets for connector " + 
connName + " either because its tasks " +
+                                                        "haven't stopped 
completely yet or the connector was resumed before the request to alter its 
offsets could be successfully " +
+                                                        "completed. If the 
connector is in a stopped state, this operation can be safely retried. If it 
doesn't eventually succeed, the " +
+                                                        "Connect cluster may 
need to be restarted to get rid of the zombie sink tasks."),
+                                                null);
+                                    } else {
+                                        cb.onCompletion(new 
ConnectException("Failed to delete consumer group offsets for topic partitions 
" + partitionsToReset + " for connector "
+                                                        + connName, error2),
+                                                null);
+                                    }
+                                } else {
+                                    
completeAlterOffsetsCallback(alterOffsetsResult, cb);
+                                }
+                            });
+                        } else {
+                            Utils.closeQuietly(admin, "Offset alter admin for 
sink connector " + connName);
+                            completeAlterOffsetsCallback(alterOffsetsResult, 
cb);
+                        }
+                    });
+                } catch (Throwable t) {
+                    Utils.closeQuietly(admin, "Offset alter admin for sink 
connector " + connName);
+                    throw t;
+                }
+            } catch (Throwable t) {
+                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter 
offsets for sink connector " + connName), null);
+            }
+        }));
+    }
+
+    /**
+     * Alter a source connector's offsets.
+     *
+     * @param connName the name of the source connector whose offsets are to 
be altered
+     * @param connector an instance of the source connector
+     * @param connectorConfig the source connector's configuration
+     * @param offsets a mapping from partitions to offsets that need to be 
overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as 
the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    private void alterSourceConnectorOffsets(String connName, Connector 
connector, Map<String, String> connectorConfig,
+                                             Map<Map<String, ?>, Map<String, 
?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        SourceConnectorConfig sourceConfig = new 
SourceConnectorConfig(plugins, connectorConfig, config.topicCreationEnable());
+        Map<String, Object> producerProps = config.exactlyOnceSourceEnabled()
+                ? exactlyOnceSourceTaskProducerConfigs(new 
ConnectorTaskId(connName, 0), config, sourceConfig,
+                connector.getClass(), connectorClientConfigOverridePolicy, 
kafkaClusterId)
+                : baseProducerConfigs(connName, "connector-offset-producer-" + 
connName, config, sourceConfig,
+                connector.getClass(), connectorClientConfigOverridePolicy, 
kafkaClusterId);
+        KafkaProducer<byte[], byte[]> producer = new 
KafkaProducer<>(producerProps);
+
+        ConnectorOffsetBackingStore offsetStore = 
config.exactlyOnceSourceEnabled()
+                ? offsetStoreForExactlyOnceSourceConnector(sourceConfig, 
connName, connector, producer)
+                : offsetStoreForRegularSourceConnector(sourceConfig, connName, 
connector, producer);
+
+        OffsetStorageWriter offsetWriter = new 
OffsetStorageWriter(offsetStore, connName, internalKeyConverter, 
internalValueConverter);
+        alterSourceConnectorOffsets(connName, connector, connectorConfig, 
offsets, offsetStore, producer, offsetWriter, connectorLoader, cb);
+    }
+
+    // Visible for testing
+    void alterSourceConnectorOffsets(String connName, Connector connector, 
Map<String, String> connectorConfig,
+                                     Map<Map<String, ?>, Map<String, ?>> 
offsets, ConnectorOffsetBackingStore offsetStore,
+                                     KafkaProducer<byte[], byte[]> producer, 
OffsetStorageWriter offsetWriter,
+                                     ClassLoader connectorLoader, 
Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SourceConnector) 
connector).alterOffsets(connectorConfig, offsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for 
connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                offsetStore.configure(config);
+                // This reads to the end of the offsets topic and can be a 
potentially time-consuming operation
+                offsetStore.start();

Review Comment:
   Ah, good catch, thanks. 



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, 
ConnectorOffsetBackingStore offsetS
         });
     }
 
+    /**
+     * Alter a connector's offsets.
+     *
+     * @param connName the name of the connector whose offsets are to be 
altered
+     * @param connectorConfig the connector's configurations
+     * @param offsets  a mapping from partitions to offsets that need to be 
overwritten
+     * @param cb callback to invoke upon completion
+     */
+    public void alterConnectorOffsets(String connName, Map<String, String> 
connectorConfig,
+                                      Map<Map<String, ?>, Map<String, ?>> 
offsets, Callback<Message> cb) {
+        String connectorClassOrAlias = 
connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = 
plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) 
{
+            connector = plugins.newConnector(connectorClassOrAlias);
+            if (ConnectUtils.isSinkConnector(connector)) {
+                log.debug("Altering consumer group offsets for sink connector: 
{}", connName);
+                alterSinkConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+            } else {
+                log.debug("Altering offsets for source connector: {}", 
connName);
+                alterSourceConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+            }
+        }
+    }
+
+    /**
+     * Alter a sink connector's consumer group offsets.
+     * <p>
+     * Visible for testing.
+     *
+     * @param connName the name of the sink connector whose offsets are to be 
altered
+     * @param connector an instance of the sink connector
+     * @param connectorConfig the sink connector's configuration
+     * @param offsets a mapping from topic partitions to offsets that need to 
be overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as 
the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    void alterSinkConnectorOffsets(String connName, Connector connector, 
Map<String, String> connectorConfig,
+                                   Map<Map<String, ?>, Map<String, ?>> 
offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                Map<TopicPartition, Long> parsedOffsets = 
SinkUtils.validateAndParseSinkConnectorOffsets(offsets);
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SinkConnector) 
connector).alterOffsets(connectorConfig, parsedOffsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for 
connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                Class<? extends Connector> sinkConnectorClass = 
connector.getClass();
+                Map<String, Object> adminConfig = adminConfigs(
+                        connName,
+                        "connector-worker-adminclient-" + connName,
+                        config,
+                        new SinkConnectorConfig(plugins, connectorConfig),
+                        sinkConnectorClass,
+                        connectorClientConfigOverridePolicy,
+                        kafkaClusterId,
+                        ConnectorType.SINK);
+
+                SinkConnectorConfig sinkConnectorConfig = new 
SinkConnectorConfig(plugins, connectorConfig);
+                String groupId = (String) baseConsumerConfigs(
+                        connName, "connector-consumer-", config, 
sinkConnectorConfig,
+                        sinkConnectorClass, 
connectorClientConfigOverridePolicy, kafkaClusterId, 
ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+                Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = 
parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() != null)
+                        .map(entry -> new 
AbstractMap.SimpleEntry<>(entry.getKey(), new 
OffsetAndMetadata(entry.getValue())))
+                        .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+                Set<TopicPartition> partitionsToReset = 
parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() == null)
+                        .map(Map.Entry::getKey)
+                        .collect(Collectors.toSet());
+
+                KafkaFuture<Void> adminFuture = 
KafkaFuture.completedFuture(null);
+
+                Admin admin = adminFactory.apply(adminConfig);
+
+                try {
+                    if (!offsetsToAlter.isEmpty()) {
+                        log.debug("Committing the following consumer group 
topic partition offsets using an admin client for sink connector {}: {}.",
+                                connName, offsetsToAlter);
+                        AlterConsumerGroupOffsetsOptions 
alterConsumerGroupOffsetsOptions = new 
AlterConsumerGroupOffsetsOptions().timeoutMs(
+                                (int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+                        AlterConsumerGroupOffsetsResult 
alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, 
offsetsToAlter,
+                                alterConsumerGroupOffsetsOptions);
+
+                        adminFuture = alterConsumerGroupOffsetsResult.all();
+                    }
+
+                    adminFuture.whenComplete((ignored, error) -> {
+                        if (error != null) {
+                            Utils.closeQuietly(admin, "Offset alter admin for 
sink connector " + connName);
+                            // When a consumer group is non-empty, only group 
members can commit offsets. The above attempt to alter offsets via the admin
+                            // client will result in an 
UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink 
tasks haven't stopped
+                            // completely or if the connector is resumed while 
the alter offsets request is being processed)
+                            if (error instanceof UnknownMemberIdException) {
+                                cb.onCompletion(new ConnectException("Failed 
to alter consumer group offsets for connector " + connName + " either because 
its tasks " +
+                                                "haven't stopped completely 
yet or the connector was resumed before the request to alter its offsets could 
be successfully " +
+                                                "completed. If the connector 
is in a stopped state, this operation can be safely retried. If it doesn't 
eventually succeed, the " +
+                                                "Connect cluster may need to 
be restarted to get rid of the zombie sink tasks."),
+                                        null);
+                            } else {
+                                cb.onCompletion(new ConnectException("Failed 
to alter consumer group offsets for topic partitions " + 
offsetsToAlter.keySet() + " for " +
+                                                "connector " + connName, 
error),
+                                        null);
+                            }
+                        } else if (!partitionsToReset.isEmpty()) {
+                            log.debug("Deleting the consumer group offsets for 
the following topic partitions using an admin client for sink connector {}: 
{}.",
+                                    connName, partitionsToReset);
+                            DeleteConsumerGroupOffsetsOptions 
deleteConsumerGroupOffsetsOptions = new 
DeleteConsumerGroupOffsetsOptions().timeoutMs(
+                                    (int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+                            DeleteConsumerGroupOffsetsResult 
deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, 
partitionsToReset,
+                                    deleteConsumerGroupOffsetsOptions);
+                            
deleteConsumerGroupOffsetsResult.all().whenComplete((ignored2, error2) -> {
+                                Utils.closeQuietly(admin, "Offset alter admin 
for sink connector " + connName);
+                                if (error2 != null) {
+                                    // The attempt to delete offsets for 
certain topic partitions via the admin client will result in a
+                                    // GroupSubscribedToTopicException if the 
consumer group is non-empty (i.e. if the sink tasks haven't stopped completely
+                                    // or if the connector is resumed while 
the alter offsets request is being processed).
+                                    if (error2 instanceof 
GroupSubscribedToTopicException) {
+                                        cb.onCompletion(new 
ConnectException("Failed to alter consumer group offsets for connector " + 
connName + " either because its tasks " +
+                                                        "haven't stopped 
completely yet or the connector was resumed before the request to alter its 
offsets could be successfully " +
+                                                        "completed. If the 
connector is in a stopped state, this operation can be safely retried. If it 
doesn't eventually succeed, the " +
+                                                        "Connect cluster may 
need to be restarted to get rid of the zombie sink tasks."),
+                                                null);
+                                    } else {
+                                        cb.onCompletion(new 
ConnectException("Failed to delete consumer group offsets for topic partitions 
" + partitionsToReset + " for connector "
+                                                        + connName, error2),
+                                                null);
+                                    }
+                                } else {
+                                    
completeAlterOffsetsCallback(alterOffsetsResult, cb);
+                                }
+                            });
+                        } else {
+                            Utils.closeQuietly(admin, "Offset alter admin for 
sink connector " + connName);
+                            completeAlterOffsetsCallback(alterOffsetsResult, 
cb);
+                        }
+                    });
+                } catch (Throwable t) {
+                    Utils.closeQuietly(admin, "Offset alter admin for sink 
connector " + connName);
+                    throw t;
+                }
+            } catch (Throwable t) {
+                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter 
offsets for sink connector " + connName), null);
+            }
+        }));
+    }
+
+    /**
+     * Alter a source connector's offsets.
+     *
+     * @param connName the name of the source connector whose offsets are to 
be altered
+     * @param connector an instance of the source connector
+     * @param connectorConfig the source connector's configuration
+     * @param offsets a mapping from partitions to offsets that need to be 
overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as 
the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    private void alterSourceConnectorOffsets(String connName, Connector 
connector, Map<String, String> connectorConfig,
+                                             Map<Map<String, ?>, Map<String, 
?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        SourceConnectorConfig sourceConfig = new 
SourceConnectorConfig(plugins, connectorConfig, config.topicCreationEnable());
+        Map<String, Object> producerProps = config.exactlyOnceSourceEnabled()
+                ? exactlyOnceSourceTaskProducerConfigs(new 
ConnectorTaskId(connName, 0), config, sourceConfig,
+                connector.getClass(), connectorClientConfigOverridePolicy, 
kafkaClusterId)
+                : baseProducerConfigs(connName, "connector-offset-producer-" + 
connName, config, sourceConfig,
+                connector.getClass(), connectorClientConfigOverridePolicy, 
kafkaClusterId);
+        KafkaProducer<byte[], byte[]> producer = new 
KafkaProducer<>(producerProps);
+
+        ConnectorOffsetBackingStore offsetStore = 
config.exactlyOnceSourceEnabled()
+                ? offsetStoreForExactlyOnceSourceConnector(sourceConfig, 
connName, connector, producer)
+                : offsetStoreForRegularSourceConnector(sourceConfig, connName, 
connector, producer);
+
+        OffsetStorageWriter offsetWriter = new 
OffsetStorageWriter(offsetStore, connName, internalKeyConverter, 
internalValueConverter);
+        alterSourceConnectorOffsets(connName, connector, connectorConfig, 
offsets, offsetStore, producer, offsetWriter, connectorLoader, cb);
+    }
+
+    // Visible for testing
+    void alterSourceConnectorOffsets(String connName, Connector connector, 
Map<String, String> connectorConfig,
+                                     Map<Map<String, ?>, Map<String, ?>> 
offsets, ConnectorOffsetBackingStore offsetStore,
+                                     KafkaProducer<byte[], byte[]> producer, 
OffsetStorageWriter offsetWriter,
+                                     ClassLoader connectorLoader, 
Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SourceConnector) 
connector).alterOffsets(connectorConfig, offsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for 
connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                offsetStore.configure(config);
+                // This reads to the end of the offsets topic and can be a 
potentially time-consuming operation
+                offsetStore.start();
+
+                // The alterSourceConnectorOffsets method should only be 
called after all the connector's tasks have been stopped, and it's
+                // safe to write offsets via an offset writer
+                offsets.forEach(offsetWriter::offset);
+
+                // We can call begin flush without a timeout because this 
newly created single-purpose offset writer can't do concurrent
+                // offset writes. We can also ignore the return value since it 
returns false if and only if there is no data to be flushed,
+                // and we've just put some data in the previous statement
+                offsetWriter.beginFlush();
+
+                try {
+                    if (config.exactlyOnceSourceEnabled()) {
+                        producer.initTransactions();
+                        producer.beginTransaction();
+                    }
+                    log.debug("Committing the following partition offsets for 
source connector {}: {}", connName, offsets);
+                    FutureCallback<Void> offsetWriterCallback = new 
FutureCallback<>();
+                    offsetWriter.doFlush(offsetWriterCallback);
+                    if (config.exactlyOnceSourceEnabled()) {
+                        producer.commitTransaction();
+                    }
+                    
offsetWriterCallback.get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);

Review Comment:
   Hm this implementation was intentional; I thought it'd make sense to follow 
the same pattern as regular offset commits for source connectors (both regular 
and EoS). Any particular reason we'd want stricter guarantees (i.e. ensuring 
that the write to the global offsets topic is successful) for offset commits 
initiated via the REST API versus the connector tasks themselves?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to