Repository: kafka
Updated Branches:
  refs/heads/trunk 2a1b39ef1 -> c6e5a32d0


KAFKA-5856; AdminClient.createPartitions() follow up

- Improve tests and javadoc (including expected exceptions)
- Return correct authorization error if no describe topic
permission

Author: Tom Bentley <tbent...@redhat.com>

Reviewers: Ismael Juma <ism...@juma.me.uk>

Closes #3937 from tombentley/KAFKA-5856-AdminClient.createPartitions-follow-up


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c6e5a32d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c6e5a32d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c6e5a32d

Branch: refs/heads/trunk
Commit: c6e5a32d0464622ced4b7887923ca37c717b9e74
Parents: 2a1b39e
Author: Tom Bentley <tbent...@redhat.com>
Authored: Wed Oct 4 18:57:26 2017 +0100
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Wed Oct 4 18:57:31 2017 +0100

----------------------------------------------------------------------
 .../apache/kafka/clients/admin/AdminClient.java |  42 ++-
 .../common/errors/InvalidTopicException.java    |   4 +
 .../UnknownTopicOrPartitionException.java       |   6 +-
 .../src/main/scala/kafka/admin/AdminUtils.scala |   3 +-
 .../main/scala/kafka/server/AdminManager.scala  |  16 +-
 .../kafka/server/DelayedCreatePartitions.scala  |   4 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  13 +-
 .../kafka/api/AdminClientIntegrationTest.scala  | 332 ++++++++++++-------
 .../kafka/api/AuthorizerIntegrationTest.scala   |  39 ++-
 9 files changed, 302 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e5a32d/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index 636317c..fd695f9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -453,11 +453,12 @@ public abstract class AdminClient implements 
