yashmayya commented on code in PR #13818:
URL: https://github.com/apache/kafka/pull/13818#discussion_r1238244318


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1320,89 +1317,192 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map<String,
                 Admin admin = adminFactory.apply(adminConfig);
 
                 try {
-                    List<KafkaFuture<Void>> adminFutures = new ArrayList<>();
-
-                    Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
-                            .stream()
-                            .filter(entry -> entry.getValue() != null)
-                            .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
-
-                    if (!offsetsToAlter.isEmpty()) {
-                        log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.",
-                                connName, offsetsToAlter);
-                        AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs(
-                                (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                        AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter,
-                                alterConsumerGroupOffsetsOptions);
-
-                        adminFutures.add(alterConsumerGroupOffsetsResult.all());
+                    Map<TopicPartition, Long> offsetsToWrite;
+                    if (isReset) {
+                        offsetsToWrite = new HashMap<>();
+                        ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions()
+                                .timeoutMs((int) timer.remainingMs());
+                        try {
+                            admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions)
+                                    .partitionsToOffsetAndMetadata()
+                                    .get(timer.remainingMs(), TimeUnit.MILLISECONDS)
+                                    .forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null));
+
+                            timer.update();
+                            log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}",
+                                    connName, groupId, offsetsToWrite.keySet());
+                        } catch (Exception e) {
+                            Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName);
+                            log.error("Failed to list offsets prior to resetting offsets for sink connector {}", connName, e);
+                            cb.onCompletion(new ConnectException("Failed to list offsets prior to resetting offsets for sink connector " + connName, e), null);
+                            return;
+                        }
+                    } else {
+                        offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets);
                     }
 
-                    Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet()
-                            .stream()
-                            .filter(entry -> entry.getValue() == null)
-                            .map(Map.Entry::getKey)
-                            .collect(Collectors.toSet());
-
-                    if (!partitionsToReset.isEmpty()) {
-                        log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.",
-                                connName, partitionsToReset);
-                        DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs(
-                                (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                        DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset,
-                                deleteConsumerGroupOffsetsOptions);
-
-                        adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+                    boolean alterOffsetsResult;
+                    try {
+                        alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, offsetsToWrite);
+                    } catch (UnsupportedOperationException e) {
+                        log.error("Failed to modify offsets for connector {} because it doesn't support external modification of offsets",
+                                connName, e);
+                        throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " +
+                                "modification of offsets", e);
                     }
+                    updateTimerAndCheckExpiry(timer, "Timed out while calling the 'alterOffsets' method for sink connector " + connName);

Review Comment:
   Hm, good point. We really don't want to get into a situation where, say, a call to `SinkConnector::alterOffsets` succeeds but the subsequent call to delete the consumer group fails, and then every later external request to delete the connector's offsets fails because the `SinkConnector::alterOffsets` method bails out.
   
   On the other hand, I'm also concerned about making the Javadoc for those methods overly complicated. Furthermore, since we'll be calling `alterOffsets` on separate `Connector` instances, they can't track state across multiple calls. Any suggestions on wording that strikes the right balance? Maybe we could add a hint that offsets which appear to be non-existent should be handled gracefully where possible (this also covers the case where an alter offsets request is used to skip over some data for sink connectors)?
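   
   For illustration, here's a rough sketch of the kind of graceful handling that hint would encourage. This is not code from the PR; the connector class and the external offset store helpers are hypothetical stand-ins:
   
   ```java
   import java.util.Collections;
   import java.util.List;
   import java.util.Map;
   
   import org.apache.kafka.common.TopicPartition;
   import org.apache.kafka.common.config.ConfigDef;
   import org.apache.kafka.connect.connector.Task;
   import org.apache.kafka.connect.sink.SinkConnector;
   
   // Hypothetical sink connector that tracks offsets in an external system
   public class ExampleSinkConnector extends SinkConnector {
   
       @Override
       public boolean alterOffsets(Map<String, String> connectorConfig, Map<TopicPartition, Long> offsets) {
           for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
               if (entry.getValue() == null) {
                   // A null offset signals a reset. If the externally-tracked offset is
                   // already absent (e.g. because a previous reset attempt partially
                   // succeeded), treat the deletion as a no-op instead of failing, so
                   // that retried reset requests remain idempotent.
                   deleteExternalOffsetIfPresent(entry.getKey());
               } else {
                   // An explicit offset may point past data the connector hasn't seen
                   // yet (i.e. a request to skip records), so don't require that an
                   // existing offset be present before writing.
                   writeExternalOffset(entry.getKey(), entry.getValue());
               }
           }
           return true;
       }
   
       // Illustrative stub: remove the tracked offset from the external system,
       // ignoring "not found" errors
       private void deleteExternalOffsetIfPresent(TopicPartition partition) { }
   
       // Illustrative stub: persist the given offset in the external system
       private void writeExternalOffset(TopicPartition partition, long offset) { }
   
       @Override
       public void start(Map<String, String> props) { }
   
       @Override
       public Class<? extends Task> taskClass() { return null; /* omitted from this sketch */ }
   
       @Override
       public List<Map<String, String>> taskConfigs(int maxTasks) { return Collections.emptyList(); }
   
       @Override
       public void stop() { }
   
       @Override
       public ConfigDef config() { return new ConfigDef(); }
   
       @Override
       public String version() { return "0.0.0"; }
   }
   ```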



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java:
##########
@@ -574,6 +574,274 @@ public void alterAndVerifySourceConnectorOffsets(EmbeddedConnectCluster connect,
                 "Source connector offsets should reflect the expected number of records produced");
     }
 
+    @Test
+    public void testAlterSourceConnectorOffsetsInvalidRequestBody() throws Exception {
+        // Create a source connector and stop it
+        connect.configureConnector(CONNECTOR_NAME, baseSourceConnectorConfigs());
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+                "Connector tasks did not start in time.");
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(
+                CONNECTOR_NAME,
+                "Connector did not stop in time"
+        );
+        String url = connect.endpointForResource(String.format("connectors/%s/offsets", CONNECTOR_NAME));
+
+        String content = "[]";
+        try (Response response = connect.requestPatch(url, content)) {
+            assertEquals(500, response.getStatus());
+            assertThat(response.getEntity().toString(), containsString("Cannot deserialize value"));
+        }
+
+        content = "{}";
+        try (Response response = connect.requestPatch(url, content)) {
+            assertEquals(400, response.getStatus());
+            assertThat(response.getEntity().toString(), containsString("Partitions / offsets need to be provided for an alter offsets request"));
+        }
+
+        content = "{\"key\": []}";
+        try (Response response = connect.requestPatch(url, content)) {
+            assertEquals(500, response.getStatus());
+            assertThat(response.getEntity().toString(), containsString("Unrecognized field"));
+        }
+
+        content = "{\"offsets\": []}";
+        try (Response response = connect.requestPatch(url, content)) {
+            assertEquals(400, response.getStatus());
+            assertThat(response.getEntity().toString(), containsString("Partitions / offsets need to be provided for an alter offsets request"));
+        }
+
+        content = "{\"offsets\": {}}";
+        try (Response response = connect.requestPatch(url, content)) {
+            assertEquals(500, response.getStatus());
+            assertThat(response.getEntity().toString(), containsString("Cannot deserialize value"));
+        }
+
+        content = "{\"offsets\": [123]}";
+        try (Response response = connect.requestPatch(url, content)) {
+            assertEquals(500, response.getStatus());
+            assertThat(response.getEntity().toString(), containsString("Cannot construct instance"));
+        }
+
+        content = "{\"offsets\": [{\"key\": \"val\"}]}";
+        try (Response response = connect.requestPatch(url, content)) {
+            assertEquals(500, response.getStatus());
+            assertThat(response.getEntity().toString(), containsString("Unrecognized field"));
+        }
+
+        content = "{\"offsets\": [{\"partition\": []]}]}";
+        try (Response response = connect.requestPatch(url, content)) {
+            assertEquals(500, response.getStatus());
+            assertThat(response.getEntity().toString(), containsString("Cannot deserialize value"));
+        }
+    }
+
+    @Test
+    public void testResetSinkConnectorOffsets() throws Exception {
+        resetAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), connect.kafka());
+    }
+
+    @Test
+    public void testResetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception {
+        Map<String, String> connectorConfigs = baseSinkConnectorConfigs();
+        connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG,
+                "overridden-group-id");
+        resetAndVerifySinkConnectorOffsets(connectorConfigs, connect.kafka());
+        // Ensure that the overridden consumer group ID was the one actually used
+        try (Admin admin = connect.kafka().createAdminClient()) {
+            Collection<ConsumerGroupListing> consumerGroups = admin.listConsumerGroups().all().get();
+            assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> "overridden-group-id".equals(consumerGroupListing.groupId())));
+            assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(CONNECTOR_NAME).equals(consumerGroupListing.groupId())));
+        }
+    }
+
+    @Test
+    public void testResetSinkConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
+        EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties());
+
+        try (AutoCloseable ignored = kafkaCluster::stop) {
+            kafkaCluster.start();
+
+            Map<String, String> connectorConfigs = baseSinkConnectorConfigs();
+            connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+                    kafkaCluster.bootstrapServers());
+            connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+                    kafkaCluster.bootstrapServers());
+
+            resetAndVerifySinkConnectorOffsets(connectorConfigs, kafkaCluster);
+        }
+    }
+
+    private void resetAndVerifySinkConnectorOffsets(Map<String, String> connectorConfigs, EmbeddedKafkaCluster kafkaCluster) throws Exception {
+        int numPartitions = 3;
+        int numMessages = 10;
+        kafkaCluster.createTopic(TOPIC, numPartitions);
+
+        // Produce numMessages messages to each partition
+        for (int partition = 0; partition < numPartitions; partition++) {
+            for (int message = 0; message < numMessages; message++) {
+                kafkaCluster.produce(TOPIC, partition, "key", "value");
+            }
+        }
+        // Create sink connector
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+                "Connector tasks did not start in time.");
+
+        waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions, numMessages,
+                "Sink connector consumer group offsets should catch up to the topic end offsets");
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(
+                CONNECTOR_NAME,
+                "Connector did not stop in time"
+        );
+
+        // Reset the sink connector's offsets
+        String response = connect.resetConnectorOffsets(CONNECTOR_NAME);
+        assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. " +
+                "However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses."));
+
+        waitForEmptySinkConnectorOffsets(CONNECTOR_NAME);
+
+        // Reset the sink connector's offsets again while it is still in a STOPPED state and ensure that there is no error
+        response = connect.resetConnectorOffsets(CONNECTOR_NAME);
+        assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. " +
+                "However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses."));
+
+        waitForEmptySinkConnectorOffsets(CONNECTOR_NAME);
+
+        // Resume the connector and expect its offsets to catch up to the latest offsets
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                NUM_TASKS,
+                "Connector tasks did not resume in time"
+        );
+        waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions, 10,
+                "Sink connector consumer group offsets should catch up to the topic end offsets");
+    }
+
+    @Test
+    public void testResetSinkConnectorOffsetsZombieSinkTasks() throws Exception {
+        connect.kafka().createTopic(TOPIC, 1);
+
+        // Produce 10 messages
+        for (int message = 0; message < 10; message++) {
+            connect.kafka().produce(TOPIC, 0, "key", "value");
+        }
+
+        // Configure a sink connector whose sink task blocks in its stop method
+        Map<String, String> connectorConfigs = new HashMap<>();
+        connectorConfigs.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSinkConnector.class.getName());
+        connectorConfigs.put(TOPICS_CONFIG, TOPIC);
+        connectorConfigs.put("block", "Task::stop");
+
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+                "Connector tasks did not start in time.");
+
+        waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, 1, 10,
+                "Sink connector consumer group offsets should catch up to the topic end offsets");
+
+        connect.stopConnector(CONNECTOR_NAME);
+
+        // Try to reset the offsets
+        ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.resetConnectorOffsets(CONNECTOR_NAME));
+        assertThat(e.getMessage(), containsString("zombie sink task"));
+    }
+
+    @Test
+    public void testResetSourceConnectorOffsets() throws Exception {
+        resetAndVerifySourceConnectorOffsets(connect, baseSourceConnectorConfigs());
+    }
+
+    @Test
+    public void testResetSourceConnectorOffsetsCustomOffsetsTopic() throws Exception {
+        Map<String, String> connectorConfigs = baseSourceConnectorConfigs();
+        connectorConfigs.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, "custom-offsets-topic");
+        resetAndVerifySourceConnectorOffsets(connect, connectorConfigs);
+    }
+
+    @Test
+    public void testResetSourceConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
+        EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties());
+
+        try (AutoCloseable ignored = kafkaCluster::stop) {
+            kafkaCluster.start();
+
+            Map<String, String> connectorConfigs = baseSourceConnectorConfigs();
+            connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+                    kafkaCluster.bootstrapServers());
+            connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+                    kafkaCluster.bootstrapServers());
+
+            resetAndVerifySourceConnectorOffsets(connect, connectorConfigs);
+        }
+    }
+
+    @Test
+    public void testResetSourceConnectorOffsetsExactlyOnceSupportEnabled() throws Exception {
+        Properties brokerProps = new Properties();
+        brokerProps.put("transaction.state.log.replication.factor", "1");
+        brokerProps.put("transaction.state.log.min.isr", "1");
+        workerProps.put(DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
+
+        // This embedded Connect cluster will internally spin up its own embedded Kafka cluster
+        EmbeddedConnectCluster exactlyOnceSupportEnabledConnectCluster = new EmbeddedConnectCluster.Builder()

Review Comment:
   Hah, I was considering doing the same thing earlier but discarded the idea, thinking that it'd add more noise to the review (since we'd be touching existing test cases too). Thanks for the push in the right direction - I agree that adding a couple of lines of code to tests here and there is worth it to reduce test runtime bloat. I've made this change 👍 



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map<String,
                 Admin admin = adminFactory.apply(adminConfig);
 
                 try {
-                    List<KafkaFuture<Void>> adminFutures = new ArrayList<>();
-
-                    Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
-                            .stream()
-                            .filter(entry -> entry.getValue() != null)
-                            .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
-
-                    if (!offsetsToAlter.isEmpty()) {
-                        log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.",
-                                connName, offsetsToAlter);
-                        AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs(
+                    Map<TopicPartition, Long> offsetsToWrite;
+                    if (isReset) {
+                        offsetsToWrite = new HashMap<>();
+                        ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs(
                                 (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                        AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter,
-                                alterConsumerGroupOffsetsOptions);
-
-                        adminFutures.add(alterConsumerGroupOffsetsResult.all());
+                        try {
+                            admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions)
+                                    .partitionsToOffsetAndMetadata()
+                                    .get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                                    .forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null));
+
+                            log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}",
+                                    connName, groupId, offsetsToWrite.keySet());
+                        } catch (Exception e) {
+                            Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName);
+                            log.error("Failed to list offsets prior to resetting sink connector offsets", e);
+                            cb.onCompletion(new ConnectException("Failed to list offsets prior to resetting sink connector offsets", e), null);
+                            return;
+                        }
+                    } else {
+                        offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets);
                     }
 
-                    Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet()
-                            .stream()
-                            .filter(entry -> entry.getValue() == null)
-                            .map(Map.Entry::getKey)
-                            .collect(Collectors.toSet());
-
-                    if (!partitionsToReset.isEmpty()) {
-                        log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.",
-                                connName, partitionsToReset);
-                        DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs(
-                                (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                        DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset,
+                    boolean alterOffsetsResult;
+                    try {
+                        alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, offsetsToWrite);
+                    } catch (UnsupportedOperationException e) {
+                        throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " +
+                                "modification of offsets", e);
+                    }
 
-                        adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+                    // This should only occur for an offset reset request when:
+                    // 1. There was a prior attempt to reset offsets
+                    // OR
+                    // 2. No offsets have been committed yet
+                    if (offsetsToWrite.isEmpty()) {

Review Comment:
   Yeah, that makes sense. I've filed a follow-up Jira ticket so that we can revisit this if necessary: https://issues.apache.org/jira/browse/KAFKA-15113



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
