hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r505765664



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
##########
@@ -338,7 +339,9 @@
     INCONSISTENT_VOTER_SET(94, "Indicates that the either the sender or recipient of a " +
             "voter-only request is not one of the expected voters", InconsistentVoterSetException::new),
     INVALID_UPDATE_VERSION(95, "The given update version was invalid.", InvalidUpdateVersionException::new),
-    FEATURE_UPDATE_FAILED(96, "Unable to update finalized features due to an unexpected server error.", FeatureUpdateFailedException::new);
+    FEATURE_UPDATE_FAILED(96, "Unable to update finalized features due to an unexpected server error.", FeatureUpdateFailedException::new),
+    BROKER_AUTHORIZATION_FAILURE(97, "Authorization failed for the request during forwarding. " +

Review comment:
       What is the benefit of using a different error code instead of 
`CLUSTER_AUTHORIZATION_FAILURE`?

##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
##########
@@ -251,7 +253,8 @@ public Struct parseResponse(short version, ByteBuffer buffer) {
         DescribeQuorumRequestData.SCHEMAS, DescribeQuorumResponseData.SCHEMAS),
     ALTER_ISR(56, "AlterIsr", AlterIsrRequestData.SCHEMAS, AlterIsrResponseData.SCHEMAS),
     UPDATE_FEATURES(57, "UpdateFeatures",
-        UpdateFeaturesRequestData.SCHEMAS, UpdateFeaturesResponseData.SCHEMAS);
+        UpdateFeaturesRequestData.SCHEMAS, UpdateFeaturesResponseData.SCHEMAS),
+    ENVELOPE(58, "Envelope", EnvelopeRequestData.SCHEMAS, EnvelopeResponseData.SCHEMAS);

Review comment:
       I believe we need to set `requiresDelayedAllocation` for this API. 
Typically we will release the underlying buffer allocated for a request when 
`RequestChannel.Request` is constructed. However, since we are using 
"zeroCopy," we need to hold onto the `ByteBuffer` reference until the API has 
been handled.
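
To illustrate the concern, here is a minimal, self-contained Java sketch. Everything in it is a hypothetical stand-in rather than Kafka's actual memory pool or `RequestChannel` code: `Pool`, `receive`, `read`, and `handleEnvelope` are names made up for this example, and the payload strings are arbitrary. It only shows the general hazard: a zero-copy `slice()` shares storage with the pooled buffer, so releasing that buffer before the request is handled lets a later request overwrite the bytes the slice points at.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;

final class DelayedReleaseSketch {

    /** Hypothetical buffer pool: hands back released buffers before allocating new ones. */
    static final class Pool {
        private final int size;
        private final ArrayDeque<ByteBuffer> free = new ArrayDeque<>();

        Pool(int size) {
            this.size = size;
        }

        ByteBuffer allocate() {
            ByteBuffer b = free.isEmpty() ? ByteBuffer.allocate(size) : free.pop();
            b.clear();
            return b;
        }

        void release(ByteBuffer b) {
            free.push(b);
        }
    }

    /** Fills a pooled buffer with a payload, as if it had been read from the network. */
    static ByteBuffer receive(Pool pool, String payload) {
        ByteBuffer buffer = pool.allocate();
        buffer.put(payload.getBytes(StandardCharsets.UTF_8));
        buffer.flip();
        return buffer;
    }

    /** Decodes a view without moving its position. */
    static String read(ByteBuffer view) {
        return StandardCharsets.UTF_8.decode(view.duplicate()).toString();
    }

    /**
     * Receives an "envelope", keeps a zero-copy slice of it, lets a second
     * request arrive, and only then reads the slice (i.e. handles the request).
     * With releaseEarly = true the buffer goes back to the pool before handling.
     */
    static String handleEnvelope(boolean releaseEarly) {
        Pool pool = new Pool(16);
        ByteBuffer buffer = receive(pool, "alter-configs");
        ByteBuffer embedded = buffer.slice(); // shares storage with the pooled buffer

        if (releaseEarly) {
            pool.release(buffer);
        }
        receive(pool, "OTHER-REQUEST"); // a second request arrives before handling

        String seen = read(embedded);
        if (!releaseEarly) {
            pool.release(buffer); // delayed release: safe once handling is done
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(handleEnvelope(false)); // prints "alter-configs"
        System.out.println(handleEnvelope(true));  // prints "OTHER-REQUEST"
    }
}
```

In the early-release case the slice silently reads the second request's bytes, which is the kind of corruption that holding the `ByteBuffer` reference until the API has been handled is meant to prevent.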

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -121,6 +121,33 @@ class KafkaApis(val requestChannel: RequestChannel,
   val adminZkClient = new AdminZkClient(zkClient)
   private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = config.brokerId)
 
+  /**
+   * The template to create a forward request handler.
+   */
+  private[server] abstract class ForwardRequestHandler(request: RequestChannel.Request, isForwardRequest: Boolean,

Review comment:
       It seems like we're trying to reuse this handler from the previous 
patch, but I'm not sure it still makes as much sense. A simpler structure might 
be something like the following:
   
   ```scala
     private def maybeForward(
       request: RequestChannel.Request,
       handler: RequestChannel.Request => Unit
     ): Unit = {
       if (!controller.isActive && config.redirectionEnabled && request.context.principalSerde.isPresent) {
         redirectionManager.forwardRequest(sendResponseMaybeThrottle, request)
       } else {
         // When this broker is the active controller, or when IBP is smaller than 2.8 or the
         // principal serde is undefined (forwarding is not supported), requests are handled directly.
         handler(request)
       }
     }
   
     // then invoked like this
     override def handle(request: RequestChannel.Request): Unit = {
       try {
         trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
           s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
   
         request.header.apiKey match {
           ...
           case ApiKeys.ALTER_CONFIGS => maybeForward(request, handleAlterConfigsRequest)
           ...
   
     // unchanged
     def handleAlterConfigsRequest(request: RequestChannel.Request): Unit
   ```
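
For anyone who wants to exercise the dispatch decision in isolation, below is a rough, runnable Java model of the same shape. It is only a sketch under stated assumptions: `MaybeForwardSketch`, its `Request` class, the boolean flags, and the `log` list are all hypothetical stand-ins for `RequestChannel.Request`, `controller.isActive`, `config.redirectionEnabled`, and the redirection manager, none of which are modeled faithfully here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class MaybeForwardSketch {

    /** Hypothetical stand-in for a request; carries only what the check needs. */
    static final class Request {
        final String apiKey;
        final boolean hasPrincipalSerde;

        Request(String apiKey, boolean hasPrincipalSerde) {
            this.apiKey = apiKey;
            this.hasPrincipalSerde = hasPrincipalSerde;
        }
    }

    private final boolean controllerIsActive;
    private final boolean redirectionEnabled;

    /** Records what happened to each request, in place of real side effects. */
    final List<String> log = new ArrayList<>();

    MaybeForwardSketch(boolean controllerIsActive, boolean redirectionEnabled) {
        this.controllerIsActive = controllerIsActive;
        this.redirectionEnabled = redirectionEnabled;
    }

    /** Forwards only when all three conditions hold; otherwise handles locally. */
    void maybeForward(Request request, Consumer<Request> handler) {
        if (!controllerIsActive && redirectionEnabled && request.hasPrincipalSerde) {
            log.add("forwarded " + request.apiKey);
        } else {
            handler.accept(request);
        }
    }

    /** The per-API handler stays unchanged and knows nothing about forwarding. */
    void handleAlterConfigsRequest(Request request) {
        log.add("handled " + request.apiKey);
    }

    void handle(Request request) {
        switch (request.apiKey) {
            case "ALTER_CONFIGS":
                maybeForward(request, this::handleAlterConfigsRequest);
                break;
            default:
                log.add("unsupported " + request.apiKey);
        }
    }

    public static void main(String[] args) {
        MaybeForwardSketch follower = new MaybeForwardSketch(false, true);
        follower.handle(new Request("ALTER_CONFIGS", true));
        System.out.println(follower.log); // prints "[forwarded ALTER_CONFIGS]"

        MaybeForwardSketch controller = new MaybeForwardSketch(true, true);
        controller.handle(new Request("ALTER_CONFIGS", true));
        System.out.println(controller.log); // prints "[handled ALTER_CONFIGS]"
    }
}
```

The point of this shape is that the forwarding check lives in one place and each handler is passed in as a plain function, so no per-API handler subclass is needed.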



