Repository: kafka Updated Branches: refs/heads/0.10.1 8ad31173e -> 4cb726fde
KAFKA-3396; Ensure Describe access is required to detect topic existence Reopening of https://github.com/apache/kafka/pull/1428 Author: Edoardo Comar <[email protected]> Author: Mickael Maison <[email protected]> Reviewers: Grant Henke <[email protected]>, Ismael Juma <[email protected]>, Jason Gustafson <[email protected]> Closes #1908 from edoardocomar/KAFKA-3396 (cherry picked from commit 8124f6e0996cb673760750b3aba004ae11e34c6a) Signed-off-by: Jason Gustafson <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4cb726fd Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4cb726fd Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4cb726fd Branch: refs/heads/0.10.1 Commit: 4cb726fde57675239d831882c8bb5296e0fa2249 Parents: 8ad3117 Author: Edoardo Comar <[email protected]> Authored: Fri Sep 30 23:07:51 2016 -0700 Committer: Jason Gustafson <[email protected]> Committed: Fri Sep 30 23:08:52 2016 -0700 ---------------------------------------------------------------------- .../consumer/internals/ConsumerCoordinator.java | 6 + .../src/main/scala/kafka/admin/AdminUtils.scala | 4 +- .../security/auth/SimpleAclAuthorizer.scala | 4 +- .../src/main/scala/kafka/server/KafkaApis.scala | 132 ++++++----- .../main/scala/kafka/server/MetadataCache.scala | 6 - .../kafka/api/AuthorizerIntegrationTest.scala | 233 ++++++++++++++++--- .../kafka/api/EndToEndAuthorizationTest.scala | 176 +++++++++++--- .../kafka/api/IntegrationTestHarness.scala | 48 +++- .../unit/kafka/admin/DeleteTopicTest.scala | 6 +- .../kafka/server/DeleteTopicsRequestTest.scala | 4 +- docs/upgrade.html | 2 + 11 files changed, 474 insertions(+), 147 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/4cb726fd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index ff0d669..27d6a75 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -670,6 +670,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator { resetGeneration(); future.raise(new CommitFailedException()); return; + } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { + log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message()); + future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic: " + error.message())); + return; } else { log.error("Group {} failed to commit partition {} at offset {}: {}", groupId, tp, offset, error.message()); future.raise(new KafkaException("Unexpected error in commit: " + error.message())); @@ -731,6 +735,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator { // re-discover the coordinator and retry coordinatorDead(); future.raise(error); + } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { + future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic: " + error.message())); } else { future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message())); } http://git-wip-us.apache.org/repos/asf/kafka/blob/4cb726fd/core/src/main/scala/kafka/admin/AdminUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 0273bdb..7873028 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -26,7 +26,7 @@ import kafka.utils.ZkUtils._ import java.util.Random import java.util.Properties import org.apache.kafka.common.Node -import org.apache.kafka.common.errors.{ReplicaNotAvailableException, InvalidTopicException, LeaderNotAvailableException, InvalidPartitionsException, InvalidReplicationFactorException, TopicExistsException, InvalidReplicaAssignmentException} +import org.apache.kafka.common.errors.{ReplicaNotAvailableException, UnknownTopicOrPartitionException, InvalidTopicException, LeaderNotAvailableException, InvalidPartitionsException, InvalidReplicationFactorException, TopicExistsException, InvalidReplicaAssignmentException} import org.apache.kafka.common.protocol.{Errors, SecurityProtocol} import org.apache.kafka.common.requests.MetadataResponse @@ -325,7 +325,7 @@ object AdminUtils extends Logging with AdminUtilities { case e2: Throwable => throw new AdminOperationException(e2) } } else { - throw new InvalidTopicException("topic %s to delete does not exist".format(topic)) + throw new UnknownTopicOrPartitionException("topic %s to delete does not exist".format(topic)) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/4cb726fd/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index a36a07d..42bfebf 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -127,9 +127,9 @@ class SimpleAclAuthorizer extends Authorizer with Logging { //check if there is any Deny acl match that would disallow this operation. val denyMatch = aclMatch(session, operation, resource, principal, host, Deny, acls) - //if principal is allowed to read or write we allow describe by default, the reverse does not apply to Deny. + //if principal is allowed to read, write or delete we allow describe by default, the reverse does not apply to Deny. val ops = if (Describe == operation) - Set[Operation](operation, Read, Write) + Set[Operation](operation, Read, Write, Delete) else Set[Operation](operation) http://git-wip-us.apache.org/repos/asf/kafka/blob/4cb726fd/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 85c47e6..d765c8a 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -233,44 +233,48 @@ class KafkaApis(val requestChannel: RequestChannel, val responseBody = new OffsetCommitResponse(results.asJava) requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody))) } else { - // filter non-existent topics - val invalidRequestsInfo = offsetCommitRequest.offsetData.asScala.filter { case (topicPartition, _) => - !metadataCache.contains(topicPartition.topic) + val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = offsetCommitRequest.offsetData.asScala.toMap.partition { + case (topicPartition, _) => { + val authorizedForDescribe = authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) + val exists = metadataCache.contains(topicPartition.topic) + if (!authorizedForDescribe && exists) + debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " + + s"on partition $topicPartition failing due to user not having DESCRIBE authorization, but returning UNKNOWN_TOPIC_OR_PARTITION") + authorizedForDescribe && exists + } } - val filteredRequestInfo = offsetCommitRequest.offsetData.asScala.toMap -- invalidRequestsInfo.keys - val (authorizedRequestInfo, unauthorizedRequestInfo) = filteredRequestInfo.partition { - case (topicPartition, offsetMetadata) => authorize(request.session, Read, new Resource(auth.Topic, topicPartition.topic)) + val (authorizedTopics, unauthorizedForReadTopics) = existingAndAuthorizedForDescribeTopics.partition { + case (topicPartition, _) => authorize(request.session, Read, new Resource(auth.Topic, topicPartition.topic)) } // the callback for sending an offset commit response def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Short]) { - val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED.code) - - mergedCommitStatus.foreach { case (topicPartition, errorCode) => - if (errorCode != Errors.NONE.code) { - debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " + - s"on partition $topicPartition failed due to ${Errors.forCode(errorCode).exceptionName}") + val combinedCommitStatus = commitStatus.mapValues(new JShort(_)) ++ + unauthorizedForReadTopics.mapValues(_ => new JShort(Errors.TOPIC_AUTHORIZATION_FAILED.code)) ++ + nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)) + + if (logger.isDebugEnabled()) //optimizing code as it's a loop + combinedCommitStatus.foreach { case (topicPartition, errorCode) => + if (errorCode != Errors.NONE.code) { + debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " + + s"on partition $topicPartition failed due to ${Errors.forCode(errorCode).exceptionName}") + } } - } - val combinedCommitStatus = mergedCommitStatus.mapValues(new JShort(_)) ++ invalidRequestsInfo.map(_._1 -> new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)) - val responseHeader = new ResponseHeader(header.correlationId) val responseBody = new OffsetCommitResponse(combinedCommitStatus.asJava) requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody))) } - if (authorizedRequestInfo.isEmpty) + if (authorizedTopics.isEmpty) sendResponseCallback(Map.empty) else if (header.apiVersion == 0) { // for version 0 always store offsets to ZK - val responseInfo = authorizedRequestInfo.map { + val responseInfo = authorizedTopics.map { case (topicPartition, partitionData) => val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicPartition.topic) try { - if (!metadataCache.hasTopicMetadata(topicPartition.topic)) - (topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code) - else if (partitionData.metadata != null && partitionData.metadata.length > config.offsetMetadataMaxSize) + if (partitionData.metadata != null && partitionData.metadata.length > config.offsetMetadataMaxSize) (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE.code) else { zkUtils.updatePersistentPath(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}", partitionData.offset.toString) @@ -301,7 +305,7 @@ class KafkaApis(val requestChannel: RequestChannel, // - If v2 we use the default expiration timestamp val currentTimestamp = SystemTime.milliseconds val defaultExpireTimestamp = offsetRetention + currentTimestamp - val partitionData = authorizedRequestInfo.mapValues { partitionData => + val partitionData = authorizedTopics.mapValues { partitionData => val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata new OffsetAndMetadata( offsetMetadata = OffsetMetadata(partitionData.offset, metadata), @@ -336,15 +340,22 @@ class KafkaApis(val requestChannel: RequestChannel, val produceRequest = request.body.asInstanceOf[ProduceRequest] val numBytesAppended = request.header.sizeOf + produceRequest.sizeOf - val (authorizedRequestInfo, unauthorizedRequestInfo) = produceRequest.partitionRecords.asScala.partition { + val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = produceRequest.partitionRecords.asScala.partition { + case (topicPartition, _) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) && metadataCache.contains(topicPartition.topic) + } + + val (authorizedRequestInfo, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition { case (topicPartition, _) => authorize(request.session, Write, new Resource(auth.Topic, topicPartition.topic)) } // the callback for sending a produce response def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) { - val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => - new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, Message.NoTimestamp)) + val mergedResponseStatus = responseStatus ++ + unauthorizedForWriteRequestInfo.mapValues(_ => + new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, Message.NoTimestamp)) ++ + nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => + new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, -1, Message.NoTimestamp)) var errorInResponse = false @@ -432,11 +443,18 @@ class KafkaApis(val requestChannel: RequestChannel, def handleFetchRequest(request: RequestChannel.Request) { val fetchRequest = request.requestObj.asInstanceOf[FetchRequest] - val (authorizedRequestInfo, unauthorizedRequestInfo) = fetchRequest.requestInfo.partition { + val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = fetchRequest.requestInfo.partition { + case (topicAndPartition, _) => authorize(request.session, Describe, new Resource(auth.Topic, topicAndPartition.topic)) && metadataCache.contains(topicAndPartition.topic) + } + + val (authorizedRequestInfo, unauthorizedForReadRequestInfo) = existingAndAuthorizedForDescribeTopics.partition { case (topicAndPartition, _) => authorize(request.session, Read, new Resource(auth.Topic, topicAndPartition.topic)) } - val unauthorizedPartitionData = unauthorizedRequestInfo.map { case (tp, _) => + val nonExistingOrUnauthorizedForDescribePartitionData = nonExistingOrUnauthorizedForDescribeTopics.map { case (tp, _) => + (tp, FetchResponsePartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, -1, MessageSet.Empty)) + } + val unauthorizedForReadPartitionData = unauthorizedForReadRequestInfo.map { case (tp, _) => (tp, FetchResponsePartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, MessageSet.Empty)) } @@ -466,7 +484,7 @@ class KafkaApis(val requestChannel: RequestChannel, } } else responsePartitionData - val mergedPartitionData = convertedPartitionData ++ unauthorizedPartitionData + val mergedPartitionData = convertedPartitionData ++ unauthorizedForReadPartitionData ++ nonExistingOrUnauthorizedForDescribePartitionData mergedPartitionData.foreach { case (topicAndPartition, data) => if (data.error != Errors.NONE.code) @@ -554,7 +572,7 @@ class KafkaApis(val requestChannel: RequestChannel, } val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => - new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, List[JLong]().asJava) + new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, List[JLong]().asJava) ) val responseMap = authorizedRequestInfo.map {case (topicPartition, partitionData) => @@ -605,7 +623,7 @@ class KafkaApis(val requestChannel: RequestChannel, } val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => { - new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, + new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET) }) @@ -775,7 +793,7 @@ class KafkaApis(val requestChannel: RequestChannel, } else if (config.autoCreateTopicsEnable) { createTopic(topic, config.numPartitions, config.defaultReplicationFactor) } else { - new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, common.Topic.isInternal(topic), + new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, java.util.Collections.emptyList()) } } @@ -804,25 +822,34 @@ class KafkaApis(val requestChannel: RequestChannel, metadataRequest.topics.asScala.toSet } - var (authorizedTopics, unauthorizedTopics) = + var (authorizedTopics, unauthorizedForDescribeTopics) = topics.partition(topic => authorize(request.session, Describe, new Resource(auth.Topic, topic))) + var unauthorizedForCreateTopics = Set[String]() + if (authorizedTopics.nonEmpty) { val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics) if (config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) { - authorizer.foreach { az => - if (!az.authorize(request.session, Create, Resource.ClusterResource)) { - authorizedTopics --= nonExistingTopics - unauthorizedTopics ++= nonExistingTopics - } + if (!authorize(request.session, Create, Resource.ClusterResource)) { + authorizedTopics --= nonExistingTopics + unauthorizedForCreateTopics ++= nonExistingTopics } } } - val unauthorizedTopicMetadata = unauthorizedTopics.map(topic => - new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, Topic.isInternal(topic), + val unauthorizedForCreateTopicMetadata = unauthorizedForCreateTopics.map(topic => + new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, common.Topic.isInternal(topic), java.util.Collections.emptyList())) + // do not disclose the existence of topics unauthorized for Describe, so we've not even checked if they exist or not + val unauthorizedForDescribeTopicMetadata = + // In case of all topics, don't include topics unauthorized for Describe + if ((requestVersion == 0 && (metadataRequest.topics == null || metadataRequest.topics.isEmpty)) || metadataRequest.isAllTopics) + Set.empty[MetadataResponse.TopicMetadata] + else + unauthorizedForDescribeTopics.map(topic => + new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, java.util.Collections.emptyList())) + // In version 0, we returned an error when brokers with replicas were unavailable, // while in higher versions we simply don't include the broker in the returned broker list val errorUnavailableEndpoints = requestVersion == 0 @@ -832,7 +859,7 @@ class KafkaApis(val requestChannel: RequestChannel, else getTopicMetadata(authorizedTopics, request.securityProtocol, errorUnavailableEndpoints) - val completeTopicMetadata = topicMetadata ++ unauthorizedTopicMetadata + val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata val brokers = metadataCache.getAliveBrokers @@ -869,16 +896,15 @@ class KafkaApis(val requestChannel: RequestChannel, val (authorizedTopicPartitions, unauthorizedTopicPartitions) = offsetFetchRequest.partitions.asScala.partition { topicPartition => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) } - val unauthorizedTopicResponse = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.TOPIC_AUTHORIZATION_FAILED.code) - val unauthorizedStatus = unauthorizedTopicPartitions.map(topicPartition => (topicPartition, unauthorizedTopicResponse)).toMap val unknownTopicPartitionResponse = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.UNKNOWN_TOPIC_OR_PARTITION.code) + val unauthorizedStatus = unauthorizedTopicPartitions.map(topicPartition => (topicPartition, unknownTopicPartitionResponse)).toMap if (header.apiVersion == 0) { // version 0 reads offsets from ZK val responseInfo = authorizedTopicPartitions.map { topicPartition => val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicPartition.topic) try { - if (!metadataCache.hasTopicMetadata(topicPartition.topic)) + if (!metadataCache.contains(topicPartition.topic)) (topicPartition, unknownTopicPartitionResponse) else { val payloadOpt = zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1 @@ -1169,21 +1195,17 @@ class KafkaApis(val requestChannel: RequestChannel, def handleDeleteTopicsRequest(request: RequestChannel.Request) { val deleteTopicRequest = request.body.asInstanceOf[DeleteTopicsRequest] - val (authorizedTopics, unauthorizedTopics) = deleteTopicRequest.topics.asScala.partition( topic => - authorize(request.session, Delete, new Resource(auth.Topic, topic)) + val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = deleteTopicRequest.topics.asScala.partition( topic => + authorize(request.session, Describe, new Resource(auth.Topic, topic)) && metadataCache.contains(topic) ) - val unauthorizedResults = unauthorizedTopics.map ( topic => - // Avoid leaking that the topic exists if the user is not authorized to describe the topic - if (authorize(request.session, Describe, new Resource(auth.Topic, topic))) { - (topic, Errors.TOPIC_AUTHORIZATION_FAILED) - } else { - (topic, Errors.INVALID_TOPIC_EXCEPTION) - } - ).toMap - + val (authorizedTopics, unauthorizedForDeleteTopics) = existingAndAuthorizedForDescribeTopics.partition( topic => + authorize(request.session, Delete, new Resource(auth.Topic, topic)) + ) + def sendResponseCallback(results: Map[String, Errors]): Unit = { - val completeResults = results ++ unauthorizedResults + val completeResults = nonExistingOrUnauthorizedForDescribeTopics.map( topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++ + unauthorizedForDeleteTopics.map( topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++ results val respHeader = new ResponseHeader(request.header.correlationId) val responseBody = new DeleteTopicsResponse(completeResults.asJava) trace(s"Sending delete topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.") @@ -1196,7 +1218,7 @@ class KafkaApis(val requestChannel: RequestChannel, }.toMap sendResponseCallback(results) } else { - // If no authorized topics return immediatly + // If no authorized topics return immediately if (authorizedTopics.isEmpty) sendResponseCallback(Map()) else { http://git-wip-us.apache.org/repos/asf/kafka/blob/4cb726fd/core/src/main/scala/kafka/server/MetadataCache.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index f493e7d..feef6ae 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -120,12 +120,6 @@ private[server] class MetadataCache(brokerId: Int) extends Logging { } } - def hasTopicMetadata(topic: String): Boolean = { - inReadLock(partitionMetadataLock) { - cache.contains(topic) - } - } - def getAllTopics(): Set[String] = { inReadLock(partitionMetadataLock) { cache.keySet.toSet http://git-wip-us.apache.org/repos/asf/kafka/blob/4cb726fd/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 6d3b098..be41581 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -15,6 +15,7 @@ package kafka.api import java.nio.ByteBuffer import java.util import java.util.concurrent.ExecutionException +import java.util.regex.Pattern import java.util.{ArrayList, Collections, Properties} import kafka.common @@ -22,7 +23,8 @@ import kafka.common.TopicAndPartition import kafka.security.auth._ import kafka.server.{BaseRequestTest, KafkaConfig} import kafka.utils.TestUtils -import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsumer, OffsetAndMetadata} +import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener +import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.errors._ import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol} @@ -36,6 +38,14 @@ import org.junit.{After, Assert, Before, Test} import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.Buffer +import scala.concurrent.{Await, Future} +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ +import scala.util.{Failure, Success} + +import org.apache.kafka.common.KafkaException +import java.util.HashMap +import kafka.admin.AdminUtils class AuthorizerIntegrationTest extends BaseRequestTest { @@ -43,12 +53,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val brokerId: Integer = 0 val topic = "topic" + val topicPattern = "topic.*" val createTopic = "topic-new" val deleteTopic = "topic-delete" val part = 0 val correlationId = 0 val clientId = "client-Id" val tp = new TopicPartition(topic, part) + val topicAndPartition = new TopicAndPartition(topic, part) val group = "my-group" val topicResource = new Resource(Topic, topic) @@ -163,6 +175,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @After override def tearDown() = { producers.foreach(_.close()) + consumers.foreach(_.wakeup()) consumers.foreach(_.close()) removeAllAcls super.tearDown() @@ -276,14 +289,39 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } } + /* + * checking that whether the topic exists or not, when unauthorized, FETCH and PRODUCE do not leak the topic name + */ + @Test + def testAuthorizationWithTopicNotExisting() { + AdminUtils.deleteTopic(zkUtils, topic) + TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers) + AdminUtils.deleteTopic(zkUtils, deleteTopic) + TestUtils.verifyTopicDeletion(zkUtils, deleteTopic, 1, servers) + + val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( + ApiKeys.PRODUCE -> createProduceRequest, + ApiKeys.FETCH -> createFetchRequest, + ApiKeys.DELETE_TOPICS -> deleteTopicsRequest + ) + + for ((key, request) <- requestKeyToRequest) { + removeAllAcls + val resources = RequestKeysToAcls(key).map(_._1.resourceType).toSet + sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = false, topicExists = false) + for ((resource, acls) <- RequestKeysToAcls(key)) + addAndVerifyAcls(acls, resource) + sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = true, topicExists = false) + } + } + @Test def testProduceWithNoTopicAccess() { try { sendRecords(numRecords, tp) - fail("sendRecords should have thrown") + fail("should have thrown exception") } catch { - case e: TopicAuthorizationException => - assertEquals(Collections.singleton(topic), e.unauthorizedTopics()) + case e: TimeoutException => //expected } } @@ -292,7 +330,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) try { sendRecords(numRecords, tp) - fail("sendRecords should have thrown") + fail("should have thrown exception") } catch { case e: TopicAuthorizationException => assertEquals(Collections.singleton(topic), e.unauthorizedTopics()) @@ -304,7 +342,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) try { sendRecords(numRecords, tp) - fail("sendRecords should have thrown") + fail("should have thrown exception") } catch { case e: TopicAuthorizationException => assertEquals(Collections.singleton(topic), e.unauthorizedTopics()) @@ -375,7 +413,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } @Test - def testConsumeWithNoTopicAccess() { + def testConsumeWithoutTopicDescribeAccess() { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) sendRecords(1, tp) removeAllAcls() @@ -386,7 +424,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { consumeRecords(this.consumers.head) Assert.fail("should have thrown exception") } catch { - case e: TopicAuthorizationException => assertEquals(Collections.singleton(topic), e.unauthorizedTopics()); + case e: KafkaException => //expected } } @@ -403,7 +441,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { consumeRecords(this.consumers.head) Assert.fail("should have thrown exception") } catch { - case e: TopicAuthorizationException => assertEquals(Collections.singleton(topic), e.unauthorizedTopics()); + case e: TopicAuthorizationException => assertEquals(Collections.singleton(topic), e.unauthorizedTopics()) } } @@ -421,7 +459,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { Assert.fail("should have thrown exception") } catch { case e: TopicAuthorizationException => - assertEquals(Collections.singleton(topic), e.unauthorizedTopics()); + assertEquals(Collections.singleton(topic), e.unauthorizedTopics()) } } @@ -438,6 +476,125 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } @Test + def testPatternSubscriptionWithNoTopicAccess() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(1, tp) + removeAllAcls() + + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + this.consumers.head.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener) + this.consumers.head.poll(50) + assertTrue(this.consumers.head.subscription.isEmpty) + } + + @Test + def testPatternSubscriptionWithTopicDescribeOnlyAndGroupRead() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(1, tp) + removeAllAcls() + + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + val consumer = consumers.head + consumer.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener) + try { + consumeRecords(consumer) + Assert.fail("Expected TopicAuthorizationException") + } catch { + case e: TopicAuthorizationException => //expected + } + + } + + @Test + def testPatternSubscriptionWithTopicAndGroupRead() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(1, tp) + + //create a unmatched topic + val unmatchedTopic = "unmatched" + TestUtils.createTopic(zkUtils, unmatchedTopic, 1, 1, this.servers) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), new Resource(Topic, unmatchedTopic)) + sendRecords(1, new TopicPartition(unmatchedTopic, part)) + removeAllAcls() + + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + val consumer = consumers.head + consumer.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener) + consumeRecords(consumer) + + // set the subscription pattern to an internal topic that the consumer has no read permission for, but since + // `exclude.internal.topics` is true by default, the subscription should be empty and no authorization exception + // should be thrown + consumer.subscribe(Pattern.compile(kafka.common.Topic.GroupMetadataTopicName), new NoOpConsumerRebalanceListener) + assertTrue(consumer.poll(50).isEmpty) + } + + @Test + def testPatternSubscriptionMatchingInternalTopicWithNoPermission() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(1, tp) + removeAllAcls() + + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + + val consumerConfig = new Properties + consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false") + val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, + securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(consumerConfig)) + try { + consumer.subscribe(Pattern.compile(".*"), new NoOpConsumerRebalanceListener) + consumeRecords(consumer) + assertEquals(Set[String](topic).asJava, consumer.subscription) + } finally consumer.close() + } + + @Test + def testPatternSubscriptionMatchingInternalTopicWithDescribeOnlyPermission() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(1, tp) + removeAllAcls() + + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + val internalTopicResource = new Resource(Topic, kafka.common.Topic.GroupMetadataTopicName) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), internalTopicResource) + + val consumerConfig = new Properties + consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false") + val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, + securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(consumerConfig)) + try { + consumer.subscribe(Pattern.compile(".*"), new NoOpConsumerRebalanceListener) + consumeRecords(consumer) + Assert.fail("Expected TopicAuthorizationException") + } catch { + case e: TopicAuthorizationException => //expected + } finally consumer.close() + } + + @Test + def testPatternSubscriptionNotMatchingInternalTopic() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(1, tp) + removeAllAcls() + + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + + val consumerConfig = new Properties + consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false") + val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, + securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(consumerConfig)) + try { + consumer.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener) + consumeRecords(consumer) + } finally consumer.close() +} + + @Test def testCreatePermissionNeededToReadFromNonExistentTopic() { val newTopic = "newTopic" val topicPartition = new TopicPartition(newTopic, 0) @@ -451,7 +608,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { Assert.fail("should have thrown exception") } catch { case e: TopicAuthorizationException => - assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics()); + assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics()) } addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), newTopicResource) @@ -466,7 +623,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava) } - @Test(expected = classOf[TopicAuthorizationException]) + @Test(expected = classOf[KafkaException]) def testCommitWithNoTopicAccess() { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava) @@ -512,7 +669,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { this.consumers.head.position(tp) } - @Test(expected = classOf[TopicAuthorizationException]) + @Test(expected = classOf[KafkaException]) def testOffsetFetchWithNoTopicAccess() { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) this.consumers.head.assign(List(tp).asJava) @@ -537,14 +694,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testListOffsetsWithNoTopicAccess() { - val e = intercept[TopicAuthorizationException] { - this.consumers.head.partitionsFor(topic) - } - assertEquals(Set(topic), e.unauthorizedTopics().asScala) + val partitionInfos = this.consumers.head.partitionsFor(topic) + assertNull(partitionInfos) } @Test - def testListOfsetsWithTopicDescribe() { + def testListOffsetsWithTopicDescribe() { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) this.consumers.head.partitionsFor(topic) } @@ -554,7 +709,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val response = send(deleteTopicsRequest, ApiKeys.DELETE_TOPICS) val deleteResponse = DeleteTopicsResponse.parse(response) - assertEquals(Errors.INVALID_TOPIC_EXCEPTION, deleteResponse.errors.asScala.head._2) + assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, deleteResponse.errors.asScala.head._2) } @Test @@ -585,24 +740,28 @@ class AuthorizerIntegrationTest extends BaseRequestTest { def sendRequestAndVerifyResponseErrorCode(apiKey: ApiKeys, request: AbstractRequest, resources: Set[ResourceType], - isAuthorized: Boolean): AbstractRequestResponse = { + isAuthorized: Boolean, + topicExists: Boolean = true): AbstractRequestResponse = { val resp = send(request, apiKey) val response = RequestKeyToResponseDeserializer(apiKey).getMethod("parse", classOf[ByteBuffer]).invoke(null, resp).asInstanceOf[AbstractRequestResponse] val errorCode = RequestKeyToErrorCode(apiKey).asInstanceOf[(AbstractRequestResponse) => Short](response) val possibleErrorCodes = resources.flatMap { resourceType => - if(resourceType == Topic) - // When completely unauthorized topic resources may return an INVALID_TOPIC_EXCEPTION to prevent leaking topic names - Seq(resourceType.errorCode, Errors.INVALID_TOPIC_EXCEPTION.code()) + if (resourceType == Topic) + // When completely unauthorized topic resources must return an UNKNOWN_TOPIC_OR_PARTITION to prevent leaking topic names + Seq(Errors.UNKNOWN_TOPIC_OR_PARTITION.code) else - Seq(resourceType.errorCode) + Seq(resourceType.errorCode) } - if (isAuthorized) - assertFalse(s"${apiKey} should be allowed. Found error code $errorCode", possibleErrorCodes.contains(errorCode)) + if (topicExists) + if (isAuthorized) + assertFalse(s"${apiKey} should be allowed. Found error code $errorCode", possibleErrorCodes.contains(errorCode)) + else + assertTrue(s"${apiKey} should be forbidden. Found error code $errorCode but expected one of ${possibleErrorCodes.mkString(",")} ", possibleErrorCodes.contains(errorCode)) else - assertTrue(s"${apiKey} should be forbidden. Found error code $errorCode but expected one of ${possibleErrorCodes.mkString(",")} ", possibleErrorCodes.contains(errorCode)) - + assertEquals(s"${apiKey} - Found error code $errorCode", Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), errorCode) + response } @@ -634,16 +793,15 @@ class AuthorizerIntegrationTest extends BaseRequestTest { topic: String = topic, part: Int = part) { val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]() - val maxIters = numRecords * 50 - var iters = 0 - while (records.size < numRecords) { - for (record <- consumer.poll(50).asScala) { - records.add(record) - } - if (iters > maxIters) - throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.") - iters += 1 + + val future = Future { + while (records.size < numRecords) + for (record <- consumer.poll(50).asScala) + records.add(record) + records } + val result = Await.result(future, 10 seconds) + for (i <- 0 until numRecords) { val record = records.get(i) val offset = startingOffset + i @@ -652,4 +810,5 @@ class AuthorizerIntegrationTest extends BaseRequestTest { assertEquals(offset.toLong, record.offset()) } } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/4cb726fd/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index 8edb6f8..2f5858c 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -19,7 +19,7 @@ package kafka.api import java.io.File import java.util.ArrayList -import java.util.concurrent.ExecutionException +import java.util.concurrent.{ExecutionException, TimeoutException => JTimeoutException} import kafka.admin.AclCommand import kafka.common.TopicAndPartition @@ -28,16 +28,19 @@ import kafka.server._ import kafka.utils._ import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, ConsumerConfig} -import org.apache.kafka.clients.producer.{ProducerRecord, ProducerConfig} +import org.apache.kafka.clients.producer.{ProducerRecord, ProducerConfig, KafkaProducer} import org.apache.kafka.common.security.auth.KafkaPrincipal -import org.apache.kafka.common.{TopicPartition} +import org.apache.kafka.common.{TopicPartition,KafkaException} import org.apache.kafka.common.protocol.SecurityProtocol -import org.apache.kafka.common.errors.{GroupAuthorizationException,TopicAuthorizationException} +import org.apache.kafka.common.errors.{GroupAuthorizationException,TopicAuthorizationException,TimeoutException} import org.junit.Assert._ import org.junit.{Test, After, Before} import scala.collection.JavaConverters._ - +import scala.concurrent.{Await, Future} +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ +import scala.util.{Failure, Success} /** * The test cases here verify that a producer authorized to publish to a topic @@ -107,6 +110,26 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { s"--topic=$topic", s"--producer", s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") + def describeAclArgs: Array[String] = Array("--authorizer-properties", + s"zookeeper.connect=$zkConnect", + s"--add", + s"--topic=$topic", + s"--operation=Describe", + s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") + def deleteDescribeAclArgs: Array[String] = Array("--authorizer-properties", + s"zookeeper.connect=$zkConnect", + s"--remove", + s"--force", + s"--topic=$topic", + s"--operation=Describe", + s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") + def deleteWriteAclArgs: Array[String] = Array("--authorizer-properties", + s"zookeeper.connect=$zkConnect", + s"--remove", + s"--force", + s"--topic=$topic", + s"--operation=Write", + s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") def consumeAclArgs: Array[String] = Array("--authorizer-properties", s"zookeeper.connect=$zkConnect", s"--add", @@ -149,18 +172,28 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { } super.setUp AclCommand.main(topicBrokerReadAclArgs) - servers.foreach( s => + servers.foreach { s => TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, new Resource(Topic, "*")) - ) + } // create the test topic with all the brokers as replicas TestUtils.createTopic(zkUtils, topic, 1, 3, this.servers) } + override def createNewProducer: KafkaProducer[Array[Byte], Array[Byte]] = { + TestUtils.createNewProducer(brokerList, + maxBlockMs = 5000L, + securityProtocol = this.securityProtocol, + trustStoreFile = this.trustStoreFile, + saslProperties = this.saslProperties, + props = Some(producerConfig)) + } + /** * Closes MiniKDC last when tearing down. */ @After override def tearDown { + consumers.foreach(_.wakeup()) super.tearDown closeSasl() } @@ -187,10 +220,10 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { private def setAclsAndProduce() { AclCommand.main(produceAclArgs) AclCommand.main(consumeAclArgs) - servers.foreach(s => { + servers.foreach { s => TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) - }) + } //Produce records debug("Starting to send records") sendRecords(numRecords, tp) @@ -203,35 +236,93 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { * isn't set. */ @Test - def testNoProduceAcl { + def testNoProduceWithoutDescribeAcl { //Produce records debug("Starting to send records") try{ sendRecords(numRecords, tp) - fail("Topic authorization exception expected") + fail("exception expected") } catch { - case e: TopicAuthorizationException => //expected + case e: TimeoutException => //expected } } - /** + @Test + def testNoProduceWithDescribeAcl { + AclCommand.main(describeAclArgs) + servers.foreach { s => + TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.apis.authorizer.get, topicResource) + } + //Produce records + debug("Starting to send records") + try{ + sendRecords(numRecords, tp) + fail("exception expected") + } catch { + case e: TopicAuthorizationException => //expected + } + } + + /** * Tests that a consumer fails to consume messages without the appropriate * ACL set. */ @Test - def testNoConsumeAcl { + def testNoConsumeWithoutDescribeAclViaAssign { + noConsumeWithoutDescribeAclSetup + consumers.head.assign(List(tp).asJava) + + try { + consumeRecords(this.consumers.head) + fail("exception expected") + } catch { + case e: KafkaException => //expected + } + } + + @Test + def testNoConsumeWithoutDescribeAclViaSubscribe { + noConsumeWithoutDescribeAclSetup + consumers.head.subscribe(List(topic).asJava) + + try { + consumeRecords(this.consumers.head) + fail("exception expected") + } catch { + case e: JTimeoutException => //expected + } + } + + private def noConsumeWithoutDescribeAclSetup { AclCommand.main(produceAclArgs) AclCommand.main(groupAclArgs) - servers.foreach(s => { + servers.foreach { s => TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) - }) + } //Produce records debug("Starting to send records") sendRecords(numRecords, tp) - //Consume records + + //Deleting topic ACL + AclCommand.main(deleteDescribeAclArgs) + AclCommand.main(deleteWriteAclArgs) + servers.foreach { s => + TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource) + } + debug("Finished sending and starting to consume records") + } + + /** + * Tests that a consumer fails to consume messages without the appropriate + * ACL set. + */ + @Test + def testNoConsumeWithDescribeAclViaAssign { + noConsumeWithDescribeAclSetup consumers.head.assign(List(tp).asJava) + try { consumeRecords(this.consumers.head) fail("Topic authorization exception expected") @@ -239,6 +330,33 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { case e: TopicAuthorizationException => //expected } } + + @Test + def testNoConsumeWithDescribeAclViaSubscribe { + noConsumeWithDescribeAclSetup + consumers.head.subscribe(List(topic).asJava) + + try { + consumeRecords(this.consumers.head) + fail("Topic authorization exception expected") + } catch { + case e: TopicAuthorizationException => //expected + } + } + + private def noConsumeWithDescribeAclSetup { + AclCommand.main(produceAclArgs) + AclCommand.main(groupAclArgs) + servers.foreach { s => + TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) + TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource) + } + //Produce records + debug("Starting to send records") + sendRecords(numRecords, tp) + //Consume records + debug("Finished sending and starting to consume records") + } /** * Tests that a consumer fails to consume messages without the appropriate @@ -247,9 +365,9 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { @Test def testNoGroupAcl { AclCommand.main(produceAclArgs) - servers.foreach(s => + servers.foreach { s => TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) - ) + } //Produce records debug("Starting to send records") sendRecords(numRecords, tp) @@ -283,22 +401,22 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { topic: String = topic, part: Int = part) { val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]() - val maxIters = numRecords * 50 - var iters = 0 - while (records.size < numRecords) { - for (record <- consumer.poll(50).asScala) { - records.add(record) - } - if (iters > maxIters) - throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.") - iters += 1 + + val future = Future { + while (records.size < numRecords) + for (record <- consumer.poll(50).asScala) + records.add(record) + records } + val result = Await.result(future, 10 seconds) + for (i <- 0 until numRecords) { val record = records.get(i) val offset = startingOffset + i assertEquals(topic, record.topic()) assertEquals(part, record.partition()) assertEquals(offset.toLong, record.offset()) - } + } } } + http://git-wip-us.apache.org/repos/asf/kafka/blob/4cb726fd/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index 9595ad6..ffca431 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -29,6 +29,8 @@ import kafka.integration.KafkaServerTestHarness import org.junit.{After, Before} import scala.collection.mutable.Buffer +import scala.util.control.Breaks.{breakable, break} +import java.util.ConcurrentModificationException /** * A helper class for writing integration tests that involve producers, consumers, and servers @@ -64,17 +66,9 @@ trait IntegrationTestHarness extends KafkaServerTestHarness { consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer]) consumerConfig.putAll(consumerSecurityProps) for (i <- 0 until producerCount) - producers += TestUtils.createNewProducer(brokerList, - securityProtocol = this.securityProtocol, - trustStoreFile = this.trustStoreFile, - saslProperties = this.saslProperties, - props = Some(producerConfig)) + producers += createNewProducer for (i <- 0 until consumerCount) { - consumers += TestUtils.createNewConsumer(brokerList, - securityProtocol = this.securityProtocol, - trustStoreFile = this.trustStoreFile, - saslProperties = this.saslProperties, - props = Some(consumerConfig)) + consumers += createNewConsumer } // create the consumer offset topic @@ -85,10 +79,42 @@ trait IntegrationTestHarness extends KafkaServerTestHarness { servers.head.groupCoordinator.offsetsTopicConfigs) } + //extracted method to allow for different params in some specific tests + def createNewProducer: KafkaProducer[Array[Byte], Array[Byte]] = { + TestUtils.createNewProducer(brokerList, + securityProtocol = this.securityProtocol, + trustStoreFile = this.trustStoreFile, + saslProperties = this.saslProperties, + props = Some(producerConfig)) + } + + //extracted method to allow for different params in some specific tests + def createNewConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = { + TestUtils.createNewConsumer(brokerList, + securityProtocol = this.securityProtocol, + trustStoreFile = this.trustStoreFile, + saslProperties = this.saslProperties, + props = Some(consumerConfig)) + } + @After override def tearDown() { producers.foreach(_.close()) - consumers.foreach(_.close()) + + consumers.foreach { consumer => + breakable { + while(true) { + try { + consumer.close + break + } catch { + //short wait to make sure that woken up consumer can be closed without spurious ConcurrentModificationException + case e: ConcurrentModificationException => Thread.sleep(100L) + } + } + } + } + super.tearDown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/4cb726fd/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index ea5a213..d39de75 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -26,7 +26,7 @@ import org.junit.Test import java.util.Properties import kafka.common.{TopicAlreadyMarkedForDeletionException, TopicAndPartition} -import org.apache.kafka.common.errors.InvalidTopicException +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException class DeleteTopicTest extends ZooKeeperTestHarness { @@ -206,9 +206,9 @@ class DeleteTopicTest extends ZooKeeperTestHarness { // start topic deletion try { AdminUtils.deleteTopic(zkUtils, "test2") - fail("Expected InvalidTopicException") + fail("Expected UnknownTopicOrPartitionException") } catch { - case e: InvalidTopicException => // expected exception + case e: UnknownTopicOrPartitionException => // expected exception } // verify delete topic path for test2 is removed from zookeeper TestUtils.verifyTopicDeletion(zkUtils, "test2", 1, servers) http://git-wip-us.apache.org/repos/asf/kafka/blob/4cb726fd/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala index a59316b..e04e1b7 100644 --- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala @@ -58,7 +58,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest { // Basic validateErrorDeleteTopicRequests(new DeleteTopicsRequest(Set("invalid-topic").asJava, timeout), - Map("invalid-topic" -> Errors.INVALID_TOPIC_EXCEPTION)) + Map("invalid-topic" -> Errors.UNKNOWN_TOPIC_OR_PARTITION)) // Partial TestUtils.createTopic(zkUtils, "partial-topic-1", 1, 1, servers) @@ -67,7 +67,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest { "partial-invalid-topic").asJava, timeout), Map( "partial-topic-1" -> Errors.NONE, - "partial-invalid-topic" -> Errors.INVALID_TOPIC_EXCEPTION + "partial-invalid-topic" -> Errors.UNKNOWN_TOPIC_OR_PARTITION ) ) http://git-wip-us.apache.org/repos/asf/kafka/blob/4cb726fd/docs/upgrade.html ---------------------------------------------------------------------- diff --git a/docs/upgrade.html b/docs/upgrade.html index 7b16ab0..1b1c593 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -64,6 +64,8 @@ Note: Because new protocols are introduced, it is important to upgrade your Kafk <li> Kafka clusters can now be uniquely identified by a cluster id. It will be automatically generated when a broker is upgraded to 0.10.1.0. The cluster id is available via the kafka.server:type=KafkaServer,name=ClusterId metric and it is part of the Metadata response. Serializers, client interceptors and metric reporters can receive the cluster id by implementing the ClusterResourceListener interface. </li> <li> The BrokerState "RunningAsController" (value 4) has been removed. Due to a bug, a broker would only be in this state briefly before transitioning out of it and hence the impact of the removal should be minimal. The recommended way to detect if a given broker is the controller is via the kafka.controller:type=KafkaController,name=ActiveControllerCount metric. </li> <li> The new Java Consumer now allows users to search offsets by timestamp on partitions. </li> + <li> When using an Authorizer and a user hasn't got <b>Describe</b> authorization on a topic, the broker will no longer return TOPIC_AUTHORIZATION_FAILED errors + but just UNKNOWN_TOPIC_OR_PARTITION errors, to avoid leaking topic names.</li> </ul> <h5><a id="upgrade_1010_new_protocols" href="#upgrade_1010_new_protocols">New Protocol Versions</a></h5>
