[GitHub] [kafka] hachikuji commented on a change in pull request #9300: KAFKA-10491: Check authorizations first in KafkaApis

2020-12-01 Thread GitBox


hachikuji commented on a change in pull request #9300:
URL: https://github.com/apache/kafka/pull/9300#discussion_r533763099



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1770,45 +1770,47 @@ class KafkaApis(val requestChannel: RequestChannel,
   sendResponseMaybeThrottle(controllerMutationQuota, request, 
createResponse, onComplete = None)
 }
 
+// be sure to check authorization first, before checking if this is the 
controller, to avoid leaking
+// information about the system (i.e. who is the controller) to principals 
unauthorized for that information
+
 val createTopicsRequest = request.body[CreateTopicsRequest]
 val results = new 
CreatableTopicResultCollection(createTopicsRequest.data.topics.size)
-if (!controller.isActive) {
-  createTopicsRequest.data.topics.forEach { topic =>
-results.add(new CreatableTopicResult().setName(topic.name)
-  .setErrorCode(Errors.NOT_CONTROLLER.code))
-  }
-  sendResponseCallback(results)
-} else {
-  createTopicsRequest.data.topics.forEach { topic =>
-results.add(new CreatableTopicResult().setName(topic.name))
+createTopicsRequest.data.topics.forEach { topic =>
+  results.add(new CreatableTopicResult().setName(topic.name))
+}
+val hasClusterAuthorization = authorize(request.context, CREATE, CLUSTER, 
CLUSTER_NAME,
+  logIfDenied = false)
+val topics = createTopicsRequest.data.topics.asScala.map(_.name)
+val authorizedTopics =
+  if (hasClusterAuthorization) topics.toSet
+  else filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity)
+val authorizedForDescribeConfigs = filterByAuthorized(request.context, 
DESCRIBE_CONFIGS, TOPIC,
+  topics, logIfDenied = false)(identity).map(name => name -> 
results.find(name)).toMap
+
+results.forEach { topic =>
+  if (results.findAll(topic.name).size > 1) {
+topic.setErrorCode(Errors.INVALID_REQUEST.code)
+topic.setErrorMessage("Found multiple entries for this topic.")
+  } else if (!authorizedTopics.contains(topic.name)) {
+topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+topic.setErrorMessage("Authorization failed.")
+  }
+  if (!authorizedForDescribeConfigs.contains(topic.name)) {
+topic.setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
   }
-  val hasClusterAuthorization = authorize(request.context, CREATE, 
CLUSTER, CLUSTER_NAME,
-logIfDenied = false)
-  val topics = createTopicsRequest.data.topics.asScala.map(_.name)
-  val authorizedTopics =
-if (hasClusterAuthorization) topics.toSet
-else filterByAuthorized(request.context, CREATE, TOPIC, 
topics)(identity)
-  val authorizedForDescribeConfigs = filterByAuthorized(request.context, 
DESCRIBE_CONFIGS, TOPIC,
-topics, logIfDenied = false)(identity).map(name => name -> 
results.find(name)).toMap
-
+}
+if (!controller.isActive) {
+  // Don't provide the information that this node is not the controller 
unless they were authorized
+  // to perform at least one of their requests.  So only set 
NOT_CONTROLLER error for anything that so far has a
+  // success/NONE error code.  Keep the existing error codes that we've 
determined rather than overwriting them
+  // with NOT_CONTROLLER because that is potentially useful information 
for the client.
   results.forEach { topic =>
-if (results.findAll(topic.name).size > 1) {
-  topic.setErrorCode(Errors.INVALID_REQUEST.code)
-  topic.setErrorMessage("Found multiple entries for this topic.")
-} else if (!authorizedTopics.contains(topic.name)) {
-  topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-  topic.setErrorMessage("Authorization failed.")
-}
-if (!authorizedForDescribeConfigs.contains(topic.name)) {
-  topic.setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-}
-  }
-  val toCreate = mutable.Map[String, CreatableTopic]()
-  createTopicsRequest.data.topics.forEach { topic =>
-if (results.find(topic.name).errorCode == Errors.NONE.code) {
-  toCreate += topic.name -> topic
+if(topic.errorCode() == Errors.NONE.code()) {

Review comment:
   nit: the convention is to add a space after `if`. There are a few of these in 
the patch.

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1770,45 +1770,47 @@ class KafkaApis(val requestChannel: RequestChannel,
   sendResponseMaybeThrottle(controllerMutationQuota, request, 
createResponse, onComplete = None)
 }
 
+// be sure to check authorization first, before checking if this is the 
controller, to avoid leaking
+// information about the system (i.e. who is the controller) to principals 
unauthorized for that information
+
 val createTopicsRequest = request.body[CreateTopicsReques

[GitHub] [kafka] hachikuji commented on a change in pull request #9300: KAFKA-10491: Check authorizations first in KafkaApis

2020-09-17 Thread GitBox


hachikuji commented on a change in pull request #9300:
URL: https://github.com/apache/kafka/pull/9300#discussion_r490604721



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1732,45 +1732,57 @@ class KafkaApis(val requestChannel: RequestChannel,
   sendResponseMaybeThrottle(controllerMutationQuota, request, 
createResponse, onComplete = None)
 }
 
+// be sure to check authorization first, before checking if this is the 
controller, to avoid leaking
+// information about the system (i.e. who is the controller) to principals 
unauthorized for that information
+
 val createTopicsRequest = request.body[CreateTopicsRequest]
 val results = new 
CreatableTopicResultCollection(createTopicsRequest.data.topics.size)
-if (!controller.isActive) {
-  createTopicsRequest.data.topics.forEach { topic =>
-results.add(new CreatableTopicResult().setName(topic.name)
-  .setErrorCode(Errors.NOT_CONTROLLER.code))
-  }
-  sendResponseCallback(results)
-} else {
-  createTopicsRequest.data.topics.forEach { topic =>
-results.add(new CreatableTopicResult().setName(topic.name))
-  }
-  val hasClusterAuthorization = authorize(request.context, CREATE, 
CLUSTER, CLUSTER_NAME,
-logIfDenied = false)
-  val topics = createTopicsRequest.data.topics.asScala.map(_.name)
-  val authorizedTopics =
-if (hasClusterAuthorization) topics.toSet
-else filterByAuthorized(request.context, CREATE, TOPIC, 
topics)(identity)
-  val authorizedForDescribeConfigs = filterByAuthorized(request.context, 
DESCRIBE_CONFIGS, TOPIC,
-topics, logIfDenied = false)(identity).map(name => name -> 
results.find(name)).toMap
+createTopicsRequest.data.topics.forEach { topic =>
+  results.add(new CreatableTopicResult().setName(topic.name))
+}
+val hasClusterAuthorization = authorize(request.context, CREATE, CLUSTER, 
CLUSTER_NAME,
+  logIfDenied = false)
+val topics = createTopicsRequest.data.topics.asScala.map(_.name)
+val authorizedTopics =
+  if (hasClusterAuthorization) topics.toSet
+  else filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity)
+val authorizedForDescribeConfigs = filterByAuthorized(request.context, 
DESCRIBE_CONFIGS, TOPIC,

Review comment:
   This might not matter since `CreateTopics` requests are infrequent, but the 
two passes for authorization are a bit vexing. It feels like we are missing a 
good intermediate type between this handler and `AdminManager`. Maybe we can 
replace the 3 maps that we pass to `AdminManager.createTopic` with a single map 
which contains all the state we need for each topic.

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1732,45 +1732,57 @@ class KafkaApis(val requestChannel: RequestChannel,
   sendResponseMaybeThrottle(controllerMutationQuota, request, 
createResponse, onComplete = None)
 }
 
+// be sure to check authorization first, before checking if this is the 
controller, to avoid leaking
+// information about the system (i.e. who is the controller) to principals 
unauthorized for that information
+
 val createTopicsRequest = request.body[CreateTopicsRequest]
 val results = new 
CreatableTopicResultCollection(createTopicsRequest.data.topics.size)
-if (!controller.isActive) {
-  createTopicsRequest.data.topics.forEach { topic =>
-results.add(new CreatableTopicResult().setName(topic.name)
-  .setErrorCode(Errors.NOT_CONTROLLER.code))
-  }
-  sendResponseCallback(results)
-} else {
-  createTopicsRequest.data.topics.forEach { topic =>
-results.add(new CreatableTopicResult().setName(topic.name))
-  }
-  val hasClusterAuthorization = authorize(request.context, CREATE, 
CLUSTER, CLUSTER_NAME,
-logIfDenied = false)
-  val topics = createTopicsRequest.data.topics.asScala.map(_.name)
-  val authorizedTopics =
-if (hasClusterAuthorization) topics.toSet
-else filterByAuthorized(request.context, CREATE, TOPIC, 
topics)(identity)
-  val authorizedForDescribeConfigs = filterByAuthorized(request.context, 
DESCRIBE_CONFIGS, TOPIC,
-topics, logIfDenied = false)(identity).map(name => name -> 
results.find(name)).toMap
+createTopicsRequest.data.topics.forEach { topic =>
+  results.add(new CreatableTopicResult().setName(topic.name))
+}
+val hasClusterAuthorization = authorize(request.context, CREATE, CLUSTER, 
CLUSTER_NAME,
+  logIfDenied = false)
+val topics = createTopicsRequest.data.topics.asScala.map(_.name)
+val authorizedTopics =
+  if (hasClusterAuthorization) topics.toSet
+  else filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity)
+val authorizedForDescribeConfigs = filterByAuthorized(request.context, 
DESCRIBE_CONFIGS, TOPIC,
+  topics, logIfDenied = false)(identity).map(name => name -> 
results.