Repository: kafka
Updated Branches:
  refs/heads/trunk 69777260e -> 529786638


KAFKA-5526; Additional `--describe` views for ConsumerGroupCommand (KIP-175)

The `--describe` option of ConsumerGroupCommand is expanded, as proposed in 
[KIP-175](https://cwiki.apache.org/confluence/display/KAFKA/KIP-175%3A+Additional+%27--describe%27+views+for+ConsumerGroupCommand),
 to support:
* `--describe` or `--describe --offsets`: listing of current group offsets
* `--describe --members` or `--describe --members --verbose`: listing of group members
* `--describe --state`: group status
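
The choice between these views can be sketched as below. This is a simplified illustration, not the code in the patch: the names `DescribeView`, `OffsetsView`, `MembersView`, `StateView`, and `selectView` are hypothetical. It assumes `--members`, `--offsets`, and `--state` are mutually exclusive and that the offsets view is the default when none of them is given.

```scala
// Hypothetical model of the --describe sub-actions; all names are illustrative only.
sealed trait DescribeView
case object OffsetsView extends DescribeView
final case class MembersView(verbose: Boolean) extends DescribeView
case object StateView extends DescribeView

object DescribeViews {
  // Assumes at most one of members/offsets/state is true (the options are mutually exclusive).
  // With no sub-option, or with --offsets, the offsets view is selected.
  def selectView(members: Boolean, offsets: Boolean, state: Boolean, verbose: Boolean): DescribeView =
    if (offsets || (!members && !state)) OffsetsView
    else if (members) MembersView(verbose)
    else StateView
}
```

In this sketch `--verbose` only changes the members view.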

Example: With a single-partition topic `test1` and a two-partition topic 
`test2`, consumers `consumer1` and `consumer11` subscribed to `test1`, consumers 
`consumer2`, `consumer22`, and `consumer222` subscribed to `test2`, and all 
consumers belonging to group `test-group`, this is an example of the output of 
the new options above for `test-group`:

```
--describe, or --describe --offsets:

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
test2           0          0               0               0               consumer2-bad9496d-0889-47ab-98ff-af17d9460382  /127.0.0.1      consumer2
test2           1          0               0               0               consumer22-c45e6ee2-0c7d-44a3-94a8-9627f63fb411 /127.0.0.1      consumer22
test1           0          0               0               0               consumer1-d51b0345-3194-4305-80db-81a68fa6c5bf  /127.0.0.1      consumer1
```

```
--describe --members

CONSUMER-ID                                      HOST            CLIENT-ID       #PARTITIONS
consumer2-bad9496d-0889-47ab-98ff-af17d9460382   /127.0.0.1      consumer2       1
consumer222-ed2108cd-d368-41f1-8514-5b72aa835bcc /127.0.0.1      consumer222     0
consumer11-dc8295d7-8f3f-4438-9b11-7270bab46760  /127.0.0.1      consumer11      0
consumer22-c45e6ee2-0c7d-44a3-94a8-9627f63fb411  /127.0.0.1      consumer22      1
consumer1-d51b0345-3194-4305-80db-81a68fa6c5bf   /127.0.0.1      consumer1       1
```

```
--describe --members --verbose

CONSUMER-ID                                      HOST            CLIENT-ID       #PARTITIONS     ASSIGNMENT
consumer2-bad9496d-0889-47ab-98ff-af17d9460382   /127.0.0.1      consumer2       1               test2(0)
consumer222-ed2108cd-d368-41f1-8514-5b72aa835bcc /127.0.0.1      consumer222     0               -
consumer11-dc8295d7-8f3f-4438-9b11-7270bab46760  /127.0.0.1      consumer11      0               -
consumer22-c45e6ee2-0c7d-44a3-94a8-9627f63fb411  /127.0.0.1      consumer22      1               test2(1)
consumer1-d51b0345-3194-4305-80db-81a68fa6c5bf   /127.0.0.1      consumer1       1               test1(0)
```

```
--describe --state

COORDINATOR (ID)         ASSIGNMENT-STRATEGY       STATE                #MEMBERS
localhost:9092 (0)       range                     Stable               5
```
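
The `ASSIGNMENT` column of the `--describe --members --verbose` view groups a member's partitions by topic. A minimal sketch of that formatting follows. The `TopicPartition` case class is a stand-in for the Kafka client class, and `AssignmentFormat` and `MissingColumnValue` are hypothetical names, with `"-"` assumed from the sample output above.

```scala
// Stand-in for org.apache.kafka.common.TopicPartition, defined here so the sketch is self-contained.
final case class TopicPartition(topic: String, partition: Int)

object AssignmentFormat {
  // Placeholder for an empty assignment; "-" is assumed from the sample output.
  val MissingColumnValue = "-"

  // Renders e.g. List(test2-1, test1-0, test2-0) as "test1(0), test2(0,1)".
  def format(assignment: List[TopicPartition]): String =
    if (assignment.isEmpty) MissingColumnValue
    else assignment
      .groupBy(_.topic)
      .map { case (topic, tps) => topic + tps.map(_.partition).sorted.mkString("(", ",", ")") }
      .toList
      .sorted
      .mkString(", ")
}
```

Sorting both the partitions within a topic and the per-topic entries keeps the output deterministic, which makes it easier to assert on in tests.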

Note that this PR also addresses the issue reported in 
[KAFKA-6158](https://issues.apache.org/jira/browse/KAFKA-6158) by dynamically 
setting the width of columns `TOPIC`, `CONSUMER-ID`, `HOST`, `CLIENT-ID` and 
`COORDINATOR (ID)`. This avoids truncation of column values when they exceed 
the fixed widths previously used for these columns.
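
The idea behind the dynamic widths can be sketched as follows: a column is as wide as its longest value, with a minimum width as a floor. This is an illustrative sketch rather than the patch itself. `ColumnWidths`, `columnWidth`, and `cell` are hypothetical names, and the default minimum of 15 is an assumption.

```scala
object ColumnWidths {
  // Width of a left-justified column: the longest value, but never below `minWidth`.
  def columnWidth(values: Seq[String], minWidth: Int = 15): Int =
    values.foldLeft(minWidth)((width, value) => math.max(width, value.length))

  // Left-justifies `value` in a field of `width` characters; longer values are not cut off.
  def cell(value: String, width: Int): String = s"%-${width}s".format(value)
}
```

A table printer would compute `columnWidth` once per column over all rows, then use the same width for the header and every data row so the columns stay aligned.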

The code has been restructured to better support testing of individual values 
and also the console output. Unit tests have been updated and extended to take 
advantage of this restructuring.

Author: Vahid Hashemian <vahidhashem...@us.ibm.com>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #4271 from vahidhashemian/KAFKA-5526


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/52978663
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/52978663
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/52978663

Branch: refs/heads/trunk
Commit: 529786638baeb4335065df4e2b240aad42caca9a
Parents: 6977726
Author: Vahid Hashemian <vahidhashem...@us.ibm.com>
Authored: Fri Dec 15 10:26:00 2017 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Dec 15 10:26:00 2017 -0800

----------------------------------------------------------------------
 .../kafka/admin/ConsumerGroupCommand.scala      | 309 +++++++---
 .../kafka/admin/DescribeConsumerGroupTest.scala | 597 ++++++++++++++++---
 .../admin/ResetConsumerGroupOffsetTest.scala    |   6 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  26 +-
 4 files changed, 764 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/52978663/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 9e35ebc..918593b 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -20,14 +20,15 @@ package kafka.admin
 import java.text.{ParseException, SimpleDateFormat}
 import java.util.{Date, Properties}
 import javax.xml.datatype.DatatypeFactory
-
 import joptsimple.{OptionParser, OptionSpec}
+
 import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest, 
PartitionOffsetRequestInfo}
 import kafka.client.ClientUtils
 import kafka.common.{OffsetMetadataAndError, TopicAndPartition}
-import kafka.utils.Implicits._
 import kafka.consumer.SimpleConsumer
 import kafka.utils._
+import kafka.utils.Implicits._
+
 import org.I0Itec.zkclient.exception.ZkNoNodeException
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, 
OffsetAndMetadata}
@@ -53,7 +54,7 @@ object ConsumerGroupCommand extends Logging {
       CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer 
groups, describe a consumer group, delete consumer group info, or reset 
consumer group offsets.")
 
     // should have exactly one action
-    val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt, 
opts.resetOffsetsOpt).count(opts.options.has _)
+    val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt, 
opts.resetOffsetsOpt).count(opts.options.has)
     if (actions != 1)
       CommandLineUtils.printUsageAndDie(opts.parser, "Command must include 
exactly one action: --list, --describe, --delete, --reset-offset")
 
@@ -61,10 +62,10 @@ object ConsumerGroupCommand extends Logging {
 
     val consumerGroupService = {
       if (opts.useOldConsumer) {
-        System.err.println("Note: This will only show information about 
consumers that use ZooKeeper (not those using the Java consumer API).\n")
+        Console.err.println("Note: This will only show information about 
consumers that use ZooKeeper (not those using the Java consumer API).")
         new ZkConsumerGroupService(opts)
       } else {
-        System.err.println("Note: This will not show information about old 
Zookeeper-based consumers.\n")
+        Console.err.println("Note: This will not show information about old 
Zookeeper-based consumers.")
         new KafkaConsumerGroupService(opts)
       }
     }
@@ -72,34 +73,8 @@ object ConsumerGroupCommand extends Logging {
     try {
       if (opts.options.has(opts.listOpt))
         consumerGroupService.listGroups().foreach(println(_))
-      else if (opts.options.has(opts.describeOpt)) {
-        val (state, assignments) = consumerGroupService.describeGroup()
-        val groupId = opts.options.valuesOf(opts.groupOpt).asScala.head
-        assignments match {
-          case None =>
-            // applies to both old and new consumer
-            printError(s"The consumer group '$groupId' does not exist.")
-          case Some(assignments) =>
-            if (opts.useOldConsumer)
-              printAssignment(assignments, false)
-            else
-              state match {
-                case Some("Dead") =>
-                  printError(s"Consumer group '$groupId' does not exist.")
-                case Some("Empty") =>
-                  System.err.println(s"Consumer group '$groupId' has no active 
members.")
-                  printAssignment(assignments, true)
-                case Some("PreparingRebalance") | Some("CompletingRebalance") 
=>
-                  System.err.println(s"Warning: Consumer group '$groupId' is 
rebalancing.")
-                  printAssignment(assignments, true)
-                case Some("Stable") =>
-                  printAssignment(assignments, true)
-                case other =>
-                  // the control should never reach here
-                  throw new KafkaException(s"Expected a valid consumer group 
state, but found '${other.getOrElse("NONE")}'.")
-              }
-        }
-      }
+      else if (opts.options.has(opts.describeOpt))
+        consumerGroupService.describeGroup()
       else if (opts.options.has(opts.deleteOpt)) {
         consumerGroupService match {
           case service: ZkConsumerGroupService => service.deleteGroups()
@@ -129,23 +104,6 @@ object ConsumerGroupCommand extends Logging {
     e.foreach(debug("Exception in consumer group command", _))
   }
 
-  def printAssignment(groupAssignment: Seq[PartitionAssignmentState], 
useNewConsumer: Boolean): Unit = {
-    print("\n%-30s %-10s %-15s %-15s %-10s %-50s".format("TOPIC", "PARTITION", 
"CURRENT-OFFSET", "LOG-END-OFFSET", "LAG", "CONSUMER-ID"))
-    if (useNewConsumer)
-      print("%-30s %s".format("HOST", "CLIENT-ID"))
-    println()
-
-    groupAssignment.foreach { consumerAssignment =>
-      print("%-30s %-10s %-15s %-15s %-10s %-50s".format(
-        consumerAssignment.topic.getOrElse(MISSING_COLUMN_VALUE), 
consumerAssignment.partition.getOrElse(MISSING_COLUMN_VALUE),
-        consumerAssignment.offset.getOrElse(MISSING_COLUMN_VALUE), 
consumerAssignment.logEndOffset.getOrElse(MISSING_COLUMN_VALUE),
-        consumerAssignment.lag.getOrElse(MISSING_COLUMN_VALUE), 
consumerAssignment.consumerId.getOrElse(MISSING_COLUMN_VALUE)))
-      if (useNewConsumer)
-        print("%-30s 
%s".format(consumerAssignment.host.getOrElse(MISSING_COLUMN_VALUE), 
consumerAssignment.clientId.getOrElse(MISSING_COLUMN_VALUE)))
-      println()
-    }
-  }
-
   def printOffsetsToReset(groupAssignmentsToReset: Map[TopicPartition, 
OffsetAndMetadata]): Unit = {
     print("\n%-30s %-10s %-15s".format("TOPIC", "PARTITION", "NEW-OFFSET"))
     println()
@@ -165,12 +123,151 @@ object ConsumerGroupCommand extends Logging {
                                                 consumerId: Option[String], 
host: Option[String],
                                                 clientId: Option[String], 
logEndOffset: Option[Long])
 
+  protected case class MemberAssignmentState(group: String, consumerId: 
String, host: String, clientId: String,
+                                             numPartitions: Int, assignment: 
List[TopicPartition])
+
+  protected case class GroupState(group: String, coordinator: Node, 
assignmentStrategy: String, state: String, numMembers: Int)
+
   sealed trait ConsumerGroupService {
 
     def listGroups(): List[String]
 
-    def describeGroup(): (Option[String], 
Option[Seq[PartitionAssignmentState]]) = {
-      collectGroupAssignment(opts.options.valueOf(opts.groupOpt))
+    private def shouldPrintMemberState(group: String, state: Option[String], 
numRows: Option[Int]): Boolean = {
+      // numRows contains the number of data rows, if any, compiled from the 
API call in the caller method.
+      // if it's undefined or 0, there is no relevant group information to 
display.
+      numRows match {
+        case None =>
+          // applies to both old and new consumer
+          printError(s"The consumer group '$group' does not exist.")
+          false
+        case Some(num) =>
+          opts.useOldConsumer || {
+            state match {
+              case Some("Dead") =>
+                printError(s"Consumer group '$group' does not exist.")
+              case Some("Empty") =>
+                Console.err.println(s"Consumer group '$group' has no active 
members.")
+              case Some("PreparingRebalance") | Some("CompletingRebalance") =>
+                Console.err.println(s"Warning: Consumer group '$group' is 
rebalancing.")
+              case Some("Stable") =>
+              case other =>
+                // the control should never reach here
+                throw new KafkaException(s"Expected a valid consumer group 
state, but found '${other.getOrElse("NONE")}'.")
+            }
+            state != Some("Dead") && num > 0
+          }
+      }
+    }
+
+    private def size(colOpt: Option[Seq[Object]]): Option[Int] = 
colOpt.map(_.size)
+
+    private def printOffsets(group: String, state: Option[String], 
assignments: Option[Seq[PartitionAssignmentState]]): Unit = {
+      if (shouldPrintMemberState(group, state, size(assignments))) {
+        // find proper columns width
+        var (maxTopicLen, maxConsumerIdLen, maxHostLen) = (15, 15, 15)
+        assignments match {
+          case None => // do nothing
+          case Some(consumerAssignments) =>
+            consumerAssignments.foreach { consumerAssignment =>
+              maxTopicLen = Math.max(maxTopicLen, 
consumerAssignment.topic.getOrElse(MISSING_COLUMN_VALUE).length)
+              maxConsumerIdLen = Math.max(maxConsumerIdLen, 
consumerAssignment.consumerId.getOrElse(MISSING_COLUMN_VALUE).length)
+              if (!opts.useOldConsumer)
+                maxHostLen = Math.max(maxHostLen, 
consumerAssignment.host.getOrElse(MISSING_COLUMN_VALUE).length)
+            }
+        }
+
+        print(s"\n%${-maxTopicLen}s %-10s %-15s %-15s %-15s 
%${-maxConsumerIdLen}s "
+          .format("TOPIC", "PARTITION", "CURRENT-OFFSET", "LOG-END-OFFSET", 
"LAG", "CONSUMER-ID"))
+
+        if (!opts.useOldConsumer)
+          print(s"%${-maxHostLen}s %s".format("HOST", "CLIENT-ID"))
+        println()
+
+        assignments match {
+          case None => // do nothing
+          case Some(consumerAssignments) =>
+            consumerAssignments.foreach { consumerAssignment =>
+              print(s"%${-maxTopicLen}s %-10s %-15s %-15s %-15s 
%${-maxConsumerIdLen}s ".format(
+                consumerAssignment.topic.getOrElse(MISSING_COLUMN_VALUE), 
consumerAssignment.partition.getOrElse(MISSING_COLUMN_VALUE),
+                consumerAssignment.offset.getOrElse(MISSING_COLUMN_VALUE), 
consumerAssignment.logEndOffset.getOrElse(MISSING_COLUMN_VALUE),
+                consumerAssignment.lag.getOrElse(MISSING_COLUMN_VALUE), 
consumerAssignment.consumerId.getOrElse(MISSING_COLUMN_VALUE)))
+              if (!opts.useOldConsumer)
+                print(s"%${-maxHostLen}s 
%s".format(consumerAssignment.host.getOrElse(MISSING_COLUMN_VALUE),
+                  consumerAssignment.clientId.getOrElse(MISSING_COLUMN_VALUE)))
+              println()
+            }
+        }
+      }
+    }
+
+    private def printMembers(group: String, state: Option[String], 
assignments: Option[Seq[MemberAssignmentState]], verbose: Boolean): Unit = {
+      if (shouldPrintMemberState(group, state, size(assignments))) {
+        // find proper columns width
+        var (maxConsumerIdLen, maxHostLen, maxClientIdLen) = (15, 15, 15)
+        assignments match {
+          case None => // do nothing
+          case Some(memberAssignments) =>
+            memberAssignments.foreach { memberAssignment =>
+              maxConsumerIdLen = Math.max(maxConsumerIdLen, 
memberAssignment.consumerId.length)
+              maxHostLen = Math.max(maxHostLen, memberAssignment.host.length)
+              maxClientIdLen = Math.max(maxClientIdLen, 
memberAssignment.clientId.length)
+            }
+        }
+
+        print(s"\n%${-maxConsumerIdLen}s %${-maxHostLen}s %${-maxClientIdLen}s 
%-15s "
+          .format("CONSUMER-ID", "HOST", "CLIENT-ID", "#PARTITIONS"))
+        if (verbose)
+          print("%s".format("ASSIGNMENT"))
+        println()
+
+        assignments match {
+          case None => // do nothing
+          case Some(memberAssignments) =>
+            memberAssignments.foreach { memberAssignment =>
+              print(s"%${-maxConsumerIdLen}s %${-maxHostLen}s 
%${-maxClientIdLen}s %-15s ".format(
+                memberAssignment.consumerId, memberAssignment.host, 
memberAssignment.clientId, memberAssignment.numPartitions))
+              if (verbose) {
+                val partitions = memberAssignment.assignment match {
+                  case List() => MISSING_COLUMN_VALUE
+                  case assignment =>
+                    assignment.groupBy(_.topic).map {
+                      case (topic, partitionList) => topic + 
partitionList.map(_.partition).sorted.mkString("(", ",", ")")
+                    }.toList.sorted.mkString(", ")
+                }
+                print("%s".format(partitions))
+              }
+              println()
+            }
+        }
+      }
+    }
+
+    private def printState(group: String, state: GroupState): Unit = {
+      // this method is reachable only for the new consumer option (where the 
given state is always defined)
+      if (shouldPrintMemberState(group, Some(state.state), Some(1))) {
+        val coordinator = 
s"${state.coordinator.host}:${state.coordinator.port} 
(${state.coordinator.idString})"
+        val coordinatorColLen = Math.max(25, coordinator.length)
+        print(s"\n%${-coordinatorColLen}s %-25s %-20s %s".format("COORDINATOR 
(ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS"))
+        print(s"\n%${-coordinatorColLen}s %-25s %-20s %s".format(coordinator, 
state.assignmentStrategy, state.state, state.numMembers))
+        println()
+      }
+    }
+
+    def describeGroup(): Unit = {
+      val group = opts.options.valuesOf(opts.groupOpt).asScala.head
+      val membersOptPresent = opts.options.has(opts.membersOpt)
+      val stateOptPresent = opts.options.has(opts.stateOpt)
+      val offsetsOptPresent = opts.options.has(opts.offsetsOpt)
+      val subActions = Seq(membersOptPresent, offsetsOptPresent, 
stateOptPresent).count(_ == true)
+
+      if (subActions == 0 || offsetsOptPresent) {
+        val offsets = collectGroupOffsets()
+        printOffsets(group, offsets._1, offsets._2)
+      } else if (membersOptPresent) {
+        val members = collectGroupMembers(opts.options.has(opts.verboseOpt))
+        printMembers(group, members._1, members._2, 
opts.options.has(opts.verboseOpt))
+      } else
+        printState(group, collectGroupState())
     }
 
     def close(): Unit
@@ -179,7 +276,11 @@ object ConsumerGroupCommand extends Logging {
 
     protected def getLogEndOffset(topicPartition: TopicPartition): 
LogOffsetResult
 
-    protected def collectGroupAssignment(group: String): (Option[String], 
Option[Seq[PartitionAssignmentState]])
+    def collectGroupOffsets(): (Option[String], 
Option[Seq[PartitionAssignmentState]])
+
+    def collectGroupMembers(verbose: Boolean): (Option[String], 
Option[Seq[MemberAssignmentState]]) = throw new UnsupportedOperationException
+
+    def collectGroupState(): GroupState = throw new 
UnsupportedOperationException
 
     protected def collectConsumerAssignment(group: String,
                                             coordinator: Option[Node],
@@ -204,7 +305,7 @@ object ConsumerGroupCommand extends Logging {
       }
     }
 
-    protected def getLag(offset: Option[Long], logEndOffset: Option[Long]): 
Option[Long] =
+    private def getLag(offset: Option[Long], logEndOffset: Option[Long]): 
Option[Long] =
       offset.filter(_ != -1).flatMap(offset => logEndOffset.map(_ - offset))
 
     private def describePartition(group: String,
@@ -258,7 +359,8 @@ object ConsumerGroupCommand extends Logging {
         deleteAllForTopic()
     }
 
-    protected def collectGroupAssignment(group: String): (Option[String], 
Option[Seq[PartitionAssignmentState]]) = {
+    def collectGroupOffsets(): (Option[String], 
Option[Seq[PartitionAssignmentState]]) = {
+      val group = opts.options.valueOf(opts.groupOpt)
       val props = if (opts.options.has(opts.commandConfigOpt)) 
Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) else new 
Properties()
       val channelSocketTimeoutMs = props.getProperty("channelSocketTimeoutMs", 
"600").toInt
       val channelRetryBackoffMs = 
props.getProperty("channelRetryBackoffMsOpt", "300").toInt
@@ -418,13 +520,14 @@ object ConsumerGroupCommand extends Logging {
     private val adminClient = createAdminClient()
 
     // `consumer` is only needed for `describe`, so we instantiate it lazily
-    private var consumer: KafkaConsumer[String, String] = null
+    private var consumer: KafkaConsumer[String, String] = _
 
     def listGroups(): List[String] = {
       adminClient.listAllConsumerGroupsFlattened().map(_.groupId)
     }
 
-    protected def collectGroupAssignment(group: String): (Option[String], 
Option[Seq[PartitionAssignmentState]]) = {
+    def collectGroupOffsets(): (Option[String], 
Option[Seq[PartitionAssignmentState]]) = {
+      val group = opts.options.valueOf(opts.groupOpt)
       val consumerGroupSummary = adminClient.describeConsumerGroup(group, 
opts.options.valueOf(opts.timeoutMsOpt))
       (Some(consumerGroupSummary.state),
         consumerGroupSummary.consumers match {
@@ -437,7 +540,7 @@ object ConsumerGroupCommand extends Logging {
               if (offsets.isEmpty)
                 List[PartitionAssignmentState]()
               else {
-                consumers.sortWith(_.assignment.size > 
_.assignment.size).flatMap { consumerSummary =>
+                
consumers.filter(_.assignment.nonEmpty).sortWith(_.assignment.size > 
_.assignment.size).flatMap { consumerSummary =>
                   val topicPartitions = consumerSummary.assignment.map(tp => 
TopicAndPartition(tp.topic, tp.partition))
                   assignedTopicPartitions = assignedTopicPartitions ++ 
consumerSummary.assignment
                   val partitionOffsets: Map[TopicAndPartition, Option[Long]] = 
consumerSummary.assignment.map { topicPartition =>
@@ -449,37 +552,51 @@ object ConsumerGroupCommand extends Logging {
                 }
               }
 
-            val rowsWithoutConsumer = offsets.filterNot {
-              case (topicPartition, offset) => 
assignedTopicPartitions.contains(topicPartition)
-              }.flatMap {
-                case (topicPartition, offset) =>
-                  val topicAndPartition = new TopicAndPartition(topicPartition)
-                  collectConsumerAssignment(group, 
Some(consumerGroupSummary.coordinator), Seq(topicAndPartition),
-                      Map(topicAndPartition -> Some(offset)), 
Some(MISSING_COLUMN_VALUE),
-                      Some(MISSING_COLUMN_VALUE), Some(MISSING_COLUMN_VALUE))
-                }
+            val rowsWithoutConsumer = 
offsets.filterKeys(!assignedTopicPartitions.contains(_)).flatMap {
+              case (topicPartition, offset) =>
+                val topicAndPartition = new TopicAndPartition(topicPartition)
+                collectConsumerAssignment(group, 
Some(consumerGroupSummary.coordinator), Seq(topicAndPartition),
+                    Map(topicAndPartition -> Some(offset)), 
Some(MISSING_COLUMN_VALUE),
+                    Some(MISSING_COLUMN_VALUE), Some(MISSING_COLUMN_VALUE))
+            }
 
             Some(rowsWithConsumer ++ rowsWithoutConsumer)
       }
       )
     }
 
+    override def collectGroupMembers(verbose: Boolean): (Option[String], 
Option[Seq[MemberAssignmentState]]) = {
+      val group = opts.options.valueOf(opts.groupOpt)
+      val consumerGroupSummary = adminClient.describeConsumerGroup(group, 
opts.options.valueOf(opts.timeoutMsOpt))
+      (Some(consumerGroupSummary.state),
+        consumerGroupSummary.consumers.map(_.map {
+          consumer => MemberAssignmentState(group, consumer.consumerId, 
consumer.host, consumer.clientId, consumer.assignment.length,
+            if (verbose) consumer.assignment else List())
+        })
+      )
+    }
+
+    override def collectGroupState(): GroupState = {
+      val group = opts.options.valueOf(opts.groupOpt)
+      val consumerGroupSummary = adminClient.describeConsumerGroup(group, 
opts.options.valueOf(opts.timeoutMsOpt))
+      GroupState(group, consumerGroupSummary.coordinator, 
consumerGroupSummary.assignmentStrategy,
+        consumerGroupSummary.state, consumerGroupSummary.consumers.get.size)
+    }
+
     protected def getLogEndOffset(topicPartition: TopicPartition): 
LogOffsetResult = {
-      val consumer = getConsumer()
-      val offsets = consumer.endOffsets(List(topicPartition).asJava)
+      val offsets = getConsumer.endOffsets(List(topicPartition).asJava)
       val logStartOffset = offsets.get(topicPartition)
       LogOffsetResult.LogOffset(logStartOffset)
     }
 
     protected def getLogStartOffset(topicPartition: TopicPartition): 
LogOffsetResult = {
-      val consumer = getConsumer()
-      val offsets = consumer.beginningOffsets(List(topicPartition).asJava)
+      val offsets = getConsumer.beginningOffsets(List(topicPartition).asJava)
       val logStartOffset = offsets.get(topicPartition)
       LogOffsetResult.LogOffset(logStartOffset)
     }
 
     protected def getLogTimestampOffset(topicPartition: TopicPartition, 
timestamp: java.lang.Long): LogOffsetResult = {
-      val consumer = getConsumer()
+      val consumer = getConsumer
       consumer.assign(List(topicPartition).asJava)
       val offsetsForTimes = consumer.offsetsForTimes(Map(topicPartition -> 
timestamp).asJava)
       if (offsetsForTimes != null && !offsetsForTimes.isEmpty && 
offsetsForTimes.get(topicPartition) != null)
@@ -500,7 +617,7 @@ object ConsumerGroupCommand extends Logging {
       AdminClient.create(props)
     }
 
-    private def getConsumer() = {
+    private def getConsumer = {
       if (consumer == null)
         consumer = createNewConsumer()
       consumer
@@ -531,7 +648,7 @@ object ConsumerGroupCommand extends Logging {
           val preparedOffsets = prepareOffsetsToReset(groupId, 
partitionsToReset)
           val dryRun = opts.options.has(opts.dryRunOpt)
           if (!dryRun)
-            getConsumer().commitSync(preparedOffsets.asJava)
+            getConsumer.commitSync(preparedOffsets.asJava)
           preparedOffsets
         case currentState =>
           printError(s"Assignments can only be reset if the group '$groupId' 
is inactive, but the current state is $currentState.")
@@ -544,7 +661,7 @@ object ConsumerGroupCommand extends Logging {
         val topicAndPartitions = topicArg.split(":")
         val topic = topicAndPartitions(0)
         topicAndPartitions(1).split(",").map(partition => new 
TopicPartition(topic, partition.toInt))
-      case topic => getConsumer().partitionsFor(topic).asScala
+      case topic => getConsumer.partitionsFor(topic).asScala
         .map(partitionInfo => new TopicPartition(topic, 
partitionInfo.partition))
     }
 
@@ -677,7 +794,7 @@ object ConsumerGroupCommand extends Logging {
         try {
           new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse(datetime)
         } catch {
-          case e: ParseException => new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX").parse(datetime)
+          case _: ParseException => new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX").parse(datetime)
         }
       }
       date.getTime
@@ -736,7 +853,16 @@ object ConsumerGroupCommand extends Logging {
     val ResetToEarliestDoc = "Reset offsets to earliest offset."
     val ResetToLatestDoc = "Reset offsets to latest offset."
     val ResetToCurrentDoc = "Reset offsets to current offset."
-    val ResetShiftByDoc = "Reset offsets shifting current offset by 'n', where 
'n' can be positive or negative"
+    val ResetShiftByDoc = "Reset offsets shifting current offset by 'n', where 
'n' can be positive or negative."
+    val MembersDoc = "Describe members of the group. This option may be used 
with '--describe' and '--bootstrap-server' options only." + nl +
+      "Example: --bootstrap-server localhost:9092 --describe --group group1 
--members"
+    val VerboseDoc = "Provide additional information, if any, when describing 
the group. This option may be used " +
+      "with '--offsets'/'--members'/'--state' and '--bootstrap-server' options 
only." + nl + "Example: --bootstrap-server localhost:9092 --describe --group 
group1 --members --verbose"
+    val OffsetsDoc = "Describe the group and list all topic partitions in the 
group along with their offset lag. " +
+      "This is the default sub-action of and may be used with '--describe' and 
'--bootstrap-server' options only." + nl +
+      "Example: --bootstrap-server localhost:9092 --describe --group group1 
--offsets"
+    val StateDoc = "Describe the group state. This option may be used with 
'--describe' and '--bootstrap-server' options only." + nl +
+      "Example: --bootstrap-server localhost:9092 --describe --group group1 
--state"
 
     val parser = new OptionParser(false)
     val zkConnectOpt = parser.accepts("zookeeper", ZkConnectDoc)
@@ -795,6 +921,20 @@ object ConsumerGroupCommand extends Logging {
                              .withRequiredArg()
                              .describedAs("number-of-offsets")
                              .ofType(classOf[Long])
+    val membersOpt = parser.accepts("members", MembersDoc)
+                           .availableIf(describeOpt)
+                           .availableUnless(zkConnectOpt)
+    val verboseOpt = parser.accepts("verbose", VerboseDoc)
+                           .availableIf(describeOpt)
+                           .availableUnless(zkConnectOpt)
+    val offsetsOpt = parser.accepts("offsets", OffsetsDoc)
+                           .availableIf(describeOpt)
+                           .availableUnless(zkConnectOpt)
+    val stateOpt = parser.accepts("state", StateDoc)
+                         .availableIf(describeOpt)
+                         .availableUnless(zkConnectOpt)
+    parser.mutuallyExclusive(membersOpt, offsetsOpt, stateOpt)
+
     val options = parser.parse(args : _*)
 
     val useOldConsumer = options.has(zkConnectOpt)
@@ -807,29 +947,30 @@ object ConsumerGroupCommand extends Logging {
     def checkArgs() {
       // check required args
       if (options.has(timeoutMsOpt) && (!describeOptPresent || useOldConsumer))
-        debug(s"Option '$timeoutMsOpt' is applicable only when both 
'$bootstrapServerOpt' and '$describeOpt' are used.")
+        debug(s"Option $timeoutMsOpt is applicable only when both 
$bootstrapServerOpt and $describeOpt are used.")
 
       if (useOldConsumer) {
         if (options.has(bootstrapServerOpt))
-          CommandLineUtils.printUsageAndDie(parser, s"Option 
'$bootstrapServerOpt' is not valid with '$zkConnectOpt'.")
+          CommandLineUtils.printUsageAndDie(parser, s"Option 
$bootstrapServerOpt is not valid with $zkConnectOpt.")
         else if (options.has(newConsumerOpt))
-          CommandLineUtils.printUsageAndDie(parser, s"Option '$newConsumerOpt' 
is not valid with '$zkConnectOpt'.")
+          CommandLineUtils.printUsageAndDie(parser, s"Option $newConsumerOpt 
is not valid with $zkConnectOpt.")
       } else {
         CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
 
         if (options.has(newConsumerOpt)) {
-          Console.err.println("The --new-consumer option is deprecated and 
will be removed in a future major release." +
-            "The new consumer is used by default if the --bootstrap-server 
option is provided.")
+          Console.err.println(s"The $newConsumerOpt option is deprecated and 
will be removed in a future major release." +
+            s"The new consumer is used by default if the $bootstrapServerOpt 
option is provided.")
         }
 
         if (options.has(deleteOpt))
-          CommandLineUtils.printUsageAndDie(parser, s"Option '$deleteOpt' is 
only valid with '$zkConnectOpt'. Note that " +
-            "there's no need to delete group metadata for the new consumer as 
the group is deleted when the last " +
+          CommandLineUtils.printUsageAndDie(parser, s"Option $deleteOpt is 
only valid with $zkConnectOpt. Note that " +
+            "there is no need to delete group metadata for the new consumer as 
the group is deleted when the last " +
             "committed offset for that group expires.")
       }
 
       if (describeOptPresent)
         CommandLineUtils.checkRequiredArgs(parser, options, groupOpt)
+
       if (options.has(deleteOpt) && !options.has(groupOpt) && 
!options.has(topicOpt))
         CommandLineUtils.printUsageAndDie(parser, "Option %s either takes %s, 
%s, or both".format(deleteOpt, groupOpt, topicOpt))
       if (options.has(resetOffsetsOpt))
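
Note on the option rules above: `--offsets`, `--members` and `--state` are sub-actions of `--describe`, and at most one of them may be given. The following is a minimal, self-contained sketch of that rule in plain Scala. It does not use joptsimple, and `DescribeSubActionCheck` and `validate` are hypothetical names that are not part of this commit; the fallback to `--offsets` follows the commit message, which treats a bare `--describe` like `--describe --offsets`.

```scala
// Hypothetical illustration only; ConsumerGroupCommand relies on joptsimple
// (availableIf / availableUnless / mutuallyExclusive) rather than this helper.
object DescribeSubActionCheck {
  // The three describe views introduced by KIP-175.
  val subActions: Set[String] = Set("--offsets", "--members", "--state")

  // Returns Right(view to show) or Left(error message).
  def validate(args: Seq[String]): Either[String, String] = {
    val chosen = args.filter(subActions.contains).distinct
    if (chosen.nonEmpty && !args.contains("--describe"))
      Left(s"${chosen.mkString(", ")} can only be used with --describe")
    else if (chosen.size > 1)
      Left(s"Options ${chosen.mkString(", ")} are mutually exclusive")
    else
      // A bare --describe is assumed to fall back to the offsets view.
      Right(chosen.headOption.getOrElse("--offsets"))
  }
}
```

Under these assumptions, `validate(Seq("--describe", "--group", "g", "--members", "--state"))` yields a `Left`, which mirrors the failure that `testDescribeWithMultipleSubActions` expects from the real parser.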

http://git-wip-us.apache.org/repos/asf/kafka/blob/52978663/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala 
b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index e367372..11f2865 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -22,29 +22,36 @@ import java.util.concurrent.TimeUnit
 import java.util.Collections
 import java.util.Properties
 
-import org.junit.Assert._
-import org.junit.{After, Before, Test}
 import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, 
ConsumerGroupService, KafkaConsumerGroupService, ZkConsumerGroupService}
 import kafka.consumer.OldConsumer
 import kafka.consumer.Whitelist
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
+
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.errors.WakeupException
 import org.apache.kafka.common.serialization.StringDeserializer
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
 
 import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
 
 class DescribeConsumerGroupTest extends KafkaServerTestHarness {
   private val topic = "foo"
   private val group = "test.group"
 
+  private val describeTypeOffsets = Array(Array(""), Array("--offsets"))
+  private val describeTypeMembers = Array(Array("--members"), 
Array("--members", "--verbose"))
+  private val describeTypeState = Array(Array("--state"))
+  private val describeTypes = describeTypeOffsets ++ describeTypeMembers ++ 
describeTypeState
+
   @deprecated("This field will be removed in a future release", "0.11.0.0")
   private val oldConsumers = new ArrayBuffer[OldConsumer]
-  private var consumerGroupService: ConsumerGroupService = _
-  private var consumerGroupExecutor: ConsumerGroupExecutor = _
+  private var consumerGroupService: List[ConsumerGroupService] = List()
+  private var consumerGroupExecutor: List[ConsumerGroupExecutor] = List()
 
   // configure the servers and clients
   override def generateConfigs = {
@@ -61,33 +68,30 @@ class DescribeConsumerGroupTest extends 
KafkaServerTestHarness {
 
   @After
   override def tearDown(): Unit = {
-    if (consumerGroupService != null)
-      consumerGroupService.close()
-    if (consumerGroupExecutor != null)
-      consumerGroupExecutor.shutdown()
+    consumerGroupService.foreach(_.close)
+    consumerGroupExecutor.foreach(_.shutdown)
     oldConsumers.foreach(_.stop())
     super.tearDown()
   }
 
   @Test
   @deprecated("This test has been deprecated and will be removed in a future 
release.", "0.11.0.0")
-  def testDescribeNonExistingGroup() {
+  def testDescribeNonExistingGroupWithOldConsumer() {
     TestUtils.createOffsetsTopic(zkUtils, servers)
     createOldConsumer()
-    val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, 
"--describe", "--group", "missing.group"))
-    consumerGroupService = new ZkConsumerGroupService(opts)
-    TestUtils.waitUntilTrue(() => 
consumerGroupService.describeGroup()._2.isEmpty, "Expected no rows in describe 
group results.")
+    val service = getConsumerGroupService(Array("--zookeeper", zkConnect, 
"--describe", "--group", "missing.group"))
+    TestUtils.waitUntilTrue(() => service.collectGroupOffsets()._2.isEmpty, 
"Expected no rows in describe group results.")
   }
 
   @Test
   @deprecated("This test has been deprecated and will be removed in a future 
release.", "0.11.0.0")
-  def testDescribeExistingGroup() {
+  def testDescribeExistingGroupWithOldConsumer() {
     TestUtils.createOffsetsTopic(zkUtils, servers)
     createOldConsumer()
-    val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, 
"--describe", "--group", group))
-    consumerGroupService = new ZkConsumerGroupService(opts)
+    val service = getConsumerGroupService(Array("--zookeeper", zkConnect, 
"--describe", "--group", group))
+
     TestUtils.waitUntilTrue(() => {
-      val (_, assignments) = consumerGroupService.describeGroup()
+      val (_, assignments) = service.collectGroupOffsets()
       assignments.isDefined &&
       assignments.get.count(_.group == group) == 1 &&
       assignments.get.filter(_.group == group).head.consumerId.exists(_.trim 
!= ConsumerGroupCommand.MISSING_COLUMN_VALUE)
@@ -96,14 +100,13 @@ class DescribeConsumerGroupTest extends 
KafkaServerTestHarness {
 
   @Test
   @deprecated("This test has been deprecated and will be removed in a future 
release.", "0.11.0.0")
-  def testDescribeExistingGroupWithNoMembers() {
+  def testDescribeExistingGroupWithNoMembersWithOldConsumer() {
     TestUtils.createOffsetsTopic(zkUtils, servers)
     createOldConsumer()
-    val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, 
"--describe", "--group", group))
-    consumerGroupService = new ZkConsumerGroupService(opts)
+    val service = getConsumerGroupService(Array("--zookeeper", zkConnect, 
"--describe", "--group", group))
 
     TestUtils.waitUntilTrue(() => {
-      val (_, assignments) = consumerGroupService.describeGroup()
+      val (_, assignments) = service.collectGroupOffsets()
       assignments.isDefined &&
       assignments.get.count(_.group == group) == 1 &&
       assignments.get.filter(_.group == group).head.consumerId.exists(_.trim 
!= ConsumerGroupCommand.MISSING_COLUMN_VALUE)
@@ -111,7 +114,7 @@ class DescribeConsumerGroupTest extends 
KafkaServerTestHarness {
     oldConsumers.head.stop()
 
     TestUtils.waitUntilTrue(() => {
-      val (_, assignments) = consumerGroupService.describeGroup()
+      val (_, assignments) = service.collectGroupOffsets()
       assignments.isDefined &&
       assignments.get.count(_.group == group) == 1 &&
       assignments.get.filter(_.group == group).head.consumerId.exists(_.trim 
== ConsumerGroupCommand.MISSING_COLUMN_VALUE) // the member should be gone
@@ -120,14 +123,14 @@ class DescribeConsumerGroupTest extends 
KafkaServerTestHarness {
 
   @Test
   @deprecated("This test has been deprecated and will be removed in a future 
release.", "0.11.0.0")
-  def testDescribeConsumersWithNoAssignedPartitions() {
+  def testDescribeConsumersWithNoAssignedPartitionsWithOldConsumer() {
     TestUtils.createOffsetsTopic(zkUtils, servers)
     createOldConsumer()
     createOldConsumer()
-    val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, 
"--describe", "--group", group))
-    consumerGroupService = new ZkConsumerGroupService(opts)
+    val service = getConsumerGroupService(Array("--zookeeper", zkConnect, 
"--describe", "--group", group))
+
     TestUtils.waitUntilTrue(() => {
-      val (_, assignments) = consumerGroupService.describeGroup()
+      val (_, assignments) = service.collectGroupOffsets()
       assignments.isDefined &&
       assignments.get.count(_.group == group) == 2 &&
       assignments.get.count { x => x.group == group && x.partition.isDefined } 
== 1 &&
@@ -136,74 +139,242 @@ class DescribeConsumerGroupTest extends 
KafkaServerTestHarness {
   }
 
   @Test
-  def testDescribeNonExistingGroupWithNewConsumer() {
+  def testDescribeNonExistingGroup() {
     TestUtils.createOffsetsTopic(zkUtils, servers)
+    val missingGroup = "missing.group"
+
+    for (describeType <- describeTypes) {
+      // note the group to be queried is a different (non-existing) group
+      val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", 
"--group", missingGroup) ++ describeType
+      val service = getConsumerGroupService(cgcArgs)
+
+      val output = TestUtils.grabConsoleOutput(service.describeGroup())
+      assertTrue(s"Expected error was not detected for describe option 
'${describeType.mkString(" ")}'",
+          output.contains(s"Consumer group '$missingGroup' does not exist."))
+    }
+  }
+
+  @Test(expected = classOf[joptsimple.OptionException])
+  def testDescribeWithMultipleSubActions() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", 
"--group", group, "--members", "--state")
+    getConsumerGroupService(cgcArgs)
+    fail("Expected an error due to presence of mutually exclusive options")
+  }
+
+  @Test
+  def testDescribeOffsetsOfNonExistingGroup() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+
     // run one consumer in the group consuming from a single-partition topic
-    consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 1, group, 
topic)
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 1, group, 
topic))
+    // note the group to be queried is a different (non-existing) group
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", 
"--group", "missing.group")
+    val service = getConsumerGroupService(cgcArgs)
+
+    val (state, assignments) = service.collectGroupOffsets()
+    assertTrue(s"Expected the state to be 'Dead', with no members in the group 
'$group'.",
+        state == Some("Dead") && assignments == Some(List()))
+  }
+
+  @Test
+  def testDescribeMembersOfNonExistingGroup() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
 
+    // run one consumer in the group consuming from a single-partition topic
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 1, group, 
topic))
     // note the group to be queried is a different (non-existing) group
     val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", 
"--group", "missing.group")
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    consumerGroupService = new KafkaConsumerGroupService(opts)
+    val service = getConsumerGroupService(cgcArgs)
 
-    val (state, assignments) = consumerGroupService.describeGroup()
-    assertTrue("Expected the state to be 'Dead' with no members in the 
group.", state == Some("Dead") && assignments == Some(List()))
+    val (state, assignments) = service.collectGroupMembers(false)
+    assertTrue(s"Expected the state to be 'Dead', with no members in the group 
'$group'.",
+        state == Some("Dead") && assignments == Some(List()))
+
+    val (state2, assignments2) = service.collectGroupMembers(true)
+    assertTrue(s"Expected the state to be 'Dead', with no members in the group 
'$group' (verbose option).",
+        state2 == Some("Dead") && assignments2 == Some(List()))
   }
 
   @Test
-  def testDescribeExistingGroupWithNewConsumer() {
+  def testDescribeStateOfNonExistingGroup() {
     TestUtils.createOffsetsTopic(zkUtils, servers)
+
     // run one consumer in the group consuming from a single-partition topic
-    consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 1, group, 
topic)
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 1, group, 
topic))
+    // note the group to be queried is a different (non-existing) group
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", 
"--group", "missing.group")
+    val service = getConsumerGroupService(cgcArgs)
+
+    val state = service.collectGroupState()
+    assertTrue(s"Expected the state to be 'Dead', with no members in the group 
'$group'.",
+        state.state == "Dead" && state.numMembers == 0 &&
+        state.coordinator != null && 
servers.map(_.config.brokerId).toList.contains(state.coordinator.id)
+    )
+  }
+
+  @Test
+  def testDescribeExistingGroup() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+
+    for (describeType <- describeTypes) {
+      val group = this.group + describeType.mkString("")
+      // run one consumer in the group consuming from a single-partition topic
+      addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 1, group, 
topic))
+      val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", 
"--group", group) ++ describeType
+      val service = getConsumerGroupService(cgcArgs)
+
+      TestUtils.waitUntilTrue(() => {
+        val (output, error) = 
TestUtils.grabConsoleOutputAndError(service.describeGroup())
+        output.trim.split("\n").length == 2 && error.isEmpty
+      }, s"Expected a data row and no error in describe results with describe 
type ${describeType.mkString(" ")}.")
+    }
+  }
 
+  @Test
+  def testDescribeOffsetsOfExistingGroup() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+
+    // run one consumer in the group consuming from a single-partition topic
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 1, group, 
topic))
     val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", 
"--group", group)
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    consumerGroupService = new KafkaConsumerGroupService(opts)
+    val service = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
-        val (state, assignments) = consumerGroupService.describeGroup()
-        state == Some("Stable") &&
+      val (state, assignments) = service.collectGroupOffsets()
+      state == Some("Stable") &&
         assignments.isDefined &&
         assignments.get.count(_.group == group) == 1 &&
         assignments.get.filter(_.group == group).head.consumerId.exists(_.trim 
!= ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
         assignments.get.filter(_.group == group).head.clientId.exists(_.trim 
!= ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
         assignments.get.filter(_.group == group).head.host.exists(_.trim != 
ConsumerGroupCommand.MISSING_COLUMN_VALUE)
-    }, "Expected a 'Stable' group status, rows and valid values for consumer 
id / client id / host columns in describe group results.")
+    }, s"Expected a 'Stable' group status, rows and valid values for consumer 
id / client id / host columns in describe results for group $group.")
+  }
+
+  @Test
+  def testDescribeMembersOfExistingGroup() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+
+    // run one consumer in the group consuming from a single-partition topic
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 1, group, 
topic))
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", 
"--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    TestUtils.waitUntilTrue(() => {
+      val (state, assignments) = service.collectGroupMembers(false)
+      state == Some("Stable") &&
+        (assignments match {
+          case Some(memberAssignments) =>
+            memberAssignments.count(_.group == group) == 1 &&
+              memberAssignments.filter(_.group == group).head.consumerId != 
ConsumerGroupCommand.MISSING_COLUMN_VALUE &&
+              memberAssignments.filter(_.group == group).head.clientId != 
ConsumerGroupCommand.MISSING_COLUMN_VALUE &&
+              memberAssignments.filter(_.group == group).head.host != 
ConsumerGroupCommand.MISSING_COLUMN_VALUE
+          case None =>
+            false
+        })
+    }, s"Expected a 'Stable' group status, rows and valid member information 
for group $group.")
+
+    val (state, assignments) = service.collectGroupMembers(true)
+    assignments match {
+      case None =>
+        fail(s"Expected partition assignments for members of group $group")
+      case Some(memberAssignments) =>
+        assertTrue(s"Expected a topic partition assigned to the single group 
member for group $group",
+          memberAssignments.size == 1 &&
+          memberAssignments.head.assignment.size == 1)
+    }
   }
 
   @Test
-  def testDescribeExistingGroupWithNoMembersWithNewConsumer() {
+  def testDescribeStateOfExistingGroup() {
     TestUtils.createOffsetsTopic(zkUtils, servers)
+
     // run one consumer in the group consuming from a single-partition topic
-    consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 1, group, 
topic)
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 1, group, 
topic))
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", 
"--group", group)
+    val service = getConsumerGroupService(cgcArgs)
 
+    TestUtils.waitUntilTrue(() => {
+      val state = service.collectGroupState()
+      state.state == "Stable" &&
+        state.numMembers == 1 &&
+        state.assignmentStrategy == "range" &&
+        state.coordinator != null &&
+        servers.map(_.config.brokerId).toList.contains(state.coordinator.id)
+    }, s"Expected a 'Stable' group status, with one member and range 
assignment strategy for group $group.")
+  }
+
+  @Test
+  def testDescribeStateOfExistingGroupWithRoundRobinAssignor() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+
+    // run one consumer in the group consuming from a single-partition topic
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 1, group, 
topic, "org.apache.kafka.clients.consumer.RoundRobinAssignor"))
     val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", 
"--group", group)
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    consumerGroupService = new KafkaConsumerGroupService(opts)
+    val service = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
-      val (state, _) = consumerGroupService.describeGroup()
-      state == Some("Stable")
-    }, "Expected the group to initially become stable.")
+      val state = service.collectGroupState()
+      state.state == "Stable" &&
+        state.numMembers == 1 &&
+        state.assignmentStrategy == "roundrobin" &&
+        state.coordinator != null &&
+        servers.map(_.config.brokerId).toList.contains(state.coordinator.id)
+    }, s"Expected a 'Stable' group status, with one member and round robin 
assignment strategy for group $group.")
+  }
+
+  @Test
+  def testDescribeExistingGroupWithNoMembers() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+
+    for (describeType <- describeTypes) {
+      val group = this.group + describeType.mkString("")
+      // run one consumer in the group consuming from a single-partition topic
+      val executor = addConsumerGroupExecutor(new 
ConsumerGroupExecutor(brokerList, 1, group, topic))
+      val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", 
"--group", group) ++ describeType
+      val service = getConsumerGroupService(cgcArgs)
+
+      TestUtils.waitUntilTrue(() => {
+        val (output, error) = 
TestUtils.grabConsoleOutputAndError(service.describeGroup())
+        output.trim.split("\n").length == 2 && error.isEmpty
+      }, s"Expected describe group results with one data row for describe type 
'${describeType.mkString(" ")}'")
+
+      // stop the consumer so the group has no active member anymore
+      executor.shutdown()
+
+      TestUtils.waitUntilTrue(() => {
+        
TestUtils.grabConsoleError(service.describeGroup()).contains(s"Consumer group 
'$group' has no active members.")
+      }, s"Expected no active member in describe group results with describe 
type ${describeType.mkString(" ")}")
+    }
+  }
+
+  @Test
+  def testDescribeOffsetsOfExistingGroupWithNoMembers() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+
+    // run one consumer in the group consuming from a single-partition topic
+    val executor = addConsumerGroupExecutor(new 
ConsumerGroupExecutor(brokerList, 1, group, topic))
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", 
"--group", group)
+    val service = getConsumerGroupService(cgcArgs)
 
-    // Group assignments in describeGroup rely on finding committed consumer 
offsets.
-    // Wait for an offset commit before shutting down the group executor.
     TestUtils.waitUntilTrue(() => {
-      val (_, assignments) = consumerGroupService.describeGroup()
-      assignments.exists(_.exists(_.group == group))
-    }, "Expected to find group in assignments after initial offset commit")
+      val (state, assignments) = service.collectGroupOffsets()
+      state == Some("Stable") && assignments.exists(_.exists(_.group == group))
+    }, "Expected the group to initially become stable, and to find group in 
assignments after initial offset commit.")
 
     // stop the consumer so the group has no active member anymore
-    consumerGroupExecutor.shutdown()
-
-    val (result, succeeded) = 
TestUtils.computeUntilTrue(consumerGroupService.describeGroup()) { case (state, 
assignments) =>
-      val testGroupAssignments = assignments.toSeq.flatMap(_.filter(_.group == 
group))
-      def assignment = testGroupAssignments.head
-      state == Some("Empty") &&
-        testGroupAssignments.size == 1 &&
-        assignment.consumerId.exists(_.trim == 
ConsumerGroupCommand.MISSING_COLUMN_VALUE) && // the member should be gone
-        assignment.clientId.exists(_.trim == 
ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
-        assignment.host.exists(_.trim == 
ConsumerGroupCommand.MISSING_COLUMN_VALUE)
+    executor.shutdown()
+
+    val (result, succeeded) = 
TestUtils.computeUntilTrue(service.collectGroupOffsets()) {
+      case (state, assignments) =>
+        val testGroupAssignments = assignments.toSeq.flatMap(_.filter(_.group 
== group))
+        def assignment = testGroupAssignments.head
+        state == Some("Empty") &&
+          testGroupAssignments.size == 1 &&
+          assignment.consumerId.exists(_.trim == 
ConsumerGroupCommand.MISSING_COLUMN_VALUE) && // the member should be gone
+          assignment.clientId.exists(_.trim == 
ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
+          assignment.host.exists(_.trim == 
ConsumerGroupCommand.MISSING_COLUMN_VALUE)
     }
     val (state, assignments) = result
     assertTrue(s"Expected no active member in describe group results, state: 
$state, assignments: $assignments",
@@ -211,63 +382,303 @@ class DescribeConsumerGroupTest extends 
KafkaServerTestHarness {
   }
 
   @Test
-  def testDescribeConsumersWithNoAssignedPartitionsWithNewConsumer() {
+  def testDescribeMembersOfExistingGroupWithNoMembers() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+
+    // run one consumer in the group consuming from a single-partition topic
+    val executor = addConsumerGroupExecutor(new 
ConsumerGroupExecutor(brokerList, 1, group, topic))
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", 
"--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    TestUtils.waitUntilTrue(() => {
+      val (state, assignments) = service.collectGroupMembers(false)
+      state == Some("Stable") && assignments.exists(_.exists(_.group == group))
+    }, "Expected the group to initially become stable, and to find group in 
assignments after initial offset commit.")
+
+    // stop the consumer so the group has no active member anymore
+    executor.shutdown()
+
+    TestUtils.waitUntilTrue(() => {
+      val (state, assignments) = service.collectGroupMembers(false)
+      state == Some("Empty") && assignments.isDefined && 
assignments.get.isEmpty
+    }, s"Expected no member in describe group members results for group 
'$group'")
+  }
+
+  @Test
+  def testDescribeStateOfExistingGroupWithNoMembers() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+
+    // run one consumer in the group consuming from a single-partition topic
+    val executor = addConsumerGroupExecutor(new 
ConsumerGroupExecutor(brokerList, 1, group, topic))
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", 
"--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    TestUtils.waitUntilTrue(() => {
+      val state = service.collectGroupState()
+      state.state == "Stable" &&
+        state.numMembers == 1 &&
+        state.coordinator != null &&
+        servers.map(_.config.brokerId).toList.contains(state.coordinator.id)
+    }, s"Expected the group '$group' to initially become stable, and have a 
single member.")
+
+    // stop the consumer so the group has no active member anymore
+    executor.shutdown()
+
+    TestUtils.waitUntilTrue(() => {
+      val state = service.collectGroupState()
+      state.state == "Empty" && state.numMembers == 0 && 
state.assignmentStrategy == ""
+    }, s"Expected the group '$group' to become empty after its only member 
leaves.")
+  }
+
+  @Test
+  def testDescribeWithConsumersWithoutAssignedPartitions() {
     TestUtils.createOffsetsTopic(zkUtils, servers)
+
+    for (describeType <- describeTypes) {
+      val group = this.group + describeType.mkString("")
+      // run two consumers in the group consuming from a single-partition topic
+      val executor = addConsumerGroupExecutor(new 
ConsumerGroupExecutor(brokerList, 2, group, topic))
+      val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", 
"--group", group) ++ describeType
+      val service = getConsumerGroupService(cgcArgs)
+
+      TestUtils.waitUntilTrue(() => {
+        val (output, error) = 
TestUtils.grabConsoleOutputAndError(service.describeGroup())
+        val expectedNumRows = if (describeTypeMembers.contains(describeType)) 
3 else 2
+        error.isEmpty && output.trim.split("\n").size == expectedNumRows
+      }, s"Expected one data row (two for --members) in describe group result with describe 
type '${describeType.mkString(" ")}'")
+    }
+  }
+
+  @Test
+  def testDescribeOffsetsWithConsumersWithoutAssignedPartitions() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+
     // run two consumers in the group consuming from a single-partition topic
-    consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 2, group, 
topic)
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 2, group, 
topic))
 
     val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", 
"--group", group)
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    consumerGroupService = new KafkaConsumerGroupService(opts)
+    val service = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
-      val (state, assignments) = consumerGroupService.describeGroup()
+      val (state, assignments) = service.collectGroupOffsets()
+      state == Some("Stable") &&
+        assignments.isDefined &&
+        assignments.get.count(_.group == group) == 1 &&
+        assignments.get.count { x => x.group == group && x.partition.isDefined 
} == 1
+    }, "Expected a single row, for the consumer with the assigned partition, in describe 
group results")
+  }
+
+  @Test
+  def testDescribeMembersWithConsumersWithoutAssignedPartitions() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+
+    // run two consumers in the group consuming from a single-partition topic
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 2, group, 
topic))
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", 
"--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    TestUtils.waitUntilTrue(() => {
+      val (state, assignments) = service.collectGroupMembers(false)
       state == Some("Stable") &&
         assignments.isDefined &&
         assignments.get.count(_.group == group) == 2 &&
-        assignments.get.count { x => x.group == group && x.partition.isDefined 
} == 1 &&
-        assignments.get.count { x => x.group == group && x.partition.isEmpty } 
== 1
+        assignments.get.count { x => x.group == group && x.numPartitions == 1 
} == 1 &&
+        assignments.get.count { x => x.group == group && x.numPartitions == 0 
} == 1 &&
+        assignments.get.count(_.assignment.size > 0) == 0
     }, "Expected rows for consumers with no assigned partitions in describe 
group results")
+
+    val (state, assignments) = service.collectGroupMembers(true)
+    assertTrue("Expected additional columns in verbose version of describe 
members",
+        state == Some("Stable") && 
assignments.get.count(_.assignment.nonEmpty) > 0)
+  }
+
+  @Test
+  def testDescribeStateWithConsumersWithoutAssignedPartitions() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+
+    // run two consumers in the group consuming from a single-partition topic
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 2, group, 
topic))
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", 
"--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    TestUtils.waitUntilTrue(() => {
+      val state = service.collectGroupState()
+      state.state == "Stable" && state.numMembers == 2
+    }, "Expected two consumers in describe group results")
   }
 
   @Test
-  def testDescribeWithMultiPartitionTopicAndMultipleConsumersWithNewConsumer() 
{
+  def testDescribeWithMultiPartitionTopicAndMultipleConsumers() {
     TestUtils.createOffsetsTopic(zkUtils, servers)
     val topic2 = "foo2"
     adminZkClient.createTopic(topic2, 2, 1)
 
+    for (describeType <- describeTypes) {
+      val group = this.group + describeType.mkString("")
+      // run two consumers in the group consuming from a two-partition topic
+      addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 2, group, 
topic2))
+      val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", 
"--group", group) ++ describeType
+      val service = getConsumerGroupService(cgcArgs)
+
+      TestUtils.waitUntilTrue(() => {
+        val (output, error) = 
TestUtils.grabConsoleOutputAndError(service.describeGroup())
+        val expectedNumRows = if (describeTypeState.contains(describeType)) 2 
else 3
+        error.isEmpty && output.trim.split("\n").size == expectedNumRows
+      }, s"Expected two data rows (one for --state) in describe group result with describe 
type '${describeType.mkString(" ")}'")
+    }
+  }
+
+  @Test
+  def testDescribeOffsetsWithMultiPartitionTopicAndMultipleConsumers() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+    val topic2 = "foo2"
+    AdminUtils.createTopic(zkUtils, topic2, 2, 1)
+
     // run two consumers in the group consuming from a two-partition topic
-    consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 2, group, 
topic2)
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 2, group, 
topic2))
 
     val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", 
"--group", group)
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    consumerGroupService = new KafkaConsumerGroupService(opts)
+    val service = getConsumerGroupService(cgcArgs)
 
     TestUtils.waitUntilTrue(() => {
-      val (state, assignments) = consumerGroupService.describeGroup()
+      val (state, assignments) = service.collectGroupOffsets()
       state == Some("Stable") &&
-      assignments.isDefined &&
-      assignments.get.count(_.group == group) == 2 &&
-      assignments.get.count{ x => x.group == group && x.partition.isDefined} 
== 2 &&
-      assignments.get.count{ x => x.group == group && x.partition.isEmpty} == 0
+        assignments.isDefined &&
+        assignments.get.count(_.group == group) == 2 &&
+        assignments.get.count{ x => x.group == group && x.partition.isDefined} 
== 2 &&
+        assignments.get.count{ x => x.group == group && x.partition.isEmpty} 
== 0
     }, "Expected two rows (one row per consumer) in describe group results.")
   }
 
   @Test
-  def testDescribeGroupWithNewConsumerWithShortInitializationTimeout() {
-    // Let creation of the offsets topic happen during group initialisation to 
ensure that initialization doesn't
+  def testDescribeMembersWithMultiPartitionTopicAndMultipleConsumers() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+    val topic2 = "foo2"
+    AdminUtils.createTopic(zkUtils, topic2, 2, 1)
+
+    // run two consumers in the group consuming from a two-partition topic
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 2, group, 
topic2))
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", 
"--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    TestUtils.waitUntilTrue(() => {
+      val (state, assignments) = service.collectGroupMembers(false)
+      state == Some("Stable") &&
+        assignments.isDefined &&
+        assignments.get.count(_.group == group) == 2 &&
+        assignments.get.count{ x => x.group == group && x.numPartitions == 1 } 
== 2 &&
+        assignments.get.count{ x => x.group == group && x.numPartitions == 0 } 
== 0
+    }, "Expected two rows (one row per consumer) in describe group members 
results.")
+
+    val (state, assignments) = service.collectGroupMembers(true)
+    assertTrue("Expected additional columns in verbose version of describe 
members",
+        state == Some("Stable") && assignments.get.count(_.assignment.isEmpty) 
== 0)
+  }
+
+  @Test
+  def testDescribeStateWithMultiPartitionTopicAndMultipleConsumers() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
+    val topic2 = "foo2"
+    AdminUtils.createTopic(zkUtils, topic2, 2, 1)
+
+    // run two consumers in the group consuming from a two-partition topic
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 2, group, 
topic2))
+
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", 
"--group", group)
+    val service = getConsumerGroupService(cgcArgs)
+
+    TestUtils.waitUntilTrue(() => {
+      val state = service.collectGroupState()
+      state.state == "Stable" && state.group == group && state.numMembers == 2
+    }, "Expected a stable group with two members in describe group state 
result.")
+  }
+
+  @Test
+  def testDescribeGroupWithShortInitializationTimeout() {
+    // Let creation of the offsets topic happen during group initialization to 
ensure that initialization doesn't
+    // complete before the timeout expires
+
+    val describeType = describeTypes(Random.nextInt(describeTypes.length))
+    val group = this.group + describeType.mkString("")
+    // run one consumer in the group consuming from a single-partition topic
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 1, group, 
topic))
+    // set the group initialization timeout too low for the group to stabilize
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", 
"--timeout", "1", "--group", group) ++ describeType
+    val service = getConsumerGroupService(cgcArgs)
+
+    try {
+      TestUtils.grabConsoleOutputAndError(service.describeGroup())
+      fail(s"The consumer group command should have failed due to low 
initialization timeout (describe type: ${describeType.mkString(" ")})")
+    } catch {
+      case _: TimeoutException => // OK
+    }
+  }
+
+  @Test
+  def testDescribeGroupOffsetsWithShortInitializationTimeout() {
+    // Let creation of the offsets topic happen during group initialization to 
ensure that initialization doesn't
+    // complete before the timeout expires
+
+    // run one consumer in the group consuming from a single-partition topic
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 1, group, 
topic))
+
+    // set the group initialization timeout too low for the group to stabilize
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", 
"--group", group, "--timeout", "1")
+    val service = getConsumerGroupService(cgcArgs)
+
+    try {
+      service.collectGroupOffsets()
+      fail("The consumer group command should fail due to low initialization 
timeout")
+    } catch {
+      case _: TimeoutException => // OK
+    }
+  }
+
+  @Test
+  def testDescribeGroupMembersWithShortInitializationTimeout() {
+    // Let creation of the offsets topic happen during group initialization to 
ensure that initialization doesn't
     // complete before the timeout expires
 
     // run one consumer in the group consuming from a single-partition topic
-    consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 1, group, 
topic)
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 1, group, 
topic))
 
     // set the group initialization timeout too low for the group to stabilize
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", 
"--group", "group", "--timeout", "1")
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    consumerGroupService = new KafkaConsumerGroupService(opts)
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", 
"--group", group, "--timeout", "1")
+    val service = getConsumerGroupService(cgcArgs)
 
     try {
-      consumerGroupService.describeGroup()
+      service.collectGroupMembers(false)
+      fail("The consumer group command should fail due to low initialization 
timeout")
+    } catch {
+      case _: TimeoutException => // OK
+        try {
+          service.collectGroupMembers(true)
+          fail("The consumer group command should fail due to low 
initialization timeout (verbose)")
+        } catch {
+          case _: TimeoutException => // OK
+        }
+    }
+  }
+
+  @Test
+  def testDescribeGroupStateWithShortInitializationTimeout() {
+    // Let creation of the offsets topic happen during group initialization to 
ensure that initialization doesn't
+    // complete before the timeout expires
+
+    // run one consumer in the group consuming from a single-partition topic
+    addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 1, group, 
topic))
+
+    // set the group initialization timeout too low for the group to stabilize
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", 
"--group", group, "--timeout", "1")
+    val service = getConsumerGroupService(cgcArgs)
+
+    try {
+      service.collectGroupState()
       fail("The consumer group command should fail due to low initialization 
timeout")
     } catch {
       case _: TimeoutException => // OK
@@ -281,15 +692,29 @@ class DescribeConsumerGroupTest extends 
KafkaServerTestHarness {
     consumerProps.setProperty("zookeeper.connect", zkConnect)
     oldConsumers += new OldConsumer(Whitelist(topic), consumerProps)
   }
+
+  def getConsumerGroupService(args: Array[String]): ConsumerGroupService = {
+    val opts = new ConsumerGroupCommandOptions(args)
+    val service = if (opts.useOldConsumer) new ZkConsumerGroupService(opts) 
else new KafkaConsumerGroupService(opts)
+    consumerGroupService = service :: consumerGroupService
+    service
+  }
+
+  def addConsumerGroupExecutor(executor: ConsumerGroupExecutor): 
ConsumerGroupExecutor = {
+    consumerGroupExecutor = executor :: consumerGroupExecutor
+    executor
+  }
 }
 
 
-class ConsumerThread(broker: String, id: Int, groupId: String, topic: String) 
extends Runnable {
+class ConsumerThread(broker: String, id: Int, groupId: String, topic: String, 
strategy: String)
+    extends Runnable {
   val props = new Properties
   props.put("bootstrap.servers", broker)
   props.put("group.id", groupId)
   props.put("key.deserializer", classOf[StringDeserializer].getName)
   props.put("value.deserializer", classOf[StringDeserializer].getName)
+  props.put("partition.assignment.strategy", strategy)
   val consumer = new KafkaConsumer(props)
 
   def run() {
@@ -310,11 +735,11 @@ class ConsumerThread(broker: String, id: Int, groupId: 
String, topic: String) ex
 }
 
 
-class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: 
String, topic: String) {
+class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: 
String, topic: String, strategy: String = 
"org.apache.kafka.clients.consumer.RangeAssignor") {
   val executor: ExecutorService = Executors.newFixedThreadPool(numConsumers)
   private val consumers = new ArrayBuffer[ConsumerThread]()
   for (i <- 1 to numConsumers) {
-    val consumer = new ConsumerThread(broker, i, groupId, topic)
+    val consumer = new ConsumerThread(broker, i, groupId, topic, strategy)
     consumers += consumer
     executor.submit(consumer)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/52978663/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala 
b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
index f26d3c4..725073e 100644
--- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
@@ -119,7 +119,7 @@ class ResetConsumerGroupOffsetTest extends 
KafkaServerTestHarness {
     val executor = createConsumerGroupExecutor(brokerList, 1, group, topic1)
 
     TestUtils.waitUntilTrue(() => {
-      val (_, assignmentsOption) = consumerGroupCommand.describeGroup()
+      val (_, assignmentsOption) = consumerGroupCommand.collectGroupOffsets()
       assignmentsOption match {
         case Some(assignments) =>
           val sumOffset = assignments.filter(_.topic.exists(_ == topic1))
@@ -164,7 +164,7 @@ class ResetConsumerGroupOffsetTest extends 
KafkaServerTestHarness {
     val executor = createConsumerGroupExecutor(brokerList, 1, group, topic1)
 
     TestUtils.waitUntilTrue(() => {
-      val (_, assignmentsOption) = consumerGroupCommand.describeGroup()
+      val (_, assignmentsOption) = consumerGroupCommand.collectGroupOffsets()
       assignmentsOption match {
         case Some(assignments) =>
           val sumOffset = (assignments.filter(_.topic.exists(_ == topic1))
@@ -330,7 +330,7 @@ class ResetConsumerGroupOffsetTest extends 
KafkaServerTestHarness {
     val executor = createConsumerGroupExecutor(brokerList, numConsumers, 
group, topic)
 
     TestUtils.waitUntilTrue(() => {
-      val (_, assignmentsOption) = consumerGroupCommand.describeGroup()
+      val (_, assignmentsOption) = consumerGroupCommand.collectGroupOffsets()
       assignmentsOption match {
         case Some(assignments) =>
           val sumOffset = assignments.filter(_.topic.exists(_ == topic))

http://git-wip-us.apache.org/repos/asf/kafka/blob/52978663/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 1da2d7b..bb15e7b 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1503,10 +1503,34 @@ object TestUtils extends Logging {
   def grabConsoleOutput(f: => Unit) : String = {
     val out = new ByteArrayOutputStream
     try scala.Console.withOut(out)(f)
-    finally scala.Console.out.flush
+    finally scala.Console.out.flush()
     out.toString
   }
 
+  /**
+   * Capture the console error during the execution of the provided function.
+   */
+  def grabConsoleError(f: => Unit) : String = {
+    val err = new ByteArrayOutputStream
+    try scala.Console.withErr(err)(f)
+    finally scala.Console.err.flush()
+    err.toString
+  }
+
+  /**
+   * Capture both the console output and console error during the execution of 
the provided function.
+   */
+  def grabConsoleOutputAndError(f: => Unit) : (String, String) = {
+    val out = new ByteArrayOutputStream
+    val err = new ByteArrayOutputStream
+    try scala.Console.withOut(out)(scala.Console.withErr(err)(f))
+    finally {
+      scala.Console.out.flush()
+      scala.Console.err.flush()
+    }
+    (out.toString, err.toString)
+  }
+
 }
 
 class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {
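
For readers skimming the TestUtils hunk above, the following is a minimal standalone sketch of the same capture technique used by `grabConsoleOutputAndError`: nesting `scala.Console.withOut` and `scala.Console.withErr` around a by-name block. The object name `ConsoleCaptureSketch`, the helper name `captureOutputAndError`, and the printed strings are hypothetical and not part of the commit; only the `scala.Console` calls come from the standard library.

```scala
import java.io.ByteArrayOutputStream

// Hypothetical standalone sketch; not the code from the commit.
object ConsoleCaptureSketch {

  // Runs `f` with Console.out and Console.err redirected to in-memory
  // buffers, then returns (captured stdout, captured stderr).
  def captureOutputAndError(f: => Unit): (String, String) = {
    val out = new ByteArrayOutputStream
    val err = new ByteArrayOutputStream
    scala.Console.withOut(out)(scala.Console.withErr(err)(f))
    (out.toString, err.toString)
  }

  def main(args: Array[String]): Unit = {
    val (output, error) = captureOutputAndError {
      println("GROUP  STATE")                       // goes to the stdout buffer
      Console.err.println("hypothetical warning")   // goes to the stderr buffer
    }
    // A test can then assert on each stream separately, as the describe
    // tests do with `error.isEmpty` and the number of output rows.
    assert(output.trim.split("\n").length == 1)
    assert(error.trim == "hypothetical warning")
  }
}
```

Capturing the two streams separately is what lets the describe tests require an empty error stream while counting rows on standard output only.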
