artemlivshits commented on code in PR #17402:
URL: https://github.com/apache/kafka/pull/17402#discussion_r1796293859
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1568,6 +1571,17 @@ public void handleResponse(AbstractResponse response) {
Errors error = endTxnResponse.error();
if (error == Errors.NONE) {
+ // For transaction version 5+, the broker includes the
producerId and producerEpoch in the EndTxnResponse.
+ // KIP-890 Part 2 mandates bumping the epoch after every
transaction. If the epoch overflows,
+ // a new producerId is returned with epoch set to 0.
+ if (isTransactionV2Enabled) {
Review Comment:
> Do we not force sending version 5? I think we may want to do that and get
UnsupportedVersionException
The desired behavior is for this^^ to happen. If TV is downgraded in the
middle of transaction, we don't change `isTransactionV2Enabled` until the end
and continue using new protocol and new RPC versions. If the broker image
hasn't downgraded yet, then it'll just work. If the broker has managed to
downgrade, then we'd get UnsuppotedVersionException, so we should never get an
older version response when `isTransactionV2Enabled == true`. There are other
ways to implement downgrade, but this one is simple to prove correctness (for
example, we don't need to consider all the combinations of cases when protocol
gets switched in the middle and we could send mixed versions to different
brokers and prove that all combinations work correctly).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]