dajac commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1457569421


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -717,17 +717,16 @@ class KafkaApis(val requestChannel: RequestChannel,
       val internalTopicsAllowed = request.header.clientId == 
AdminUtils.ADMIN_CLIENT_ID
 
       // call the replica manager to append messages to the replicas
-      replicaManager.appendRecords(
+      replicaManager.handleProduceAppend(
         timeout = produceRequest.timeout.toLong,
         requiredAcks = produceRequest.acks,
         internalTopicsAllowed = internalTopicsAllowed,
         origin = AppendOrigin.CLIENT,

Review Comment:
   nit: I wonder if we should remove the `origin` parameter from 
`handleProduceAppend`, as a produce append should always come from a client by 
definition.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3041,31 +3007,57 @@ class ReplicaManagerTest {
       origin = origin,
       entriesPerPartition = Map(partition -> records),
       responseCallback = appendCallback,
-      transactionalId = transactionalId,
     )
 
     result
   }
 
-  private def appendRecordsToMultipleTopics(replicaManager: ReplicaManager,
-                                            entriesToAppend: 
Map[TopicPartition, MemoryRecords],
-                                            transactionalId: String,
-                                            origin: AppendOrigin = 
AppendOrigin.CLIENT,
-                                            requiredAcks: Short = -1): 
CallbackResult[Map[TopicPartition, PartitionResponse]] = {
+  private def handleProduceAppendToMultipleTopics(replicaManager: 
ReplicaManager,
+                                                  entriesToAppend: 
Map[TopicPartition, MemoryRecords],
+                                                  transactionalId: String,
+                                                  origin: AppendOrigin = 
AppendOrigin.CLIENT,
+                                                  requiredAcks: Short = -1): 
CallbackResult[Map[TopicPartition, PartitionResponse]] = {
     val result = new CallbackResult[Map[TopicPartition, PartitionResponse]]()
     def appendCallback(responses: Map[TopicPartition, PartitionResponse]): 
Unit = {
       responses.foreach( response => 
assertTrue(responses.get(response._1).isDefined))
       result.fire(responses)
     }
+      replicaManager.handleProduceAppend(

Review Comment:
   nit: Indentation seems off here.



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1188,18 +1073,7 @@ class ReplicaManager(val config: KafkaConfig,
       requestLocal: RequestLocal,
       verificationErrors: Map[TopicPartition, Errors]
     ): Unit = {
-      // Map transaction coordinator errors to known errors for the response
-      val convertedErrors = verificationErrors.map { case (tp, error) =>
-        error match {
-          case Errors.CONCURRENT_TRANSACTIONS |
-            Errors.COORDINATOR_LOAD_IN_PROGRESS |
-            Errors.COORDINATOR_NOT_AVAILABLE |
-            Errors.NOT_COORDINATOR => tp -> Errors.NOT_ENOUGH_REPLICAS
-          case _ => tp -> error
-        }
-
-      }

Review Comment:
   For my understanding: we remove this here, we add it back in 
`handleProduceAppend`, and we rely on the conversion in the group coordinator. 
Did I get it right? In the group coordinator, we don't handle 
`CONCURRENT_TRANSACTIONS`, I think. I need to double check.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -2560,41 +2562,6 @@ class ReplicaManagerTest {
     }
   }
 
-  @ParameterizedTest
-  @EnumSource(value = classOf[Errors], names = Array("NOT_COORDINATOR", 
"CONCURRENT_TRANSACTIONS", "COORDINATOR_LOAD_IN_PROGRESS", 
"COORDINATOR_NOT_AVAILABLE"))
-  def testMaybeVerificationErrorConversions(error: Errors): Unit = {

Review Comment:
   Don't we need to keep this one, as we still have those conversions, just in 
a different place now?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -762,167 +763,124 @@ class ReplicaManager(val config: KafkaConfig,
                     delayedProduceLock: Option[Lock] = None,
                     recordValidationStatsCallback: Map[TopicPartition, 
RecordValidationStats] => Unit = _ => (),
                     requestLocal: RequestLocal = RequestLocal.NoCaching,
-                    transactionalId: String = null,
-                    actionQueue: ActionQueue = this.defaultActionQueue): Unit 
= {
-    if (isValidRequiredAcks(requiredAcks)) {
-
-      val verificationGuards: mutable.Map[TopicPartition, VerificationGuard] = 
mutable.Map[TopicPartition, VerificationGuard]()
-      val (verifiedEntriesPerPartition, notYetVerifiedEntriesPerPartition, 
errorsPerPartition) =
-        if (transactionalId == null || 
!config.transactionPartitionVerificationEnable)
-          (entriesPerPartition, Map.empty[TopicPartition, MemoryRecords], 
Map.empty[TopicPartition, Errors])
-        else {
-          val verifiedEntries = mutable.Map[TopicPartition, MemoryRecords]()
-          val unverifiedEntries = mutable.Map[TopicPartition, MemoryRecords]()
-          val errorEntries = mutable.Map[TopicPartition, Errors]()
-          partitionEntriesForVerification(verificationGuards, 
entriesPerPartition, verifiedEntries, unverifiedEntries, errorEntries)
-          (verifiedEntries.toMap, unverifiedEntries.toMap, errorEntries.toMap)
-        }
-
-      if (notYetVerifiedEntriesPerPartition.isEmpty || 
addPartitionsToTxnManager.isEmpty) {
-        appendEntries(verifiedEntriesPerPartition, internalTopicsAllowed, 
origin, requiredAcks, verificationGuards.toMap,
-          errorsPerPartition, recordValidationStatsCallback, timeout, 
responseCallback, delayedProduceLock, actionQueue)(requestLocal, Map.empty)
-      } else {
-        // For unverified entries, send a request to verify. When verified, 
the append process will proceed via the callback.
-        // We verify above that all partitions use the same producer ID.
-        val batchInfo = notYetVerifiedEntriesPerPartition.head._2.firstBatch()
-        addPartitionsToTxnManager.foreach(_.verifyTransaction(
-          transactionalId = transactionalId,
-          producerId = batchInfo.producerId,
-          producerEpoch = batchInfo.producerEpoch,
-          topicPartitions = notYetVerifiedEntriesPerPartition.keySet.toSeq,
-          callback = KafkaRequestHandler.wrapAsyncCallback(
-            appendEntries(
-              entriesPerPartition,
-              internalTopicsAllowed,
-              origin,
-              requiredAcks,
-              verificationGuards.toMap,
-              errorsPerPartition,
-              recordValidationStatsCallback,
-              timeout,
-              responseCallback,
-              delayedProduceLock,
-              actionQueue
-            ),
-            requestLocal)
-        ))
-      }
-    } else {
-      // If required.acks is outside accepted range, something is wrong with 
the client
-      // Just return an error and don't handle the request at all
-      val responseStatus = entriesPerPartition.map { case (topicPartition, _) 
=>
-        topicPartition -> new PartitionResponse(
-          Errors.INVALID_REQUIRED_ACKS,
-          LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset,
-          RecordBatch.NO_TIMESTAMP,
-          LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset
-        )
-      }
-      responseCallback(responseStatus)
+                    actionQueue: ActionQueue = this.defaultActionQueue,
+                    verificationGuards: Map[TopicPartition, VerificationGuard] 
= Map.empty): Unit = {
+    if (!isValidRequiredAcks(requiredAcks)) {
+      sendInvalidRequiredAcksResponse(entriesPerPartition, responseCallback)
+      return
     }
-  }
 
-  /*
-   * Note: This method can be used as a callback in a different request 
thread. Ensure that correct RequestLocal
-   * is passed when executing this method. Accessing non-thread-safe data 
structures should be avoided if possible.
-   */
-  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
-                            internalTopicsAllowed: Boolean,
-                            origin: AppendOrigin,
-                            requiredAcks: Short,
-                            verificationGuards: Map[TopicPartition, 
VerificationGuard],
-                            errorsPerPartition: Map[TopicPartition, Errors],
-                            recordConversionStatsCallback: Map[TopicPartition, 
RecordValidationStats] => Unit,
-                            timeout: Long,
-                            responseCallback: Map[TopicPartition, 
PartitionResponse] => Unit,
-                            delayedProduceLock: Option[Lock],
-                            actionQueue: ActionQueue)
-                           (requestLocal: RequestLocal, unverifiedEntries: 
Map[TopicPartition, Errors]): Unit = {
     val sTime = time.milliseconds
-    val verifiedEntries =
-      if (unverifiedEntries.isEmpty)
-        allEntries
-      else
-        allEntries.filter { case (tp, _) =>
-          !unverifiedEntries.contains(tp)
-        }
-
     val localProduceResults = appendToLocalLog(internalTopicsAllowed = 
internalTopicsAllowed,
-      origin, verifiedEntries, requiredAcks, requestLocal, 
verificationGuards.toMap)
+      origin, entriesPerPartition, requiredAcks, requestLocal, 
verificationGuards.toMap)
     debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
 
-    val errorResults = (unverifiedEntries ++ errorsPerPartition).map {
-      case (topicPartition, error) =>
-        // translate transaction coordinator errors to known producer response 
errors
-        val customException =
-          error match {
-            case Errors.INVALID_TXN_STATE => Some(error.exception("Partition 
was not added to the transaction"))
-            case Errors.CONCURRENT_TRANSACTIONS |
-                 Errors.COORDINATOR_LOAD_IN_PROGRESS |
-                 Errors.COORDINATOR_NOT_AVAILABLE |
-                 Errors.NOT_COORDINATOR => Some(new NotEnoughReplicasException(
-              s"Unable to verify the partition has been added to the 
transaction. Underlying error: ${error.toString}"))
-            case _ => None
-          }
-        topicPartition -> LogAppendResult(
-          LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-          Some(customException.getOrElse(error.exception)),
-          hasCustomErrorMessage = customException.isDefined
-        )
-    }
-
-    val allResults = localProduceResults ++ errorResults
-    val produceStatus = buildProducePartitionStatus(allResults)
+    val produceStatus = buildProducePartitionStatus(localProduceResults)
 
-    addCompletePurgatoryAction(actionQueue, allResults)
-    recordConversionStatsCallback(localProduceResults.map { case (k, v) =>
+    addCompletePurgatoryAction(actionQueue, localProduceResults)
+    recordValidationStatsCallback(localProduceResults.map { case (k, v) =>
       k -> v.info.recordValidationStats
     })
 
     maybeAddDelayedProduce(
       requiredAcks,
       delayedProduceLock,
       timeout,
-      allEntries,
-      allResults,
+      entriesPerPartition,
+      localProduceResults,
       produceStatus,
       responseCallback
     )
   }
 
-  private def partitionEntriesForVerification(verificationGuards: 
mutable.Map[TopicPartition, VerificationGuard],
-                                              entriesPerPartition: 
Map[TopicPartition, MemoryRecords],
-                                              verifiedEntries: 
mutable.Map[TopicPartition, MemoryRecords],
-                                              unverifiedEntries: 
mutable.Map[TopicPartition, MemoryRecords],
-                                              errorEntries: 
mutable.Map[TopicPartition, Errors]): Unit = {
-    val transactionalProducerIds = mutable.HashSet[Long]()
+  /**
+   * Handles the produce request by starting any transactional verification 
before appending.
+   *
+   * @param timeout                       maximum time we will wait to append 
before returning
+   * @param requiredAcks                  number of replicas who must 
acknowledge the append before sending the response
+   * @param internalTopicsAllowed         boolean indicating whether internal 
topics can be appended to
+   * @param origin                        source of the append request (ie, 
client, replication, coordinator)
+   * @param transactionalId               the transactional ID for the produce 
request or null if there is none.
+   * @param entriesPerPartition           the records per partition to be 
appended
+   * @param responseCallback              callback for sending the response
+   * @param recordValidationStatsCallback callback for updating stats on 
record conversions
+   * @param requestLocal                  container for the stateful instances 
scoped to this request -- this must correspond to the
+   *                                      thread calling this method
+   * @param actionQueue                   the action queue to use. 
ReplicaManager#defaultActionQueue is used by default.
+   */
+  def handleProduceAppend(timeout: Long,
+                          requiredAcks: Short,
+                          internalTopicsAllowed: Boolean,
+                          origin: AppendOrigin,
+                          transactionalId: String,
+                          entriesPerPartition: Map[TopicPartition, 
MemoryRecords],
+                          responseCallback: Map[TopicPartition, 
PartitionResponse] => Unit,
+                          recordValidationStatsCallback: Map[TopicPartition, 
RecordValidationStats] => Unit = _ => (),
+                          requestLocal: RequestLocal = RequestLocal.NoCaching,
+                          actionQueue: ActionQueue = this.defaultActionQueue): 
Unit = {
+
+    val transactionalProducerInfo = mutable.HashSet[(Long, Short)]()
+    val topicPartitionBatchInfo = mutable.Map[TopicPartition, Int]()
     entriesPerPartition.foreach { case (topicPartition, records) =>
-      try {
-        // Produce requests (only requests that require verification) should 
only have one batch per partition in "batches" but check all just to be safe.
-        val transactionalBatches = records.batches.asScala.filter(batch => 
batch.hasProducerId && batch.isTransactional)
-        transactionalBatches.foreach(batch => 
transactionalProducerIds.add(batch.producerId))
-
-        if (transactionalBatches.nonEmpty) {
-          // We return VerificationGuard if the partition needs to be 
verified. If no state is present, no need to verify.
-          val firstBatch = records.firstBatch
-          val verificationGuard = 
getPartitionOrException(topicPartition).maybeStartTransactionVerification(firstBatch.producerId,
 firstBatch.baseSequence, firstBatch.producerEpoch)
-          if (verificationGuard != VerificationGuard.SENTINEL) {
-            verificationGuards.put(topicPartition, verificationGuard)
-            unverifiedEntries.put(topicPartition, records)
-          } else
-            verifiedEntries.put(topicPartition, records)
-        } else {
-          // If there is no producer ID or transactional records in the 
batches, no need to verify.
-          verifiedEntries.put(topicPartition, records)
-        }
-      } catch {
-        case e: Exception => errorEntries.put(topicPartition, 
Errors.forException(e))
-      }
+      // Produce requests (only requests that require verification) should 
only have one batch per partition in "batches" but check all just to be safe.
+      val transactionalBatches = records.batches.asScala.filter(batch => 
batch.hasProducerId && batch.isTransactional)
+      transactionalBatches.foreach(batch => 
transactionalProducerInfo.add(batch.producerId, batch.producerEpoch))
+      if (!transactionalBatches.isEmpty) 
topicPartitionBatchInfo.put(topicPartition, records.firstBatch.baseSequence)
     }
-    // We should have exactly one producer ID for transactional records
-    if (transactionalProducerIds.size > 1) {
+    if (transactionalProducerInfo.size > 1) {
       throw new InvalidPidMappingException("Transactional records contained 
more than one producer ID")
     }
+
+    def postVerificationCallback(preAppendErrors: Map[TopicPartition, Errors],
+                                 newRequestLocal: RequestLocal,
+                                 verificationGuards: Map[TopicPartition, 
VerificationGuard]): Unit = {
+      val errorResults = preAppendErrors.map {
+        case (topicPartition, error) =>
+          // translate transaction coordinator errors to known producer 
response errors
+          val customException =
+            error match {
+              case Errors.INVALID_TXN_STATE => Some(error.exception("Partition 
was not added to the transaction"))
+              case Errors.CONCURRENT_TRANSACTIONS |
+                   Errors.COORDINATOR_LOAD_IN_PROGRESS |
+                   Errors.COORDINATOR_NOT_AVAILABLE |
+                   Errors.NOT_COORDINATOR => Some(new 
NotEnoughReplicasException(
+                s"Unable to verify the partition has been added to the 
transaction. Underlying error: ${error.toString}"))
+              case _ => None
+            }
+          topicPartition -> LogAppendResult(
+            LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
+            Some(customException.getOrElse(error.exception)),
+            hasCustomErrorMessage = customException.isDefined
+          )
+      }
+      val entriesWithoutErrorsPerPartition = entriesPerPartition.filter { case 
(key, _) => !errorResults.contains(key) }
+
+      val preAppendPartitionResponses = 
buildProducePartitionStatus(errorResults).map { case (k, status) => k -> 
status.responseStatus }
+
+      def newResponseCallback(responses: Map[TopicPartition, 
PartitionResponse]): Unit = {
+        responseCallback(preAppendPartitionResponses ++ responses)
+      }
+
+      appendRecords(
+        timeout = timeout,
+        requiredAcks = requiredAcks,
+        internalTopicsAllowed = internalTopicsAllowed,
+        origin = origin,
+        entriesPerPartition = entriesWithoutErrorsPerPartition,
+        responseCallback = newResponseCallback,
+        recordValidationStatsCallback = recordValidationStatsCallback,
+        requestLocal = newRequestLocal,
+        actionQueue = actionQueue,
+        verificationGuards = verificationGuards
+      )
+    }
+
+    if (transactionalProducerInfo.size < 1) {
+      postVerificationCallback(Map.empty[TopicPartition, Errors], 
requestLocal, Map.empty[TopicPartition, VerificationGuard])
+      return
+    }
+    maybeStartTransactionVerificationForPartitions(topicPartitionBatchInfo, 
transactionalId,

Review Comment:
   nit: Should we put an empty line before this one?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to