yashmayya commented on code in PR #13818:
URL: https://github.com/apache/kafka/pull/13818#discussion_r1224138055


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map<String,
                 Admin admin = adminFactory.apply(adminConfig);
 
                 try {
-                    List<KafkaFuture<Void>> adminFutures = new ArrayList<>();
-
-                    Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
-                            .stream()
-                            .filter(entry -> entry.getValue() != null)
-                            .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
-
-                    if (!offsetsToAlter.isEmpty()) {
-                        log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.",
-                                connName, offsetsToAlter);
-                        AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs(
+                    Map<TopicPartition, Long> offsetsToWrite;
+                    if (isReset) {
+                        offsetsToWrite = new HashMap<>();
+                        ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs(
                                 (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                        AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter,
-                                alterConsumerGroupOffsetsOptions);
-
-                        adminFutures.add(alterConsumerGroupOffsetsResult.all());
+                        try {
+                            admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions)
+                                    .partitionsToOffsetAndMetadata()
+                                    .get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                                    .forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null));
+
+                            log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}",
+                                    connName, groupId, offsetsToWrite.keySet());
+                        } catch (Exception e) {
+                            Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName);
+                            log.error("Failed to list offsets prior to resetting sink connector offsets", e);
+                            cb.onCompletion(new ConnectException("Failed to list offsets prior to resetting sink connector offsets", e), null);
+                            return;
+                        }
+                    } else {
+                        offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets);
                     }
 
-                    Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet()
-                            .stream()
-                            .filter(entry -> entry.getValue() == null)
-                            .map(Map.Entry::getKey)
-                            .collect(Collectors.toSet());
-
-                    if (!partitionsToReset.isEmpty()) {
-                        log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.",
-                                connName, partitionsToReset);
-                        DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs(
-                                (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                        DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset,
-                                deleteConsumerGroupOffsetsOptions);
+                    boolean alterOffsetsResult;
+                    try {
+                        alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, offsetsToWrite);
+                    } catch (UnsupportedOperationException e) {
+                        throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " +
+                                "modification of offsets", e);
+                    }
 
-                        adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+                    // This should only occur for an offset reset request when:
+                    // 1. There was a prior attempt to reset offsets
+                    // OR
+                    // 2. No offsets have been committed yet
+                    if (offsetsToWrite.isEmpty()) {

Review Comment:
   I've made this change, but it also made me realize that we'll get undesirable behavior in one case. This happens when the connector targets a Kafka cluster other than the Connect cluster's backing Kafka cluster, and the user has configured the connector's consumer overrides appropriately but not its admin overrides (something we also discussed previously [here](https://github.com/apache/kafka/pull/13434#discussion_r1144415671)).
   
   In that case, if a user attempts to reset their sink connector's offsets via the `DELETE /connectors/{connector}/offsets` endpoint, the following will occur:
   
   1. We list the consumer group offsets via `Admin::listConsumerGroupOffsets`. This returns an empty partition offsets map for the sink connector's consumer group ID, because that group exists on a different Kafka cluster from the one the admin client is connected to.
   2. We call `SinkConnector::alterOffsets` with an empty offsets map, which could cause the sink connector to propagate reset-related changes to the sink system.
   3. We attempt to delete the consumer group via `Admin::deleteConsumerGroups`, which results in a `GroupIdNotFoundException`. We essentially swallow this exception to keep offset reset operations idempotent, so we return a success message to the user even though the sink connector's real consumer group on the other Kafka cluster hasn't been deleted.
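   To make the three steps above concrete, here is a minimal, dependency-free Java model of the misrouted reset. It is a hypothetical sketch, not Connect's implementation. The `MisroutedResetModel` class, its `ResetOutcome` type, and the plain `Map`s standing in for the two Kafka clusters are all illustrative assumptions. The sketch shows three things: the admin client finds no group, the connector is handed an empty map, and success is still reported while the real group on the other cluster is left untouched.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical in-memory model of the reset flow when the admin client and the consumer
 * target different clusters; not Connect's real code. A "cluster" here is just a map of
 * consumer group ID -> (topic partition -> committed offset).
 */
class MisroutedResetModel {

    /** What the REST caller and the connector would observe for one reset request. */
    static final class ResetOutcome {
        final Map<String, Long> passedToAlterOffsets;
        final boolean groupFoundByAdmin;
        final boolean reportedSuccess;

        ResetOutcome(Map<String, Long> passedToAlterOffsets, boolean groupFoundByAdmin, boolean reportedSuccess) {
            this.passedToAlterOffsets = passedToAlterOffsets;
            this.groupFoundByAdmin = groupFoundByAdmin;
            this.reportedSuccess = reportedSuccess;
        }
    }

    static ResetOutcome resetOffsets(Map<String, Map<String, Long>> adminCluster, String groupId) {
        // 1. List offsets on the cluster the *admin* client targets; an unknown group yields an empty map.
        Map<String, Long> offsetsToWrite = new HashMap<>();
        adminCluster.getOrDefault(groupId, Map.of()).keySet()
                .forEach(partition -> offsetsToWrite.put(partition, null));

        // 2. This (possibly empty) map is what SinkConnector::alterOffsets would receive.

        // 3. Delete the group. A missing group (GroupIdNotFoundException in the real flow)
        //    is swallowed to keep resets idempotent, so success is reported either way.
        boolean groupFound = adminCluster.remove(groupId) != null;
        return new ResetOutcome(offsetsToWrite, groupFound, true);
    }

    public static void main(String[] args) {
        Map<String, Map<String, Long>> connectBackingCluster = new HashMap<>(); // admin client points here
        Map<String, Map<String, Long>> consumerCluster = new HashMap<>();       // consumer overrides point here
        consumerCluster.put("connect-my-sink", new HashMap<>(Map.of("orders-0", 42L)));

        ResetOutcome outcome = resetOffsets(connectBackingCluster, "connect-my-sink");
        System.out.println("reported success: " + outcome.reportedSuccess);      // true
        System.out.println("alterOffsets map: " + outcome.passedToAlterOffsets); // {}
        System.out.println("real group still present: " + consumerCluster.containsKey("connect-my-sink")); // true
    }
}
```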
   
   This will occur if the connector's admin overrides are missing, OR if the admin overrides are deliberately configured to target a different Kafka cluster from the consumer overrides. As you pointed out in the linked thread, though, that doesn't seem like a valid use case that we'd even want to support.
   
   I guess we'd want to pursue the approach you suggested, where we configure the admin client with a combination of the connector's admin overrides and consumer overrides?
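   A rough sketch of what combining the connector's admin and consumer overrides might look like. It assumes the consumer overrides form the base, so the admin client follows the consumer to its cluster, and that explicit admin overrides win on conflicting keys. That precedence and the class and helper names are assumptions for illustration. The `consumer.override.` and `admin.override.` prefixes are the connector-level client override prefixes discussed in this thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper for building admin client overrides for a sink connector. */
class CombinedAdminOverrides {

    static final String CONSUMER_OVERRIDE_PREFIX = "consumer.override.";
    static final String ADMIN_OVERRIDE_PREFIX = "admin.override.";

    /** Returns every connector config entry starting with {@code prefix}, with the prefix removed. */
    static Map<String, String> withPrefixStripped(Map<String, String> connectorConfig, String prefix) {
        Map<String, String> result = new HashMap<>();
        connectorConfig.forEach((key, value) -> {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), value);
            }
        });
        return result;
    }

    /**
     * Assumed precedence: consumer overrides are the base, so the admin client follows the
     * consumer to its Kafka cluster; explicit admin overrides win on conflicting keys.
     */
    static Map<String, String> adminOverrides(Map<String, String> connectorConfig) {
        Map<String, String> combined = withPrefixStripped(connectorConfig, CONSUMER_OVERRIDE_PREFIX);
        combined.putAll(withPrefixStripped(connectorConfig, ADMIN_OVERRIDE_PREFIX));
        return combined;
    }

    public static void main(String[] args) {
        Map<String, String> connectorConfig = Map.of(
                "name", "my-sink",
                "consumer.override.bootstrap.servers", "other-cluster:9092",
                "consumer.override.security.protocol", "SSL",
                "admin.override.request.timeout.ms", "20000");
        // TreeMap only for a stable print order:
        // {bootstrap.servers=other-cluster:9092, request.timeout.ms=20000, security.protocol=SSL}
        System.out.println(new TreeMap<>(adminOverrides(connectorConfig)));
    }
}
```

   A real implementation would probably need to filter out consumer-only properties before handing the result to the admin client. The precedence choice also matters: if admin overrides win, a deliberately mismatched `admin.override.bootstrap.servers` would still reproduce the problem.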
   
   Another option could be to verify that the admin client and the consumer point at the same Kafka cluster, and to fail the request if we can reliably determine that they don't. The admin client's bootstrap servers come from `admin.override.bootstrap.servers` in the connector's config, then `admin.bootstrap.servers` in the worker config, then `bootstrap.servers` in the worker config (in order of preference). The consumer's bootstrap servers come from `consumer.override.bootstrap.servers` in the connector's config, then `consumer.bootstrap.servers` in the worker config, then `bootstrap.servers` in the worker config (in order of preference). I'm not sure this approach is feasible, however.
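   A minimal sketch of that comparison, assuming only the config keys listed above; the class and method names are hypothetical. It resolves each client's `bootstrap.servers` in the stated order of preference and compares the normalized address sets. It can only ever confirm overlap. Disjoint lists do not prove that the clusters differ, since different brokers, DNS aliases or load balancers can front the same cluster. That limitation is why "reliably determine" looks hard with string comparison alone.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical sketch of the bootstrap.servers comparison; not existing Connect code. */
class BootstrapServersCheck {

    enum Verdict { SHARED_ADDRESS, INCONCLUSIVE }

    /**
     * Resolves bootstrap.servers for a client type ("admin" or "consumer") in the order of
     * preference from the comment: connector override, worker prefixed config, worker default.
     */
    static String resolve(String clientType, Map<String, String> connectorConfig, Map<String, String> workerConfig) {
        String connectorOverride = connectorConfig.get(clientType + ".override.bootstrap.servers");
        if (connectorOverride != null) {
            return connectorOverride;
        }
        String workerPrefixed = workerConfig.get(clientType + ".bootstrap.servers");
        if (workerPrefixed != null) {
            return workerPrefixed;
        }
        return workerConfig.get("bootstrap.servers"); // may be null if unset in this sketch
    }

    /** Normalizes "host1:9092, HOST2:9092" into a set of trimmed, lower-cased host:port strings. */
    static Set<String> addresses(String bootstrapServers) {
        if (bootstrapServers == null) {
            return new HashSet<>();
        }
        return Arrays.stream(bootstrapServers.split(","))
                .map(String::trim)
                .filter(address -> !address.isEmpty())
                .map(address -> address.toLowerCase(Locale.ROOT))
                .collect(Collectors.toCollection(HashSet::new));
    }

    /**
     * A shared address means both clients very likely reach the same cluster. Disjoint lists
     * prove nothing, so this sketch never reports "different clusters".
     */
    static Verdict compare(Map<String, String> connectorConfig, Map<String, String> workerConfig) {
        Set<String> shared = addresses(resolve("consumer", connectorConfig, workerConfig));
        shared.retainAll(addresses(resolve("admin", connectorConfig, workerConfig)));
        return shared.isEmpty() ? Verdict.INCONCLUSIVE : Verdict.SHARED_ADDRESS;
    }

    public static void main(String[] args) {
        Map<String, String> worker = Map.of("bootstrap.servers", "connect-kafka:9092");
        Map<String, String> connector = Map.of("consumer.override.bootstrap.servers", "sink-kafka:9092");
        System.out.println(compare(connector, worker)); // INCONCLUSIVE
    }
}
```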
   
   Yet another option could be to drop the idempotency guarantee from the `DELETE /connectors/{connector}/offsets` endpoint. If we encounter a `GroupIdNotFoundException` from `Admin::deleteConsumerGroups`, we'd return an error message to the user. It would say that either the offsets have already been reset, or they might need to check their connector's admin overrides (although this does seem fairly ugly).
   
   Edit: A more elegant approach might be to switch the offset reset mechanism from deleting the consumer group to deleting the offsets for all of its topic partitions via `Admin::deleteConsumerGroupOffsets`. This is similar to what we do for the `PATCH /connectors/{connector}/offsets` endpoint when the offset for a partition is specified as `null`. That way, we could explicitly check that the sink connector's consumer group exists before listing its offsets, and fail the request if it doesn't. The minor downside is that this requires an additional admin client request.
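   A sketch of the proposed flow, written against a hypothetical `GroupOffsetsAdmin` interface so it runs without a Kafka dependency. `committedPartitions` and `deleteOffsets` stand in for `Admin::listConsumerGroupOffsets` and `Admin::deleteConsumerGroupOffsets`. `groupExists` is the additional request, and which `Admin` call would back it is left open here. The `java.util.function.Consumer` callback stands in for `SinkConnector::alterOffsets`. `InMemoryGroupOffsetsAdmin` is a hypothetical test double, and topic partitions are plain strings.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical sketch of "check existence, then delete offsets for all partitions". */
class ProposedSinkOffsetReset {

    static void reset(GroupOffsetsAdmin admin, String groupId, Consumer<Map<String, Long>> alterOffsets) {
        // Fail the request instead of reporting success when the group can't be found
        // (e.g. because the admin client targets the wrong Kafka cluster).
        if (!admin.groupExists(groupId)) {
            throw new IllegalStateException("Consumer group " + groupId
                    + " does not exist; check the connector's admin client overrides");
        }
        Set<String> partitions = admin.committedPartitions(groupId);

        // As in the diff, a reset is expressed as a null offset for every partition.
        Map<String, Long> offsetsToWrite = new HashMap<>();
        partitions.forEach(partition -> offsetsToWrite.put(partition, null));
        alterOffsets.accept(offsetsToWrite); // stand-in for SinkConnector::alterOffsets

        if (!partitions.isEmpty()) {
            admin.deleteOffsets(groupId, partitions);
        }
    }

    public static void main(String[] args) {
        InMemoryGroupOffsetsAdmin admin = new InMemoryGroupOffsetsAdmin();
        admin.groups.put("connect-my-sink", new HashMap<>(Map.of("orders-0", 42L, "orders-1", 7L)));
        reset(admin, "connect-my-sink", offsets -> System.out.println("alterOffsets: " + offsets));
        System.out.println("remaining offsets: " + admin.groups.get("connect-my-sink")); // {}
    }
}

/** Hypothetical abstraction over the admin requests involved in the proposed flow. */
interface GroupOffsetsAdmin {
    /** The additional request; which Admin call would back it is not specified in the comment. */
    boolean groupExists(String groupId);

    /** Stand-in for Admin::listConsumerGroupOffsets. */
    Set<String> committedPartitions(String groupId);

    /** Stand-in for Admin::deleteConsumerGroupOffsets. */
    void deleteOffsets(String groupId, Set<String> partitions);
}

/** Hypothetical in-memory test double: group ID -> (partition -> committed offset). */
class InMemoryGroupOffsetsAdmin implements GroupOffsetsAdmin {
    final Map<String, Map<String, Long>> groups = new HashMap<>();

    @Override
    public boolean groupExists(String groupId) {
        return groups.containsKey(groupId);
    }

    @Override
    public Set<String> committedPartitions(String groupId) {
        return new HashSet<>(groups.getOrDefault(groupId, Map.of()).keySet());
    }

    @Override
    public void deleteOffsets(String groupId, Set<String> partitions) {
        groups.getOrDefault(groupId, new HashMap<>()).keySet().removeAll(partitions);
    }
}
```

   In this in-memory model, a repeated reset finds the group with no committed offsets and becomes a no-op. A missing group, by contrast, fails loudly instead of being reported as success.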


