yashmayya commented on code in PR #13818:
URL: https://github.com/apache/kafka/pull/13818#discussion_r1224138055
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map<String,
         Admin admin = adminFactory.apply(adminConfig);
         try {
-            List<KafkaFuture<Void>> adminFutures = new ArrayList<>();
-
-            Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
-                    .stream()
-                    .filter(entry -> entry.getValue() != null)
-                    .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
-
-            if (!offsetsToAlter.isEmpty()) {
-                log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.",
-                        connName, offsetsToAlter);
-                AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs(
+            Map<TopicPartition, Long> offsetsToWrite;
+            if (isReset) {
+                offsetsToWrite = new HashMap<>();
+                ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs(
                         (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter,
-                        alterConsumerGroupOffsetsOptions);
-
-                adminFutures.add(alterConsumerGroupOffsetsResult.all());
+                try {
+                    admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions)
+                            .partitionsToOffsetAndMetadata()
+                            .get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                            .forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null));
+
+                    log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}",
+                            connName, groupId, offsetsToWrite.keySet());
+                } catch (Exception e) {
+                    Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName);
+                    log.error("Failed to list offsets prior to resetting sink connector offsets", e);
+                    cb.onCompletion(new ConnectException("Failed to list offsets prior to resetting sink connector offsets", e), null);
+                    return;
+                }
+            } else {
+                offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets);
             }
-            Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet()
-                    .stream()
-                    .filter(entry -> entry.getValue() == null)
-                    .map(Map.Entry::getKey)
-                    .collect(Collectors.toSet());
-
-            if (!partitionsToReset.isEmpty()) {
-                log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.",
-                        connName, partitionsToReset);
-                DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs(
-                        (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset,
-                        deleteConsumerGroupOffsetsOptions);
+            boolean alterOffsetsResult;
+            try {
+                alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, offsetsToWrite);
+            } catch (UnsupportedOperationException e) {
+                throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " +
+                        "modification of offsets", e);
+            }
-                adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+            // This should only occur for an offset reset request when:
+            // 1. There was a prior attempt to reset offsets
+            // OR
+            // 2. No offsets have been committed yet
+            if (offsetsToWrite.isEmpty()) {
Review Comment:
I've made this change, but it also made me realize that we will have
undesirable behavior if the connector targets a Kafka cluster other than the
Connect cluster's backing Kafka cluster and the user has configured the
consumer overrides for their connector but not the admin overrides (something
we also discussed previously
[here](https://github.com/apache/kafka/pull/13434#discussion_r1144415671)).
In that case, if a user attempts to reset their sink connector's offsets via
the `DELETE /connectors/{connector}/offsets` endpoint, the following will
occur:
1. We list the consumer group offsets via `Admin::listConsumerGroupOffsets`,
which returns an empty partition offsets map for the sink connector's consumer
group ID (the group exists on a different Kafka cluster from the one the admin
client is connected to).
2. We call `SinkConnector::alterOffsets` with an empty offsets map, which
could cause the sink connector to propagate the offset reset to the sink
system.
3. We attempt to delete the consumer group via `Admin::deleteConsumerGroups`,
which fails with a `GroupIdNotFoundException`. We essentially swallow that
exception in order to keep offset reset operations idempotent, and return a
success message to the user (even though the real consumer group for the sink
connector on the other Kafka cluster hasn't been deleted).
This will occur if the connector's admin overrides are missing OR the admin
overrides are deliberately configured to target a different Kafka cluster from
the one the consumer overrides target (although, like you pointed out in the
other linked thread, this doesn't seem like a valid use case that we'd even
want to support).
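
To make the failure mode concrete, here is a minimal, self-contained sketch of
the three steps. It uses plain in-memory maps instead of real Kafka clients:
`FakeCluster`, `Result`, `deleteGroup` and `resetOffsets` are all hypothetical
names for illustration, and `IllegalStateException` stands in for
`GroupIdNotFoundException`. It is not the code in `Worker.java`.

```java
import java.util.HashMap;
import java.util.Map;

class CrossClusterResetSketch {
    // Hypothetical stand-in for a Kafka cluster: group ID -> (partition -> committed offset).
    static final class FakeCluster {
        final Map<String, Map<String, Long>> groups = new HashMap<>();
    }

    // Hypothetical result type: what the user would be told, plus what step 1 saw.
    static final class Result {
        final boolean success;
        final int partitionsListed;
        Result(boolean success, int partitionsListed) {
            this.success = success;
            this.partitionsListed = partitionsListed;
        }
    }

    // Stand-in for the group deletion call: throws when the group does not exist.
    static void deleteGroup(FakeCluster cluster, String groupId) {
        if (cluster.groups.remove(groupId) == null) {
            throw new IllegalStateException("Group not found: " + groupId);
        }
    }

    // Runs the reset against whichever cluster the admin client points at.
    static Result resetOffsets(FakeCluster adminTarget, String groupId) {
        // Step 1: an unknown group yields an empty offsets map, not an error.
        Map<String, Long> listed = adminTarget.groups.getOrDefault(groupId, new HashMap<>());
        // Step 2: the connector's alterOffsets hook would be called here,
        // with an empty map if nothing was listed.
        try {
            // Step 3: delete the group on the admin client's cluster.
            deleteGroup(adminTarget, groupId);
        } catch (IllegalStateException e) {
            // Swallowed to keep the reset idempotent.
        }
        return new Result(true, listed.size());
    }
}
```

If the admin client points at the Connect cluster while the group lives on the
sink's cluster, `resetOffsets` reports success with zero partitions listed, and
the group on the other cluster is left untouched.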
I guess we'd want to pursue the approach you suggested where we'd configure
the admin client with a combination of the connector's admin overrides and
consumer overrides?
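
A rough sketch of what that combination could look like, using plain string
maps rather than the real config classes. `OffsetsAdminConfigSketch`,
`stripPrefix` and `offsetsAdminConfig` are hypothetical names, and the
precedence shown (worker defaults, then consumer overrides, then admin
overrides) is an assumption about the desired behavior, not something already
decided in this thread.

```java
import java.util.HashMap;
import java.util.Map;

class OffsetsAdminConfigSketch {
    static final String CONSUMER_OVERRIDE_PREFIX = "consumer.override.";
    static final String ADMIN_OVERRIDE_PREFIX = "admin.override.";

    // Returns the entries whose key starts with the prefix, with the prefix removed.
    static Map<String, String> stripPrefix(Map<String, String> config, String prefix) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                result.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    // Assumed precedence, lowest to highest:
    // worker-level admin defaults < consumer overrides < admin overrides.
    static Map<String, String> offsetsAdminConfig(Map<String, String> workerAdminDefaults,
                                                  Map<String, String> connectorConfig) {
        Map<String, String> merged = new HashMap<>(workerAdminDefaults);
        merged.putAll(stripPrefix(connectorConfig, CONSUMER_OVERRIDE_PREFIX));
        merged.putAll(stripPrefix(connectorConfig, ADMIN_OVERRIDE_PREFIX));
        return merged;
    }
}
```

With only `consumer.override.bootstrap.servers` set, the admin client would
then follow the consumer to the sink's cluster. One caveat with this naive
merge: consumer-only properties would also be passed to the admin client, so a
real implementation would probably want to filter them.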
Another option could be to verify that the admin client's bootstrap servers
(`admin.override.bootstrap.servers` in the connector's config /
`admin.bootstrap.servers` in the worker config / `bootstrap.servers` in the
worker config, in order of preference) correspond to the same Kafka cluster as
the consumer's bootstrap servers (`consumer.override.bootstrap.servers` in the
connector's config / `consumer.bootstrap.servers` in the worker config /
`bootstrap.servers` in the worker config, in order of preference), and to fail
the request if we can reliably determine that they don't point to the same
Kafka cluster. I'm not sure that this is a feasible approach, however.
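
For illustration, the lookup order described above can be sketched as follows.
`BootstrapServersSketch`, `resolve`, `normalize` and `sameBootstrapServers` are
hypothetical helpers operating on plain string maps. Comparing the normalized
server lists is only a heuristic: two different lists can point at the same
cluster, which is part of why this approach may not be reliable. Comparing the
cluster IDs reported by `Admin::describeCluster` might be more robust, at the
cost of an extra connection.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class BootstrapServersSketch {
    // Resolves bootstrap servers for a client type ("admin" or "consumer"),
    // trying the connector override, then the worker's client-specific
    // setting, then the worker's top-level setting.
    static String resolve(Map<String, String> connectorConfig,
                          Map<String, String> workerConfig,
                          String client) {
        String value = connectorConfig.get(client + ".override.bootstrap.servers");
        if (value == null) {
            value = workerConfig.get(client + ".bootstrap.servers");
        }
        if (value == null) {
            value = workerConfig.get("bootstrap.servers");
        }
        return value;
    }

    // Splits a comma-separated server list into a sorted, lower-cased set.
    static Set<String> normalize(String servers) {
        Set<String> result = new TreeSet<>();
        if (servers == null) {
            return result;
        }
        for (String server : servers.split(",")) {
            String trimmed = server.trim().toLowerCase(Locale.ROOT);
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }

    // Heuristic only: true when both clients resolve to the same server set.
    static boolean sameBootstrapServers(Map<String, String> connectorConfig,
                                        Map<String, String> workerConfig) {
        return normalize(resolve(connectorConfig, workerConfig, "admin"))
                .equals(normalize(resolve(connectorConfig, workerConfig, "consumer")));
    }
}
```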
Yet another option could be to remove the idempotency guarantee from the
`DELETE /connectors/{connector}/offsets` endpoint and if we encounter a
`GroupIdNotFoundException` from `Admin::deleteConsumerGroups`, return an error
message to the user indicating that either the offsets have already been reset
previously or else they might need to check their connector's admin overrides
(this does seem fairly ugly though).
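
A sketch of what that error handling might look like. `GroupNotFound` is a
hypothetical stand-in for `GroupIdNotFoundException` so that the example runs
without Kafka on the classpath, and the message wording and
`describeResetFailure` helper are placeholders. The `ExecutionException`
unwrapping assumes the failure surfaces through a future's `get()`.

```java
import java.util.concurrent.ExecutionException;

class NonIdempotentResetSketch {
    // Hypothetical stand-in for org.apache.kafka.common.errors.GroupIdNotFoundException.
    static final class GroupNotFound extends RuntimeException {
        GroupNotFound(String groupId) {
            super("Group " + groupId + " not found");
        }
    }

    // Builds the error message returned to the user when the reset fails.
    static String describeResetFailure(String connName, Throwable error) {
        // Failures from a future's get() arrive wrapped in ExecutionException.
        Throwable cause = error;
        if (error instanceof ExecutionException && error.getCause() != null) {
            cause = error.getCause();
        }
        if (cause instanceof GroupNotFound) {
            return "Could not find a consumer group for connector " + connName
                    + ". Either its offsets have already been reset, or the connector's"
                    + " admin overrides do not point at the Kafka cluster that its"
                    + " consumer uses.";
        }
        return "Failed to reset offsets for connector " + connName + ": " + cause.getMessage();
    }
}
```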