jeffkbkim commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1426320341


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -892,6 +895,43 @@ public void replay(
         }
     }
 
+    /**
+     * Applies the given transaction marker.
+     *
+     * @param producerId    The producer id.
+     * @param result        The result of the transaction.
+     * @throws RuntimeException if the transaction can not be completed.
+     */
+    public void completeTransaction(
+        long producerId,
+        TransactionResult result
+    ) throws RuntimeException {
+        Offsets pendingOffsets = 
pendingTransactionalOffsets.remove(producerId);
+
+        if (result == TransactionResult.COMMIT) {
+            log.debug("Committed transactional offset commits for producer id 
{}.", producerId);
+            if (pendingOffsets == null) return;
+
+            pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> {
+                topicOffsets.forEach((topicName, partitionOffsets) -> {
+                    partitionOffsets.forEach((partitionId, offsetAndMetadata) 
-> {
+                        log.debug("Committed transaction offset commit for 
producer id {} in group {} " +
+                            "with topic {}, partition {}, and offset {}.",
+                            producerId, groupId, topicName, partitionId, 
offsetAndMetadata);
+                        offsets.put(
+                            groupId,
+                            topicName,
+                            partitionId,
+                            offsetAndMetadata
+                        );

Review Comment:
   Some points i would like to confirm:
   1. During load, let's say there is an uncommitted end txn marker and we 
update the `offsets` map. Until the high watermark advances, we shouldn't 
consider the transaction as committed.
   2. During normal txn write operation, we append the end txn marker and 
update the last written offset to the end txn marker offset + 1. Once it's 
committed we can advance the HWM / last committed offset.
   
   Is this right? i think i'm confused on actually how we differentiate a 
replicated end txn marker from an end txn marker that was only appended to the 
leader.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2416,14 +2418,49 @@ class KafkaApis(val requestChannel: RequestChannel,
         numAppends.decrementAndGet()
         skippedMarkers += 1
       } else {
-        val controlRecords = partitionsWithCompatibleMessageFormat.map { 
partition =>
-          val controlRecordType = marker.transactionResult match {
-            case TransactionResult.COMMIT => ControlRecordType.COMMIT
-            case TransactionResult.ABORT => ControlRecordType.ABORT
+        val controlRecordType = marker.transactionResult match {
+          case TransactionResult.COMMIT => ControlRecordType.COMMIT
+          case TransactionResult.ABORT => ControlRecordType.ABORT
+        }
+
+        val markerResults = new ConcurrentHashMap[TopicPartition, Errors]()
+        def maybeComplete(): Unit = {
+          if (partitionsWithCompatibleMessageFormat.size == 
markerResults.size) {
+            maybeSendResponseCallback(producerId, marker.transactionResult, 
markerResults)
+          }
+        }
+
+        val controlRecords = mutable.Map.empty[TopicPartition, MemoryRecords]
+        partitionsWithCompatibleMessageFormat.foreach { partition =>
+          if (config.isNewGroupCoordinatorEnabled && partition.topic == 
GROUP_METADATA_TOPIC_NAME) {
+            // When the new group coordinator is used, writing the end marker 
is fully delegated
+            // the group coordinator.

Review Comment:
   nit: "to the"



##########
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##########
@@ -135,7 +136,22 @@ class CoordinatorLoaderImpl[T](
 
             memoryRecords.batches.forEach { batch =>
               if (batch.isControlBatch) {
-                throw new IllegalStateException("Control batches are not 
supported yet.")
+                batch.asScala.foreach { record =>

Review Comment:
   +1, both writing and loading the end txn marker are referred to as 
completeTransaction which is confusing



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to