Repository: kafka Updated Branches: refs/heads/trunk e3bdc84d8 -> fd6d7bcf3
KAFKA-4591; Create Topic Policy follow-up 1. Added javadoc to public classes 2. Removed `s` from config name for consistency with interface name 3. The policy interface now implements Configurable and AutoCloseable as per the KIP 4. Use `null` instead of `-1` in `RequestMetadata` 5. Perform all broker validation before invoking the policy 6. Add tests Author: Ismael Juma <ism...@juma.me.uk> Reviewers: Jason Gustafson <ja...@confluent.io> Closes #2388 from ijuma/create-topic-policy-docs-and-config-name-change Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fd6d7bcf Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fd6d7bcf Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fd6d7bcf Branch: refs/heads/trunk Commit: fd6d7bcf335166a524dc9a29a50c96af8f1c1c02 Parents: e3bdc84 Author: Ismael Juma <ism...@juma.me.uk> Authored: Wed Jan 18 02:43:10 2017 +0000 Committer: Ismael Juma <ism...@juma.me.uk> Committed: Wed Jan 18 02:43:10 2017 +0000 ---------------------------------------------------------------------- .../common/errors/PolicyViolationException.java | 3 + .../apache/kafka/common/protocol/Errors.java | 2 +- .../kafka/server/policy/CreateTopicPolicy.java | 72 ++++++++++++++++++-- .../src/main/scala/kafka/admin/AdminUtils.scala | 36 ++++++---- .../main/scala/kafka/server/AdminManager.scala | 56 ++++++++++----- .../main/scala/kafka/server/KafkaConfig.scala | 6 +- .../AbstractCreateTopicsRequestTest.scala | 2 +- .../CreateTopicsRequestWithPolicyTest.scala | 59 +++++++++++++--- .../unit/kafka/server/KafkaConfigTest.scala | 2 +- 9 files changed, 183 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/fd6d7bcf/clients/src/main/java/org/apache/kafka/common/errors/PolicyViolationException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/PolicyViolationException.java b/clients/src/main/java/org/apache/kafka/common/errors/PolicyViolationException.java index 7923444..393a6df 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/PolicyViolationException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/PolicyViolationException.java @@ -17,6 +17,9 @@ package org.apache.kafka.common.errors; +/** + * Exception thrown if a create topics request does not satisfy the configured policy for a topic. + */ public class PolicyViolationException extends ApiException { public PolicyViolationException(String message) { http://git-wip-us.apache.org/repos/asf/kafka/blob/fd6d7bcf/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index f30f889..e7689e2 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -166,7 +166,7 @@ public enum Errors { " the message was sent to an incompatible broker. See the broker logs for more details.")), UNSUPPORTED_FOR_MESSAGE_FORMAT(43, new UnsupportedForMessageFormatException("The message format version on the broker does not support the request.")), - POLICY_VIOLATION(44, new PolicyViolationException("Request parameters do not satisfy the system policy.")); + POLICY_VIOLATION(44, new PolicyViolationException("Request parameters do not satisfy the configured policy.")); private static final Logger log = LoggerFactory.getLogger(Errors.class); http://git-wip-us.apache.org/repos/asf/kafka/blob/fd6d7bcf/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java b/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java index 94f1e76..22a7c1d 100644 --- a/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java +++ b/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java @@ -13,42 +13,90 @@ package org.apache.kafka.server.policy; +import org.apache.kafka.common.Configurable; import org.apache.kafka.common.errors.PolicyViolationException; import java.util.Collections; import java.util.List; import java.util.Map; -public interface CreateTopicPolicy { +/** + * An interface for enforcing a policy on create topics requests. + * + * Common use cases are requiring that the replication factor, min.insync.replicas and/or retention settings for a + * topic are within an allowable range. + * + * If <code>create.topic.policy.class.name</code> is defined, Kafka will create an instance of the specified class + * using the default constructor and will then pass the broker configs to its <code>configure()</code> method. During + * broker shutdown, the <code>close()</code> method will be invoked so that resources can be released (if necessary). + */ +public interface CreateTopicPolicy extends Configurable, AutoCloseable { + /** + * Class containing the create request parameters. + */ class RequestMetadata { private final String topic; - private final int numPartitions; - private final short replicationFactor; + private final Integer numPartitions; + private final Short replicationFactor; private final Map<Integer, List<Integer>> replicasAssignments; private final Map<String, String> configs; - public RequestMetadata(String topic, int numPartitions, short replicationFactor, - Map<Integer, List<Integer>> replicasAssignments, Map<String, String> configs) { + /** + * Create an instance of this class with the provided parameters. + * + * This constructor is public to make testing of <code>CreateTopicPolicy</code> implementations easier. + * + * @param topic the name of the topic to created. + * @param numPartitions the number of partitions to create or null if replicasAssignments is set. + * @param replicationFactor the replication factor for the topic or null if replicaAssignments is set. + * @param replicasAssignments replica assignments or null if numPartitions and replicationFactor is set. The + * assignment is a map from partition id to replica (broker) ids. + * @param configs topic configs for the topic to be created, not including broker defaults. Broker configs are + * passed via the {@code configure()} method of the policy implementation. + */ + public RequestMetadata(String topic, Integer numPartitions, Short replicationFactor, + Map<Integer, List<Integer>> replicasAssignments, Map<String, String> configs) { this.topic = topic; this.numPartitions = numPartitions; this.replicationFactor = replicationFactor; - this.replicasAssignments = Collections.unmodifiableMap(replicasAssignments); + this.replicasAssignments = replicasAssignments == null ? null : Collections.unmodifiableMap(replicasAssignments); this.configs = Collections.unmodifiableMap(configs); } + /** + * Return the name of the topic to create. + */ public String topic() { return topic; } - public int numPartitions() { + /** + * Return the number of partitions to create or null if replicaAssignments is not null. + */ + public Integer numPartitions() { return numPartitions; } + /** + * Return the number of replicas to create or null if replicaAssignments is not null. + */ + public Short replicationFactor() { + return replicationFactor; + } + + /** + * Return a map from partition id to replica (broker) ids or null if numPartitions and replicationFactor are + * set instead. + */ public Map<Integer, List<Integer>> replicasAssignments() { return replicasAssignments; } + /** + * Return topic configs in the request, not including broker defaults. Broker configs are passed via + * the {@code configure()} method of the policy implementation. + */ public Map<String, String> configs() { return configs; } @@ -63,5 +111,15 @@ public interface CreateTopicPolicy { } } + /** + * Validate the request parameters and throw a <code>PolicyViolationException</code> with a suitable error + * message if the create request parameters for the provided topic do not satisfy this policy. + * + * Clients will receive the POLICY_VIOLATION error code along with the exception's message. Note that validation + * failure only affects the relevant topic, other topics in the request will still be processed. + * + * @param requestMetadata the create request parameters for the provided topic. + * @throws PolicyViolationException if the request parameters do not satisfy this policy. + */ void validate(RequestMetadata requestMetadata) throws PolicyViolationException; } http://git-wip-us.apache.org/repos/asf/kafka/blob/fd6d7bcf/core/src/main/scala/kafka/admin/AdminUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 916db48..02d5fe0 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -417,12 +417,11 @@ object AdminUtils extends Logging with AdminUtilities { AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig) } - def createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils: ZkUtils, - topic: String, - partitionReplicaAssignment: Map[Int, Seq[Int]], - config: Properties = new Properties, - update: Boolean = false, - validateOnly: Boolean = false) { + def validateCreateOrUpdateTopic(zkUtils: ZkUtils, + topic: String, + partitionReplicaAssignment: Map[Int, Seq[Int]], + config: Properties, + update: Boolean): Unit = { // validate arguments Topic.validate(topic) @@ -450,18 +449,25 @@ object AdminUtils extends Logging with AdminUtilities { // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported - if (!update) { + if (!update) LogConfig.validate(config) - if (!validateOnly) { - // write out the config if there is any, this isn't transactional with the partition assignments - writeEntityConfig(zkUtils, getEntityConfigPath(ConfigType.Topic, topic), config) - } - } + } - if (!validateOnly) { - // create the partition assignment - writeTopicPartitionAssignment(zkUtils, topic, partitionReplicaAssignment, update) + def createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils: ZkUtils, + topic: String, + partitionReplicaAssignment: Map[Int, Seq[Int]], + config: Properties = new Properties, + update: Boolean = false) { + validateCreateOrUpdateTopic(zkUtils, topic, partitionReplicaAssignment, config, update) + + // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported + if (!update) { + // write out the config if there is any, this isn't transactional with the partition assignments + writeEntityConfig(zkUtils, getEntityConfigPath(ConfigType.Topic, topic), config) } + + // create the partition assignment + writeTopicPartitionAssignment(zkUtils, topic, partitionReplicaAssignment, update) } private def writeTopicPartitionAssignment(zkUtils: ZkUtils, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) { http://git-wip-us.apache.org/repos/asf/kafka/blob/fd6d7bcf/core/src/main/scala/kafka/server/AdminManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala index 75a71f5..6dc224d 100644 --- a/core/src/main/scala/kafka/server/AdminManager.scala +++ b/core/src/main/scala/kafka/server/AdminManager.scala @@ -23,7 +23,7 @@ import kafka.common.TopicAlreadyMarkedForDeletionException import kafka.log.LogConfig import kafka.metrics.KafkaMetricsGroup import kafka.utils._ -import org.apache.kafka.common.errors.InvalidRequestException +import org.apache.kafka.common.errors.{ApiException, InvalidRequestException, PolicyViolationException} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.CreateTopicsRequest._ @@ -43,7 +43,7 @@ class AdminManager(val config: KafkaConfig, private val topicPurgatory = DelayedOperationPurgatory[DelayedOperation]("topic", config.brokerId) private val createTopicPolicy = - Option(config.getConfiguredInstance(KafkaConfig.CreateTopicsPolicyClassNameProp, classOf[CreateTopicPolicy])) + Option(config.getConfiguredInstance(KafkaConfig.CreateTopicPolicyClassNameProp, classOf[CreateTopicPolicy])) def hasDelayedTopicOperations = topicPurgatory.delayed() != 0 @@ -80,29 +80,49 @@ class AdminManager(val config: KafkaConfig, && !arguments.replicasAssignments.isEmpty) throw new InvalidRequestException("Both numPartitions or replicationFactor and replicasAssignments were set. " + "Both cannot be used at the same time.") - else { - createTopicPolicy.foreach(_.validate(new RequestMetadata(topic, arguments.numPartitions, - arguments.replicationFactor, arguments.replicasAssignments, arguments.configs))) - - if (!arguments.replicasAssignments.isEmpty) { - // Note: we don't check that replicaAssignment doesn't contain unknown brokers - unlike in add-partitions case, - // this follows the existing logic in TopicCommand - arguments.replicasAssignments.asScala.map { case (partitionId, replicas) => - (partitionId.intValue, replicas.asScala.map(_.intValue)) - } - } else { - AdminUtils.assignReplicasToBrokers(brokers, arguments.numPartitions, arguments.replicationFactor) + else if (!arguments.replicasAssignments.isEmpty) { + // Note: we don't check that replicaAssignment contains unknown brokers - unlike in add-partitions case, + // this follows the existing logic in TopicCommand + arguments.replicasAssignments.asScala.map { case (partitionId, replicas) => + (partitionId.intValue, replicas.asScala.map(_.intValue)) } - } + } else + AdminUtils.assignReplicasToBrokers(brokers, arguments.numPartitions, arguments.replicationFactor) } trace(s"Assignments for topic $topic are $assignments ") - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignments, configs, - update = false, validateOnly = validateOnly) + + createTopicPolicy match { + case Some(policy) => + AdminUtils.validateCreateOrUpdateTopic(zkUtils, topic, assignments, configs, update = false) + + // Use `null` for unset fields in the public API + val numPartitions: java.lang.Integer = + if (arguments.numPartitions == NO_NUM_PARTITIONS) null else arguments.numPartitions + val replicationFactor: java.lang.Short = + if (arguments.replicationFactor == NO_REPLICATION_FACTOR) null else arguments.replicationFactor + val replicaAssignments = if (arguments.replicasAssignments.isEmpty) null else arguments.replicasAssignments + + policy.validate(new RequestMetadata(topic, numPartitions, replicationFactor, replicaAssignments, + arguments.configs)) + + if (!validateOnly) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignments, configs, update = false) + + case None => + if (validateOnly) + AdminUtils.validateCreateOrUpdateTopic(zkUtils, topic, assignments, configs, update = false) + else + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignments, configs, update = false) + } CreateTopicMetadata(topic, assignments, new CreateTopicsResponse.Error(Errors.NONE, null)) } catch { - case e: Throwable => + // Log client errors at a lower level than unexpected exceptions + case e@ (_: PolicyViolationException | _: ApiException) => info(s"Error processing create topic request for topic $topic with arguments $arguments", e) CreateTopicMetadata(topic, Map(), new CreateTopicsResponse.Error(Errors.forException(e), e.getMessage)) + case e: Throwable => + error(s"Error processing create topic request for topic $topic with arguments $arguments", e) + CreateTopicMetadata(topic, Map(), new CreateTopicsResponse.Error(Errors.forException(e), e.getMessage)) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fd6d7bcf/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 891327f..3c2a72d 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -281,7 +281,7 @@ object KafkaConfig { val NumRecoveryThreadsPerDataDirProp = "num.recovery.threads.per.data.dir" val AutoCreateTopicsEnableProp = "auto.create.topics.enable" val MinInSyncReplicasProp = "min.insync.replicas" - val CreateTopicsPolicyClassNameProp = "create.topics.policy.class.name" + val CreateTopicPolicyClassNameProp = "create.topic.policy.class.name" /** ********* Replication configuration ***********/ val ControllerSocketTimeoutMsProp = "controller.socket.timeout.ms" val DefaultReplicationFactorProp = "default.replication.factor" @@ -491,7 +491,7 @@ object KafkaConfig { "produce with acks of \"all\". This will ensure that the producer raises an exception " + "if a majority of replicas do not receive a write." - val CreateTopicsPolicyClassNameDoc = "The create topics policy class that should be used for validation. The class should " + + val CreateTopicPolicyClassNameDoc = "The create topic policy class that should be used for validation. The class should " + "implement the <code>org.apache.kafka.server.policy.CreateTopicPolicy</code> interface." /** ********* Replication configuration ***********/ val ControllerSocketTimeoutMsDoc = "The socket timeout for controller-to-broker channels" @@ -693,7 +693,7 @@ object KafkaConfig { .define(LogMessageFormatVersionProp, STRING, Defaults.LogMessageFormatVersion, MEDIUM, LogMessageFormatVersionDoc) .define(LogMessageTimestampTypeProp, STRING, Defaults.LogMessageTimestampType, in("CreateTime", "LogAppendTime"), MEDIUM, LogMessageTimestampTypeDoc) .define(LogMessageTimestampDifferenceMaxMsProp, LONG, Defaults.LogMessageTimestampDifferenceMaxMs, atLeast(0), MEDIUM, LogMessageTimestampDifferenceMaxMsDoc) - .define(CreateTopicsPolicyClassNameProp, CLASS, null, LOW, CreateTopicsPolicyClassNameDoc) + .define(CreateTopicPolicyClassNameProp, CLASS, null, LOW, CreateTopicPolicyClassNameDoc) /** ********* Replication configuration ***********/ .define(ControllerSocketTimeoutMsProp, INT, Defaults.ControllerSocketTimeoutMs, MEDIUM, ControllerSocketTimeoutMsDoc) http://git-wip-us.apache.org/repos/asf/kafka/blob/fd6d7bcf/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala index 76774bd..32e69c7 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala @@ -130,7 +130,7 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest { } protected def replicaAssignmentToJava(assignments: Map[Int, List[Int]]) = { - assignments.map { case (k, v) => (k:Integer, v.map { i => i:Integer }.asJava) }.asJava + assignments.map { case (k, v) => (k: Integer, v.map { i => i: Integer }.asJava) }.asJava } protected def sendCreateTopicRequest(request: CreateTopicsRequest, socketServer: SocketServer = controllerSocketServer): CreateTopicsResponse = { http://git-wip-us.apache.org/repos/asf/kafka/blob/fd6d7bcf/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala index 7127eaf..80f6e9e 100644 --- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala +++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala @@ -17,6 +17,7 @@ package kafka.server +import java.util import java.util.Properties import kafka.utils.TestUtils @@ -34,16 +35,22 @@ class CreateTopicsRequestWithPolicyTest extends AbstractCreateTopicsRequestTest override def propertyOverrides(properties: Properties): Unit = { super.propertyOverrides(properties) - properties.put(KafkaConfig.CreateTopicsPolicyClassNameProp, classOf[Policy].getName) + properties.put(KafkaConfig.CreateTopicPolicyClassNameProp, classOf[Policy].getName) } @Test def testValidCreateTopicsRequests() { val timeout = 10000 + validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder( Map("topic1" -> new CreateTopicsRequest.TopicDetails(5, 1.toShort)).asJava, timeout).build()) + validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder( Map("topic2" -> new CreateTopicsRequest.TopicDetails(5, 3.toShort)).asJava, timeout, true).build()) + + val assignments = replicaAssignmentToJava(Map(0 -> List(1, 0), 1 -> List(0, 1))) + validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder( + Map("topic3" -> new CreateTopicsRequest.TopicDetails(assignments)).asJava, timeout).build()) } @Test @@ -54,28 +61,62 @@ class CreateTopicsRequestWithPolicyTest extends AbstractCreateTopicsRequestTest // Policy violations validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder( - Map("topic3" -> new CreateTopicsRequest.TopicDetails(4, 1.toShort)).asJava, timeout).build(), - Map("topic3" -> error(Errors.POLICY_VIOLATION, Some("Topics should have at least 5 partitions, received 4")))) + Map("policy-topic1" -> new CreateTopicsRequest.TopicDetails(4, 1.toShort)).asJava, timeout).build(), + Map("policy-topic1" -> error(Errors.POLICY_VIOLATION, Some("Topics should have at least 5 partitions, received 4")))) + + validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder( + Map("policy-topic2" -> new CreateTopicsRequest.TopicDetails(4, 1.toShort)).asJava, timeout, true).build(), + Map("policy-topic2" -> error(Errors.POLICY_VIOLATION, Some("Topics should have at least 5 partitions, received 4")))) + val assignments = replicaAssignmentToJava(Map(0 -> List(1), 1 -> List(0))) validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder( - Map("topic4" -> new CreateTopicsRequest.TopicDetails(4, 1.toShort)).asJava, timeout, true).build(), - Map("topic4" -> error(Errors.POLICY_VIOLATION, Some("Topics should have at least 5 partitions, received 4")))) + Map("policy-topic3" -> new CreateTopicsRequest.TopicDetails(assignments)).asJava, timeout).build(), + Map("policy-topic3" -> error(Errors.POLICY_VIOLATION, + Some("""Topic partitions should have at least 2 partitions, received 1 for partition 0"""))), checkErrorMessage = true) // Check that basic errors still work validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder( Map(existingTopic -> new CreateTopicsRequest.TopicDetails(5, 1.toShort)).asJava, timeout).build(), Map(existingTopic -> error(Errors.TOPIC_ALREADY_EXISTS, Some("""Topic "existing-topic" already exists.""")))) + validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder( Map("error-replication" -> new CreateTopicsRequest.TopicDetails(10, (numBrokers + 1).toShort)).asJava, timeout, true).build(), - Map("error-replication" -> error(Errors.INVALID_REPLICATION_FACTOR, Some("replication factor: 4 larger than available brokers: 3")))) + Map("error-replication" -> error(Errors.INVALID_REPLICATION_FACTOR, + Some("replication factor: 4 larger than available brokers: 3")))) + + validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder( + Map("error-replication2" -> new CreateTopicsRequest.TopicDetails(10, -1: Short)).asJava, timeout, true).build(), + Map("error-replication2" -> error(Errors.INVALID_REPLICATION_FACTOR, Some("replication factor must be larger than 0")))) } } object CreateTopicsRequestWithPolicyTest { + class Policy extends CreateTopicPolicy { - def validate(requestMetadata: RequestMetadata): Unit = - if (requestMetadata.numPartitions < 5) - throw new PolicyViolationException(s"Topics should have at least 5 partitions, received ${requestMetadata.numPartitions}") + def configure(configs: util.Map[String, _]): Unit = () + + def validate(requestMetadata: RequestMetadata): Unit = { + import requestMetadata._ + require(configs.isEmpty, s"Topic configs should be empty, but it is $configs") + if (numPartitions != null || replicationFactor != null) { + require(numPartitions != null, s"numPartitions should not be null, but it is $numPartitions") + require(replicationFactor != null, s"replicationFactor should not be null, but it is $replicationFactor") + require(replicasAssignments == null, s"replicaAssigments should be null, but it is $replicasAssignments") + if (numPartitions < 5) + throw new PolicyViolationException(s"Topics should have at least 5 partitions, received $numPartitions") + } else { + require(numPartitions == null, s"numPartitions should be null, but it is $numPartitions") + require(replicationFactor == null, s"replicationFactor should be null, but it is $replicationFactor") + require(replicasAssignments != null, s"replicaAssigments should not be null, but it is $replicasAssignments") + replicasAssignments.asScala.foreach { case (partitionId, assignment) => + if (assignment.size < 2) + throw new PolicyViolationException("Topic partitions should have at least 2 partitions, received " + + s"${assignment.size} for partition $partitionId") + } + } + } + + def close(): Unit = () } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fd6d7bcf/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index a5ca5a8..259178c 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -547,7 +547,7 @@ class KafkaConfigTest { case KafkaConfig.RequestTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.AuthorizerClassNameProp => //ignore string - case KafkaConfig.CreateTopicsPolicyClassNameProp => //ignore string + case KafkaConfig.CreateTopicPolicyClassNameProp => //ignore string case KafkaConfig.PortProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.HostNameProp => // ignore string