jolshan commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1054648954


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -407,136 +416,200 @@ class KafkaApis(val requestChannel: RequestChannel,
    * Handle an offset commit request
    */
   def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: 
RequestLocal): Unit = {
-    val header = request.header
     val offsetCommitRequest = request.body[OffsetCommitRequest]
 
-    val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-    val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-    // the callback for sending an offset commit response
-    def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit 
= {
-      val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ 
nonExistingTopicErrors
-      if (isDebugEnabled)
-        combinedCommitStatus.forKeyValue { (topicPartition, error) =>
-          if (error != Errors.NONE) {
-            debug(s"Offset commit request with correlation id 
${header.correlationId} from client ${header.clientId} " +
-              s"on partition $topicPartition failed due to 
${error.exceptionName}")
+    def sendResponse(response: OffsetCommitResponse): Unit = {
+      trace(s"Sending offset commit response $response for correlation id 
${request.header.correlationId} to " +
+        s"client ${request.header.clientId}.")
+
+      if (isDebugEnabled) {
+        response.data.topics.forEach { topic =>
+          topic.partitions.forEach { partition =>
+            if (partition.errorCode != Errors.NONE.code) {
+              debug(s"Offset commit request with correlation id 
${request.header.correlationId} from client ${request.header.clientId} " +
+                s"on partition ${topic.name}-${partition.partitionIndex} 
failed due to ${Errors.forCode(partition.errorCode)}")
+            }
           }
         }
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new OffsetCommitResponse(requestThrottleMs, 
combinedCommitStatus.asJava))
+      }
+
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+        response.maybeSetThrottleTimeMs(requestThrottleMs)
+        response
+      })
     }
 
-      // reject the request if not authorized to the group
+    // Reject the request if not authorized to the group
     if (!authHelper.authorize(request.context, READ, GROUP, 
offsetCommitRequest.data.groupId)) {
-      val error = Errors.GROUP_AUTHORIZATION_FAILED
-      val responseTopicList = OffsetCommitRequest.getErrorResponseTopics(
-        offsetCommitRequest.data.topics,
-        error)
-
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => 
new OffsetCommitResponse(
-        new OffsetCommitResponseData()
-            .setTopics(responseTopicList)
-            .setThrottleTimeMs(requestThrottleMs)
-      ))
+      
sendResponse(offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
     } else if (offsetCommitRequest.data.groupInstanceId != null && 
config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
       // Only enable static membership when IBP >= 2.3, because it is not safe 
for the broker to use the static member logic
       // until we are sure that all brokers support it. If static group being 
loaded by an older coordinator, it will discard
       // the group.instance.id field, so static members could accidentally 
become "dynamic", which leads to wrong states.
-      val errorMap = new mutable.HashMap[TopicPartition, Errors]
-      for (topicData <- offsetCommitRequest.data.topics.asScala) {
-        for (partitionData <- topicData.partitions.asScala) {
-          val topicPartition = new TopicPartition(topicData.name, 
partitionData.partitionIndex)
-          errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION
+      
sendResponse(offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+    } else {
+      val offsetCommitResponseData = new OffsetCommitResponseData()
+      val topicsPendingPartitions = new mutable.HashMap[String, 
OffsetCommitResponseData.OffsetCommitResponseTopic]()
+
+      val authorizedTopics = authHelper.filterByAuthorized(
+        request.context,
+        READ,
+        TOPIC,
+        offsetCommitRequest.data.topics.asScala
+      )(_.name)
+
+      def makePartitionResponse(
+        partitionIndex: Int,
+        error: Errors
+      ): OffsetCommitResponseData.OffsetCommitResponsePartition = {
+        new OffsetCommitResponseData.OffsetCommitResponsePartition()
+          .setPartitionIndex(partitionIndex)
+          .setErrorCode(error.code)
+      }
+
+      def addTopicToResponse(
+        topic: OffsetCommitRequestData.OffsetCommitRequestTopic,
+        error: Errors
+      ): Unit = {
+        val topicResponse = new 
OffsetCommitResponseData.OffsetCommitResponseTopic().setName(topic.name)
+        offsetCommitResponseData.topics.add(topicResponse)
+        topic.partitions.forEach { partition =>
+          
topicResponse.partitions.add(makePartitionResponse(partition.partitionIndex, 
error))
         }
       }
-      sendResponseCallback(errorMap.toMap)
-    } else {
-      val authorizedTopicRequestInfoBldr = 
immutable.Map.newBuilder[TopicPartition, 
OffsetCommitRequestData.OffsetCommitRequestPartition]
 
-      val topics = offsetCommitRequest.data.topics.asScala
-      val authorizedTopics = authHelper.filterByAuthorized(request.context, 
READ, TOPIC, topics)(_.name)
-      for (topicData <- topics) {
-        for (partitionData <- topicData.partitions.asScala) {
-          val topicPartition = new TopicPartition(topicData.name, 
partitionData.partitionIndex)
-          if (!authorizedTopics.contains(topicData.name))
-            unauthorizedTopicErrors += (topicPartition -> 
Errors.TOPIC_AUTHORIZATION_FAILED)
-          else if (!metadataCache.contains(topicPartition))
-            nonExistingTopicErrors += (topicPartition -> 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
-          else
-            authorizedTopicRequestInfoBldr += (topicPartition -> partitionData)
+      val authorizedTopicsRequest = new 
util.ArrayList[OffsetCommitRequestData.OffsetCommitRequestTopic]()
+      offsetCommitRequest.data.topics.forEach { topic =>
+        if (!authorizedTopics.contains(topic.name)) {
+          // If the topic is not authorized, we add the topic and all its 
partitions
+          // to the response with TOPIC_AUTHORIZATION_FAILED.
+          addTopicToResponse(topic, Errors.TOPIC_AUTHORIZATION_FAILED)
+        } else if (!metadataCache.contains(topic.name)) {
+          // If the topic is unknown, we add the topic and all its partitions
+          // to the response with UNKNOWN_TOPIC_OR_PARTITION.
+          addTopicToResponse(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        } else {
+          // Otherwise, we check all partitions to ensure that they all exist.
+          var topicRequestWithValidPartitions: 
OffsetCommitRequestData.OffsetCommitRequestTopic = null

Review Comment:
   Ah, interesting. For OffsetFetch, do we basically try to fetch and, if it's 
missing, return the error there?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to