[GitHub] [kafka] hachikuji commented on a change in pull request #9300: KAFKA-10491: Check authorizations first in KafkaApis
hachikuji commented on a change in pull request #9300: URL: https://github.com/apache/kafka/pull/9300#discussion_r533763099 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1770,45 +1770,47 @@ class KafkaApis(val requestChannel: RequestChannel, sendResponseMaybeThrottle(controllerMutationQuota, request, createResponse, onComplete = None) } +// be sure to check authorization first, before checking if this is the controller, to avoid leaking +// information about the system (i.e. who is the controller) to principals unauthorized for that information + val createTopicsRequest = request.body[CreateTopicsRequest] val results = new CreatableTopicResultCollection(createTopicsRequest.data.topics.size) -if (!controller.isActive) { - createTopicsRequest.data.topics.forEach { topic => -results.add(new CreatableTopicResult().setName(topic.name) - .setErrorCode(Errors.NOT_CONTROLLER.code)) - } - sendResponseCallback(results) -} else { - createTopicsRequest.data.topics.forEach { topic => -results.add(new CreatableTopicResult().setName(topic.name)) +createTopicsRequest.data.topics.forEach { topic => + results.add(new CreatableTopicResult().setName(topic.name)) +} +val hasClusterAuthorization = authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, + logIfDenied = false) +val topics = createTopicsRequest.data.topics.asScala.map(_.name) +val authorizedTopics = + if (hasClusterAuthorization) topics.toSet + else filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity) +val authorizedForDescribeConfigs = filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC, + topics, logIfDenied = false)(identity).map(name => name -> results.find(name)).toMap + +results.forEach { topic => + if (results.findAll(topic.name).size > 1) { +topic.setErrorCode(Errors.INVALID_REQUEST.code) +topic.setErrorMessage("Found multiple entries for this topic.") + } else if (!authorizedTopics.contains(topic.name)) { +topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) 
+topic.setErrorMessage("Authorization failed.") + } + if (!authorizedForDescribeConfigs.contains(topic.name)) { +topic.setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) } - val hasClusterAuthorization = authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, -logIfDenied = false) - val topics = createTopicsRequest.data.topics.asScala.map(_.name) - val authorizedTopics = -if (hasClusterAuthorization) topics.toSet -else filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity) - val authorizedForDescribeConfigs = filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC, -topics, logIfDenied = false)(identity).map(name => name -> results.find(name)).toMap - +} +if (!controller.isActive) { + // Don't provide the information that this node is not the controller unless they were authorized + // to perform at least one of their requests. So only set NOT_CONTROLLER error for anything that so far has a + // success/NONE error code. Keep the existing error codes that we've determined rather than overwriting them + // with NOT_CONTROLLER because that is potentially useful information for the client. results.forEach { topic => -if (results.findAll(topic.name).size > 1) { - topic.setErrorCode(Errors.INVALID_REQUEST.code) - topic.setErrorMessage("Found multiple entries for this topic.") -} else if (!authorizedTopics.contains(topic.name)) { - topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) - topic.setErrorMessage("Authorization failed.") -} -if (!authorizedForDescribeConfigs.contains(topic.name)) { - topic.setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) -} - } - val toCreate = mutable.Map[String, CreatableTopic]() - createTopicsRequest.data.topics.forEach { topic => -if (results.find(topic.name).errorCode == Errors.NONE.code) { - toCreate += topic.name -> topic +if(topic.errorCode() == Errors.NONE.code()) { Review comment: nit: convention is to add space after `if`. 
There are a few of these in the patch. ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1770,45 +1770,47 @@ class KafkaApis(val requestChannel: RequestChannel, sendResponseMaybeThrottle(controllerMutationQuota, request, createResponse, onComplete = None) } +// be sure to check authorization first, before checking if this is the controller, to avoid leaking +// information about the system (i.e. who is the controller) to principals unauthorized for that information + val createTopicsRequest = request.body[CreateTopicsReques
[GitHub] [kafka] hachikuji commented on a change in pull request #9300: KAFKA-10491: Check authorizations first in KafkaApis
hachikuji commented on a change in pull request #9300: URL: https://github.com/apache/kafka/pull/9300#discussion_r490604721 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1732,45 +1732,57 @@ class KafkaApis(val requestChannel: RequestChannel, sendResponseMaybeThrottle(controllerMutationQuota, request, createResponse, onComplete = None) } +// be sure to check authorization first, before checking if this is the controller, to avoid leaking +// information about the system (i.e. who is the controller) to principals unauthorized for that information + val createTopicsRequest = request.body[CreateTopicsRequest] val results = new CreatableTopicResultCollection(createTopicsRequest.data.topics.size) -if (!controller.isActive) { - createTopicsRequest.data.topics.forEach { topic => -results.add(new CreatableTopicResult().setName(topic.name) - .setErrorCode(Errors.NOT_CONTROLLER.code)) - } - sendResponseCallback(results) -} else { - createTopicsRequest.data.topics.forEach { topic => -results.add(new CreatableTopicResult().setName(topic.name)) - } - val hasClusterAuthorization = authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, -logIfDenied = false) - val topics = createTopicsRequest.data.topics.asScala.map(_.name) - val authorizedTopics = -if (hasClusterAuthorization) topics.toSet -else filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity) - val authorizedForDescribeConfigs = filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC, -topics, logIfDenied = false)(identity).map(name => name -> results.find(name)).toMap +createTopicsRequest.data.topics.forEach { topic => + results.add(new CreatableTopicResult().setName(topic.name)) +} +val hasClusterAuthorization = authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, + logIfDenied = false) +val topics = createTopicsRequest.data.topics.asScala.map(_.name) +val authorizedTopics = + if (hasClusterAuthorization) topics.toSet + else filterByAuthorized(request.context, CREATE, 
TOPIC, topics)(identity) +val authorizedForDescribeConfigs = filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC, Review comment: Might not matter since `CreateTopics` requests are infrequent, but the two passes for authorization are a bit vexing. Feels like we are missing a good intermediate type between this handler and `AdminManager`. Maybe we can replace the 3 maps that we pass to `AdminManager.createTopic` with a single map which contains all the state we need for each topic. ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1732,45 +1732,57 @@ class KafkaApis(val requestChannel: RequestChannel, sendResponseMaybeThrottle(controllerMutationQuota, request, createResponse, onComplete = None) } +// be sure to check authorization first, before checking if this is the controller, to avoid leaking +// information about the system (i.e. who is the controller) to principals unauthorized for that information + val createTopicsRequest = request.body[CreateTopicsRequest] val results = new CreatableTopicResultCollection(createTopicsRequest.data.topics.size) -if (!controller.isActive) { - createTopicsRequest.data.topics.forEach { topic => -results.add(new CreatableTopicResult().setName(topic.name) - .setErrorCode(Errors.NOT_CONTROLLER.code)) - } - sendResponseCallback(results) -} else { - createTopicsRequest.data.topics.forEach { topic => -results.add(new CreatableTopicResult().setName(topic.name)) - } - val hasClusterAuthorization = authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, -logIfDenied = false) - val topics = createTopicsRequest.data.topics.asScala.map(_.name) - val authorizedTopics = -if (hasClusterAuthorization) topics.toSet -else filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity) - val authorizedForDescribeConfigs = filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC, -topics, logIfDenied = false)(identity).map(name => name -> results.find(name)).toMap +createTopicsRequest.data.topics.forEach { topic 
=> + results.add(new CreatableTopicResult().setName(topic.name)) +} +val hasClusterAuthorization = authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, + logIfDenied = false) +val topics = createTopicsRequest.data.topics.asScala.map(_.name) +val authorizedTopics = + if (hasClusterAuthorization) topics.toSet + else filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity) +val authorizedForDescribeConfigs = filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC, + topics, logIfDenied = false)(identity).map(name => name -> results.