dajac commented on code in PR #12870:
URL: https://github.com/apache/kafka/pull/12870#discussion_r1064619499


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1395,79 +1389,123 @@ class KafkaApis(val requestChannel: RequestChannel,
       offsetFetchResponse
     }
     requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    CompletableFuture.completedFuture[Unit](())
   }
 
-  private def handleOffsetFetchRequestBetweenV1AndV7(request: 
RequestChannel.Request): Unit = {
-    val header = request.header
+  private def handleOffsetFetchRequestFromCoordinator(request: 
RequestChannel.Request): CompletableFuture[Unit] = {
     val offsetFetchRequest = request.body[OffsetFetchRequest]
-    val groupId = offsetFetchRequest.groupId()
-    val (error, partitionData) = fetchOffsets(groupId, 
offsetFetchRequest.isAllPartitions,
-      offsetFetchRequest.requireStable, offsetFetchRequest.partitions, 
request.context)
-    def createResponse(requestThrottleMs: Int): AbstractResponse = {
-      val offsetFetchResponse =
-        if (error != Errors.NONE) {
-          offsetFetchRequest.getErrorResponse(requestThrottleMs, error)
-        } else {
-          new OffsetFetchResponse(requestThrottleMs, Errors.NONE, 
partitionData.asJava)
-        }
-      trace(s"Sending offset fetch response $offsetFetchResponse for 
correlation id ${header.correlationId} to client ${header.clientId}.")
-      offsetFetchResponse
+    val groups = offsetFetchRequest.groups()
+    val requireStable = offsetFetchRequest.requireStable()
+
+    val futures = new 
mutable.ArrayBuffer[CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]](groups.size)
+    groups.forEach { groupOffsetFetch =>
+      val isAllPartitions = groupOffsetFetch.topics == null
+      val future = if (isAllPartitions) {
+        fetchAllOffsets(
+          request.context,
+          groupOffsetFetch,
+          requireStable
+        )
+      } else {
+        fetchOffsets(
+          request.context,
+          groupOffsetFetch,
+          requireStable
+        )
+      }
+      futures += future
     }
-    requestHelper.sendResponseMaybeThrottle(request, createResponse)
-  }
-
-  private def handleOffsetFetchRequestV8AndAbove(request: 
RequestChannel.Request): Unit = {
-    val header = request.header
-    val offsetFetchRequest = request.body[OffsetFetchRequest]
-    val groupIds = offsetFetchRequest.groupIds().asScala
-    val groupToErrorMap =  mutable.Map.empty[String, Errors]
-    val groupToPartitionData =  mutable.Map.empty[String, 
util.Map[TopicPartition, PartitionData]]
-    val groupToTopicPartitions = offsetFetchRequest.groupIdsToPartitions()
-    groupIds.foreach(g => {
-      val (error, partitionData) = fetchOffsets(g,
-        offsetFetchRequest.isAllPartitionsForGroup(g),
-        offsetFetchRequest.requireStable(),
-        groupToTopicPartitions.get(g), request.context)
-      groupToErrorMap += (g -> error)
-      groupToPartitionData += (g -> partitionData.asJava)
-    })
 
-    def createResponse(requestThrottleMs: Int): AbstractResponse = {
-      val offsetFetchResponse = new OffsetFetchResponse(requestThrottleMs,
-        groupToErrorMap.asJava, groupToPartitionData.asJava)
-      trace(s"Sending offset fetch response $offsetFetchResponse for 
correlation id ${header.correlationId} to client ${header.clientId}.")
-      offsetFetchResponse
+    CompletableFuture.allOf(futures.toArray: _*).handle[Unit] { (_, _) =>
+      val groupResponses = new 
ArrayBuffer[OffsetFetchResponseData.OffsetFetchResponseGroup](futures.size)
+      futures.foreach(future => groupResponses += future.get())
+      requestHelper.sendMaybeThrottle(request, new 
OffsetFetchResponse(groupResponses.asJava, request.context.apiVersion))
     }
-
-    requestHelper.sendResponseMaybeThrottle(request, createResponse)
   }
 
-  private def fetchOffsets(groupId: String, isAllPartitions: Boolean, 
requireStable: Boolean,
-                           partitions: util.List[TopicPartition], context: 
RequestContext): (Errors, Map[TopicPartition, 
OffsetFetchResponse.PartitionData]) = {
-    if (!authHelper.authorize(context, DESCRIBE, GROUP, groupId)) {
-      (Errors.GROUP_AUTHORIZATION_FAILED, Map.empty)
-    } else {
-      if (isAllPartitions) {
-        val (error, allPartitionData) = 
groupCoordinator.handleFetchOffsets(groupId, requireStable)
-        if (error != Errors.NONE) {
-          (error, allPartitionData)
-        } else {
-          // clients are not allowed to see offsets for topics that are not 
authorized for Describe
-          val (authorizedPartitionData, _) = 
authHelper.partitionMapByAuthorized(context,
-            DESCRIBE, TOPIC, allPartitionData)(_.topic)
-          (Errors.NONE, authorizedPartitionData)
-        }
+  private def fetchAllOffsets(

Review Comment:
   Renaming in KafkaApis is reasonable. For GroupCoordinator, the ForGroup 
seems a bit redundant so I would rather keep it as it is there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to