yangy0000 commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r1341917129
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -705,9 +783,39 @@ public CompletableFuture<OffsetDeleteResponseData> deleteOffsets( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } - return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( - "This API is not implemented yet." - )); + if (!isGroupIdNotEmpty(request.groupId())) { + return CompletableFuture.completedFuture(new OffsetDeleteResponseData() + .setErrorCode(Errors.INVALID_GROUP_ID.code()) + ); + } + + return runtime.scheduleWriteOperation( + "delete-offsets", + topicPartitionFor(request.groupId()), + coordinator -> coordinator.deleteOffsets(context, request) + ).exceptionally(exception -> { + if (exception instanceof UnknownTopicOrPartitionException || + exception instanceof NotEnoughReplicasException) { Review Comment: Error handling codes in lines 555-586 and 797-816 are very similar. An alternative implementation is create a util method: ``` private Errors transformExceptionCode(Exception e) if (exception instanceof UnknownTopicOrPartitionException || exception instanceof NotEnoughReplicasException) { return Errors.COORDINATOR_NOT_AVAILABLE } ... ) ``` within deleteGroups : ``` exception-> { return DeleteGroupsRequest.getErrorResultCollection( groupList, transformExceptionCode(e)) } ``` within deleteOffsets: ``` exception-> { return new OffsetDeleteResponseData() .setErrorCode( transformExceptionCode(e).code()); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org