hachikuji commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r505765664
##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
##########
@@ -338,7 +339,9 @@
     INCONSISTENT_VOTER_SET(94, "Indicates that the either the sender or recipient of a " +
             "voter-only request is not one of the expected voters", InconsistentVoterSetException::new),
     INVALID_UPDATE_VERSION(95, "The given update version was invalid.", InvalidUpdateVersionException::new),
-    FEATURE_UPDATE_FAILED(96, "Unable to update finalized features due to an unexpected server error.", FeatureUpdateFailedException::new);
+    FEATURE_UPDATE_FAILED(96, "Unable to update finalized features due to an unexpected server error.", FeatureUpdateFailedException::new),
+    BROKER_AUTHORIZATION_FAILURE(97, "Authorization failed for the request during forwarding. " +

Review comment:
What is the benefit of using a different error code instead of `CLUSTER_AUTHORIZATION_FAILURE`?

##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
##########
@@ -251,7 +253,8 @@ public Struct parseResponse(short version, ByteBuffer buffer) {
             DescribeQuorumRequestData.SCHEMAS, DescribeQuorumResponseData.SCHEMAS),
     ALTER_ISR(56, "AlterIsr", AlterIsrRequestData.SCHEMAS, AlterIsrResponseData.SCHEMAS),
     UPDATE_FEATURES(57, "UpdateFeatures",
-        UpdateFeaturesRequestData.SCHEMAS, UpdateFeaturesResponseData.SCHEMAS);
+        UpdateFeaturesRequestData.SCHEMAS, UpdateFeaturesResponseData.SCHEMAS),
+    ENVELOPE(58, "Envelope", EnvelopeRequestData.SCHEMAS, EnvelopeResponseData.SCHEMAS);

Review comment:
I believe we need to set `requiresDelayedAllocation` for this API. Typically we will release the underlying buffer allocated for a request when `RequestChannel.Request` is constructed. However, since we are using "zeroCopy," we need to hold onto the `ByteBuffer` reference until the API has been handled.
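
To make the buffer-lifetime concern concrete, here is a minimal sketch in plain Java. It is not Kafka's actual code: `SketchBufferPool` and `SketchRequest` are hypothetical stand-ins, and the boolean constructor argument only borrows the name `requiresDelayedAllocation` for illustration. The assumption is that a released buffer may be reused and overwritten, which the pool simulates by zeroing it on release.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for a pooled receive-buffer allocator; not a Kafka class. */
final class SketchBufferPool {
    private int outstanding = 0;

    ByteBuffer allocate(int size) {
        outstanding++;
        return ByteBuffer.allocate(size);
    }

    /** Returns the buffer to the pool and zeroes it, simulating reuse by a later request. */
    void release(ByteBuffer buffer) {
        outstanding--;
        for (int i = 0; i < buffer.capacity(); i++) {
            buffer.put(i, (byte) 0); // absolute put, leaves position and limit untouched
        }
    }

    int outstanding() {
        return outstanding;
    }
}

/** Hypothetical request wrapper showing the two release strategies. */
final class SketchRequest {
    private final SketchBufferPool pool;
    private final ByteBuffer payload; // a private copy, or a view onto the pooled bytes
    private ByteBuffer pooled;        // non-null while this request still owns the pooled buffer

    SketchRequest(SketchBufferPool pool, ByteBuffer received, boolean requiresDelayedAllocation) {
        this.pool = pool;
        if (requiresDelayedAllocation) {
            // Zero-copy path: the payload shares memory with the pooled buffer,
            // so the buffer must stay allocated after construction.
            this.payload = received.duplicate();
            this.pooled = received;
        } else {
            // Eager path: copy the bytes out, then hand the buffer back immediately.
            ByteBuffer copy = ByteBuffer.allocate(received.remaining());
            copy.put(received.duplicate());
            copy.flip();
            this.payload = copy;
            pool.release(received);
        }
    }

    String payloadAsString() {
        return StandardCharsets.UTF_8.decode(payload.duplicate()).toString();
    }

    /** Called once the API handler has finished with the request. */
    void releaseBuffer() {
        if (pooled != null) {
            pool.release(pooled);
            pooled = null;
        }
    }
}
```

In this sketch, releasing at construction time on the zero-copy path would leave `payload` pointing at bytes the pool has already scrubbed, which is the failure mode the delayed release is meant to avoid.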
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -121,6 +121,33 @@ class KafkaApis(val requestChannel: RequestChannel,
   val adminZkClient = new AdminZkClient(zkClient)
   private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = config.brokerId)

+  /**
+   * The template to create a forward request handler.
+   */
+  private[server] abstract class ForwardRequestHandler(request: RequestChannel.Request, isForwardRequest: Boolean,

Review comment:
It seems like we're trying to reuse this handler from the previous patch, but I'm not sure it still makes as much sense. A simpler structure might be something like the following:
```scala
private def maybeForward(
  request: RequestChannel.Request,
  handler: RequestChannel.Request => Unit
): Unit = {
  if (!controller.isActive && config.redirectionEnabled && request.context.principalSerde.isPresent) {
    redirectionManager.forwardRequest(sendResponseMaybeThrottle, request)
  } else {
    // When IBP is smaller than 2.8 or the principal serde is undefined, forwarding is not supported,
    // therefore requests are handled directly.
    handler(request)
  }
}

// then invoked like this
override def handle(request: RequestChannel.Request): Unit = {
  try {
    trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
      s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
    request.header.apiKey match {
      ...
      case ApiKeys.ALTER_CONFIGS => maybeForward(request, handleAlterConfigsRequest)
      ...

// unchanged
def handleAlterConfigsRequest(request: RequestChannel.Request): Unit
```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org