C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1204681794


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -4123,6 +4136,280 @@ public void testConnectorOffsets() throws Exception {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testAlterOffsetsConnectorNotFound() throws Exception {

Review Comment:
   Nit: it's a little strange that we're using "not found" in the name here but 
elsewhere in this and the `StandaloneHerderTest` suite we use "unknown"
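   A hypothetical rename that would align with the naming used elsewhere in the suite:
   ```suggestion
       @Test
       public void testAlterOffsetsUnknownConnector() throws Exception {
   ```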



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorOffsetsTest.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class ConnectorOffsetsTest {
+
+    @Test
+    public void testConnectorOffsetsToMap() {
+        // Using arbitrary partition and offset formats here to demonstrate that source connector offsets don't
+        // follow a standard pattern
+        Map<String, Object> partition1 = new HashMap<>();
+        partition1.put("partitionKey1", "partitionValue");
+        partition1.put("k", 123);
+        Map<String, Object> offset1 = new HashMap<>();
+        offset1.put("offset", 3.14);

Review Comment:
   Yum!



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/SinkUtilsTest.java:
##########
@@ -46,4 +52,124 @@ public void testConsumerGroupOffsetsToConnectorOffsets() {
         expectedPartition.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
         assertEquals(expectedPartition, connectorOffsets.offsets().get(0).partition());
     }
+
+    @Test
+    public void testValidateAndParseEmptyPartitionOffsetMap() {
+        // expect no exception to be thrown
+        Map<TopicPartition, Long> parsedOffsets = SinkUtils.parseSinkConnectorOffsets(new HashMap<>());
+        assertTrue(parsedOffsets.isEmpty());
+    }
+
+    @Test
+    public void testValidateAndParseInvalidPartition() {
+        Map<String, Object> partition = new HashMap<>();
+        partition.put(SinkUtils.KAFKA_TOPIC_KEY, "topic");
+        Map<String, Object> offset = new HashMap<>();
+        offset.put(SinkUtils.KAFKA_OFFSET_KEY, 100);
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();
+        partitionOffsets.put(partition, offset);
+
+        // missing partition key
+        ConnectException e = assertThrows(ConnectException.class, () -> SinkUtils.parseSinkConnectorOffsets(partitionOffsets));
+        assertThat(e.getMessage(), containsString("The partition for a sink connector offset must contain the keys 'kafka_topic' and 'kafka_partition'"));
+
+        partition.put(SinkUtils.KAFKA_PARTITION_KEY, "not a number");
+        // bad partition key
+        e = assertThrows(ConnectException.class, () -> SinkUtils.parseSinkConnectorOffsets(partitionOffsets));
+        assertThat(e.getMessage(), containsString("Failed to parse the following Kafka partition value in the provided offsets: 'not a number'"));
+
+        partition.remove(SinkUtils.KAFKA_TOPIC_KEY);
+        partition.put(SinkUtils.KAFKA_PARTITION_KEY, "5");
+        // missing topic key
+        e = assertThrows(ConnectException.class, () -> SinkUtils.parseSinkConnectorOffsets(partitionOffsets));
+        assertThat(e.getMessage(), containsString("The partition for a sink connector offset must contain the keys 'kafka_topic' and 'kafka_partition'"));
+    }
+
+    @Test
+    public void testValidateAndParseInvalidOffset() {
+        Map<String, Object> partition = new HashMap<>();
+        partition.put(SinkUtils.KAFKA_TOPIC_KEY, "topic");
+        partition.put(SinkUtils.KAFKA_PARTITION_KEY, 10);
+        Map<String, Object> offset = new HashMap<>();
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();
+        partitionOffsets.put(partition, offset);
+
+        // missing offset key
+        ConnectException e = assertThrows(ConnectException.class, () -> SinkUtils.parseSinkConnectorOffsets(partitionOffsets));
+        assertThat(e.getMessage(), containsString("The offset for a sink connector should either be null or contain the key 'kafka_offset'"));
+
+        // bad offset key
+        offset.put(SinkUtils.KAFKA_OFFSET_KEY, "not a number");
+        e = assertThrows(ConnectException.class, () -> SinkUtils.parseSinkConnectorOffsets(partitionOffsets));
+        assertThat(e.getMessage(), containsString("Failed to parse the following Kafka offset value in the provided offsets: 'not a number'"));
+    }
+
+    @Test
+    public void testValidateAndParseStringPartitionValue() {
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = createPartitionOffsetMap("topic", "10", "100");
+        Map<TopicPartition, Long> parsedOffsets = SinkUtils.parseSinkConnectorOffsets(partitionOffsets);
+        assertEquals(1, parsedOffsets.size());
+        TopicPartition tp = parsedOffsets.keySet().iterator().next();
+        assertEquals(10, tp.partition());
+    }
+
+    @Test
+    public void testValidateAndParseIntegerPartitionValue() {
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = createPartitionOffsetMap("topic", 10, "100");
+        Map<TopicPartition, Long> parsedOffsets = SinkUtils.parseSinkConnectorOffsets(partitionOffsets);
+        assertEquals(1, parsedOffsets.size());
+        TopicPartition tp = parsedOffsets.keySet().iterator().next();
+        assertEquals(10, tp.partition());
+    }
+
+    @Test
+    public void testValidateAndParseStringOffsetValue() {
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = createPartitionOffsetMap("topic", "10", "100");
+        Map<TopicPartition, Long> parsedOffsets = SinkUtils.parseSinkConnectorOffsets(partitionOffsets);
+        assertEquals(1, parsedOffsets.size());
+        Long offsetValue = parsedOffsets.values().iterator().next();
+        assertEquals(100L, offsetValue.longValue());
+    }
+
+    @Test
+    public void testValidateAndParseIntegerOffsetValue() {
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = createPartitionOffsetMap("topic", "10", 100);
+        Map<TopicPartition, Long> parsedOffsets = SinkUtils.parseSinkConnectorOffsets(partitionOffsets);
+        assertEquals(1, parsedOffsets.size());
+        Long offsetValue = parsedOffsets.values().iterator().next();
+        assertEquals(100L, offsetValue.longValue());
+    }
+
+    @Test
+    public void testNullOffset() {
+        Map<String, Object> partitionMap = new HashMap<>();
+        partitionMap.put(SinkUtils.KAFKA_TOPIC_KEY, "topic");
+        partitionMap.put(SinkUtils.KAFKA_PARTITION_KEY, 10);
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();
+        partitionOffsets.put(partitionMap, null);
+        Map<TopicPartition, Long> parsedOffsets = SinkUtils.parseSinkConnectorOffsets(partitionOffsets);
+        assertEquals(1, parsedOffsets.size());
+        assertNull(parsedOffsets.values().iterator().next());
+    }
+
+    @Test
+    public void testNullPartition() {

Review Comment:
   These tests are fantastic for the most part. For the sake of completeness, 
should we include a case with a null topic name as well?
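   A rough sketch of what that case might look like (the test name and expected behavior are assumptions, not taken from the PR):
   ```java
   @Test
   public void testNullTopic() {
       Map<String, Object> partitionMap = new HashMap<>();
       // A null topic name; presumably this should be rejected the same way a missing 'kafka_topic' key is
       partitionMap.put(SinkUtils.KAFKA_TOPIC_KEY, null);
       partitionMap.put(SinkUtils.KAFKA_PARTITION_KEY, 10);
       Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();
       partitionOffsets.put(partitionMap, Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 100));

       assertThrows(ConnectException.class, () -> SinkUtils.parseSinkConnectorOffsets(partitionOffsets));
   }
   ```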



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java:
##########
@@ -1889,6 +1902,352 @@ public void testGetSourceConnectorOffsetsError() {
         verifyKafkaClusterId();
     }
 
+    @Test
+    public void testAlterOffsetsConnectorDoesNotSupportOffsetAlteration() {
+        mockKafkaClusterId();
+
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(),
+                allConnectorClientConfigOverridePolicy, null);
+        worker.start();
+
+        mockGenericIsolation();
+        when(plugins.newConnector(anyString())).thenReturn(sourceConnector);
+        when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
+        when(sourceConnector.alterOffsets(eq(connectorProps), anyMap())).thenThrow(new UnsupportedOperationException("This connector doesn't " +
+                "support altering of offsets"));
+
+        FutureCallback<Message> cb = new FutureCallback<>();
+        worker.alterConnectorOffsets(CONNECTOR_ID, connectorProps,
+                Collections.singletonMap(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")),
+                cb);
+
+        ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS));
+        assertEquals(ConnectException.class, e.getCause().getClass());
+        assertEquals("Failed to alter offsets for connector " + CONNECTOR_ID + 
" because it doesn't support external modification of offsets",
+                e.getCause().getMessage());
+
+        verifyGenericIsolation();
+        verifyKafkaClusterId();
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testAlterOffsetsSourceConnector() throws Exception {
+        mockKafkaClusterId();
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(),
+                allConnectorClientConfigOverridePolicy, null);
+        worker.start();
+
+        when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
+        when(sourceConnector.alterOffsets(eq(connectorProps), anyMap())).thenReturn(true);
+        ConnectorOffsetBackingStore offsetStore = mock(ConnectorOffsetBackingStore.class);
+        KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
+        OffsetStorageWriter offsetWriter = mock(OffsetStorageWriter.class);
+
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();
+        partitionOffsets.put(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue"));
+        partitionOffsets.put(Collections.singletonMap("partitionKey", "partitionValue2"), Collections.singletonMap("offsetKey", "offsetValue"));
+
+        when(offsetWriter.doFlush(any())).thenAnswer(invocation -> {
+            invocation.getArgument(0, Callback.class).onCompletion(null, null);
+            return null;
+        });
+
+        FutureCallback<Message> cb = new FutureCallback<>();
+        worker.alterSourceConnectorOffsets(CONNECTOR_ID, sourceConnector, connectorProps, partitionOffsets, offsetStore, producer,
+                offsetWriter, Thread.currentThread().getContextClassLoader(), cb);
+        assertEquals("The offsets for this connector have been altered successfully", cb.get(1000, TimeUnit.MILLISECONDS).message());
+
+        verify(offsetStore).configure(config);
+        verify(offsetStore).start();
+        partitionOffsets.forEach((partition, offset) -> verify(offsetWriter).offset(partition, offset));
+        verify(offsetWriter).beginFlush();
+        verify(offsetWriter).doFlush(any());
+        verify(offsetStore).stop();
+        verifyKafkaClusterId();
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testAlterOffsetsSourceConnectorError() throws Exception {
+        mockKafkaClusterId();
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(),
+                allConnectorClientConfigOverridePolicy, null);
+        worker.start();
+
+        when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
+        when(sourceConnector.alterOffsets(eq(connectorProps), anyMap())).thenReturn(true);
+        ConnectorOffsetBackingStore offsetStore = mock(ConnectorOffsetBackingStore.class);
+        KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
+        OffsetStorageWriter offsetWriter = mock(OffsetStorageWriter.class);
+
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();
+        partitionOffsets.put(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue"));
+        partitionOffsets.put(Collections.singletonMap("partitionKey", "partitionValue2"), Collections.singletonMap("offsetKey", "offsetValue"));
+
+        when(offsetWriter.doFlush(any())).thenAnswer(invocation -> {
+            invocation.getArgument(0, Callback.class).onCompletion(new RuntimeException("Test exception"), null);
+            return null;
+        });
+
+        FutureCallback<Message> cb = new FutureCallback<>();
+        worker.alterSourceConnectorOffsets(CONNECTOR_ID, sourceConnector, connectorProps, partitionOffsets, offsetStore, producer,
+                offsetWriter, Thread.currentThread().getContextClassLoader(), cb);
+        ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS).message());
+        assertEquals(ConnectException.class, e.getCause().getClass());
+
+        verify(offsetStore).configure(config);
+        verify(offsetStore).start();
+        partitionOffsets.forEach((partition, offset) -> verify(offsetWriter).offset(partition, offset));
+        verify(offsetWriter).beginFlush();
+        verify(offsetWriter).doFlush(any());
+        verify(offsetStore).stop();
+        verifyKafkaClusterId();
+    }
+
+    @Test
+    public void testAlterOffsetsSinkConnectorNoResets() throws Exception {
+        @SuppressWarnings("unchecked")
+        ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> alterOffsetsMapCapture = ArgumentCaptor.forClass(Map.class);
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();
+        Map<String, String> partition1 = new HashMap<>();
+        partition1.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic");
+        partition1.put(SinkUtils.KAFKA_PARTITION_KEY, "10");
+        partitionOffsets.put(partition1, Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 500));
+        Map<String, String> partition2 = new HashMap<>();
+        partition2.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic");
+        partition2.put(SinkUtils.KAFKA_PARTITION_KEY, "20");
+        partitionOffsets.put(partition2, Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 100));
+
+        // A null value for deleteOffsetsSetCapture indicates that we don't expect any call to Admin::deleteConsumerGroupOffsets
+        alterOffsetsSinkConnector(partitionOffsets, alterOffsetsMapCapture, null);
+
+        assertEquals(2, alterOffsetsMapCapture.getValue().size());
+        assertEquals(500, alterOffsetsMapCapture.getValue().get(new TopicPartition("test_topic", 10)).offset());
+        assertEquals(100, alterOffsetsMapCapture.getValue().get(new TopicPartition("test_topic", 20)).offset());
+    }
+
+    @Test
+    public void testAlterOffsetSinkConnectorOnlyResets() throws Exception {
+        @SuppressWarnings("unchecked")
+        ArgumentCaptor<Set<TopicPartition>> deleteOffsetsSetCapture = ArgumentCaptor.forClass(Set.class);
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();
+        Map<String, String> partition1 = new HashMap<>();
+        partition1.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic");
+        partition1.put(SinkUtils.KAFKA_PARTITION_KEY, "10");
+        partitionOffsets.put(partition1, null);
+        Map<String, String> partition2 = new HashMap<>();
+        partition2.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic");
+        partition2.put(SinkUtils.KAFKA_PARTITION_KEY, "20");
+        partitionOffsets.put(partition2, null);
+
+        // A null value for alterOffsetsMapCapture indicates that we don't expect any call to Admin::alterConsumerGroupOffsets
+        alterOffsetsSinkConnector(partitionOffsets, null, deleteOffsetsSetCapture);
+
+        Set<TopicPartition> expectedTopicPartitionsForOffsetDelete = new HashSet<>();
+        expectedTopicPartitionsForOffsetDelete.add(new TopicPartition("test_topic", 10));
+        expectedTopicPartitionsForOffsetDelete.add(new TopicPartition("test_topic", 20));
+
+        assertEquals(2, deleteOffsetsSetCapture.getValue().size());
+
+        // Verify that contents are equal without caring about order
+        assertTrue(expectedTopicPartitionsForOffsetDelete.containsAll(deleteOffsetsSetCapture.getValue()));
+        assertTrue(deleteOffsetsSetCapture.getValue().containsAll(expectedTopicPartitionsForOffsetDelete));

Review Comment:
   Can't we simplify this by comparing the two `Set` instances for equality?
   
   ```suggestion
           // Verify that contents are equal without caring about order
           assertEquals(expectedTopicPartitionsForOffsetDelete, deleteOffsetsSetCapture.getValue());
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1515,6 +1520,79 @@ public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {
         );
     }
 
+    @Override
+    public void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> callback) {
+        log.trace("Submitting alter offsets request for connector '{}'", connName);
+
+        addRequest(() -> {
+            if (!alterConnectorOffsetsChecks(connName, callback)) {
+                return null;
+            }
+            // At this point, we should be the leader (the call to alterConnectorOffsetsChecks makes sure of that) and can safely run
+            // a zombie fencing request
+            if (isSourceConnector(connName) && config.exactlyOnceSourceEnabled()) {
+                log.debug("Performing a round of zombie fencing before altering offsets for source connector {} with exactly-once support enabled.", connName);
+                doFenceZombieSourceTasks(connName, (error, ignored) -> {
+                    if (error != null) {
+                        log.error("Failed to perform zombie fencing for source 
connector prior to altering offsets", error);
+                        callback.onCompletion(new ConnectException("Failed to 
perform zombie fencing for source connector prior to altering offsets",
+                                error), null);
+                    } else {
+                        log.debug("Successfully completed zombie fencing for 
source connector {}; proceeding to alter offsets.", connName);
+                        // We need to ensure that we perform the necessary 
checks again before proceeding to actually altering the connector offsets since
+                        // zombie fencing is done asynchronously and the 
conditions could have changed since the previous check
+                        addRequest(() -> {
+                            if (alterConnectorOffsetsChecks(connName, callback)) {
+                                worker.alterConnectorOffsets(connName, configState.connectorConfig(connName), offsets, callback);
+                            }
+                            return null;
+                        }, forwardErrorCallback(callback));
+                    }
+                });
+            } else {
+                worker.alterConnectorOffsets(connName, configState.connectorConfig(connName), offsets, callback);
+            }
+            return null;
+        }, forwardErrorCallback(callback));
+    }
+
+    /**
+     * This method performs a few checks for an alter connector offsets request and completes the callback exceptionally
+     * if any check fails.
+     * @param connName the name of the connector whose offsets are to be altered
+     * @param callback callback to invoke upon completion
+     * @return true if all the checks passed, false otherwise
+     */
+    private boolean alterConnectorOffsetsChecks(String connName, Callback<Message> callback) {
+        if (checkRebalanceNeeded(callback)) {
+            return false;
+        }
+
+        if (!isLeader()) {
+            callback.onCompletion(new NotLeaderException("Only the leader can 
process alter offsets requests", leaderUrl()), null);
+            return false;
+        }
+
+        if (!refreshConfigSnapshot(workerSyncTimeoutMs)) {
+            throw new ConnectException("Failed to read to end of config topic 
before altering connector offsets");
+        }
+
+        if (!configState.contains(connName)) {
+            callback.onCompletion(new NotFoundException("Connector " + 
connName + " not found", null), null);
+            return false;
+        }
+
+        // If the target state for the connector is stopped, its task count is 0, and there is no rebalance pending (checked above),
+        // we can be sure that the tasks have at least been attempted to be stopped (or cancelled if they took too long to stop).
+        // Zombie tasks are handled by a round of zombie fencing for exactly once source connectors. Zombie sink tasks are handled
+        // naturally because requests to alter consumer group offsets will fail if there are still active members in the group.
+        if (configState.targetState(connName) != TargetState.STOPPED || configState.taskCount(connName) != 0) {
+            callback.onCompletion(new BadRequestException("The connector needs to be stopped before its offsets can be altered"), null);

Review Comment:
   We should be really careful about the wording in this message (and the one 
in `StandaloneHerder`), since "stopping" a connector used to be synonymous with 
"deleting" one.
   
   A quick sketch of a more-informative error message:
   > "Connectors must be in the STOPPED state before their offsets can be 
altered. This can be done for the specified connector by issuing a PUT request 
to the /connectors/" + connName + "/stop endpoint.
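   With that wording, the callback invocation might read something along these lines (a sketch of the suggestion above, not the PR's actual code):
   ```java
   callback.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be altered. " +
           "This can be done for the specified connector by issuing a PUT request to the /connectors/" + connName + "/stop endpoint"), null);
   ```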



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java:
##########
@@ -1889,6 +1902,352 @@ public void testGetSourceConnectorOffsetsError() {
         verifyKafkaClusterId();
     }
 
+    @Test
+    public void testAlterOffsetsConnectorDoesNotSupportOffsetAlteration() {
+        mockKafkaClusterId();
+
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(),
+                allConnectorClientConfigOverridePolicy, null);
+        worker.start();
+
+        mockGenericIsolation();
+        when(plugins.newConnector(anyString())).thenReturn(sourceConnector);
+        when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
+        when(sourceConnector.alterOffsets(eq(connectorProps), anyMap())).thenThrow(new UnsupportedOperationException("This connector doesn't " +
+                "support altering of offsets"));
+
+        FutureCallback<Message> cb = new FutureCallback<>();
+        worker.alterConnectorOffsets(CONNECTOR_ID, connectorProps,
+                Collections.singletonMap(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")),
+                cb);
+
+        ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS));
+        assertEquals(ConnectException.class, e.getCause().getClass());
+        assertEquals("Failed to alter offsets for connector " + CONNECTOR_ID + 
" because it doesn't support external modification of offsets",
+                e.getCause().getMessage());
+
+        verifyGenericIsolation();
+        verifyKafkaClusterId();
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testAlterOffsetsSourceConnector() throws Exception {
+        mockKafkaClusterId();
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(),
+                allConnectorClientConfigOverridePolicy, null);
+        worker.start();
+
+        when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
+        when(sourceConnector.alterOffsets(eq(connectorProps), anyMap())).thenReturn(true);
+        ConnectorOffsetBackingStore offsetStore = mock(ConnectorOffsetBackingStore.class);
+        KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
+        OffsetStorageWriter offsetWriter = mock(OffsetStorageWriter.class);
+
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();
+        partitionOffsets.put(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue"));
+        partitionOffsets.put(Collections.singletonMap("partitionKey", "partitionValue2"), Collections.singletonMap("offsetKey", "offsetValue"));
+
+        when(offsetWriter.doFlush(any())).thenAnswer(invocation -> {
+            invocation.getArgument(0, Callback.class).onCompletion(null, null);
+            return null;
+        });
+
+        FutureCallback<Message> cb = new FutureCallback<>();
+        worker.alterSourceConnectorOffsets(CONNECTOR_ID, sourceConnector, connectorProps, partitionOffsets, offsetStore, producer,
+                offsetWriter, Thread.currentThread().getContextClassLoader(), cb);
+        assertEquals("The offsets for this connector have been altered successfully", cb.get(1000, TimeUnit.MILLISECONDS).message());
+
+        verify(offsetStore).configure(config);
+        verify(offsetStore).start();
+        partitionOffsets.forEach((partition, offset) -> verify(offsetWriter).offset(partition, offset));
+        verify(offsetWriter).beginFlush();
+        verify(offsetWriter).doFlush(any());
+        verify(offsetStore).stop();
+        verifyKafkaClusterId();
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testAlterOffsetsSourceConnectorError() throws Exception {
+        mockKafkaClusterId();
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(),
+                allConnectorClientConfigOverridePolicy, null);
+        worker.start();
+
+        when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
+        when(sourceConnector.alterOffsets(eq(connectorProps), anyMap())).thenReturn(true);
+        ConnectorOffsetBackingStore offsetStore = mock(ConnectorOffsetBackingStore.class);
+        KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
+        OffsetStorageWriter offsetWriter = mock(OffsetStorageWriter.class);
+
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();
+        partitionOffsets.put(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue"));
+        partitionOffsets.put(Collections.singletonMap("partitionKey", "partitionValue2"), Collections.singletonMap("offsetKey", "offsetValue"));
+
+        when(offsetWriter.doFlush(any())).thenAnswer(invocation -> {
+            invocation.getArgument(0, Callback.class).onCompletion(new RuntimeException("Test exception"), null);
+            return null;
+        });
+
+        FutureCallback<Message> cb = new FutureCallback<>();
+        worker.alterSourceConnectorOffsets(CONNECTOR_ID, sourceConnector, connectorProps, partitionOffsets, offsetStore, producer,
+                offsetWriter, Thread.currentThread().getContextClassLoader(), cb);
+        ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS).message());
+        assertEquals(ConnectException.class, e.getCause().getClass());
+
+        verify(offsetStore).configure(config);
+        verify(offsetStore).start();
+        partitionOffsets.forEach((partition, offset) -> verify(offsetWriter).offset(partition, offset));
+        verify(offsetWriter).beginFlush();
+        verify(offsetWriter).doFlush(any());
+        verify(offsetStore).stop();
+        verifyKafkaClusterId();
+    }
+
+    @Test
+    public void testAlterOffsetsSinkConnectorNoResets() throws Exception {
+        @SuppressWarnings("unchecked")
+        ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> alterOffsetsMapCapture = ArgumentCaptor.forClass(Map.class);
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();
+        Map<String, String> partition1 = new HashMap<>();
+        partition1.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic");
+        partition1.put(SinkUtils.KAFKA_PARTITION_KEY, "10");
+        partitionOffsets.put(partition1, Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 500));
+        Map<String, String> partition2 = new HashMap<>();
+        partition2.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic");
+        partition2.put(SinkUtils.KAFKA_PARTITION_KEY, "20");
+        partitionOffsets.put(partition2, Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 100));
+
+        // A null value for deleteOffsetsSetCapture indicates that we don't expect any call to Admin::deleteConsumerGroupOffsets
+        alterOffsetsSinkConnector(partitionOffsets, alterOffsetsMapCapture, null);
+
+        assertEquals(2, alterOffsetsMapCapture.getValue().size());
+        assertEquals(500, alterOffsetsMapCapture.getValue().get(new TopicPartition("test_topic", 10)).offset());
+        assertEquals(100, alterOffsetsMapCapture.getValue().get(new TopicPartition("test_topic", 20)).offset());
+    }
+
+    @Test
+    public void testAlterOffsetSinkConnectorOnlyResets() throws Exception {
+        @SuppressWarnings("unchecked")
+        ArgumentCaptor<Set<TopicPartition>> deleteOffsetsSetCapture = ArgumentCaptor.forClass(Set.class);
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();
+        Map<String, String> partition1 = new HashMap<>();
+        partition1.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic");
+        partition1.put(SinkUtils.KAFKA_PARTITION_KEY, "10");
+        partitionOffsets.put(partition1, null);
+        Map<String, String> partition2 = new HashMap<>();
+        partition2.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic");
+        partition2.put(SinkUtils.KAFKA_PARTITION_KEY, "20");
+        partitionOffsets.put(partition2, null);
+
+        // A null value for alterOffsetsMapCapture indicates that we don't expect any call to Admin::alterConsumerGroupOffsets
+        alterOffsetsSinkConnector(partitionOffsets, null, deleteOffsetsSetCapture);
+
+        Set<TopicPartition> expectedTopicPartitionsForOffsetDelete = new HashSet<>();
+        expectedTopicPartitionsForOffsetDelete.add(new TopicPartition("test_topic", 10));
+        expectedTopicPartitionsForOffsetDelete.add(new TopicPartition("test_topic", 20));
+
+        assertEquals(2, deleteOffsetsSetCapture.getValue().size());
+
+        // Verify that contents are equal without caring about order
+        assertTrue(expectedTopicPartitionsForOffsetDelete.containsAll(deleteOffsetsSetCapture.getValue()));
+        assertTrue(deleteOffsetsSetCapture.getValue().containsAll(expectedTopicPartitionsForOffsetDelete));
+    }
+
+    @Test
+    public void testAlterOffsetsSinkConnectorAltersAndResets() throws Exception {
+        @SuppressWarnings("unchecked")
+        ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> alterOffsetsMapCapture = ArgumentCaptor.forClass(Map.class);
+        @SuppressWarnings("unchecked")
+        ArgumentCaptor<Set<TopicPartition>> deleteOffsetsSetCapture = ArgumentCaptor.forClass(Set.class);
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();
+        Map<String, String> partition1 = new HashMap<>();
+        partition1.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic");
+        partition1.put(SinkUtils.KAFKA_PARTITION_KEY, "10");
+        partitionOffsets.put(partition1, Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, "100"));
+        Map<String, String> partition2 = new HashMap<>();
+        partition2.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic");
+        partition2.put(SinkUtils.KAFKA_PARTITION_KEY, "20");
+        partitionOffsets.put(partition2, null);
+
+        alterOffsetsSinkConnector(partitionOffsets, alterOffsetsMapCapture, deleteOffsetsSetCapture);
+
+        assertEquals(1, alterOffsetsMapCapture.getValue().size());
+        assertEquals(100, alterOffsetsMapCapture.getValue().get(new TopicPartition("test_topic", 10)).offset());
+
+        assertEquals(1, deleteOffsetsSetCapture.getValue().size());
+        assertEquals(new TopicPartition("test_topic", 20), 
deleteOffsetsSetCapture.getValue().iterator().next());

Review Comment:
   Same thought RE comparing `Set` instances for equality here
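   i.e., something like (a sketch, assuming `Collections.singleton` is acceptable here):
   ```java
   assertEquals(Collections.singleton(new TopicPartition("test_topic", 20)), deleteOffsetsSetCapture.getValue());
   ```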



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java:
##########
@@ -1889,6 +1902,352 @@ public void testGetSourceConnectorOffsetsError() {
         verifyKafkaClusterId();
     }
 
+    @Test
+    public void testAlterOffsetsConnectorDoesNotSupportOffsetAlteration() {
+        mockKafkaClusterId();
+
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(),
+                allConnectorClientConfigOverridePolicy, null);
+        worker.start();
+
+        mockGenericIsolation();
+        when(plugins.newConnector(anyString())).thenReturn(sourceConnector);
+        when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
+        when(sourceConnector.alterOffsets(eq(connectorProps), anyMap())).thenThrow(new UnsupportedOperationException("This connector doesn't " +
+                "support altering of offsets"));
+
+        FutureCallback<Message> cb = new FutureCallback<>();
+        worker.alterConnectorOffsets(CONNECTOR_ID, connectorProps,
+                Collections.singletonMap(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")),
+                cb);
+
+        ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS));
+        assertEquals(ConnectException.class, e.getCause().getClass());
+        assertEquals("Failed to alter offsets for connector " + CONNECTOR_ID + 
" because it doesn't support external modification of offsets",
+                e.getCause().getMessage());
+
+        verifyGenericIsolation();
+        verifyKafkaClusterId();
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testAlterOffsetsSourceConnector() throws Exception {
+        mockKafkaClusterId();
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(),
+                allConnectorClientConfigOverridePolicy, null);
+        worker.start();
+
+        when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
+        when(sourceConnector.alterOffsets(eq(connectorProps), anyMap())).thenReturn(true);
+        ConnectorOffsetBackingStore offsetStore = mock(ConnectorOffsetBackingStore.class);
+        KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
+        OffsetStorageWriter offsetWriter = mock(OffsetStorageWriter.class);
+
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();
+        partitionOffsets.put(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue"));
+        partitionOffsets.put(Collections.singletonMap("partitionKey", "partitionValue2"), Collections.singletonMap("offsetKey", "offsetValue"));
+
+        when(offsetWriter.doFlush(any())).thenAnswer(invocation -> {
+            invocation.getArgument(0, Callback.class).onCompletion(null, null);
+            return null;
+        });
+
+        FutureCallback<Message> cb = new FutureCallback<>();
+        worker.alterSourceConnectorOffsets(CONNECTOR_ID, sourceConnector, connectorProps, partitionOffsets, offsetStore, producer,
+                offsetWriter, Thread.currentThread().getContextClassLoader(), cb);
+        assertEquals("The offsets for this connector have been altered successfully", cb.get(1000, TimeUnit.MILLISECONDS).message());
+
+        verify(offsetStore).configure(config);
+        verify(offsetStore).start();
+        partitionOffsets.forEach((partition, offset) -> verify(offsetWriter).offset(partition, offset));
+        verify(offsetWriter).beginFlush();
+        verify(offsetWriter).doFlush(any());
+        verify(offsetStore).stop();
+        verifyKafkaClusterId();
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testAlterOffsetsSourceConnectorError() throws Exception {
+        mockKafkaClusterId();
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(),
+                allConnectorClientConfigOverridePolicy, null);
+        worker.start();
+
+        when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
+        when(sourceConnector.alterOffsets(eq(connectorProps), anyMap())).thenReturn(true);
+        ConnectorOffsetBackingStore offsetStore = mock(ConnectorOffsetBackingStore.class);
+        KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
+        OffsetStorageWriter offsetWriter = mock(OffsetStorageWriter.class);
+
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();
+        partitionOffsets.put(Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue"));
+        partitionOffsets.put(Collections.singletonMap("partitionKey", "partitionValue2"), Collections.singletonMap("offsetKey", "offsetValue"));
+
+        when(offsetWriter.doFlush(any())).thenAnswer(invocation -> {
+            invocation.getArgument(0, Callback.class).onCompletion(new RuntimeException("Test exception"), null);
+            return null;
+        });
+
+        FutureCallback<Message> cb = new FutureCallback<>();
+        worker.alterSourceConnectorOffsets(CONNECTOR_ID, sourceConnector, connectorProps, partitionOffsets, offsetStore, producer,
+                offsetWriter, Thread.currentThread().getContextClassLoader(), cb);
+        ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS).message());
+        assertEquals(ConnectException.class, e.getCause().getClass());
+
+        verify(offsetStore).configure(config);
+        verify(offsetStore).start();
+        partitionOffsets.forEach((partition, offset) -> verify(offsetWriter).offset(partition, offset));
+        verify(offsetWriter).beginFlush();
+        verify(offsetWriter).doFlush(any());
+        verify(offsetStore).stop();
+        verifyKafkaClusterId();
+    }
+
+    @Test
+    public void testAlterOffsetsSinkConnectorNoResets() throws Exception {
+        @SuppressWarnings("unchecked")
+        ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> alterOffsetsMapCapture = ArgumentCaptor.forClass(Map.class);
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();
+        Map<String, String> partition1 = new HashMap<>();
+        partition1.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic");
+        partition1.put(SinkUtils.KAFKA_PARTITION_KEY, "10");
+        partitionOffsets.put(partition1, Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 500));
+        Map<String, String> partition2 = new HashMap<>();
+        partition2.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic");
+        partition2.put(SinkUtils.KAFKA_PARTITION_KEY, "20");
+        partitionOffsets.put(partition2, Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 100));
+
+        // A null value for deleteOffsetsSetCapture indicates that we don't expect any call to Admin::deleteConsumerGroupOffsets
+        alterOffsetsSinkConnector(partitionOffsets, alterOffsetsMapCapture, null);
+
+        assertEquals(2, alterOffsetsMapCapture.getValue().size());
+        assertEquals(500, alterOffsetsMapCapture.getValue().get(new TopicPartition("test_topic", 10)).offset());
+        assertEquals(100, alterOffsetsMapCapture.getValue().get(new TopicPartition("test_topic", 20)).offset());
+    }
+
+    @Test
+    public void testAlterOffsetSinkConnectorOnlyResets() throws Exception {
+        @SuppressWarnings("unchecked")
+        ArgumentCaptor<Set<TopicPartition>> deleteOffsetsSetCapture = ArgumentCaptor.forClass(Set.class);
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();
+        Map<String, String> partition1 = new HashMap<>();
+        partition1.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic");
+        partition1.put(SinkUtils.KAFKA_PARTITION_KEY, "10");
+        partitionOffsets.put(partition1, null);
+        Map<String, String> partition2 = new HashMap<>();
+        partition2.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic");
+        partition2.put(SinkUtils.KAFKA_PARTITION_KEY, "20");
+        partitionOffsets.put(partition2, null);
+
+        // A null value for alterOffsetsMapCapture indicates that we don't expect any call to Admin::alterConsumerGroupOffsets
+        alterOffsetsSinkConnector(partitionOffsets, null, deleteOffsetsSetCapture);
+
+        Set<TopicPartition> expectedTopicPartitionsForOffsetDelete = new HashSet<>();
+        expectedTopicPartitionsForOffsetDelete.add(new TopicPartition("test_topic", 10));
+        expectedTopicPartitionsForOffsetDelete.add(new TopicPartition("test_topic", 20));
+
+        assertEquals(2, deleteOffsetsSetCapture.getValue().size());
+
+        // Verify that contents are equal without caring about order
+        assertTrue(expectedTopicPartitionsForOffsetDelete.containsAll(deleteOffsetsSetCapture.getValue()));
+        assertTrue(deleteOffsetsSetCapture.getValue().containsAll(expectedTopicPartitionsForOffsetDelete));
+    }
+
+    @Test
+    public void testAlterOffsetsSinkConnectorAltersAndResets() throws Exception {
+        @SuppressWarnings("unchecked")
+        ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> alterOffsetsMapCapture = ArgumentCaptor.forClass(Map.class);
+        @SuppressWarnings("unchecked")
+        ArgumentCaptor<Set<TopicPartition>> deleteOffsetsSetCapture = ArgumentCaptor.forClass(Set.class);
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();
+        Map<String, String> partition1 = new HashMap<>();
+        partition1.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic");
+        partition1.put(SinkUtils.KAFKA_PARTITION_KEY, "10");
+        partitionOffsets.put(partition1, Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, "100"));
+        Map<String, String> partition2 = new HashMap<>();
+        partition2.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic");
+        partition2.put(SinkUtils.KAFKA_PARTITION_KEY, "20");
+        partitionOffsets.put(partition2, null);
+
+        alterOffsetsSinkConnector(partitionOffsets, alterOffsetsMapCapture, deleteOffsetsSetCapture);
+
+        assertEquals(1, alterOffsetsMapCapture.getValue().size());
+        assertEquals(100, alterOffsetsMapCapture.getValue().get(new TopicPartition("test_topic", 10)).offset());
+
+        assertEquals(1, deleteOffsetsSetCapture.getValue().size());
+        assertEquals(new TopicPartition("test_topic", 20), 
deleteOffsetsSetCapture.getValue().iterator().next());
+    }
+
+    private void alterOffsetsSinkConnector(Map<Map<String, ?>, Map<String, ?>> partitionOffsets,
+                                           ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> alterOffsetsMapCapture,
+                                           ArgumentCaptor<Set<TopicPartition>> deleteOffsetsSetCapture) throws Exception {
+        mockKafkaClusterId();
+        String connectorClass = SampleSinkConnector.class.getName();
+        connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass);
+
+        Admin admin = mock(Admin.class);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newCachedThreadPool(),
+                allConnectorClientConfigOverridePolicy, config -> admin);
+        worker.start();
+
+        when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
+        when(sinkConnector.alterOffsets(eq(connectorProps), anyMap())).thenReturn(true);
+
+        // If alterOffsetsMapCapture is null, then we won't stub any of the following methods resulting in test failures in case
+        // offsets for certain topic partitions are actually attempted to be altered.
+        if (alterOffsetsMapCapture != null) {
+            AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = mock(AlterConsumerGroupOffsetsResult.class);
+            when(admin.alterConsumerGroupOffsets(anyString(), alterOffsetsMapCapture.capture(), any(AlterConsumerGroupOffsetsOptions.class)))
+                    .thenReturn(alterConsumerGroupOffsetsResult);
+            KafkaFuture<Void> alterFuture = KafkaFuture.completedFuture(null);
+            when(alterConsumerGroupOffsetsResult.all()).thenReturn(alterFuture);
+        }
+
+        // If deleteOffsetsSetCapture is null, then we won't stub any of the following methods resulting in test failures in case
+        // offsets for certain topic partitions are actually attempted to be deleted.
+        if (deleteOffsetsSetCapture != null) {
+            DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = mock(DeleteConsumerGroupOffsetsResult.class);
+            when(admin.deleteConsumerGroupOffsets(anyString(), deleteOffsetsSetCapture.capture(), any(DeleteConsumerGroupOffsetsOptions.class)))
+                    .thenReturn(deleteConsumerGroupOffsetsResult);
+            KafkaFuture<Void> deleteFuture = KafkaFuture.completedFuture(null);
+            when(deleteConsumerGroupOffsetsResult.all()).thenReturn(deleteFuture);
+        }
+
+        FutureCallback<Message> cb = new FutureCallback<>();
+        worker.alterSinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, partitionOffsets,
+                Thread.currentThread().getContextClassLoader(), cb);
+        assertEquals("The offsets for this connector have been altered 
successfully", cb.get(1000, TimeUnit.MILLISECONDS).message());
+
+        verify(admin, timeout(1000)).close();
+        verifyKafkaClusterId();
+    }
+
+    @Test
+    public void testAlterOffsetsSinkConnectorAlterOffsetsError() throws Exception {
+        mockKafkaClusterId();
+        String connectorClass = SampleSinkConnector.class.getName();
+        connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass);
+
+        Admin admin = mock(Admin.class);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, Executors.newSingleThreadExecutor(),
+                allConnectorClientConfigOverridePolicy, config -> admin);
+        worker.start();
+
+        when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
+        when(sinkConnector.alterOffsets(eq(connectorProps), anyMap())).thenReturn(true);
+
+        AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = mock(AlterConsumerGroupOffsetsResult.class);
+        when(admin.alterConsumerGroupOffsets(anyString(), anyMap(), any(AlterConsumerGroupOffsetsOptions.class)))
+                .thenReturn(alterConsumerGroupOffsetsResult);
+        KafkaFutureImpl<Void> alterFuture = new KafkaFutureImpl<>();
+        alterFuture.completeExceptionally(new ClusterAuthorizationException("Test exception"));
+        when(alterConsumerGroupOffsetsResult.all()).thenReturn(alterFuture);
+
+        Map<String, String> partition1 = new HashMap<>();
+        partition1.put(SinkUtils.KAFKA_TOPIC_KEY, "test_topic");
+        partition1.put(SinkUtils.KAFKA_PARTITION_KEY, "10");
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = Collections.singletonMap(partition1,
+                Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, "100"));
+
+        FutureCallback<Message> cb = new FutureCallback<>();
+        worker.alterSinkConnectorOffsets(CONNECTOR_ID, sinkConnector, connectorProps, partitionOffsets,
+                Thread.currentThread().getContextClassLoader(), cb);
+
+        ExecutionException e = assertThrows(ExecutionException.class, () -> cb.get(1000, TimeUnit.MILLISECONDS));
+        assertEquals(ConnectException.class, e.getCause().getClass());
+
+        verify(admin, timeout(1000)).close();
+        verifyNoMoreInteractions(admin);
+        verifyKafkaClusterId();
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")

Review Comment:
   Nit: unnecessary suppression
   ```suggestion
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -4123,6 +4136,280 @@ public void testConnectorOffsets() throws Exception {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testAlterOffsetsConnectorNotFound() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets("connector-does-not-exist", new 
HashMap<>(), callback);
+        herder.tick();
+        ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS));
+        assertTrue(e.getCause() instanceof NotFoundException);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testAlterOffsetsConnectorNotInStoppedState() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback);
+        herder.tick();
+        ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS));
+        assertTrue(e.getCause() instanceof BadRequestException);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testAlterOffsetsNotLeader() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), false);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();

Review Comment:
   So this part is missing a succeeding call to `PowerMock::expectLastCall`, but the test case passes anyway (and if we comment out this line, it fails).
   
   I did some experimenting and we can comment out every call to 
`PowerMock::expectLastCall` except for the last one on line 4216 and the test 
will still pass.
   
   Probably best to just add a call after this line for consistency's sake, but 
interesting behavior from PowerMock. If we weren't migrating to Mockito it 
might be worth looking into to see if that behavior is intentional and, if so, 
if we could leverage it to reduce noise in our test cases.
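   Concretely, the consistent form would be something like:
   ```java
   member.ensureActive();
   PowerMock.expectLastCall();
   ```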



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -4123,6 +4136,280 @@ public void testConnectorOffsets() throws Exception {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testAlterOffsetsConnectorNotFound() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets("connector-does-not-exist", new 
HashMap<>(), callback);
+        herder.tick();
+        ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS));
+        assertTrue(e.getCause() instanceof NotFoundException);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testAlterOffsetsConnectorNotInStoppedState() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback);
+        herder.tick();
+        ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS));
+        assertTrue(e.getCause() instanceof BadRequestException);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testAlterOffsetsNotLeader() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), false);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback);
+        herder.tick();
+        ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS));
+        assertTrue(e.getCause() instanceof NotLeaderException);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testAlterOffsetsSourceConnectorExactlyOnceDisabled() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        Capture<Callback<Message>> workerCallbackCapture = Capture.newInstance();
+        worker.alterConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), capture(workerCallbackCapture));
+        Message msg = new Message("The offsets for this connector have been altered successfully");
+        EasyMock.expectLastCall().andAnswer(() -> {
+            workerCallbackCapture.getValue().onCompletion(null, msg);
+            return null;
+        });
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets(CONN1, offsets, callback);
+        herder.tick();
+        assertEquals(msg, callback.get(1000L, TimeUnit.MILLISECONDS));
+        assertEquals("The offsets for this connector have been altered 
successfully", msg.message());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testAlterOffsetsSourceConnectorExactlyOnceEnabled() throws Exception {
+        // Setup herder with exactly-once support for source connectors enabled
+        herder = exactlyOnceHerder();
+        rebalanceListener = herder.new RebalanceListener(time);
+        PowerMock.expectPrivate(herder, "updateDeletedConnectorStatus").andVoid().anyTimes();
+        PowerMock.expectPrivate(herder, "updateDeletedTaskStatus").andVoid().anyTimes();
+
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall().anyTimes();
+
+        // Now handle the alter connector offsets request
+        Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
+        member.wakeup();
+        PowerMock.expectLastCall().anyTimes();
+        member.ensureActive();
+        PowerMock.expectLastCall().anyTimes();
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        EasyMock.expect(herder.connectorType(EasyMock.anyObject())).andReturn(ConnectorType.SOURCE).anyTimes();
+
+        // Expect a round of zombie fencing to occur
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        KafkaFuture<Void> workerFencingFuture = EasyMock.mock(KafkaFuture.class);
+        KafkaFuture<Void> herderFencingFuture = EasyMock.mock(KafkaFuture.class);
+        EasyMock.expect(worker.fenceZombies(CONN1, SNAPSHOT_STOPPED_CONN1.taskCountRecord(CONN1), CONN1_CONFIG)).andReturn(workerFencingFuture);
+        EasyMock.expect(workerFencingFuture.thenApply(EasyMock.<KafkaFuture.BaseFunction<Void, Void>>anyObject())).andReturn(herderFencingFuture);
+
+        // Two fencing callbacks are added - one is in ZombieFencing::start itself to remove the connector from the active
+        // fencing list. The other is the callback passed from DistributedHerder::alterConnectorOffsets in order to
+        // queue up the actual alter offsets request if the zombie fencing succeeds.
+        for (int i = 0; i < 2; i++) {
+            Capture<KafkaFuture.BiConsumer<Void, Throwable>> herderFencingCallback = EasyMock.newCapture();
+            EasyMock.expect(herderFencingFuture.whenComplete(EasyMock.capture(herderFencingCallback))).andAnswer(() -> {
+                herderFencingCallback.getValue().accept(null, null);
+                return null;
+            });
+        }
+
+        Capture<Callback<Message>> workerCallbackCapture = Capture.newInstance();
+        Message msg = new Message("The offsets for this connector have been altered successfully");
+        worker.alterConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), capture(workerCallbackCapture));
+        EasyMock.expectLastCall().andAnswer(() -> {
+            workerCallbackCapture.getValue().onCompletion(null, msg);
+            return null;
+        });
+
+        // Handle the second alter connector offsets request. No zombie fencing request to the worker is expected now since we
+        // already did a round of zombie fencing last time and no new tasks came up in the meanwhile. The config snapshot is
+        // refreshed once at the beginning of the DistributedHerder::alterConnectorOffsets method, once before checking
+        // whether zombie fencing is required, and once before actually proceeding to alter connector offsets.
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED);
+        Capture<Callback<Message>> workerCallbackCapture2 = Capture.newInstance();
+        worker.alterConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), capture(workerCallbackCapture2));
+        EasyMock.expectLastCall().andAnswer(() -> {
+            workerCallbackCapture2.getValue().onCompletion(null, msg);
+            return null;
+        });
+
+        PowerMock.replayAll(workerFencingFuture, herderFencingFuture);
+
+        herder.tick();
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets(CONN1, offsets, callback);
+        // Process the zombie fencing request
+        herder.tick();
+        // Process the alter offsets request
+        herder.tick();
+        assertEquals(msg, callback.get(1000L, TimeUnit.MILLISECONDS));
+
+        FutureCallback<Message> callback2 = new FutureCallback<>();
+        herder.alterConnectorOffsets(CONN1, offsets, callback2);
+        herder.tick();
+        assertEquals(msg, callback2.get(1000L, TimeUnit.MILLISECONDS));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testAlterOffsetsSourceConnectorExactlyOnceEnabledZombieFencingFailure() throws Exception {

Review Comment:
   This goes above and beyond what I was expecting for coverage here, great stuff 👍
   
   Do you think it's also worth adding a case for a sink connector, or is that too similar to the non-exactly-once source case?
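   
   If we do add one, here's a rough sketch of the shape I'd expect — note that `SINK_CONN`, `SINK_CONN_CONFIG`, and `SNAPSHOT_STOPPED_SINK_CONN` are hypothetical fixtures this suite would need to define, so treat this as a sketch under those assumptions rather than a drop-in test. The main value over the source case would be implicitly asserting that no zombie fencing round ever reaches the worker for a sink:
   
   ```java
   @Test
   public void testAlterOffsetsSinkConnector() throws Exception {
       // Get the initial assignment
       EasyMock.expect(member.memberId()).andStubReturn("leader");
       EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
       expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
       expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_SINK_CONN); // hypothetical snapshot with a stopped sink connector
       member.poll(EasyMock.anyInt());
       PowerMock.expectLastCall();

       // Handle the alter connector offsets request; worker.fenceZombies is never expected for a sink
       Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
       member.wakeup();
       PowerMock.expectLastCall();
       member.ensureActive();
       PowerMock.expectLastCall();
       member.poll(EasyMock.anyInt());
       PowerMock.expectLastCall();
       expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_SINK_CONN);
       Capture<Callback<Message>> workerCallbackCapture = Capture.newInstance();
       worker.alterConnectorOffsets(EasyMock.eq(SINK_CONN), EasyMock.eq(SINK_CONN_CONFIG), EasyMock.eq(offsets), capture(workerCallbackCapture));
       Message msg = new Message("The offsets for this connector have been altered successfully");
       EasyMock.expectLastCall().andAnswer(() -> {
           workerCallbackCapture.getValue().onCompletion(null, msg);
           return null;
       });

       PowerMock.replayAll();

       herder.tick();
       FutureCallback<Message> callback = new FutureCallback<>();
       herder.alterConnectorOffsets(SINK_CONN, offsets, callback);
       herder.tick();
       assertEquals(msg, callback.get(1000L, TimeUnit.MILLISECONDS));

       PowerMock.verifyAll();
   }
   ```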


