hachikuji commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r510552124
########## File path: core/src/main/scala/kafka/network/RequestChannel.scala ########## @@ -94,19 +104,63 @@ object RequestChannel extends Logging { @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None val session = Session(context.principal, context.clientAddress) + private val bodyAndSize: RequestAndSize = context.parseRequest(buffer) def header: RequestHeader = context.header def sizeOfBodyInBytes: Int = bodyAndSize.size - //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer. - //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference - //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done. + // most request types are parsed entirely into objects at this point. for those we can release the underlying buffer. + // some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference + // to the buffer. for those requests we cannot release the buffer early, but only when request processing is done. if (!header.apiKey.requiresDelayedAllocation) { releaseBuffer() } - def requestDesc(details: Boolean): String = s"$header -- ${loggableRequest.toString(details)}" + def buildResponse(abstractResponse: AbstractResponse, + error: Errors): Send = { + envelopeContext match { + case Some(envelopeContext) => + val envelopeResponse = new EnvelopeResponse( + abstractResponse.throttleTimeMs(), Review comment: Quotas are one aspect of this work that need more consideration. What we don't want is for the inter-broker channel to get affected by the individual client throttle, which is what will happen with the current patch. What I'd suggest for now is that we allow the broker to track client quotas and pass back the throttle value in the underlying response, but we set the envelope throttle time to 0 and ensure that the channel does not get throttled. For this, I think we we will need to change the logic in `KafkaApis.sendResponseMaybeThrottle`. If it is a forwarded request, we still need to check `maybeRecordAndGetThrottleTimeMs`, but we can skip the call to `ClientQuotaManager.throttle`. When the response is received on the forwarding broker, we will need to apply the throttle, which I think the patch already handles. One challenging aspect is how this will affect quota metrics. Currently quota/throttling metrics are relatively simple because they are recorded separately by each broker. However, here the controller is the one that is tracking the throttling for the client across multiple inbound connections from multiple brokers. This means that the broker that is applying a throttle for a forwarded request may not have actually observed a quota violation. Other than causing some reporting confusion, I am not sure whether there are any other consequences to this. cc @apovzner @rajinisivaram ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -126,11 +125,44 @@ class KafkaApis(val requestChannel: RequestChannel, info("Shutdown complete.") } + private def checkForwarding(request: RequestChannel.Request): Unit = { + if (!request.header.apiKey.forwardable && request.envelopeContext.isDefined) { + throw new IllegalStateException("Given RPC " + request.header.apiKey + " does not support forwarding.") + } + } + + private def maybeForward(request: RequestChannel.Request, + handler: RequestChannel.Request => Unit): Unit = { + if (request.envelopeContext.isDefined && request.principalSerde.isEmpty) { + sendErrorResponseMaybeThrottle(request, Errors.PRINCIPAL_DESERIALIZATION_FAILURE.exception()) + } else if (request.envelopeContext.isDefined && + (!request.context.fromPrivilegedListener || + !authorize(request.envelopeContext.get.brokerContext, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) + ) { + // If the designated forwarding request is not coming from a privileged listener, or + // it fails CLUSTER_ACTION permission, we would fail the authorization. + sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception()) Review comment: One challenge we have here is that there are two levels of errors. The current patch seems to conflate the two, which makes it confusing. I think we need a structure which allows us to separate the errors possible at the envelope level and those possible at the request level. What I'm thinking is this: 1. For cluster auth and principal serde errors, we should return the envelope error and null response body. 2. For everything else, we return envelope error NONE and just pass through whatever error is in the response. Does that make sense? ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -126,11 +125,44 @@ class KafkaApis(val requestChannel: RequestChannel, info("Shutdown complete.") } + private def checkForwarding(request: RequestChannel.Request): Unit = { + if (!request.header.apiKey.forwardable && request.envelopeContext.isDefined) { Review comment: Can we move some of the checks from `maybeForward` here? This is the flow I'm thinking about: 1. First check authorization => CLUSTER_AUTHORIZATION_FAILURE 2. Verify forwarding is enabled => INVALID_REQUEST 3. Verify the api is forwardable => INVALID_REQUEST If all of these pass, then the request continues down the normal handling path. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org