abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r493795436



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -117,6 +117,89 @@ class KafkaApis(val requestChannel: RequestChannel,
   val adminZkClient = new AdminZkClient(zkClient)
   private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = 
"AlterAcls", brokerId = config.brokerId)
 
+  /**
+   * The template to create a forward request handler.
+   *
+   * @tparam T request type
+   * @tparam R response type
+   * @tparam RK resource key
+   * @tparam RV resource value
+   */
+  private[server] abstract class ForwardRequestHandler[T <: AbstractRequest,
+    R <: AbstractResponse, RK, RV](request: RequestChannel.Request) extends 
Logging {
+
+    /**
+     * Split the given resource into authorized and unauthorized sets.
+     *
+     * @return authorized resources and unauthorized resources
+     */
+    def resourceSplitByAuthorization(request: T): (Map[RK, RV], Map[RK, 
ApiError])
+
+    /**
+     * Controller handling logic of the request.
+     */
+    def process(authorizedResources: Map[RK, RV],
+                unauthorizedResult: Map[RK, ApiError],
+                request: T): Unit
+
+    /**
+     * Build a forward request to the controller.
+     *
+     * @param authorizedResources authorized resources by the forwarding broker
+     * @param request the original request
+     * @return forward request builder
+     */
+    def createRequestBuilder(authorizedResources: Map[RK, RV],
+                             request: T): AbstractRequest.Builder[T]
+
+    /**
+     * Merge the forward response with the previously unauthorized results.
+     *
+     * @param forwardResponse the forward request's response
+     * @param unauthorizedResult original unauthorized results
+     * @return combined response to the original client
+     */
+    def mergeResponse(forwardResponse: R,
+                      unauthorizedResult: Map[RK, ApiError]): R
+
+    def handle(): Unit = {
+      val requestBody = request.body[AbstractRequest].asInstanceOf[T]
+      val (authorizedResources, unauthorizedResources) = 
resourceSplitByAuthorization(requestBody)
+      if (isForwardingRequest(request)) {
+        if (!controller.isActive) {
+          sendErrorResponseMaybeThrottle(request, 
Errors.NOT_CONTROLLER.exception())
+          } else {
+            // For forwarding requests, the authorization failure is not 
caused by
+            // the original client, but by the broker.
+            val unauthorizedResult = unauthorizedResources.keys.map {
+              resource => resource -> new 
ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null)
+            }.toMap
+
+            process(authorizedResources, unauthorizedResult, requestBody)
+          }
+      } else if (!controller.isActive && config.redirectionEnabled &&
+        authorizedResources.nonEmpty) {
+        redirectionManager.forwardRequest(
+          createRequestBuilder(authorizedResources, requestBody),

Review comment:
       That is a bit hard to do: we pass the requestBuilder all the way down to 
NetworkClient, so building the request with a designated version may involve 
some non-trivial changes.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to