Repository: kafka Updated Branches: refs/heads/trunk fb6ca658d -> 10cd98cc8
KAFKA-5547; Return TOPIC_AUTHORIZATION_FAILED error if no describe access for topics Author: Manikumar Reddy <manikumar.re...@gmail.com> Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com>, Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io> Closes #3924 from omkreddy/KAFKA-5547-TOPIC-AUTHRO Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/10cd98cc Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/10cd98cc Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/10cd98cc Branch: refs/heads/trunk Commit: 10cd98cc894b88c5d1e24fc54c66361ad9914df2 Parents: fb6ca65 Author: Manikumar Reddy <manikumar.re...@gmail.com> Authored: Fri Oct 6 12:51:30 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Fri Oct 6 12:51:30 2017 -0700 ---------------------------------------------------------------------- .../consumer/internals/ConsumerCoordinator.java | 5 +- .../common/requests/OffsetFetchResponse.java | 2 + .../src/main/scala/kafka/server/KafkaApis.scala | 198 ++++++++++--------- .../kafka/api/AuthorizerIntegrationTest.scala | 53 ++--- .../kafka/api/EndToEndAuthorizationTest.scala | 4 +- .../api/SaslEndToEndAuthorizationTest.scala | 6 +- docs/upgrade.html | 4 + 7 files changed, 145 insertions(+), 127 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/10cd98cc/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 38ca041..5482db7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -786,7 +786,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { future.raise(new CommitFailedException()); return; } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { - future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic")); + future.raise(new KafkaException("Topic or Partition " + tp + " does not exist")); return; } else { future.raise(new KafkaException("Unexpected error in commit: " + error.message())); @@ -857,8 +857,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { log.debug("Failed to fetch offset for partition {}: {}", tp, error.message()); if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { - future.raise(new KafkaException("Partition " + tp + " may not exist or the user may not have " + - "Describe access to the topic")); + future.raise(new KafkaException("Topic or Partition " + tp + " does not exist")); } else { future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message())); } http://git-wip-us.apache.org/repos/asf/kafka/blob/10cd98cc/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java index 4d069fe..e398442 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java @@ -85,6 +85,8 @@ public class OffsetFetchResponse extends AbstractResponse { public static final String NO_METADATA = ""; public static final PartitionData UNKNOWN_PARTITION = new PartitionData(INVALID_OFFSET, NO_METADATA, Errors.UNKNOWN_TOPIC_OR_PARTITION); + public 
static final PartitionData UNAUTHORIZED_PARTITION = new PartitionData(INVALID_OFFSET, NO_METADATA, + Errors.TOPIC_AUTHORIZATION_FAILED); /** * Possible error codes: http://git-wip-us.apache.org/repos/asf/kafka/blob/10cd98cc/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index c171aaa..aa00565 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -264,25 +264,25 @@ class KafkaApis(val requestChannel: RequestChannel, }.toMap sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse(requestThrottleMs, results.asJava)) } else { - val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = offsetCommitRequest.offsetData.asScala.toMap.partition { - case (topicPartition, _) => - val authorizedForDescribe = authorize(request.session, Describe, new Resource(Topic, topicPartition.topic)) - val exists = metadataCache.contains(topicPartition.topic) - if (!authorizedForDescribe && exists) - debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " + - s"on partition $topicPartition failing due to user not having DESCRIBE authorization, but returning UNKNOWN_TOPIC_OR_PARTITION") - authorizedForDescribe && exists - } - val (authorizedTopics, unauthorizedForReadTopics) = existingAndAuthorizedForDescribeTopics.partition { - case (topicPartition, _) => authorize(request.session, Read, new Resource(Topic, topicPartition.topic)) + var unauthorizedTopics = Set[TopicPartition]() + var nonExistingTopics = Set[TopicPartition]() + var authorizedTopics = mutable.Map[TopicPartition, OffsetCommitRequest.PartitionData]() + + for ((topicPartition, partitionData) <- offsetCommitRequest.offsetData.asScala.toMap) { + if (!authorize(request.session, 
Read, new Resource(Topic, topicPartition.topic))) + unauthorizedTopics += topicPartition + else if (!metadataCache.contains(topicPartition.topic)) + nonExistingTopics += topicPartition + else + authorizedTopics += (topicPartition -> partitionData) } // the callback for sending an offset commit response def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Errors]) { val combinedCommitStatus = commitStatus ++ - unauthorizedForReadTopics.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED) ++ - nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => Errors.UNKNOWN_TOPIC_OR_PARTITION) + unauthorizedTopics.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++ + nonExistingTopics.map(_ -> Errors.UNKNOWN_TOPIC_OR_PARTITION) if (isDebugEnabled) combinedCommitStatus.foreach { case (topicPartition, error) => @@ -313,7 +313,7 @@ class KafkaApis(val requestChannel: RequestChannel, case e: Throwable => (topicPartition, Errors.forException(e)) } } - sendResponseCallback(responseInfo) + sendResponseCallback(responseInfo.toMap) } else { // for version 1 and beyond store offsets in offset manager @@ -353,7 +353,7 @@ class KafkaApis(val requestChannel: RequestChannel, offsetCommitRequest.groupId, offsetCommitRequest.memberId, offsetCommitRequest.generationId, - partitionData, + partitionData.toMap, sendResponseCallback) } } @@ -381,21 +381,25 @@ class KafkaApis(val requestChannel: RequestChannel, return } - val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = - produceRequest.partitionRecordsOrFail.asScala.partition { case (tp, _) => - authorize(request.session, Describe, new Resource(Topic, tp.topic)) && metadataCache.contains(tp.topic) - } + var unauthorizedTopics = Set[TopicPartition]() + var nonExistingTopics = Set[TopicPartition]() + var authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]() - val (authorizedRequestInfo, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition { - case (tp, 
_) => authorize(request.session, Write, new Resource(Topic, tp.topic)) + for ((topicPartition, memoryRecords) <- produceRequest.partitionRecordsOrFail.asScala) { + if (!authorize(request.session, Write, new Resource(Topic, topicPartition.topic))) + unauthorizedTopics += topicPartition + else if (!metadataCache.contains(topicPartition.topic)) + nonExistingTopics += topicPartition + else + authorizedRequestInfo += (topicPartition -> memoryRecords) } // the callback for sending a produce response def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) { val mergedResponseStatus = responseStatus ++ - unauthorizedForWriteRequestInfo.mapValues(_ => new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)) ++ - nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)) + unauthorizedTopics.map(_ -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)) ++ + nonExistingTopics.map(_ -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)) var errorInResponse = false @@ -479,21 +483,26 @@ class KafkaApis(val requestChannel: RequestChannel, val versionId = request.header.apiVersion val clientId = request.header.clientId - val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = fetchRequest.fetchData.asScala.toSeq.partition { - case (tp, _) => authorize(request.session, Describe, new Resource(Topic, tp.topic)) && metadataCache.contains(tp.topic) - } + var unauthorizedTopics = Set[TopicPartition]() + var nonExistingTopics = Set[TopicPartition]() + var authorizedRequestInfo = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]() - val (authorizedRequestInfo, unauthorizedForReadRequestInfo) = existingAndAuthorizedForDescribeTopics.partition { - case (tp, _) => authorize(request.session, Read, new Resource(Topic, tp.topic)) + for ((topicPartition, partitionData) <- fetchRequest.fetchData.asScala) { + if (!authorize(request.session, Read, new 
Resource(Topic, topicPartition.topic))) + unauthorizedTopics += topicPartition + else if (!metadataCache.contains(topicPartition.topic)) + nonExistingTopics += topicPartition + else + authorizedRequestInfo += (topicPartition -> partitionData) } - val nonExistingOrUnauthorizedForDescribePartitionData = nonExistingOrUnauthorizedForDescribeTopics.map { - case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, + val nonExistingPartitionData = nonExistingTopics.map { + case tp => (tp, new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)) } - val unauthorizedForReadPartitionData = unauthorizedForReadRequestInfo.map { - case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, + val unauthorizedForReadPartitionData = unauthorizedTopics.map { + case tp => (tp, new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)) } @@ -538,7 +547,7 @@ class KafkaApis(val requestChannel: RequestChannel, } } - val mergedPartitionData = partitionData ++ unauthorizedForReadPartitionData ++ nonExistingOrUnauthorizedForDescribePartitionData + val mergedPartitionData = partitionData ++ unauthorizedForReadPartitionData ++ nonExistingPartitionData val fetchedPartitionData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]() @@ -644,7 +653,7 @@ class KafkaApis(val requestChannel: RequestChannel, } val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => - new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, List[JLong]().asJava) + new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, List[JLong]().asJava) ) val responseMap = authorizedRequestInfo.map 
{case (topicPartition, partitionData) => @@ -697,7 +706,7 @@ class KafkaApis(val requestChannel: RequestChannel, } val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => { - new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, + new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET) }) @@ -957,7 +966,7 @@ class KafkaApis(val requestChannel: RequestChannel, Set.empty[MetadataResponse.TopicMetadata] else unauthorizedForDescribeTopics.map(topic => - new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, java.util.Collections.emptyList())) + new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, false, java.util.Collections.emptyList())) // In version 0, we returned an error when brokers with replicas were unavailable, // while in higher versions we simply don't include the broker in the returned broker list @@ -1029,7 +1038,7 @@ class KafkaApis(val requestChannel: RequestChannel, } }.toMap - val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap + val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNAUTHORIZED_PARTITION).toMap new OffsetFetchResponse(requestThrottleMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava) } else { // versions 1 and above read offsets from Kafka @@ -1050,7 +1059,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (error != Errors.NONE) offsetFetchRequest.getErrorResponse(requestThrottleMs, error) else { - val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap + val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNAUTHORIZED_PARTITION).toMap new OffsetFetchResponse(requestThrottleMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava) } } 
@@ -1370,11 +1379,10 @@ class KafkaApis(val requestChannel: RequestChannel, val (queuedForDeletion, valid) = authorized.partition { case (topic, _) => controller.topicDeletionManager.isTopicQueuedUpForDeletion(topic) - } val errors = dupes.map(_ -> new ApiError(Errors.INVALID_REQUEST, "Duplicate topic in request.")) ++ - unauthorized.keySet.map( topic => topic -> createPartitionsAuthorizationApiError(request.session, topic) ) ++ + unauthorized.keySet.map(_ -> new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, "The topic authorization is failed.")) ++ queuedForDeletion.keySet.map(_ -> new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "The topic is queued for deletion.")) adminManager.createPartitions(createPartitionsRequest.timeout, valid, createPartitionsRequest.validateOnly, @@ -1382,28 +1390,26 @@ class KafkaApis(val requestChannel: RequestChannel, } } - private def createPartitionsAuthorizationApiError(session: RequestChannel.Session, topic: String): ApiError = { - if (authorize(session, Describe, new Resource(Topic, topic))) - new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null) - else - new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, null) - } - def handleDeleteTopicsRequest(request: RequestChannel.Request) { val deleteTopicRequest = request.body[DeleteTopicsRequest] - val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = deleteTopicRequest.topics.asScala.partition { topic => - authorize(request.session, Describe, new Resource(Topic, topic)) && metadataCache.contains(topic) - } + var unauthorizedTopics = Set[String]() + var nonExistingTopics = Set[String]() + var authorizedForDeleteTopics = Set[String]() - val (authorizedTopics, unauthorizedForDeleteTopics) = existingAndAuthorizedForDescribeTopics.partition { topic => - authorize(request.session, Delete, new Resource(Topic, topic)) + for (topic <- deleteTopicRequest.topics.asScala) { + if (!authorize(request.session, Delete, new Resource(Topic, topic))) + unauthorizedTopics += 
topic + else if (!metadataCache.contains(topic)) + nonExistingTopics += topic + else + authorizedForDeleteTopics += topic } def sendResponseCallback(results: Map[String, Errors]): Unit = { def createResponse(requestThrottleMs: Int): AbstractResponse = { - val completeResults = nonExistingOrUnauthorizedForDescribeTopics.map(topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++ - unauthorizedForDeleteTopics.map(topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++ results + val completeResults = unauthorizedTopics.map(topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++ + nonExistingTopics.map(topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++ results val responseBody = new DeleteTopicsResponse(requestThrottleMs, completeResults.asJava) trace(s"Sending delete topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.") responseBody @@ -1418,12 +1424,12 @@ class KafkaApis(val requestChannel: RequestChannel, sendResponseCallback(results) } else { // If no authorized topics return immediately - if (authorizedTopics.isEmpty) + if (authorizedForDeleteTopics.isEmpty) sendResponseCallback(Map()) else { adminManager.deleteTopics( deleteTopicRequest.timeout.toInt, - authorizedTopics, + authorizedForDeleteTopics, sendResponseCallback ) } @@ -1433,21 +1439,26 @@ class KafkaApis(val requestChannel: RequestChannel, def handleDeleteRecordsRequest(request: RequestChannel.Request) { val deleteRecordsRequest = request.body[DeleteRecordsRequest] - val (authorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = deleteRecordsRequest.partitionOffsets.asScala.partition { - case (topicPartition, _) => authorize(request.session, Describe, new Resource(Topic, topicPartition.topic)) && metadataCache.contains(topicPartition.topic) - } + var unauthorizedTopics = Set[TopicPartition]() + var nonExistingTopics = Set[TopicPartition]() + var authorizedForDeleteTopics = 
mutable.Map[TopicPartition, Long]() - val (authorizedForDeleteTopics, unauthorizedForDeleteTopics) = authorizedForDescribeTopics.partition { - case (topicPartition, _) => authorize(request.session, Delete, new Resource(Topic, topicPartition.topic)) + for ((topicPartition, offset) <- deleteRecordsRequest.partitionOffsets.asScala) { + if (!authorize(request.session, Delete, new Resource(Topic, topicPartition.topic))) + unauthorizedTopics += topicPartition + else if (!metadataCache.contains(topicPartition.topic)) + nonExistingTopics += topicPartition + else + authorizedForDeleteTopics += (topicPartition -> offset) } // the callback for sending a DeleteRecordsResponse def sendResponseCallback(responseStatus: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse]) { val mergedResponseStatus = responseStatus ++ - unauthorizedForDeleteTopics.mapValues(_ => + unauthorizedTopics.map(_ -> new DeleteRecordsResponse.PartitionResponse(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.TOPIC_AUTHORIZATION_FAILED)) ++ - nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => + nonExistingTopics.map(_ -> new DeleteRecordsResponse.PartitionResponse(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.UNKNOWN_TOPIC_OR_PARTITION)) mergedResponseStatus.foreach { case (topicPartition, status) => @@ -1646,24 +1657,28 @@ class KafkaApis(val requestChannel: RequestChannel, else { val internalTopics = partitionsToAdd.asScala.filter {tp => org.apache.kafka.common.internals.Topic.isInternal(tp.topic())} - val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = - partitionsToAdd.asScala.partition { tp => - authorize(request.session, Describe, new Resource(Topic, tp.topic)) && metadataCache.contains(tp) - } + var unauthorizedTopics = Set[TopicPartition]() + var nonExistingTopics = Set[TopicPartition]() + var authorizedPartitions = Set[TopicPartition]() - val (authorizedPartitions, unauthorizedForWriteRequestInfo) = 
existingAndAuthorizedForDescribeTopics.partition { tp => - authorize(request.session, Write, new Resource(Topic, tp.topic)) + for ( topicPartition <- partitionsToAdd.asScala) { + if (!authorize(request.session, Write, new Resource(Topic, topicPartition.topic))) + unauthorizedTopics += topicPartition + else if (!metadataCache.contains(topicPartition.topic)) + nonExistingTopics += topicPartition + else + authorizedPartitions += topicPartition } - if (nonExistingOrUnauthorizedForDescribeTopics.nonEmpty - || unauthorizedForWriteRequestInfo.nonEmpty + if (unauthorizedTopics.nonEmpty + || nonExistingTopics.nonEmpty || internalTopics.nonEmpty) { // Any failed partition check causes the entire request to fail. We send the appropriate error codes for the // partitions which failed, and an 'OPERATION_NOT_ATTEMPTED' error code for the partitions which succeeded // the authorization check to indicate that they were not added to the transaction. - val partitionErrors = (unauthorizedForWriteRequestInfo.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++ - nonExistingOrUnauthorizedForDescribeTopics.map(_ -> Errors.UNKNOWN_TOPIC_OR_PARTITION) ++ + val partitionErrors = (unauthorizedTopics.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++ + nonExistingTopics.map(_ -> Errors.UNKNOWN_TOPIC_OR_PARTITION) ++ internalTopics.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++ authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED)).toMap @@ -1734,26 +1749,24 @@ class KafkaApis(val requestChannel: RequestChannel, else if (!authorize(request.session, Read, new Resource(Group, txnOffsetCommitRequest.consumerGroupId))) sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception) else { - val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = txnOffsetCommitRequest.offsets.asScala.toMap.partition { - case (topicPartition, _) => - val authorizedForDescribe = authorize(request.session, Describe, new Resource(Topic, topicPartition.topic)) - 
val exists = metadataCache.contains(topicPartition.topic) - if (!authorizedForDescribe && exists) - debug(s"TxnOffsetCommit with correlation id ${header.correlationId} from client ${header.clientId} " + - s"on partition $topicPartition failing due to user not having DESCRIBE authorization, but returning " + - s"${Errors.UNKNOWN_TOPIC_OR_PARTITION.name}") - authorizedForDescribe && exists - } - - val (authorizedTopics, unauthorizedForReadTopics) = existingAndAuthorizedForDescribeTopics.partition { - case (topicPartition, _) => authorize(request.session, Read, new Resource(Topic, topicPartition.topic)) + var unauthorizedTopics = Set[TopicPartition]() + var nonExistingTopics = Set[TopicPartition]() + var authorizedTopics = mutable.Map[TopicPartition, TxnOffsetCommitRequest.CommittedOffset]() + + for ((topicPartition, commitedOffset) <- txnOffsetCommitRequest.offsets.asScala) { + if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic))) + unauthorizedTopics += topicPartition + else if (!metadataCache.contains(topicPartition.topic)) + nonExistingTopics += topicPartition + else + authorizedTopics += (topicPartition -> commitedOffset) } // the callback for sending an offset commit response def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]) { val combinedCommitStatus = commitStatus ++ - unauthorizedForReadTopics.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED) ++ - nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => Errors.UNKNOWN_TOPIC_OR_PARTITION) + unauthorizedTopics.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++ + nonExistingTopics.map(_ -> Errors.UNKNOWN_TOPIC_OR_PARTITION) if (isDebugEnabled) combinedCommitStatus.foreach { case (topicPartition, error) => @@ -1769,7 +1782,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (authorizedTopics.isEmpty) sendResponseCallback(Map.empty) else { - val offsetMetadata = convertTxnOffsets(authorizedTopics) + val offsetMetadata = convertTxnOffsets(authorizedTopics.toMap) 
groupCoordinator.handleTxnCommitOffsets( txnOffsetCommitRequest.consumerGroupId, txnOffsetCommitRequest.producerId, @@ -1942,12 +1955,7 @@ class KafkaApis(val requestChannel: RequestChannel, private def configsAuthorizationApiError(session: RequestChannel.Session, resource: RResource): ApiError = { val error = resource.`type` match { case RResourceType.BROKER => Errors.CLUSTER_AUTHORIZATION_FAILED - case RResourceType.TOPIC => - // Don't leak topic name unless the user has describe topic permission - if (authorize(session, Describe, new Resource(Topic, resource.name))) - Errors.TOPIC_AUTHORIZATION_FAILED - else - Errors.UNKNOWN_TOPIC_OR_PARTITION + case RResourceType.TOPIC => Errors.TOPIC_AUTHORIZATION_FAILED case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}") } new ApiError(error, null) http://git-wip-us.apache.org/repos/asf/kafka/blob/10cd98cc/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index d07d08e..522fcd3 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -382,6 +382,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest { private def describeLogDirsRequest = new DescribeLogDirsRequest.Builder(Collections.singleton(tp)).build() + private def addPartitionsToTxnRequest = new AddPartitionsToTxnRequest.Builder(transactionalId, 1, 1, Collections.singletonList(tp)).build() + + private def addOffsetsToTxnRequest = new AddOffsetsToTxnRequest.Builder(transactionalId, 1, 1, group).build() + @Test def testAuthorizationWithTopicExisting() { @@ -413,26 +417,28 @@ class AuthorizerIntegrationTest extends BaseRequestTest { 
ApiKeys.DESCRIBE_ACLS -> describeAclsRequest, ApiKeys.ALTER_REPLICA_LOG_DIRS -> alterReplicaLogDirsRequest, ApiKeys.DESCRIBE_LOG_DIRS -> describeLogDirsRequest, - ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest + ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest, + ApiKeys.ADD_PARTITIONS_TO_TXN -> addPartitionsToTxnRequest, + ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest ) for ((key, request) <- requestKeyToRequest) { removeAllAcls() val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet - sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false, isAuthorizedTopicDescribe = false) + sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false) val resourceToAcls = requestKeysToAcls(key) resourceToAcls.get(topicResource).foreach { acls => val describeAcls = topicDescribeAcl(topicResource) val isAuthorized = describeAcls == acls addAndVerifyAcls(describeAcls, topicResource) - sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true) + sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized) removeAllAcls() } for ((resource, acls) <- resourceToAcls) addAndVerifyAcls(acls, resource) - sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true, isAuthorizedTopicDescribe = false) + sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true) } } @@ -447,7 +453,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest { TestUtils.verifyTopicDeletion(zkUtils, deleteTopic, 1, servers) val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( - ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = true), ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = false), ApiKeys.PRODUCE -> createProduceRequest, ApiKeys.FETCH -> createFetchRequest, @@ -455,26 +460,29 @@ class AuthorizerIntegrationTest extends BaseRequestTest { 
ApiKeys.OFFSET_COMMIT -> createOffsetCommitRequest, ApiKeys.OFFSET_FETCH -> createOffsetFetchRequest, ApiKeys.DELETE_TOPICS -> deleteTopicsRequest, - ApiKeys.DELETE_RECORDS -> deleteRecordsRequest + ApiKeys.DELETE_RECORDS -> deleteRecordsRequest, + ApiKeys.ADD_PARTITIONS_TO_TXN -> addPartitionsToTxnRequest, + ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest, + ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest ) for ((key, request) <- requestKeyToRequest) { removeAllAcls() val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet - sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false, isAuthorizedTopicDescribe = false, topicExists = false) + sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false, topicExists = false) val resourceToAcls = requestKeysToAcls(key) resourceToAcls.get(topicResource).foreach { acls => val describeAcls = topicDescribeAcl(topicResource) val isAuthorized = describeAcls == acls addAndVerifyAcls(describeAcls, topicResource) - sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true, topicExists = false) + sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized, topicExists = false) removeAllAcls() } for ((resource, acls) <- resourceToAcls) addAndVerifyAcls(acls, resource) - sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true, isAuthorizedTopicDescribe = false, topicExists = false) + sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true, topicExists = false) } } @@ -484,7 +492,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { sendRecords(numRecords, tp) fail("should have thrown exception") } catch { - case _: TimeoutException => //expected + case _: TopicAuthorizationException => //expected } } @@ -534,8 +542,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { sendRecords(numRecords, topicPartition) } - 
@Test(expected = classOf[GroupAuthorizationException]) - def testConsumeWithNoAccess(): Unit = { + @Test(expected = classOf[TopicAuthorizationException]) + def testConsumeUsingAssignWithNoAccess(): Unit = { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) sendRecords(1, tp) removeAllAcls() @@ -893,10 +901,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { this.consumers.head.position(tp) } - @Test + @Test(expected = classOf[TopicAuthorizationException]) def testListOffsetsWithNoTopicAccess() { - val partitionInfos = this.consumers.head.partitionsFor(topic) - assertNull(partitionInfos) + this.consumers.head.partitionsFor(topic) } @Test @@ -935,7 +942,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS) val version = ApiKeys.DELETE_TOPICS.latestVersion val deleteResponse = DeleteTopicsResponse.parse(response, version) - assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, deleteResponse.errors.asScala.head._2) + assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, deleteResponse.errors.asScala.head._2) } @Test @@ -963,7 +970,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val response = connectAndSend(deleteRecordsRequest, ApiKeys.DELETE_RECORDS) val version = ApiKeys.DELETE_RECORDS.latestVersion val deleteRecordsResponse = DeleteRecordsResponse.parse(response, version) - assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, deleteRecordsResponse.responses.asScala.head._2.error) + assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, deleteRecordsResponse.responses.asScala.head._2.error) } @Test @@ -990,7 +997,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val response = connectAndSend(createPartitionsRequest, ApiKeys.CREATE_PARTITIONS) val version = ApiKeys.CREATE_PARTITIONS.latestVersion val createPartitionsResponse = CreatePartitionsResponse.parse(response, version) - 
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, createPartitionsResponse.errors.asScala.head._2.error) + assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, createPartitionsResponse.errors.asScala.head._2.error) } @Test @@ -1240,7 +1247,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest { request: AbstractRequest, resources: Set[ResourceType], isAuthorized: Boolean, - isAuthorizedTopicDescribe: Boolean, topicExists: Boolean = true): AbstractResponse = { val resp = connectAndSend(request, apiKey) val response = requestKeyToResponseDeserializer(apiKey).getMethod("parse", classOf[ByteBuffer], classOf[Short]).invoke( @@ -1251,8 +1257,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest { if (resourceType == Topic) { if (isAuthorized) Set(Errors.UNKNOWN_TOPIC_OR_PARTITION, Topic.error) - else if (!isAuthorizedTopicDescribe) - Set(Errors.UNKNOWN_TOPIC_OR_PARTITION) else Set(Topic.error) } else { @@ -1266,9 +1270,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest { else assertTrue(s"$apiKey should be forbidden. 
Found error $error but expected one of $authorizationErrors", authorizationErrors.contains(error)) else if (resources == Set(Topic)) - assertEquals(s"$apiKey had an unexpected error", Errors.UNKNOWN_TOPIC_OR_PARTITION, error) - else - assertNotEquals(s"$apiKey had an unexpected error", Errors.TOPIC_AUTHORIZATION_FAILED, error) + if (isAuthorized) + assertEquals(s"$apiKey had an unexpected error", Errors.UNKNOWN_TOPIC_OR_PARTITION, error) + else + assertEquals(s"$apiKey had an unexpected error", Errors.TOPIC_AUTHORIZATION_FAILED, error) response } http://git-wip-us.apache.org/repos/asf/kafka/blob/10cd98cc/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index bbb3249..720d8b6 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -214,7 +214,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas * Tests that a producer fails to publish messages when the appropriate ACL * isn't set. 
*/ - @Test(expected = classOf[TimeoutException]) + @Test(expected = classOf[TopicAuthorizationException]) def testNoProduceWithoutDescribeAcl(): Unit = { sendRecords(numRecords, tp) } @@ -246,7 +246,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas consumeRecords(this.consumers.head) } - @Test(expected = classOf[TimeoutException]) + @Test(expected = classOf[TopicAuthorizationException]) def testNoConsumeWithoutDescribeAclViaSubscribe(): Unit = { noConsumeWithoutDescribeAclSetup() consumers.head.subscribe(List(topic).asJava) http://git-wip-us.apache.org/repos/asf/kafka/blob/10cd98cc/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala index a366b1d..fb6bee8 100644 --- a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala @@ -21,8 +21,8 @@ import java.util.Properties import kafka.utils.TestUtils import kafka.utils.Implicits._ import org.apache.kafka.common.config.SaslConfigs -import org.apache.kafka.common.errors.GroupAuthorizationException import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.errors.TopicAuthorizationException import org.junit.{Before, Test} import scala.collection.immutable.List @@ -77,9 +77,9 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { try { consumeRecords(consumer2) - fail("Expected exception as consumer2 has no access to group") + fail("Expected exception as consumer2 has no access to topic") } catch { - case _: GroupAuthorizationException => //expected + case _: TopicAuthorizationException => //expected } } } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/10cd98cc/docs/upgrade.html ---------------------------------------------------------------------- diff --git a/docs/upgrade.html b/docs/upgrade.html index 5872c7c..862dadb 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -86,6 +86,10 @@ inversion bug, it was previously enabled by default and disabled if <code>kafka_mx4jenable</code> was set to <code>true</code>.</li> <li>The package <code>org.apache.kafka.common.security.auth</code> in the clients jar has been made public and added to the javadocs. Internal classes which had previously been located in this package have been moved elsewhere.</li> + <li>When using an Authorizer and a user doesn't have the required permissions on a topic, the broker + will return TOPIC_AUTHORIZATION_FAILED errors to requests irrespective of whether the topic exists on the broker. + If the user has the required permissions and the topic doesn't exist, then the UNKNOWN_TOPIC_OR_PARTITION + error code will be returned. </li> </ul> <h5><a id="upgrade_100_new_protocols" href="#upgrade_100_new_protocols">New Protocol Versions</a></h5>