Repository: kafka Updated Branches: refs/heads/0.9.0 2261763bc -> 6483ae685
KAFKA-2831; Do not use ZKUtils in `ConsumerGroupCommand` if `new-consumer` is used Author: Ismael Juma <[email protected]> Reviewers: Jason Gustafson <[email protected]>, Ashish Singh <[email protected]>, Jun Rao <[email protected]> Closes #528 from ijuma/kafka-2831-consumer-group-command-zookeeper-new-consumer (cherry picked from commit 5fc4546de7f238a8ee9c6f0b4fe276f0da47707c) Signed-off-by: Jun Rao <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6483ae68 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6483ae68 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6483ae68 Branch: refs/heads/0.9.0 Commit: 6483ae68507d3c864c9087116c6deb194620db18 Parents: 2261763 Author: Ismael Juma <[email protected]> Authored: Mon Nov 16 14:04:26 2015 -0800 Committer: Jun Rao <[email protected]> Committed: Mon Nov 16 14:04:32 2015 -0800 ---------------------------------------------------------------------- .../main/scala/kafka/admin/AdminClient.scala | 16 +- .../kafka/admin/ConsumerGroupCommand.scala | 542 ++++++++++--------- 2 files changed, 310 insertions(+), 248 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/6483ae68/core/src/main/scala/kafka/admin/AdminClient.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index 1dea28b..181080f 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -146,11 +146,10 @@ class AdminClient(val time: Time, GroupSummary(metadata.state(), metadata.protocolType(), metadata.protocol(), members) } - case class ConsumerSummary( - memberId: String, - clientId: String, - clientHost: String, - assignment: List[TopicPartition]) + case class ConsumerSummary(memberId: String, + clientId: String, + clientHost: String, + assignment: List[TopicPartition]) def describeConsumerGroup(groupId: String): List[ConsumerSummary] = { val group = describeGroup(groupId) @@ -169,6 +168,11 @@ class AdminClient(val time: Time, List.empty } } + + def close() { + client.close() + } + } object AdminClient { @@ -249,4 +253,4 @@ object AdminClient { highLevelClient, bootstrapCluster.nodes().asScala.toList) } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/6483ae68/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index c29efe4..2d95767 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -17,7 +17,6 @@ package kafka.admin - import java.util.Properties import joptsimple.{OptionParser, OptionSpec} @@ -33,7 +32,7 @@ import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.utils.Utils -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.{Set, mutable} object ConsumerGroupCommand { @@ -41,306 +40,352 @@ object ConsumerGroupCommand { def main(args: Array[String]) { val opts = new ConsumerGroupCommandOptions(args) - if(args.length == 0) + if (args.length == 0) CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer groups, describe a consumer group, or delete consumer group info.") // should have exactly one action val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _) - if(actions != 1) + if (actions != 1) CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete") opts.checkArgs() - val zkUtils = ZkUtils(opts.options.valueOf(opts.zkConnectOpt), - 30000, - 30000, - JaasUtils.isZkSecurityEnabled()) + val consumerGroupService = { + if (opts.options.has(opts.newConsumerOpt)) new KafkaConsumerGroupService(opts) + else new ZkConsumerGroupService(opts) + } try { if (opts.options.has(opts.listOpt)) - list(zkUtils, opts) + consumerGroupService.list() else if (opts.options.has(opts.describeOpt)) - describe(zkUtils, opts) - else if (opts.options.has(opts.deleteOpt)) - delete(zkUtils, opts) + consumerGroupService.describe() + else if (opts.options.has(opts.deleteOpt)) { + consumerGroupService match { + case service: ZkConsumerGroupService => service.delete() + case _ => throw new IllegalStateException(s"delete is not supported for $consumerGroupService") + } + } } catch { case e: Throwable => println("Error while executing consumer group command " + e.getMessage) println(Utils.stackTrace(e)) } finally { - zkUtils.close() + consumerGroupService.close() } } - def list(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) { - val useNewConsumer = opts.options.has(opts.newConsumerOpt) - if (!useNewConsumer) - zkUtils.getConsumerGroups().foreach(println) - else { - val adminClient = createAndGetAdminClient(opts) - adminClient.listAllConsumerGroupsFlattened().foreach(x => println(x.groupId)) - } + private def parseConfigs(opts: ConsumerGroupCommandOptions): Properties = { + val configsToBeAdded = opts.options.valuesOf(opts.configOpt).asScala.map(_.split("""\s*=\s*""")) + require(configsToBeAdded.forall(config => config.length == 2), + "Invalid config: all configs to be added must be in the format \"key=val\".") + val props = new Properties + configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim)) + props } - def createAndGetAdminClient(opts: ConsumerGroupCommandOptions): AdminClient = { - AdminClient.createSimplePlaintext(opts.options.valueOf(opts.bootstrapServerOpt)) - } + sealed trait ConsumerGroupService { - def describe(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) { - val useNewConsumer = opts.options.has(opts.newConsumerOpt) - val group = opts.options.valueOf(opts.groupOpt) - val configs = parseConfigs(opts) - val channelSocketTimeoutMs = configs.getProperty("channelSocketTimeoutMs", "600").toInt - val channelRetryBackoffMs = configs.getProperty("channelRetryBackoffMsOpt", "300").toInt + def list(): Unit - println("%s, %s, %s, %s, %s, %s, %s" - .format("GROUP", "TOPIC", "PARTITION", "CURRENT OFFSET", "LOG END OFFSET", "LAG", "OWNER")) + def describe() { + describeGroup(opts.options.valueOf(opts.groupOpt)) + } - if (!useNewConsumer) { - val topics = zkUtils.getTopicsByConsumerGroup(group) - if (topics.isEmpty) { - println("No topic available for consumer group provided") - } else { - topics.foreach(topic => describeTopic(zkUtils, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs, opts)) - } - } else { - val consumers = createAndGetAdminClient(opts).describeConsumerGroup(group) + def close(): Unit - if (consumers.isEmpty) { - println(s"Consumer group, ${group}, does not exist or is rebalancing.") - } else { - consumers.foreach(x => describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, x.assignment.map(tp => new TopicAndPartition(tp.topic(), tp.partition())), Option("%s_%s".format(x.clientId, x.clientHost)))) + protected def opts: ConsumerGroupCommandOptions + + protected def getLogEndOffset(topic: String, partition: Int): LogEndOffsetResult + + protected def describeGroup(group: String): Unit + + protected def describeTopicPartition(group: String, + topicPartitions: Seq[TopicAndPartition], + getPartitionOffset: TopicAndPartition => Option[Long], + getOwner: TopicAndPartition => Option[String]): Unit = { + topicPartitions + .sortBy { case topicPartition => topicPartition.partition } + .foreach { topicPartition => + describePartition(group, topicPartition.topic, topicPartition.partition, getPartitionOffset(topicPartition), + getOwner(topicPartition)) + } + } + + protected def printDescribeHeader() { + println("GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER") + } + + private def describePartition(group: String, + topic: String, + partition: Int, + offsetOpt: Option[Long], + ownerOpt: Option[String]) { + def print(logEndOffset: Option[Long]): Unit = { + val lag = offsetOpt.filter(_ != -1).flatMap(offset => logEndOffset.map(_ - offset)) + println(Seq(group, topic, partition, offsetOpt.getOrElse("unknown"), logEndOffset.getOrElse("unknown"), + lag.getOrElse("unknown"), ownerOpt.getOrElse("none")).mkString(", ")) + } + getLogEndOffset(topic, partition) match { + case LogEndOffsetResult.LogEndOffset(logEndOffset) => print(Some(logEndOffset)) + case LogEndOffsetResult.Unknown => print(None) + case LogEndOffsetResult.Ignore => } } + } - def delete(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) { - if (opts.options.has(opts.groupOpt) && opts.options.has(opts.topicOpt)) { - deleteForTopic(zkUtils, opts) + class ZkConsumerGroupService(val opts: ConsumerGroupCommandOptions) extends ConsumerGroupService { + + private val zkUtils = { + val zkUrl = opts.options.valueOf(opts.zkConnectOpt) + ZkUtils(zkUrl, 30000, 30000, JaasUtils.isZkSecurityEnabled) } - else if (opts.options.has(opts.groupOpt)) { - deleteForGroup(zkUtils, opts) + + def close() { + zkUtils.close() } - else if (opts.options.has(opts.topicOpt)) { - deleteAllForTopic(zkUtils, opts) + + def list() { + zkUtils.getConsumerGroups().foreach(println) } - } - private def deleteForGroup(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) { - val groups = opts.options.valuesOf(opts.groupOpt) - groups.foreach { group => - try { - if (AdminUtils.deleteConsumerGroupInZK(zkUtils, group)) - println("Deleted all consumer group information for group %s in zookeeper.".format(group)) + def delete() { + if (opts.options.has(opts.groupOpt) && opts.options.has(opts.topicOpt)) + deleteForTopic() + else if (opts.options.has(opts.groupOpt)) + deleteForGroup() + else if (opts.options.has(opts.topicOpt)) + deleteAllForTopic() + } + + protected def describeGroup(group: String) { + val configs = parseConfigs(opts) + val channelSocketTimeoutMs = configs.getProperty("channelSocketTimeoutMs", "600").toInt + val channelRetryBackoffMs = configs.getProperty("channelRetryBackoffMsOpt", "300").toInt + val topics = zkUtils.getTopicsByConsumerGroup(group) + if (topics.isEmpty) + println("No topic available for consumer group provided") + printDescribeHeader() + topics.foreach(topic => describeTopic(group, topic, channelSocketTimeoutMs, channelRetryBackoffMs)) + } + + private def describeTopic(group: String, + topic: String, + channelSocketTimeoutMs: Int, + channelRetryBackoffMs: Int) { + val topicPartitions = getTopicPartitions(topic) + val groupDirs = new ZKGroupTopicDirs(group, topic) + val ownerByTopicPartition = topicPartitions.flatMap { topicPartition => + zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/" + topicPartition.partition)._1.map { owner => + topicPartition -> owner + } + }.toMap + val partitionOffsets = getPartitionOffsets(group, topicPartitions, channelSocketTimeoutMs, channelRetryBackoffMs) + describeTopicPartition(group, topicPartitions, partitionOffsets.get, ownerByTopicPartition.get) + } + + private def getTopicPartitions(topic: String): Seq[TopicAndPartition] = { + val topicPartitionMap = zkUtils.getPartitionsForTopics(Seq(topic)) + val partitions = topicPartitionMap.getOrElse(topic, Seq.empty) + partitions.map(TopicAndPartition(topic, _)) + } + + protected def getLogEndOffset(topic: String, partition: Int): LogEndOffsetResult = { + zkUtils.getLeaderForPartition(topic, partition) match { + case Some(-1) => LogEndOffsetResult.Unknown + case Some(brokerId) => + getZkConsumer(brokerId).map { consumer => + val topicAndPartition = new TopicAndPartition(topic, partition) + val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))) + val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head + consumer.close() + LogEndOffsetResult.LogEndOffset(logEndOffset) + }.getOrElse(LogEndOffsetResult.Ignore) + case None => + println(s"No broker for partition ${new TopicPartition(topic, partition)}") + LogEndOffsetResult.Ignore + } + } + + private def getPartitionOffsets(group: String, + topicPartitions: Seq[TopicAndPartition], + channelSocketTimeoutMs: Int, + channelRetryBackoffMs: Int): Map[TopicAndPartition, Long] = { + val offsetMap = mutable.Map[TopicAndPartition, Long]() + val channel = ClientUtils.channelToOffsetManager(group, zkUtils, channelSocketTimeoutMs, channelRetryBackoffMs) + channel.send(OffsetFetchRequest(group, topicPartitions)) + val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload()) + + offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) => + if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) { + val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic) + // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool + // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage) + try { + val offset = zkUtils.readData(topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1.toLong + offsetMap.put(topicAndPartition, offset) + } catch { + case z: ZkNoNodeException => + println("Could not fetch offset from zookeeper for group %s partition %s due to missing offset data in zookeeper." + .format(group, topicAndPartition)) + } + } + else if (offsetAndMetadata.error == ErrorMapping.NoError) + offsetMap.put(topicAndPartition, offsetAndMetadata.offset) else - println("Delete for group %s failed because its consumers are still active.".format(group)) + println("Could not fetch offset from kafka for group %s partition %s due to %s." + .format(group, topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error))) } - catch { - case e: ZkNoNodeException => - println("Delete for group %s failed because group does not exist.".format(group)) + channel.disconnect() + offsetMap.toMap + } + + private def deleteForGroup() { + val groups = opts.options.valuesOf(opts.groupOpt) + groups.asScala.foreach { group => + try { + if (AdminUtils.deleteConsumerGroupInZK(zkUtils, group)) + println("Deleted all consumer group information for group %s in zookeeper.".format(group)) + else + println("Delete for group %s failed because its consumers are still active.".format(group)) + } + catch { + case e: ZkNoNodeException => + println("Delete for group %s failed because group does not exist.".format(group)) + } } } - } - private def deleteForTopic(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) { - val groups = opts.options.valuesOf(opts.groupOpt) - val topic = opts.options.valueOf(opts.topicOpt) - Topic.validate(topic) - groups.foreach { group => - try { - if (AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic)) - println("Deleted consumer group information for group %s topic %s in zookeeper.".format(group, topic)) - else - println("Delete for group %s topic %s failed because its consumers are still active.".format(group, topic)) + private def deleteForTopic() { + val groups = opts.options.valuesOf(opts.groupOpt) + val topic = opts.options.valueOf(opts.topicOpt) + Topic.validate(topic) + groups.asScala.foreach { group => + try { + if (AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic)) + println("Deleted consumer group information for group %s topic %s in zookeeper.".format(group, topic)) + else + println("Delete for group %s topic %s failed because its consumers are still active.".format(group, topic)) + } + catch { + case e: ZkNoNodeException => + println("Delete for group %s topic %s failed because group does not exist.".format(group, topic)) + } } - catch { - case e: ZkNoNodeException => - println("Delete for group %s topic %s failed because group does not exist.".format(group, topic)) + } + + private def deleteAllForTopic() { + val topic = opts.options.valueOf(opts.topicOpt) + Topic.validate(topic) + AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topic) + println("Deleted consumer group information for all inactive consumer groups for topic %s in zookeeper.".format(topic)) + } + + private def getZkConsumer(brokerId: Int): Option[SimpleConsumer] = { + try { + zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match { + case Some(brokerInfoString) => + Json.parseFull(brokerInfoString) match { + case Some(m) => + val brokerInfo = m.asInstanceOf[Map[String, Any]] + val host = brokerInfo.get("host").get.asInstanceOf[String] + val port = brokerInfo.get("port").get.asInstanceOf[Int] + Some(new SimpleConsumer(host, port, 10000, 100000, "ConsumerGroupCommand")) + case None => + throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId)) + } + case None => + throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId)) + } + } catch { + case t: Throwable => + println("Could not parse broker info due to " + t.getMessage) + None } } - } - private def deleteAllForTopic(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) { - val topic = opts.options.valueOf(opts.topicOpt) - Topic.validate(topic) - AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topic) - println("Deleted consumer group information for all inactive consumer groups for topic %s in zookeeper.".format(topic)) } - private def parseConfigs(opts: ConsumerGroupCommandOptions): Properties = { - val configsToBeAdded = opts.options.valuesOf(opts.configOpt).map(_.split("""\s*=\s*""")) - require(configsToBeAdded.forall(config => config.length == 2), - "Invalid config: all configs to be added must be in the format \"key=val\".") - val props = new Properties - configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim)) - props - } + class KafkaConsumerGroupService(val opts: ConsumerGroupCommandOptions) extends ConsumerGroupService { - private def describeTopic(zkUtils: ZkUtils, - group: String, - topic: String, - channelSocketTimeoutMs: Int, - channelRetryBackoffMs: Int, - opts: ConsumerGroupCommandOptions) { - val topicPartitions = getTopicPartitions(zkUtils, topic) - describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, topicPartitions) - } + private val adminClient = createAdminClient() - def describeTopicPartition(zkUtils: ZkUtils, group: String, channelSocketTimeoutMs: Int, channelRetryBackoffMs: Int, opts: ConsumerGroupCommandOptions, topicPartitions: Seq[TopicAndPartition], owner: Option[String] = None): Unit = { - val partitionOffsets = getPartitionOffsets(zkUtils, group, topicPartitions, channelSocketTimeoutMs, channelRetryBackoffMs) - topicPartitions - .sortBy { case topicPartition => topicPartition.partition } - .foreach { topicPartition => - describePartition(zkUtils, group, topicPartition.topic, topicPartition.partition, partitionOffsets.get(topicPartition), opts, owner) - } - } + // `consumer` is only needed for `describe`, so we instantiate it lazily + private var consumer: KafkaConsumer[String, String] = null - private def getTopicPartitions(zkUtils: ZkUtils, topic: String) = { - val topicPartitionMap = zkUtils.getPartitionsForTopics(Seq(topic)) - val partitions = topicPartitionMap.getOrElse(topic, Seq.empty) - partitions.map(TopicAndPartition(topic, _)) - } + def list() { + adminClient.listAllConsumerGroupsFlattened().foreach(x => println(x.groupId)) + } - private def getPartitionOffsets(zkUtils: ZkUtils, - group: String, - topicPartitions: Seq[TopicAndPartition], - channelSocketTimeoutMs: Int, - channelRetryBackoffMs: Int): Map[TopicAndPartition, Long] = { - val offsetMap = mutable.Map[TopicAndPartition, Long]() - val channel = ClientUtils.channelToOffsetManager(group, zkUtils, channelSocketTimeoutMs, channelRetryBackoffMs) - channel.send(OffsetFetchRequest(group, topicPartitions)) - val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload()) - - offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) => - if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) { - val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic) - // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool - // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage) - try { - val offset = zkUtils.readData(topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1.toLong - offsetMap.put(topicAndPartition, offset) - } catch { - case z: ZkNoNodeException => - println("Could not fetch offset from zookeeper for group %s partition %s due to missing offset data in zookeeper." - .format(group, topicAndPartition)) + protected def describeGroup(group: String) { + val consumerSummaries = adminClient.describeConsumerGroup(group) + if (consumerSummaries.isEmpty) + println(s"Consumer group `${group}` does not exist or is rebalancing.") + else { + val consumer = getConsumer() + printDescribeHeader() + consumerSummaries.foreach { consumerSummary => + val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition)) + val partitionOffsets = topicPartitions.flatMap { topicPartition => + Option(consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata => + topicPartition -> offsetAndMetadata.offset + } + }.toMap + describeTopicPartition(group, topicPartitions, partitionOffsets.get, + _ => Some(s"${consumerSummary.clientId}_${consumerSummary.clientHost}")) } } - else if (offsetAndMetadata.error == ErrorMapping.NoError) - offsetMap.put(topicAndPartition, offsetAndMetadata.offset) - else - println("Could not fetch offset from kafka for group %s partition %s due to %s." - .format(group, topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error))) } - channel.disconnect() - offsetMap.toMap - } - private def describePartition(zkUtils: ZkUtils, - group: String, - topic: String, - partition: Int, - offsetOpt: Option[Long], - opts: ConsumerGroupCommandOptions, - ownerOpt: Option[String] = None) { - val topicPartition = new TopicPartition(topic, partition) - val groupDirs = new ZKGroupTopicDirs(group, topic) - val useNewConsumer: Boolean = opts.options.has(opts.newConsumerOpt) - val owner: Option[String] = if (useNewConsumer) ownerOpt else zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/" + partition)._1 - def print(logEndOffset: Long): Unit = { - val lag = offsetOpt.filter(_ != -1).map(logEndOffset - _) - println("%s, %s, %s, %s, %s, %s, %s" - .format(group, topic, partition, offsetOpt.getOrElse("unknown"), logEndOffset, lag.getOrElse("unknown"), owner.getOrElse("none"))) + protected def getLogEndOffset(topic: String, partition: Int): LogEndOffsetResult = { + val consumer = getConsumer() + val topicPartition = new TopicPartition(topic, partition) + consumer.assign(List(topicPartition).asJava) + consumer.seekToEnd(topicPartition) + val logEndOffset = consumer.position(topicPartition) + LogEndOffsetResult.LogEndOffset(logEndOffset) } - zkUtils.getLeaderForPartition(topic, partition) match { - case Some(-1) => - println("%s, %s, %s, %s, %s, %s, %s" - .format(group, topic, partition, offsetOpt.getOrElse("unknown"), "unknown", "unknown", owner.getOrElse("none"))) - case Some(brokerId) => - if (useNewConsumer) { - val consumerOpt = getNewConsumer(zkUtils, brokerId) - consumerOpt match { - case Some(consumer) => - consumer.assign(List(topicPartition)) - consumer.seekToEnd(topicPartition) - val logEndOffset = consumer.position(topicPartition) - consumer.close() - print(logEndOffset) - case None => // ignore - } - } else { - val consumerOpt = getZkConsumer(zkUtils, brokerId) - consumerOpt match { - case Some(consumer) => - val topicAndPartition: TopicAndPartition = new TopicAndPartition(topicPartition.topic(), topicPartition.partition()) - val request = - OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))) - val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head - consumer.close() - print(logEndOffset) - case None => // ignore - } - } - case None => - println("No broker for partition %s".format(topicPartition)) + + def close() { + adminClient.close() + if (consumer != null) consumer.close() } - } - private def getNewConsumer(zkUtils: ZkUtils, brokerId: Int): Option[KafkaConsumer[String, String]] = { - try { - zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match { - case Some(brokerInfoString) => - Json.parseFull(brokerInfoString) match { - case Some(m) => - val brokerInfo = m.asInstanceOf[Map[String, Any]] - val host = brokerInfo.get("host").get.asInstanceOf[String] - val port = brokerInfo.get("port").get.asInstanceOf[Int] - val deserializer: String = (new StringDeserializer).getClass.getName - val properties: Properties = new Properties() - properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host + ":" + port) - properties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerGroupCommand") - properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") - properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000") - properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer) - properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer) - Some(new KafkaConsumer[String, String](properties)) - case None => - throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId)) - } - case None => - throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId)) - } - } catch { - case t: Throwable => - println("Could not parse broker info due to " + t.getMessage) - None + private def createAdminClient(): AdminClient = + AdminClient.createSimplePlaintext(opts.options.valueOf(opts.bootstrapServerOpt)) + + private def getConsumer() = { + if (consumer == null) + consumer = createNewConsumer() + consumer } - } - private def getZkConsumer(zkUtils: ZkUtils, brokerId: Int): Option[SimpleConsumer] = { - try { - zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match { - case Some(brokerInfoString) => - Json.parseFull(brokerInfoString) match { - case Some(m) => - val brokerInfo = m.asInstanceOf[Map[String, Any]] - val host = brokerInfo.get("host").get.asInstanceOf[String] - val port = brokerInfo.get("port").get.asInstanceOf[Int] - Some(new SimpleConsumer(host, port, 10000, 100000, "ConsumerGroupCommand")) - case None => - throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId)) - } - case None => - throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId)) - } - } catch { - case t: Throwable => - println("Could not parse broker info due to " + t.getMessage) - None + private def createNewConsumer(): KafkaConsumer[String, String] = { + val properties = new Properties() + val deserializer = (new StringDeserializer).getClass.getName + val brokerUrl = opts.options.valueOf(opts.bootstrapServerOpt) + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl) + properties.put(ConsumerConfig.GROUP_ID_CONFIG, opts.options.valueOf(opts.groupOpt)) + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000") + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer) + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer) + new KafkaConsumer(properties) } + + } + + sealed trait LogEndOffsetResult + + object LogEndOffsetResult { + case class LogEndOffset(value: Long) extends LogEndOffsetResult + case object Unknown extends LogEndOffsetResult + case object Ignore extends LogEndOffsetResult } class ConsumerGroupCommandOptions(args: Array[String]) { - val ZkConnectDoc = "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + + val ZkConnectDoc = "REQUIRED (unless new-consumer is used): The connection string for the zookeeper connection in the form host:port. " + "Multiple URLS can be given to allow fail-over." val BootstrapServerDoc = "REQUIRED (only when using new-consumer): The server to connect to." val GroupDoc = "The consumer group we wish to act on." @@ -390,10 +435,23 @@ object ConsumerGroupCommand { // check required args if (options.has(newConsumerOpt)) { CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt) + + if (options.has(zkConnectOpt)) + CommandLineUtils.printUsageAndDie(parser, s"Option $zkConnectOpt is not valid with $newConsumerOpt") + if (options.has(deleteOpt)) - CommandLineUtils.printUsageAndDie(parser, "Option %s does not work with %s".format(deleteOpt, newConsumerOpt)) - } else + CommandLineUtils.printUsageAndDie(parser, s"Option $deleteOpt is not valid with $newConsumerOpt. Note that " + + "there's no need to delete group metadata for the new consumer as it is automatically deleted when the last " + + "member leaves") + + } else { CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) + + if (options.has(bootstrapServerOpt)) + CommandLineUtils.printUsageAndDie(parser, s"Option $bootstrapServerOpt is only valid with $newConsumerOpt") + + } + if (options.has(describeOpt)) CommandLineUtils.checkRequiredArgs(parser, options, groupOpt) if (options.has(deleteOpt) && !options.has(groupOpt) && !options.has(topicOpt))
