dajac commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1054822770


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -411,137 +411,144 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * Handle an offset commit request
    */
-  def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: 
RequestLocal): Unit = {
-    val header = request.header
+  def handleOffsetCommitRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
     val offsetCommitRequest = request.body[OffsetCommitRequest]
 
-    val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-    val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-    // the callback for sending an offset commit response
-    def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit 
= {
-      val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ 
nonExistingTopicErrors
-      if (isDebugEnabled)
-        combinedCommitStatus.forKeyValue { (topicPartition, error) =>
-          if (error != Errors.NONE) {
-            debug(s"Offset commit request with correlation id 
${header.correlationId} from client ${header.clientId} " +
-              s"on partition $topicPartition failed due to 
${error.exceptionName}")
-          }
-        }
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new OffsetCommitResponse(requestThrottleMs, 
combinedCommitStatus.asJava))
-    }
-
-      // reject the request if not authorized to the group
+    // Reject the request if not authorized to the group
     if (!authHelper.authorize(request.context, READ, GROUP, 
offsetCommitRequest.data.groupId)) {
-      val error = Errors.GROUP_AUTHORIZATION_FAILED
-      val responseTopicList = OffsetCommitRequest.getErrorResponseTopics(
-        offsetCommitRequest.data.topics,
-        error)
-
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => 
new OffsetCommitResponse(
-        new OffsetCommitResponseData()
-            .setTopics(responseTopicList)
-            .setThrottleTimeMs(requestThrottleMs)
-      ))
+      requestHelper.sendMaybeThrottle(request, 
offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      CompletableFuture.completedFuture[Unit](())
     } else if (offsetCommitRequest.data.groupInstanceId != null && 
config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
       // Only enable static membership when IBP >= 2.3, because it is not safe 
for the broker to use the static member logic
       // until we are sure that all brokers support it. If static group being 
loaded by an older coordinator, it will discard
       // the group.instance.id field, so static members could accidentally 
become "dynamic", which leads to wrong states.
-      val errorMap = new mutable.HashMap[TopicPartition, Errors]
-      for (topicData <- offsetCommitRequest.data.topics.asScala) {
-        for (partitionData <- topicData.partitions.asScala) {
-          val topicPartition = new TopicPartition(topicData.name, 
partitionData.partitionIndex)
-          errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION
-        }
-      }
-      sendResponseCallback(errorMap.toMap)
+      requestHelper.sendMaybeThrottle(request, 
offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
     } else {
-      val authorizedTopicRequestInfoBldr = 
immutable.Map.newBuilder[TopicPartition, 
OffsetCommitRequestData.OffsetCommitRequestPartition]
+      val authorizedTopics = authHelper.filterByAuthorized(
+        request.context,
+        READ,
+        TOPIC,
+        offsetCommitRequest.data.topics.asScala
+      )(_.name)
+
+      val responseBuilder = new OffsetCommitResponse.Builder()
+      val authorizedTopicsRequest = new 
mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]()
+      offsetCommitRequest.data.topics.forEach { topic =>
+        if (!authorizedTopics.contains(topic.name)) {
+          // If the topic is not authorized, we add the topic and all its 
partitions
+          // to the response with TOPIC_AUTHORIZATION_FAILED.
+          
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+            topic.name, topic.partitions, _.partitionIndex, 
Errors.TOPIC_AUTHORIZATION_FAILED)
+        } else if (!metadataCache.contains(topic.name)) {
+          // If the topic is unknown, we add the topic and all its partitions
+          // to the response with UNKNOWN_TOPIC_OR_PARTITION.
+          
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+            topic.name, topic.partitions, _.partitionIndex, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        } else {
+          // Otherwise, we check all partitions to ensure that they all exist.
+          val topicWithValidPartitions = new 
OffsetCommitRequestData.OffsetCommitRequestTopic().setName(topic.name)
 
-      val topics = offsetCommitRequest.data.topics.asScala
-      val authorizedTopics = authHelper.filterByAuthorized(request.context, 
READ, TOPIC, topics)(_.name)
-      for (topicData <- topics) {
-        for (partitionData <- topicData.partitions.asScala) {
-          val topicPartition = new TopicPartition(topicData.name, 
partitionData.partitionIndex)
-          if (!authorizedTopics.contains(topicData.name))
-            unauthorizedTopicErrors += (topicPartition -> 
Errors.TOPIC_AUTHORIZATION_FAILED)
-          else if (!metadataCache.contains(topicPartition))
-            nonExistingTopicErrors += (topicPartition -> 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
-          else
-            authorizedTopicRequestInfoBldr += (topicPartition -> partitionData)
+          topic.partitions.forEach { partition =>
+            if (metadataCache.getPartitionInfo(topic.name, 
partition.partitionIndex).nonEmpty) {

Review Comment:
   Yeah, I did this on purpose. If the topic does not exist, it does not make 
sense to verify all the partitions of that topic. Also, in the future, we will 
also add a check for the topic id somewhere here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to