Repository: kafka Updated Branches: refs/heads/trunk 33e879a38 -> c3c0c04e6
KAFKA-2490: support new consumer in ConsumerGroupCommand Author: Ashish Singh <[email protected]> Reviewers: Guozhang Wang, Jason Gustafson Closes #299 from SinghAsDev/KAFKA-2490 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c3c0c04e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c3c0c04e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c3c0c04e Branch: refs/heads/trunk Commit: c3c0c04e62253f2a9f78b383bbf0d1a04d9b3b25 Parents: 33e879a Author: Ashish Singh <[email protected]> Authored: Thu Nov 5 18:56:26 2015 -0800 Committer: Gwen Shapira <[email protected]> Committed: Thu Nov 5 18:56:26 2015 -0800 ---------------------------------------------------------------------- .../main/scala/kafka/admin/AdminClient.scala | 35 +++- .../kafka/admin/ConsumerGroupCommand.scala | 173 ++++++++++++++----- .../integration/kafka/api/AdminClientTest.scala | 2 +- 3 files changed, 164 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c3c0c04e/core/src/main/scala/kafka/admin/AdminClient.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index ddd3114..1c4aa52 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -19,9 +19,9 @@ import kafka.common.KafkaException import kafka.coordinator.{GroupOverview, GroupSummary, MemberSummary} import kafka.utils.Logging import org.apache.kafka.clients._ -import org.apache.kafka.clients.consumer.internals.{SendFailedException, ConsumerProtocol, ConsumerNetworkClient, RequestFuture} +import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture, SendFailedException} import org.apache.kafka.common.config.ConfigDef.{Importance, Type} -import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, SaslConfigs, SslConfigs} +import org.apache.kafka.common.config.{AbstractConfig, ConfigDef} import org.apache.kafka.common.errors.DisconnectException import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.Selector @@ -29,7 +29,7 @@ import org.apache.kafka.common.protocol.types.Struct import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests._ import org.apache.kafka.common.utils.{SystemTime, Time, Utils} -import org.apache.kafka.common.{TopicPartition, Cluster, Node} +import org.apache.kafka.common.{Cluster, Node, TopicPartition} import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ @@ -147,10 +147,21 @@ class AdminClient(val time: Time, GroupSummary(metadata.state(), metadata.protocolType(), metadata.protocol(), members) } - def describeConsumerGroup(groupId: String): Map[String, List[TopicPartition]] = { + def describeConsumerGroup(groupId: String): (Map[TopicPartition, String], Map[String, List[TopicPartition]]) = { val group = describeGroup(groupId) + try { + val membersAndTopicPartitions: Map[String, List[TopicPartition]] = getMembersAndTopicPartitions(group) + val owners = getOwners(group) + (owners, membersAndTopicPartitions) + } catch { + case (ex: IllegalArgumentException) => + throw new IllegalArgumentException(s"Group ${groupId} is not a consumer group.") + } + } + + def getMembersAndTopicPartitions(group: GroupSummary): Map[String, List[TopicPartition]] = { if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE) - throw new IllegalArgumentException(s"Group ${groupId} is not a consumer group") + throw new IllegalArgumentException(s"${group} is not a valid GroupSummary") group.members.map { case member => @@ -159,6 +170,20 @@ class AdminClient(val time: Time, }.toMap } + def getOwners(groupSummary: GroupSummary): Map[TopicPartition, String] = { + if (groupSummary.protocolType != ConsumerProtocol.PROTOCOL_TYPE) + throw new IllegalArgumentException(s"${groupSummary} is not a valid GroupSummary") + + groupSummary.members.flatMap { + case member => + val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment)) + val partitions = assignment.partitions().asScala.toList + partitions.map { + case partition: TopicPartition => + partition -> "%s_%s".format(member.memberId, member.clientHost) + }.toMap + }.toMap + } } object AdminClient { http://git-wip-us.apache.org/repos/asf/kafka/blob/c3c0c04e/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index a30c12d..91dc4e3 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -18,21 +18,23 @@ package kafka.admin -import kafka.utils._ -import org.I0Itec.zkclient.ZkClient -import kafka.common._ import java.util.Properties + +import joptsimple.{OptionParser, OptionSpec} +import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest, PartitionOffsetRequestInfo} import kafka.client.ClientUtils -import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo, OffsetFetchResponse, OffsetFetchRequest} -import org.I0Itec.zkclient.exception.ZkNoNodeException -import kafka.common.TopicAndPartition -import joptsimple.{OptionSpec, OptionParser} -import scala.collection.{Set, mutable} +import kafka.common.{TopicAndPartition, _} import kafka.consumer.SimpleConsumer -import collection.JavaConversions._ -import org.apache.kafka.common.utils.Utils +import kafka.utils._ +import org.I0Itec.zkclient.exception.ZkNoNodeException +import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.security.JaasUtils +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.utils.Utils +import scala.collection.JavaConversions._ +import scala.collection.{Set, mutable} object ConsumerGroupCommand { @@ -56,7 +58,7 @@ object ConsumerGroupCommand { try { if (opts.options.has(opts.listOpt)) - list(zkUtils) + list(zkUtils, opts) else if (opts.options.has(opts.describeOpt)) describe(zkUtils, opts) else if (opts.options.has(opts.deleteOpt)) @@ -70,20 +72,46 @@ object ConsumerGroupCommand { } } - def list(zkUtils: ZkUtils) { - zkUtils.getConsumerGroups().foreach(println) + def list(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) { + val useNewConsumer = opts.options.has(opts.newConsumerOpt) + if (!useNewConsumer) + zkUtils.getConsumerGroups().foreach(println) + else { + val adminClient = createAndGetAdminClient(opts) + adminClient.listAllConsumerGroupsFlattened().foreach(x => println(x.groupId)) + } + } + + def createAndGetAdminClient(opts: ConsumerGroupCommandOptions): AdminClient = { + AdminClient.createSimplePlaintext(opts.options.valueOf(opts.bootstrapServerOpt)) } def describe(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) { + val useNewConsumer = opts.options.has(opts.newConsumerOpt) + val group = opts.options.valueOf(opts.groupOpt) val configs = parseConfigs(opts) val channelSocketTimeoutMs = configs.getProperty("channelSocketTimeoutMs", "600").toInt val channelRetryBackoffMs = configs.getProperty("channelRetryBackoffMsOpt", "300").toInt - val group = opts.options.valueOf(opts.groupOpt) - val topics = zkUtils.getTopicsByConsumerGroup(group) - if (topics.isEmpty) { + def warnNoTopicsForGroupFound: Unit = { println("No topic available for consumer group provided") } - topics.foreach(topic => describeTopic(zkUtils, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs)) + + println("%s, %s, %s, %s, %s, %s, %s" + .format("GROUP", "TOPIC", "PARTITION", "CURRENT OFFSET", "LOG END OFFSET", "LAG", "OWNER")) + + if (!useNewConsumer) { + val topics = zkUtils.getTopicsByConsumerGroup(group) + if (topics.isEmpty) { + warnNoTopicsForGroupFound + } + topics.foreach(topic => describeTopic(zkUtils, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs, opts)) + } else { + val (owners, groupAndTopicPartitions) = createAndGetAdminClient(opts).describeConsumerGroup(group) + + if (groupAndTopicPartitions.isEmpty) + warnNoTopicsForGroupFound + groupAndTopicPartitions.foreach(x => describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, x._2.map(tp => new TopicAndPartition(tp.topic(), tp.partition())), owners)) + } } def delete(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) { @@ -152,15 +180,18 @@ object ConsumerGroupCommand { group: String, topic: String, channelSocketTimeoutMs: Int, - channelRetryBackoffMs: Int) { + channelRetryBackoffMs: Int, + opts: ConsumerGroupCommandOptions) { val topicPartitions = getTopicPartitions(zkUtils, topic) + describeTopicPartition(zkUtils, group, channelSocketTimeoutMs, channelRetryBackoffMs, opts, topicPartitions) + } + + def describeTopicPartition(zkUtils: ZkUtils, group: String, channelSocketTimeoutMs: Int, channelRetryBackoffMs: Int, opts: ConsumerGroupCommandOptions, topicPartitions: Seq[TopicAndPartition], owners: Map[TopicPartition, String] = null): Unit = { val partitionOffsets = getPartitionOffsets(zkUtils, group, topicPartitions, channelSocketTimeoutMs, channelRetryBackoffMs) - println("%s, %s, %s, %s, %s, %s, %s" - .format("GROUP", "TOPIC", "PARTITION", "CURRENT OFFSET", "LOG END OFFSET", "LAG", "OWNER")) topicPartitions .sortBy { case topicPartition => topicPartition.partition } .foreach { topicPartition => - describePartition(zkUtils, group, topicPartition.topic, topicPartition.partition, partitionOffsets.get(topicPartition)) + describePartition(zkUtils, group, topicPartition.topic, topicPartition.partition, partitionOffsets.get(topicPartition), opts, owners) } } @@ -208,34 +239,84 @@ object ConsumerGroupCommand { group: String, topic: String, partition: Int, - offsetOpt: Option[Long]) { - val topicAndPartition = TopicAndPartition(topic, partition) + offsetOpt: Option[Long], + opts: ConsumerGroupCommandOptions, + owners: Map[TopicPartition, String] = null) { + val topicPartition = new TopicPartition(topic, partition) val groupDirs = new ZKGroupTopicDirs(group, topic) - val owner = zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/" + partition)._1 + val useNewConsumer: Boolean = opts.options.has(opts.newConsumerOpt) + val owner = if (useNewConsumer) owners.get(new TopicPartition(topic, partition)) else zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/" + partition)._1 + def print(logEndOffset: Long): Unit = { + val lag = offsetOpt.filter(_ != -1).map(logEndOffset - _) + println("%s, %s, %s, %s, %s, %s, %s" + .format(group, topic, partition, offsetOpt.getOrElse("unknown"), logEndOffset, lag.getOrElse("unknown"), owner.getOrElse("none"))) + } zkUtils.getLeaderForPartition(topic, partition) match { case Some(-1) => println("%s, %s, %s, %s, %s, %s, %s" .format(group, topic, partition, offsetOpt.getOrElse("unknown"), "unknown", "unknown", owner.getOrElse("none"))) case Some(brokerId) => - val consumerOpt = getConsumer(zkUtils, brokerId) - consumerOpt match { - case Some(consumer) => - val request = - OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))) - val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head - consumer.close() - - val lag = offsetOpt.filter(_ != -1).map(logEndOffset - _) - println("%s, %s, %s, %s, %s, %s, %s" - .format(group, topic, partition, offsetOpt.getOrElse("unknown"), logEndOffset, lag.getOrElse("unknown"), owner.getOrElse("none"))) - case None => // ignore + if (useNewConsumer) { + val consumerOpt = getNewConsumer(zkUtils, brokerId) + consumerOpt match { + case Some(consumer) => + consumer.assign(List(topicPartition)) + consumer.seekToEnd(topicPartition) + val logEndOffset = consumer.position(topicPartition) + consumer.close() + print(logEndOffset) + case None => // ignore + } + } else { + val consumerOpt = getZkConsumer(zkUtils, brokerId) + consumerOpt match { + case Some(consumer) => + val topicAndPartition: TopicAndPartition = new TopicAndPartition(topicPartition.topic(), topicPartition.partition()) + val request = + OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))) + val logEndOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head + consumer.close() + print(logEndOffset) + case None => // ignore + } } case None => - println("No broker for partition %s".format(topicAndPartition)) + println("No broker for partition %s".format(topicPartition)) + } + } + + private def getNewConsumer(zkUtils: ZkUtils, brokerId: Int): Option[KafkaConsumer[String, String]] = { + try { + zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match { + case Some(brokerInfoString) => + Json.parseFull(brokerInfoString) match { + case Some(m) => + val brokerInfo = m.asInstanceOf[Map[String, Any]] + val host = brokerInfo.get("host").get.asInstanceOf[String] + val port = brokerInfo.get("port").get.asInstanceOf[Int] + val deserializer: String = (new StringDeserializer).getClass.getName + val properties: Properties = new Properties() + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host + ":" + port) + properties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerGroupCommand") + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000") + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer) + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer) + Some(new KafkaConsumer[String, String](properties)) + case None => + throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId)) + } + case None => + throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId)) + } + } catch { + case t: Throwable => + println("Could not parse broker info due to " + t.getMessage) + None } } - private def getConsumer(zkUtils: ZkUtils, brokerId: Int): Option[SimpleConsumer] = { + private def getZkConsumer(zkUtils: ZkUtils, brokerId: Int): Option[SimpleConsumer] = { try { zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match { case Some(brokerInfoString) => @@ -261,6 +342,7 @@ object ConsumerGroupCommand { class ConsumerGroupCommandOptions(args: Array[String]) { val ZkConnectDoc = "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + "Multiple URLS can be given to allow fail-over." + val BootstrapServerDoc = "REQUIRED (only when using new-consumer): The server to connect to." val GroupDoc = "The consumer group we wish to act on." val TopicDoc = "The topic whose consumer group information should be deleted." val ConfigDoc = "Configuration for timeouts. For instance --config channelSocketTimeoutMs=600" @@ -273,12 +355,17 @@ object ConsumerGroupCommand { "information for the given consumer groups. For instance --group g1 --group g2 --topic t1" + nl + "Pass in just a topic to delete the given topic's partition offsets and ownership information " + "for every consumer group. For instance --topic t1" + nl + - "WARNING: Only does deletions on consumer groups that are not active." + "WARNING: Group deletion only works for old ZK-based consumer groups, and one has to use it carefully to only delete groups that are not active." + val NewConsumerDoc = "Use new consumer." val parser = new OptionParser val zkConnectOpt = parser.accepts("zookeeper", ZkConnectDoc) .withRequiredArg .describedAs("urls") .ofType(classOf[String]) + val bootstrapServerOpt = parser.accepts("bootstrap-server", BootstrapServerDoc) + .withRequiredArg + .describedAs("server to connect to") + .ofType(classOf[String]) val groupOpt = parser.accepts("group", GroupDoc) .withRequiredArg .describedAs("consumer group") @@ -294,13 +381,19 @@ object ConsumerGroupCommand { val listOpt = parser.accepts("list", ListDoc) val describeOpt = parser.accepts("describe", DescribeDoc) val deleteOpt = parser.accepts("delete", DeleteDoc) + val newConsumerOpt = parser.accepts("new-consumer", NewConsumerDoc) val options = parser.parse(args : _*) val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt) def checkArgs() { // check required args - CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) + if (options.has(newConsumerOpt)) { + CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt) + if (options.has(deleteOpt)) + CommandLineUtils.printUsageAndDie(parser, "Option %s does not work with %s".format(deleteOpt, newConsumerOpt)) + } else + CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) if (options.has(describeOpt)) CommandLineUtils.checkRequiredArgs(parser, options, groupOpt) if (options.has(deleteOpt) && !options.has(groupOpt) && !options.has(topicOpt)) http://git-wip-us.apache.org/repos/asf/kafka/blob/c3c0c04e/core/src/test/scala/integration/kafka/api/AdminClientTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala index 7d529ec..5b8cbc2 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala @@ -105,7 +105,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging { !consumers(0).assignment().isEmpty }, "Expected non-empty assignment") - val assignment = client.describeConsumerGroup(groupId) + val (_, assignment) = client.describeConsumerGroup(groupId) assertEquals(1, assignment.size) for (partitions <- assignment.values) assertEquals(Set(tp, tp2), partitions.toSet)
