hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r493160927



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsUtil.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
+
+import java.util.Collection;
+import java.util.Map;
+
+public class AlterConfigsUtil {
+
+    public static IncrementalAlterConfigsRequestData 
generateIncrementalRequestData(final Map<ConfigResource, 
Collection<AlterConfigOp>> configs,
+                                                                               
     final boolean validateOnly) {
+        return generateIncrementalRequestData(configs.keySet(), configs, 
validateOnly);
+    }
+
+    public static IncrementalAlterConfigsRequestData 
generateIncrementalRequestData(final Collection<ConfigResource> resources,

Review comment:
       nit: might be useful to document the expectation that `resources` is a 
subset of the key set of `configs`. The signature surprised me a little bit.
   
   As an aside, this kind of convenience conversion seems more appropriate for 
`IncrementalAlterConfigsRequest.Builder` rather than a static class.

##########
File path: core/src/main/scala/kafka/api/ApiVersion.scala
##########
@@ -103,6 +103,9 @@ object ApiVersion {
     KAFKA_2_7_IV0,
     // Bup Fetch protocol for Raft protocol (KIP-595)
     KAFKA_2_7_IV1,
+    // Enable redirection (KIP-590)
+    // TODO: remove this IBP in the 2.7 release if redirection work could not 
be done before the freeze

Review comment:
       Get rid of this TODO. We do not need to remove IBP internal versions.

##########
File path: core/src/main/scala/kafka/common/InterBrokerSendThread.scala
##########
@@ -147,7 +147,9 @@ abstract class InterBrokerSendThread(name: String,
 
 case class RequestAndCompletionHandler(destination: Node,
                                        request: AbstractRequest.Builder[_ <: 
AbstractRequest],
-                                       handler: RequestCompletionHandler)
+                                       handler: RequestCompletionHandler,
+                                       initialPrincipalName: String = null,

Review comment:
       nit: why don't we add a case class and make this optional. for example:
   
   ```scala
   case class InitialPrincipal(name: String, clientId: String)
   ```
   In addition to reducing parameters, that makes the expectation that both are 
provided explicit.
   

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -117,6 +117,89 @@ class KafkaApis(val requestChannel: RequestChannel,
   val adminZkClient = new AdminZkClient(zkClient)
   private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = 
"AlterAcls", brokerId = config.brokerId)
 
+  /**
+   * The template to create a forward request handler.
+   *
+   * @tparam T request type
+   * @tparam R response type
+   * @tparam RK resource key
+   * @tparam RV resource value
+   */
+  private[server] abstract class ForwardRequestHandler[T <: AbstractRequest,
+    R <: AbstractResponse, RK, RV](request: RequestChannel.Request) extends 
Logging {
+
+    /**
+     * Split the given resource into authorized and unauthorized sets.
+     *
+     * @return authorized resources and unauthorized resources
+     */
+    def resourceSplitByAuthorization(request: T): (Map[RK, RV], Map[RK, 
ApiError])
+
+    /**
+     * Controller handling logic of the request.
+     */
+    def process(authorizedResources: Map[RK, RV],
+                unauthorizedResult: Map[RK, ApiError],
+                request: T): Unit
+
+    /**
+     * Build a forward request to the controller.
+     *
+     * @param authorizedResources authorized resources by the forwarding broker
+     * @param request the original request
+     * @return forward request builder
+     */
+    def createRequestBuilder(authorizedResources: Map[RK, RV],
+                             request: T): AbstractRequest.Builder[T]
+
+    /**
+     * Merge the forward response with the previously unauthorized results.
+     *
+     * @param forwardResponse the forward request's response
+     * @param unauthorizedResult original unauthorized results
+     * @return combined response to the original client
+     */
+    def mergeResponse(forwardResponse: R,
+                      unauthorizedResult: Map[RK, ApiError]): R
+
+    def handle(): Unit = {
+      val requestBody = request.body[AbstractRequest].asInstanceOf[T]
+      val (authorizedResources, unauthorizedResources) = 
resourceSplitByAuthorization(requestBody)
+      if (isForwardingRequest(request)) {
+        if (!controller.isActive) {
+          sendErrorResponseMaybeThrottle(request, 
Errors.NOT_CONTROLLER.exception())
+          } else {
+            // For forwarding requests, the authentication failure is not 
caused by
+            // the original client, but by the broker.
+            val unauthorizedResult = unauthorizedResources.keys.map {
+              resource => resource -> new 
ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null)
+            }.toMap
+
+            process(authorizedResources, unauthorizedResult, requestBody)
+          }
+      } else if (!controller.isActive && config.redirectionEnabled &&
+        authorizedResources.nonEmpty) {
+        redirectionManager.forwardRequest(
+          createRequestBuilder(authorizedResources, requestBody),

Review comment:
       In general, the forwarded request may have a different version than the 
client request. I'm wondering if we should keep the version the same in case 
there are semantic differences. As an example, a newer version of the API may 
introduce unexpected error codes. Unless we have logic to convert those error 
codes, then we might break compatibility unexpectedly.

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -459,7 +459,10 @@ class AclAuthorizer extends Authorizer with Logging {
       val apiKey = if (ApiKeys.hasId(requestContext.requestType)) 
ApiKeys.forId(requestContext.requestType).name else requestContext.requestType
       val refCount = action.resourceReferenceCount
 
-      s"Principal = $principal is $authResult Operation = $operation from host 
= $host on resource = $resource for request = $apiKey with resourceRefCount = 
$refCount"
+      val initialPrincipalName = requestContext.initialPrincipalName
+      val initialPrincipalMessage = if(initialPrincipalName != null) s", on 
behalf of initial principal =$initialPrincipalName," else ""

Review comment:
       nit: space after `if`

##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -547,7 +553,8 @@ class AdminManager(val config: KafkaConfig,
       None
     else {
       val id = resourceNameToBrokerId(resource.name)
-      if (id != this.config.brokerId)
+      // Under redirection, it is possible to handle config changes targeting 
at brokers other than the controller.

Review comment:
       The comment doesn't seem to make sense here. Seems like the logic 
doesn't have anything to do with the controller?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -117,6 +117,89 @@ class KafkaApis(val requestChannel: RequestChannel,
   val adminZkClient = new AdminZkClient(zkClient)
   private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = 
"AlterAcls", brokerId = config.brokerId)
 
+  /**
+   * The template to create a forward request handler.
+   *
+   * @tparam T request type
+   * @tparam R response type
+   * @tparam RK resource key
+   * @tparam RV resource value
+   */
+  private[server] abstract class ForwardRequestHandler[T <: AbstractRequest,
+    R <: AbstractResponse, RK, RV](request: RequestChannel.Request) extends 
Logging {
+
+    /**
+     * Split the given resource into authorized and unauthorized sets.
+     *
+     * @return authorized resources and unauthorized resources
+     */
+    def resourceSplitByAuthorization(request: T): (Map[RK, RV], Map[RK, 
ApiError])
+
+    /**
+     * Controller handling logic of the request.
+     */
+    def process(authorizedResources: Map[RK, RV],
+                unauthorizedResult: Map[RK, ApiError],
+                request: T): Unit
+
+    /**
+     * Build a forward request to the controller.
+     *
+     * @param authorizedResources authorized resources by the forwarding broker
+     * @param request the original request
+     * @return forward request builder
+     */
+    def createRequestBuilder(authorizedResources: Map[RK, RV],
+                             request: T): AbstractRequest.Builder[T]
+
+    /**
+     * Merge the forward response with the previously unauthorized results.
+     *
+     * @param forwardResponse the forward request's response
+     * @param unauthorizedResult original unauthorized results
+     * @return combined response to the original client
+     */
+    def mergeResponse(forwardResponse: R,
+                      unauthorizedResult: Map[RK, ApiError]): R
+
+    def handle(): Unit = {
+      val requestBody = request.body[AbstractRequest].asInstanceOf[T]
+      val (authorizedResources, unauthorizedResources) = 
resourceSplitByAuthorization(requestBody)
+      if (isForwardingRequest(request)) {
+        if (!controller.isActive) {
+          sendErrorResponseMaybeThrottle(request, 
Errors.NOT_CONTROLLER.exception())
+          } else {

Review comment:
       nit: this is misaligned

##########
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -117,10 +119,26 @@ class BrokerToControllerChannelManager(metadataCache: 
kafka.server.MetadataCache
                                   callback: RequestCompletionHandler): Unit = {
     requestQueue.put(BrokerToControllerQueueItem(request, callback))
   }
+
+  private[server] def forwardRequest(requestBuilder: AbstractRequest.Builder[_ 
<: AbstractRequest],
+                                     responseToOriginalClient: 
(RequestChannel.Request, Int => AbstractResponse,

Review comment:
       This function has 3 callbacks... It would be nice if we could figure out 
how to pass through the `ForwardRequestHandler` directly.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -3064,12 +3272,33 @@ class KafkaApis(val requestChannel: RequestChannel,
                                 logIfDenied: Boolean = true,
                                 refCount: Int = 1): Boolean = {
     authorizer.forall { authZ =>
-      val resource = new ResourcePattern(resourceType, resourceName, 
PatternType.LITERAL)
-      val actions = Collections.singletonList(new Action(operation, resource, 
refCount, logIfAllowed, logIfDenied))
-      authZ.authorize(requestContext, actions).get(0) == 
AuthorizationResult.ALLOWED
+      if (authorizeAction(requestContext, operation,
+        resourceType, resourceName, logIfAllowed, logIfDenied, refCount, 
authZ)) {
+        true
+      } else {
+        operation match {
+          case ALTER | ALTER_CONFIGS | CREATE | DELETE =>

Review comment:
       It would be helpful to have a comment explaining this. It does not seem 
obvious.

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -273,31 +275,632 @@ class KafkaApisTest {
       .setIncludeSynonyms(true)
       .setResources(List(new 
DescribeConfigsRequestData.DescribeConfigsResource()
         .setResourceName("topic-1")
-        
.setResourceType(ConfigResource.Type.TOPIC.id)).asJava)).build(requestHeader.apiVersion))
+        .setResourceType(ConfigResource.Type.TOPIC.id)).asJava))
+      .build(requestHeader.apiVersion),
+      requestHeader = Option(requestHeader))
     createKafkaApis(authorizer = 
Some(authorizer)).handleDescribeConfigsRequest(request)
 
     verify(authorizer, adminManager)
   }
 
+  @Test
+  def testAlterClientQuotasWithAuthorizer(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, 
ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
+
+    val quotaEntity = new 
ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"))
+    val quotas = Seq(new ClientQuotaAlteration(quotaEntity, 
Seq.empty.asJavaCollection))
+
+    val requestHeader = new RequestHeader(ApiKeys.ALTER_CLIENT_QUOTAS, 
ApiKeys.ALTER_CLIENT_QUOTAS.latestVersion,
+      clientId, 0)
+
+    val request = buildRequest(new 
AlterClientQuotasRequest.Builder(quotas.asJavaCollection, false)
+      .build(requestHeader.apiVersion))
+
+    EasyMock.expect(controller.isActive).andReturn(true)
+
+    expectNoThrottling()
+
+    EasyMock.expect(adminManager.alterClientQuotas(anyObject(), 
EasyMock.eq(false)))
+      .andReturn(Map(quotaEntity -> ApiError.NONE))
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, 
authorizer,
+      adminManager, controller)
+
+    createKafkaApis(authorizer = 
Some(authorizer)).handleAlterClientQuotasRequest(request)
+
+    verify(authorizer, adminManager)
+  }
+
+  @Test
+  def testAlterClientQuotasWithNonControllerAndRedirectionDisabled(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, 
ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
+
+    val quotaEntity = new 
ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"))
+    val quotas = Seq(new ClientQuotaAlteration(quotaEntity, 
Seq.empty.asJavaCollection))
+
+    val requestHeader = new RequestHeader(ApiKeys.ALTER_CLIENT_QUOTAS, 
ApiKeys.ALTER_CLIENT_QUOTAS.latestVersion,
+      clientId, 0)
+
+    val alterClientQuotasRequest = new 
AlterClientQuotasRequest.Builder(quotas.asJavaCollection, false)
+      .build(requestHeader.apiVersion)
+    val request = buildRequest(alterClientQuotasRequest)
+
+    EasyMock.expect(controller.isActive).andReturn(false)
+
+    val capturedResponse = expectNoThrottling()
+
+    EasyMock.expect(adminManager.alterClientQuotas(anyObject(), 
EasyMock.eq(false)))
+      .andReturn(Map(quotaEntity -> ApiError.NONE))
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, 
authorizer,
+      adminManager, controller)
+
+    // Should just handle the config change since IBP is low
+    createKafkaApis(interBrokerProtocolVersion = KAFKA_2_6_IV0,
+      authorizer = Some(authorizer)).handleAlterClientQuotasRequest(request)
+
+    verifyAlterClientQuotaResult(alterClientQuotasRequest,
+      capturedResponse, Map(quotaEntity -> Errors.NONE))
+
+    verify(authorizer, adminManager)
+  }
+
+  @Test
+  def testAlterClientQuotasWithRedirection(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, 
ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
+
+    val quotaEntity = new 
ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"))
+    val quotas = Seq(new ClientQuotaAlteration(quotaEntity, 
Seq.empty.asJavaCollection))
+
+    val requestHeader = new RequestHeader(ApiKeys.ALTER_CLIENT_QUOTAS, 
ApiKeys.ALTER_CLIENT_QUOTAS.latestVersion,
+      clientId, 0)
+
+    val request = buildRequest(new 
AlterClientQuotasRequest.Builder(quotas.asJavaCollection, false)
+      .build(requestHeader.apiVersion))
+
+    EasyMock.expect(controller.isActive).andReturn(false)
+
+    expectNoThrottling()
+
+    val redirectRequestBuilder = new AlterClientQuotasRequest.Builder(
+      Set(new ClientQuotaAlteration(quotaEntity, 
Collections.emptySet())).asJava, false)
+
+    val capturedCallback = EasyMock.newCapture[ClientResponse => 
AbstractResponse]()
+
+    EasyMock.expect(redirectionManager.forwardRequest(
+      EasyMock.eq(redirectRequestBuilder),
+      anyObject[(RequestChannel.Request, Int => AbstractResponse,
+        Option[Send => Unit]) => Unit](),
+      EasyMock.eq(request),
+      EasyMock.capture(capturedCallback),
+      anyObject()
+    )).once()
+
+    val clientResponse: ClientResponse = 
EasyMock.createNiceMock(classOf[ClientResponse])
+    val alterClientQuotasResponse = new AlterClientQuotasResponse(
+      Map(quotaEntity -> ApiError.NONE).asJava, 10
+    )
+    
EasyMock.expect(clientResponse.responseBody).andReturn(alterClientQuotasResponse)
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel,
+      authorizer, controller, redirectionManager, clientResponse)
+
+    createKafkaApis(authorizer = 
Some(authorizer)).handleAlterClientQuotasRequest(request)
+
+    assertEquals(alterClientQuotasResponse, 
capturedCallback.getValue.apply(clientResponse))
+
+    EasyMock.verify(controller, redirectionManager)
+  }
+
+  @Test
+  def testAlterClientQuotasAsForwardingRequestWithNonController(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, 
ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
+
+    val quotaEntity = new 
ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"))
+    val quotas = Seq(new ClientQuotaAlteration(quotaEntity, 
Seq.empty.asJavaCollection))
+
+    // Include extra header fields for forwarding request check
+    val requestHeader = new RequestHeader(ApiKeys.ALTER_CLIENT_QUOTAS, 
ApiKeys.ALTER_CLIENT_QUOTAS.latestVersion,
+      clientId, 0, "initial-principal", "initial-client")
+
+    val alterClientQuotasRequest = new 
AlterClientQuotasRequest.Builder(quotas.asJavaCollection, false)
+      .build(requestHeader.apiVersion)
+    val request = buildRequest(alterClientQuotasRequest,
+      fromPrivilegedListener = true, requestHeader = Option(requestHeader))
+
+    EasyMock.expect(controller.isActive).andReturn(false)
+
+    val capturedResponse = expectNoThrottling()
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, 
authorizer,
+      adminManager, controller)
+
+    createKafkaApis(authorizer = 
Some(authorizer)).handleAlterClientQuotasRequest(request)
+
+    verifyAlterClientQuotaResult(alterClientQuotasRequest,
+      capturedResponse, Map(quotaEntity -> Errors.NOT_CONTROLLER))
+
+    verify(authorizer, adminManager)
+  }
+
+  @Test
+  def testAlterClientQuotasAsForwardingRequest(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, 
ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.DENIED)
+    // As a forwarding request, we would use CLUSTER_ACTION to do a separate 
round of auth.
+    authorizeResource(authorizer, AclOperation.CLUSTER_ACTION, 
ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.DENIED)
+
+    val quotaEntity = new 
ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"))
+    val quotas = Seq(new ClientQuotaAlteration(quotaEntity, 
Seq.empty.asJavaCollection))
+
+    // Include extra header fields for forwarding request check
+    val requestHeader = new RequestHeader(ApiKeys.ALTER_CLIENT_QUOTAS, 
ApiKeys.ALTER_CLIENT_QUOTAS.latestVersion,
+      clientId, 0, "initial-principal", "initial-client")
+
+    val alterClientQuotasRequest = new 
AlterClientQuotasRequest.Builder(quotas.asJavaCollection, false)
+      .build(requestHeader.apiVersion)
+    val request = buildRequest(alterClientQuotasRequest,
+      fromPrivilegedListener = true, requestHeader = Option(requestHeader))
+
+    EasyMock.expect(controller.isActive).andReturn(true)
+
+    val capturedResponse = expectNoThrottling()
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, 
authorizer,
+      adminManager, controller)
+
+    createKafkaApis(authorizer = 
Some(authorizer)).handleAlterClientQuotasRequest(request)
+
+    verifyAlterClientQuotaResult(alterClientQuotasRequest,
+      capturedResponse, Map( quotaEntity -> 
Errors.BROKER_AUTHORIZATION_FAILURE))
+
+    verify(authorizer, adminManager)
+  }
+
+  private def verifyAlterClientQuotaResult(alterClientQuotasRequest: 
AlterClientQuotasRequest,
+                                           capturedResponse: 
Capture[RequestChannel.Response],
+                                           expected: Map[ClientQuotaEntity, 
Errors]): Unit = {
+    val response = readResponse(ApiKeys.ALTER_CLIENT_QUOTAS, 
alterClientQuotasRequest, capturedResponse)
+      .asInstanceOf[AlterClientQuotasResponse]
+    val futures = expected.keys.map(quotaEntity => quotaEntity -> new 
KafkaFutureImpl[Void]()).toMap
+    response.complete(futures.asJava)
+    futures.foreach {
+      case (entity, future) =>
+        future.whenComplete((_, thrown) =>
+          assertEquals(thrown, expected(entity).exception())
+        ).isDone
+    }
+  }
+
+  @Test
+  def testCreateTopicsWithAuthorizer(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    val operation = AclOperation.CREATE
+    val topicName = "topic-1"
+    val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS, 
ApiKeys.CREATE_TOPICS.latestVersion,
+      clientId, 0)
+
+    EasyMock.expect(controller.isActive).andReturn(true)
+
+    authorizeResource(authorizer, operation, ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED, logIfDenied = false)
+
+    authorizeResource(authorizer, AclOperation.DESCRIBE_CONFIGS, 
ResourceType.TOPIC,
+      topicName, AuthorizationResult.ALLOWED, logIfDenied = false)
+
+    expectNoThrottling()
+
+    val topicsAuthorized = new 
CreateTopicsRequestData.CreatableTopicCollection(1)
+    val topicToCreate = new CreateTopicsRequestData.CreatableTopic()
+      .setName(topicName)
+    topicsAuthorized.add(topicToCreate)
+
+    val timeout = 10
+    val request = buildRequest(new CreateTopicsRequest.Builder(new 
CreateTopicsRequestData()
+      .setTimeoutMs(timeout)
+      .setValidateOnly(false)
+      .setTopics(topicsAuthorized))
+      .build(requestHeader.apiVersion))
+
+    EasyMock.expect(clientControllerQuotaManager.newQuotaFor(
+      EasyMock.eq(request), 
EasyMock.eq(6))).andReturn(UnboundedControllerMutationQuota)
+
+    EasyMock.expect(adminManager.createTopics(
+      EasyMock.eq(timeout),
+      EasyMock.eq(false),
+      EasyMock.eq(Map(topicName -> topicToCreate)),
+      anyObject(),
+      EasyMock.eq(UnboundedControllerMutationQuota),
+      anyObject()))
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, 
clientControllerQuotaManager,
+      requestChannel, authorizer, adminManager, controller)
+
+    createKafkaApis(authorizer = 
Some(authorizer)).handleCreateTopicsRequest(request)
+
+    verify(authorizer, adminManager, clientControllerQuotaManager)
+  }
+
+  @Test
+  def testCreateTopicsWithNonControllerAndRedirectionDisabled(): Unit = {

Review comment:
       Good to see the unit tests in here. I think we also need at least a 
couple integration tests. For example, could we add something to 
`CreateTopicsRequestTest` to ensure that forwarding works as expected?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1733,68 +1817,109 @@ class KafkaApis(val requestChannel: RequestChannel,
       sendResponseMaybeThrottle(controllerMutationQuota, request, 
createResponse, onComplete = None)
     }
 
-    val createTopicsRequest = request.body[CreateTopicsRequest]
-    val results = new 
CreatableTopicResultCollection(createTopicsRequest.data.topics.size)
-    if (!controller.isActive) {
-      createTopicsRequest.data.topics.forEach { topic =>
-        results.add(new CreatableTopicResult().setName(topic.name)
-          .setErrorCode(Errors.NOT_CONTROLLER.code))
-      }
-      sendResponseCallback(results)
-    } else {
-      createTopicsRequest.data.topics.forEach { topic =>
-        results.add(new CreatableTopicResult().setName(topic.name))
-      }
-      val hasClusterAuthorization = authorize(request.context, CREATE, 
CLUSTER, CLUSTER_NAME,
-        logIfDenied = false)
-      val topics = createTopicsRequest.data.topics.asScala.map(_.name)
-      val authorizedTopics =
-        if (hasClusterAuthorization) topics.toSet
-        else filterByAuthorized(request.context, CREATE, TOPIC, 
topics)(identity)
-      val authorizedForDescribeConfigs = filterByAuthorized(request.context, 
DESCRIBE_CONFIGS, TOPIC,
-        topics, logIfDenied = false)(identity).map(name => name -> 
results.find(name)).toMap
+    val forwardRequestHandler = new ForwardRequestHandler[CreateTopicsRequest,
+      CreateTopicsResponse, String, CreatableTopic](request) {
 
-      results.forEach { topic =>
-        if (results.findAll(topic.name).size > 1) {
-          topic.setErrorCode(Errors.INVALID_REQUEST.code)
-          topic.setErrorMessage("Found multiple entries for this topic.")
-        } else if (!authorizedTopics.contains(topic.name)) {
-          topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-          topic.setErrorMessage("Authorization failed.")
-        }
-        if (!authorizedForDescribeConfigs.contains(topic.name)) {
-          topic.setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+      override def resourceSplitByAuthorization(createTopicsRequest: 
CreateTopicsRequest):
+      (Map[String, CreatableTopic], Map[String, ApiError]) = {

Review comment:
       nit: this is subjective, but this style is a bit ugly. I would prefer 
the following:
   ```scala
   override def resourceSplitByAuthorization(
     createTopicsRequest: CreateTopicsRequest
   ): (Map[String, CreatableTopic], Map[String, ApiError]) = {
   ```
   That makes it easier visually to separate the return type and the function 
logic (again, in my opinion).

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -117,6 +117,89 @@ class KafkaApis(val requestChannel: RequestChannel,
   val adminZkClient = new AdminZkClient(zkClient)
   private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = 
"AlterAcls", brokerId = config.brokerId)
 
+  /**
+   * The template to create a forward request handler.
+   *
+   * @tparam T request type
+   * @tparam R response type
+   * @tparam RK resource key
+   * @tparam RV resource value
+   */
+  private[server] abstract class ForwardRequestHandler[T <: AbstractRequest,
+    R <: AbstractResponse, RK, RV](request: RequestChannel.Request) extends 
Logging {
+
+    /**
+     * Split the given resource into authorized and unauthorized sets.
+     *
+     * @return authorized resources and unauthorized resources
+     */
+    def resourceSplitByAuthorization(request: T): (Map[RK, RV], Map[RK, 
ApiError])
+
+    /**
+     * Controller handling logic of the request.
+     */
+    def process(authorizedResources: Map[RK, RV],
+                unauthorizedResult: Map[RK, ApiError],
+                request: T): Unit
+
+    /**
+     * Build a forward request to the controller.
+     *
+     * @param authorizedResources authorized resources by the forwarding broker
+     * @param request the original request
+     * @return forward request builder
+     */
+    def createRequestBuilder(authorizedResources: Map[RK, RV],
+                             request: T): AbstractRequest.Builder[T]
+
+    /**
+     * Merge the forward response with the previously unauthorized results.
+     *
+     * @param forwardResponse the forward request's response
+     * @param unauthorizedResult original unauthorized results
+     * @return combined response to the original client
+     */
+    def mergeResponse(forwardResponse: R,
+                      unauthorizedResult: Map[RK, ApiError]): R
+
+    def handle(): Unit = {
+      val requestBody = request.body[AbstractRequest].asInstanceOf[T]
+      val (authorizedResources, unauthorizedResources) = 
resourceSplitByAuthorization(requestBody)
+      if (isForwardingRequest(request)) {
+        if (!controller.isActive) {
+          sendErrorResponseMaybeThrottle(request, 
Errors.NOT_CONTROLLER.exception())
+          } else {
+            // For forwarding requests, the authentication failure is not 
caused by
+            // the original client, but by the broker.
+            val unauthorizedResult = unauthorizedResources.keys.map {
+              resource => resource -> new 
ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null)
+            }.toMap
+
+            process(authorizedResources, unauthorizedResult, requestBody)
+          }
+      } else if (!controller.isActive && config.redirectionEnabled &&
+        authorizedResources.nonEmpty) {
+        redirectionManager.forwardRequest(
+          createRequestBuilder(authorizedResources, requestBody),
+          sendResponseMaybeThrottle,
+          request,
+          response => {
+            mergeResponse(response.responseBody.asInstanceOf[R], 
unauthorizedResources)
+          })
+      } else {
+        // When IBP is smaller than 2.7, forwarding is not supported,
+        // therefore requests are handled directly
+        process(authorizedResources, unauthorizedResources, requestBody)

Review comment:
       We can't guarantee that this broker will still be the controller when we 
call `process` or that the broker we're forwarding to will still be the 
controller when it receives the request. In these cases, we need to return some 
retriable error to the client. Can you help me understand how this is 
implemented?

##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -513,15 +513,21 @@ class AdminManager(val config: KafkaConfig,
     resource -> ApiError.NONE
   }
 
-  private def alterBrokerConfigs(resource: ConfigResource, validateOnly: 
Boolean,
-                                 configProps: Properties, configEntriesMap: 
Map[String, String]): (ConfigResource, ApiError) = {
+  private def alterBrokerConfigs(resource: ConfigResource,
+                                 validateOnly: Boolean,
+                                 configProps: Properties,
+                                 configEntriesMap: Map[String, String]): 
(ConfigResource, ApiError) = {
     val brokerId = getBrokerId(resource)
     val perBrokerConfig = brokerId.nonEmpty
     this.config.dynamicConfig.validate(configProps, perBrokerConfig)
     validateConfigPolicy(resource, configEntriesMap)
     if (!validateOnly) {
-      if (perBrokerConfig)
+      if (perBrokerConfig) {
+        val previousConfigProps = 
config.dynamicConfig.currentDynamicBrokerConfigs
         
this.config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(configProps)
+        this.config.dynamicConfig.maybeAugmentSSLStorePaths(configProps, 
previousConfigProps)

Review comment:
       Can you explain why this change is needed?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -117,6 +117,89 @@ class KafkaApis(val requestChannel: RequestChannel,
   val adminZkClient = new AdminZkClient(zkClient)
   private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = 
"AlterAcls", brokerId = config.brokerId)
 
+  /**
+   * The template to create a forward request handler.
+   *
+   * @tparam T request type
+   * @tparam R response type
+   * @tparam RK resource key
+   * @tparam RV resource value
+   */
+  private[server] abstract class ForwardRequestHandler[T <: AbstractRequest,
+    R <: AbstractResponse, RK, RV](request: RequestChannel.Request) extends 
Logging {
+
+    /**
+     * Split the given resource into authorized and unauthorized sets.
+     *
+     * @return authorized resources and unauthorized resources
+     */
+    def resourceSplitByAuthorization(request: T): (Map[RK, RV], Map[RK, 
ApiError])
+
+    /**
+     * Controller handling logic of the request.
+     */
+    def process(authorizedResources: Map[RK, RV],
+                unauthorizedResult: Map[RK, ApiError],
+                request: T): Unit
+
+    /**
+     * Build a forward request to the controller.
+     *
+     * @param authorizedResources authorized resources by the forwarding broker
+     * @param request the original request
+     * @return forward request builder
+     */
+    def createRequestBuilder(authorizedResources: Map[RK, RV],
+                             request: T): AbstractRequest.Builder[T]
+
+    /**
+     * Merge the forward response with the previously unauthorized results.
+     *
+     * @param forwardResponse the forward request's response
+     * @param unauthorizedResult original unauthorized results
+     * @return combined response to the original client
+     */
+    def mergeResponse(forwardResponse: R,
+                      unauthorizedResult: Map[RK, ApiError]): R
+
+    def handle(): Unit = {

Review comment:
       nit: seems `handle` doesn't really need to be part of 
`ForwardRequestHandler`. Instead we could pull it out:
   ```scala
   private def handle(handler: ForwardRequestHandler): Unit = {
   ...
   ```
   The advantage of this is that it allows us to pull the type out of 
`KafkaApis` without inheriting all of the dependencies that are needed by 
`handle`.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsResponse.java
##########
@@ -25,23 +25,35 @@
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class IncrementalAlterConfigsResponse extends AbstractResponse {
 
-    public static IncrementalAlterConfigsResponseData toResponseData(final int 
requestThrottleMs,
-                                                                     final 
Map<ConfigResource, ApiError> results) {
-        IncrementalAlterConfigsResponseData responseData = new 
IncrementalAlterConfigsResponseData();
-        responseData.setThrottleTimeMs(requestThrottleMs);
-        for (Map.Entry<ConfigResource, ApiError> entry : results.entrySet()) {
-            responseData.responses().add(new AlterConfigsResourceResponse().
-                    setResourceName(entry.getKey().name()).
-                    setResourceType(entry.getKey().type().id()).
-                    setErrorCode(entry.getValue().error().code()).
-                    setErrorMessage(entry.getValue().message()));
-        }
-        return responseData;
+    public IncrementalAlterConfigsResponse(final int requestThrottleMs,
+                                           final Map<ConfigResource, ApiError> 
results) {
+        this.data = new IncrementalAlterConfigsResponseData()
+                        .setThrottleTimeMs(requestThrottleMs);
+
+        addResults(results);
+    }
+
+    public IncrementalAlterConfigsResponse addResults(final 
Map<ConfigResource, ApiError> results) {

Review comment:
       Typically responses are immutable after construction. It seems kind of a 
brittle pattern to rely on being able to mutate the response we receive from 
the other broker. For example we inherit the throttle time which is a bit 
weird. Are we saving that much by not creating a new response?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to