dajac commented on code in PR #15196: URL: https://github.com/apache/kafka/pull/15196#discussion_r1457478495
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -1098,30 +1054,36 @@ private static boolean isGroupIdNotEmpty(String groupId) { return groupId != null && !groupId.isEmpty(); } - /** - * Handles the exception in the scheduleWriteOperation. - * @return The Errors instance associated with the given exception. - */ - private static Errors normalizeException(Throwable exception) { - exception = Errors.maybeUnwrapException(exception); - - if (exception instanceof UnknownTopicOrPartitionException || - exception instanceof NotEnoughReplicasException || - exception instanceof TimeoutException) { - return Errors.COORDINATOR_NOT_AVAILABLE; - } - - if (exception instanceof NotLeaderOrFollowerException || - exception instanceof KafkaStorageException) { - return Errors.NOT_COORDINATOR; - } - - if (exception instanceof RecordTooLargeException || - exception instanceof RecordBatchTooLargeException || - exception instanceof InvalidFetchSizeException) { - return Errors.UNKNOWN_SERVER_ERROR; + private <REQ, RSP> RSP handleOperationException( + String requestName, + REQ request, + Throwable exception, + BiFunction<Errors, String, RSP> responseBuilder + ) { + ApiError apiError = ApiError.fromThrowable(exception); + + switch (apiError.error()) { + case UNKNOWN_SERVER_ERROR: + log.error("{} request {} hit an unexpected exception: {}.", + requestName, request, exception.getMessage(), exception); + return responseBuilder.apply(Errors.UNKNOWN_SERVER_ERROR, null); + + case UNKNOWN_TOPIC_OR_PARTITION: + case NOT_ENOUGH_REPLICAS: + case REQUEST_TIMED_OUT: + return responseBuilder.apply(Errors.COORDINATOR_NOT_AVAILABLE, null); + + case NOT_LEADER_OR_FOLLOWER: + case KAFKA_STORAGE_ERROR: + return responseBuilder.apply(Errors.NOT_COORDINATOR, null); + + case MESSAGE_TOO_LARGE: + case RECORD_LIST_TOO_LARGE: + case INVALID_FETCH_SIZE: + return responseBuilder.apply(Errors.UNKNOWN_SERVER_ERROR, null); + + default: + return 
responseBuilder.apply(apiError.error(), apiError.message()); Review Comment: If you look at GroupMetadataManager.consumerGroupHeartbeat, there are many cases where a non-default message is provided in order to give more information to the client. All those non-default messages will be set here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org