dajac commented on code in PR #12870: URL: https://github.com/apache/kafka/pull/12870#discussion_r1064619499
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -1395,79 +1389,123 @@ class KafkaApis(val requestChannel: RequestChannel, offsetFetchResponse } requestHelper.sendResponseMaybeThrottle(request, createResponse) + CompletableFuture.completedFuture[Unit](()) } - private def handleOffsetFetchRequestBetweenV1AndV7(request: RequestChannel.Request): Unit = { - val header = request.header + private def handleOffsetFetchRequestFromCoordinator(request: RequestChannel.Request): CompletableFuture[Unit] = { val offsetFetchRequest = request.body[OffsetFetchRequest] - val groupId = offsetFetchRequest.groupId() - val (error, partitionData) = fetchOffsets(groupId, offsetFetchRequest.isAllPartitions, - offsetFetchRequest.requireStable, offsetFetchRequest.partitions, request.context) - def createResponse(requestThrottleMs: Int): AbstractResponse = { - val offsetFetchResponse = - if (error != Errors.NONE) { - offsetFetchRequest.getErrorResponse(requestThrottleMs, error) - } else { - new OffsetFetchResponse(requestThrottleMs, Errors.NONE, partitionData.asJava) - } - trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.") - offsetFetchResponse + val groups = offsetFetchRequest.groups() + val requireStable = offsetFetchRequest.requireStable() + + val futures = new mutable.ArrayBuffer[CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]](groups.size) + groups.forEach { groupOffsetFetch => + val isAllPartitions = groupOffsetFetch.topics == null + val future = if (isAllPartitions) { + fetchAllOffsets( + request.context, + groupOffsetFetch, + requireStable + ) + } else { + fetchOffsets( + request.context, + groupOffsetFetch, + requireStable + ) + } + futures += future } - requestHelper.sendResponseMaybeThrottle(request, createResponse) - } - - private def handleOffsetFetchRequestV8AndAbove(request: RequestChannel.Request): Unit = { - val header = request.header - val 
offsetFetchRequest = request.body[OffsetFetchRequest] - val groupIds = offsetFetchRequest.groupIds().asScala - val groupToErrorMap = mutable.Map.empty[String, Errors] - val groupToPartitionData = mutable.Map.empty[String, util.Map[TopicPartition, PartitionData]] - val groupToTopicPartitions = offsetFetchRequest.groupIdsToPartitions() - groupIds.foreach(g => { - val (error, partitionData) = fetchOffsets(g, - offsetFetchRequest.isAllPartitionsForGroup(g), - offsetFetchRequest.requireStable(), - groupToTopicPartitions.get(g), request.context) - groupToErrorMap += (g -> error) - groupToPartitionData += (g -> partitionData.asJava) - }) - def createResponse(requestThrottleMs: Int): AbstractResponse = { - val offsetFetchResponse = new OffsetFetchResponse(requestThrottleMs, - groupToErrorMap.asJava, groupToPartitionData.asJava) - trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.") - offsetFetchResponse + CompletableFuture.allOf(futures.toArray: _*).handle[Unit] { (_, _) => + val groupResponses = new ArrayBuffer[OffsetFetchResponseData.OffsetFetchResponseGroup](futures.size) + futures.foreach(future => groupResponses += future.get()) + requestHelper.sendMaybeThrottle(request, new OffsetFetchResponse(groupResponses.asJava, request.context.apiVersion)) } - - requestHelper.sendResponseMaybeThrottle(request, createResponse) } - private def fetchOffsets(groupId: String, isAllPartitions: Boolean, requireStable: Boolean, - partitions: util.List[TopicPartition], context: RequestContext): (Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData]) = { - if (!authHelper.authorize(context, DESCRIBE, GROUP, groupId)) { - (Errors.GROUP_AUTHORIZATION_FAILED, Map.empty) - } else { - if (isAllPartitions) { - val (error, allPartitionData) = groupCoordinator.handleFetchOffsets(groupId, requireStable) - if (error != Errors.NONE) { - (error, allPartitionData) - } else { - // clients are not allowed to see 
offsets for topics that are not authorized for Describe - val (authorizedPartitionData, _) = authHelper.partitionMapByAuthorized(context, - DESCRIBE, TOPIC, allPartitionData)(_.topic) - (Errors.NONE, authorizedPartitionData) - } + private def fetchAllOffsets( Review Comment: Renaming in KafkaApis is reasonable. For GroupCoordinator, the `ForGroup` in the name seems a bit redundant, so I would rather keep the name as it is there. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org