lbradstreet commented on a change in pull request #9632: URL: https://github.com/apache/kafka/pull/9632#discussion_r528006935
##########
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##########
@@ -301,26 +304,28 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
     }
   }
 
-  def appendEndTxnMarker(endTxnMarker: EndTransactionMarker,
-                         producerEpoch: Short,
-                         offset: Long,
-                         timestamp: Long): CompletedTxn = {
+  def appendEndTxnMarker(
+    endTxnMarker: EndTransactionMarker,
+    producerEpoch: Short,
+    offset: Long,
+    timestamp: Long
+  ): Option[CompletedTxn] = {
     checkProducerEpoch(producerEpoch, offset)
     checkCoordinatorEpoch(endTxnMarker, offset)
 
-    val firstOffset = updatedEntry.currentTxnFirstOffset match {
-      case Some(txnFirstOffset) => txnFirstOffset
-      case None =>
-        transactions += new TxnMetadata(producerId, offset)
-        offset
+    // Only emit the `CompletedTxn` for non-empty transactions. A transaction marker
+    // without any associated data will not have any impact on the last stable offset
+    // and would not need to be reflected in the transaction index.
+    val completedTxn = updatedEntry.currentTxnFirstOffset.map { firstOffset =>
+      CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType == ControlRecordType.ABORT)

Review comment:
Could you check my understanding? If currentTxnFirstOffset is non-empty (indicating a non-empty transaction), we return a valid CompletedTxn; otherwise we return None. For empty transactions, this means we aren't accumulating completed transactions, which saves us from having to call lastStableOffset on every empty completed transaction (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L1240)?
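
The reading in the review comment can be illustrated with a minimal sketch. It assumes simplified stand-in types rather than the real Kafka classes: `EmptyTxnSketch`, `completeTxn`, and the sample marker data are hypothetical names introduced only for illustration, and the `CompletedTxn` here is a local case class, not `kafka.log.CompletedTxn`. The point is only that mapping over an `Option` first offset yields `None` for an empty transaction, so a caller that flattens the results never accumulates an entry for it.

```scala
object EmptyTxnSketch {
  // Stand-in for illustration only; NOT the real Kafka class.
  final case class CompletedTxn(
    producerId: Long,
    firstOffset: Long,
    lastOffset: Long,
    isAborted: Boolean
  )

  // Hypothetical helper with the same shape as the new appendEndTxnMarker:
  // a transaction with no first offset (no data) produces None.
  def completeTxn(
    producerId: Long,
    currentTxnFirstOffset: Option[Long],
    markerOffset: Long,
    isAbort: Boolean
  ): Option[CompletedTxn] =
    currentTxnFirstOffset.map { firstOffset =>
      CompletedTxn(producerId, firstOffset, markerOffset, isAbort)
    }

  // Made-up sample markers: (producerId, first offset of the txn, marker offset, abort?)
  val markers: Seq[(Long, Option[Long], Long, Boolean)] = Seq(
    (1L, Some(10L), 15L, false), // non-empty, committed
    (2L, None, 16L, true),       // empty, aborted
    (3L, Some(17L), 20L, true)   // non-empty, aborted
  )

  // flatMap drops the None results, so only non-empty transactions are
  // accumulated and any per-transaction follow-up work is skipped for empty ones.
  val completed: Seq[CompletedTxn] = markers.flatMap {
    case (pid, first, offset, abort) => completeTxn(pid, first, offset, abort)
  }

  def main(args: Array[String]): Unit =
    completed.foreach(println)
}
```

In this sketch the empty transaction from producer 2 never appears in `completed`, which matches the idea that an empty transaction needs no last-stable-offset handling and no transaction index entry.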