hachikuji commented on a change in pull request #9632:
URL: https://github.com/apache/kafka/pull/9632#discussion_r528007637



##########
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##########
@@ -301,26 +304,28 @@ private[log] class ProducerAppendInfo(val topicPartition: 
TopicPartition,
     }
   }
 
-  def appendEndTxnMarker(endTxnMarker: EndTransactionMarker,
-                         producerEpoch: Short,
-                         offset: Long,
-                         timestamp: Long): CompletedTxn = {
+  def appendEndTxnMarker(
+    endTxnMarker: EndTransactionMarker,
+    producerEpoch: Short,
+    offset: Long,
+    timestamp: Long
+  ): Option[CompletedTxn] = {
     checkProducerEpoch(producerEpoch, offset)
     checkCoordinatorEpoch(endTxnMarker, offset)
 
-    val firstOffset = updatedEntry.currentTxnFirstOffset match {
-      case Some(txnFirstOffset) => txnFirstOffset
-      case None =>
-        transactions += new TxnMetadata(producerId, offset)
-        offset
+    // Only emit the `CompletedTxn` for non-empty transactions. A transaction 
marker
+    // without any associated data will not have any impact on the last stable 
offset
+    // and would not need to be reflected in the transaction index.
+    val completedTxn = updatedEntry.currentTxnFirstOffset.map { firstOffset =>
+      CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType 
== ControlRecordType.ABORT)

Review comment:
       Yes, that is right. Additionally, we are not adding the transaction to 
the list of started transactions which are accumulated in the 
`ProducerAppendInfo`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to