AutoCloseable {
     public abstract DescribeReplicaLogDirsResult 
describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas, 
DescribeReplicaLogDirsOptions options);
 
     /**
-     * Increase the number of partitions of the topics given as the keys of 
{@code newPartitions}
-     * according to the corresponding values.
+     * <p>Increase the number of partitions of the topics given as the keys of 
{@code newPartitions}
+     * according to the corresponding values. <strong>If partitions are 
increased for a topic that has a key,
+     * the partition logic or ordering of the messages will be 
affected.</strong></p>
      *
-     * This is a convenience method for {@link #createPartitions(Map, 
CreatePartitionsOptions)} with default options.
-     * See the overload for more details.
+     * <p>This is a convenience method for {@link #createPartitions(Map, 
CreatePartitionsOptions)} with default options.
+     * See the overload for more details.</p>
      *
      * @param newPartitions The topics which should have new partitions 
created, and corresponding parameters
      *                      for the created partitions.
@@ -468,17 +469,36 @@ public abstract class AdminClient implements 
AutoCloseable {
     }
 
     /**
-     * Increase the number of partitions of the topics given as the keys of 
{@code newPartitions}
-     * according to the corresponding values.
+     * <p>Increase the number of partitions of the topics given as the keys of 
{@code newPartitions}
+     * according to the corresponding values. <strong>If partitions are 
increased for a topic that has a key,
+     * the partition logic or ordering of the messages will be 
affected.</strong></p>
      *
-     * This operation is not transactional so it may succeed for some topics 
while fail for others.
+     * <p>This operation is not transactional so it may succeed for some 
topics while fail for others.</p>
      *
-     * It may take several seconds after this method returns
+     * <p>It may take several seconds after this method returns
      * success for all the brokers to become aware that the partitions have 
been created.
      * During this time, {@link AdminClient#describeTopics(Collection)}
-     * may not return information about the new partitions.
-     *
-     * This operation is supported by brokers with version 1.0.0 or higher.
+     * may not return information about the new partitions.</p>
+     *
+     * <p>This operation is supported by brokers with version 1.0.0 or 
higher.</p>
+     *
+     * <p>The following exceptions can be anticipated when calling {@code 
get()} on the futures obtained from the
+     * {@link CreatePartitionsResult#values() values()} method of the returned 
{@code CreatePartitionsResult}</p>
+     * <ul>
+     *     <li>{@link org.apache.kafka.common.errors.AuthorizationException}
+     *     if the authenticated user is not authorized to alter the topic</li>
+     *     <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *     if the request was not completed in within the given {@link 
CreatePartitionsOptions#timeoutMs()}.</li>
+     *     <li>{@link 
org.apache.kafka.common.errors.ReassignmentInProgressException}
+     *     if a partition reassignment is currently in progress</li>
+     *     <li>{@link 
org.apache.kafka.common.errors.BrokerNotAvailableException}
+     *     if the requested {@link NewPartitions#assignments()} contain a 
broker that is currently unavailable.</li>
+     *     <li>{@link 
org.apache.kafka.common.errors.InvalidReplicationFactorException}
+     *     if no {@link NewPartitions#assignments()} are given and it is 
impossible for the broker to assign
+     *     replicas with the topics replication factor.</li>
+     *     <li>Subclasses of {@link org.apache.kafka.common.KafkaException}
+     *     if the request is invalid in some way.</li>
+     * </ul>
      *
      * @param newPartitions The topics which should have new partitions 
created, and corresponding parameters
      *                      for the created partitions.

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e5a32d/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java
index 5c7b2be..f79e9a7 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java
@@ -18,6 +18,10 @@ package org.apache.kafka.common.errors;
 
 /**
  * The client has attempted to perform an operation on an invalid topic.
+ * For example the topic name is too long, contains invalid characters etc.
+ * This exception is not retriable because the operation won't suddenly become 
valid.
+ *
+ * @see UnknownTopicOrPartitionException
  */
 public class InvalidTopicException extends ApiException {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e5a32d/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java
index 0f2a562..6d10945 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java
@@ -17,7 +17,11 @@
 package org.apache.kafka.common.errors;
 
 /**
- * This topic/partition doesn't exist
+ * This topic/partition doesn't exist.
+ * This exception is used in contexts where a topic doesn't seem to exist 
based on possibly stale metadata.
+ * This exception is retriable because the topic or partition might 
subsequently be created.
+ *
+ * @see InvalidTopicException
  */
 public class UnknownTopicOrPartitionException extends InvalidMetadataException 
{
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e5a32d/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala 
b/core/src/main/scala/kafka/admin/AdminUtils.scala
index ee987f1..32cab2a 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -304,6 +304,7 @@ object AdminUtils extends Logging with AdminUtilities {
       AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, 
topic, proposedAssignment, update = true)
     }
     proposedAssignment
+
   }
 
   /**
@@ -354,7 +355,7 @@ object AdminUtils extends Logging with AdminUtilities {
       val repFactors = sortedBadRepFactors.map { case (_, rf) => rf }
       throw new InvalidReplicaAssignmentException(s"Inconsistent replication 
factor between partitions, " +
         s"partition 0 has ${existingAssignmentPartition0.size} while 
partitions [${partitions.mkString(", ")}] have " +
-        s"replication factors [${repFactors.mkString(", ")}], respectively")
+        s"replication factors [${repFactors.mkString(", ")}], respectively.")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e5a32d/core/src/main/scala/kafka/server/AdminManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala 
b/core/src/main/scala/kafka/server/AdminManager.scala
index 2c83c1b..aefdefd 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -122,15 +122,15 @@ class AdminManager(val config: KafkaConfig,
             else
               
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, 
assignments, configs, update = false)
         }
-        CreateTopicMetadata(topic, assignments, ApiError.NONE)
+        CreatePartitionsMetadata(topic, assignments, ApiError.NONE)
       } catch {
         // Log client errors at a lower level than unexpected exceptions
         case e@ (_: PolicyViolationException | _: ApiException) =>
           info(s"Error processing create topic request for topic $topic with 
arguments $arguments", e)
-          CreateTopicMetadata(topic, Map(), ApiError.fromThrowable(e))
+          CreatePartitionsMetadata(topic, Map(), ApiError.fromThrowable(e))
         case e: Throwable =>
           error(s"Error processing create topic request for topic $topic with 
arguments $arguments", e)
-          CreateTopicMetadata(topic, Map(), ApiError.fromThrowable(e))
+          CreatePartitionsMetadata(topic, Map(), ApiError.fromThrowable(e))
       }
     }
 
@@ -219,7 +219,7 @@ class AdminManager(val config: KafkaConfig,
           case (topicPartition, replicas) => topicPartition.partition -> 
replicas
         }
         if (existingAssignment.isEmpty)
-          throw new UnknownTopicOrPartitionException(s"The topic '$topic' does 
not exist")
+          throw new UnknownTopicOrPartitionException(s"The topic '$topic' does 
not exist.")
 
         val oldNumPartitions = existingAssignment.size
         val newNumPartitions = newPartition.totalCount
@@ -238,7 +238,7 @@ class AdminManager(val config: KafkaConfig,
               s"Unknown broker(s) in replica assignment: 
${unknownBrokers.mkString(", ")}.")
 
           if (assignments.size != numPartitionsIncrement)
-            throw new InvalidRequestException(
+            throw new InvalidReplicaAssignmentException(
               s"Increasing the number of partitions by $numPartitionsIncrement 
" +
                 s"but ${assignments.size} assignments provided.")
 
@@ -249,12 +249,12 @@ class AdminManager(val config: KafkaConfig,
 
         val updatedReplicaAssignment = AdminUtils.addPartitions(zkUtils, 
topic, existingAssignment, allBrokers,
           newPartition.totalCount, reassignment, validateOnly = validateOnly)
-        CreateTopicMetadata(topic, updatedReplicaAssignment, ApiError.NONE)
+        CreatePartitionsMetadata(topic, updatedReplicaAssignment, 
ApiError.NONE)
       } catch {
         case e: AdminOperationException =>
-          CreateTopicMetadata(topic, Map.empty, ApiError.fromThrowable(e))
+          CreatePartitionsMetadata(topic, Map.empty, ApiError.fromThrowable(e))
         case e: ApiException =>
-          CreateTopicMetadata(topic, Map.empty, ApiError.fromThrowable(e))
+          CreatePartitionsMetadata(topic, Map.empty, ApiError.fromThrowable(e))
       }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e5a32d/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala 
b/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala
index 81a096e..0a99483 100644
--- a/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala
+++ b/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala
@@ -26,13 +26,13 @@ import scala.collection._
 /**
   * The create metadata maintained by the delayed create topic or create 
partitions operations.
   */
