abbccdda commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r510568706
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -126,11 +125,44 @@ class KafkaApis(val requestChannel: RequestChannel,
     info("Shutdown complete.")
   }
 
+  private def checkForwarding(request: RequestChannel.Request): Unit = {
+    if (!request.header.apiKey.forwardable && request.envelopeContext.isDefined) {
+      throw new IllegalStateException("Given RPC " + request.header.apiKey + " does not support forwarding.")
+    }
+  }
+
+  private def maybeForward(request: RequestChannel.Request,
+                           handler: RequestChannel.Request => Unit): Unit = {
+    if (request.envelopeContext.isDefined && request.principalSerde.isEmpty) {
+      sendErrorResponseMaybeThrottle(request, Errors.PRINCIPAL_DESERIALIZATION_FAILURE.exception())
+    } else if (request.envelopeContext.isDefined &&
+      (!request.context.fromPrivilegedListener ||
+      !authorize(request.envelopeContext.get.brokerContext, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME))
+    ) {
+      // If the designated forwarding request is not coming from a privileged listener, or
+      // it fails CLUSTER_ACTION permission, we would fail the authorization.
+      sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception())

Review comment:
   The question is how the forwarding broker should handle the auth and principal serde errors. To me, it should get a vanilla error response with `UNKNOWN_SERVER_ERROR` and return that to the original client? Besides that, I think we could differentiate the cases here so that serde-type errors are not passed on to the client.
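
   A rough sketch of one way to read the suggestion above, assuming the translation happens on the forwarding broker before it answers the original client. Where the mapping should live is exactly the open question, so this is only an illustration. `ForwardingErrors`, `Err` and `errorForClient` are hypothetical names and not part of `KafkaApis`; the case objects are stand-ins for the corresponding constants in `org.apache.kafka.common.protocol.Errors`, so that the snippet compiles without Kafka on the classpath.

```scala
object ForwardingErrors {
  // Stand-in for org.apache.kafka.common.protocol.Errors (assumption: only
  // the constants relevant to this discussion are modelled here).
  sealed trait Err
  case object NONE extends Err
  case object PRINCIPAL_DESERIALIZATION_FAILURE extends Err
  case object CLUSTER_AUTHORIZATION_FAILED extends Err
  case object UNKNOWN_SERVER_ERROR extends Err
  case object NOT_CONTROLLER extends Err

  // Hypothetical helper: translate the error returned for a forwarded
  // (envelope) request into the error the original client should see.
  def errorForClient(envelopeError: Err): Err = envelopeError match {
    // These two describe a problem between the brokers (the envelope could
    // not be authorized, or the principal could not be deserialized). The
    // client cannot act on them, so they are hidden behind a generic error.
    case PRINCIPAL_DESERIALIZATION_FAILURE | CLUSTER_AUTHORIZATION_FAILED =>
      UNKNOWN_SERVER_ERROR
    // Everything else is passed through unchanged.
    case other => other
  }
}
```

   With this shape, the broker-to-broker failures stay visible in the forwarding broker's own handling (for logging or metrics, say), while the client only ever sees `UNKNOWN_SERVER_ERROR` for them.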