jolshan commented on code in PR #15657:
URL: https://github.com/apache/kafka/pull/15657#discussion_r1556603619


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2715,6 +2721,10 @@ class KafkaApis(val requestChannel: RequestChannel,
     } else if (!authHelper.authorize(request.context, READ, GROUP, txnOffsetCommitRequest.data.groupId)) {
       sendResponse(txnOffsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
       CompletableFuture.completedFuture[Unit](())
+    } else if (!metadataCache.metadataVersion().isTransactionV2Enabled && txnOffsetCommitRequest.isTransactionV2Requested) {
+      // If the client requests to use transaction V2 but server side does not supports it, return unsupported version.

Review Comment:
   This was not in the design. If the client sends a request version that the 
server can handle, the server should handle it.
   
   I.e., if ApiVersions advertises TV 1, then the server has the code to 
handle V1, and we send the new request version. There are two cases where 
`!metadataCache.metadataVersion().isTransactionV2Enabled && 
txnOffsetCommitRequest.isTransactionV2Requested` is true:
   
   1) TV is downgraded only (no image changes) -- in this case, we can still 
handle the old request and we should do so. 
   2) TV is downgraded + image version is downgraded. In this case, when the 
server receives a v5 request, it will return unsupported version automatically, 
since it doesn't recognize this version.
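
The two cases above can be sketched as a single decision. This is only an illustrative sketch, not the actual `KafkaApis` code: `VersionGateSketch`, `decide`, and all of its parameters are hypothetical names. It assumes, as the comment describes, that v5 is the new request version and that the only gate is whether the broker image has code for the request version it receives.

```scala
// Hypothetical sketch of the version-gating argument; not Kafka's real API.
object VersionGateSketch {
  sealed trait Outcome
  case object Handle extends Outcome
  case object UnsupportedVersion extends Outcome

  // requestVersion:       version of the incoming request (assumed: 5 is the new one)
  // maxSupportedVersion:  highest version the running broker image has code for
  // finalizedTvEnablesV2: current finalized TV state; deliberately NOT used as a
  //                       gate, because a TV-only downgrade leaves the code in place
  def decide(requestVersion: Int,
             maxSupportedVersion: Int,
             finalizedTvEnablesV2: Boolean): Outcome =
    if (requestVersion > maxSupportedVersion) UnsupportedVersion // case 2: older image
    else Handle                                                  // case 1: TV-only downgrade
}
```

For example, `VersionGateSketch.decide(5, 5, finalizedTvEnablesV2 = false)` yields `Handle` (case 1), while `VersionGateSketch.decide(5, 4, finalizedTvEnablesV2 = false)` yields `UnsupportedVersion` (case 2), with no explicit TV check needed in the handler.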
   
   The reason we do this is that there is no way to guarantee that downgrades 
happen immediately, due to the way ApiVersions requests propagate. (The only way 
to ensure it happens immediately is to restart a broker.) Thus, we took this 
strategy:
   
   > The downgrade case is a bit different. When we downgrade TV, it is 
possible to not receive an update communicating this from any broker for a long 
time. We could even start rolling an incompatible image to the cluster. Once we 
do this roll however, the brokers will reconnect and update the TV with the 
newest epoch. As we are checking the TV on every request, we can abort the 
transaction and restart with the new epoch of TV and the old protocol. However, 
in the edge case where we somehow send a request to an older image broker, we 
know that the new protocol is gated by the Produce/TxnOffsetCommit and 
AddPartitionsToTxn versions. If we encounter a broker that is unable to handle 
the protocol, it is also unable to handle the request version. In this case, we 
will return UnsupportedVersionException which is fatal to the client. In most 
cases, we shouldn’t hit this scenario.
   
   We also chose this approach so as not to cause flip-flopping during the 
upgrade case.
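
The client-side behavior described in the quoted design text could look roughly like the sketch below. Everything here is hypothetical (`ClientDowngradeSketch`, `TvState`, `nextAction`, and the parameter names are not Kafka identifiers). It assumes the client compares the TV epoch it last acted on with the latest one it has learned, and that a broker unable to handle the request version surfaces as a fatal `UnsupportedVersionException`.

```scala
// Hypothetical sketch of the client-side downgrade handling; not Kafka's real API.
object ClientDowngradeSketch {
  sealed trait Action
  case object Continue extends Action
  case object AbortAndRestartWithOldProtocol extends Action
  case object FatalUnsupportedVersion extends Action

  // TV as seen by the client: a monotonically increasing epoch plus whether
  // that TV enables the new protocol (both fields are assumptions).
  final case class TvState(epoch: Long, v2Enabled: Boolean)

  def nextAction(inFlightUsesV2: Boolean,
                 known: TvState,
                 latest: TvState,
                 brokerSupportsRequestVersion: Boolean): Action =
    if (inFlightUsesV2 && latest.epoch > known.epoch && !latest.v2Enabled)
      // TV is checked on every request: a newer epoch that disables the new
      // protocol means abort and restart with the old protocol.
      AbortAndRestartWithOldProtocol
    else if (!brokerSupportsRequestVersion)
      // Edge case: the request reached an older-image broker before the
      // client learned about the downgrade.
      FatalUnsupportedVersion
    else
      Continue
}
```

The ordering reflects the quoted text: the TV check is the normal path, and the fatal error is the fallback for the rare case where the request version itself is not recognized.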
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
