yashmayya commented on code in PR #13818:
URL: https://github.com/apache/kafka/pull/13818#discussion_r1224138055
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map<String,
         Admin admin = adminFactory.apply(adminConfig);
 
         try {
-            List<KafkaFuture<Void>> adminFutures = new ArrayList<>();
-
-            Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
-                    .stream()
-                    .filter(entry -> entry.getValue() != null)
-                    .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
-
-            if (!offsetsToAlter.isEmpty()) {
-                log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.",
-                        connName, offsetsToAlter);
-                AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs(
+            Map<TopicPartition, Long> offsetsToWrite;
+            if (isReset) {
+                offsetsToWrite = new HashMap<>();
+                ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs(
                         (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter,
-                        alterConsumerGroupOffsetsOptions);
-
-                adminFutures.add(alterConsumerGroupOffsetsResult.all());
+                try {
+                    admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions)
+                            .partitionsToOffsetAndMetadata()
+                            .get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                            .forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null));
+
+                    log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}",
+                            connName, groupId, offsetsToWrite.keySet());
+                } catch (Exception e) {
+                    Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName);
+                    log.error("Failed to list offsets prior to resetting sink connector offsets", e);
+                    cb.onCompletion(new ConnectException("Failed to list offsets prior to resetting sink connector offsets", e), null);
+                    return;
+                }
+            } else {
+                offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets);
             }
 
-            Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet()
-                    .stream()
-                    .filter(entry -> entry.getValue() == null)
-                    .map(Map.Entry::getKey)
-                    .collect(Collectors.toSet());
-
-            if (!partitionsToReset.isEmpty()) {
-                log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.",
-                        connName, partitionsToReset);
-                DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs(
-                        (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset,
-                        deleteConsumerGroupOffsetsOptions);
+            boolean alterOffsetsResult;
+            try {
+                alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, offsetsToWrite);
+            } catch (UnsupportedOperationException e) {
+                throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " +
+                        "modification of offsets", e);
+            }
 
-                adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+            // This should only occur for an offset reset request when:
+            // 1. There was a prior attempt to reset offsets
+            // OR
+            // 2. No offsets have been committed yet
+            if (offsetsToWrite.isEmpty()) {

Review Comment:
I've made this change, but it also made me realize that we'll have undesirable behavior if the connector targets a Kafka cluster different from the Connect cluster's backing Kafka cluster and the user has configured the connector's consumer overrides appropriately, but not its admin overrides (something we also discussed previously [here](https://github.com/apache/kafka/pull/13434#discussion_r1144415671)). In this case, if a user attempts to reset their sink connector's offsets via the `DELETE /connectors/{connector}/offsets` endpoint, the following will occur:

1. We list the consumer group offsets via `Admin::listConsumerGroupOffsets`, which returns an empty partition offsets map for the sink connector's consumer group ID (the group exists on a different Kafka cluster from the one the admin client is connected to).
2. We call `SinkConnector::alterOffsets` with an empty offsets map, which could cause the sink connector to propagate offset-reset-related changes to the sink system.
3. We attempt to delete the consumer group via `Admin::deleteConsumerGroups`, which fails with a `GroupIdNotFoundException`. We essentially swallow this exception to keep offset reset operations idempotent, and return a success message to the user even though the sink connector's real consumer group on the other Kafka cluster hasn't been deleted.

This will occur if the connector's admin overrides are missing, OR if the admin overrides are deliberately configured to target a different Kafka cluster from the consumer overrides (although, as you pointed out in the other linked thread, this doesn't seem like a valid use case that we'd even want to support). I guess we'd want to pursue the approach you suggested, where we'd configure the admin client with a combination of the connector's admin overrides and consumer overrides?
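To make the "combination of admin overrides and consumer overrides" idea concrete, here is a minimal sketch, assuming a simple two-pass merge: connection-related settings are first borrowed from the connector's `consumer.override.*` entries, and any explicit `admin.override.*` entries then take precedence. `SinkAdminConfigSketch` and `adminConfig` are hypothetical names, not the Connect runtime's API; which keys count as "connection-related" (`bootstrap.servers`, `security.protocol`, `ssl.*`, `sasl.*`) is an assumption, and worker-level `admin.*` / `consumer.*` / `bootstrap.servers` defaults are assumed to be merged in separately.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not the Connect runtime's API): derives admin client
// settings for a sink connector from its consumer and admin overrides.
final class SinkAdminConfigSketch {
    private static final String CONSUMER_OVERRIDE_PREFIX = "consumer.override.";
    private static final String ADMIN_OVERRIDE_PREFIX = "admin.override.";

    private SinkAdminConfigSketch() { }

    // Assumption: only settings that determine which cluster to talk to and how
    // to authenticate are worth borrowing from the consumer overrides.
    private static boolean isConnectionSetting(String key) {
        return key.equals("bootstrap.servers")
                || key.equals("security.protocol")
                || key.startsWith("ssl.")
                || key.startsWith("sasl.");
    }

    // Worker-level defaults are assumed to be merged in separately (not shown).
    static Map<String, String> adminConfig(Map<String, String> connectorConfig) {
        Map<String, String> result = new HashMap<>();
        // Pass 1: copy connection settings from the consumer overrides, so the
        // admin client reaches the same cluster the connector consumes from.
        for (Map.Entry<String, String> entry : connectorConfig.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(CONSUMER_OVERRIDE_PREFIX)) {
                String stripped = key.substring(CONSUMER_OVERRIDE_PREFIX.length());
                if (isConnectionSetting(stripped)) {
                    result.put(stripped, entry.getValue());
                }
            }
        }
        // Pass 2: explicit admin overrides take precedence over anything copied above.
        for (Map.Entry<String, String> entry : connectorConfig.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(ADMIN_OVERRIDE_PREFIX)) {
                result.put(key.substring(ADMIN_OVERRIDE_PREFIX.length()), entry.getValue());
            }
        }
        return result;
    }
}
```

With this precedence, a connector that only sets `consumer.override.bootstrap.servers` gets an admin client pointed at the same cluster as its consumer, while an explicit `admin.override.bootstrap.servers` still wins (the "deliberately different cluster" case the comment questions supporting).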
Another option could be to somehow verify that the `admin.override.bootstrap.servers` in the connector's config / `admin.bootstrap.servers` in the worker config / `bootstrap.servers` in the worker config (in order of preference) correspond to the same Kafka cluster as the `consumer.override.bootstrap.servers` in the connector's config / `consumer.bootstrap.servers` in the worker config / `bootstrap.servers` in the worker config (in order of preference), and to fail the request if we can reliably determine that they point to different Kafka clusters. I'm not sure this approach is feasible, however.

Yet another option could be to remove the idempotency guarantee from the `DELETE /connectors/{connector}/offsets` endpoint: if we encounter a `GroupIdNotFoundException` from `Admin::deleteConsumerGroups`, we'd return an error message to the user indicating that either the offsets have already been reset or they might need to check their connector's admin overrides (this does seem fairly ugly, though).

Edit: A more elegant approach might be to switch the offset reset mechanism from deleting the consumer group to deleting the offsets for all topic partitions via `Admin::deleteConsumerGroupOffsets` (similar to what we do for the `PATCH /connectors/{connector}/offsets` endpoint when the offset for a partition is specified as `null`). This way, we could explicitly check for the existence of the sink connector's consumer group before listing its offsets, and fail requests if the consumer group doesn't exist (the minor downside is that this requires an additional admin client request).
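The reset flow from the edit above (check that the group exists, list its partitions, then delete the offsets for all of them) can be sketched as a dependency-free model. This is a hedged illustration, not the PR's implementation: `GroupOffsetsClient` and its three methods are hypothetical stand-ins for the existence-check request, `Admin::listConsumerGroupOffsets`, and `Admin::deleteConsumerGroupOffsets`; partitions are plain strings instead of `TopicPartition`; the exception type and message are illustrative; and the `SinkConnector::alterOffsets` call that would sit between listing and deleting is omitted.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical abstraction over the admin requests involved; partitions are
// plain strings such as "orders-0" instead of TopicPartition.
interface GroupOffsetsClient {
    boolean groupExists(String groupId);                        // the extra existence-check request
    Set<String> listPartitions(String groupId);                 // ~ Admin::listConsumerGroupOffsets
    void deleteOffsets(String groupId, Set<String> partitions); // ~ Admin::deleteConsumerGroupOffsets
}

final class SinkOffsetResetSketch {
    private SinkOffsetResetSketch() { }

    // Resets offsets by deleting them per partition rather than deleting the
    // whole consumer group. Returns the partitions whose offsets were removed.
    static Set<String> resetOffsets(GroupOffsetsClient client, String groupId) {
        if (!client.groupExists(groupId)) {
            // Fail instead of reporting success: the admin client may be pointed
            // at a different cluster from the one the connector consumes from.
            throw new IllegalStateException("Consumer group " + groupId
                    + " not found; check the connector's admin overrides");
        }
        Set<String> partitions = client.listPartitions(groupId);
        // The real flow would call SinkConnector::alterOffsets here (omitted).
        if (!partitions.isEmpty()) {
            client.deleteOffsets(groupId, partitions);
        }
        return partitions;
    }
}

// In-memory stand-in used only to exercise the sketch.
final class InMemoryGroupOffsetsClient implements GroupOffsetsClient {
    final Map<String, Map<String, Long>> committed = new HashMap<>();

    @Override
    public boolean groupExists(String groupId) {
        return committed.containsKey(groupId);
    }

    @Override
    public Set<String> listPartitions(String groupId) {
        // Return a copy so later deletions don't mutate the caller's view.
        return new HashSet<>(committed.getOrDefault(groupId, Collections.emptyMap()).keySet());
    }

    @Override
    public void deleteOffsets(String groupId, Set<String> partitions) {
        committed.get(groupId).keySet().removeAll(partitions);
    }
}
```

Because the existence check runs first, an admin client pointed at the wrong cluster surfaces as a failed request rather than a silent success, at the cost of the one extra admin request mentioned in the comment.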