-case class CreateTopicMetadata(topic: String, replicaAssignments: Map[Int, 
Seq[Int]], error: ApiError)
+case class CreatePartitionsMetadata(topic: String, replicaAssignments: 
Map[Int, Seq[Int]], error: ApiError)
 
 /**
   * A delayed create topic or create partitions operation that is stored in 
the topic purgatory.
   */
 class DelayedCreatePartitions(delayMs: Long,
-                              createMetadata: Seq[CreateTopicMetadata],
+                              createMetadata: Seq[CreatePartitionsMetadata],
                               adminManager: AdminManager,
                               responseCallback: Map[String, ApiError] => Unit)
   extends DelayedOperation(delayMs) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e5a32d/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 13959b8..c171aaa 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1367,12 +1367,14 @@ class KafkaApis(val requestChannel: RequestChannel,
       val (authorized, unauthorized) = notDuped.partition { case (topic, _) =>
         authorize(request.session, Alter, new Resource(Topic, topic))
       }
+
       val (queuedForDeletion, valid) = authorized.partition { case (topic, _) 
=>
         controller.topicDeletionManager.isTopicQueuedUpForDeletion(topic)
+
       }
 
-      val errors = dupes.map(_ -> new ApiError(Errors.INVALID_REQUEST, 
"Duplicate topic in request")) ++
-        unauthorized.keySet.map(_ -> new 
ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null)) ++
+      val errors = dupes.map(_ -> new ApiError(Errors.INVALID_REQUEST, 
"Duplicate topic in request.")) ++
+        unauthorized.keySet.map( topic => topic -> 
createPartitionsAuthorizationApiError(request.session, topic) ) ++
         queuedForDeletion.keySet.map(_ -> new 
ApiError(Errors.INVALID_TOPIC_EXCEPTION, "The topic is queued for deletion."))
 
       adminManager.createPartitions(createPartitionsRequest.timeout, valid, 
createPartitionsRequest.validateOnly,
@@ -1380,6 +1382,13 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  private def createPartitionsAuthorizationApiError(session: 
RequestChannel.Session, topic: String): ApiError = {
+    if (authorize(session, Describe, new Resource(Topic, topic)))
+      new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null)
+    else
+      new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, null)
+  }
+
   def handleDeleteTopicsRequest(request: RequestChannel.Request) {
     val deleteTopicRequest = request.body[DeleteTopicsRequest]
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e5a32d/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index e916efa..a18b217 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -376,158 +376,235 @@ class AdminClientIntegrationTest extends 
KafkaServerTestHarness with Logging {
     assertEquals(1, 
client.describeTopics(Set(topic2).asJava).values.get(topic2).get.partitions.size)
 
     val validateOnly = new CreatePartitionsOptions().validateOnly(true)
+    val actuallyDoIt = new CreatePartitionsOptions().validateOnly(false)
 
-    // assert that a validateOnly request doesn't increase the number of 
partitions
+    def partitions(topic: String) =
+      client.describeTopics(Set(topic).asJava).values.get(topic).get.partitions
+
+    def numPartitions(topic: String) =
+      partitions(topic).size
+
+    // validateOnly: try creating a new partition (no assignments), to bring 
the total to 3 partitions
     var alterResult = client.createPartitions(Map(topic1 ->
-      NewPartitions.increaseTo(2)).asJava, validateOnly)
+      NewPartitions.increaseTo(3)).asJava, validateOnly)
     var altered = alterResult.values.get(topic1).get
-    // assert that the topics still has 1 partition
-    assertEquals(1, 
client.describeTopics(Set(topic1).asJava).values.get(topic1).get.partitions.size)
+    assertEquals(1, numPartitions(topic1))
 
     // try creating a new partition (no assignments), to bring the total to 3 
