rreddy-22 commented on code in PR #17402:
URL: https://github.com/apache/kafka/pull/17402#discussion_r1806768636
##########
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##########
@@ -814,11 +913,31 @@ class TransactionsTest extends IntegrationTestHarness {
producer3.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "4", "4", willBeCommitted = true))
producer3.commitTransaction()
- // Check that the epoch only increased by 1
producerStateEntry =
brokers(partitionLeader).logManager.getLog(new TopicPartition(topic1, 0)).get.producerStateManager.activeProducers.get(producerId)
assertNotNull(producerStateEntry)
- assertEquals((initialProducerEpoch + 1).toShort, producerStateEntry.producerEpoch)
+
+ // Check that the epoch only increased by 1 when TV2 is disabled.
+ // With TV2 and the latest EndTxnRequest version, the epoch will be bumped at the end of every transaction.
+ if (!isTV2Enabled) assertEquals((initialProducerEpoch + 1).toShort, producerStateEntry.producerEpoch)
+ else {
+ // Producer State entry contains the last epoch with which records were sent.
+ assertEquals((initialProducerEpoch + 2).toShort, producerStateEntry.producerEpoch)
+
+ // Access the client's producer epoch via reflection to verify epoch bump on the last End Txn Request.
+ val transactionManagerField: Field = classOf[KafkaProducer[_, _]].getDeclaredField("transactionManager")
Review Comment:
I wanted to make sure that the epoch was exactly 3 on the client after the 3
EndTxnRequests, since the producerStateEntry isn't updated until the next
commit, right? But if that's not necessary, I can just check the producer state
entry!
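
A minimal sketch of the reasoning above, written as a toy model rather than Kafka code. It assumes that under TV2 every EndTxnRequest bumps the client's epoch by one, and that the broker's producer state entry only records the epoch of the last records actually written. `EpochState`, `EpochModel`, and `runTransactions` are hypothetical names made up for illustration, and the model ignores every other cause of an epoch bump.

```scala
// Toy model of TV2 epoch bookkeeping. None of these names exist in Kafka.
final case class EpochState(clientEpoch: Short, lastWrittenEpoch: Option[Short]) {
  // Sending records stamps them with the client's current epoch, which is
  // what the broker-side producer state entry would then report.
  def send: EpochState = copy(lastWrittenEpoch = Some(clientEpoch))

  // Assumption: with TV2, each EndTxnRequest bumps the client epoch by one.
  // The last written epoch is untouched until more records are sent.
  def endTxn: EpochState = copy(clientEpoch = (clientEpoch + 1).toShort)
}

object EpochModel {
  // Runs `count` transactions, each one a send followed by an EndTxnRequest.
  def runTransactions(initialEpoch: Short, count: Int): EpochState =
    (1 to count).foldLeft(EpochState(initialEpoch, None))((state, _) => state.send.endTxn)
}
```

In this model, three transactions starting from epoch 0 leave the client at epoch 3 while the last written epoch is still 2, which is the gap the reflection-based check is meant to cover.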