This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new 17157ed82a6 KAFKA-17941: Fix TransactionStateManager handling of empty
batch when loading transaction metadata (#20529)
17157ed82a6 is described below
commit 17157ed82a61e909c4d02b6b3fe036e34229bcb4
Author: Calvin Liu <[email protected]>
AuthorDate: Wed Sep 17 15:09:55 2025 -0700
KAFKA-17941: Fix TransactionStateManager handling of empty batch when
loading transaction metadata (#20529)
When loading transaction metadata from a transaction log partition, if
the partition contains a segment ending with an empty batch, the
"currOffset" update logic will be skipped for that last batch. Since
"currOffset" is then not advanced to the next offset of the last batch,
the TransactionStateManager.loadTransactionMetadata method will be stuck
in the "while" loop.
This change fixes the issue by updating "currOffset" after processing
each batch, whether the batch is empty or not.
---------
Co-authored-by: Vincent Jiang
<[email protected]>
Reviewers: Justine Olshan <[email protected]>, Jun Rao <[email protected]>
---
.../transaction/TransactionStateManager.scala | 2 +-
.../transaction/TransactionStateManagerTest.scala | 65 ++++++++++++++++++++++
2 files changed, 66 insertions(+), 1 deletion(-)
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 2894bbdd94b..ce4bbafd246 100644
---
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -477,7 +477,6 @@ class TransactionStateManager(brokerId: Int,
case Some(txnMetadata) =>
loadedTransactions.put(transactionalId, txnMetadata)
}
- currOffset = batch.nextOffset
case unknownKey: UnknownKey =>
warn(s"Unknown message key with version
${unknownKey.version}" +
@@ -485,6 +484,7 @@ class TransactionStateManager(brokerId: Int,
"It could be a left over from an aborted upgrade.")
}
}
+ currOffset = batch.nextOffset
}
}
} catch {
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index bfb9c5245d4..d901f95fe79 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -941,6 +941,71 @@ class TransactionStateManagerTest {
assertEquals(0, transactionManager.loadingPartitions.size)
}
+ private def createEmptyBatch(baseOffset: Long, lastOffset: Long):
MemoryRecords = {
+ val buffer = ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD)
+ DefaultRecordBatch.writeEmptyHeader(buffer,
RecordBatch.CURRENT_MAGIC_VALUE, RecordBatch.NO_PRODUCER_ID,
+ RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, baseOffset,
lastOffset, RecordBatch.NO_PARTITION_LEADER_EPOCH,
+ TimestampType.CREATE_TIME, System.currentTimeMillis, false, false)
+ buffer.flip
+ MemoryRecords.readableRecords(buffer)
+ }
+
+ @Test
+ def testLoadTransactionMetadataContainingSegmentEndingWithEmptyBatch(): Unit
= {
+ // Simulate a case where a log contains two segments and the first segment
ending with an empty batch.
+ txnMetadata1.state = PrepareCommit
+ txnMetadata1.addPartitions(Set[TopicPartition](new
TopicPartition("topic1", 0)))
+ txnMetadata2.state = Ongoing
+ txnMetadata2.addPartitions(Set[TopicPartition](new
TopicPartition("topic2", 0)))
+
+ // Create the first segment which contains two batches.
+ // The first batch has one transactional record
+ val txnRecords1 = new SimpleRecord(txnMessageKeyBytes1,
TransactionLog.valueToBytes(txnMetadata1.prepareNoTransit(), true))
+ val records1 = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
Compression.NONE, TimestampType.CREATE_TIME, txnRecords1)
+ // The second batch is an empty batch.
+ val records2 = createEmptyBatch(1L, 1L)
+
+ val combinedBuffer = ByteBuffer.allocate(records1.buffer.limit +
records2.buffer.limit)
+ combinedBuffer.put(records1.buffer)
+ combinedBuffer.put(records2.buffer)
+ combinedBuffer.flip
+ val firstSegmentRecords = MemoryRecords.readableRecords(combinedBuffer)
+
+ // Create the second segment which contains one batch
+ val txnRecords3 = new SimpleRecord(txnMessageKeyBytes2,
TransactionLog.valueToBytes(txnMetadata2.prepareNoTransit(), true))
+ val secondSegmentRecords =
MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 2L, Compression.NONE,
TimestampType.CREATE_TIME, txnRecords3)
+
+ // Prepare a txn log
+ reset(replicaManager)
+
+ val logMock = mock(classOf[UnifiedLog])
+ when(replicaManager.getLog(topicPartition)).thenReturn(Some(logMock))
+ when(replicaManager.getLogEndOffset(topicPartition)).thenReturn(Some(3L))
+
+ when(logMock.logStartOffset).thenReturn(0L)
+ when(logMock.read(ArgumentMatchers.eq(0L),
+ maxLength = anyInt(),
+ isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END),
+ minOneMessage = ArgumentMatchers.eq(true)))
+ .thenReturn(new FetchDataInfo(new LogOffsetMetadata(0L),
firstSegmentRecords))
+ when(logMock.read(ArgumentMatchers.eq(2L),
+ maxLength = anyInt(),
+ isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END),
+ minOneMessage = ArgumentMatchers.eq(true)))
+ .thenReturn(new FetchDataInfo(new LogOffsetMetadata(2L),
secondSegmentRecords))
+
+ // Load transactions should not stuck.
+ transactionManager.loadTransactionsForTxnTopicPartition(partitionId,
coordinatorEpoch = 1, (_, _, _, _) => ())
+ assertEquals(0, transactionManager.loadingPartitions.size)
+ assertEquals(1, transactionManager.transactionMetadataCache.size)
+
assertTrue(transactionManager.transactionMetadataCache.contains(partitionId))
+ // all transactions should have been loaded
+ val txnMetadataPool =
transactionManager.transactionMetadataCache(partitionId).metadataPerTransactionalId
+ assertEquals(2, txnMetadataPool.size)
+ assertTrue(txnMetadataPool.contains(transactionalId1))
+ assertTrue(txnMetadataPool.contains(transactionalId2))
+ }
+
private def verifyMetadataDoesExistAndIsUsable(transactionalId: String):
Unit = {
transactionManager.getTransactionState(transactionalId) match {
case Left(_) => fail("shouldn't have been any errors")