CalvinConfluent commented on code in PR #17402:
URL: https://github.com/apache/kafka/pull/17402#discussion_r1792333412
##########
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java:
##########
@@ -3063,12 +3063,14 @@ private AddOffsetsToTxnResponse createAddOffsetsToTxnResponse() {
}
private EndTxnRequest createEndTxnRequest(short version) {
+ boolean isTransactionV2Enabled = true;
Review Comment:
Maybe derive this from the request version instead of hardcoding it, something like
`boolean isTransactionV2Enabled = version > LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2`
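To make the suggestion concrete, here is a minimal standalone sketch of the idea. The class name is hypothetical, and the constant is redeclared locally with an assumed value of 4, chosen only because the code comment elsewhere in this PR says the new fields appear in version 5+. The real test would reference the existing constant rather than redefine it.

```java
// Hypothetical, self-contained sketch; not the actual RequestResponseTest code.
final class EndTxnVersionSketch {
    // Assumed value for illustration: the diff in this PR refers to "version 5+".
    static final short LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 = 4;

    // Derive the flag from the request version instead of hardcoding true.
    static boolean isTransactionV2Enabled(short version) {
        return version > LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2;
    }

    public static void main(String[] args) {
        // Print the derived flag for each version up to the first V2 version.
        for (short version = 0; version <= 5; version++) {
            System.out.println("version " + version + " -> " + isTransactionV2Enabled(version));
        }
    }
}
```

With this shape, the test exercises both the pre-V2 and V2 paths as it iterates over the supported versions.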
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -2980,6 +2980,68 @@ public void testEpochBumpAfterLastInflightBatchFails(boolean transactionV2Enable
assertEquals(0, transactionManager.sequenceNumber(tp0));
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true})
Review Comment:
Should this be `{true, false}`, so that both cases are covered?
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1568,6 +1571,17 @@ public void handleResponse(AbstractResponse response) {
Errors error = endTxnResponse.error();
if (error == Errors.NONE) {
+ // For transaction version 5+, the broker includes the producerId and producerEpoch in the EndTxnResponse.
+ // KIP-890 Part 2 mandates bumping the epoch after every transaction. If the epoch overflows,
+ // a new producerId is returned with epoch set to 0.
+ if (isTransactionV2Enabled) {
Review Comment:
`isTransactionV2Enabled` is fixed for the whole life cycle of a transaction. I
wonder whether we should abort the transaction if `isTransactionV2Enabled` is true
but the EndTxnResponse version is below 5.
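As a rough illustration of the check I have in mind, here is a standalone sketch. All class, field, and method names are hypothetical and do not mirror the real `TransactionManager` API. Throwing an exception is only a stand-in for whatever abort path we would actually choose, and the version threshold of 5 is taken from the code comment in this hunk.

```java
// Hypothetical, self-contained sketch; names do not match the real client code.
final class EndTxnHandlingSketch {
    // Taken from the code comment in the diff: version 5+ carries the producer fields.
    static final short FIRST_VERSION_WITH_PRODUCER_FIELDS = 5;

    static final class ProducerIdAndEpoch {
        final long producerId;
        final short epoch;
        ProducerIdAndEpoch(long producerId, short epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    static final class EndTxnResult {
        final short version;
        final long producerId;
        final short producerEpoch;
        EndTxnResult(short version, long producerId, short producerEpoch) {
            this.version = version;
            this.producerId = producerId;
            this.producerEpoch = producerEpoch;
        }
    }

    // Returns the producer id and epoch to use for the next transaction.
    static ProducerIdAndEpoch onSuccess(ProducerIdAndEpoch current,
                                        boolean isTransactionV2Enabled,
                                        EndTxnResult response) {
        if (!isTransactionV2Enabled) {
            // Without V2 the response carries no new id or epoch; keep the current one.
            return current;
        }
        if (response.version < FIRST_VERSION_WITH_PRODUCER_FIELDS) {
            // Assumption: treat the mismatch as an error instead of silently continuing.
            throw new IllegalStateException(
                "Transaction V2 is enabled but EndTxnResponse version is " + response.version);
        }
        // The broker bumps the epoch after every transaction; on overflow it
        // returns a new producerId with epoch 0. Either way, adopt what it sent.
        return new ProducerIdAndEpoch(response.producerId, response.producerEpoch);
    }
}
```

Whether the mismatch can actually happen depends on how the flag and the negotiated request version are kept in sync, so this may turn out to be unnecessary.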