jolshan commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1120990845
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -2384,68 +2385,116 @@ class KafkaApis(val requestChannel: RequestChannel, if (config.interBrokerProtocolVersion.isLessThan(version)) throw new UnsupportedVersionException(s"inter.broker.protocol.version: ${config.interBrokerProtocolVersion.version} is less than the required version: ${version.version}") } - - def handleAddPartitionToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { + def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { ensureInterBrokerVersion(IBP_0_11_0_IV0) - val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest] - val transactionalId = addPartitionsToTxnRequest.data.transactionalId - val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala - if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId)) - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)) - else { - val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]() - val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]() - val authorizedPartitions = mutable.Set[TopicPartition]() - - val authorizedTopics = authHelper.filterByAuthorized(request.context, WRITE, TOPIC, - partitionsToAdd.filterNot(tp => Topic.isInternal(tp.topic)))(_.topic) - for (topicPartition <- partitionsToAdd) { - if (!authorizedTopics.contains(topicPartition.topic)) - unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED - else if (!metadataCache.contains(topicPartition)) - nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION - else - authorizedPartitions.add(topicPartition) + val addPartitionsToTxnRequest = + if (request.context.apiVersion() < 4) + request.body[AddPartitionsToTxnRequest].normalizeRequest() + else + request.body[AddPartitionsToTxnRequest] + val version = addPartitionsToTxnRequest.version + val responses = new AddPartitionsToTxnResultCollection() + val partitionsByTransaction = addPartitionsToTxnRequest.partitionsByTransaction() + + // Newer versions of the request should only come from other brokers. + if (version >= 4) authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) + + // V4 requests introduced batches of transactions. We need all transactions to be handled before sending the + // response so there are a few differences in handling errors and sending responses. + def createResponse(requestThrottleMs: Int): AbstractResponse = { + if (version < 4) { + // There will only be one response in data. Add it to the response data object. + val data = new AddPartitionsToTxnResponseData() + responses.forEach(result => { + data.setResultsByTopicV3AndBelow(result.topicResults()) + data.setThrottleTimeMs(requestThrottleMs) + }) + new AddPartitionsToTxnResponse(data) + } else { + new AddPartitionsToTxnResponse(new AddPartitionsToTxnResponseData().setThrottleTimeMs(requestThrottleMs).setResultsByTransaction(responses)) } + } - if (unauthorizedTopicErrors.nonEmpty || nonExistingTopicErrors.nonEmpty) { - // Any failed partition check causes the entire request to fail. We send the appropriate error codes for the - // partitions which failed, and an 'OPERATION_NOT_ATTEMPTED' error code for the partitions which succeeded - // the authorization check to indicate that they were not added to the transaction. - val partitionErrors = unauthorizedTopicErrors ++ nonExistingTopicErrors ++ - authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED) - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - new AddPartitionsToTxnResponse(requestThrottleMs, partitionErrors.asJava)) + val txns = addPartitionsToTxnRequest.data.transactions + def maybeSendResponse(): Unit = { + var canSend = false + responses.synchronized { + if (responses.size() == txns.size()) { + canSend = true + } + } + if (canSend) { + requestHelper.sendResponseMaybeThrottle(request, createResponse) + } + } + + txns.forEach( transaction => { + val transactionalId = transaction.transactionalId + val partitionsToAdd = partitionsByTransaction.get(transactionalId).asScala + + // Versions < 4 come from clients and must be authorized to write for the given transaction and for the given topics. + if (version < 4 && !authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId)) { + responses.add(addPartitionsToTxnRequest.errorResponseForTransaction(transactionalId, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED)) + maybeSendResponse() } else { - def sendResponseCallback(error: Errors): Unit = { - def createResponse(requestThrottleMs: Int): AbstractResponse = { - val finalError = - if (addPartitionsToTxnRequest.version < 2 && error == Errors.PRODUCER_FENCED) { + val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]() + val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]() + val authorizedPartitions = mutable.Set[TopicPartition]() + + val authorizedTopics = if (version < 4) authHelper.filterByAuthorized(request.context, WRITE, TOPIC, + partitionsToAdd.filterNot(tp => Topic.isInternal(tp.topic)))(_.topic) else partitionsToAdd.map(_.topic).toSet + for (topicPartition <- partitionsToAdd) { + if (!authorizedTopics.contains(topicPartition.topic)) + unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED + else if (!metadataCache.contains(topicPartition)) + nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION + else + authorizedPartitions.add(topicPartition) + } + + if (unauthorizedTopicErrors.nonEmpty || nonExistingTopicErrors.nonEmpty) { + // Any failed partition check causes the entire transaction to fail. We send the appropriate error codes for the + // partitions which failed, and an 'OPERATION_NOT_ATTEMPTED' error code for the partitions which succeeded + // the authorization check to indicate that they were not added to the transaction. + val partitionErrors = unauthorizedTopicErrors ++ nonExistingTopicErrors ++ + authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED) + responses.add(AddPartitionsToTxnResponse.resultForTransaction(transactionalId, partitionErrors.asJava)) + maybeSendResponse() + } else { + def sendResponseCallback(error: Errors): Unit = { + val finalError = { + if (version < 2 && error == Errors.PRODUCER_FENCED) { // For older clients, they could not understand the new PRODUCER_FENCED error code, // so we need to return the old INVALID_PRODUCER_EPOCH to have the same client handling logic. Errors.INVALID_PRODUCER_EPOCH } else { error } - - val responseBody: AddPartitionsToTxnResponse = new AddPartitionsToTxnResponse(requestThrottleMs, - partitionsToAdd.map{tp => (tp, finalError)}.toMap.asJava) - trace(s"Completed $transactionalId's AddPartitionsToTxnRequest with partitions $partitionsToAdd: errors: $error from client ${request.header.clientId}") - responseBody + } + responses.synchronized { + responses.add(addPartitionsToTxnRequest.errorResponseForTransaction(transactionalId, finalError)) + } + maybeSendResponse() + } + + def sendVerifyResponseCallback(errors: Map[TopicPartition, Errors]): Unit = { + responses.synchronized { + responses.add(AddPartitionsToTxnResponse.resultForTransaction(transactionalId, errors.asJava)) + } + maybeSendResponse() Review Comment: Maybe. But the parameters may be tricky. 😅 The maybeSendResponse was supposed to be the helper. But I guess it's not helpful enough. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org