Repository: kafka Updated Branches: refs/heads/trunk a4802962c -> 0aff45096
KAFKA-3158; ConsumerGroupCommand should tell whether group is actually dead This patch differentiates between a consumer group that is rebalancing and one that is dead, and reports the appropriate error message. Author: Ishita Mandhan <[email protected]> Reviewers: Vahid Hashemian <[email protected]>, Jason Gustafson <[email protected]>, Ismael Juma <[email protected]> Closes #1429 from imandhan/KAFKA-3158 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0aff4509 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0aff4509 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0aff4509 Branch: refs/heads/trunk Commit: 0aff450961a8dd14cc7820ee8d1c9eea855439b0 Parents: a480296 Author: Ishita Mandhan <[email protected]> Authored: Sat May 28 23:30:10 2016 +0100 Committer: Ismael Juma <[email protected]> Committed: Sat May 28 23:30:10 2016 +0100 ---------------------------------------------------------------------- .../main/scala/kafka/admin/AdminClient.scala | 10 +++--- .../kafka/admin/ConsumerGroupCommand.scala | 33 +++++++++++--------- .../integration/kafka/api/AdminClientTest.scala | 2 +- 3 files changed, 24 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/0aff4509/core/src/main/scala/kafka/admin/AdminClient.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index ebb5026..8572ceb 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -143,21 +143,21 @@ class AdminClient(val time: Time, clientHost: String, assignment: List[TopicPartition]) - def describeConsumerGroup(groupId: String): List[ConsumerSummary] = { + def describeConsumerGroup(groupId: String): Option[List[ConsumerSummary]] = { val 
group = describeGroup(groupId) if (group.state == "Dead") - return List.empty[ConsumerSummary] + return None if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE) throw new IllegalArgumentException(s"Group ${groupId} with protocol type '${group.protocolType}' is not a valid consumer group") if (group.state == "Stable") { - group.members.map { member => + Some(group.members.map { member => val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment)) new ConsumerSummary(member.memberId, member.clientId, member.clientHost, assignment.partitions().asScala.toList) - } + }) } else { - List.empty + Some(List.empty) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/0aff4509/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 414e7ba..b086d8f 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -312,22 +312,25 @@ object ConsumerGroupCommand { } protected def describeGroup(group: String) { - val consumerSummaries = adminClient.describeConsumerGroup(group) - if (consumerSummaries.isEmpty) - println(s"Consumer group `${group}` does not exist or is rebalancing.") - else { - val consumer = getConsumer() - printDescribeHeader() - consumerSummaries.foreach { consumerSummary => - val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition)) - val partitionOffsets = topicPartitions.flatMap { topicPartition => - Option(consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata => - topicPartition -> offsetAndMetadata.offset + adminClient.describeConsumerGroup(group) match { + case None => println(s"Consumer group `${group}` does not exist.") + case 
Some(consumerSummaries) => + if (consumerSummaries.isEmpty) + println(s"Consumer group `${group}` is rebalancing.") + else { + val consumer = getConsumer() + printDescribeHeader() + consumerSummaries.foreach { consumerSummary => + val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition)) + val partitionOffsets = topicPartitions.flatMap { topicPartition => + Option(consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata => + topicPartition -> offsetAndMetadata.offset + } + }.toMap + describeTopicPartition(group, topicPartitions, partitionOffsets.get, + _ => Some(s"${consumerSummary.clientId}_${consumerSummary.clientHost}")) } - }.toMap - describeTopicPartition(group, topicPartitions, partitionOffsets.get, - _ => Some(s"${consumerSummary.clientId}_${consumerSummary.clientHost}")) - } + } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/0aff4509/core/src/test/scala/integration/kafka/api/AdminClientTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala index 7fae81e..3d39475 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala @@ -106,7 +106,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging { val consumerSummaries = client.describeConsumerGroup(groupId) assertEquals(1, consumerSummaries.size) - assertEquals(Set(tp, tp2), consumerSummaries.head.assignment.toSet) + assertEquals(Some(Set(tp, tp2)), consumerSummaries.map(_.head.assignment.toSet)) } @Test