partitions
     alterResult = client.createPartitions(Map(topic1 ->
-      NewPartitions.increaseTo(3)).asJava)
+      NewPartitions.increaseTo(3)).asJava, actuallyDoIt)
     altered = alterResult.values.get(topic1).get
-    // assert that the topics now has 2 partitions
-    var actualPartitions = 
client.describeTopics(Set(topic1).asJava).values.get(topic1).get.partitions
-    assertEquals(3, actualPartitions.size)
+    assertEquals(3, numPartitions(topic1))
+
+    // validateOnly: now try creating a new partition (with assignments), to 
bring the total to 3 partitions
+    val newPartition2Assignments = asList[util.List[Integer]](asList(0, 1), 
asList(1, 2))
+    alterResult = client.createPartitions(Map(topic2 ->
+      NewPartitions.increaseTo(3, newPartition2Assignments)).asJava, 
validateOnly)
+    altered = alterResult.values.get(topic2).get
+    assertEquals(1, numPartitions(topic2))
 
     // now try creating a new partition (with assignments), to bring the total 
to 3 partitions
     alterResult = client.createPartitions(Map(topic2 ->
-      NewPartitions.increaseTo(3, asList(asList(0, 1), asList(1, 2)))).asJava)
+      NewPartitions.increaseTo(3, newPartition2Assignments)).asJava, 
actuallyDoIt)
     altered = alterResult.values.get(topic2).get
-    // assert that the topics now has 3 partitions
-    actualPartitions = 
client.describeTopics(Set(topic2).asJava).values.get(topic2).get.partitions
-    assertEquals(3, actualPartitions.size)
-    assertEquals(Seq(0, 1), actualPartitions.get(1).replicas.asScala.map(_.id))
-    assertEquals(Seq(1, 2), actualPartitions.get(2).replicas.asScala.map(_.id))
+    var actualPartitions2 = partitions(topic2)
+    assertEquals(3, actualPartitions2.size)
+    assertEquals(Seq(0, 1), 
actualPartitions2.get(1).replicas.asScala.map(_.id).toList)
+    assertEquals(Seq(1, 2), 
actualPartitions2.get(2).replicas.asScala.map(_.id).toList)
+
+    // loop over error cases calling with+without validate-only
+    for (option <- Seq(validateOnly, actuallyDoIt)) {
+      val desc = if (option.validateOnly()) "validateOnly" else 
"validateOnly=false"
+
+      // try a newCount which would be a decrease
+      alterResult = client.createPartitions(Map(topic1 ->
+        NewPartitions.increaseTo(1)).asJava, option)
+      try {
+        alterResult.values.get(topic1).get
+        fail(s"$desc: Expect InvalidPartitionsException when newCount is a 
decrease")
+      } catch {
+        case e: ExecutionException =>
+          assertTrue(desc, e.getCause.isInstanceOf[InvalidPartitionsException])
+          assertEquals(desc, "Topic currently has 3 partitions, which is 
higher than the requested 1.", e.getCause.getMessage)
+          assertEquals(desc, 3, numPartitions(topic1))
+      }
 
-    // try a newCount which would be a decrease
-    alterResult = client.createPartitions(Map(topic1 ->
-      NewPartitions.increaseTo(1)).asJava, validateOnly)
-    try {
-      alterResult.values.get(topic1).get
-      fail("Expect InvalidPartitionsException when newCount is a decrease")
-    } catch {
-      case e: ExecutionException =>
-        assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException])
-        assertEquals("Topic currently has 3 partitions, which is higher than 
the requested 1.", e.getCause.getMessage)
-    }
+      // try a newCount which would be a noop (without assignment)
+      alterResult = client.createPartitions(Map(topic2 ->
+        NewPartitions.increaseTo(3)).asJava, option)
+      try {
+        alterResult.values.get(topic2).get
+        fail(s"$desc: Expect InvalidPartitionsException when requesting a 
noop")
+      } catch {
+        case e: ExecutionException =>
+          assertTrue(desc, e.getCause.isInstanceOf[InvalidPartitionsException])
+          assertEquals(desc, "Topic already has 3 partitions.", 
e.getCause.getMessage)
+          assertEquals(desc, 3, numPartitions(topic2))
+      }
 
-    // try a newCount which would be a noop
-    alterResult = client.createPartitions(Map(topic1 ->
-      NewPartitions.increaseTo(3)).asJava, validateOnly)
-    try {
-      alterResult.values.get(topic1).get
-      fail("Expect InvalidPartitionsException when newCount == oldCount")
-    } catch {
-      case e: ExecutionException =>
-        assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException])
-        assertEquals("Topic already has 3 partitions.", e.getCause.getMessage)
-    }
+      // try a newCount which would be a noop (where the assignment matches 
current state)
+      alterResult = client.createPartitions(Map(topic2 ->
+        NewPartitions.increaseTo(3, newPartition2Assignments)).asJava, option)
+      try {
+        alterResult.values.get(topic2).get
+      } catch {
+        case e: ExecutionException =>
+          assertTrue(desc, e.getCause.isInstanceOf[InvalidPartitionsException])
+          assertEquals(desc, "Topic already has 3 partitions.", 
e.getCause.getMessage)
+          assertEquals(desc, 3, numPartitions(topic2))
+      }
 
