This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new dbae448a052 KAFKA-18137: Unloading transaction state incorrectly
removes loading partitions (#18011)
dbae448a052 is described below
commit dbae448a052bd3d5f3c1293294a33497de213bff
Author: Justine Olshan <[email protected]>
AuthorDate: Tue Dec 3 14:51:07 2024 -0800
KAFKA-18137: Unloading transaction state incorrectly removes loading
partitions (#18011)
When there is a become-follower transition on a transaction coordinator
state partition, we intend to unload the state partition. However, we pass the
new epoch to the method that does the unloading. In that method, we create a
`TransactionPartitionAndLeaderEpoch` object consisting of the topic partition
and the epoch that we use as a key to remove the partition from loading.
However, we wouldn't ever expect to see this epoch in that map since we only
load on the leader. See the code [...]
We could have a partition load after the unloading occurs, and that
partition will be stuck storing stale state on the broker until it restarts.
While this may not immediately cause a correctness issue, we should try to
properly clean up state.
When unloading, remove the partition's entry from loadingPartitions if the
epoch it is being loaded under is less than the new epoch.
Added a test that failed before this change was made.
Reviewers: Artem Livshits <[email protected]>, Jeff Kim
<[email protected]>
---
.../transaction/TransactionStateManager.scala | 15 +++++-
.../transaction/TransactionStateManagerTest.scala | 59 ++++++++++++++++++++++
2 files changed, 72 insertions(+), 2 deletions(-)
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 747d3f63eb9..a6e7dd30bf0 100644
---
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -596,10 +596,9 @@ class TransactionStateManager(brokerId: Int,
*/
def removeTransactionsForTxnTopicPartition(partitionId: Int,
coordinatorEpoch: Int): Unit = {
val topicPartition = new
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
- val partitionAndLeaderEpoch =
TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
inWriteLock(stateLock) {
- loadingPartitions.remove(partitionAndLeaderEpoch)
+ removeLoadingPartitionWithEpoch(partitionId, coordinatorEpoch)
transactionMetadataCache.remove(partitionId) match {
case Some(txnMetadataCacheEntry) =>
info(s"Unloaded transaction metadata $txnMetadataCacheEntry for
$topicPartition on become-follower transition")
@@ -610,6 +609,18 @@ class TransactionStateManager(brokerId: Int,
}
}
+ /**
+ * Remove the loading partition if the epoch is less than the specified
epoch. Note: This method must be called under the write state lock.
+ */
+ private def removeLoadingPartitionWithEpoch(partitionId: Int,
coordinatorEpoch: Int): Unit = {
+ loadingPartitions.find(_.txnPartitionId == partitionId).foreach {
partitionAndLeaderEpoch =>
+ if (partitionAndLeaderEpoch.coordinatorEpoch < coordinatorEpoch) {
+ loadingPartitions.remove(partitionAndLeaderEpoch)
+ info(s"Cancelling load of currently loading partition
$partitionAndLeaderEpoch")
+ }
+ }
+ }
+
private def validateTransactionTopicPartitionCountIsStable(): Unit = {
val previouslyDeterminedPartitionCount = transactionTopicPartitionCount
val curTransactionTopicPartitionCount =
retrieveTransactionTopicPartitionCount()
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 36dcaaa7e60..78da50f782b 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -218,6 +218,65 @@ class TransactionStateManagerTest {
assertEquals(Left(Errors.NOT_COORDINATOR),
transactionManager.getTransactionState(txnMetadata1.transactionalId))
}
+ @Test
+ def testMakeFollowerLoadingPartition(): Unit = {
+ // Verify the handling of a call to make a partition a follower while it
is in the
+ // process of being loaded. The partition should not be loaded.
+
+ val startOffset = 0L
+ val endOffset = 1L
+
+ val fileRecordsMock = mock[FileRecords](classOf[FileRecords])
+ val logMock = mock[UnifiedLog](classOf[UnifiedLog])
+ when(replicaManager.getLog(topicPartition)).thenReturn(Some(logMock))
+ when(logMock.logStartOffset).thenReturn(startOffset)
+ when(logMock.read(ArgumentMatchers.eq(startOffset),
+ maxLength = anyInt(),
+ isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END),
+ minOneMessage = ArgumentMatchers.eq(true))
+ ).thenReturn(new FetchDataInfo(new LogOffsetMetadata(startOffset),
fileRecordsMock))
+
when(replicaManager.getLogEndOffset(topicPartition)).thenReturn(Some(endOffset))
+
+ txnMetadata1.state = PrepareCommit
+ txnMetadata1.addPartitions(Set[TopicPartition](
+ new TopicPartition("topic1", 0),
+ new TopicPartition("topic1", 1)))
+ val records = MemoryRecords.withRecords(startOffset, Compression.NONE,
+ new SimpleRecord(txnMessageKeyBytes1,
TransactionLog.valueToBytes(txnMetadata1.prepareNoTransit(), TV_2)))
+
+ // We create a latch which is awaited while the log is loading. This
ensures that the follower transition
+ // is triggered before the loading returns
+ val latch = new CountDownLatch(1)
+
+ when(fileRecordsMock.sizeInBytes()).thenReturn(records.sizeInBytes)
+ val bufferCapture: ArgumentCaptor[ByteBuffer] =
ArgumentCaptor.forClass(classOf[ByteBuffer])
+ when(fileRecordsMock.readInto(bufferCapture.capture(),
anyInt())).thenAnswer(_ => {
+ latch.await()
+ val buffer = bufferCapture.getValue
+ buffer.put(records.buffer.duplicate)
+ buffer.flip()
+ })
+
+ val coordinatorEpoch = 0
+ val partitionAndLeaderEpoch =
TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
+
+ val loadingThread = new Thread(() => {
+ transactionManager.loadTransactionsForTxnTopicPartition(partitionId,
coordinatorEpoch, (_, _, _, _) => ())
+ })
+ loadingThread.start()
+ TestUtils.waitUntilTrue(() =>
transactionManager.loadingPartitions.contains(partitionAndLeaderEpoch),
+ "Timed out waiting for loading partition", pause = 10)
+
+ transactionManager.removeTransactionsForTxnTopicPartition(partitionId,
coordinatorEpoch + 1)
+
assertFalse(transactionManager.loadingPartitions.contains(partitionAndLeaderEpoch))
+
+ latch.countDown()
+ loadingThread.join()
+
+ // Verify that transaction state was not loaded
+ assertEquals(Left(Errors.NOT_COORDINATOR),
transactionManager.getTransactionState(txnMetadata1.transactionalId))
+ }
+
@Test
def testLoadAndRemoveTransactionsForPartition(): Unit = {
// generate transaction log messages for two pids traces: