jeffkbkim commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1430514027


##########
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##########
@@ -190,4 +170,62 @@ class CoordinatorPartitionWriter[T](
         throw Errors.NOT_LEADER_OR_FOLLOWER.exception()
     }
   }
+
+  /**
+   * Write the transaction end marker.
+   *
+   * @param tp                The partition to write records to.
+   * @param producerId        The producer id.
+   * @param producerEpoch     The producer epoch.
+   * @param coordinatorEpoch  The epoch of the transaction coordinator.
+   * @param result            The transaction result.
+   * @return The log end offset right after the written records.
+   * @throws KafkaException Any KafkaException caught during the write operation.
+   */
+  override def appendTransactionEndMarker(

Review Comment:
   nit: should this be `appendEndTransactionMarker`? also in L175
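   
   For reference, a minimal standalone sketch of what the suggested rename could look like. The parameter and return types below are assumptions inferred from the Javadoc in the diff above, not copied from the PR:
   
   ```scala
   import org.apache.kafka.common.TopicPartition
   import org.apache.kafka.common.requests.TransactionResult
   
   // Illustrative trait only, not the PR's PartitionWriter: it just shows the
   // renamed method with types guessed from the Javadoc in the diff above.
   trait EndTransactionMarkerWriter {
     /**
      * Write the transaction end marker and return the log end offset
      * right after the written records.
      */
     def appendEndTransactionMarker(
       tp: TopicPartition,
       producerId: Long,
       producerEpoch: Short,
       coordinatorEpoch: Int,
       result: TransactionResult
     ): Long
   }
   ```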



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2416,14 +2419,56 @@ class KafkaApis(val requestChannel: RequestChannel,
         numAppends.decrementAndGet()
         skippedMarkers += 1
       } else {
-        val controlRecords = partitionsWithCompatibleMessageFormat.map { partition =>
-          val controlRecordType = marker.transactionResult match {
-            case TransactionResult.COMMIT => ControlRecordType.COMMIT
-            case TransactionResult.ABORT => ControlRecordType.ABORT
+        val controlRecordType = marker.transactionResult match {
+          case TransactionResult.COMMIT => ControlRecordType.COMMIT
+          case TransactionResult.ABORT => ControlRecordType.ABORT
+        }
+
+        val markerResults = new ConcurrentHashMap[TopicPartition, Errors]()
+        def maybeComplete(): Unit = {
+          if (partitionsWithCompatibleMessageFormat.size == markerResults.size) {
+            maybeSendResponseCallback(producerId, marker.transactionResult, markerResults)
+          }
+        }
+
+        val controlRecords = mutable.Map.empty[TopicPartition, MemoryRecords]
+        partitionsWithCompatibleMessageFormat.foreach { partition =>
+          if (config.isNewGroupCoordinatorEnabled && partition.topic == GROUP_METADATA_TOPIC_NAME) {
+            // When the new group coordinator is used, writing the end marker is fully delegated
+            // to the group coordinator.
+            groupCoordinator.completeTransaction(
+              partition,
+              marker.producerId,
+              marker.producerEpoch,
+              marker.coordinatorEpoch,
+              marker.transactionResult,
+              Duration.ofMillis(config.requestTimeoutMs.toLong)
+            ).whenComplete { (_, exception) =>
+              val error = if (exception == null) {
+                Errors.NONE
+              } else {
+                Errors.forException(exception) match {
+                  case Errors.COORDINATOR_NOT_AVAILABLE | Errors.COORDINATOR_LOAD_IN_PROGRESS | Errors.NOT_COORDINATOR =>
+                    // The transaction coordinator does not expect those errors so we translate them
+                    // to NOT_LEADER_OR_FOLLOWER to signal to it that the coordinator is not ready yet.
+                    Errors.NOT_LEADER_OR_FOLLOWER
+                  case error =>
+                    error
+                }
+              }
+              markerResults.put(partition, error)
+              maybeComplete()
+            }
+          } else {
+            // Otherwise, the regular appendRecords path is used for all the non __consumer_offsets
+            // partitions or for all partitions when the group coordinator is disabled.

Review Comment:
   nit: when the "new group coordinator" is disabled
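   
   On the error handling in the hunk above: the `whenComplete` block collapses the coordinator-availability errors into NOT_LEADER_OR_FOLLOWER before reporting back to the transaction coordinator. A minimal sketch of just that translation rule, pulled out as a hypothetical helper (no such helper exists in the PR):
   
   ```scala
   import org.apache.kafka.common.protocol.Errors
   
   // Hypothetical helper, for illustration only: maps coordinator-availability
   // errors to NOT_LEADER_OR_FOLLOWER, which the transaction coordinator knows
   // how to retry, and passes every other error through unchanged.
   object EndMarkerErrorTranslation {
     def translate(exception: Throwable): Errors = {
       if (exception == null) Errors.NONE
       else Errors.forException(exception) match {
         case Errors.COORDINATOR_NOT_AVAILABLE |
              Errors.COORDINATOR_LOAD_IN_PROGRESS |
              Errors.NOT_COORDINATOR => Errors.NOT_LEADER_OR_FOLLOWER
         case error => error
       }
     }
   }
   ```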



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2416,14 +2419,56 @@ class KafkaApis(val requestChannel: RequestChannel,
         numAppends.decrementAndGet()
         skippedMarkers += 1
       } else {
-        val controlRecords = partitionsWithCompatibleMessageFormat.map { partition =>
-          val controlRecordType = marker.transactionResult match {
-            case TransactionResult.COMMIT => ControlRecordType.COMMIT
-            case TransactionResult.ABORT => ControlRecordType.ABORT
+        val controlRecordType = marker.transactionResult match {
+          case TransactionResult.COMMIT => ControlRecordType.COMMIT
+          case TransactionResult.ABORT => ControlRecordType.ABORT
+        }
+
+        val markerResults = new ConcurrentHashMap[TopicPartition, Errors]()

Review Comment:
   to understand:
    * `markerResults` represents the to-be-appended error result for each topic partition in the marker.
    * it is concurrent since it also gets updated via the appendRecords() response callback.
    * it is updated separately in the new group coordinator path since we append through the new runtime framework and not via appendRecords().
    * we check `if (partitionsWithCompatibleMessageFormat.size == markerResults.size)` to only call `maybeSendResponseCallback` once for all partitions in a marker. `numAppends` serves a similar purpose but it applies to the one final response covering all markers.
   
   does this sound right?
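   
   A tiny self-contained sketch of the per-marker completion counting described above (illustrative only, not the PR's code; the topic names and the println stand in for the real `maybeSendResponseCallback`):
   
   ```scala
   import java.util.concurrent.ConcurrentHashMap
   import org.apache.kafka.common.TopicPartition
   import org.apache.kafka.common.protocol.Errors
   
   object MarkerCompletionSketch {
     def main(args: Array[String]): Unit = {
       // Partitions touched by one marker (illustrative values).
       val expected = Set(new TopicPartition("foo", 0), new TopicPartition("__consumer_offsets", 1))
   
       // Concurrent because results can arrive from different threads:
       // the appendRecords() callback or the new coordinator runtime.
       val markerResults = new ConcurrentHashMap[TopicPartition, Errors]()
   
       def maybeComplete(): Unit = {
         // Same size check as in the diff: respond only once every expected
         // partition has reported a result for this marker.
         if (markerResults.size == expected.size)
           println(s"marker done: $markerResults")
       }
   
       expected.foreach { tp =>
         markerResults.put(tp, Errors.NONE)
         maybeComplete()
       }
     }
   }
   ```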