-    // try a bad topic name
-    val unknownTopic = "an-unknown-topic"
-    alterResult = client.createPartitions(Map(unknownTopic ->
-      NewPartitions.increaseTo(2)).asJava, validateOnly)
-    try {
-      alterResult.values.get(unknownTopic).get
-      fail("Expect InvalidTopicException when using an unknown topic")
-    } catch {
-      case e: ExecutionException =>
-        assertTrue(e.getCause.isInstanceOf[UnknownTopicOrPartitionException])
-        assertEquals("The topic 'an-unknown-topic' does not exist", 
e.getCause.getMessage)
-    }
+      // try a newCount which would be a noop (where the assignment doesn't 
match current state)
+      alterResult = client.createPartitions(Map(topic2 ->
+        NewPartitions.increaseTo(3, 
newPartition2Assignments.asScala.reverse.toList.asJava)).asJava, option)
+      try {
+        alterResult.values.get(topic2).get
+      } catch {
+        case e: ExecutionException =>
+          assertTrue(desc, e.getCause.isInstanceOf[InvalidPartitionsException])
+          assertEquals(desc, "Topic already has 3 partitions.", 
e.getCause.getMessage)
+          assertEquals(desc, 3, numPartitions(topic2))
+      }
 
-    // try an invalid newCount
-    alterResult = client.createPartitions(Map(topic1 ->
-      NewPartitions.increaseTo(-22)).asJava, validateOnly)
-    try {
-      altered = alterResult.values.get(topic1).get
-      fail("Expect InvalidPartitionsException when newCount is invalid")
-    } catch {
-      case e: ExecutionException =>
-        assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException])
-        assertEquals("Topic currently has 3 partitions, which is higher than 
the requested -22.",
-          e.getCause.getMessage)
-    }
+      // try a bad topic name
+      val unknownTopic = "an-unknown-topic"
+      alterResult = client.createPartitions(Map(unknownTopic ->
+        NewPartitions.increaseTo(2)).asJava, option)
+      try {
+        alterResult.values.get(unknownTopic).get
+        fail(s"$desc: Expect InvalidTopicException when using an unknown 
topic")
+      } catch {
+        case e: ExecutionException =>
+          assertTrue(desc, 
e.getCause.isInstanceOf[UnknownTopicOrPartitionException])
+          assertEquals(desc, "The topic 'an-unknown-topic' does not exist.", 
e.getCause.getMessage)
+      }
 
-    // try assignments where the number of brokers != replication factor
-    alterResult = client.createPartitions(Map(topic1 ->
-      NewPartitions.increaseTo(4, asList(asList(1, 2)))).asJava, validateOnly)
-    try {
-      altered = alterResult.values.get(topic1).get
-      fail("Expect InvalidPartitionsException when #brokers != replication 
factor")
-    } catch {
-      case e: ExecutionException =>
-        assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException])
-        assertEquals("Inconsistent replication factor between partitions, 
partition 0 has 1 " +
-          "while partitions [3] have replication factors [2], respectively",
-          e.getCause.getMessage)
-    }
+      // try an invalid newCount
+      alterResult = client.createPartitions(Map(topic1 ->
+        NewPartitions.increaseTo(-22)).asJava, option)
+      try {
+        altered = alterResult.values.get(topic1).get
+        fail(s"$desc: Expect InvalidPartitionsException when newCount is 
invalid")
+      } catch {
+        case e: ExecutionException =>
+          assertTrue(desc, e.getCause.isInstanceOf[InvalidPartitionsException])
+          assertEquals(desc, "Topic currently has 3 partitions, which is 
higher than the requested -22.",
+            e.getCause.getMessage)
+          assertEquals(desc, 3, numPartitions(topic1))
+      }
 
-    // try #assignments incompatible with the increase
-    alterResult = client.createPartitions(Map(topic1 ->
-      NewPartitions.increaseTo(4, asList(asList(1), asList(2)))).asJava, 
validateOnly)
-    try {
-      altered = alterResult.values.get(topic1).get
-      fail("Expect InvalidRequestException when #assignments != newCount - 
oldCount")
-    } catch {
-      case e: ExecutionException =>
-        assertTrue(e.getCause.isInstanceOf[InvalidRequestException])
-        assertEquals("Increasing the number of partitions by 1 but 2 
assignments provided.", e.getCause.getMessage)
-    }
+      // try assignments where the number of brokers != replication factor
+      alterResult = client.createPartitions(Map(topic1 ->
+        NewPartitions.increaseTo(4, asList(asList(1, 2)))).asJava, option)
+      try {
+        altered = alterResult.values.get(topic1).get
+        fail(s"$desc: Expect InvalidPartitionsException when #brokers != 
replication factor")
+      } catch {
+        case e: ExecutionException =>
+          assertTrue(desc, 
e.getCause.isInstanceOf[InvalidReplicaAssignmentException])
+          assertEquals(desc, "Inconsistent replication factor between 
partitions, partition 0 has 1 " +
+            "while partitions [3] have replication factors [2], respectively.",
+            e.getCause.getMessage)
+          assertEquals(desc, 3, numPartitions(topic1))
+      }
 
-    // try with duplicate brokers in assignments
-    alterResult = client.createPartitions(Map(topic1 ->
-      NewPartitions.increaseTo(4, asList(asList(1, 1)))).asJava, validateOnly)
-    try {
-      altered = alterResult.values.get(topic1).get
-      fail("Expect InvalidReplicaAssignmentException when assignments has 
duplicate brokers")
-    } catch {
-      case e: ExecutionException =>
-        assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException])
-        assertEquals("Duplicate brokers not allowed in replica assignment: 1, 
1 for partition id 3.",
-          e.getCause.getMessage)
-    }
+      // try #assignments < with the increase
+      alterResult = client.createPartitions(Map(topic1 ->
+        NewPartitions.increaseTo(6, asList(asList(1)))).asJava, option)
+      try {
+        altered = alterResult.values.get(topic1).get
+        fail(s"$desc: Expect InvalidReplicaAssignmentException when 
#assignments != newCount - oldCount")
+      } catch {
+        case e: ExecutionException =>
+          assertTrue(desc, 
e.getCause.isInstanceOf[InvalidReplicaAssignmentException])
+          assertEquals(desc, "Increasing the number of partitions by 3 but 1 
assignments provided.", e.getCause.getMessage)
+          assertEquals(desc, 3, numPartitions(topic1))
+      }
 
-    // try assignments with differently sized inner lists
-    alterResult = client.createPartitions(Map(topic1 ->
-      NewPartitions.increaseTo(5, asList(asList(1), asList(1, 0)))).asJava, 
validateOnly)
-    try {
-      altered = alterResult.values.get(topic1).get
-      fail("Expect InvalidReplicaAssignmentException when assignments have 
differently sized inner lists")
-    } catch {
-      case e: ExecutionException =>
-        e.printStackTrace()
-        assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException])
-        assertEquals("Inconsistent replication factor between partitions, 
partition 0 has 1 " +
-          "while partitions [4] have replication factors [2], respectively", 
e.getCause.getMessage)
-    }
+      // try #assignments > with the increase
+      alterResult = client.createPartitions(Map(topic1 ->
+        NewPartitions.increaseTo(4, asList(asList(1), asList(2)))).asJava, 
option)
+      try {
+        altered = alterResult.values.get(topic1).get
+        fail(s"$desc: Expect InvalidReplicaAssignmentException when 
#assignments != newCount - oldCount")
+      } catch {
+        case e: ExecutionException =>
+          assertTrue(desc, 
e.getCause.isInstanceOf[InvalidReplicaAssignmentException])
+          assertEquals(desc, "Increasing the number of partitions by 1 but 2 
assignments provided.", e.getCause.getMessage)
+          assertEquals(desc, 3, numPartitions(topic1))
+      }
 
-    // try assignments with unknown brokers
-    alterResult = client.createPartitions(Map(topic1 ->
-      NewPartitions.increaseTo(4, asList(asList(12)))).asJava, validateOnly)
-    try {
-      altered = alterResult.values.get(topic1).get
-      fail("Expect InvalidReplicaAssignmentException when assignments contains 
an unknown broker")
-    } catch {
-      case e: ExecutionException =>
-        e.printStackTrace()
-        assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException])
-        assertEquals("Unknown broker(s) in replica assignment: 12.", 
e.getCause.getMessage)
+      // try with duplicate brokers in assignments
+      alterResult = client.createPartitions(Map(topic1 ->
+        NewPartitions.increaseTo(4, asList(asList(1, 1)))).asJava, option)
+      try {
+        altered = alterResult.values.get(topic1).get
+        fail(s"$desc: Expect InvalidReplicaAssignmentException when 
assignments has duplicate brokers")
+      } catch {
+        case e: ExecutionException =>
+          assertTrue(desc, 
e.getCause.isInstanceOf[InvalidReplicaAssignmentException])
+          assertEquals(desc, "Duplicate brokers not allowed in replica 
assignment: 1, 1 for partition id 3.",
+            e.getCause.getMessage)
+          assertEquals(desc, 3, numPartitions(topic1))
+      }
+
+      // try assignments with differently sized inner lists
+      alterResult = client.createPartitions(Map(topic1 ->
+        NewPartitions.increaseTo(5, asList(asList(1), asList(1, 0)))).asJava, 
option)
+      try {
+        altered = alterResult.values.get(topic1).get
+        fail(s"$desc: Expect InvalidReplicaAssignmentException when 
assignments have differently sized inner lists")
+      } catch {
+        case e: ExecutionException =>
+          assertTrue(desc, 
e.getCause.isInstanceOf[InvalidReplicaAssignmentException])
+          assertEquals(desc, "Inconsistent replication factor between 
partitions, partition 0 has 1 " +
+            "while partitions [4] have replication factors [2], 
respectively.", e.getCause.getMessage)
+          assertEquals(desc, 3, numPartitions(topic1))
+      }
+
+      // try assignments with unknown brokers
+      alterResult = client.createPartitions(Map(topic1 ->
+        NewPartitions.increaseTo(4, asList(asList(12)))).asJava, option)
+      try {
+        altered = alterResult.values.get(topic1).get
+        fail(s"$desc: Expect InvalidReplicaAssignmentException when 
assignments contains an unknown broker")
+      } catch {
+        case e: ExecutionException =>
+          assertTrue(desc, 
e.getCause.isInstanceOf[InvalidReplicaAssignmentException])
+          assertEquals(desc, "Unknown broker(s) in replica assignment: 12.", 
e.getCause.getMessage)
+          assertEquals(desc, 3, numPartitions(topic1))
+      }
+
+      // try with empty assignments
+      alterResult = client.createPartitions(Map(topic1 ->
+        NewPartitions.increaseTo(4, Collections.emptyList())).asJava, option)
+      try {
+        altered = alterResult.values.get(topic1).get
+        fail(s"$desc: Expect InvalidReplicaAssignmentException when 
assignments is empty")
+      } catch {
+        case e: ExecutionException =>
+          assertTrue(desc, 
e.getCause.isInstanceOf[InvalidReplicaAssignmentException])
+          assertEquals(desc, "Increasing the number of partitions by 1 but 0 
assignments provided.", e.getCause.getMessage)
+          assertEquals(desc, 3, numPartitions(topic1))
+      }
     }
 
-    // try with empty assignments
-    alterResult = client.createPartitions(Map(topic1 ->
-      NewPartitions.increaseTo(4, Collections.emptyList())).asJava, 
validateOnly)
+    // a mixed success, failure response
+    alterResult = client.createPartitions(Map(
+      topic1 -> NewPartitions.increaseTo(4),
+      topic2 -> NewPartitions.increaseTo(2)).asJava, actuallyDoIt)
+    // assert that the topic1 now has 4 partitions
+    altered = alterResult.values.get(topic1).get
+    assertEquals(4, numPartitions(topic1))
     try {
-      altered = alterResult.values.get(topic1).get
-      fail("Expect InvalidRequestException when assignments is empty")
+      altered = alterResult.values.get(topic2).get
     } catch {
       case e: ExecutionException =>
-        assertTrue(e.getCause.isInstanceOf[InvalidRequestException])
-        assertEquals("Increasing the number of partitions by 1 but 0 
assignments provided.", e.getCause.getMessage)
+      case e: ExecutionException =>
+        assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException])
+        assertEquals("Topic currently has 3 partitions, which is higher than 
the requested 2.", e.getCause.getMessage)
+        // assert that the topic2 still has 3 partitions
+        assertEquals(3, numPartitions(topic2))
     }
 
     // finally, try to add partitions to a topic queued for deletion
@@ -543,7 +620,6 @@ class AdminClientIntegrationTest extends 
KafkaServerTestHarness with Logging {
         assertTrue(e.getCause.isInstanceOf[InvalidTopicException])
         assertEquals("The topic is queued for deletion.", 
e.getCause.getMessage)
     }
-
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e5a32d/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 728e958..d07d08e 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -28,6 +28,7 @@ import kafka.network.SocketServer
 import kafka.security.auth._
 import kafka.server.{BaseRequestTest, KafkaConfig}
 import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.NewPartitions
 import org.apache.kafka.clients.consumer.OffsetAndMetadata
 import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.clients.consumer._
@@ -86,12 +87,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val topicReadAcl = Map(topicResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
   val topicWriteAcl = Map(topicResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
   val topicDescribeAcl = Map(topicResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
+  val topicAlterAcl = Map(topicResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Alter)))
   val topicDeleteAcl = Map(deleteTopicResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)))
   val topicDescribeConfigsAcl = Map(topicResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, DescribeConfigs)))
   val topicAlterConfigsAcl = Map(topicResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, AlterConfigs)))
   val transactionIdWriteAcl = Map(transactionalIdResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
   val transactionalIdDescribeAcl = Map(transactionalIdResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
 
+
   val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
   val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
 
@@ -142,7 +145,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.DELETE_ACLS -> classOf[DeleteAclsResponse],
       ApiKeys.DESCRIBE_ACLS -> classOf[DescribeAclsResponse],
       ApiKeys.ALTER_REPLICA_LOG_DIRS -> classOf[AlterReplicaLogDirsResponse],
-      ApiKeys.DESCRIBE_LOG_DIRS -> classOf[DescribeLogDirsResponse]
+      ApiKeys.DESCRIBE_LOG_DIRS -> classOf[DescribeLogDirsResponse],
+      ApiKeys.CREATE_PARTITIONS -> classOf[CreatePartitionsResponse]
   )
 
   val requestKeyToError = Map[ApiKeys, Nothing => Errors](
@@ -181,7 +185,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.DELETE_ACLS -> ((resp: DeleteAclsResponse) => 
resp.responses.asScala.head.error.error),
     ApiKeys.ALTER_REPLICA_LOG_DIRS -> ((resp: AlterReplicaLogDirsResponse) => 
resp.responses.get(tp)),
     ApiKeys.DESCRIBE_LOG_DIRS -> ((resp: DescribeLogDirsResponse) =>
-      if (resp.logDirInfos.size() > 0) resp.logDirInfos.asScala.head._2.error 
else Errors.CLUSTER_AUTHORIZATION_FAILED)
+      if (resp.logDirInfos.size() > 0) resp.logDirInfos.asScala.head._2.error 
else Errors.CLUSTER_AUTHORIZATION_FAILED),
+    ApiKeys.CREATE_PARTITIONS -> ((resp: CreatePartitionsResponse) => 
resp.errors.asScala.find(_._1 == topic).get._2.error)
   )
 
   val requestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]](
@@ -217,7 +222,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.DESCRIBE_ACLS -> clusterDescribeAcl,
     ApiKeys.DELETE_ACLS -> clusterAlterAcl,
     ApiKeys.ALTER_REPLICA_LOG_DIRS -> clusterAlterAcl,
-    ApiKeys.DESCRIBE_LOG_DIRS -> clusterDescribeAcl
+    ApiKeys.DESCRIBE_LOG_DIRS -> clusterDescribeAcl,
+    ApiKeys.CREATE_PARTITIONS -> topicAlterAcl
+
   )
 
   @Before
@@ -321,6 +328,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       build()
   }
 
+  private def createPartitionsRequest = {
+    new CreatePartitionsRequest.Builder(
+      Map(topic -> NewPartitions.increaseTo(10)).asJava, 10000, true
+    ).build()
+  }
+
   private def heartbeatRequest = new HeartbeatRequest.Builder(group, 1, 
"").build()
 
   private def leaveGroupRequest = new LeaveGroupRequest.Builder(group, 
"").build()
@@ -399,7 +412,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.DELETE_ACLS -> deleteAclsRequest,
       ApiKeys.DESCRIBE_ACLS -> describeAclsRequest,
       ApiKeys.ALTER_REPLICA_LOG_DIRS -> alterReplicaLogDirsRequest,
-      ApiKeys.DESCRIBE_LOG_DIRS -> describeLogDirsRequest
+      ApiKeys.DESCRIBE_LOG_DIRS -> describeLogDirsRequest,
+      ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest
     )
 
     for ((key, request) <- requestKeyToRequest) {
@@ -971,6 +985,23 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     assertEquals(Errors.NONE, 
deleteRecordsResponse.responses.asScala.head._2.error)
   }
 
+  @Test
+  def testUnauthorizedCreatePartitions() {
+    val response = connectAndSend(createPartitionsRequest, 
ApiKeys.CREATE_PARTITIONS)
+    val version = ApiKeys.CREATE_PARTITIONS.latestVersion
+    val createPartitionsResponse = CreatePartitionsResponse.parse(response, 
version)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
createPartitionsResponse.errors.asScala.head._2.error)
+  }
+
+  @Test
+  def testCreatePartitionsWithWildCardAuth() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Alter)), new Resource(Topic, "*"))
+    val response = connectAndSend(createPartitionsRequest, 
ApiKeys.CREATE_PARTITIONS)
+    val version = ApiKeys.CREATE_PARTITIONS.latestVersion
+    val createPartitionsResponse = CreatePartitionsResponse.parse(response, 
version)
+    assertEquals(Errors.NONE, 
createPartitionsResponse.errors.asScala.head._2.error)
+  }
+
   @Test(expected = classOf[TransactionalIdAuthorizationException])
   def testTransactionalProducerInitTransactionsNoWriteTransactionalIdAcl(): 
Unit = {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), transactionalIdResource)

Reply via email to