hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r510552124



##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -94,19 +104,63 @@ object RequestChannel extends Logging {
     @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 
     val session = Session(context.principal, context.clientAddress)
+
     private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
 
     def header: RequestHeader = context.header
     def sizeOfBodyInBytes: Int = bodyAndSize.size
 
-    //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
-    //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
-    //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
+    // most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
+    // some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
+    // to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
     if (!header.apiKey.requiresDelayedAllocation) {
       releaseBuffer()
     }
 
-    def requestDesc(details: Boolean): String = s"$header -- ${loggableRequest.toString(details)}"
+    def buildResponse(abstractResponse: AbstractResponse,
+                      error: Errors): Send = {
+      envelopeContext match {
+        case Some(envelopeContext) =>
+          val envelopeResponse = new EnvelopeResponse(
+            abstractResponse.throttleTimeMs(),

Review comment:
       Quotas are one aspect of this work that needs more consideration. What we don't want is for the inter-broker channel to be affected by an individual client's throttle, which is what will happen with the current patch. What I'd suggest for now is that we allow the broker to track client quotas and pass back the throttle value in the underlying response, but set the envelope throttle time to 0 and ensure that the channel does not get throttled.
   
   For this, I think we will need to change the logic in `KafkaApis.sendResponseMaybeThrottle`. If it is a forwarded request, we still need to check `maybeRecordAndGetThrottleTimeMs`, but we can skip the call to `ClientQuotaManager.throttle`. When the response is received on the forwarding broker, we will need to apply the throttle, which I think the patch already handles.
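   
   A rough sketch of that change, assuming the existing `quotas.request` manager (parameter names and signatures here are simplified/illustrative, not a definitive implementation):
   
   ```scala
   // Record quota usage for every request so client quotas are still tracked
   // on the controller, but only mute the channel for direct client connections.
   private def sendResponseMaybeThrottle(request: RequestChannel.Request,
                                         createResponse: Int => AbstractResponse): Unit = {
     val timeMs = time.milliseconds()
     // Always record usage and compute the throttle time.
     val throttleTimeMs = quotas.request.maybeRecordAndGetThrottleTimeMs(request, timeMs)
     if (request.envelopeContext.isEmpty) {
       // Direct client connection: throttle the channel as before.
       quotas.request.throttle(request, throttleTimeMs, requestChannel.sendResponse)
     }
     // For forwarded requests we skip ClientQuotaManager.throttle so the
     // inter-broker channel is never muted; the throttle time still travels
     // back in the underlying response (with envelope throttle time 0), and
     // the forwarding broker applies it when relaying to the client.
     sendResponse(request, Some(createResponse(throttleTimeMs)), None)
   }
   ```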
   
   One challenging aspect is how this will affect quota metrics. Currently 
quota/throttling metrics are relatively simple because they are recorded 
separately by each broker. However, here the controller is the one that is 
tracking the throttling for the client across multiple inbound connections from 
multiple brokers. This means that the broker that is applying a throttle for a 
forwarded request may not have actually observed a quota violation. Other than 
causing some reporting confusion, I am not sure whether there are any other 
consequences to this.
   
   cc @apovzner @rajinisivaram 

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -126,11 +125,44 @@ class KafkaApis(val requestChannel: RequestChannel,
     info("Shutdown complete.")
   }
 
+  private def checkForwarding(request: RequestChannel.Request): Unit = {
+    if (!request.header.apiKey.forwardable && request.envelopeContext.isDefined) {
+      throw new IllegalStateException("Given RPC " + request.header.apiKey + " does not support forwarding.")
+    }
+  }
+
+  private def maybeForward(request: RequestChannel.Request,
+                           handler: RequestChannel.Request => Unit): Unit = {
+    if (request.envelopeContext.isDefined && request.principalSerde.isEmpty) {
+      sendErrorResponseMaybeThrottle(request, Errors.PRINCIPAL_DESERIALIZATION_FAILURE.exception())
+    } else if (request.envelopeContext.isDefined &&
+      (!request.context.fromPrivilegedListener ||
+      !authorize(request.envelopeContext.get.brokerContext, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME))
+    ) {
+      // If the designated forwarding request is not coming from a privileged listener, or
+      // it fails CLUSTER_ACTION permission, we would fail the authorization.
+      sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception())

Review comment:
       One challenge we have here is that there are two levels of errors. The current patch seems to conflate the two, which makes it confusing. I think we need a structure that separates the errors possible at the envelope level from those possible at the request level. What I'm thinking is this:
   
   1. For cluster auth and principal serde errors, we should return the envelope error and a null response body.
   2. For everything else, we return envelope error NONE and just pass through whatever error is in the response.
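   
   In code, the separation might look roughly like this (a sketch only; `buildEnvelopeResponse` and the `serialize` helper are hypothetical names, and imports are omitted):
   
   ```scala
   // Envelope-level errors travel in the EnvelopeResponse itself with no body;
   // request-level errors stay inside the embedded response, envelope error NONE.
   def buildEnvelopeResponse(requestHeader: RequestHeader,
                             result: Either[Errors, AbstractResponse]): EnvelopeResponse =
     result match {
       case Left(envelopeError) =>
         // Cluster auth / principal serde failure: envelope error, null body.
         new EnvelopeResponse(null, envelopeError)
       case Right(response) =>
         // Everything else: pass through whatever error is in the response.
         new EnvelopeResponse(serialize(requestHeader, response), Errors.NONE)
     }
   ```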
   
   Does that make sense?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -126,11 +125,44 @@ class KafkaApis(val requestChannel: RequestChannel,
     info("Shutdown complete.")
   }
 
+  private def checkForwarding(request: RequestChannel.Request): Unit = {
+    if (!request.header.apiKey.forwardable && request.envelopeContext.isDefined) {

Review comment:
       Can we move some of the checks from `maybeForward` here? This is the 
flow I'm thinking about:
   
   1. First check authorization => CLUSTER_AUTHORIZATION_FAILURE
   2. Verify forwarding is enabled => INVALID_REQUEST
   3. Verify the api is forwardable => INVALID_REQUEST 
   
   If all of these pass, then the request continues down the normal handling 
path.
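   
   A rough sketch of that combined check, assuming a hypothetical `forwardingEnabled` config flag (names and signatures are illustrative):
   
   ```scala
   // Validate a forwarded request before normal handling. Returns true if the
   // request may continue; otherwise an error response has already been sent.
   private def validateForwarding(request: RequestChannel.Request): Boolean = {
     // Requests without an envelope always pass straight through.
     request.envelopeContext.forall { envelopeContext =>
       if (!authorize(envelopeContext.brokerContext, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
         // 1. The forwarding broker needs CLUSTER_ACTION on CLUSTER.
         sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception())
         false
       } else if (!forwardingEnabled) {
         // 2. Forwarding must be enabled on this broker.
         sendErrorResponseMaybeThrottle(request, Errors.INVALID_REQUEST.exception())
         false
       } else if (!request.header.apiKey.forwardable) {
         // 3. The wrapped API must itself be forwardable.
         sendErrorResponseMaybeThrottle(request, Errors.INVALID_REQUEST.exception())
         false
       } else {
         true
       }
     }
   }
   ```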



