dajac commented on code in PR #12870:
URL: https://github.com/apache/kafka/pull/12870#discussion_r1064619499
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1395,79 +1389,123 @@ class KafkaApis(val requestChannel: RequestChannel,
offsetFetchResponse
}
requestHelper.sendResponseMaybeThrottle(request, createResponse)
+ CompletableFuture.completedFuture[Unit](())
}
- private def handleOffsetFetchRequestBetweenV1AndV7(request:
RequestChannel.Request): Unit = {
- val header = request.header
+ private def handleOffsetFetchRequestFromCoordinator(request:
RequestChannel.Request): CompletableFuture[Unit] = {
val offsetFetchRequest = request.body[OffsetFetchRequest]
- val groupId = offsetFetchRequest.groupId()
- val (error, partitionData) = fetchOffsets(groupId,
offsetFetchRequest.isAllPartitions,
- offsetFetchRequest.requireStable, offsetFetchRequest.partitions,
request.context)
- def createResponse(requestThrottleMs: Int): AbstractResponse = {
- val offsetFetchResponse =
- if (error != Errors.NONE) {
- offsetFetchRequest.getErrorResponse(requestThrottleMs, error)
- } else {
- new OffsetFetchResponse(requestThrottleMs, Errors.NONE,
partitionData.asJava)
- }
- trace(s"Sending offset fetch response $offsetFetchResponse for
correlation id ${header.correlationId} to client ${header.clientId}.")
- offsetFetchResponse
+ val groups = offsetFetchRequest.groups()
+ val requireStable = offsetFetchRequest.requireStable()
+
+ val futures = new
mutable.ArrayBuffer[CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]](groups.size)
+ groups.forEach { groupOffsetFetch =>
+ val isAllPartitions = groupOffsetFetch.topics == null
+ val future = if (isAllPartitions) {
+ fetchAllOffsets(
+ request.context,
+ groupOffsetFetch,
+ requireStable
+ )
+ } else {
+ fetchOffsets(
+ request.context,
+ groupOffsetFetch,
+ requireStable
+ )
+ }
+ futures += future
}
- requestHelper.sendResponseMaybeThrottle(request, createResponse)
- }
-
- private def handleOffsetFetchRequestV8AndAbove(request:
RequestChannel.Request): Unit = {
- val header = request.header
- val offsetFetchRequest = request.body[OffsetFetchRequest]
- val groupIds = offsetFetchRequest.groupIds().asScala
- val groupToErrorMap = mutable.Map.empty[String, Errors]
- val groupToPartitionData = mutable.Map.empty[String,
util.Map[TopicPartition, PartitionData]]
- val groupToTopicPartitions = offsetFetchRequest.groupIdsToPartitions()
- groupIds.foreach(g => {
- val (error, partitionData) = fetchOffsets(g,
- offsetFetchRequest.isAllPartitionsForGroup(g),
- offsetFetchRequest.requireStable(),
- groupToTopicPartitions.get(g), request.context)
- groupToErrorMap += (g -> error)
- groupToPartitionData += (g -> partitionData.asJava)
- })
- def createResponse(requestThrottleMs: Int): AbstractResponse = {
- val offsetFetchResponse = new OffsetFetchResponse(requestThrottleMs,
- groupToErrorMap.asJava, groupToPartitionData.asJava)
- trace(s"Sending offset fetch response $offsetFetchResponse for
correlation id ${header.correlationId} to client ${header.clientId}.")
- offsetFetchResponse
+ CompletableFuture.allOf(futures.toArray: _*).handle[Unit] { (_, _) =>
+ val groupResponses = new
ArrayBuffer[OffsetFetchResponseData.OffsetFetchResponseGroup](futures.size)
+ futures.foreach(future => groupResponses += future.get())
+ requestHelper.sendMaybeThrottle(request, new
OffsetFetchResponse(groupResponses.asJava, request.context.apiVersion))
}
-
- requestHelper.sendResponseMaybeThrottle(request, createResponse)
}
- private def fetchOffsets(groupId: String, isAllPartitions: Boolean,
requireStable: Boolean,
- partitions: util.List[TopicPartition], context:
RequestContext): (Errors, Map[TopicPartition,
OffsetFetchResponse.PartitionData]) = {
- if (!authHelper.authorize(context, DESCRIBE, GROUP, groupId)) {
- (Errors.GROUP_AUTHORIZATION_FAILED, Map.empty)
- } else {
- if (isAllPartitions) {
- val (error, allPartitionData) =
groupCoordinator.handleFetchOffsets(groupId, requireStable)
- if (error != Errors.NONE) {
- (error, allPartitionData)
- } else {
- // clients are not allowed to see offsets for topics that are not
authorized for Describe
- val (authorizedPartitionData, _) =
authHelper.partitionMapByAuthorized(context,
- DESCRIBE, TOPIC, allPartitionData)(_.topic)
- (Errors.NONE, authorizedPartitionData)
- }
+ private def fetchAllOffsets(
Review Comment:
Renaming in KafkaApis is reasonable. For GroupCoordinator, the ForGroup
seems a bit redundant so I would rather keep it as it is there.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]