yashmayya commented on code in PR #13818: URL: https://github.com/apache/kafka/pull/13818#discussion_r1221937078
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ########## @@ -1587,10 +1595,11 @@ private boolean alterConnectorOffsetsChecks(String connName, Callback<Message> c // If the target state for the connector is stopped, its task count is 0, and there is no rebalance pending (checked above), // we can be sure that the tasks have at least been attempted to be stopped (or cancelled if they took too long to stop). // Zombie tasks are handled by a round of zombie fencing for exactly once source connectors. Zombie sink tasks are handled - // naturally because requests to alter consumer group offsets will fail if there are still active members in the group. + // naturally because requests to alter consumer group offsets / delete consumer groups will fail if there are still active members + // in the group. if (configState.targetState(connName) != TargetState.STOPPED || configState.taskCount(connName) != 0) { - callback.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be altered. This " + - "can be done for the specified connector by issuing a PUT request to the /connectors/" + connName + "/stop endpoint"), null); + callback.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be modified externally. 
" + Review Comment: I think just "modified" should be clear enough to users since this is synchronously surfaced to the user via the alter / reset offsets REST API response 👍 ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, Admin admin = adminFactory.apply(adminConfig); try { - List<KafkaFuture<Void>> adminFutures = new ArrayList<>(); - - Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet() - .stream() - .filter(entry -> entry.getValue() != null) - .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))); - - if (!offsetsToAlter.isEmpty()) { - log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.", - connName, offsetsToAlter); - AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs( + Map<TopicPartition, Long> offsetsToWrite; + if (isReset) { + offsetsToWrite = new HashMap<>(); + ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs( (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); - AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter, - alterConsumerGroupOffsetsOptions); - - adminFutures.add(alterConsumerGroupOffsetsResult.all()); + try { + admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions) + .partitionsToOffsetAndMetadata() + .get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS) + .forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null)); + + log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}", + connName, groupId, offsetsToWrite.keySet()); + } catch (Exception 
e) { + Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName); + log.error("Failed to list offsets prior to resetting sink connector offsets", e); + cb.onCompletion(new ConnectException("Failed to list offsets prior to resetting sink connector offsets", e), null); + return; + } + } else { + offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets); } - Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet() - .stream() - .filter(entry -> entry.getValue() == null) - .map(Map.Entry::getKey) - .collect(Collectors.toSet()); - - if (!partitionsToReset.isEmpty()) { - log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.", - connName, partitionsToReset); - DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs( - (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); - DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset, - deleteConsumerGroupOffsetsOptions); + boolean alterOffsetsResult; + try { + alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, offsetsToWrite); + } catch (UnsupportedOperationException e) { + throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " + + "modification of offsets", e); + } - adminFutures.add(deleteConsumerGroupOffsetsResult.all()); + // This should only occur for an offset reset request when: + // 1. There was a prior attempt to reset offsets + // OR + // 2. 
No offsets have been committed yet + if (offsetsToWrite.isEmpty()) { + completeAlterOffsetsCallback(alterOffsetsResult, isReset, cb); + return; } - @SuppressWarnings("rawtypes") - KafkaFuture<Void> compositeAdminFuture = KafkaFuture.allOf(adminFutures.toArray(new KafkaFuture[0])); - - compositeAdminFuture.whenComplete((ignored, error) -> { - if (error != null) { - // When a consumer group is non-empty, only group members can commit offsets. An attempt to alter offsets via the admin client - // will result in an UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped - // completely or if the connector is resumed while the alter offsets request is being processed). Similarly, an attempt to - // delete consumer group offsets for a non-empty consumer group will result in a GroupSubscribedToTopicException - if (error instanceof UnknownMemberIdException || error instanceof GroupSubscribedToTopicException) { - cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " + - "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " + - "completed. If the connector is in a stopped state, this operation can be safely retried. 
If it doesn't eventually succeed, the " + - "Connect cluster may need to be restarted to get rid of the zombie sink tasks."), - null); - } else { - cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName, error), null); - } - } else { - completeAlterOffsetsCallback(alterOffsetsResult, cb); - } - }).whenComplete((ignored, ignoredError) -> { - // errors originating from the original future are handled in the prior whenComplete invocation which isn't expected to throw - // an exception itself, and we can thus ignore the error here - Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName); - }); + if (isReset) { + resetSinkConnectorOffsets(connName, groupId, admin, cb, alterOffsetsResult); + } else { + alterSinkConnectorOffsets(connName, groupId, admin, offsetsToWrite, cb, alterOffsetsResult); + } } catch (Throwable t) { - Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName); + Utils.closeQuietly(admin, "Offset modification admin for sink connector " + connName); throw t; } } catch (Throwable t) { - cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter offsets for sink connector " + connName), null); + cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to modify offsets for sink connector " + connName), null); } })); } /** - * Alter a source connector's offsets. + * Alter a sink connector's consumer group offsets. This is done via calls to {@link Admin#alterConsumerGroupOffsets} + * and / or {@link Admin#deleteConsumerGroupOffsets}. * - * @param connName the name of the source connector whose offsets are to be altered + * @param connName the name of the sink connector whose offsets are to be altered + * @param groupId the sink connector's consumer group ID + * @param admin the {@link Admin admin client} to be used for altering the consumer group offsets; should be closed after use Review Comment: Good catch, thanks. 
I think this was leftover from a previous refactor (earlier it was the caller's responsibility to close the admin client). ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, Admin admin = adminFactory.apply(adminConfig); try { - List<KafkaFuture<Void>> adminFutures = new ArrayList<>(); - - Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet() - .stream() - .filter(entry -> entry.getValue() != null) - .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))); - - if (!offsetsToAlter.isEmpty()) { - log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.", - connName, offsetsToAlter); - AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs( + Map<TopicPartition, Long> offsetsToWrite; + if (isReset) { + offsetsToWrite = new HashMap<>(); + ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs( (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); - AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter, - alterConsumerGroupOffsetsOptions); - - adminFutures.add(alterConsumerGroupOffsetsResult.all()); + try { + admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions) + .partitionsToOffsetAndMetadata() + .get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS) + .forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null)); + + log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}", + connName, groupId, offsetsToWrite.keySet()); + } catch (Exception e) { + Utils.closeQuietly(admin, "Offset reset admin 
for sink connector " + connName); + log.error("Failed to list offsets prior to resetting sink connector offsets", e); + cb.onCompletion(new ConnectException("Failed to list offsets prior to resetting sink connector offsets", e), null); + return; + } + } else { + offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets); } - Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet() - .stream() - .filter(entry -> entry.getValue() == null) - .map(Map.Entry::getKey) - .collect(Collectors.toSet()); - - if (!partitionsToReset.isEmpty()) { - log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.", - connName, partitionsToReset); - DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs( - (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); - DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset, - deleteConsumerGroupOffsetsOptions); + boolean alterOffsetsResult; + try { + alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, offsetsToWrite); + } catch (UnsupportedOperationException e) { + throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " + + "modification of offsets", e); + } - adminFutures.add(deleteConsumerGroupOffsetsResult.all()); + // This should only occur for an offset reset request when: + // 1. There was a prior attempt to reset offsets + // OR + // 2. 
No offsets have been committed yet + if (offsetsToWrite.isEmpty()) { + completeAlterOffsetsCallback(alterOffsetsResult, isReset, cb); + return; } - @SuppressWarnings("rawtypes") - KafkaFuture<Void> compositeAdminFuture = KafkaFuture.allOf(adminFutures.toArray(new KafkaFuture[0])); - - compositeAdminFuture.whenComplete((ignored, error) -> { - if (error != null) { - // When a consumer group is non-empty, only group members can commit offsets. An attempt to alter offsets via the admin client - // will result in an UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped - // completely or if the connector is resumed while the alter offsets request is being processed). Similarly, an attempt to - // delete consumer group offsets for a non-empty consumer group will result in a GroupSubscribedToTopicException - if (error instanceof UnknownMemberIdException || error instanceof GroupSubscribedToTopicException) { - cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " + - "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " + - "completed. If the connector is in a stopped state, this operation can be safely retried. 
If it doesn't eventually succeed, the " + - "Connect cluster may need to be restarted to get rid of the zombie sink tasks."), - null); - } else { - cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName, error), null); - } - } else { - completeAlterOffsetsCallback(alterOffsetsResult, cb); - } - }).whenComplete((ignored, ignoredError) -> { - // errors originating from the original future are handled in the prior whenComplete invocation which isn't expected to throw - // an exception itself, and we can thus ignore the error here - Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName); - }); + if (isReset) { + resetSinkConnectorOffsets(connName, groupId, admin, cb, alterOffsetsResult); + } else { + alterSinkConnectorOffsets(connName, groupId, admin, offsetsToWrite, cb, alterOffsetsResult); + } } catch (Throwable t) { - Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName); + Utils.closeQuietly(admin, "Offset modification admin for sink connector " + connName); throw t; } } catch (Throwable t) { - cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter offsets for sink connector " + connName), null); + cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to modify offsets for sink connector " + connName), null); } })); } /** - * Alter a source connector's offsets. + * Alter a sink connector's consumer group offsets. This is done via calls to {@link Admin#alterConsumerGroupOffsets} + * and / or {@link Admin#deleteConsumerGroupOffsets}. 
* - * @param connName the name of the source connector whose offsets are to be altered + * @param connName the name of the sink connector whose offsets are to be altered + * @param groupId the sink connector's consumer group ID + * @param admin the {@link Admin admin client} to be used for altering the consumer group offsets; should be closed after use + * @param offsetsToWrite a mapping from topic partitions to offsets that need to be written; may not be null or empty + * @param cb callback to invoke upon completion + * @param alterOffsetsResult the result of the call to {@link SinkConnector#alterOffsets} for the connector + */ + private void alterSinkConnectorOffsets(String connName, String groupId, Admin admin, Map<TopicPartition, Long> offsetsToWrite, + Callback<Message> cb, boolean alterOffsetsResult) { + List<KafkaFuture<Void>> adminFutures = new ArrayList<>(); + + Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = offsetsToWrite.entrySet() + .stream() + .filter(entry -> entry.getValue() != null) + .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))); + + if (!offsetsToAlter.isEmpty()) { + log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.", + connName, offsetsToAlter); + AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs( + (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); + AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter, + alterConsumerGroupOffsetsOptions); + + adminFutures.add(alterConsumerGroupOffsetsResult.all()); + } + + Set<TopicPartition> partitionsToReset = offsetsToWrite.entrySet() + .stream() + .filter(entry -> entry.getValue() == null) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + + if (!partitionsToReset.isEmpty()) { + log.debug("Deleting the consumer group offsets for the following topic 
partitions using an admin client for sink connector {}: {}.", + connName, partitionsToReset); + DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs( + (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); + DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset, + deleteConsumerGroupOffsetsOptions); + + adminFutures.add(deleteConsumerGroupOffsetsResult.all()); + } + + @SuppressWarnings("rawtypes") + KafkaFuture<Void> compositeAdminFuture = KafkaFuture.allOf(adminFutures.toArray(new KafkaFuture[0])); + + compositeAdminFuture.whenComplete((ignored, error) -> { + if (error != null) { + // When a consumer group is non-empty, only group members can commit offsets. An attempt to alter offsets via the admin client + // will result in an UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped + // completely or if the connector is resumed while the alter offsets request is being processed). Similarly, an attempt to + // delete consumer group offsets for a non-empty consumer group will result in a GroupSubscribedToTopicException + if (error instanceof UnknownMemberIdException || error instanceof GroupSubscribedToTopicException) { + cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " + + "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " + + "completed. If the connector is in a stopped state, this operation can be safely retried. 
If it doesn't eventually succeed, the " + + "Connect cluster may need to be restarted to get rid of the zombie sink tasks."), + null); + } else { + cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName, error), null); + } + } else { + completeAlterOffsetsCallback(alterOffsetsResult, false, cb); + } + }).whenComplete((ignored, ignoredError) -> { + // errors originating from the original future are handled in the prior whenComplete invocation which isn't expected to throw + // an exception itself, and we can thus ignore the error here + Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName); + }); Review Comment: Yeah, no changes here. I'm surprised that the GitHub diff UI isn't smarter about things like this 🤷‍♂️ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org