lbradstreet commented on a change in pull request #9632:
URL: https://github.com/apache/kafka/pull/9632#discussion_r528006935



##########
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##########
@@ -301,26 +304,28 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
     }
   }
 
-  def appendEndTxnMarker(endTxnMarker: EndTransactionMarker,
-                         producerEpoch: Short,
-                         offset: Long,
-                         timestamp: Long): CompletedTxn = {
+  def appendEndTxnMarker(
+    endTxnMarker: EndTransactionMarker,
+    producerEpoch: Short,
+    offset: Long,
+    timestamp: Long
+  ): Option[CompletedTxn] = {
     checkProducerEpoch(producerEpoch, offset)
     checkCoordinatorEpoch(endTxnMarker, offset)
 
-    val firstOffset = updatedEntry.currentTxnFirstOffset match {
-      case Some(txnFirstOffset) => txnFirstOffset
-      case None =>
-        transactions += new TxnMetadata(producerId, offset)
-        offset
+    // Only emit the `CompletedTxn` for non-empty transactions. A transaction marker
+    // without any associated data will not have any impact on the last stable offset
+    // and would not need to be reflected in the transaction index.
+    val completedTxn = updatedEntry.currentTxnFirstOffset.map { firstOffset =>
+      CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType == ControlRecordType.ABORT)

Review comment:
       Could you check my understanding? If `currentTxnFirstOffset` is defined 
(indicating a non-empty transaction), we return a valid `CompletedTxn`; 
otherwise we return `None`. For empty transactions this means we no longer 
accumulate a completed transaction, which saves us from having to call 
`lastStableOffset` for every empty completed transaction at 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L1240?
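
To make the question concrete, here is a minimal sketch of the behaviour as I read it. It is not the real `ProducerAppendInfo`: the `CompletedTxn` case class, the `EndTxnMarkerSketch` object, and the flattened parameter list are simplified stand-ins I made up for illustration, and the epoch checks are left out.

```scala
// Hypothetical, simplified stand-in for the real kafka.log.CompletedTxn.
final case class CompletedTxn(
  producerId: Long,
  firstOffset: Long,
  lastOffset: Long,
  isAborted: Boolean
)

object EndTxnMarkerSketch {
  // currentTxnFirstOffset is None when the producer wrote no data
  // in the transaction that the marker is closing.
  def appendEndTxnMarker(
    producerId: Long,
    currentTxnFirstOffset: Option[Long],
    markerOffset: Long,
    isAbort: Boolean
  ): Option[CompletedTxn] =
    // Option.map only runs for Some, so an empty transaction yields None.
    currentTxnFirstOffset.map { firstOffset =>
      CompletedTxn(producerId, firstOffset, markerOffset, isAbort)
    }

  def main(args: Array[String]): Unit = {
    val nonEmpty = appendEndTxnMarker(1L, Some(100L), 105L, isAbort = false)
    val empty = appendEndTxnMarker(1L, None, 106L, isAbort = true)

    // A caller that collects results with flatten never sees the empty
    // transaction, so it has nothing extra to process for that marker.
    val completedTxns = List(nonEmpty, empty).flatten
    println(completedTxns)
  }
}
```

If that matches the intent, the caller only ever accumulates entries for transactions that actually contain data.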



