C0urante commented on code in PR #13818:
URL: https://github.com/apache/kafka/pull/13818#discussion_r1237173042


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java:
##########
@@ -1819,16 +1825,25 @@ public void 
testGetSinkConnectorOffsetsAdminClientAsynchronousError() {
         verifyKafkaClusterId();
     }
 
-    @SuppressWarnings("unchecked")
     private void mockAdminListConsumerGroupOffsets(Admin admin, 
Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets, Exception e) {
+        mockAdminListConsumerGroupOffsets(admin, consumerGroupOffsets, e, 
null, 0);
+    }
+
+    private void mockAdminListConsumerGroupOffsets(Admin admin, 
Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets, Exception e, Time 
time, long delayMs) {
         ListConsumerGroupOffsetsResult result = 
mock(ListConsumerGroupOffsetsResult.class);
         when(admin.listConsumerGroupOffsets(anyString(), 
any(ListConsumerGroupOffsetsOptions.class))).thenReturn(result);
-        KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> adminFuture = 
mock(KafkaFuture.class);
-        when(result.partitionsToOffsetAndMetadata()).thenReturn(adminFuture);
-        when(adminFuture.whenComplete(any())).thenAnswer(invocation -> {
-            ((KafkaFuture.BiConsumer<Map<TopicPartition, OffsetAndMetadata>, 
Throwable>) invocation.getArgument(0))
-                    .accept(consumerGroupOffsets, e);
-            return null;
+        KafkaFutureImpl<Map<TopicPartition, OffsetAndMetadata>> adminFuture = 
new KafkaFutureImpl<>();
+        if (e != null) {
+            adminFuture.completeExceptionally(e);
+        } else {
+            adminFuture.complete(consumerGroupOffsets);
+        }
+        when(result.partitionsToOffsetAndMetadata()).thenAnswer(invocation -> {
+            if (time == null) {
+                return adminFuture;
+            }
+            time.sleep(delayMs);
+            return adminFuture;

Review Comment:
   Nit: can simplify this
   ```suggestion
               if (time != null)
                   time.sleep(delayMs);
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java:
##########
@@ -574,6 +574,274 @@ public void 
alterAndVerifySourceConnectorOffsets(EmbeddedConnectCluster connect,
                 "Source connector offsets should reflect the expected number 
of records produced");
     }
 
+    @Test
+    public void testAlterSourceConnectorOffsetsInvalidRequestBody() throws 
Exception {
+        // Create a source connector and stop it
+        connect.configureConnector(CONNECTOR_NAME, 
baseSourceConnectorConfigs());
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 NUM_TASKS,
+                "Connector tasks did not start in time.");
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(
+                CONNECTOR_NAME,
+                "Connector did not stop in time"
+        );
+        String url = 
connect.endpointForResource(String.format("connectors/%s/offsets", 
CONNECTOR_NAME));
+
+        String content = "[]";
+        try (Response response = connect.requestPatch(url, content)) {
+            assertEquals(500, response.getStatus());
+            assertThat(response.getEntity().toString(), containsString("Cannot 
deserialize value"));
+        }
+
+        content = "{}";
+        try (Response response = connect.requestPatch(url, content)) {
+            assertEquals(400, response.getStatus());
+            assertThat(response.getEntity().toString(), 
containsString("Partitions / offsets need to be provided for an alter offsets 
request"));
+        }
+
+        content = "{\"key\": []}";
+        try (Response response = connect.requestPatch(url, content)) {
+            assertEquals(500, response.getStatus());
+            assertThat(response.getEntity().toString(), 
containsString("Unrecognized field"));
+        }
+
+        content = "{\"offsets\": []}";
+        try (Response response = connect.requestPatch(url, content)) {
+            assertEquals(400, response.getStatus());
+            assertThat(response.getEntity().toString(), 
containsString("Partitions / offsets need to be provided for an alter offsets 
request"));
+        }
+
+        content = "{\"offsets\": {}}";
+        try (Response response = connect.requestPatch(url, content)) {
+            assertEquals(500, response.getStatus());
+            assertThat(response.getEntity().toString(), containsString("Cannot 
deserialize value"));
+        }
+
+        content = "{\"offsets\": [123]}";
+        try (Response response = connect.requestPatch(url, content)) {
+            assertEquals(500, response.getStatus());
+            assertThat(response.getEntity().toString(), containsString("Cannot 
construct instance"));
+        }
+
+        content = "{\"offsets\": [{\"key\": \"val\"}]}";
+        try (Response response = connect.requestPatch(url, content)) {
+            assertEquals(500, response.getStatus());
+            assertThat(response.getEntity().toString(), 
containsString("Unrecognized field"));
+        }
+
+        content = "{\"offsets\": [{\"partition\": []]}]}";
+        try (Response response = connect.requestPatch(url, content)) {
+            assertEquals(500, response.getStatus());
+            assertThat(response.getEntity().toString(), containsString("Cannot 
deserialize value"));
+        }
+    }
+
+    @Test
+    public void testResetSinkConnectorOffsets() throws Exception {
+        resetAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), 
connect.kafka());
+    }
+
+    @Test
+    public void testResetSinkConnectorOffsetsOverriddenConsumerGroupId() 
throws Exception {
+        Map<String, String> connectorConfigs = baseSinkConnectorConfigs();
+        
connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX 
+ CommonClientConfigs.GROUP_ID_CONFIG,
+                "overridden-group-id");
+        resetAndVerifySinkConnectorOffsets(connectorConfigs, connect.kafka());
+        // Ensure that the overridden consumer group ID was the one actually 
used
+        try (Admin admin = connect.kafka().createAdminClient()) {
+            Collection<ConsumerGroupListing> consumerGroups = 
admin.listConsumerGroups().all().get();
+            assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing 
-> "overridden-group-id".equals(consumerGroupListing.groupId())));
+            assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing 
-> 
SinkUtils.consumerGroupId(CONNECTOR_NAME).equals(consumerGroupListing.groupId())));
+        }
+    }
+
+    @Test
+    public void testResetSinkConnectorOffsetsDifferentKafkaClusterTargeted() 
throws Exception {
+        EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new 
Properties());
+
+        try (AutoCloseable ignored = kafkaCluster::stop) {
+            kafkaCluster.start();
+
+            Map<String, String> connectorConfigs = baseSinkConnectorConfigs();
+            
connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX 
+ CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+                    kafkaCluster.bootstrapServers());
+            
connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX + 
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+                    kafkaCluster.bootstrapServers());
+
+            resetAndVerifySinkConnectorOffsets(connectorConfigs, kafkaCluster);
+        }
+    }
+
+    private void resetAndVerifySinkConnectorOffsets(Map<String, String> 
connectorConfigs, EmbeddedKafkaCluster kafkaCluster) throws Exception {
+        int numPartitions = 3;
+        int numMessages = 10;
+        kafkaCluster.createTopic(TOPIC, numPartitions);
+
+        // Produce numMessages messages to each partition
+        for (int partition = 0; partition < numPartitions; partition++) {
+            for (int message = 0; message < numMessages; message++) {
+                kafkaCluster.produce(TOPIC, partition, "key", "value");
+            }
+        }
+        // Create sink connector
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 NUM_TASKS,
+                "Connector tasks did not start in time.");
+
+        waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, 
numPartitions, numMessages,
+                "Sink connector consumer group offsets should catch up to the 
topic end offsets");
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(
+                CONNECTOR_NAME,
+                "Connector did not stop in time"
+        );
+
+        // Reset the sink connector's offsets
+        String response = connect.resetConnectorOffsets(CONNECTOR_NAME);
+        assertThat(response, containsString("The Connect framework-managed 
offsets for this connector have been reset successfully. " +
+                "However, if this connector manages offsets externally, they 
will need to be manually reset in the system that the connector uses."));
+
+        waitForEmptySinkConnectorOffsets(CONNECTOR_NAME);
+
+        // Reset the sink connector's offsets again while it is still in a 
STOPPED state and ensure that there is no error
+        response = connect.resetConnectorOffsets(CONNECTOR_NAME);
+        assertThat(response, containsString("The Connect framework-managed 
offsets for this connector have been reset successfully. " +
+                "However, if this connector manages offsets externally, they 
will need to be manually reset in the system that the connector uses."));
+
+        waitForEmptySinkConnectorOffsets(CONNECTOR_NAME);
+
+        // Resume the connector and expect its offsets to catch up to the 
latest offsets
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                NUM_TASKS,
+                "Connector tasks did not resume in time"
+        );
+        waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, 
numPartitions, 10,
+                "Sink connector consumer group offsets should catch up to the 
topic end offsets");
+    }
+
+    @Test
+    public void testResetSinkConnectorOffsetsZombieSinkTasks() throws 
Exception {
+        connect.kafka().createTopic(TOPIC, 1);
+
+        // Produce 10 messages
+        for (int message = 0; message < 10; message++) {
+            connect.kafka().produce(TOPIC, 0, "key", "value");
+        }
+
+        // Configure a sink connector whose sink task blocks in its stop method
+        Map<String, String> connectorConfigs = new HashMap<>();
+        connectorConfigs.put(CONNECTOR_CLASS_CONFIG, 
BlockingConnectorTest.BlockingSinkConnector.class.getName());
+        connectorConfigs.put(TOPICS_CONFIG, TOPIC);
+        connectorConfigs.put("block", "Task::stop");
+
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+                "Connector tasks did not start in time.");
+
+        waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, 1, 10,
+                "Sink connector consumer group offsets should catch up to the 
topic end offsets");
+
+        connect.stopConnector(CONNECTOR_NAME);
+
+        // Try to reset the offsets
+        ConnectRestException e = assertThrows(ConnectRestException.class, () 
-> connect.resetConnectorOffsets(CONNECTOR_NAME));
+        assertThat(e.getMessage(), containsString("zombie sink task"));
+    }
+
+    @Test
+    public void testResetSourceConnectorOffsets() throws Exception {
+        resetAndVerifySourceConnectorOffsets(connect, 
baseSourceConnectorConfigs());
+    }
+
+    @Test
+    public void testResetSourceConnectorOffsetsCustomOffsetsTopic() throws 
Exception {
+        Map<String, String> connectorConfigs = baseSourceConnectorConfigs();
+        connectorConfigs.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, 
"custom-offsets-topic");
+        resetAndVerifySourceConnectorOffsets(connect, connectorConfigs);
+    }
+
+    @Test
+    public void testResetSourceConnectorOffsetsDifferentKafkaClusterTargeted() 
throws Exception {
+        EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new 
Properties());
+
+        try (AutoCloseable ignored = kafkaCluster::stop) {
+            kafkaCluster.start();
+
+            Map<String, String> connectorConfigs = 
baseSourceConnectorConfigs();
+            
connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX 
+ CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+                    kafkaCluster.bootstrapServers());
+            
connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX + 
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+                    kafkaCluster.bootstrapServers());
+
+            resetAndVerifySourceConnectorOffsets(connect, connectorConfigs);
+        }
+    }
+
+    @Test
+    public void testResetSourceConnectorOffsetsExactlyOnceSupportEnabled() 
throws Exception {
+        Properties brokerProps = new Properties();
+        brokerProps.put("transaction.state.log.replication.factor", "1");
+        brokerProps.put("transaction.state.log.min.isr", "1");
+        workerProps.put(DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, 
"enabled");
+
+        // This embedded Connect cluster will internally spin up its own 
embedded Kafka cluster
+        EmbeddedConnectCluster exactlyOnceSupportEnabledConnectCluster = new 
EmbeddedConnectCluster.Builder()

Review Comment:
   One more thought: in this case (and one other), we don't use the embedded 
Connect cluster created in the `@Before` method for this test suite. This is a 
little inefficient and can bloat test runtime.
   
   We can follow the same approach as the 
[ConnectWorkerIntegrationTest](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java),
 where we create an `EmbeddedConnectCluster.Builder` instance in the 
`@Before`-annotated method for the test suite, but only build and start the 
actual Connect cluster in each test case. It's a little more verbose but IMO 
it's worth the tradeoff since we gain more in decreased test time than we lose 
in extra lines of code.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1320,89 +1317,192 @@ void alterSinkConnectorOffsets(String connName, 
Connector connector, Map<String,
                 Admin admin = adminFactory.apply(adminConfig);
 
                 try {
-                    List<KafkaFuture<Void>> adminFutures = new ArrayList<>();
-
-                    Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = 
parsedOffsets.entrySet()
-                            .stream()
-                            .filter(entry -> entry.getValue() != null)
-                            .collect(Collectors.toMap(Map.Entry::getKey, e -> 
new OffsetAndMetadata(e.getValue())));
-
-                    if (!offsetsToAlter.isEmpty()) {
-                        log.debug("Committing the following consumer group 
offsets using an admin client for sink connector {}: {}.",
-                                connName, offsetsToAlter);
-                        AlterConsumerGroupOffsetsOptions 
alterConsumerGroupOffsetsOptions = new 
AlterConsumerGroupOffsetsOptions().timeoutMs(
-                                (int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                        AlterConsumerGroupOffsetsResult 
alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, 
offsetsToAlter,
-                                alterConsumerGroupOffsetsOptions);
-
-                        
adminFutures.add(alterConsumerGroupOffsetsResult.all());
+                    Map<TopicPartition, Long> offsetsToWrite;
+                    if (isReset) {
+                        offsetsToWrite = new HashMap<>();
+                        ListConsumerGroupOffsetsOptions 
listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions()
+                                .timeoutMs((int) timer.remainingMs());
+                        try {
+                            admin.listConsumerGroupOffsets(groupId, 
listConsumerGroupOffsetsOptions)
+                                    .partitionsToOffsetAndMetadata()
+                                    .get(timer.remainingMs(), 
TimeUnit.MILLISECONDS)
+                                    .forEach((topicPartition, 
offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null));
+
+                            timer.update();
+                            log.debug("Found the following topic partitions 
(to reset offsets) for sink connector {} and consumer group ID {}: {}",
+                                    connName, groupId, 
offsetsToWrite.keySet());
+                        } catch (Exception e) {
+                            Utils.closeQuietly(admin, "Offset reset admin for 
sink connector " + connName);
+                            log.error("Failed to list offsets prior to 
resetting offsets for sink connector {}", connName, e);
+                            cb.onCompletion(new ConnectException("Failed to 
list offsets prior to resetting offsets for sink connector " + connName, e), 
null);
+                            return;
+                        }
+                    } else {
+                        offsetsToWrite = 
SinkUtils.parseSinkConnectorOffsets(offsets);
                     }
 
-                    Set<TopicPartition> partitionsToReset = 
parsedOffsets.entrySet()
-                            .stream()
-                            .filter(entry -> entry.getValue() == null)
-                            .map(Map.Entry::getKey)
-                            .collect(Collectors.toSet());
-
-                    if (!partitionsToReset.isEmpty()) {
-                        log.debug("Deleting the consumer group offsets for the 
following topic partitions using an admin client for sink connector {}: {}.",
-                                connName, partitionsToReset);
-                        DeleteConsumerGroupOffsetsOptions 
deleteConsumerGroupOffsetsOptions = new 
DeleteConsumerGroupOffsetsOptions().timeoutMs(
-                                (int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                        DeleteConsumerGroupOffsetsResult 
deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, 
partitionsToReset,
-                                deleteConsumerGroupOffsetsOptions);
-
-                        
adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+                    boolean alterOffsetsResult;
+                    try {
+                        alterOffsetsResult = ((SinkConnector) 
connector).alterOffsets(connectorConfig, offsetsToWrite);
+                    } catch (UnsupportedOperationException e) {
+                        log.error("Failed to modify offsets for connector {} 
because it doesn't support external modification of offsets",
+                                connName, e);
+                        throw new ConnectException("Failed to modify offsets 
for connector " + connName + " because it doesn't support external " +
+                                "modification of offsets", e);
                     }
+                    updateTimerAndCheckExpiry(timer, "Timed out while calling 
the 'alterOffsets' method for sink connector " + connName);

Review Comment:
   Should we add a note to `SourceConnector::alterOffsets` and 
`SinkConnector::alterOffsets` that this method may be invoked multiple times 
with the same arguments if an external operation (writing to the offsets topic, 
deleting a consumer group, etc.) fails? That way, developers can know to 
silently ignore requests to reset offsets that don't appear to be present based 
on the connector's own external tracking logic.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java:
##########
@@ -2240,6 +2255,193 @@ public void 
testAlterOffsetsSinkConnectorSynchronousError() throws Exception {
         verifyKafkaClusterId();
     }
 
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testResetOffsetsSourceConnectorExactlyOnceSupportEnabled() 
throws Exception {
+        Map<String, String> workerProps = new HashMap<>(this.workerProps);
+        workerProps.put("exactly.once.source.support", "enabled");
+        workerProps.put("bootstrap.servers", "localhost:9092");
+        workerProps.put("group.id", "connect-cluster");
+        workerProps.put("config.storage.topic", "connect-configs");
+        workerProps.put("offset.storage.topic", "connect-offsets");
+        workerProps.put("status.storage.topic", "connect-statuses");
+        config = new DistributedConfig(workerProps);
+        mockKafkaClusterId();
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, 
offsetBackingStore, Executors.newSingleThreadExecutor(),
+                allConnectorClientConfigOverridePolicy, null);
+        worker.start();
+
+        when(plugins.withClassLoader(any(ClassLoader.class), 
any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
+        when(sourceConnector.alterOffsets(eq(connectorProps), 
anyMap())).thenReturn(true);
+        ConnectorOffsetBackingStore offsetStore = 
mock(ConnectorOffsetBackingStore.class);
+        KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
+        OffsetStorageWriter offsetWriter = mock(OffsetStorageWriter.class);
+
+        Set<Map<String, Object>> connectorPartitions = new HashSet<>();
+        connectorPartitions.add(Collections.singletonMap("partitionKey1", new 
Object()));
+        connectorPartitions.add(Collections.singletonMap("partitionKey2", new 
Object()));
+        
when(offsetStore.connectorPartitions(eq(CONNECTOR_ID))).thenReturn(connectorPartitions);
+        when(offsetWriter.doFlush(any())).thenAnswer(invocation -> {
+            invocation.getArgument(0, Callback.class).onCompletion(null, null);
+            return null;
+        });
+
+        FutureCallback<Message> cb = new FutureCallback<>();
+        worker.modifySourceConnectorOffsets(CONNECTOR_ID, sourceConnector, 
connectorProps, null, offsetStore, producer,
+                offsetWriter, Thread.currentThread().getContextClassLoader(), 
cb);
+        assertEquals("The offsets for this connector have been reset 
successfully", cb.get(1000, TimeUnit.MILLISECONDS).message());
+
+        InOrder inOrder = Mockito.inOrder(offsetStore, offsetWriter, producer);
+        inOrder.verify(offsetStore).start();
+        connectorPartitions.forEach(partition -> 
inOrder.verify(offsetWriter).offset(partition, null));
+        inOrder.verify(offsetWriter).beginFlush();
+        inOrder.verify(producer).initTransactions();
+        inOrder.verify(producer).beginTransaction();
+        inOrder.verify(offsetWriter).doFlush(any());
+        inOrder.verify(producer).commitTransaction();
+        inOrder.verify(offsetStore, timeout(1000)).stop();
+        verifyKafkaClusterId();
+    }
+
+    @Test
+    public void testResetOffsetsSinkConnector() throws Exception {
+        mockKafkaClusterId();
+        String connectorClass = SampleSinkConnector.class.getName();
+        connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass);
+
+        Admin admin = mock(Admin.class);
+        Time time = new MockTime();
+        worker = new Worker(WORKER_ID, time, plugins, config, 
offsetBackingStore, Executors.newCachedThreadPool(),
+                allConnectorClientConfigOverridePolicy, config -> admin);
+        worker.start();
+
+        when(plugins.withClassLoader(any(ClassLoader.class), 
any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
+
+        TopicPartition tp = new TopicPartition("test-topic", 0);
+        mockAdminListConsumerGroupOffsets(admin, Collections.singletonMap(tp, 
new OffsetAndMetadata(10L)), null, time, 2000);
+        when(sinkConnector.alterOffsets(eq(connectorProps), 
eq(Collections.singletonMap(tp, null)))).thenAnswer(invocation -> {
+            time.sleep(3000);
+            return true;
+        });
+
+        DeleteConsumerGroupsResult deleteConsumerGroupsResult = 
mock(DeleteConsumerGroupsResult.class);
+        when(admin.deleteConsumerGroups(anyCollection(), 
any(DeleteConsumerGroupsOptions.class))).thenReturn(deleteConsumerGroupsResult);
+        
when(deleteConsumerGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(null));
+
+        FutureCallback<Message> cb = new FutureCallback<>();
+        worker.modifySinkConnectorOffsets(CONNECTOR_ID, sinkConnector, 
connectorProps, null,
+                Thread.currentThread().getContextClassLoader(), cb);
+        assertEquals("The offsets for this connector have been reset 
successfully", cb.get(1000, TimeUnit.MILLISECONDS).message());
+
+        ArgumentCaptor<DeleteConsumerGroupsOptions> 
deleteConsumerGroupsOptionsArgumentCaptor = 
ArgumentCaptor.forClass(DeleteConsumerGroupsOptions.class);
+        verify(admin).deleteConsumerGroups(anyCollection(), 
deleteConsumerGroupsOptionsArgumentCaptor.capture());
+        // Expect the call to Admin::deleteConsumerGroups to have a timeout 
value equal to the overall timeout value of DEFAULT_REST_REQUEST_TIMEOUT_MS
+        // minus the delay introduced in the call to 
Admin::listConsumerGroupOffsets (2000 ms) and the delay introduced in the call 
to
+        // SinkConnector::alterOffsets (3000 ms)
+        assertEquals((int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS - 
2000L - 3000L,

Review Comment:
   Very nice!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to