jolshan commented on code in PR #15196:
URL: https://github.com/apache/kafka/pull/15196#discussion_r1467019262


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1099,29 +1055,48 @@ private static boolean isGroupIdNotEmpty(String groupId) {
     }
 
     /**
-     * Handles the exception in the scheduleWriteOperation.
-     * @return The Errors instance associated with the given exception.
+     * This is the handler commonly used by all the operations that requires to convert errors to
+     * coordinator errors. The handler also handles and log unexpected errors.
+     *
+     * @param requestName       The name of the request.
+     * @param request           The request itself for logging purposes.
+     * @param exception         The exception to handle.
+     * @param responseBuilder   A function which takes an Errors and a String and returns
+     *                          the response. The String can be null.
+     * @return The response.
+     * @param <REQ> The type of the request.

Review Comment:
   Should these generics be limited to request/response types?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -444,21 +416,21 @@ public CompletableFuture<HeartbeatResponseData> heartbeat(
         return runtime.scheduleReadOperation("classic-group-heartbeat",
             topicPartitionFor(request.groupId()),
             (coordinator, __) -> coordinator.classicGroupHeartbeat(context, request)
-        ).exceptionally(exception -> {
-            if (!(exception instanceof KafkaException)) {
-                log.error("Heartbeat request {} hit an unexpected exception: {}",
-                    request, exception.getMessage());
-            }
-
-            if (exception instanceof CoordinatorLoadInProgressException) {
-                // The group is still loading, so blindly respond
-                return new HeartbeatResponseData()
-                    .setErrorCode(Errors.NONE.code());
+        ).exceptionally(exception -> handleOperationException(
+            "Heartbeat",

Review Comment:
   Maybe worth leaving a comment in the helper that some requests further 
transform errors.
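
   A minimal sketch of what "further transform" could look like on the caller side, assuming the heartbeat path keeps the old behaviour of answering NONE while the group is still loading. The `Err` enum and the `handle` method are hypothetical stand-ins, not the Kafka `Errors` enum or the helper from this PR.

```java
import java.util.function.BiFunction;

final class HeartbeatRemapSketch {
    // Toy subset of error codes (hypothetical, not Kafka's Errors enum).
    enum Err { NONE, COORDINATOR_LOAD_IN_PROGRESS, NOT_COORDINATOR }

    // Hypothetical stand-in for the shared helper: hands the error to the builder unchanged.
    static <RSP> RSP handle(Err error, BiFunction<Err, String, RSP> responseBuilder) {
        return responseBuilder.apply(error, null);
    }

    // Caller-side builder that transforms the error once more before responding.
    static Err heartbeatError(Err error) {
        return handle(error, (err, message) ->
            err == Err.COORDINATOR_LOAD_IN_PROGRESS ? Err.NONE : err);
    }

    public static void main(String[] args) {
        System.out.println(heartbeatError(Err.COORDINATOR_LOAD_IN_PROGRESS));
        System.out.println(heartbeatError(Err.NOT_COORDINATOR));
    }
}
```

   Because the second mapping lives in the caller's lambda, a reader of the helper alone cannot see it, which is the case for a comment there.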



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -524,37 +495,39 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
             );
         }
 
-        final CompletableFuture<ListGroupsResponseData> future = new CompletableFuture<>();

Review Comment:
   So instead of having one future and a shared list of results, we now have a 
list of futures, where each future completes with the list of groups for one 
topic partition?
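
   For reference, a minimal sketch of the list-of-futures pattern described above, using only the JDK. The class and method names are hypothetical and this is not the PR's code; it only illustrates waiting on every per-partition future and then flattening the results.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class ListOfFuturesSketch {
    // Waits for every per-partition future, then flattens the per-partition lists.
    static <T> CompletableFuture<List<T>> combine(List<CompletableFuture<List<T>>> futures) {
        CompletableFuture<Void> all =
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
        return all.thenApply(ignored -> {
            List<T> merged = new ArrayList<>();
            for (CompletableFuture<List<T>> future : futures) {
                // Every future is already complete here, so join() does not block.
                merged.addAll(future.join());
            }
            return merged;
        });
    }

    public static void main(String[] args) {
        List<CompletableFuture<List<String>>> futures = new ArrayList<>();
        futures.add(CompletableFuture.completedFuture(Arrays.asList("group-a")));
        futures.add(CompletableFuture.completedFuture(Arrays.asList("group-b", "group-c")));
        System.out.println(combine(futures).join());
    }
}
```

   Compared with a shared list guarded by `synchronized` and an `AtomicInteger` countdown, this keeps each partition's result inside its own future and merges them in one place.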



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -360,14 +334,13 @@ public CompletableFuture<JoinGroupResponseData> joinGroup(
             Duration.ofMillis(config.offsetCommitTimeoutMs),
             coordinator -> coordinator.classicGroupJoin(context, request, responseFuture)
         ).exceptionally(exception -> {
-            if (!(exception instanceof KafkaException)) {

Review Comment:
   A few of these requests handled it like this, but now we will return 
specific errors and no longer log "unexpected exception".
   
   Is it the case that these were never really unexpected (besides unknown 
server error)?
   Do we want to flag any other errors as unexpected, or just handle them and 
return them in the response without a log?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -524,37 +495,39 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
             );
         }
 
-        final CompletableFuture<ListGroupsResponseData> future = new CompletableFuture<>();
-        final List<ListGroupsResponseData.ListedGroup> results = new ArrayList<>();
         final Set<TopicPartition> existingPartitionSet = runtime.partitions();
-        final AtomicInteger cnt = new AtomicInteger(existingPartitionSet.size());
 
         if (existingPartitionSet.isEmpty()) {
             return CompletableFuture.completedFuture(new ListGroupsResponseData());
         }
 
+        final List<CompletableFuture<List<ListGroupsResponseData.ListedGroup>>> futures =
+            new ArrayList<>();
+
         for (TopicPartition tp : existingPartitionSet) {
-            runtime.scheduleReadOperation(
+            futures.add(runtime.scheduleReadOperation(
                 "list-groups",
                 tp,
                 (coordinator, lastCommittedOffset) -> coordinator.listGroups(request.statesFilter(), lastCommittedOffset)
-            ).handle((groups, exception) -> {
-                if (exception == null) {
-                    synchronized (results) {
-                        results.addAll(groups);
-                    }
+            ).exceptionally(exception -> {
+                exception = Errors.maybeUnwrapException(exception);

Review Comment:
   Did we not unwrap the exception correctly before? 😓
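
   For context, a minimal JDK-only sketch of why unwrapping matters in an `exceptionally` callback. The `unwrap` helper below is a hypothetical stand-in written for this example; it is assumed, not confirmed here, to behave like `Errors.maybeUnwrapException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

final class UnwrapSketch {
    // Hypothetical helper: strips the wrapper that future chaining can add.
    static Throwable unwrap(Throwable t) {
        boolean wrapper = t instanceof CompletionException || t instanceof ExecutionException;
        return (wrapper && t.getCause() != null) ? t.getCause() : t;
    }

    // Returns the simple class name of the exception seen by exceptionally().
    static String seenBy(CompletableFuture<String> future, boolean unwrapFirst) {
        return future
            .exceptionally(t -> (unwrapFirst ? unwrap(t) : t).getClass().getSimpleName())
            .join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));

        // Attached directly to the failed future: the raw exception is seen.
        System.out.println(seenBy(failed, false));
        // Attached after a dependent stage: the exception arrives wrapped.
        System.out.println(seenBy(failed.thenApply(s -> s), false));
        // Unwrapping first recovers the original type in both cases.
        System.out.println(seenBy(failed.thenApply(s -> s), true));
    }
}
```

   An `instanceof` check against the original exception type would miss the wrapped case, so whether the old code was correct depends on whether a dependent stage sat between the failure and the callback.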



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -595,27 +568,17 @@ public CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>>
                     "consumer-group-describe",
                     topicPartition,
                     (coordinator, lastCommittedOffset) -> coordinator.consumerGroupDescribe(groupIds, lastCommittedOffset)
-                ).exceptionally(exception -> {
-                    if (!(exception instanceof KafkaException)) {
-                        log.error("ConsumerGroupDescribe request {} hit an unexpected exception: {}.",
-                            groupList, exception.getMessage());
-                    }
-
-                    return ConsumerGroupDescribeRequest.getErrorDescribedGroupList(
-                        groupList,
-                        Errors.forException(exception)
-                    );
-                });
+                ).exceptionally(exception -> handleOperationException(
+                    "ConsumerGroupDescribe",
+                    groupList,
+                    exception,
+                    (error, __) -> ConsumerGroupDescribeRequest.getErrorDescribedGroupList(groupList, error)
+                ));
 
             futures.add(future);
         });
 
-        final CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

Review Comment:
   Is line 581 just a one-liner replacing these lines? EDIT: I see the helper now 👍



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1098,30 +1054,36 @@ private static boolean isGroupIdNotEmpty(String groupId) {
         return groupId != null && !groupId.isEmpty();
     }
 
-    /**
-     * Handles the exception in the scheduleWriteOperation.
-     * @return The Errors instance associated with the given exception.
-     */
-    private static Errors normalizeException(Throwable exception) {
-        exception = Errors.maybeUnwrapException(exception);
-
-        if (exception instanceof UnknownTopicOrPartitionException ||
-            exception instanceof NotEnoughReplicasException ||
-            exception instanceof TimeoutException) {
-            return Errors.COORDINATOR_NOT_AVAILABLE;
-        }
-
-        if (exception instanceof NotLeaderOrFollowerException ||
-            exception instanceof KafkaStorageException) {
-            return Errors.NOT_COORDINATOR;
-        }
-
-        if (exception instanceof RecordTooLargeException ||
-            exception instanceof RecordBatchTooLargeException ||
-            exception instanceof InvalidFetchSizeException) {
-            return Errors.UNKNOWN_SERVER_ERROR;
+    private <REQ, RSP> RSP handleOperationException(
+        String requestName,
+        REQ request,
+        Throwable exception,
+        BiFunction<Errors, String, RSP> responseBuilder
+    ) {
+        ApiError apiError = ApiError.fromThrowable(exception);
+
+        switch (apiError.error()) {
+            case UNKNOWN_SERVER_ERROR:
+                log.error("{} request {} hit an unexpected exception: {}.",
+                    requestName, request, exception.getMessage(), exception);
+                return responseBuilder.apply(Errors.UNKNOWN_SERVER_ERROR, null);
+
+            case UNKNOWN_TOPIC_OR_PARTITION:
+            case NOT_ENOUGH_REPLICAS:
+            case REQUEST_TIMED_OUT:
+                return responseBuilder.apply(Errors.COORDINATOR_NOT_AVAILABLE, null);
+
+            case NOT_LEADER_OR_FOLLOWER:
+            case KAFKA_STORAGE_ERROR:
+                return responseBuilder.apply(Errors.NOT_COORDINATOR, null);
+
+            case MESSAGE_TOO_LARGE:
+            case RECORD_LIST_TOO_LARGE:
+            case INVALID_FETCH_SIZE:
+                return responseBuilder.apply(Errors.UNKNOWN_SERVER_ERROR, null);
+
+            default:
+                return responseBuilder.apply(apiError.error(), apiError.message());

Review Comment:
   And do all of the converted errors expect a null message? Just wanted to 
confirm that we only have custom messages in the specific cases where we want 
them.
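
   To make the question concrete, a minimal sketch of the conversion pattern in the hunk above, reduced to a toy enum. `Err` and `convert` are hypothetical names for illustration; only the shape (converted errors get a null message, everything else keeps its own message) is taken from the diff.

```java
import java.util.function.BiFunction;

final class ErrorMappingSketch {
    // Toy subset of error codes (hypothetical, not Kafka's Errors enum).
    enum Err {
        UNKNOWN_SERVER_ERROR, REQUEST_TIMED_OUT, NOT_LEADER_OR_FOLLOWER,
        COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR, INVALID_GROUP_ID
    }

    // Converted errors drop the original message; the default branch keeps it.
    static <RSP> RSP convert(Err error, String message, BiFunction<Err, String, RSP> responseBuilder) {
        switch (error) {
            case REQUEST_TIMED_OUT:
                return responseBuilder.apply(Err.COORDINATOR_NOT_AVAILABLE, null);
            case NOT_LEADER_OR_FOLLOWER:
                return responseBuilder.apply(Err.NOT_COORDINATOR, null);
            case UNKNOWN_SERVER_ERROR:
                return responseBuilder.apply(Err.UNKNOWN_SERVER_ERROR, null);
            default:
                return responseBuilder.apply(error, message);
        }
    }

    public static void main(String[] args) {
        BiFunction<Err, String, String> builder = (err, msg) -> err + ":" + msg;
        System.out.println(convert(Err.REQUEST_TIMED_OUT, "timed out", builder));
        System.out.println(convert(Err.INVALID_GROUP_ID, "group id is empty", builder));
    }
}
```

   In this sketch a response builder must tolerate a null message for every converted error, which matches the "The String can be null" note in the Javadoc.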



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -360,14 +334,13 @@ public CompletableFuture<JoinGroupResponseData> joinGroup(
             Duration.ofMillis(config.offsetCommitTimeoutMs),
             coordinator -> coordinator.classicGroupJoin(context, request, responseFuture)
         ).exceptionally(exception -> {
-            if (!(exception instanceof KafkaException)) {

Review Comment:
   Do we also want to add tests for some of the new error handling?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
