jolshan commented on code in PR #15657: URL: https://github.com/apache/kafka/pull/15657#discussion_r1556603619
##########
core/src/main/scala/kafka/server/KafkaApis.scala
##########

```diff
@@ -2715,6 +2721,10 @@ class KafkaApis(val requestChannel: RequestChannel,
     } else if (!authHelper.authorize(request.context, READ, GROUP, txnOffsetCommitRequest.data.groupId)) {
       sendResponse(txnOffsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
       CompletableFuture.completedFuture[Unit](())
+    } else if (!metadataCache.metadataVersion().isTransactionV2Enabled && txnOffsetCommitRequest.isTransactionV2Requested) {
+      // If the client requests transaction V2 but the server side does not support it, return unsupported version.
```

Review Comment:
This was not in the design. If we send a request version the server can handle, we can handle it. I.e., if ApiVersions advertises TV 1, then the server has the code to handle V1, and we send the new request version.

There are two cases where `!metadataCache.metadataVersion().isTransactionV2Enabled && txnOffsetCommitRequest.isTransactionV2Requested` is true:

1. TV is downgraded only (no image changes). In this case, we can still handle the old request, and we should do so.
2. TV is downgraded and the image version is downgraded. In this case, when the server receives a v5 request, it will return unsupported version automatically, since it doesn't recognize this version.

The reason we do this is that there is no way to guarantee that downgrades happen immediately, due to the way ApiVersions requests propagate. (The only way to ensure it happens immediately is to restart a broker.) Thus, we took this strategy:

> The downgrade case is a bit different. When we downgrade TV, it is possible not to receive an update communicating this from any broker for a long time. We could even start rolling an incompatible image to the cluster. Once we do this roll, however, the brokers will reconnect and update the TV with the newest epoch. As we are checking the TV on every request, we can abort the transaction and restart with the new epoch of TV and the old protocol.

However, in the edge case where we somehow send a request to an older-image broker, we know that the new protocol is gated by the Produce/TxnOffsetCommit and AddPartitionsToTxn versions. If we encounter a broker that is unable to handle the protocol, it is also unable to handle the request version. In this case, we will return UnsupportedVersionException, which is fatal to the client. In most cases, we shouldn't hit this scenario. We also chose this approach so as not to cause flip-flopping during the upgrade case.
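The two downgrade cases above can be sketched as a small standalone decision model. This is a hypothetical illustration, not the actual KafkaApis code: `tvEnabled`, `imageMaxVersion`, and `requestVersion` are invented names, and the point is that the TV flag does not gate handling at all; only whether the broker image recognizes the request version matters.

```scala
// Hypothetical model of how a broker disposes of a TxnOffsetCommit request
// after a transaction-version (TV) downgrade. Names are illustrative and do
// not come from the Kafka codebase.
object TxnDowngradeModel {
  sealed trait Outcome
  case object HandleWithOldProtocol extends Outcome // case 1: TV downgraded, image still parses v5
  case object UnsupportedVersion    extends Outcome // case 2: image downgraded too, v5 is unknown

  /**
   * @param tvEnabled       whether the metadata version still enables transaction V2
   *                        (deliberately unused: a parseable request is handled regardless)
   * @param imageMaxVersion highest TxnOffsetCommit version this broker image can parse
   * @param requestVersion  version the client actually sent
   */
  def dispose(tvEnabled: Boolean, imageMaxVersion: Int, requestVersion: Int): Outcome =
    if (requestVersion > imageMaxVersion)
      UnsupportedVersion // rejected by request parsing, before any TV check runs
    else
      HandleWithOldProtocol // a recognized version is handled, even with TV downgraded

  def main(args: Array[String]): Unit = {
    // Case 1: TV downgraded only -- the broker still recognizes v5 and handles it.
    println(dispose(tvEnabled = false, imageMaxVersion = 5, requestVersion = 5))
    // Case 2: TV and image both downgraded -- v5 is unrecognized, so the
    // request layer returns UNSUPPORTED_VERSION automatically.
    println(dispose(tvEnabled = false, imageMaxVersion = 4, requestVersion = 5))
  }
}
```

This mirrors the comment's argument that the explicit `!isTransactionV2Enabled && isTransactionV2Requested` rejection is redundant: case 1 should be handled, and case 2 already fails version negotiation on its own.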