Repository: kafka Updated Branches: refs/heads/trunk 2a1b39ef1 -> c6e5a32d0
KAFKA-5856; AdminClient.createPartitions() follow up - Improve tests and javadoc (including expected exceptions) - Return correct authorization error if no describe topic permission Author: Tom Bentley <tbent...@redhat.com> Reviewers: Ismael Juma <ism...@juma.me.uk> Closes #3937 from tombentley/KAFKA-5856-AdminClient.createPartitions-follow-up Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c6e5a32d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c6e5a32d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c6e5a32d Branch: refs/heads/trunk Commit: c6e5a32d0464622ced4b7887923ca37c717b9e74 Parents: 2a1b39e Author: Tom Bentley <tbent...@redhat.com> Authored: Wed Oct 4 18:57:26 2017 +0100 Committer: Ismael Juma <ism...@juma.me.uk> Committed: Wed Oct 4 18:57:31 2017 +0100 ---------------------------------------------------------------------- .../apache/kafka/clients/admin/AdminClient.java | 42 ++- .../common/errors/InvalidTopicException.java | 4 + .../UnknownTopicOrPartitionException.java | 6 +- .../src/main/scala/kafka/admin/AdminUtils.scala | 3 +- .../main/scala/kafka/server/AdminManager.scala | 16 +- .../kafka/server/DelayedCreatePartitions.scala | 4 +- .../src/main/scala/kafka/server/KafkaApis.scala | 13 +- .../kafka/api/AdminClientIntegrationTest.scala | 332 ++++++++++++------- .../kafka/api/AuthorizerIntegrationTest.scala | 39 ++- 9 files changed, 302 insertions(+), 157 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e5a32d/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java index 636317c..fd695f9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java @@ -453,11 +453,12 @@ public abstract class AdminClient implements AutoCloseable { public abstract DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirsOptions options); /** - * Increase the number of partitions of the topics given as the keys of {@code newPartitions} - * according to the corresponding values. + * <p>Increase the number of partitions of the topics given as the keys of {@code newPartitions} + * according to the corresponding values. <strong>If partitions are increased for a topic that has a key, + * the partition logic or ordering of the messages will be affected.</strong></p> * - * This is a convenience method for {@link #createPartitions(Map, CreatePartitionsOptions)} with default options. - * See the overload for more details. + * <p>This is a convenience method for {@link #createPartitions(Map, CreatePartitionsOptions)} with default options. + * See the overload for more details.</p> * * @param newPartitions The topics which should have new partitions created, and corresponding parameters * for the created partitions. @@ -468,17 +469,36 @@ public abstract class AdminClient implements AutoCloseable { } /** - * Increase the number of partitions of the topics given as the keys of {@code newPartitions} - * according to the corresponding values. + * <p>Increase the number of partitions of the topics given as the keys of {@code newPartitions} + * according to the corresponding values. <strong>If partitions are increased for a topic that has a key, + * the partition logic or ordering of the messages will be affected.</strong></p> * - * This operation is not transactional so it may succeed for some topics while fail for others. + * <p>This operation is not transactional so it may succeed for some topics while fail for others.</p> * - * It may take several seconds after this method returns + * <p>It may take several seconds after this method returns * success for all the brokers to become aware that the partitions have been created. * During this time, {@link AdminClient#describeTopics(Collection)} - * may not return information about the new partitions. - * - * This operation is supported by brokers with version 1.0.0 or higher. + * may not return information about the new partitions.</p> + * + * <p>This operation is supported by brokers with version 1.0.0 or higher.</p> + * + * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the + * {@link CreatePartitionsResult#values() values()} method of the returned {@code CreatePartitionsResult}</p> + * <ul> + * <li>{@link org.apache.kafka.common.errors.AuthorizationException} + * if the authenticated user is not authorized to alter the topic</li> + * <li>{@link org.apache.kafka.common.errors.TimeoutException} + * if the request was not completed in within the given {@link CreatePartitionsOptions#timeoutMs()}.</li> + * <li>{@link org.apache.kafka.common.errors.ReassignmentInProgressException} + * if a partition reassignment is currently in progress</li> + * <li>{@link org.apache.kafka.common.errors.BrokerNotAvailableException} + * if the requested {@link NewPartitions#assignments()} contain a broker that is currently unavailable.</li> + * <li>{@link org.apache.kafka.common.errors.InvalidReplicationFactorException} + * if no {@link NewPartitions#assignments()} are given and it is impossible for the broker to assign + * replicas with the topics replication factor.</li> + * <li>Subclasses of {@link org.apache.kafka.common.KafkaException} + * if the request is invalid in some way.</li> + * </ul> * * @param newPartitions The topics which should have new partitions created, and corresponding parameters * for the created partitions. http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e5a32d/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java index 5c7b2be..f79e9a7 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java @@ -18,6 +18,10 @@ package org.apache.kafka.common.errors; /** * The client has attempted to perform an operation on an invalid topic. + * For example the topic name is too long, contains invalid characters etc. + * This exception is not retriable because the operation won't suddenly become valid. + * + * @see UnknownTopicOrPartitionException */ public class InvalidTopicException extends ApiException { http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e5a32d/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java index 0f2a562..6d10945 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java @@ -17,7 +17,11 @@ package org.apache.kafka.common.errors; /** - * This topic/partition doesn't exist + * This topic/partition doesn't exist. + * This exception is used in contexts where a topic doesn't seem to exist based on possibly stale metadata. + * This exception is retriable because the topic or partition might subsequently be created. + * + * @see InvalidTopicException */ public class UnknownTopicOrPartitionException extends InvalidMetadataException { http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e5a32d/core/src/main/scala/kafka/admin/AdminUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index ee987f1..32cab2a 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -304,6 +304,7 @@ object AdminUtils extends Logging with AdminUtilities { AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, proposedAssignment, update = true) } proposedAssignment + } /** @@ -354,7 +355,7 @@ object AdminUtils extends Logging with AdminUtilities { val repFactors = sortedBadRepFactors.map { case (_, rf) => rf } throw new InvalidReplicaAssignmentException(s"Inconsistent replication factor between partitions, " + s"partition 0 has ${existingAssignmentPartition0.size} while partitions [${partitions.mkString(", ")}] have " + - s"replication factors [${repFactors.mkString(", ")}], respectively") + s"replication factors [${repFactors.mkString(", ")}], respectively.") } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e5a32d/core/src/main/scala/kafka/server/AdminManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala index 2c83c1b..aefdefd 100644 --- a/core/src/main/scala/kafka/server/AdminManager.scala +++ b/core/src/main/scala/kafka/server/AdminManager.scala @@ -122,15 +122,15 @@ class AdminManager(val config: KafkaConfig, else AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignments, configs, update = false) } - CreateTopicMetadata(topic, assignments, ApiError.NONE) + CreatePartitionsMetadata(topic, assignments, ApiError.NONE) } catch { // Log client errors at a lower level than unexpected exceptions case e@ (_: PolicyViolationException | _: ApiException) => info(s"Error processing create topic request for topic $topic with arguments $arguments", e) - CreateTopicMetadata(topic, Map(), ApiError.fromThrowable(e)) + CreatePartitionsMetadata(topic, Map(), ApiError.fromThrowable(e)) case e: Throwable => error(s"Error processing create topic request for topic $topic with arguments $arguments", e) - CreateTopicMetadata(topic, Map(), ApiError.fromThrowable(e)) + CreatePartitionsMetadata(topic, Map(), ApiError.fromThrowable(e)) } } @@ -219,7 +219,7 @@ class AdminManager(val config: KafkaConfig, case (topicPartition, replicas) => topicPartition.partition -> replicas } if (existingAssignment.isEmpty) - throw new UnknownTopicOrPartitionException(s"The topic '$topic' does not exist") + throw new UnknownTopicOrPartitionException(s"The topic '$topic' does not exist.") val oldNumPartitions = existingAssignment.size val newNumPartitions = newPartition.totalCount @@ -238,7 +238,7 @@ class AdminManager(val config: KafkaConfig, s"Unknown broker(s) in replica assignment: ${unknownBrokers.mkString(", ")}.") if (assignments.size != numPartitionsIncrement) - throw new InvalidRequestException( + throw new InvalidReplicaAssignmentException( s"Increasing the number of partitions by $numPartitionsIncrement " + s"but ${assignments.size} assignments provided.") @@ -249,12 +249,12 @@ class AdminManager(val config: KafkaConfig, val updatedReplicaAssignment = AdminUtils.addPartitions(zkUtils, topic, existingAssignment, allBrokers, newPartition.totalCount, reassignment, validateOnly = validateOnly) - CreateTopicMetadata(topic, updatedReplicaAssignment, ApiError.NONE) + CreatePartitionsMetadata(topic, updatedReplicaAssignment, ApiError.NONE) } catch { case e: AdminOperationException => - CreateTopicMetadata(topic, Map.empty, ApiError.fromThrowable(e)) + CreatePartitionsMetadata(topic, Map.empty, ApiError.fromThrowable(e)) case e: ApiException => - CreateTopicMetadata(topic, Map.empty, ApiError.fromThrowable(e)) + CreatePartitionsMetadata(topic, Map.empty, ApiError.fromThrowable(e)) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e5a32d/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala b/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala index 81a096e..0a99483 100644 --- a/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala +++ b/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala @@ -26,13 +26,13 @@ import scala.collection._ /** * The create metadata maintained by the delayed create topic or create partitions operations. */ -case class CreateTopicMetadata(topic: String, replicaAssignments: Map[Int, Seq[Int]], error: ApiError) +case class CreatePartitionsMetadata(topic: String, replicaAssignments: Map[Int, Seq[Int]], error: ApiError) /** * A delayed create topic or create partitions operation that is stored in the topic purgatory. */ class DelayedCreatePartitions(delayMs: Long, - createMetadata: Seq[CreateTopicMetadata], + createMetadata: Seq[CreatePartitionsMetadata], adminManager: AdminManager, responseCallback: Map[String, ApiError] => Unit) extends DelayedOperation(delayMs) { http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e5a32d/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 13959b8..c171aaa 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1367,12 +1367,14 @@ class KafkaApis(val requestChannel: RequestChannel, val (authorized, unauthorized) = notDuped.partition { case (topic, _) => authorize(request.session, Alter, new Resource(Topic, topic)) } + val (queuedForDeletion, valid) = authorized.partition { case (topic, _) => controller.topicDeletionManager.isTopicQueuedUpForDeletion(topic) + } - val errors = dupes.map(_ -> new ApiError(Errors.INVALID_REQUEST, "Duplicate topic in request")) ++ - unauthorized.keySet.map(_ -> new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null)) ++ + val errors = dupes.map(_ -> new ApiError(Errors.INVALID_REQUEST, "Duplicate topic in request.")) ++ + unauthorized.keySet.map( topic => topic -> createPartitionsAuthorizationApiError(request.session, topic) ) ++ queuedForDeletion.keySet.map(_ -> new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "The topic is queued for deletion.")) adminManager.createPartitions(createPartitionsRequest.timeout, valid, createPartitionsRequest.validateOnly, @@ -1380,6 +1382,13 @@ class KafkaApis(val requestChannel: RequestChannel, } } + private def createPartitionsAuthorizationApiError(session: RequestChannel.Session, topic: String): ApiError = { + if (authorize(session, Describe, new Resource(Topic, topic))) + new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null) + else + new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, null) + } + def handleDeleteTopicsRequest(request: RequestChannel.Request) { val deleteTopicRequest = request.body[DeleteTopicsRequest] http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e5a32d/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index e916efa..a18b217 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -376,158 +376,235 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging { assertEquals(1, client.describeTopics(Set(topic2).asJava).values.get(topic2).get.partitions.size) val validateOnly = new CreatePartitionsOptions().validateOnly(true) + val actuallyDoIt = new CreatePartitionsOptions().validateOnly(false) - // assert that a validateOnly request doesn't increase the number of partitions + def partitions(topic: String) = + client.describeTopics(Set(topic).asJava).values.get(topic).get.partitions + + def numPartitions(topic: String) = + partitions(topic).size + + // validateOnly: try creating a new partition (no assignments), to bring the total to 3 partitions var alterResult = client.createPartitions(Map(topic1 -> - NewPartitions.increaseTo(2)).asJava, validateOnly) + NewPartitions.increaseTo(3)).asJava, validateOnly) var altered = alterResult.values.get(topic1).get - // assert that the topics still has 1 partition - assertEquals(1, client.describeTopics(Set(topic1).asJava).values.get(topic1).get.partitions.size) + assertEquals(1, numPartitions(topic1)) // try creating a new partition (no assignments), to bring the total to 3 partitions alterResult = client.createPartitions(Map(topic1 -> - NewPartitions.increaseTo(3)).asJava) + NewPartitions.increaseTo(3)).asJava, actuallyDoIt) altered = alterResult.values.get(topic1).get - // assert that the topics now has 2 partitions - var actualPartitions = client.describeTopics(Set(topic1).asJava).values.get(topic1).get.partitions - assertEquals(3, actualPartitions.size) + assertEquals(3, numPartitions(topic1)) + + // validateOnly: now try creating a new partition (with assignments), to bring the total to 3 partitions + val newPartition2Assignments = asList[util.List[Integer]](asList(0, 1), asList(1, 2)) + alterResult = client.createPartitions(Map(topic2 -> + NewPartitions.increaseTo(3, newPartition2Assignments)).asJava, validateOnly) + altered = alterResult.values.get(topic2).get + assertEquals(1, numPartitions(topic2)) // now try creating a new partition (with assignments), to bring the total to 3 partitions alterResult = client.createPartitions(Map(topic2 -> - NewPartitions.increaseTo(3, asList(asList(0, 1), asList(1, 2)))).asJava) + NewPartitions.increaseTo(3, newPartition2Assignments)).asJava, actuallyDoIt) altered = alterResult.values.get(topic2).get - // assert that the topics now has 3 partitions - actualPartitions = client.describeTopics(Set(topic2).asJava).values.get(topic2).get.partitions - assertEquals(3, actualPartitions.size) - assertEquals(Seq(0, 1), actualPartitions.get(1).replicas.asScala.map(_.id)) - assertEquals(Seq(1, 2), actualPartitions.get(2).replicas.asScala.map(_.id)) + var actualPartitions2 = partitions(topic2) + assertEquals(3, actualPartitions2.size) + assertEquals(Seq(0, 1), actualPartitions2.get(1).replicas.asScala.map(_.id).toList) + assertEquals(Seq(1, 2), actualPartitions2.get(2).replicas.asScala.map(_.id).toList) + + // loop over error cases calling with+without validate-only + for (option <- Seq(validateOnly, actuallyDoIt)) { + val desc = if (option.validateOnly()) "validateOnly" else "validateOnly=false" + + // try a newCount which would be a decrease + alterResult = client.createPartitions(Map(topic1 -> + NewPartitions.increaseTo(1)).asJava, option) + try { + alterResult.values.get(topic1).get + fail(s"$desc: Expect InvalidPartitionsException when newCount is a decrease") + } catch { + case e: ExecutionException => + assertTrue(desc, e.getCause.isInstanceOf[InvalidPartitionsException]) + assertEquals(desc, "Topic currently has 3 partitions, which is higher than the requested 1.", e.getCause.getMessage) + assertEquals(desc, 3, numPartitions(topic1)) + } - // try a newCount which would be a decrease - alterResult = client.createPartitions(Map(topic1 -> - NewPartitions.increaseTo(1)).asJava, validateOnly) - try { - alterResult.values.get(topic1).get - fail("Expect InvalidPartitionsException when newCount is a decrease") - } catch { - case e: ExecutionException => - assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException]) - assertEquals("Topic currently has 3 partitions, which is higher than the requested 1.", e.getCause.getMessage) - } + // try a newCount which would be a noop (without assignment) + alterResult = client.createPartitions(Map(topic2 -> + NewPartitions.increaseTo(3)).asJava, option) + try { + alterResult.values.get(topic2).get + fail(s"$desc: Expect InvalidPartitionsException when requesting a noop") + } catch { + case e: ExecutionException => + assertTrue(desc, e.getCause.isInstanceOf[InvalidPartitionsException]) + assertEquals(desc, "Topic already has 3 partitions.", e.getCause.getMessage) + assertEquals(desc, 3, numPartitions(topic2)) + } - // try a newCount which would be a noop - alterResult = client.createPartitions(Map(topic1 -> - NewPartitions.increaseTo(3)).asJava, validateOnly) - try { - alterResult.values.get(topic1).get - fail("Expect InvalidPartitionsException when newCount == oldCount") - } catch { - case e: ExecutionException => - assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException]) - assertEquals("Topic already has 3 partitions.", e.getCause.getMessage) - } + // try a newCount which would be a noop (where the assignment matches current state) + alterResult = client.createPartitions(Map(topic2 -> + NewPartitions.increaseTo(3, newPartition2Assignments)).asJava, option) + try { + alterResult.values.get(topic2).get + } catch { + case e: ExecutionException => + assertTrue(desc, e.getCause.isInstanceOf[InvalidPartitionsException]) + assertEquals(desc, "Topic already has 3 partitions.", e.getCause.getMessage) + assertEquals(desc, 3, numPartitions(topic2)) + } - // try a bad topic name - val unknownTopic = "an-unknown-topic" - alterResult = client.createPartitions(Map(unknownTopic -> - NewPartitions.increaseTo(2)).asJava, validateOnly) - try { - alterResult.values.get(unknownTopic).get - fail("Expect InvalidTopicException when using an unknown topic") - } catch { - case e: ExecutionException => - assertTrue(e.getCause.isInstanceOf[UnknownTopicOrPartitionException]) - assertEquals("The topic 'an-unknown-topic' does not exist", e.getCause.getMessage) - } + // try a newCount which would be a noop (where the assignment doesn't match current state) + alterResult = client.createPartitions(Map(topic2 -> + NewPartitions.increaseTo(3, newPartition2Assignments.asScala.reverse.toList.asJava)).asJava, option) + try { + alterResult.values.get(topic2).get + } catch { + case e: ExecutionException => + assertTrue(desc, e.getCause.isInstanceOf[InvalidPartitionsException]) + assertEquals(desc, "Topic already has 3 partitions.", e.getCause.getMessage) + assertEquals(desc, 3, numPartitions(topic2)) + } - // try an invalid newCount - alterResult = client.createPartitions(Map(topic1 -> - NewPartitions.increaseTo(-22)).asJava, validateOnly) - try { - altered = alterResult.values.get(topic1).get - fail("Expect InvalidPartitionsException when newCount is invalid") - } catch { - case e: ExecutionException => - assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException]) - assertEquals("Topic currently has 3 partitions, which is higher than the requested -22.", - e.getCause.getMessage) - } + // try a bad topic name + val unknownTopic = "an-unknown-topic" + alterResult = client.createPartitions(Map(unknownTopic -> + NewPartitions.increaseTo(2)).asJava, option) + try { + alterResult.values.get(unknownTopic).get + fail(s"$desc: Expect InvalidTopicException when using an unknown topic") + } catch { + case e: ExecutionException => + assertTrue(desc, e.getCause.isInstanceOf[UnknownTopicOrPartitionException]) + assertEquals(desc, "The topic 'an-unknown-topic' does not exist.", e.getCause.getMessage) + } - // try assignments where the number of brokers != replication factor - alterResult = client.createPartitions(Map(topic1 -> - NewPartitions.increaseTo(4, asList(asList(1, 2)))).asJava, validateOnly) - try { - altered = alterResult.values.get(topic1).get - fail("Expect InvalidPartitionsException when #brokers != replication factor") - } catch { - case e: ExecutionException => - assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException]) - assertEquals("Inconsistent replication factor between partitions, partition 0 has 1 " + - "while partitions [3] have replication factors [2], respectively", - e.getCause.getMessage) - } + // try an invalid newCount + alterResult = client.createPartitions(Map(topic1 -> + NewPartitions.increaseTo(-22)).asJava, option) + try { + altered = alterResult.values.get(topic1).get + fail(s"$desc: Expect InvalidPartitionsException when newCount is invalid") + } catch { + case e: ExecutionException => + assertTrue(desc, e.getCause.isInstanceOf[InvalidPartitionsException]) + assertEquals(desc, "Topic currently has 3 partitions, which is higher than the requested -22.", + e.getCause.getMessage) + assertEquals(desc, 3, numPartitions(topic1)) + } - // try #assignments incompatible with the increase - alterResult = client.createPartitions(Map(topic1 -> - NewPartitions.increaseTo(4, asList(asList(1), asList(2)))).asJava, validateOnly) - try { - altered = alterResult.values.get(topic1).get - fail("Expect InvalidRequestException when #assignments != newCount - oldCount") - } catch { - case e: ExecutionException => - assertTrue(e.getCause.isInstanceOf[InvalidRequestException]) - assertEquals("Increasing the number of partitions by 1 but 2 assignments provided.", e.getCause.getMessage) - } + // try assignments where the number of brokers != replication factor + alterResult = client.createPartitions(Map(topic1 -> + NewPartitions.increaseTo(4, asList(asList(1, 2)))).asJava, option) + try { + altered = alterResult.values.get(topic1).get + fail(s"$desc: Expect InvalidPartitionsException when #brokers != replication factor") + } catch { + case e: ExecutionException => + assertTrue(desc, e.getCause.isInstanceOf[InvalidReplicaAssignmentException]) + assertEquals(desc, "Inconsistent replication factor between partitions, partition 0 has 1 " + + "while partitions [3] have replication factors [2], respectively.", + e.getCause.getMessage) + assertEquals(desc, 3, numPartitions(topic1)) + } - // try with duplicate brokers in assignments - alterResult = client.createPartitions(Map(topic1 -> - NewPartitions.increaseTo(4, asList(asList(1, 1)))).asJava, validateOnly) - try { - altered = alterResult.values.get(topic1).get - fail("Expect InvalidReplicaAssignmentException when assignments has duplicate brokers") - } catch { - case e: ExecutionException => - assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException]) - assertEquals("Duplicate brokers not allowed in replica assignment: 1, 1 for partition id 3.", - e.getCause.getMessage) - } + // try #assignments < with the increase + alterResult = client.createPartitions(Map(topic1 -> + NewPartitions.increaseTo(6, asList(asList(1)))).asJava, option) + try { + altered = alterResult.values.get(topic1).get + fail(s"$desc: Expect InvalidReplicaAssignmentException when #assignments != newCount - oldCount") + } catch { + case e: ExecutionException => + assertTrue(desc, e.getCause.isInstanceOf[InvalidReplicaAssignmentException]) + assertEquals(desc, "Increasing the number of partitions by 3 but 1 assignments provided.", e.getCause.getMessage) + assertEquals(desc, 3, numPartitions(topic1)) + } - // try assignments with differently sized inner lists - alterResult = client.createPartitions(Map(topic1 -> - NewPartitions.increaseTo(5, asList(asList(1), asList(1, 0)))).asJava, validateOnly) - try { - altered = alterResult.values.get(topic1).get - fail("Expect InvalidReplicaAssignmentException when assignments have differently sized inner lists") - } catch { - case e: ExecutionException => - e.printStackTrace() - assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException]) - assertEquals("Inconsistent replication factor between partitions, partition 0 has 1 " + - "while partitions [4] have replication factors [2], respectively", e.getCause.getMessage) - } + // try #assignments > with the increase + alterResult = client.createPartitions(Map(topic1 -> + NewPartitions.increaseTo(4, asList(asList(1), asList(2)))).asJava, option) + try { + altered = alterResult.values.get(topic1).get + fail(s"$desc: Expect InvalidReplicaAssignmentException when #assignments != newCount - oldCount") + } catch { + case e: ExecutionException => + assertTrue(desc, e.getCause.isInstanceOf[InvalidReplicaAssignmentException]) + assertEquals(desc, "Increasing the number of partitions by 1 but 2 assignments provided.", e.getCause.getMessage) + assertEquals(desc, 3, numPartitions(topic1)) + } - // try assignments with unknown brokers - alterResult = client.createPartitions(Map(topic1 -> - NewPartitions.increaseTo(4, asList(asList(12)))).asJava, validateOnly) - try { - altered = alterResult.values.get(topic1).get - fail("Expect InvalidReplicaAssignmentException when assignments contains an unknown broker") - } catch { - case e: ExecutionException => - e.printStackTrace() - assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException]) - assertEquals("Unknown broker(s) in replica assignment: 12.", e.getCause.getMessage) + // try with duplicate brokers in assignments + alterResult = client.createPartitions(Map(topic1 -> + NewPartitions.increaseTo(4, asList(asList(1, 1)))).asJava, option) + try { + altered = alterResult.values.get(topic1).get + fail(s"$desc: Expect InvalidReplicaAssignmentException when assignments has duplicate brokers") + } catch { + case e: ExecutionException => + assertTrue(desc, e.getCause.isInstanceOf[InvalidReplicaAssignmentException]) + assertEquals(desc, "Duplicate brokers not allowed in replica assignment: 1, 1 for partition id 3.", + e.getCause.getMessage) + assertEquals(desc, 3, numPartitions(topic1)) + } + + // try assignments with differently sized inner lists + alterResult = client.createPartitions(Map(topic1 -> + NewPartitions.increaseTo(5, asList(asList(1), asList(1, 0)))).asJava, option) + try { + altered = alterResult.values.get(topic1).get + fail(s"$desc: Expect InvalidReplicaAssignmentException when assignments have differently sized inner lists") + } catch { + case e: ExecutionException => + assertTrue(desc, e.getCause.isInstanceOf[InvalidReplicaAssignmentException]) + assertEquals(desc, "Inconsistent replication factor between partitions, partition 0 has 1 " + + "while partitions [4] have replication factors [2], respectively.", e.getCause.getMessage) + assertEquals(desc, 3, numPartitions(topic1)) + } + + // try assignments with unknown brokers + alterResult = client.createPartitions(Map(topic1 -> + NewPartitions.increaseTo(4, asList(asList(12)))).asJava, option) + try { + altered = alterResult.values.get(topic1).get + fail(s"$desc: Expect InvalidReplicaAssignmentException when assignments contains an unknown broker") + } catch { + case e: ExecutionException => + assertTrue(desc, e.getCause.isInstanceOf[InvalidReplicaAssignmentException]) + assertEquals(desc, "Unknown broker(s) in replica assignment: 12.", e.getCause.getMessage) + assertEquals(desc, 3, numPartitions(topic1)) + } + + // try with empty assignments + alterResult = client.createPartitions(Map(topic1 -> + NewPartitions.increaseTo(4, Collections.emptyList())).asJava, option) + try { + altered = alterResult.values.get(topic1).get + fail(s"$desc: Expect InvalidReplicaAssignmentException when assignments is empty") + } catch { + case e: ExecutionException => + assertTrue(desc, e.getCause.isInstanceOf[InvalidReplicaAssignmentException]) + assertEquals(desc, "Increasing the number of partitions by 1 but 0 assignments provided.", e.getCause.getMessage) + assertEquals(desc, 3, numPartitions(topic1)) + } } - // try with empty assignments - alterResult = client.createPartitions(Map(topic1 -> - NewPartitions.increaseTo(4, Collections.emptyList())).asJava, validateOnly) + // a mixed success, failure response + alterResult = client.createPartitions(Map( + topic1 -> NewPartitions.increaseTo(4), + topic2 -> NewPartitions.increaseTo(2)).asJava, actuallyDoIt) + // assert that the topic1 now has 4 partitions + altered = alterResult.values.get(topic1).get + assertEquals(4, numPartitions(topic1)) try { - altered = alterResult.values.get(topic1).get - fail("Expect InvalidRequestException when assignments is empty") + altered = alterResult.values.get(topic2).get } catch { case e: ExecutionException => - assertTrue(e.getCause.isInstanceOf[InvalidRequestException]) - assertEquals("Increasing the number of partitions by 1 but 0 assignments provided.", e.getCause.getMessage) + case e: ExecutionException => + assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException]) + assertEquals("Topic currently has 3 partitions, which is higher than the requested 2.", e.getCause.getMessage) + // assert that the topic2 still has 3 partitions + assertEquals(3, numPartitions(topic2)) } // finally, try to add partitions to a topic queued for deletion @@ -543,7 +620,6 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging { assertTrue(e.getCause.isInstanceOf[InvalidTopicException]) assertEquals("The topic is queued for deletion.", e.getCause.getMessage) } - } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e5a32d/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 728e958..d07d08e 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -28,6 +28,7 @@ import kafka.network.SocketServer import kafka.security.auth._ import kafka.server.{BaseRequestTest, KafkaConfig} import kafka.utils.TestUtils +import org.apache.kafka.clients.admin.NewPartitions import org.apache.kafka.clients.consumer.OffsetAndMetadata import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener import org.apache.kafka.clients.consumer._ @@ -86,12 +87,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val topicReadAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read))) val topicWriteAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write))) val topicDescribeAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe))) + val topicAlterAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Alter))) val topicDeleteAcl = Map(deleteTopicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete))) val topicDescribeConfigsAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, DescribeConfigs))) val topicAlterConfigsAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, AlterConfigs))) val transactionIdWriteAcl = Map(transactionalIdResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write))) val transactionalIdDescribeAcl = Map(transactionalIdResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe))) + val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]() val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]() @@ -142,7 +145,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.DELETE_ACLS -> classOf[DeleteAclsResponse], ApiKeys.DESCRIBE_ACLS -> classOf[DescribeAclsResponse], ApiKeys.ALTER_REPLICA_LOG_DIRS -> classOf[AlterReplicaLogDirsResponse], - ApiKeys.DESCRIBE_LOG_DIRS -> classOf[DescribeLogDirsResponse] + ApiKeys.DESCRIBE_LOG_DIRS -> classOf[DescribeLogDirsResponse], + ApiKeys.CREATE_PARTITIONS -> classOf[CreatePartitionsResponse] ) val requestKeyToError = Map[ApiKeys, Nothing => Errors]( @@ -181,7 +185,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.DELETE_ACLS -> ((resp: DeleteAclsResponse) => resp.responses.asScala.head.error.error), ApiKeys.ALTER_REPLICA_LOG_DIRS -> ((resp: AlterReplicaLogDirsResponse) => resp.responses.get(tp)), ApiKeys.DESCRIBE_LOG_DIRS -> ((resp: DescribeLogDirsResponse) => - if (resp.logDirInfos.size() > 0) resp.logDirInfos.asScala.head._2.error else Errors.CLUSTER_AUTHORIZATION_FAILED) + if (resp.logDirInfos.size() > 0) resp.logDirInfos.asScala.head._2.error else Errors.CLUSTER_AUTHORIZATION_FAILED), + ApiKeys.CREATE_PARTITIONS -> ((resp: CreatePartitionsResponse) => resp.errors.asScala.find(_._1 == topic).get._2.error) ) val requestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]]( @@ -217,7 +222,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.DESCRIBE_ACLS -> clusterDescribeAcl, ApiKeys.DELETE_ACLS -> clusterAlterAcl, ApiKeys.ALTER_REPLICA_LOG_DIRS -> clusterAlterAcl, - ApiKeys.DESCRIBE_LOG_DIRS -> clusterDescribeAcl + ApiKeys.DESCRIBE_LOG_DIRS -> clusterDescribeAcl, + ApiKeys.CREATE_PARTITIONS -> topicAlterAcl + ) @Before @@ -321,6 +328,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest { build() } + private def createPartitionsRequest = { + new CreatePartitionsRequest.Builder( + Map(topic -> NewPartitions.increaseTo(10)).asJava, 10000, true + ).build() + } + private def heartbeatRequest = new HeartbeatRequest.Builder(group, 1, "").build() private def leaveGroupRequest = new LeaveGroupRequest.Builder(group, "").build() @@ -399,7 +412,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.DELETE_ACLS -> deleteAclsRequest, ApiKeys.DESCRIBE_ACLS -> describeAclsRequest, ApiKeys.ALTER_REPLICA_LOG_DIRS -> alterReplicaLogDirsRequest, - ApiKeys.DESCRIBE_LOG_DIRS -> describeLogDirsRequest + ApiKeys.DESCRIBE_LOG_DIRS -> describeLogDirsRequest, + ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest ) for ((key, request) <- requestKeyToRequest) { @@ -971,6 +985,23 @@ class AuthorizerIntegrationTest extends BaseRequestTest { assertEquals(Errors.NONE, deleteRecordsResponse.responses.asScala.head._2.error) } + @Test + def testUnauthorizedCreatePartitions() { + val response = connectAndSend(createPartitionsRequest, ApiKeys.CREATE_PARTITIONS) + val version = ApiKeys.CREATE_PARTITIONS.latestVersion + val createPartitionsResponse = CreatePartitionsResponse.parse(response, version) + assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, createPartitionsResponse.errors.asScala.head._2.error) + } + + @Test + def testCreatePartitionsWithWildCardAuth() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Alter)), new Resource(Topic, "*")) + val response = connectAndSend(createPartitionsRequest, ApiKeys.CREATE_PARTITIONS) + val version = ApiKeys.CREATE_PARTITIONS.latestVersion + val createPartitionsResponse = CreatePartitionsResponse.parse(response, version) + assertEquals(Errors.NONE, createPartitionsResponse.errors.asScala.head._2.error) + } + @Test(expected = classOf[TransactionalIdAuthorizationException]) def testTransactionalProducerInitTransactionsNoWriteTransactionalIdAcl(): Unit = { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), transactionalIdResource)