jolshan commented on code in PR #12901: URL: https://github.com/apache/kafka/pull/12901#discussion_r1066362860
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -2464,90 +2464,108 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleTxnOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { + def handleTxnOffsetCommitRequest( + request: RequestChannel.Request, + requestLocal: RequestLocal + ): CompletableFuture[Unit] = { ensureInterBrokerVersion(IBP_0_11_0_IV0) - val header = request.header val txnOffsetCommitRequest = request.body[TxnOffsetCommitRequest] + def sendResponse(response: TxnOffsetCommitResponse): Unit = { + // We need to replace COORDINATOR_LOAD_IN_PROGRESS with COORDINATOR_NOT_AVAILABLE + // for older producer client from 0.11 to prior 2.0, which could potentially crash due + // to unexpected loading error. This bug is fixed later by KAFKA-7296. Clients using + // txn commit protocol >= 2 (version 2.3 and onwards) are guaranteed to have + // the fix to check for the loading error. + if (txnOffsetCommitRequest.version < 2) { + response.data.topics.forEach { topic => + topic.partitions.forEach { partition => + if (partition.errorCode == Errors.COORDINATOR_LOAD_IN_PROGRESS.code) { + partition.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code) + } + } + } + } + + requestHelper.sendMaybeThrottle(request, response) + } + // authorize for the transactionalId and the consumer group. 
Note that we skip producerId authorization // since it is implied by transactionalId authorization - if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, txnOffsetCommitRequest.data.transactionalId)) - requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception) - else if (!authHelper.authorize(request.context, READ, GROUP, txnOffsetCommitRequest.data.groupId)) - requestHelper.sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception) - else { - val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]() - val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]() - val authorizedTopicCommittedOffsets = mutable.Map[TopicPartition, TxnOffsetCommitRequest.CommittedOffset]() - val committedOffsets = txnOffsetCommitRequest.offsets.asScala - val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, committedOffsets)(_._1.topic) - - for ((topicPartition, commitedOffset) <- committedOffsets) { - if (!authorizedTopics.contains(topicPartition.topic)) - unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED - else if (!metadataCache.contains(topicPartition)) - nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION - else - authorizedTopicCommittedOffsets += (topicPartition -> commitedOffset) - } + if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, txnOffsetCommitRequest.data.transactionalId)) { + sendResponse(txnOffsetCommitRequest.getErrorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)) + CompletableFuture.completedFuture[Unit](()) + } else if (!authHelper.authorize(request.context, READ, GROUP, txnOffsetCommitRequest.data.groupId)) { + sendResponse(txnOffsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) + CompletableFuture.completedFuture[Unit](()) + } else { + val authorizedTopics = authHelper.filterByAuthorized( + request.context, + 
READ, + TOPIC, + txnOffsetCommitRequest.data.topics.asScala + )(_.name) + + val responseBuilder = new TxnOffsetCommitResponse.Builder() + val authorizedTopicCommittedOffsets = new mutable.ArrayBuffer[TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic]() + txnOffsetCommitRequest.data.topics.forEach { topic => + if (!authorizedTopics.contains(topic.name)) { + // If the topic is not authorized, we add the topic and all its partitions + // to the response with TOPIC_AUTHORIZATION_FAILED. + responseBuilder.addPartitions[TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition]( + topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED) + } else if (!metadataCache.contains(topic.name)) { + // If the topic is unknown, we add the topic and all its partitions + // to the response with UNKNOWN_TOPIC_OR_PARTITION. + responseBuilder.addPartitions[TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition]( + topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION) + } else { + // Otherwise, we check all partitions to ensure that they all exist. 
+ val topicWithValidPartitions = new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic().setName(topic.name) - // the callback for sending an offset commit response - def sendResponseCallback(authorizedTopicErrors: Map[TopicPartition, Errors]): Unit = { - val combinedCommitStatus = mutable.Map() ++= authorizedTopicErrors ++= unauthorizedTopicErrors ++= nonExistingTopicErrors - if (isDebugEnabled) - combinedCommitStatus.forKeyValue { (topicPartition, error) => - if (error != Errors.NONE) { - debug(s"TxnOffsetCommit with correlation id ${header.correlationId} from client ${header.clientId} " + - s"on partition $topicPartition failed due to ${error.exceptionName}") + topic.partitions.forEach { partition => + if (metadataCache.getPartitionInfo(topic.name, partition.partitionIndex).nonEmpty) { + topicWithValidPartitions.partitions.add(partition) + } else { + responseBuilder.addPartition(topic.name, partition.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION) } } - // We need to replace COORDINATOR_LOAD_IN_PROGRESS with COORDINATOR_NOT_AVAILABLE - // for older producer client from 0.11 to prior 2.0, which could potentially crash due - // to unexpected loading error. This bug is fixed later by KAFKA-7296. Clients using - // txn commit protocol >= 2 (version 2.3 and onwards) are guaranteed to have - // the fix to check for the loading error. 
- if (txnOffsetCommitRequest.version < 2) { - combinedCommitStatus ++= combinedCommitStatus.collect { - case (tp, error) if error == Errors.COORDINATOR_LOAD_IN_PROGRESS => tp -> Errors.COORDINATOR_NOT_AVAILABLE + if (!topicWithValidPartitions.partitions.isEmpty) { + authorizedTopicCommittedOffsets += topicWithValidPartitions } } - - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - new TxnOffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava)) } - if (authorizedTopicCommittedOffsets.isEmpty) - sendResponseCallback(Map.empty) - else { - val offsetMetadata = convertTxnOffsets(authorizedTopicCommittedOffsets.toMap) - groupCoordinator.handleTxnCommitOffsets( - txnOffsetCommitRequest.data.groupId, - txnOffsetCommitRequest.data.producerId, - txnOffsetCommitRequest.data.producerEpoch, - txnOffsetCommitRequest.data.memberId, - Option(txnOffsetCommitRequest.data.groupInstanceId), - txnOffsetCommitRequest.data.generationId, - offsetMetadata, - sendResponseCallback, - requestLocal) + if (authorizedTopicCommittedOffsets.isEmpty) { + sendResponse(responseBuilder.build()) + CompletableFuture.completedFuture[Unit](()) + } else { + val txnOffsetCommitRequestData = new TxnOffsetCommitRequestData() Review Comment: Are we only replacing the topics here? There's a lot of code here just to move the majority of one request's data into a new one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org