Updated Branches: refs/heads/trunk 167acb832 -> 1032bf740
kafka-1232; make TopicCommand more consistent; patched by Jun Rao; reviewed by Guozhang Wang and Neha Narkhede Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1032bf74 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1032bf74 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1032bf74 Branch: refs/heads/trunk Commit: 1032bf74054a198d5d663713be1152a9216ca150 Parents: 167acb8 Author: Jun Rao <[email protected]> Authored: Thu Feb 6 21:00:17 2014 -0800 Committer: Jun Rao <[email protected]> Committed: Thu Feb 6 21:00:17 2014 -0800 ---------------------------------------------------------------------- .../main/scala/kafka/admin/TopicCommand.scala | 125 +++++++++++-------- .../scala/kafka/utils/CommandLineUtils.scala | 24 +++- 2 files changed, 90 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/1032bf74/core/src/main/scala/kafka/admin/TopicCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 65510eb..fc8d686 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -40,10 +40,9 @@ object TopicCommand { opts.parser.printHelpOn(System.err) System.exit(1) } - - CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt) - if (!opts.options.has(opts.listOpt)) CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt) - + + opts.checkArgs() + val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer) try { @@ -67,10 +66,13 @@ object TopicCommand { } private def getTopics(zkClient: ZkClient, opts: TopicCommandOptions): Seq[String] = { - val topicsSpec = opts.options.valueOf(opts.topicOpt) - val topicsFilter = new Whitelist(topicsSpec) - val allTopics = ZkUtils.getAllTopics(zkClient) - allTopics.filter(topicsFilter.isTopicAllowed).sorted + val allTopics = ZkUtils.getAllTopics(zkClient).sorted + if (opts.options.has(opts.topicOpt)) { + val topicsSpec = opts.options.valueOf(opts.topicOpt) + val topicsFilter = new Whitelist(topicsSpec) + allTopics.filter(topicsFilter.isTopicAllowed) + } else + allTopics } def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) { @@ -107,10 +109,8 @@ object TopicCommand { val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt) AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr) - println("adding partitions succeeded!") + println("Adding partitions succeeded!") } - if(opts.options.has(opts.replicationFactorOpt)) - Utils.croak("Changing the replication factor is not supported.") } } @@ -123,53 +123,49 @@ object TopicCommand { } def listTopics(zkClient: ZkClient, opts: TopicCommandOptions) { - if(opts.options.has(opts.topicsWithOverridesOpt)) { - ZkUtils.getAllTopics(zkClient).sorted.foreach { topic => - val configs = AdminUtils.fetchTopicConfig(zkClient, topic) - if(configs.size() != 0) { - val replicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic)) - val numPartitions = replicaAssignment.size - val replicationFactor = replicaAssignment.head._2.size - println("\nTopic:%s\tPartitionCount:%d\tReplicationFactor:%d\tConfigs:%s".format(topic, numPartitions, - replicationFactor, configs.map(kv => kv._1 + "=" + kv._2).mkString(","))) - } - } - } else { - for(topic <- ZkUtils.getAllTopics(zkClient).sorted) + val topics = getTopics(zkClient, opts) + for(topic <- topics) println(topic) - } } def describeTopic(zkClient: ZkClient, opts: TopicCommandOptions) { val topics = getTopics(zkClient, opts) val reportUnderReplicatedPartitions = if (opts.options.has(opts.reportUnderReplicatedPartitionsOpt)) true else false val reportUnavailablePartitions = if (opts.options.has(opts.reportUnavailablePartitionsOpt)) true else false + val reportOverriddenConfigs = if (opts.options.has(opts.topicsWithOverridesOpt)) true else false val liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).map(_.id).toSet for (topic <- topics) { ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic) match { case Some(topicPartitionAssignment) => + val describeConfigs: Boolean = !reportUnavailablePartitions && !reportUnderReplicatedPartitions + val describePartitions: Boolean = !reportOverriddenConfigs val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1) - if (!reportUnavailablePartitions && !reportUnderReplicatedPartitions) { - println(topic) - val config = AdminUtils.fetchTopicConfig(zkClient, topic) - println("\tconfigs: " + config.map(kv => kv._1 + " = " + kv._2).mkString(", ")) - println("\tpartitions: " + sortedPartitions.size) + if (describeConfigs) { + val configs = AdminUtils.fetchTopicConfig(zkClient, topic) + if (!reportOverriddenConfigs || configs.size() != 0) { + val numPartitions = topicPartitionAssignment.size + val replicationFactor = topicPartitionAssignment.head._2.size + println("Topic:%s\tPartitionCount:%d\tReplicationFactor:%d\tConfigs:%s" + .format(topic, numPartitions, replicationFactor, configs.map(kv => kv._1 + "=" + kv._2).mkString(","))) + } } - for ((partitionId, assignedReplicas) <- sortedPartitions) { - val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionId) - val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionId) - if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) || - (reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) || - (reportUnavailablePartitions && (!leader.isDefined || !liveBrokers.contains(leader.get)))) { - print("\t\ttopic: " + topic) - print("\tpartition: " + partitionId) - print("\tleader: " + (if(leader.isDefined) leader.get else "none")) - print("\treplicas: " + assignedReplicas.mkString(",")) - println("\tisr: " + inSyncReplicas.mkString(",")) + if (describePartitions) { + for ((partitionId, assignedReplicas) <- sortedPartitions) { + val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionId) + val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionId) + if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) || + (reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) || + (reportUnavailablePartitions && (!leader.isDefined || !liveBrokers.contains(leader.get)))) { + print("\tTopic: " + topic) + print("\tPartition: " + partitionId) + print("\tLeader: " + (if(leader.isDefined) leader.get else "none")) + print("\tReplicas: " + assignedReplicas.mkString(",")) + println("\tIsr: " + inSyncReplicas.mkString(",")) + } } } case None => - println("topic " + topic + " doesn't exist!") + println("Topic " + topic + " doesn't exist!") } } } @@ -187,15 +183,15 @@ object TopicCommand { } def parseTopicConfigsToBeDeleted(opts: TopicCommandOptions): Seq[String] = { - val configsToBeDeleted = opts.options.valuesOf(opts.deleteConfigOpt).map(_.split("""\s*=\s*""")) - if(opts.options.has(opts.createOpt)) - require(configsToBeDeleted.size == 0, "Invalid topic config: all configs on create topic must be in the format \"key=val\".") - require(configsToBeDeleted.forall(config => config.length == 1), - "Invalid topic config: all configs to be deleted must be in the format \"key\".") - val propsToBeDeleted = new Properties - configsToBeDeleted.foreach(pair => propsToBeDeleted.setProperty(pair(0).trim, "")) - LogConfig.validateNames(propsToBeDeleted) - configsToBeDeleted.map(pair => pair(0)) + if (opts.options.has(opts.deleteConfigOpt)) { + val configsToBeDeleted = opts.options.valuesOf(opts.deleteConfigOpt).map(_.trim()) + val propsToBeDeleted = new Properties + configsToBeDeleted.foreach(propsToBeDeleted.setProperty(_, "")) + LogConfig.validateNames(propsToBeDeleted) + configsToBeDeleted + } + else + Seq.empty } def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = { @@ -245,7 +241,7 @@ object TopicCommand { .withRequiredArg .describedAs("replication factor") .ofType(classOf[java.lang.Integer]) - val replicaAssignmentOpt = parser.accepts("replica-assignment", "A list of manual partition-to-broker assignments for the topic being created.") + val replicaAssignmentOpt = parser.accepts("replica-assignment", "A list of manual partition-to-broker assignments for the topic being created or altered.") .withRequiredArg .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , " + "broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...") @@ -255,9 +251,32 @@ object TopicCommand { val reportUnavailablePartitionsOpt = parser.accepts("unavailable-partitions", "if set when describing topics, only show partitions whose leader is not available") val topicsWithOverridesOpt = parser.accepts("topics-with-overrides", - "if set when listing topics, only show topics that have overridden configs") + "if set when describing topics, only show topics that have overridden configs") val options = parser.parse(args : _*) + + val allTopicLevelOpts: Set[OptionSpec[_]] = Set(alterOpt, createOpt, deleteOpt, describeOpt, listOpt) + + def checkArgs() { + // check required args + CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) + if (!options.has(listOpt) && !options.has(describeOpt)) + CommandLineUtils.checkRequiredArgs(parser, options, topicOpt) + + // check invalid args + CommandLineUtils.checkInvalidArgs(parser, options, configOpt, allTopicLevelOpts -- Set(alterOpt, createOpt)) + CommandLineUtils.checkInvalidArgs(parser, options, deleteConfigOpt, allTopicLevelOpts -- Set(alterOpt)) + CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt, allTopicLevelOpts -- Set(alterOpt, createOpt)) + CommandLineUtils.checkInvalidArgs(parser, options, replicationFactorOpt, allTopicLevelOpts -- Set(createOpt)) + CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, + allTopicLevelOpts -- Set(alterOpt, createOpt) + partitionsOpt + replicationFactorOpt) + CommandLineUtils.checkInvalidArgs(parser, options, reportUnderReplicatedPartitionsOpt, + allTopicLevelOpts -- Set(describeOpt) + reportUnavailablePartitionsOpt + topicsWithOverridesOpt) + CommandLineUtils.checkInvalidArgs(parser, options, reportUnavailablePartitionsOpt, + allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + topicsWithOverridesOpt) + CommandLineUtils.checkInvalidArgs(parser, options, topicsWithOverridesOpt, + allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + reportUnavailablePartitionsOpt) + } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1032bf74/core/src/main/scala/kafka/utils/CommandLineUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/CommandLineUtils.scala b/core/src/main/scala/kafka/utils/CommandLineUtils.scala index 5f563ca..726c302 100644 --- a/core/src/main/scala/kafka/utils/CommandLineUtils.scala +++ b/core/src/main/scala/kafka/utils/CommandLineUtils.scala @@ -17,20 +17,32 @@ package kafka.utils import joptsimple.{OptionSpec, OptionSet, OptionParser} +import scala.collection.Set -/** + /** * Helper functions for dealing with command line utilities */ object CommandLineUtils extends Logging { - def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) { - for(arg <- required) { - if(!options.has(arg)) { - System.err.println("Missing required argument \"" + arg + "\"") + def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) { + for(arg <- required) { + if(!options.has(arg)) { + System.err.println("Missing required argument \"" + arg + "\"") + parser.printHelpOn(System.err) + System.exit(1) + } + } + } + + def checkInvalidArgs(parser: OptionParser, options: OptionSet, usedOption: OptionSpec[_], invalidOptions: Set[OptionSpec[_]]) { + if(options.has(usedOption)) { + for(arg <- invalidOptions) { + if(options.has(arg)) { + System.err.println("Option \"" + usedOption + "\" can't be used with option\"" + arg + "\"") parser.printHelpOn(System.err) System.exit(1) } } } - + } } \ No newline at end of file
