This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e7de280 KAFKA-10702; Skip bookkeeping of empty transactions (#9632)
e7de280 is described below
commit e7de280b0f1c7a924293dba79be77f56a08d0e15
Author: Jason Gustafson <[email protected]>
AuthorDate: Mon Nov 30 14:48:28 2020 -0800
KAFKA-10702; Skip bookkeeping of empty transactions (#9632)
Compacted topics can accumulate a large number of empty transaction markers
as the data from the transactions gets cleaned. For each transaction, there is
some bookkeeping that leaders and followers must do to keep the transaction
index up to date. The cost of this overhead can degrade performance when a
replica needs to catch up if the log has mostly empty or small transactions.
This patch reduces this cost by skipping over empty transactions, since these
will have no effect on the last stable offset and do not need to be reflected
in the transaction index.
Reviewers: Lucas Bradstreet <[email protected]>, Jun Rao <[email protected]>
---
.../scala/kafka/log/ProducerStateManager.scala | 36 ++++----
core/src/test/scala/unit/kafka/log/LogTest.scala | 10 ++-
.../unit/kafka/log/ProducerStateManagerTest.scala | 96 +++++++++++++++++-----
3 files changed, 102 insertions(+), 40 deletions(-)
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala
b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index d811d12..4a5a64a 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -44,7 +44,11 @@ class CorruptSnapshotException(msg: String) extends
KafkaException(msg)
case class LastRecord(lastDataOffset: Option[Long], producerEpoch: Short)
-private[log] case class TxnMetadata(producerId: Long, var firstOffset:
LogOffsetMetadata, var lastOffset: Option[Long] = None) {
+private[log] case class TxnMetadata(
+ producerId: Long,
+ firstOffset: LogOffsetMetadata,
+ var lastOffset: Option[Long] = None
+) {
def this(producerId: Long, firstOffset: Long) = this(producerId,
LogOffsetMetadata(firstOffset))
override def toString: String = {
@@ -247,8 +251,7 @@ private[log] class ProducerAppendInfo(val topicPartition:
TopicPartition,
if (recordIterator.hasNext) {
val record = recordIterator.next()
val endTxnMarker = EndTransactionMarker.deserialize(record)
- val completedTxn = appendEndTxnMarker(endTxnMarker,
batch.producerEpoch, batch.baseOffset, record.timestamp)
- Some(completedTxn)
+ appendEndTxnMarker(endTxnMarker, batch.producerEpoch,
batch.baseOffset, record.timestamp)
} else {
// An empty control batch means the entire transaction has been
cleaned from the log, so no need to append
None
@@ -301,18 +304,20 @@ private[log] class ProducerAppendInfo(val topicPartition:
TopicPartition,
}
}
- def appendEndTxnMarker(endTxnMarker: EndTransactionMarker,
- producerEpoch: Short,
- offset: Long,
- timestamp: Long): CompletedTxn = {
+ def appendEndTxnMarker(
+ endTxnMarker: EndTransactionMarker,
+ producerEpoch: Short,
+ offset: Long,
+ timestamp: Long
+ ): Option[CompletedTxn] = {
checkProducerEpoch(producerEpoch, offset)
checkCoordinatorEpoch(endTxnMarker, offset)
- val firstOffset = updatedEntry.currentTxnFirstOffset match {
- case Some(txnFirstOffset) => txnFirstOffset
- case None =>
- transactions += new TxnMetadata(producerId, offset)
- offset
+ // Only emit the `CompletedTxn` for non-empty transactions. A transaction
marker
+ // without any associated data will not have any impact on the last stable
offset
+ // and would not need to be reflected in the transaction index.
+ val completedTxn = updatedEntry.currentTxnFirstOffset.map { firstOffset =>
+ CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType
== ControlRecordType.ABORT)
}
updatedEntry.maybeUpdateProducerEpoch(producerEpoch)
@@ -320,7 +325,7 @@ private[log] class ProducerAppendInfo(val topicPartition:
TopicPartition,
updatedEntry.coordinatorEpoch = endTxnMarker.coordinatorEpoch
updatedEntry.lastTimestamp = timestamp
- CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType ==
ControlRecordType.ABORT)
+ completedTxn
}
def toEntry: ProducerStateEntry = updatedEntry
@@ -575,9 +580,10 @@ class ProducerStateManager(val topicPartition:
TopicPartition,
/**
* The first undecided offset is the earliest transactional message which
has not yet been committed
- * or aborted.
+ * or aborted. Unlike [[firstUnstableOffset]], this does not reflect the
state of replication (i.e.
+ * whether a completed transaction marker is beyond the high watermark).
*/
- def firstUndecidedOffset: Option[Long] =
Option(ongoingTxns.firstEntry).map(_.getValue.firstOffset.messageOffset)
+ private[log] def firstUndecidedOffset: Option[Long] =
Option(ongoingTxns.firstEntry).map(_.getValue.firstOffset.messageOffset)
/**
* Returns the last offset of this map
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala
b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 3759122..a3aea7a 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -4411,9 +4411,11 @@ class LogTest {
assertEquals(0L, log.lastStableOffset)
// Try the append a second time. The appended offset in the log should
still increase.
- assertThrows[KafkaStorageException] {
- appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT,
coordinatorEpoch = 1)
- }
+ // Note that the second append does not write to the transaction index
because the producer
+ // state has already been updated and we do not write index entries for
empty transactions.
+ // In the future, we may strengthen the fencing logic so that additional
writes to the
+ // log are not possible after an IO error (see KAFKA-10778).
+ appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT,
coordinatorEpoch = 1)
assertEquals(12L, log.logEndOffset)
assertEquals(0L, log.lastStableOffset)
@@ -4425,7 +4427,7 @@ class LogTest {
val reopenedLog = createLog(logDir, logConfig, lastShutdownClean = false)
assertEquals(12L, reopenedLog.logEndOffset)
- assertEquals(2, reopenedLog.activeSegment.txnIndex.allAbortedTxns.size)
+ assertEquals(1, reopenedLog.activeSegment.txnIndex.allAbortedTxns.size)
reopenedLog.updateHighWatermark(12L)
assertEquals(None, reopenedLog.firstUnstableOffset)
}
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index dfebf7e..207e129 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption
import java.util.Collections
+import java.util.concurrent.atomic.AtomicInteger
import kafka.server.LogOffsetMetadata
import kafka.utils.TestUtils
@@ -178,29 +179,24 @@ class ProducerStateManagerTest {
}
@Test
- def testControlRecordBumpsEpoch(): Unit = {
- val epoch = 0.toShort
- append(stateManager, producerId, epoch, 0, 0L)
+ def testControlRecordBumpsProducerEpoch(): Unit = {
+ val producerEpoch = 0.toShort
+ append(stateManager, producerId, producerEpoch, 0, 0L)
- val bumpedEpoch = 1.toShort
- val (completedTxn, lastStableOffset) = appendEndTxnMarker(stateManager,
producerId, bumpedEpoch, ControlRecordType.ABORT, 1L)
- assertEquals(1L, completedTxn.firstOffset)
- assertEquals(1L, completedTxn.lastOffset)
- assertEquals(2L, lastStableOffset)
- assertTrue(completedTxn.isAborted)
- assertEquals(producerId, completedTxn.producerId)
+ val bumpedProducerEpoch = 1.toShort
+ appendEndTxnMarker(stateManager, producerId, bumpedProducerEpoch,
ControlRecordType.ABORT, 1L)
val maybeLastEntry = stateManager.lastEntry(producerId)
assertTrue(maybeLastEntry.isDefined)
val lastEntry = maybeLastEntry.get
- assertEquals(bumpedEpoch, lastEntry.producerEpoch)
+ assertEquals(bumpedProducerEpoch, lastEntry.producerEpoch)
assertEquals(None, lastEntry.currentTxnFirstOffset)
assertEquals(RecordBatch.NO_SEQUENCE, lastEntry.firstSeq)
assertEquals(RecordBatch.NO_SEQUENCE, lastEntry.lastSeq)
// should be able to append with the new epoch if we start at sequence 0
- append(stateManager, producerId, bumpedEpoch, 0, 2L)
+ append(stateManager, producerId, bumpedProducerEpoch, 0, 2L)
assertEquals(Some(0), stateManager.lastEntry(producerId).map(_.firstSeq))
}
@@ -221,6 +217,64 @@ class ProducerStateManagerTest {
}
@Test
+ def testSkipEmptyTransactions(): Unit = {
+ val producerEpoch = 0.toShort
+ val coordinatorEpoch = 27
+ val seq = new AtomicInteger(0)
+
+ def appendEndTxn(
+ recordType: ControlRecordType,
+ offset: Long,
+ appendInfo: ProducerAppendInfo
+ ): Option[CompletedTxn] = {
+ appendInfo.appendEndTxnMarker(new EndTransactionMarker(recordType,
coordinatorEpoch),
+ producerEpoch, offset, time.milliseconds())
+ }
+
+ def appendData(
+ startOffset: Long,
+ endOffset: Long,
+ appendInfo: ProducerAppendInfo
+ ): Unit = {
+ val count = (endOffset - startOffset).toInt
+ appendInfo.appendDataBatch(producerEpoch, seq.get(),
seq.addAndGet(count), time.milliseconds(),
+ LogOffsetMetadata(startOffset), endOffset, isTransactional = true)
+ seq.incrementAndGet()
+ }
+
+ // Start one transaction in a separate append
+ val firstAppend = stateManager.prepareUpdate(producerId, origin =
AppendOrigin.Client)
+ appendData(16L, 20L, firstAppend)
+ assertEquals(new TxnMetadata(producerId, 16L),
firstAppend.startedTransactions.head)
+ stateManager.update(firstAppend)
+ stateManager.onHighWatermarkUpdated(21L)
+ assertEquals(Some(LogOffsetMetadata(16L)),
stateManager.firstUnstableOffset)
+
+ // Now do a single append which completes the old transaction, mixes in
+ // some empty transactions, one non-empty complete transaction, and one
+ // incomplete transaction
+ val secondAppend = stateManager.prepareUpdate(producerId, origin =
AppendOrigin.Client)
+ val firstCompletedTxn = appendEndTxn(ControlRecordType.COMMIT, 21,
secondAppend)
+ assertEquals(Some(CompletedTxn(producerId, 16L, 21, isAborted = false)),
firstCompletedTxn)
+ assertEquals(None, appendEndTxn(ControlRecordType.COMMIT, 22,
secondAppend))
+ assertEquals(None, appendEndTxn(ControlRecordType.ABORT, 23, secondAppend))
+ appendData(24L, 27L, secondAppend)
+ val secondCompletedTxn = appendEndTxn(ControlRecordType.ABORT, 28L,
secondAppend)
+ assertTrue(secondCompletedTxn.isDefined)
+ assertEquals(None, appendEndTxn(ControlRecordType.ABORT, 29L,
secondAppend))
+ appendData(30L, 31L, secondAppend)
+
+ assertEquals(2, secondAppend.startedTransactions.size)
+ assertEquals(TxnMetadata(producerId, LogOffsetMetadata(24L)),
secondAppend.startedTransactions.head)
+ assertEquals(TxnMetadata(producerId, LogOffsetMetadata(30L)),
secondAppend.startedTransactions.last)
+ stateManager.update(secondAppend)
+ stateManager.completeTxn(firstCompletedTxn.get)
+ stateManager.completeTxn(secondCompletedTxn.get)
+ stateManager.onHighWatermarkUpdated(32L)
+ assertEquals(Some(LogOffsetMetadata(30L)),
stateManager.firstUnstableOffset)
+ }
+
+ @Test
def testLastStableOffsetCompletedTxn(): Unit = {
val producerEpoch = 0.toShort
val segmentBaseOffset = 990000L
@@ -333,7 +387,10 @@ class ProducerStateManagerTest {
assertEquals(List(new TxnMetadata(producerId, 16L)),
appendInfo.startedTransactions)
val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT,
coordinatorEpoch)
- val completedTxn = appendInfo.appendEndTxnMarker(endTxnMarker,
producerEpoch, 40L, time.milliseconds())
+ val completedTxnOpt = appendInfo.appendEndTxnMarker(endTxnMarker,
producerEpoch, 40L, time.milliseconds())
+ assertTrue(completedTxnOpt.isDefined)
+
+ val completedTxn = completedTxnOpt.get
assertEquals(producerId, completedTxn.producerId)
assertEquals(16L, completedTxn.firstOffset)
assertEquals(40L, completedTxn.lastOffset)
@@ -821,7 +878,6 @@ class ProducerStateManagerTest {
@Test
def testAppendEmptyControlBatch(): Unit = {
val producerId = 23423L
- val producerEpoch = 145.toShort
val baseOffset = 15
val batch: RecordBatch = EasyMock.createMock(classOf[RecordBatch])
@@ -830,7 +886,7 @@ class ProducerStateManagerTest {
EasyMock.replay(batch)
// Appending the empty control batch should not throw and a new
transaction shouldn't be started
- append(stateManager, producerId, producerEpoch, baseOffset, batch, origin
= AppendOrigin.Client)
+ append(stateManager, producerId, baseOffset, batch, origin =
AppendOrigin.Client)
assertEquals(None,
stateManager.lastEntry(producerId).get.currentTxnFirstOffset)
}
@@ -904,15 +960,14 @@ class ProducerStateManagerTest {
controlType: ControlRecordType,
offset: Long,
coordinatorEpoch: Int = 0,
- timestamp: Long = time.milliseconds()):
(CompletedTxn, Long) = {
+ timestamp: Long = time.milliseconds()):
Option[CompletedTxn] = {
val producerAppendInfo = stateManager.prepareUpdate(producerId, origin =
AppendOrigin.Coordinator)
val endTxnMarker = new EndTransactionMarker(controlType, coordinatorEpoch)
- val completedTxn = producerAppendInfo.appendEndTxnMarker(endTxnMarker,
producerEpoch, offset, timestamp)
+ val completedTxnOpt = producerAppendInfo.appendEndTxnMarker(endTxnMarker,
producerEpoch, offset, timestamp)
mapping.update(producerAppendInfo)
- val lastStableOffset = mapping.lastStableOffset(completedTxn)
- mapping.completeTxn(completedTxn)
+ completedTxnOpt.foreach(mapping.completeTxn)
mapping.updateMapEndOffset(offset + 1)
- (completedTxn, lastStableOffset)
+ completedTxnOpt
}
private def append(stateManager: ProducerStateManager,
@@ -932,7 +987,6 @@ class ProducerStateManagerTest {
private def append(stateManager: ProducerStateManager,
producerId: Long,
- producerEpoch: Short,
offset: Long,
batch: RecordBatch,
origin: AppendOrigin): Unit = {