Repository: kafka Updated Branches: refs/heads/trunk fbbe5821c -> 94909a8f8
KAFKA-4357; Fix consumer group describe output when there is no active member (old consumer) Author: Vahid Hashemian <vahidhashem...@us.ibm.com> Reviewers: Sriharsha Chintalapani <har...@hortonworks.com>, Jason Gustafson <ja...@confluent.io> Closes #2075 from vahidhashemian/KAFKA-4357 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/94909a8f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/94909a8f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/94909a8f Branch: refs/heads/trunk Commit: 94909a8f83bfe214726f85130ad04d867e022894 Parents: fbbe582 Author: Vahid Hashemian <vahidhashem...@us.ibm.com> Authored: Tue Nov 1 11:36:12 2016 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Tue Nov 1 11:36:12 2016 -0700 ---------------------------------------------------------------------- .../kafka/admin/ConsumerGroupCommand.scala | 35 ++++++++++---------- .../kafka/admin/DescribeConsumerGroupTest.scala | 29 +++++++++++++++- 2 files changed, 46 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/94909a8f/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index a9cd6d3..b53856e 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -71,25 +71,26 @@ object ConsumerGroupCommand extends Logging { val groupId = opts.options.valuesOf(opts.groupOpt).asScala.head assignments match { case None => + // applies to both old and new consumer printError(s"The consumer group '$groupId' does not exist.") case Some(assignments) => - state match { - case Some("Dead") => - printError(s"Consumer group '$groupId' does not exist.") - case Some("Empty") => - printError(s"Consumer group '$groupId' has no active members.") - case Some("PreparingRebalance") | Some("AwaitingSync") => - System.err.println(s"Warning: Consumer group '$groupId' is rebalancing.") - printAssignment(assignments, !opts.useOldConsumer) - case Some("Stable") => - printAssignment(assignments, !opts.useOldConsumer) - case Some(other) => - // the control should never reach here - throw new KafkaException(s"Expected a valid consumer group state, but found '$other'.") - case None => - // the control should never reach here - throw new KafkaException("Expected a valid consumer group state, but none found.") - } + if (opts.useOldConsumer) + printAssignment(assignments, false) + else + state match { + case Some("Dead") => + printError(s"Consumer group '$groupId' does not exist.") + case Some("Empty") => + printError(s"Consumer group '$groupId' has no active members.") + case Some("PreparingRebalance") | Some("AwaitingSync") => + System.err.println(s"Warning: Consumer group '$groupId' is rebalancing.") + printAssignment(assignments, true) + case Some("Stable") => + printAssignment(assignments, true) + case other => + // the control should never reach here + throw new KafkaException(s"Expected a valid consumer group state, but found '${other.getOrElse("NONE")}'.") + } } } else if (opts.options.has(opts.deleteOpt)) { http://git-wip-us.apache.org/repos/asf/kafka/blob/94909a8f/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala index 39bcb7a..b9c760d 100644 --- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala @@ -90,7 +90,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness { val (_, assignments) = consumerGroupCommand.describeGroup() assignments.isDefined && assignments.get.filter(_.group == group).size == 1 && - assignments.get.filter(_.group == group).head.consumerId.isDefined + assignments.get.filter(_.group == group).head.consumerId.exists(_.trim.nonEmpty) }, "Expected rows and a member id column in describe group results.") // cleanup @@ -99,6 +99,33 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness { } @Test + def testDescribeExistingGroupWithNoMembers() { + // mocks + val consumerMock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock() + + // stubs + val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", group)) + val consumerGroupCommand = new ZkConsumerGroupService(opts) + + // simulation + EasyMock.replay(consumerMock) + + // action/test + val (_, a1) = consumerGroupCommand.describeGroup() // there should be a member here + consumerMock.stop() + TestUtils.waitUntilTrue(() => { + val (_, assignments) = consumerGroupCommand.describeGroup() + assignments.isDefined && + assignments.get.filter(_.group == group).size == 1 && + assignments.get.filter(_.group == group).head.consumerId.isDefined && + assignments.get.filter(_.group == group).head.consumerId.exists(_.trim.isEmpty) // the member should be gone + }, "Expected no active member in describe group results.") + + // cleanup + consumerGroupCommand.close() + } + + @Test def testDescribeConsumersWithNoAssignedPartitions() { // mocks val consumer1Mock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock()