hachikuji commented on code in PR #14774:
URL: https://github.com/apache/kafka/pull/14774#discussion_r1423297221


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -915,64 +975,239 @@ class ReplicaManager(val config: KafkaConfig,
             // probably unblock some follower fetch requests since log end offset has been updated
             delayedFetchPurgatory.checkAndComplete(requestKey)
           case LeaderHwChange.NONE =>
-            // nothing
-          }
+          // nothing
         }
+      }
     }
+  }
 
-    recordConversionStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordValidationStats })
+  /**
+   * Append messages to the offsets topic, and wait for them to be replicated to other replicas;
+   * the callback function will be triggered either when the timeout expires or the required acks are satisfied;
+   * if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
+   * This method should not return until the write to the local log is completed, because updating offsets requires updating
+   * the in-memory and persisted state under a lock together.
+   *
+   * Note that all pending delayed check operations are stored in a queue. All callers to ReplicaManager.appendRecords()
+   * are expected to call ActionQueue.tryCompleteActions for all affected partitions, without holding any conflicting
+   * locks.
+   *
+   * If appending transactional records, call maybeStartTransactionVerificationForPartition(s) and call this method in the callback.
+   * For example, the GroupCoordinator will pass `doCommitTxnOffsets` as the post-verification callback, and that method eventually calls this one.
+   *
+   * @param timeout                       maximum time we will wait to append before returning
+   * @param requiredAcks                  number of replicas who must acknowledge the append before sending the response
+   * @param entriesPerPartition           the records per partition to be appended
+   * @param responseCallback              callback for sending the response
+   * @param delayedProduceLock            lock for the delayed actions
+   * @param requestLocal                  container for the stateful instances scoped to this request -- this must correspond to the
+   *                                      thread calling this method
+   * @param verificationGuards            the mapping from topic partition to verification guards if transaction verification is used
+   */
+  def appendForGroup(
+    timeout: Long,
+    requiredAcks: Short,
+    entriesPerPartition: Map[TopicPartition, MemoryRecords],
+    responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
+    delayedProduceLock: Option[Lock],
+    requestLocal: RequestLocal,
+    verificationGuards: Map[TopicPartition, VerificationGuard]
+  ): Unit = {
+    if (!isValidRequiredAcks(requiredAcks)) {
+      sendInvalidRequiredAcksResponse(entriesPerPartition, responseCallback)
+      return
+    }
+
+    val sTime = time.milliseconds
+    val localProduceResults = appendToLocalLog(
+      internalTopicsAllowed = true,
+      origin = AppendOrigin.COORDINATOR,
+      entriesPerPartition,
+      requiredAcks,
+      requestLocal,
+      verificationGuards.toMap
+    )
+
+    debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
 
-    if (delayedProduceRequestRequired(requiredAcks, allEntries, allResults)) {
+    val allResults = localProduceResults
+    val produceStatus = buildProducePartitionStatus(allResults)
+
+    addCompletePurgatoryAction(defaultActionQueue, allResults)
+
+    maybeAddDelayedProduce(
+      requiredAcks,
+      delayedProduceLock,
+      timeout,
+      entriesPerPartition,
+      allResults,
+      produceStatus,
+      responseCallback
+    )
+  }
+
+  private def maybeAddDelayedProduce(
+    requiredAcks: Short,
+    delayedProduceLock: Option[Lock],
+    timeoutMs: Long,
+    entriesPerPartition: Map[TopicPartition, MemoryRecords],
+    initialAppendResults: Map[TopicPartition, LogAppendResult],
+    initialProduceStatus: Map[TopicPartition, ProducePartitionStatus],
+    responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
+  ): Unit = {
+    if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, initialAppendResults)) {
       // create delayed produce operation
-      val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
-      val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)
+      val produceMetadata = ProduceMetadata(requiredAcks, initialProduceStatus)
+      val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, this, responseCallback, delayedProduceLock)
 
       // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
-      val producerRequestKeys = allEntries.keys.map(TopicPartitionOperationKey(_)).toSeq
+      val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
 
       // try to complete the request immediately, otherwise put it into the purgatory
       // this is because while the delayed produce operation is being created, new
       // requests may arrive and hence make this operation completable.
       delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
     } else {
       // we can respond immediately
-      val produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus }
+      val produceResponseStatus = initialProduceStatus.map { case (k, status) => k -> status.responseStatus }
       responseCallback(produceResponseStatus)
     }
   }
 
-  private def partitionEntriesForVerification(verificationGuards: mutable.Map[TopicPartition, VerificationGuard],
-                                              entriesPerPartition: Map[TopicPartition, MemoryRecords],
-                                              verifiedEntries: mutable.Map[TopicPartition, MemoryRecords],
-                                              unverifiedEntries: mutable.Map[TopicPartition, MemoryRecords],
-                                              errorEntries: mutable.Map[TopicPartition, Errors]): Unit = {
-    val transactionalProducerIds = mutable.HashSet[Long]()
-    entriesPerPartition.foreach { case (topicPartition, records) =>
-      try {
-        // Produce requests (only requests that require verification) should only have one batch per partition in "batches" but check all just to be safe.
-        val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional)
-        transactionalBatches.foreach(batch => transactionalProducerIds.add(batch.producerId))
+  private def sendInvalidRequiredAcksResponse(entries: Map[TopicPartition, MemoryRecords],
+                                              responseCallback: Map[TopicPartition, PartitionResponse] => Unit): Unit = {
+    // If required.acks is outside accepted range, something is wrong with the client
+    // Just return an error and don't handle the request at all
+    val responseStatus = entries.map { case (topicPartition, _) =>
+      topicPartition -> new PartitionResponse(
+        Errors.INVALID_REQUIRED_ACKS,
+        LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset,
+        RecordBatch.NO_TIMESTAMP,
+        LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset
+      )
+    }
+    responseCallback(responseStatus)
+  }
 
-        if (transactionalBatches.nonEmpty) {
-          // We return VerificationGuard if the partition needs to be verified. If no state is present, no need to verify.
-          val firstBatch = records.firstBatch
-          val verificationGuard = getPartitionOrException(topicPartition).maybeStartTransactionVerification(firstBatch.producerId, firstBatch.baseSequence, firstBatch.producerEpoch)
-          if (verificationGuard != VerificationGuard.SENTINEL) {
-            verificationGuards.put(topicPartition, verificationGuard)
-            unverifiedEntries.put(topicPartition, records)
-          } else
-            verifiedEntries.put(topicPartition, records)
-        } else {
-          // If there is no producer ID or transactional records in the batches, no need to verify.
-          verifiedEntries.put(topicPartition, records)
+  def maybeStartTransactionVerificationForPartition(

Review Comment:
   Could we add some brief documentation for this method? One detail it would be useful to document is the use of `VerificationGuard.SENTINEL` in the callback.
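   For what it's worth, something along these lines could be a starting point. This is only a sketch: the parameter list is elided below because the full signature isn't visible in this hunk, and the wording should be checked against the actual behavior.

```scala
/**
 * Start transaction verification for the given partition before transactional records are
 * appended on its behalf (for example, a transactional offset commit handled by the group
 * coordinator).
 *
 * The supplied callback is invoked once verification completes (or is skipped). If the
 * partition does not require verification -- e.g. there is no transactional state to
 * verify -- the callback is passed VerificationGuard.SENTINEL rather than a newly created
 * guard, and the caller should forward that sentinel value unchanged to the subsequent append.
 */
def maybeStartTransactionVerificationForPartition(
  // ... parameters as in this PR (not repeated here) ...
```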


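   Relatedly, it might also help to show the intended calling pattern somewhere, either here or in the `appendForGroup` javadoc. Below is a rough, self-contained sketch of what a post-verification callback could look like, purely for illustration: the object/method names and the callback shape are made up and are not the actual coordinator code.

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.storage.internals.log.VerificationGuard
import kafka.server.{ReplicaManager, RequestLocal}

object TxnOffsetAppendSketch {
  // Illustrative post-verification callback: it would be handed to
  // maybeStartTransactionVerificationForPartition and, once verification
  // finishes, it performs the actual append via appendForGroup.
  def onVerified(
    replicaManager: ReplicaManager,
    tp: TopicPartition,
    records: MemoryRecords,
    requestLocal: RequestLocal,
    responseCallback: Map[TopicPartition, PartitionResponse] => Unit
  )(error: Errors, verificationGuard: VerificationGuard): Unit = {
    if (error != Errors.NONE) {
      // Verification failed: surface the error without appending anything.
      responseCallback(Map(tp -> new PartitionResponse(error)))
    } else {
      // verificationGuard may be VerificationGuard.SENTINEL here, which simply
      // means no verification was required; it is passed through unchanged.
      replicaManager.appendForGroup(
        timeout = 30000L,
        requiredAcks = (-1).toShort,
        entriesPerPartition = Map(tp -> records),
        responseCallback = responseCallback,
        delayedProduceLock = None,
        requestLocal = requestLocal,
        verificationGuards = Map(tp -> verificationGuard)
      )
    }
  }
}
```

   If I'm reading the flow right, `appendForGroup` just forwards the guard to the log layer, so passing the sentinel through is expected rather than an error.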
