Repository: kafka
Updated Branches:
  refs/heads/trunk e3bdc84d8 -> fd6d7bcf3


KAFKA-4591; Create Topic Policy follow-up

1. Added javadoc to public classes
2. Removed `s` from config name for consistency with interface name
3. The policy interface now implements Configurable and AutoCloseable as per 
the KIP
4. Use `null` instead of `-1` in `RequestMetadata`
5. Perform all broker validation before invoking the policy
6. Add tests

Author: Ismael Juma <ism...@juma.me.uk>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #2388 from ijuma/create-topic-policy-docs-and-config-name-change


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fd6d7bcf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fd6d7bcf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fd6d7bcf

Branch: refs/heads/trunk
Commit: fd6d7bcf335166a524dc9a29a50c96af8f1c1c02
Parents: e3bdc84
Author: Ismael Juma <ism...@juma.me.uk>
Authored: Wed Jan 18 02:43:10 2017 +0000
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Wed Jan 18 02:43:10 2017 +0000

----------------------------------------------------------------------
 .../common/errors/PolicyViolationException.java |  3 +
 .../apache/kafka/common/protocol/Errors.java    |  2 +-
 .../kafka/server/policy/CreateTopicPolicy.java  | 72 ++++++++++++++++++--
 .../src/main/scala/kafka/admin/AdminUtils.scala | 36 ++++++----
 .../main/scala/kafka/server/AdminManager.scala  | 56 ++++++++++-----
 .../main/scala/kafka/server/KafkaConfig.scala   |  6 +-
 .../AbstractCreateTopicsRequestTest.scala       |  2 +-
 .../CreateTopicsRequestWithPolicyTest.scala     | 59 +++++++++++++---
 .../unit/kafka/server/KafkaConfigTest.scala     |  2 +-
 9 files changed, 183 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fd6d7bcf/clients/src/main/java/org/apache/kafka/common/errors/PolicyViolationException.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/PolicyViolationException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/PolicyViolationException.java
index 7923444..393a6df 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/errors/PolicyViolationException.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/PolicyViolationException.java
@@ -17,6 +17,9 @@
 
 package org.apache.kafka.common.errors;
 
