mumrah commented on a change in pull request #10505:
URL: https://github.com/apache/kafka/pull/10505#discussion_r612039100



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -343,21 +366,22 @@ class ControllerApis(val requestChannel: RequestChannel,
         iterator.remove()
       }
     }
-    val response = controller.createTopics(effectiveRequest).get()
-    duplicateTopicNames.forEach { name =>
-      response.topics().add(new CreatableTopicResult().
-        setName(name).
-        setErrorCode(INVALID_REQUEST.code()).
-        setErrorMessage("Found multiple entries for this topic."))
-    }
-    topicNames.forEach { name =>
-      if (!authorizedTopicNames.contains(name)) {
+    controller.createTopics(effectiveRequest).thenApply(response => {
+      duplicateTopicNames.forEach { name =>
         response.topics().add(new CreatableTopicResult().
           setName(name).
-          setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()))
+          setErrorCode(INVALID_REQUEST.code()).
+          setErrorMessage("Found multiple entries for this topic."))
       }
-    }
-    response
+      topicNames.forEach { name =>
+        if (!authorizedTopicNames.contains(name)) {
+          response.topics().add(new CreatableTopicResult().
+            setName(name).
+            setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()))

Review comment:
       nit: here and other places, we don't need parens for `code()`

##########
File path: core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
##########
@@ -151,6 +221,73 @@ class ControllerApisTest {
       brokerRegistrationResponse.errorCounts().asScala)
   }
 
+  @Test
+  def testUnauthorizedHandleAlterClientQuotas(): Unit = {
+    assertThrows(classOf[ClusterAuthorizationException], () => 
createControllerApis(
+      Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
+        handleAlterClientQuotas(buildRequest(new AlterClientQuotasRequest(
+          new AlterClientQuotasRequestData(), 0))))
+  }
+
+  @Test
+  def testUnauthorizedHandleIncrementalAlterConfigs(): Unit = {
+    val requestData = new IncrementalAlterConfigsRequestData().setResources(
+      new AlterConfigsResourceCollection(
+        util.Arrays.asList(new 
IncrementalAlterConfigsRequestData.AlterConfigsResource().
+          setResourceName("1").
+          setResourceType(ConfigResource.Type.BROKER.id()).
+          setConfigs(new AlterableConfigCollection(util.Arrays.asList(new 
AlterableConfig().
+            setName("log.cleaner.backoff.ms").

Review comment:
       Should we use the static KafkaConfig property definitions instead of 
these strings?

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -35,8 +36,8 @@ import org.apache.kafka.common.internals.FatalExitError
 import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
 import 
org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, 
DeletableTopicResultCollection}
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker
-import org.apache.kafka.common.message.{BeginQuorumEpochResponseData, 
BrokerHeartbeatResponseData, BrokerRegistrationResponseData, 
CreateTopicsRequestData, CreateTopicsResponseData, DeleteTopicsRequestData, 
DeleteTopicsResponseData, DescribeQuorumResponseData, 
EndQuorumEpochResponseData, FetchResponseData, MetadataResponseData, 
SaslAuthenticateResponseData, SaslHandshakeResponseData, 
UnregisterBrokerResponseData, VoteResponseData}
-import org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST, 
TOPIC_AUTHORIZATION_FAILED}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.protocol.Errors._

Review comment:
       In KafkaApis we import `Errors` rather than importing all the members of 
the enum via a wildcard import. Any reason to prefer one way over the other? It 
seems more common in our code base to import the enum and refer to members like 
`Errors.ILLEGAL_SASL_STATE`

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -238,87 +249,99 @@ class ControllerApis(val requestChannel: RequestChannel,
     val toAuthenticate = new util.HashSet[String]
     toAuthenticate.addAll(providedNames)
     val idToName = new util.HashMap[Uuid, String]
-    controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) =>
-      if (nameOrError.isError) {
-        appendResponse(null, id, nameOrError.error())
-      } else {
-        toAuthenticate.add(nameOrError.result())
-        idToName.put(id, nameOrError.result())
-      }
-    }
-    // Get the list of deletable topics (those we can delete) and the list of 
describeable
-    // topics.  If a topic can't be deleted or described, we have to act like 
it doesn't
-    // exist, even when it does.
-    val topicsToAuthenticate = toAuthenticate.asScala
-    val (describeable, deletable) = if (hasClusterAuth) {
-      (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
-    } else {
-      (getDescribableTopics(topicsToAuthenticate), 
getDeletableTopics(topicsToAuthenticate))
-    }
-    // For each topic that was provided by ID, check if authentication failed.
-    // If so, remove it from the idToName map and create an error response for 
it.
-    val iterator = idToName.entrySet().iterator()
-    while (iterator.hasNext) {
-      val entry = iterator.next()
-      val id = entry.getKey
-      val name = entry.getValue
-      if (!deletable.contains(name)) {
-        if (describeable.contains(name)) {
-          appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+    controller.findTopicNames(providedIds).thenCompose(topicNames => {
+      topicNames.forEach { (id, nameOrError) =>
+        if (nameOrError.isError) {
+          appendResponse(null, id, nameOrError.error())
         } else {
-          appendResponse(null, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          toAuthenticate.add(nameOrError.result())
+          idToName.put(id, nameOrError.result())
         }
-        iterator.remove()
       }
-    }
-    // For each topic that was provided by name, check if authentication 
failed.
-    // If so, create an error response for it.  Otherwise, add it to the 
idToName map.
-    controller.findTopicIds(providedNames).get().forEach { (name, idOrError) =>
-      if (!describeable.contains(name)) {
-        appendResponse(name, ZERO_UUID, new 
ApiError(TOPIC_AUTHORIZATION_FAILED))
-      } else if (idOrError.isError) {
-        appendResponse(name, ZERO_UUID, idOrError.error)
-      } else if (deletable.contains(name)) {
-        val id = idOrError.result()
-        if (duplicateProvidedIds.contains(id) || idToName.put(id, name) != 
null) {
-          // This is kind of a weird case: what if we supply topic ID X and 
also a name
-          // that maps to ID X?  In that case, _if authorization succeeds_, we 
end up
-          // here.  If authorization doesn't succeed, we refrain from 
commenting on the
-          // situation since it would reveal topic ID mappings.
-          duplicateProvidedIds.add(id)
-          idToName.remove(id)
-          appendResponse(name, id, new ApiError(INVALID_REQUEST,
-            "The provided topic name maps to an ID that was already 
supplied."))
-        }
+      // Get the list of deletable topics (those we can delete) and the list 
of describeable
+      // topics.
+      val topicsToAuthenticate = toAuthenticate.asScala
+      val (describeable, deletable) = if (hasClusterAuth) {
+        (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
       } else {
-        appendResponse(name, ZERO_UUID, new 
ApiError(TOPIC_AUTHORIZATION_FAILED))
+        (getDescribableTopics(topicsToAuthenticate), 
getDeletableTopics(topicsToAuthenticate))
       }
-    }
-    // Finally, the idToName map contains all the topics that we are 
authorized to delete.
-    // Perform the deletion and create responses for each one.
-    val idToError = controller.deleteTopics(idToName.keySet).get()
-    idToError.forEach { (id, error) =>
-        appendResponse(idToName.get(id), id, error)
-    }
-    // Shuffle the responses so that users can not use patterns in their 
positions to
-    // distinguish between absent topics and topics we are not permitted to 
see.
-    Collections.shuffle(responses)
-    responses
+      // For each topic that was provided by ID, check if authentication 
failed.
+      // If so, remove it from the idToName map and create an error response 
for it.
+      val iterator = idToName.entrySet().iterator()
+      while (iterator.hasNext) {
+        val entry = iterator.next()
+        val id = entry.getKey
+        val name = entry.getValue
+        if (!deletable.contains(name)) {
+          if (describeable.contains(name)) {
+            appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          } else {
+            appendResponse(null, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          }
+          iterator.remove()
+        }
+      }
+      // For each topic that was provided by name, check if authentication 
failed.
+      // If so, create an error response for it.  Otherwise, add it to the 
idToName map.
+      controller.findTopicIds(providedNames).thenCompose(topicIds => {
+        topicIds.forEach { (name, idOrError) =>
+          if (!describeable.contains(name)) {
+            appendResponse(name, ZERO_UUID, new 
ApiError(TOPIC_AUTHORIZATION_FAILED))
+          } else if (idOrError.isError) {
+            appendResponse(name, ZERO_UUID, idOrError.error)
+          } else if (deletable.contains(name)) {
+            val id = idOrError.result()
+            if (duplicateProvidedIds.contains(id) || idToName.put(id, name) != 
null) {
+              // This is kind of a weird case: what if we supply topic ID X 
and also a name
+              // that maps to ID X?  In that case, _if authorization 
succeeds_, we end up
+              // here.  If authorization doesn't succeed, we refrain from 
commenting on the
+              // situation since it would reveal topic ID mappings.
+              duplicateProvidedIds.add(id)
+              idToName.remove(id)
+              appendResponse(name, id, new ApiError(INVALID_REQUEST,
+                "The provided topic name maps to an ID that was already 
supplied."))
+            }
+          } else {
+            appendResponse(name, ZERO_UUID, new 
ApiError(TOPIC_AUTHORIZATION_FAILED))
+          }
+        }
+        // Finally, the idToName map contains all the topics that we are 
authorized to delete.
+        // Perform the deletion and create responses for each one.
+        controller.deleteTopics(idToName.keySet).thenApply(idToError => {
+          idToError.forEach { (id, error) =>
+            appendResponse(idToName.get(id), id, error)
+          }
+          // Shuffle the responses so that users can not use patterns in their 
positions to
+          // distinguish between absent topics and topics we are not permitted 
to see.
+          Collections.shuffle(responses)
+          responses
+        })
+      })
+    })
   }
 
   def handleCreateTopics(request: RequestChannel.Request): Unit = {
-    val responseData = createTopics(request.body[CreateTopicsRequest].data(),
+    val createTopicsRequest = request.body[CreateTopicsRequest]
+    val future = createTopics(createTopicsRequest.data(),
         authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME),
         names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC, 
names)(identity))
-    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
-      responseData.setThrottleTimeMs(throttleTimeMs)
-      new CreateTopicsResponse(responseData)
+    future.whenComplete((result, exception) => {
+      requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+        if (exception != null) {
+          createTopicsRequest.getErrorResponse(throttleTimeMs, exception)
+        } else {
+          result.setThrottleTimeMs(throttleTimeMs)
+          new CreateTopicsResponse(result)
+        }
+      })
     })
   }
 
   def createTopics(request: CreateTopicsRequestData,
                    hasClusterAuth: Boolean,
-                   getCreatableTopics: Iterable[String] => Set[String]): 
CreateTopicsResponseData = {
+                   getCreatableTopics: Iterable[String] => Set[String])
+                   : CompletableFuture[CreateTopicsResponseData] = {

Review comment:
       Indentation seems weird. (I don't really know if it's correct or not)

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -517,25 +540,67 @@ class ControllerApis(val requestChannel: RequestChannel,
 
   def handleIncrementalAlterConfigs(request: RequestChannel.Request): Unit = {
     val alterConfigsRequest = request.body[IncrementalAlterConfigsRequest]
-    authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
-    val configChanges = new util.HashMap[ConfigResource, util.Map[String, 
util.Map.Entry[AlterConfigOp.OpType, String]]]()
+    val configChanges = new util.HashMap[ConfigResource,
+      util.Map[String, Entry[AlterConfigOp.OpType, String]]]()
     alterConfigsRequest.data.resources.forEach { resource =>
-      val configResource = new 
ConfigResource(ConfigResource.Type.forId(resource.resourceType), 
resource.resourceName())
-      val altersByName = new util.HashMap[String, 
util.Map.Entry[AlterConfigOp.OpType, String]]()
+      val configResource = new ConfigResource(
+        ConfigResource.Type.forId(resource.resourceType), 
resource.resourceName())
+      val altersByName = new util.HashMap[String, Entry[AlterConfigOp.OpType, 
String]]()
       resource.configs.forEach { config =>
         altersByName.put(config.name, new 
util.AbstractMap.SimpleEntry[AlterConfigOp.OpType, String](
           AlterConfigOp.OpType.forId(config.configOperation), config.value))
       }
       configChanges.put(configResource, altersByName)
     }
+    val results = new util.HashMap[ConfigResource, ApiError]
+    val iterator = configChanges.keySet().iterator()
+    while (iterator.hasNext) {
+      val resource = iterator.next()
+      val apiError = resource.`type` match {
+        case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER =>
+          if (authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, 
CLUSTER_NAME)) {
+            new ApiError(NONE)
+          } else {
+            new ApiError(CLUSTER_AUTHORIZATION_FAILED)

Review comment:
       If we get an authorization error, we continue processing the remaining 
config items? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to