dajac commented on code in PR #21263:
URL: https://github.com/apache/kafka/pull/21263#discussion_r2668658267
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -2217,63 +2218,6 @@ public CompletableFuture<Void> completeTransaction(
);
}
- /**
- * See {@link GroupCoordinator#onPartitionsDeleted(List, BufferSupplier)}.
- */
- @Override
- public void onPartitionsDeleted(
- List<TopicPartition> topicPartitions,
- BufferSupplier bufferSupplier
- ) throws ExecutionException, InterruptedException {
- throwIfNotActive();
-
- var futures = new ArrayList<CompletableFuture<Void>>();
-
- // Handle the partition deletion for committed offsets.
- futures.addAll(
- FutureUtils.mapExceptionally(
- runtime.scheduleWriteAllOperation(
- "on-partition-deleted",
- Duration.ofMillis(config.offsetCommitTimeoutMs()),
- coordinator ->
coordinator.onPartitionsDeleted(topicPartitions)
- ),
- exception -> {
- log.error("Could not delete offsets for deleted partitions
{} due to: {}.",
- topicPartitions, exception.getMessage(), exception
- );
- return null;
- }
- )
- );
-
- // Handle the topic deletion for share state.
- if (metadataImage != null) {
- var topicIds = topicPartitions.stream()
- .filter(tp ->
metadataImage.topicMetadata(tp.topic()).isPresent())
- .map(tp -> metadataImage.topicMetadata(tp.topic()).get().id())
- .collect(Collectors.toSet());
-
- if (!topicIds.isEmpty()) {
- futures.addAll(
- FutureUtils.mapExceptionally(
- runtime.scheduleWriteAllOperation(
- "maybe-cleanup-share-group-state",
- Duration.ofMillis(config.offsetCommitTimeoutMs()),
- coordinator ->
coordinator.maybeCleanupShareGroupState(topicIds)
- ),
- exception -> {
- log.error("Unable to cleanup state for the deleted
topics {}", topicIds, exception);
- return null;
- }
- )
- );
- }
- }
-
- // Wait on the results.
- CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
Review Comment:
There is a but here as the code does not actually waits on the future to
complete.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]