+/**
+ * Exception thrown if a create topics request does not satisfy the configured 
policy for a topic.
+ */
 public class PolicyViolationException extends ApiException {
 
     public PolicyViolationException(String message) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fd6d7bcf/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index f30f889..e7689e2 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -166,7 +166,7 @@ public enum Errors {
             " the message was sent to an incompatible broker. See the broker 
logs for more details.")),
     UNSUPPORTED_FOR_MESSAGE_FORMAT(43,
         new UnsupportedForMessageFormatException("The message format version 
on the broker does not support the request.")),
-    POLICY_VIOLATION(44, new PolicyViolationException("Request parameters do 
not satisfy the system policy."));
+    POLICY_VIOLATION(44, new PolicyViolationException("Request parameters do 
not satisfy the configured policy."));
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fd6d7bcf/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java 
b/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
index 94f1e76..22a7c1d 100644
--- 
a/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
+++ 
b/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
@@ -13,42 +13,90 @@
 
 package org.apache.kafka.server.policy;
 
+import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.errors.PolicyViolationException;
 
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-public interface CreateTopicPolicy {
+/**
+ * An interface for enforcing a policy on create topics requests.
+ *
+ * Common use cases are requiring that the replication factor, 
min.insync.replicas and/or retention settings for a
+ * topic are within an allowable range.
+ *
+ * If <code>create.topic.policy.class.name</code> is defined, Kafka will 
create an instance of the specified class
+ * using the default constructor and will then pass the broker configs to its 
<code>configure()</code> method. During
+ * broker shutdown, the <code>close()</code> method will be invoked so that 
resources can be released (if necessary).
+ */
+public interface CreateTopicPolicy extends Configurable, AutoCloseable {
 
+    /**
+     * Class containing the create request parameters.
+     */
     class RequestMetadata {
         private final String topic;
-        private final int numPartitions;
-        private final short replicationFactor;
+        private final Integer numPartitions;
+        private final Short replicationFactor;
         private final Map<Integer, List<Integer>> replicasAssignments;
         private final Map<String, String> configs;
 
-        public RequestMetadata(String topic, int numPartitions, short 
replicationFactor,
-                               Map<Integer, List<Integer>> 
replicasAssignments, Map<String, String> configs) {
+        /**
+         * Create an instance of this class with the provided parameters.
+         *
+         * This constructor is public to make testing of 
<code>CreateTopicPolicy</code> implementations easier.
+         *
+         * @param topic the name of the topic to created.
+         * @param numPartitions the number of partitions to create or null if 
replicasAssignments is set.
+         * @param replicationFactor the replication factor for the topic or 
null if replicaAssignments is set.
+         * @param replicasAssignments replica assignments or null if 
numPartitions and replicationFactor is set. The
+         *                            assignment is a map from partition id to 
replica (broker) ids.
+         * @param configs topic configs for the topic to be created, not 
including broker defaults. Broker configs are
+         *                passed via the {@code configure()} method of the 
policy implementation.
+         */
+        public RequestMetadata(String topic, Integer numPartitions, Short 
replicationFactor,
+                        Map<Integer, List<Integer>> replicasAssignments, 
Map<String, String> configs) {
             this.topic = topic;
             this.numPartitions = numPartitions;
             this.replicationFactor = replicationFactor;
-            this.replicasAssignments = 
Collections.unmodifiableMap(replicasAssignments);
+            this.replicasAssignments = replicasAssignments == null ? null : 
Collections.unmodifiableMap(replicasAssignments);
             this.configs = Collections.unmodifiableMap(configs);
         }
 
+        /**
+         * Return the name of the topic to create.
+         */
         public String topic() {
             return topic;
         }
 
-        public int numPartitions() {
+        /**
+         * Return the number of partitions to create or null if 
replicaAssignments is not null.
+         */
+        public Integer numPartitions() {
             return numPartitions;
         }
 
+        /**
+         * Return the number of replicas to create or null if 
replicaAssignments is not null.
+         */
+        public Short replicationFactor() {
+            return replicationFactor;
+        }
+
+        /**
+         * Return a map from partition id to replica (broker) ids or null if 
numPartitions and replicationFactor are
+         * set instead.
+         */
         public Map<Integer, List<Integer>> replicasAssignments() {
             return replicasAssignments;
         }
 
+        /**
+         * Return topic configs in the request, not including broker defaults. 
Broker configs are passed via
+         * the {@code configure()} method of the policy implementation.
+         */
         public Map<String, String> configs() {
             return configs;
         }
@@ -63,5 +111,15 @@ public interface CreateTopicPolicy {
         }
     }
 
+    /**
+     * Validate the request parameters and throw a 
<code>PolicyViolationException</code> with a suitable error
+     * message if the create request parameters for the provided topic do not 
satisfy this policy.
+     *
+     * Clients will receive the POLICY_VIOLATION error code along with the 
exception's message. Note that validation
+     * failure only affects the relevant topic, other topics in the request 
will still be processed.
+     *
+     * @param requestMetadata the create request parameters for the provided 
topic.
+     * @throws PolicyViolationException if the request parameters do not 
satisfy this policy.
+     */
     void validate(RequestMetadata requestMetadata) throws 
PolicyViolationException;
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fd6d7bcf/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala 
b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 916db48..02d5fe0 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -417,12 +417,11 @@ object AdminUtils extends Logging with AdminUtilities {
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, 
replicaAssignment, topicConfig)
   }
 
-  def createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils: ZkUtils,
-                                                     topic: String,
-                                                     
partitionReplicaAssignment: Map[Int, Seq[Int]],
-                                                     config: Properties = new 
Properties,
-                                                     update: Boolean = false,
-                                                     validateOnly: Boolean = 
false) {
+  def validateCreateOrUpdateTopic(zkUtils: ZkUtils,
+                                  topic: String,
+                                  partitionReplicaAssignment: Map[Int, 
Seq[Int]],
+                                  config: Properties,
+                                  update: Boolean): Unit = {
     // validate arguments
     Topic.validate(topic)
 
@@ -450,18 +449,25 @@ object AdminUtils extends Logging with AdminUtilities {
 
 
     // Configs only matter if a topic is being created. Changing configs via 
AlterTopic is not supported
-    if (!update) {
+    if (!update)
       LogConfig.validate(config)
-      if (!validateOnly) {
-        // write out the config if there is any, this isn't transactional with 
the partition assignments
-        writeEntityConfig(zkUtils, getEntityConfigPath(ConfigType.Topic, 
topic), config)
-      }
-    }
+  }
 
-    if (!validateOnly) {
-      // create the partition assignment
-      writeTopicPartitionAssignment(zkUtils, topic, 
partitionReplicaAssignment, update)
+  def createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils: ZkUtils,
+                                                     topic: String,
+                                                     
partitionReplicaAssignment: Map[Int, Seq[Int]],
+                                                     config: Properties = new 
Properties,
+                                                     update: Boolean = false) {
+    validateCreateOrUpdateTopic(zkUtils, topic, partitionReplicaAssignment, 
config, update)
+
+    // Configs only matter if a topic is being created. Changing configs via 
AlterTopic is not supported
+    if (!update) {
+      // write out the config if there is any, this isn't transactional with 
the partition assignments
+      writeEntityConfig(zkUtils, getEntityConfigPath(ConfigType.Topic, topic), 
config)
     }
+
+    // create the partition assignment
+    writeTopicPartitionAssignment(zkUtils, topic, partitionReplicaAssignment, 
update)
   }
 
   private def writeTopicPartitionAssignment(zkUtils: ZkUtils, topic: String, 
replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fd6d7bcf/core/src/main/scala/kafka/server/AdminManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala 
b/core/src/main/scala/kafka/server/AdminManager.scala
index 75a71f5..6dc224d 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -23,7 +23,7 @@ import kafka.common.TopicAlreadyMarkedForDeletionException
 import kafka.log.LogConfig
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
-import org.apache.kafka.common.errors.InvalidRequestException
+import org.apache.kafka.common.errors.{ApiException, InvalidRequestException, 
PolicyViolationException}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.CreateTopicsRequest._
@@ -43,7 +43,7 @@ class AdminManager(val config: KafkaConfig,
   private val topicPurgatory = 
DelayedOperationPurgatory[DelayedOperation]("topic", config.brokerId)
 
   private val createTopicPolicy =
-    
Option(config.getConfiguredInstance(KafkaConfig.CreateTopicsPolicyClassNameProp,
 classOf[CreateTopicPolicy]))
+    
Option(config.getConfiguredInstance(KafkaConfig.CreateTopicPolicyClassNameProp, 
classOf[CreateTopicPolicy]))
 
   def hasDelayedTopicOperations = topicPurgatory.delayed() != 0
 
@@ -80,29 +80,49 @@ class AdminManager(val config: KafkaConfig,
             && !arguments.replicasAssignments.isEmpty)
             throw new InvalidRequestException("Both numPartitions or 
replicationFactor and replicasAssignments were set. " +
               "Both cannot be used at the same time.")
-          else {
-            createTopicPolicy.foreach(_.validate(new RequestMetadata(topic, 
arguments.numPartitions,
-              arguments.replicationFactor, arguments.replicasAssignments, 
arguments.configs)))
-
-            if (!arguments.replicasAssignments.isEmpty) {
-              // Note: we don't check that replicaAssignment doesn't contain 
unknown brokers - unlike in add-partitions case,
-              // this follows the existing logic in TopicCommand
-              arguments.replicasAssignments.asScala.map { case (partitionId, 
replicas) =>
-                (partitionId.intValue, replicas.asScala.map(_.intValue))
-              }
-            } else {
-              AdminUtils.assignReplicasToBrokers(brokers, 
arguments.numPartitions, arguments.replicationFactor)
+          else if (!arguments.replicasAssignments.isEmpty) {
+            // Note: we don't check that replicaAssignment contains unknown 
brokers - unlike in add-partitions case,
+            // this follows the existing logic in TopicCommand
+            arguments.replicasAssignments.asScala.map { case (partitionId, 
replicas) =>
+              (partitionId.intValue, replicas.asScala.map(_.intValue))
             }
-          }
+          } else
+            AdminUtils.assignReplicasToBrokers(brokers, 
arguments.numPartitions, arguments.replicationFactor)
         }
         trace(s"Assignments for topic $topic are $assignments ")
-        AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, 
topic, assignments, configs,
-          update = false, validateOnly = validateOnly)
+
+        createTopicPolicy match {
+          case Some(policy) =>
+            AdminUtils.validateCreateOrUpdateTopic(zkUtils, topic, 
assignments, configs, update = false)
+
+            // Use `null` for unset fields in the public API
+            val numPartitions: java.lang.Integer =
+              if (arguments.numPartitions == NO_NUM_PARTITIONS) null else 
arguments.numPartitions
+            val replicationFactor: java.lang.Short =
+              if (arguments.replicationFactor == NO_REPLICATION_FACTOR) null 
else arguments.replicationFactor
+            val replicaAssignments = if 
(arguments.replicasAssignments.isEmpty) null else arguments.replicasAssignments
+
+            policy.validate(new RequestMetadata(topic, numPartitions, 
replicationFactor, replicaAssignments,
+              arguments.configs))
+
+            if (!validateOnly)
+              
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, 
assignments, configs, update = false)
+
+          case None =>
+            if (validateOnly)
+              AdminUtils.validateCreateOrUpdateTopic(zkUtils, topic, 
assignments, configs, update = false)
+            else
+              
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, 
assignments, configs, update = false)
+        }
         CreateTopicMetadata(topic, assignments, new 
CreateTopicsResponse.Error(Errors.NONE, null))
       } catch {
-        case e: Throwable =>
+        // Log client errors at a lower level than unexpected exceptions
+        case e@ (_: PolicyViolationException | _: ApiException) =>
           info(s"Error processing create topic request for topic $topic with 
arguments $arguments", e)
           CreateTopicMetadata(topic, Map(), new 
CreateTopicsResponse.Error(Errors.forException(e), e.getMessage))
+        case e: Throwable =>
+          error(s"Error processing create topic request for topic $topic with 
arguments $arguments", e)
+          CreateTopicMetadata(topic, Map(), new 
CreateTopicsResponse.Error(Errors.forException(e), e.getMessage))
       }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fd6d7bcf/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 891327f..3c2a72d 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -281,7 +281,7 @@ object KafkaConfig {
   val NumRecoveryThreadsPerDataDirProp = "num.recovery.threads.per.data.dir"
   val AutoCreateTopicsEnableProp = "auto.create.topics.enable"
   val MinInSyncReplicasProp = "min.insync.replicas"
-  val CreateTopicsPolicyClassNameProp = "create.topics.policy.class.name"
+  val CreateTopicPolicyClassNameProp = "create.topic.policy.class.name"
   /** ********* Replication configuration ***********/
   val ControllerSocketTimeoutMsProp = "controller.socket.timeout.ms"
   val DefaultReplicationFactorProp = "default.replication.factor"
@@ -491,7 +491,7 @@ object KafkaConfig {
     "produce with acks of \"all\". This will ensure that the producer raises 
an exception " +
     "if a majority of replicas do not receive a write."
 
-  val CreateTopicsPolicyClassNameDoc = "The create topics policy class that 
should be used for validation. The class should " +
+  val CreateTopicPolicyClassNameDoc = "The create topic policy class that 
should be used for validation. The class should " +
     "implement the 
<code>org.apache.kafka.server.policy.CreateTopicPolicy</code> interface."
   /** ********* Replication configuration ***********/
   val ControllerSocketTimeoutMsDoc = "The socket timeout for 
controller-to-broker channels"
@@ -693,7 +693,7 @@ object KafkaConfig {
       .define(LogMessageFormatVersionProp, STRING, 
Defaults.LogMessageFormatVersion, MEDIUM, LogMessageFormatVersionDoc)
       .define(LogMessageTimestampTypeProp, STRING, 
Defaults.LogMessageTimestampType, in("CreateTime", "LogAppendTime"), MEDIUM, 
LogMessageTimestampTypeDoc)
       .define(LogMessageTimestampDifferenceMaxMsProp, LONG, 
Defaults.LogMessageTimestampDifferenceMaxMs, atLeast(0), MEDIUM, 
LogMessageTimestampDifferenceMaxMsDoc)
-      .define(CreateTopicsPolicyClassNameProp, CLASS, null, LOW, 
CreateTopicsPolicyClassNameDoc)
+      .define(CreateTopicPolicyClassNameProp, CLASS, null, LOW, 
CreateTopicPolicyClassNameDoc)
 
       /** ********* Replication configuration ***********/
       .define(ControllerSocketTimeoutMsProp, INT, 
Defaults.ControllerSocketTimeoutMs, MEDIUM, ControllerSocketTimeoutMsDoc)

http://git-wip-us.apache.org/repos/asf/kafka/blob/fd6d7bcf/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index 76774bd..32e69c7 100644
--- 
a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -130,7 +130,7 @@ class AbstractCreateTopicsRequestTest extends 
BaseRequestTest {
   }
 
   protected def replicaAssignmentToJava(assignments: Map[Int, List[Int]]) = {
-    assignments.map { case (k, v) => (k:Integer, v.map { i => i:Integer 
}.asJava) }.asJava
+    assignments.map { case (k, v) => (k: Integer, v.map { i => i: Integer 
}.asJava) }.asJava
   }
 
   protected def sendCreateTopicRequest(request: CreateTopicsRequest, 
socketServer: SocketServer = controllerSocketServer): CreateTopicsResponse = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fd6d7bcf/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala 
b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
index 7127eaf..80f6e9e 100644
--- 
a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
@@ -17,6 +17,7 @@
 
 package kafka.server
 
+import java.util
 import java.util.Properties
 
 import kafka.utils.TestUtils
@@ -34,16 +35,22 @@ class CreateTopicsRequestWithPolicyTest extends 
AbstractCreateTopicsRequestTest
 
   override def propertyOverrides(properties: Properties): Unit = {
     super.propertyOverrides(properties)
-    properties.put(KafkaConfig.CreateTopicsPolicyClassNameProp, 
classOf[Policy].getName)
+    properties.put(KafkaConfig.CreateTopicPolicyClassNameProp, 
classOf[Policy].getName)
   }
 
   @Test
   def testValidCreateTopicsRequests() {
     val timeout = 10000
+
     validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(
       Map("topic1" -> new CreateTopicsRequest.TopicDetails(5, 
1.toShort)).asJava, timeout).build())
+
     validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(
       Map("topic2" -> new CreateTopicsRequest.TopicDetails(5, 
3.toShort)).asJava, timeout, true).build())
+
+    val assignments = replicaAssignmentToJava(Map(0 -> List(1, 0), 1 -> 
List(0, 1)))
+    validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(
+      Map("topic3" -> new 
CreateTopicsRequest.TopicDetails(assignments)).asJava, timeout).build())
   }
 
   @Test
@@ -54,28 +61,62 @@ class CreateTopicsRequestWithPolicyTest extends 
AbstractCreateTopicsRequestTest
 
     // Policy violations
     validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
-      Map("topic3" -> new CreateTopicsRequest.TopicDetails(4, 
1.toShort)).asJava, timeout).build(),
-      Map("topic3" -> error(Errors.POLICY_VIOLATION, Some("Topics should have 
at least 5 partitions, received 4"))))
+      Map("policy-topic1" -> new CreateTopicsRequest.TopicDetails(4, 
1.toShort)).asJava, timeout).build(),
+      Map("policy-topic1" -> error(Errors.POLICY_VIOLATION, Some("Topics 
should have at least 5 partitions, received 4"))))
+
+    validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
+      Map("policy-topic2" -> new CreateTopicsRequest.TopicDetails(4, 
1.toShort)).asJava, timeout, true).build(),
+      Map("policy-topic2" -> error(Errors.POLICY_VIOLATION, Some("Topics 
should have at least 5 partitions, received 4"))))
 
+    val assignments = replicaAssignmentToJava(Map(0 -> List(1), 1 -> List(0)))
     validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
-      Map("topic4" -> new CreateTopicsRequest.TopicDetails(4, 
1.toShort)).asJava, timeout, true).build(),
-      Map("topic4" -> error(Errors.POLICY_VIOLATION, Some("Topics should have 
at least 5 partitions, received 4"))))
+      Map("policy-topic3" -> new 
CreateTopicsRequest.TopicDetails(assignments)).asJava, timeout).build(),
+      Map("policy-topic3" -> error(Errors.POLICY_VIOLATION,
+        Some("""Topic partitions should have at least 2 partitions, received 1 
for partition 0"""))), checkErrorMessage = true)
 
     // Check that basic errors still work
     validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
       Map(existingTopic -> new CreateTopicsRequest.TopicDetails(5, 
1.toShort)).asJava, timeout).build(),
       Map(existingTopic -> error(Errors.TOPIC_ALREADY_EXISTS, Some("""Topic 
"existing-topic" already exists."""))))
+
     validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
       Map("error-replication" -> new CreateTopicsRequest.TopicDetails(10, 
(numBrokers + 1).toShort)).asJava, timeout, true).build(),
-      Map("error-replication" -> error(Errors.INVALID_REPLICATION_FACTOR, 
Some("replication factor: 4 larger than available brokers: 3"))))
+      Map("error-replication" -> error(Errors.INVALID_REPLICATION_FACTOR,
+        Some("replication factor: 4 larger than available brokers: 3"))))
+
+    validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
+      Map("error-replication2" -> new CreateTopicsRequest.TopicDetails(10, -1: 
Short)).asJava, timeout, true).build(),
+      Map("error-replication2" -> error(Errors.INVALID_REPLICATION_FACTOR, 
Some("replication factor must be larger than 0"))))
   }
 
 }
 
 object CreateTopicsRequestWithPolicyTest {
+
   class Policy extends CreateTopicPolicy {
-    def validate(requestMetadata: RequestMetadata): Unit =
-      if (requestMetadata.numPartitions < 5)
-        throw new PolicyViolationException(s"Topics should have at least 5 
partitions, received ${requestMetadata.numPartitions}")
+    def configure(configs: util.Map[String, _]): Unit = ()
+
+    def validate(requestMetadata: RequestMetadata): Unit = {
+      import requestMetadata._
+      require(configs.isEmpty, s"Topic configs should be empty, but it is 
$configs")
+      if (numPartitions != null || replicationFactor != null) {
+        require(numPartitions != null, s"numPartitions should not be null, but 
it is $numPartitions")
+        require(replicationFactor != null, s"replicationFactor should not be 
null, but it is $replicationFactor")
+        require(replicasAssignments == null, s"replicaAssigments should be 
null, but it is $replicasAssignments")
+        if (numPartitions < 5)
+          throw new PolicyViolationException(s"Topics should have at least 5 
partitions, received $numPartitions")
+      } else {
+        require(numPartitions == null, s"numPartitions should be null, but it 
is $numPartitions")
+        require(replicationFactor == null, s"replicationFactor should be null, 
but it is $replicationFactor")
+        require(replicasAssignments != null, s"replicaAssigments should not be 
null, but it is $replicasAssignments")
+        replicasAssignments.asScala.foreach { case (partitionId, assignment) =>
+          if (assignment.size < 2)
+            throw new PolicyViolationException("Topic partitions should have 
at least 2 partitions, received " +
+              s"${assignment.size} for partition $partitionId")
+        }
+      }
+    }
+
+    def close(): Unit = ()
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fd6d7bcf/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index a5ca5a8..259178c 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -547,7 +547,7 @@ class KafkaConfigTest {
         case KafkaConfig.RequestTimeoutMsProp => 
assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
 
         case KafkaConfig.AuthorizerClassNameProp => //ignore string
-        case KafkaConfig.CreateTopicsPolicyClassNameProp => //ignore string
+        case KafkaConfig.CreateTopicPolicyClassNameProp => //ignore string
 
         case KafkaConfig.PortProp => 
assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.HostNameProp => // ignore string

Reply via email to