jolshan commented on code in PR #12901:
URL: https://github.com/apache/kafka/pull/12901#discussion_r1066362860


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2464,90 +2464,108 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleTxnOffsetCommitRequest(request: RequestChannel.Request, 
requestLocal: RequestLocal): Unit = {
+  def handleTxnOffsetCommitRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
     ensureInterBrokerVersion(IBP_0_11_0_IV0)
-    val header = request.header
     val txnOffsetCommitRequest = request.body[TxnOffsetCommitRequest]
 
+    def sendResponse(response: TxnOffsetCommitResponse): Unit = {
+      // We need to replace COORDINATOR_LOAD_IN_PROGRESS with 
COORDINATOR_NOT_AVAILABLE
+      // for older producer client from 0.11 to prior 2.0, which could 
potentially crash due
+      // to unexpected loading error. This bug is fixed later by KAFKA-7296. 
Clients using
+      // txn commit protocol >= 2 (version 2.3 and onwards) are guaranteed to 
have
+      // the fix to check for the loading error.
+      if (txnOffsetCommitRequest.version < 2) {
+        response.data.topics.forEach { topic =>
+          topic.partitions.forEach { partition =>
+            if (partition.errorCode == 
Errors.COORDINATOR_LOAD_IN_PROGRESS.code) {
+              partition.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code)
+            }
+          }
+        }
+      }
+
+      requestHelper.sendMaybeThrottle(request, response)
+    }
+
     // authorize for the transactionalId and the consumer group. Note that we 
skip producerId authorization
     // since it is implied by transactionalId authorization
-    if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, 
txnOffsetCommitRequest.data.transactionalId))
-      requestHelper.sendErrorResponseMaybeThrottle(request, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
-    else if (!authHelper.authorize(request.context, READ, GROUP, 
txnOffsetCommitRequest.data.groupId))
-      requestHelper.sendErrorResponseMaybeThrottle(request, 
Errors.GROUP_AUTHORIZATION_FAILED.exception)
-    else {
-      val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-      val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-      val authorizedTopicCommittedOffsets = mutable.Map[TopicPartition, 
TxnOffsetCommitRequest.CommittedOffset]()
-      val committedOffsets = txnOffsetCommitRequest.offsets.asScala
-      val authorizedTopics = authHelper.filterByAuthorized(request.context, 
READ, TOPIC, committedOffsets)(_._1.topic)
-
-      for ((topicPartition, commitedOffset) <- committedOffsets) {
-        if (!authorizedTopics.contains(topicPartition.topic))
-          unauthorizedTopicErrors += topicPartition -> 
Errors.TOPIC_AUTHORIZATION_FAILED
-        else if (!metadataCache.contains(topicPartition))
-          nonExistingTopicErrors += topicPartition -> 
Errors.UNKNOWN_TOPIC_OR_PARTITION
-        else
-          authorizedTopicCommittedOffsets += (topicPartition -> commitedOffset)
-      }
+    if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, 
txnOffsetCommitRequest.data.transactionalId)) {
+      
sendResponse(txnOffsetCommitRequest.getErrorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else if (!authHelper.authorize(request.context, READ, GROUP, 
txnOffsetCommitRequest.data.groupId)) {
+      
sendResponse(txnOffsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else {
+      val authorizedTopics = authHelper.filterByAuthorized(
+        request.context,
+        READ,
+        TOPIC,
+        txnOffsetCommitRequest.data.topics.asScala
+      )(_.name)
+
+      val responseBuilder = new TxnOffsetCommitResponse.Builder()
+      val authorizedTopicCommittedOffsets = new 
mutable.ArrayBuffer[TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic]()
+      txnOffsetCommitRequest.data.topics.forEach { topic =>
+        if (!authorizedTopics.contains(topic.name)) {
+          // If the topic is not authorized, we add the topic and all its 
partitions
+          // to the response with TOPIC_AUTHORIZATION_FAILED.
+          
responseBuilder.addPartitions[TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition](
+            topic.name, topic.partitions, _.partitionIndex, 
Errors.TOPIC_AUTHORIZATION_FAILED)
+        } else if (!metadataCache.contains(topic.name)) {
+          // If the topic is unknown, we add the topic and all its partitions
+          // to the response with UNKNOWN_TOPIC_OR_PARTITION.
+          
responseBuilder.addPartitions[TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition](
+            topic.name, topic.partitions, _.partitionIndex, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        } else {
+          // Otherwise, we check all partitions to ensure that they all exist.
+          val topicWithValidPartitions = new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic().setName(topic.name)
 
-      // the callback for sending an offset commit response
-      def sendResponseCallback(authorizedTopicErrors: Map[TopicPartition, 
Errors]): Unit = {
-        val combinedCommitStatus = mutable.Map() ++= authorizedTopicErrors ++= 
unauthorizedTopicErrors ++= nonExistingTopicErrors
-        if (isDebugEnabled)
-          combinedCommitStatus.forKeyValue { (topicPartition, error) =>
-            if (error != Errors.NONE) {
-              debug(s"TxnOffsetCommit with correlation id 
${header.correlationId} from client ${header.clientId} " +
-                s"on partition $topicPartition failed due to 
${error.exceptionName}")
+          topic.partitions.forEach { partition =>
+            if (metadataCache.getPartitionInfo(topic.name, 
partition.partitionIndex).nonEmpty) {
+              topicWithValidPartitions.partitions.add(partition)
+            } else {
+              responseBuilder.addPartition(topic.name, 
partition.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
             }
           }
 
-        // We need to replace COORDINATOR_LOAD_IN_PROGRESS with 
COORDINATOR_NOT_AVAILABLE
-        // for older producer client from 0.11 to prior 2.0, which could 
potentially crash due
-        // to unexpected loading error. This bug is fixed later by KAFKA-7296. 
Clients using
-        // txn commit protocol >= 2 (version 2.3 and onwards) are guaranteed 
to have
-        // the fix to check for the loading error.
-        if (txnOffsetCommitRequest.version < 2) {
-          combinedCommitStatus ++= combinedCommitStatus.collect {
-            case (tp, error) if error == Errors.COORDINATOR_LOAD_IN_PROGRESS 
=> tp -> Errors.COORDINATOR_NOT_AVAILABLE
+          if (!topicWithValidPartitions.partitions.isEmpty) {
+            authorizedTopicCommittedOffsets += topicWithValidPartitions
           }
         }
-
-        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-          new TxnOffsetCommitResponse(requestThrottleMs, 
combinedCommitStatus.asJava))
       }
 
-      if (authorizedTopicCommittedOffsets.isEmpty)
-        sendResponseCallback(Map.empty)
-      else {
-        val offsetMetadata = 
convertTxnOffsets(authorizedTopicCommittedOffsets.toMap)
-        groupCoordinator.handleTxnCommitOffsets(
-          txnOffsetCommitRequest.data.groupId,
-          txnOffsetCommitRequest.data.producerId,
-          txnOffsetCommitRequest.data.producerEpoch,
-          txnOffsetCommitRequest.data.memberId,
-          Option(txnOffsetCommitRequest.data.groupInstanceId),
-          txnOffsetCommitRequest.data.generationId,
-          offsetMetadata,
-          sendResponseCallback,
-          requestLocal)
+      if (authorizedTopicCommittedOffsets.isEmpty) {
+        sendResponse(responseBuilder.build())
+        CompletableFuture.completedFuture[Unit](())
+      } else {
+        val txnOffsetCommitRequestData = new TxnOffsetCommitRequestData()

Review Comment:
   Are we only replacing the topics here? There's a lot of code to move the 
majority of one request data into a new one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to