This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e7de280  KAFKA-10702; Skip bookkeeping of empty transactions (#9632)
e7de280 is described below

commit e7de280b0f1c7a924293dba79be77f56a08d0e15
Author: Jason Gustafson <[email protected]>
AuthorDate: Mon Nov 30 14:48:28 2020 -0800

    KAFKA-10702; Skip bookkeeping of empty transactions (#9632)
    
    Compacted topics can accumulate a large number of empty transaction markers 
as the data from the transactions gets cleaned. For each transaction, there is 
some bookkeeping that leaders and followers must do to keep the transaction 
index up to date. The cost of this overhead can degrade performance when a 
replica needs to catch up if the log has mostly empty or small transactions. 
This patch improves the cost by skipping over empty transactions since these 
will have no effect on the last stable offset.
    
    Reviewers: Lucas Bradstreet <[email protected]>, Jun Rao <[email protected]>
---
 .../scala/kafka/log/ProducerStateManager.scala     | 36 ++++----
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 10 ++-
 .../unit/kafka/log/ProducerStateManagerTest.scala  | 96 +++++++++++++++++-----
 3 files changed, 102 insertions(+), 40 deletions(-)

diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala 
b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index d811d12..4a5a64a 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -44,7 +44,11 @@ class CorruptSnapshotException(msg: String) extends 
KafkaException(msg)
 case class LastRecord(lastDataOffset: Option[Long], producerEpoch: Short)
 
 
-private[log] case class TxnMetadata(producerId: Long, var firstOffset: 
LogOffsetMetadata, var lastOffset: Option[Long] = None) {
+private[log] case class TxnMetadata(
+  producerId: Long,
+  firstOffset: LogOffsetMetadata,
+  var lastOffset: Option[Long] = None
+) {
   def this(producerId: Long, firstOffset: Long) = this(producerId, 
LogOffsetMetadata(firstOffset))
 
   override def toString: String = {
@@ -247,8 +251,7 @@ private[log] class ProducerAppendInfo(val topicPartition: 
TopicPartition,
       if (recordIterator.hasNext) {
         val record = recordIterator.next()
         val endTxnMarker = EndTransactionMarker.deserialize(record)
-        val completedTxn = appendEndTxnMarker(endTxnMarker, 
batch.producerEpoch, batch.baseOffset, record.timestamp)
-        Some(completedTxn)
+        appendEndTxnMarker(endTxnMarker, batch.producerEpoch, 
batch.baseOffset, record.timestamp)
       } else {
         // An empty control batch means the entire transaction has been 
cleaned from the log, so no need to append
         None
@@ -301,18 +304,20 @@ private[log] class ProducerAppendInfo(val topicPartition: 
TopicPartition,
     }
   }
 
-  def appendEndTxnMarker(endTxnMarker: EndTransactionMarker,
-                         producerEpoch: Short,
-                         offset: Long,
-                         timestamp: Long): CompletedTxn = {
+  def appendEndTxnMarker(
+    endTxnMarker: EndTransactionMarker,
+    producerEpoch: Short,
+    offset: Long,
+    timestamp: Long
+  ): Option[CompletedTxn] = {
     checkProducerEpoch(producerEpoch, offset)
     checkCoordinatorEpoch(endTxnMarker, offset)
 
-    val firstOffset = updatedEntry.currentTxnFirstOffset match {
-      case Some(txnFirstOffset) => txnFirstOffset
-      case None =>
-        transactions += new TxnMetadata(producerId, offset)
-        offset
+    // Only emit the `CompletedTxn` for non-empty transactions. A transaction 
marker
+    // without any associated data will not have any impact on the last stable 
offset
+    // and would not need to be reflected in the transaction index.
+    val completedTxn = updatedEntry.currentTxnFirstOffset.map { firstOffset =>
+      CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType 
== ControlRecordType.ABORT)
     }
 
     updatedEntry.maybeUpdateProducerEpoch(producerEpoch)
@@ -320,7 +325,7 @@ private[log] class ProducerAppendInfo(val topicPartition: 
TopicPartition,
     updatedEntry.coordinatorEpoch = endTxnMarker.coordinatorEpoch
     updatedEntry.lastTimestamp = timestamp
 
-    CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType == 
ControlRecordType.ABORT)
+    completedTxn
   }
 
   def toEntry: ProducerStateEntry = updatedEntry
@@ -575,9 +580,10 @@ class ProducerStateManager(val topicPartition: 
TopicPartition,
 
   /**
    * The first undecided offset is the earliest transactional message which 
has not yet been committed
-   * or aborted.
+   * or aborted. Unlike [[firstUnstableOffset]], this does not reflect the 
state of replication (i.e.
+   * whether a completed transaction marker is beyond the high watermark).
    */
-  def firstUndecidedOffset: Option[Long] = 
Option(ongoingTxns.firstEntry).map(_.getValue.firstOffset.messageOffset)
+  private[log] def firstUndecidedOffset: Option[Long] = 
Option(ongoingTxns.firstEntry).map(_.getValue.firstOffset.messageOffset)
 
   /**
    * Returns the last offset of this map
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala 
b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 3759122..a3aea7a 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -4411,9 +4411,11 @@ class LogTest {
     assertEquals(0L, log.lastStableOffset)
 
     // Try the append a second time. The appended offset in the log should 
still increase.
-    assertThrows[KafkaStorageException] {
-      appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, 
coordinatorEpoch = 1)
-    }
+    // Note that the second append does not write to the transaction index 
because the producer
+    // state has already been updated and we do not write index entries for 
empty transactions.
+    // In the future, we may strengthen the fencing logic so that additional 
writes to the
+    // log are not possible after an IO error (see KAFKA-10778).
+    appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, 
coordinatorEpoch = 1)
     assertEquals(12L, log.logEndOffset)
     assertEquals(0L, log.lastStableOffset)
 
@@ -4425,7 +4427,7 @@ class LogTest {
 
     val reopenedLog = createLog(logDir, logConfig, lastShutdownClean = false)
     assertEquals(12L, reopenedLog.logEndOffset)
-    assertEquals(2, reopenedLog.activeSegment.txnIndex.allAbortedTxns.size)
+    assertEquals(1, reopenedLog.activeSegment.txnIndex.allAbortedTxns.size)
     reopenedLog.updateHighWatermark(12L)
     assertEquals(None, reopenedLog.firstUnstableOffset)
   }
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index dfebf7e..207e129 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
 import java.nio.channels.FileChannel
 import java.nio.file.StandardOpenOption
 import java.util.Collections
+import java.util.concurrent.atomic.AtomicInteger
 
 import kafka.server.LogOffsetMetadata
 import kafka.utils.TestUtils
@@ -178,29 +179,24 @@ class ProducerStateManagerTest {
   }
 
   @Test
-  def testControlRecordBumpsEpoch(): Unit = {
-    val epoch = 0.toShort
-    append(stateManager, producerId, epoch, 0, 0L)
+  def testControlRecordBumpsProducerEpoch(): Unit = {
+    val producerEpoch = 0.toShort
+    append(stateManager, producerId, producerEpoch, 0, 0L)
 
-    val bumpedEpoch = 1.toShort
-    val (completedTxn, lastStableOffset) = appendEndTxnMarker(stateManager, 
producerId, bumpedEpoch, ControlRecordType.ABORT, 1L)
-    assertEquals(1L, completedTxn.firstOffset)
-    assertEquals(1L, completedTxn.lastOffset)
-    assertEquals(2L, lastStableOffset)
-    assertTrue(completedTxn.isAborted)
-    assertEquals(producerId, completedTxn.producerId)
+    val bumpedProducerEpoch = 1.toShort
+    appendEndTxnMarker(stateManager, producerId, bumpedProducerEpoch, 
ControlRecordType.ABORT, 1L)
 
     val maybeLastEntry = stateManager.lastEntry(producerId)
     assertTrue(maybeLastEntry.isDefined)
 
     val lastEntry = maybeLastEntry.get
-    assertEquals(bumpedEpoch, lastEntry.producerEpoch)
+    assertEquals(bumpedProducerEpoch, lastEntry.producerEpoch)
     assertEquals(None, lastEntry.currentTxnFirstOffset)
     assertEquals(RecordBatch.NO_SEQUENCE, lastEntry.firstSeq)
     assertEquals(RecordBatch.NO_SEQUENCE, lastEntry.lastSeq)
 
     // should be able to append with the new epoch if we start at sequence 0
-    append(stateManager, producerId, bumpedEpoch, 0, 2L)
+    append(stateManager, producerId, bumpedProducerEpoch, 0, 2L)
     assertEquals(Some(0), stateManager.lastEntry(producerId).map(_.firstSeq))
   }
 
@@ -221,6 +217,64 @@ class ProducerStateManagerTest {
   }
 
   @Test
+  def testSkipEmptyTransactions(): Unit = {
+    val producerEpoch = 0.toShort
+    val coordinatorEpoch = 27
+    val seq = new AtomicInteger(0)
+
+    def appendEndTxn(
+      recordType: ControlRecordType,
+      offset: Long,
+      appendInfo: ProducerAppendInfo
+    ): Option[CompletedTxn] = {
+      appendInfo.appendEndTxnMarker(new EndTransactionMarker(recordType, 
coordinatorEpoch),
+        producerEpoch, offset, time.milliseconds())
+    }
+
+    def appendData(
+      startOffset: Long,
+      endOffset: Long,
+      appendInfo: ProducerAppendInfo
+    ): Unit = {
+      val count = (endOffset - startOffset).toInt
+      appendInfo.appendDataBatch(producerEpoch, seq.get(), 
seq.addAndGet(count), time.milliseconds(),
+        LogOffsetMetadata(startOffset), endOffset, isTransactional = true)
+      seq.incrementAndGet()
+    }
+
+    // Start one transaction in a separate append
+    val firstAppend = stateManager.prepareUpdate(producerId, origin = 
AppendOrigin.Client)
+    appendData(16L, 20L, firstAppend)
+    assertEquals(new TxnMetadata(producerId, 16L), 
firstAppend.startedTransactions.head)
+    stateManager.update(firstAppend)
+    stateManager.onHighWatermarkUpdated(21L)
+    assertEquals(Some(LogOffsetMetadata(16L)), 
stateManager.firstUnstableOffset)
+
+    // Now do a single append which completes the old transaction, mixes in
+    // some empty transactions, one non-empty complete transaction, and one
+    // incomplete transaction
+    val secondAppend = stateManager.prepareUpdate(producerId, origin = 
AppendOrigin.Client)
+    val firstCompletedTxn = appendEndTxn(ControlRecordType.COMMIT, 21, 
secondAppend)
+    assertEquals(Some(CompletedTxn(producerId, 16L, 21, isAborted = false)), 
firstCompletedTxn)
+    assertEquals(None, appendEndTxn(ControlRecordType.COMMIT, 22, 
secondAppend))
+    assertEquals(None, appendEndTxn(ControlRecordType.ABORT, 23, secondAppend))
+    appendData(24L, 27L, secondAppend)
+    val secondCompletedTxn = appendEndTxn(ControlRecordType.ABORT, 28L, 
secondAppend)
+    assertTrue(secondCompletedTxn.isDefined)
+    assertEquals(None, appendEndTxn(ControlRecordType.ABORT, 29L, 
secondAppend))
+    appendData(30L, 31L, secondAppend)
+
+    assertEquals(2, secondAppend.startedTransactions.size)
+    assertEquals(TxnMetadata(producerId, LogOffsetMetadata(24L)), 
secondAppend.startedTransactions.head)
+    assertEquals(TxnMetadata(producerId, LogOffsetMetadata(30L)), 
secondAppend.startedTransactions.last)
+    stateManager.update(secondAppend)
+    stateManager.completeTxn(firstCompletedTxn.get)
+    stateManager.completeTxn(secondCompletedTxn.get)
+    stateManager.onHighWatermarkUpdated(32L)
+    assertEquals(Some(LogOffsetMetadata(30L)), 
stateManager.firstUnstableOffset)
+  }
+
+  @Test
   def testLastStableOffsetCompletedTxn(): Unit = {
     val producerEpoch = 0.toShort
     val segmentBaseOffset = 990000L
@@ -333,7 +387,10 @@ class ProducerStateManagerTest {
     assertEquals(List(new TxnMetadata(producerId, 16L)), 
appendInfo.startedTransactions)
 
     val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, 
coordinatorEpoch)
-    val completedTxn = appendInfo.appendEndTxnMarker(endTxnMarker, 
producerEpoch, 40L, time.milliseconds())
+    val completedTxnOpt = appendInfo.appendEndTxnMarker(endTxnMarker, 
producerEpoch, 40L, time.milliseconds())
+    assertTrue(completedTxnOpt.isDefined)
+
+    val completedTxn = completedTxnOpt.get
     assertEquals(producerId, completedTxn.producerId)
     assertEquals(16L, completedTxn.firstOffset)
     assertEquals(40L, completedTxn.lastOffset)
@@ -821,7 +878,6 @@ class ProducerStateManagerTest {
   @Test
   def testAppendEmptyControlBatch(): Unit = {
     val producerId = 23423L
-    val producerEpoch = 145.toShort
     val baseOffset = 15
 
     val batch: RecordBatch = EasyMock.createMock(classOf[RecordBatch])
@@ -830,7 +886,7 @@ class ProducerStateManagerTest {
     EasyMock.replay(batch)
 
     // Appending the empty control batch should not throw and a new 
transaction shouldn't be started
-    append(stateManager, producerId, producerEpoch, baseOffset, batch, origin 
= AppendOrigin.Client)
+    append(stateManager, producerId, baseOffset, batch, origin = 
AppendOrigin.Client)
     assertEquals(None, 
stateManager.lastEntry(producerId).get.currentTxnFirstOffset)
   }
 
@@ -904,15 +960,14 @@ class ProducerStateManagerTest {
                                  controlType: ControlRecordType,
                                  offset: Long,
                                  coordinatorEpoch: Int = 0,
-                                 timestamp: Long = time.milliseconds()): 
(CompletedTxn, Long) = {
+                                 timestamp: Long = time.milliseconds()): 
Option[CompletedTxn] = {
     val producerAppendInfo = stateManager.prepareUpdate(producerId, origin = 
AppendOrigin.Coordinator)
     val endTxnMarker = new EndTransactionMarker(controlType, coordinatorEpoch)
-    val completedTxn = producerAppendInfo.appendEndTxnMarker(endTxnMarker, 
producerEpoch, offset, timestamp)
+    val completedTxnOpt = producerAppendInfo.appendEndTxnMarker(endTxnMarker, 
producerEpoch, offset, timestamp)
     mapping.update(producerAppendInfo)
-    val lastStableOffset = mapping.lastStableOffset(completedTxn)
-    mapping.completeTxn(completedTxn)
+    completedTxnOpt.foreach(mapping.completeTxn)
     mapping.updateMapEndOffset(offset + 1)
-    (completedTxn, lastStableOffset)
+    completedTxnOpt
   }
 
   private def append(stateManager: ProducerStateManager,
@@ -932,7 +987,6 @@ class ProducerStateManagerTest {
 
   private def append(stateManager: ProducerStateManager,
                      producerId: Long,
-                     producerEpoch: Short,
                      offset: Long,
                      batch: RecordBatch,
                      origin: AppendOrigin): Unit = {

Reply via email to