yashmayya commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1205023533


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -4123,6 +4136,280 @@ public void testConnectorOffsets() throws Exception {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testAlterOffsetsConnectorNotFound() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets("connector-does-not-exist", new 
HashMap<>(), callback);
+        herder.tick();
+        ExecutionException e = assertThrows(ExecutionException.class, () -> 
callback.get(1000L, TimeUnit.MILLISECONDS));
+        assertTrue(e.getCause() instanceof NotFoundException);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testAlterOffsetsConnectorNotInStoppedState() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback);
+        herder.tick();
+        ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS));
+        assertTrue(e.getCause() instanceof BadRequestException);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testAlterOffsetsNotLeader() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), false);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();

Review Comment:
   Yeah, it does seem to be intentional - apparently `expectLastCall` (in both `EasyMock` and `PowerMock`) is optional for void methods and is mainly used for clarity. FWIW, I was able to get the test to pass even after removing the last one (after the call to `member.poll()`).
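   
   For reference, here's a minimal standalone sketch (using a toy `Member` interface rather than the actual test mocks) showing that the two recording styles are equivalent:
   
   ```java
   import org.easymock.EasyMock;
   
   public class ExpectLastCallSketch {
       interface Member {
           void poll(int timeoutMs);
           void wakeup();
       }
   
       public static void main(String[] args) {
           Member member = EasyMock.createMock(Member.class);
   
           // Explicit style: expectLastCall() records the preceding void call as an
           // expectation and returns setters for further configuration (times, answers, etc.)
           member.poll(EasyMock.anyInt());
           EasyMock.expectLastCall();
   
           // Implicit style: invoking the void method in record mode already records
           // the expectation, so expectLastCall() can be omitted entirely
           member.wakeup();
   
           EasyMock.replay(member);
           member.poll(100);
           member.wakeup();
           EasyMock.verify(member);
       }
   }
   ```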



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/SinkUtilsTest.java:
##########
@@ -46,4 +52,124 @@ public void testConsumerGroupOffsetsToConnectorOffsets() {
         expectedPartition.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
         assertEquals(expectedPartition, connectorOffsets.offsets().get(0).partition());
     }
+
+    @Test
+    public void testValidateAndParseEmptyPartitionOffsetMap() {
+        // expect no exception to be thrown
+        Map<TopicPartition, Long> parsedOffsets = 
SinkUtils.parseSinkConnectorOffsets(new HashMap<>());
+        assertTrue(parsedOffsets.isEmpty());
+    }
+
+    @Test
+    public void testValidateAndParseInvalidPartition() {
+        Map<String, Object> partition = new HashMap<>();
+        partition.put(SinkUtils.KAFKA_TOPIC_KEY, "topic");
+        Map<String, Object> offset = new HashMap<>();
+        offset.put(SinkUtils.KAFKA_OFFSET_KEY, 100);
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();
+        partitionOffsets.put(partition, offset);
+
+        // missing partition key
+        ConnectException e = assertThrows(ConnectException.class, () -> SinkUtils.parseSinkConnectorOffsets(partitionOffsets));
+        assertThat(e.getMessage(), containsString("The partition for a sink connector offset must contain the keys 'kafka_topic' and 'kafka_partition'"));
+
+        partition.put(SinkUtils.KAFKA_PARTITION_KEY, "not a number");
+        // bad partition key
+        e = assertThrows(ConnectException.class, () -> SinkUtils.parseSinkConnectorOffsets(partitionOffsets));
+        assertThat(e.getMessage(), containsString("Failed to parse the following Kafka partition value in the provided offsets: 'not a number'"));
+
+        partition.remove(SinkUtils.KAFKA_TOPIC_KEY);
+        partition.put(SinkUtils.KAFKA_PARTITION_KEY, "5");
+        // missing topic key
+        e = assertThrows(ConnectException.class, () -> SinkUtils.parseSinkConnectorOffsets(partitionOffsets));
+        assertThat(e.getMessage(), containsString("The partition for a sink connector offset must contain the keys 'kafka_topic' and 'kafka_partition'"));
+    }
+
+    @Test
+    public void testValidateAndParseInvalidOffset() {
+        Map<String, Object> partition = new HashMap<>();
+        partition.put(SinkUtils.KAFKA_TOPIC_KEY, "topic");
+        partition.put(SinkUtils.KAFKA_PARTITION_KEY, 10);
+        Map<String, Object> offset = new HashMap<>();
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();
+        partitionOffsets.put(partition, offset);
+
+        // missing offset key
+        ConnectException e = assertThrows(ConnectException.class, () -> SinkUtils.parseSinkConnectorOffsets(partitionOffsets));
+        assertThat(e.getMessage(), containsString("The offset for a sink connector should either be null or contain the key 'kafka_offset'"));
+
+        // bad offset key
+        offset.put(SinkUtils.KAFKA_OFFSET_KEY, "not a number");
+        e = assertThrows(ConnectException.class, () -> SinkUtils.parseSinkConnectorOffsets(partitionOffsets));
+        assertThat(e.getMessage(), containsString("Failed to parse the following Kafka offset value in the provided offsets: 'not a number'"));
+    }
+
+    @Test
+    public void testValidateAndParseStringPartitionValue() {
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = 
createPartitionOffsetMap("topic", "10", "100");
+        Map<TopicPartition, Long> parsedOffsets = 
SinkUtils.parseSinkConnectorOffsets(partitionOffsets);
+        assertEquals(1, parsedOffsets.size());
+        TopicPartition tp = parsedOffsets.keySet().iterator().next();
+        assertEquals(10, tp.partition());
+    }
+
+    @Test
+    public void testValidateAndParseIntegerPartitionValue() {
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = 
createPartitionOffsetMap("topic", 10, "100");
+        Map<TopicPartition, Long> parsedOffsets = 
SinkUtils.parseSinkConnectorOffsets(partitionOffsets);
+        assertEquals(1, parsedOffsets.size());
+        TopicPartition tp = parsedOffsets.keySet().iterator().next();
+        assertEquals(10, tp.partition());
+    }
+
+    @Test
+    public void testValidateAndParseStringOffsetValue() {
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = 
createPartitionOffsetMap("topic", "10", "100");
+        Map<TopicPartition, Long> parsedOffsets = 
SinkUtils.parseSinkConnectorOffsets(partitionOffsets);
+        assertEquals(1, parsedOffsets.size());
+        Long offsetValue = parsedOffsets.values().iterator().next();
+        assertEquals(100L, offsetValue.longValue());
+    }
+
+    @Test
+    public void testValidateAndParseIntegerOffsetValue() {
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = 
createPartitionOffsetMap("topic", "10", 100);
+        Map<TopicPartition, Long> parsedOffsets = 
SinkUtils.parseSinkConnectorOffsets(partitionOffsets);
+        assertEquals(1, parsedOffsets.size());
+        Long offsetValue = parsedOffsets.values().iterator().next();
+        assertEquals(100L, offsetValue.longValue());
+    }
+
+    @Test
+    public void testNullOffset() {
+        Map<String, Object> partitionMap = new HashMap<>();
+        partitionMap.put(SinkUtils.KAFKA_TOPIC_KEY, "topic");
+        partitionMap.put(SinkUtils.KAFKA_PARTITION_KEY, 10);
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();
+        partitionOffsets.put(partitionMap, null);
+        Map<TopicPartition, Long> parsedOffsets = 
SinkUtils.parseSinkConnectorOffsets(partitionOffsets);
+        assertEquals(1, parsedOffsets.size());
+        assertNull(parsedOffsets.values().iterator().next());
+    }
+
+    @Test
+    public void testNullPartition() {

Review Comment:
   Ah, I hadn't noticed that we were treating null topic names as the Kafka topic "null". While "null" is a valid Kafka topic name, it should be provided as the string value "null", so it makes more sense to reject a null topic name outright. I've updated the `parseSinkConnectorOffsets` method to include this check and added a new test for this case, thanks!
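   
   Roughly, the added check has this shape (an illustrative sketch, not the exact patch):
   
   ```java
   import java.util.Map;
   
   import org.apache.kafka.connect.errors.ConnectException;
   
   public class NullTopicCheckSketch {
       // Reject a null "kafka_topic" value outright instead of letting
       // String.valueOf(null) silently produce the literal topic name "null"
       static String parseTopic(Map<String, ?> partitionMap) {
           Object topic = partitionMap.get("kafka_topic");
           if (topic == null) {
               throw new ConnectException("The topic for a sink connector offset cannot be null or missing");
           }
           return String.valueOf(topic);
       }
   }
   ```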



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -4123,6 +4136,280 @@ public void testConnectorOffsets() throws Exception {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testAlterOffsetsConnectorNotFound() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets("connector-does-not-exist", new 
HashMap<>(), callback);
+        herder.tick();
+        ExecutionException e = assertThrows(ExecutionException.class, () -> 
callback.get(1000L, TimeUnit.MILLISECONDS));
+        assertTrue(e.getCause() instanceof NotFoundException);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testAlterOffsetsConnectorNotInStoppedState() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback);
+        herder.tick();
+        ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS));
+        assertTrue(e.getCause() instanceof BadRequestException);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testAlterOffsetsNotLeader() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), false);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback);
+        herder.tick();
+        ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS));
+        assertTrue(e.getCause() instanceof NotLeaderException);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testAlterOffsetsSourceConnectorExactlyOnceDisabled() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        Capture<Callback<Message>> workerCallbackCapture = 
Capture.newInstance();
+        worker.alterConnectorOffsets(EasyMock.eq(CONN1), 
EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), 
capture(workerCallbackCapture));
+        Message msg = new Message("The offsets for this connector have been 
altered successfully");
+        EasyMock.expectLastCall().andAnswer(() -> {
+            workerCallbackCapture.getValue().onCompletion(null, msg);
+            return null;
+        });
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets(CONN1, offsets, callback);
+        herder.tick();
+        assertEquals(msg, callback.get(1000L, TimeUnit.MILLISECONDS));
+        assertEquals("The offsets for this connector have been altered 
successfully", msg.message());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testAlterOffsetsSourceConnectorExactlyOnceEnabled() throws Exception {
+        // Setup herder with exactly-once support for source connectors enabled
+        herder = exactlyOnceHerder();
+        rebalanceListener = herder.new RebalanceListener(time);
+        PowerMock.expectPrivate(herder, "updateDeletedConnectorStatus").andVoid().anyTimes();
+        PowerMock.expectPrivate(herder, "updateDeletedTaskStatus").andVoid().anyTimes();
+
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall().anyTimes();
+
+        // Now handle the alter connector offsets request
+        Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
+        member.wakeup();
+        PowerMock.expectLastCall().anyTimes();
+        member.ensureActive();
+        PowerMock.expectLastCall().anyTimes();
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        EasyMock.expect(herder.connectorType(EasyMock.anyObject())).andReturn(ConnectorType.SOURCE).anyTimes();
+
+        // Expect a round of zombie fencing to occur
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        KafkaFuture<Void> workerFencingFuture = 
EasyMock.mock(KafkaFuture.class);
+        KafkaFuture<Void> herderFencingFuture = 
EasyMock.mock(KafkaFuture.class);
+        EasyMock.expect(worker.fenceZombies(CONN1, 
SNAPSHOT_STOPPED_CONN1.taskCountRecord(CONN1), 
CONN1_CONFIG)).andReturn(workerFencingFuture);
+        
EasyMock.expect(workerFencingFuture.thenApply(EasyMock.<KafkaFuture.BaseFunction<Void,
 Void>>anyObject())).andReturn(herderFencingFuture);
+
+        // Two fencing callbacks are added - one is in ZombieFencing::start itself to remove the connector from the active
+        // fencing list. The other is the callback passed from DistributedHerder::alterConnectorOffsets in order to
+        // queue up the actual alter offsets request if the zombie fencing succeeds.
+        for (int i = 0; i < 2; i++) {
+            Capture<KafkaFuture.BiConsumer<Void, Throwable>> 
herderFencingCallback = EasyMock.newCapture();
+            
EasyMock.expect(herderFencingFuture.whenComplete(EasyMock.capture(herderFencingCallback))).andAnswer(()
 -> {
+                herderFencingCallback.getValue().accept(null, null);
+                return null;
+            });
+        }
+
+        Capture<Callback<Message>> workerCallbackCapture = 
Capture.newInstance();
+        Message msg = new Message("The offsets for this connector have been 
altered successfully");
+        worker.alterConnectorOffsets(EasyMock.eq(CONN1), 
EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), 
capture(workerCallbackCapture));
+        EasyMock.expectLastCall().andAnswer(() -> {
+            workerCallbackCapture.getValue().onCompletion(null, msg);
+            return null;
+        });
+
+        // Handle the second alter connector offsets request. No zombie fencing request to the worker is expected now since we
+        // already did a round of zombie fencing last time and no new tasks came up in the meanwhile. The config snapshot is
+        // refreshed once at the beginning of the DistributedHerder::alterConnectorOffsets method, once before checking
+        // whether zombie fencing is required, and once before actually proceeding to alter connector offsets.
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED);
+        Capture<Callback<Message>> workerCallbackCapture2 = 
Capture.newInstance();
+        worker.alterConnectorOffsets(EasyMock.eq(CONN1), 
EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), 
capture(workerCallbackCapture2));
+        EasyMock.expectLastCall().andAnswer(() -> {
+            workerCallbackCapture2.getValue().onCompletion(null, msg);
+            return null;
+        });
+
+        PowerMock.replayAll(workerFencingFuture, herderFencingFuture);
+
+        herder.tick();
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets(CONN1, offsets, callback);
+        // Process the zombie fencing request
+        herder.tick();
+        // Process the alter offsets request
+        herder.tick();
+        assertEquals(msg, callback.get(1000L, TimeUnit.MILLISECONDS));
+
+        FutureCallback<Message> callback2 = new FutureCallback<>();
+        herder.alterConnectorOffsets(CONN1, offsets, callback2);
+        herder.tick();
+        assertEquals(msg, callback2.get(1000L, TimeUnit.MILLISECONDS));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testAlterOffsetsSourceConnectorExactlyOnceEnabledZombieFencingFailure() throws Exception {

Review Comment:
   Thanks! Yeah, I hadn't added one for sink connectors because, in its current shape, it's identical to the non-EoS source case. However, additional test cases don't hurt, and if the behavior for sink connectors ever diverges from that of non-EoS source connectors, it'll be easier to have a test case already in place to update. The newly added test looks slightly messy (due to the need to reset previous expectations and add new ones in the EasyMock / PowerMock paradigm) but will look cleaner when I migrate it to Mockito along with the other tests here.
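   
   As a toy illustration (not code from this PR) of the reset/replay dance that makes mid-test expectation changes awkward in EasyMock:
   
   ```java
   import org.easymock.EasyMock;
   
   public class ResetReplaySketch {
       interface Service {
           String fetch();
       }
   
       public static void main(String[] args) {
           Service service = EasyMock.createMock(Service.class);
           EasyMock.expect(service.fetch()).andReturn("first");
           EasyMock.replay(service);
           System.out.println(service.fetch()); // prints "first"
   
           // Changing behavior mid-test requires resetting the mock and
           // re-recording every expectation before another replay
           EasyMock.reset(service);
           EasyMock.expect(service.fetch()).andReturn("second");
           EasyMock.replay(service);
           System.out.println(service.fetch()); // prints "second"
           EasyMock.verify(service);
       }
   }
   ```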



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS
         });
     }
 
+    /**
+     * Alter a connector's offsets.
+     *
+     * @param connName the name of the connector whose offsets are to be altered
+     * @param connectorConfig the connector's configuration
+     * @param offsets a mapping from partitions to offsets that need to be overwritten
+     * @param cb callback to invoke upon completion
+     */
+    public void alterConnectorOffsets(String connName, Map<String, String> connectorConfig,
+                                      Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+            if (ConnectUtils.isSinkConnector(connector)) {
+                log.debug("Altering consumer group offsets for sink connector: 
{}", connName);
+                alterSinkConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+            } else {
+                log.debug("Altering offsets for source connector: {}", 
connName);
+                alterSourceConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+            }
+        }
+    }
+
+    /**
+     * Alter a sink connector's consumer group offsets.
+     * <p>
+     * Visible for testing.
+     *
+     * @param connName the name of the sink connector whose offsets are to be altered
+     * @param connector an instance of the sink connector
+     * @param connectorConfig the sink connector's configuration
+     * @param offsets a mapping from topic partitions to offsets that need to be overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                   Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                Map<TopicPartition, Long> parsedOffsets = 
SinkUtils.validateAndParseSinkConnectorOffsets(offsets);
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, parsedOffsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for 
connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                Class<? extends Connector> sinkConnectorClass = 
connector.getClass();
+                Map<String, Object> adminConfig = adminConfigs(
+                        connName,
+                        "connector-worker-adminclient-" + connName,
+                        config,
+                        new SinkConnectorConfig(plugins, connectorConfig),
+                        sinkConnectorClass,
+                        connectorClientConfigOverridePolicy,
+                        kafkaClusterId,
+                        ConnectorType.SINK);
+
+                SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig);
+                String groupId = (String) baseConsumerConfigs(
+                        connName, "connector-consumer-", config, sinkConnectorConfig,
+                        sinkConnectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+                Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = 
parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() != null)
+                        .map(entry -> new 
AbstractMap.SimpleEntry<>(entry.getKey(), new 
OffsetAndMetadata(entry.getValue())))
+                        .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+                Set<TopicPartition> partitionsToReset = 
parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() == null)
+                        .map(Map.Entry::getKey)
+                        .collect(Collectors.toSet());
+
+                KafkaFuture<Void> adminFuture = 
KafkaFuture.completedFuture(null);
+
+                Admin admin = adminFactory.apply(adminConfig);
+
+                try {
+                    if (!offsetsToAlter.isEmpty()) {
+                        log.debug("Committing the following consumer group 
topic partition offsets using an admin client for sink connector {}: {}.",
+                                connName, offsetsToAlter);
+                        AlterConsumerGroupOffsetsOptions 
alterConsumerGroupOffsetsOptions = new 
AlterConsumerGroupOffsetsOptions().timeoutMs(
+                                (int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+                        AlterConsumerGroupOffsetsResult 
alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, 
offsetsToAlter,
+                                alterConsumerGroupOffsetsOptions);
+
+                        adminFuture = alterConsumerGroupOffsetsResult.all();
+                    }
+
+                    adminFuture.whenComplete((ignored, error) -> {
+                        if (error != null) {
+                            Utils.closeQuietly(admin, "Offset alter admin for 
sink connector " + connName);
+                            // When a consumer group is non-empty, only group 
members can commit offsets. The above attempt to alter offsets via the admin
+                            // client will result in an 
UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink 
tasks haven't stopped
+                            // completely or if the connector is resumed while 
the alter offsets request is being processed)
+                            if (error instanceof UnknownMemberIdException) {
+                                cb.onCompletion(new ConnectException("Failed 
to alter consumer group offsets for connector " + connName + " either because 
its tasks " +
+                                                "haven't stopped completely 
yet or the connector was resumed before the request to alter its offsets could 
be successfully " +
+                                                "completed. If the connector 
is in a stopped state, this operation can be safely retried. If it doesn't 
eventually succeed, the " +
+                                                "Connect cluster may need to 
be restarted to get rid of the zombie sink tasks."),
+                                        null);
+                            } else {
+                                cb.onCompletion(new ConnectException("Failed 
to alter consumer group offsets for topic partitions " + 
offsetsToAlter.keySet() + " for " +
+                                                "connector " + connName, 
error),
+                                        null);
+                            }
+                        } else if (!partitionsToReset.isEmpty()) {
+                            log.debug("Deleting the consumer group offsets for 
the following topic partitions using an admin client for sink connector {}: 
{}.",
+                                    connName, partitionsToReset);
+                            DeleteConsumerGroupOffsetsOptions 
deleteConsumerGroupOffsetsOptions = new 
DeleteConsumerGroupOffsetsOptions().timeoutMs(
+                                    (int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+                            DeleteConsumerGroupOffsetsResult 
deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, 
partitionsToReset,
+                                    deleteConsumerGroupOffsetsOptions);
+                            
deleteConsumerGroupOffsetsResult.all().whenComplete((ignored2, error2) -> {
+                                Utils.closeQuietly(admin, "Offset alter admin 
for sink connector " + connName);
+                                if (error2 != null) {
+                                    // The attempt to delete offsets for 
certain topic partitions via the admin client will result in a
+                                    // GroupSubscribedToTopicException if the 
consumer group is non-empty (i.e. if the sink tasks haven't stopped completely
+                                    // or if the connector is resumed while 
the alter offsets request is being processed).
+                                    if (error2 instanceof 
GroupSubscribedToTopicException) {
+                                        cb.onCompletion(new 
ConnectException("Failed to alter consumer group offsets for connector " + 
connName + " either because its tasks " +
+                                                        "haven't stopped 
completely yet or the connector was resumed before the request to alter its 
offsets could be successfully " +
+                                                        "completed. If the 
connector is in a stopped state, this operation can be safely retried. If it 
doesn't eventually succeed, the " +
+                                                        "Connect cluster may 
need to be restarted to get rid of the zombie sink tasks."),
+                                                null);
+                                    } else {
+                                        cb.onCompletion(new 
ConnectException("Failed to delete consumer group offsets for topic partitions 
" + partitionsToReset + " for connector "
+                                                        + connName, error2),
+                                                null);
+                                    }
+                                } else {
+                                    
completeAlterOffsetsCallback(alterOffsetsResult, cb);
+                                }
+                            });
+                        } else {
+                            Utils.closeQuietly(admin, "Offset alter admin for 
sink connector " + connName);
+                            completeAlterOffsetsCallback(alterOffsetsResult, 
cb);
+                        }
+                    });
+                } catch (Throwable t) {
+                    Utils.closeQuietly(admin, "Offset alter admin for sink 
connector " + connName);
+                    throw t;
+                }
+            } catch (Throwable t) {
+                cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter 
offsets for sink connector " + connName), null);
+            }
+        }));
+    }
+
+    /**
+     * Alter a source connector's offsets.
+     *
+     * @param connName the name of the source connector whose offsets are to be altered
+     * @param connector an instance of the source connector
+     * @param connectorConfig the source connector's configuration
+     * @param offsets a mapping from partitions to offsets that need to be overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    private void alterSourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                             Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connectorConfig, config.topicCreationEnable());
+        Map<String, Object> producerProps = config.exactlyOnceSourceEnabled()
+                ? exactlyOnceSourceTaskProducerConfigs(new ConnectorTaskId(connName, 0), config, sourceConfig,
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId)
+                : baseProducerConfigs(connName, "connector-offset-producer-" + connName, config, sourceConfig,
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId);
+        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
+
+        ConnectorOffsetBackingStore offsetStore = config.exactlyOnceSourceEnabled()
+                ? offsetStoreForExactlyOnceSourceConnector(sourceConfig, connName, connector, producer)
+                : offsetStoreForRegularSourceConnector(sourceConfig, connName, connector, producer);
+
+        OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, connName, internalKeyConverter, internalValueConverter);
+        alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, offsetStore, producer, offsetWriter, connectorLoader, cb);
+    }
+
+    // Visible for testing
+    void alterSourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                     Map<Map<String, ?>, Map<String, ?>> offsets, ConnectorOffsetBackingStore offsetStore,
+                                     KafkaProducer<byte[], byte[]> producer, OffsetStorageWriter offsetWriter,
+                                     ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SourceConnector) connector).alterOffsets(connectorConfig, offsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for 
connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                offsetStore.configure(config);
+                // This reads to the end of the offsets topic and can be a potentially time-consuming operation
+                offsetStore.start();

Review Comment:
   Ah okay, I get it now, thanks. I've moved the `configure` call to the top-level `alterSourceConnectorOffsets` method, and moved the finally block where cleanup occurs into the top-level try/catch in the overloaded method (this is because the `KafkaBasedLog` is instantiated in the `configure` method, and we need it in order to ensure resource cleanup via the call to `KafkaOffsetBackingStore::stop`). This also assumes that the call to `KafkaOffsetBackingStore::configure` won't throw an exception, since by this point the connector's tasks should already have successfully configured and started their own offset stores.
   
   On a side note, it doesn't look like we have exception handling in place for calls to `KafkaOffsetBackingStore::configure` elsewhere either, which could lead to leaking the clients created for the store.
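   
   For the record, here's a rough sketch of the cleanup shape I'm describing (simplified and with an illustrative body - not the exact patch):
   
   ```java
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.common.utils.Utils;
   import org.apache.kafka.connect.runtime.rest.entities.Message;
   import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
   import org.apache.kafka.connect.storage.OffsetStorageWriter;
   import org.apache.kafka.connect.util.Callback;
   
   public class AlterOffsetsCleanupSketch {
       // The caller invokes offsetStore.configure() (which instantiates the KafkaBasedLog),
       // and this overload owns a try/finally so the store and producer are always
       // released, whether or not starting the store or writing the offsets succeeds.
       void alterSourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetStore,
                                        KafkaProducer<byte[], byte[]> producer, OffsetStorageWriter offsetWriter,
                                        Callback<Message> cb) {
           try {
               // Reads to the end of the offsets topic; potentially time-consuming
               offsetStore.start();
               // ... write the new offsets via offsetWriter and flush ...
               cb.onCompletion(null, new Message("The offsets for this connector have been altered successfully"));
           } catch (Throwable t) {
               cb.onCompletion(t, null);
           } finally {
               // Stopping the store closes the KafkaBasedLog created in configure()
               Utils.closeQuietly(offsetStore::stop, "offset backing store for connector " + connName);
               Utils.closeQuietly(producer, "offset producer for connector " + connName);
           }
       }
   }
   ```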



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

