Repository: kafka Updated Branches: refs/heads/trunk 53fc26723 -> 34e9cc5df
KAFKA-4349; Handle 'PreparingRebalance' and 'AwaitingSync' states in consumer group describe The edge case where the consumer group state is `PreparingRebalance` or `AwaitingSync` is now handled separately, because the group assignment is not yet determined in those states. Author: Vahid Hashemian <vahidhashem...@us.ibm.com> Reviewers: Jason Gustafson <ja...@confluent.io> Closes #2070 from vahidhashemian/KAFKA-4349 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/34e9cc5d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/34e9cc5d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/34e9cc5d Branch: refs/heads/trunk Commit: 34e9cc5dfae1f0a7b2cab51cb2939d48ba048964 Parents: 53fc267 Author: Vahid Hashemian <vahidhashem...@us.ibm.com> Authored: Thu Oct 27 18:41:38 2016 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Thu Oct 27 18:41:38 2016 -0700 ---------------------------------------------------------------------- .../main/scala/kafka/admin/AdminClient.scala | 10 +++++-- .../kafka/admin/ConsumerGroupCommand.scala | 31 +++++++++++--------- 2 files changed, 25 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/34e9cc5d/core/src/main/scala/kafka/admin/AdminClient.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index 22a8abb..1179557 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -149,9 +149,15 @@ class AdminClient(val time: Time, Errors.forCode(metadata.errorCode()).maybeThrow() val consumers = metadata.members.map { consumer => - val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(Utils.readBytes(consumer.memberAssignment))) - ConsumerSummary(consumer.memberId, 
consumer.clientId, consumer.clientHost, assignment.partitions.toList) + ConsumerSummary(consumer.memberId, consumer.clientId, consumer.clientHost, metadata.state match { + case "Stable" => + val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(Utils.readBytes(consumer.memberAssignment))) + assignment.partitions.toList + case _ => + List() + }) }.toList + ConsumerGroupSummary(metadata.state, metadata.protocol, Some(consumers), coordinator) } http://git-wip-us.apache.org/repos/asf/kafka/blob/34e9cc5d/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 6300d76..a9cd6d3 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -73,20 +73,23 @@ object ConsumerGroupCommand extends Logging { case None => printError(s"The consumer group '$groupId' does not exist.") case Some(assignments) => - if (assignments.isEmpty) - state match { - case Some("Dead") => - printError(s"Consumer group '$groupId' does not exist.") - case Some("Empty") => - printError(s"Consumer group '$groupId' has no active members.") - case Some(_) => - printError(s"Consumer group '$groupId' is rebalancing.") - case None => - // the control should never reach here - throw new KafkaException("Expected a valid consumer group state, but none found.") - } - else - printAssignment(assignments, !opts.useOldConsumer) + state match { + case Some("Dead") => + printError(s"Consumer group '$groupId' does not exist.") + case Some("Empty") => + printError(s"Consumer group '$groupId' has no active members.") + case Some("PreparingRebalance") | Some("AwaitingSync") => + System.err.println(s"Warning: Consumer group '$groupId' is rebalancing.") + printAssignment(assignments, !opts.useOldConsumer) + case 
Some("Stable") => + printAssignment(assignments, !opts.useOldConsumer) + case Some(other) => + // the control should never reach here + throw new KafkaException(s"Expected a valid consumer group state, but found '$other'.") + case None => + // the control should never reach here + throw new KafkaException("Expected a valid consumer group state, but none found.") + } } } else if (opts.options.has(opts.deleteOpt)) {