Repository: kafka
Updated Branches:
  refs/heads/trunk c4b8a7eab -> 198302fee


KAFKA-4416; Add a `--group` option to console consumer

This simplifies running a console consumer as part of a custom group. Note that 
group id can be provided via only one of the three possible means: directly 
using `--group ` option (as part of this PR), as property via 
`--consumer-property` option,  or inside a config file via `--consumer.config` 
option.

Author: Vahid Hashemian <vahidhashem...@us.ibm.com>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Apurva Mehta <apu...@confluent.io>, 
Jason Gustafson <ja...@confluent.io>

Closes #2150 from vahidhashemian/KAFKA-4416


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/198302fe
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/198302fe
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/198302fe

Branch: refs/heads/trunk
Commit: 198302feecf17cd1eff38e6d93e807d7b04922f2
Parents: c4b8a7e
Author: Vahid Hashemian <vahidhashem...@us.ibm.com>
Authored: Wed Oct 4 08:50:56 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Oct 4 08:50:56 2017 -0700

----------------------------------------------------------------------
 .../scala/kafka/tools/ConsoleConsumer.scala     |  27 ++++-
 .../unit/kafka/tools/ConsoleConsumerTest.scala  | 112 ++++++++++++++++++-
 2 files changed, 134 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/198302fe/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala 
b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index c014caf..8b20601 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -340,6 +340,10 @@ object ConsoleConsumer extends Logging {
       .ofType(classOf[String])
       .defaultsTo("read_uncommitted")
 
+    val groupIdOpt = parser.accepts("group", "The consumer group id of the 
consumer.")
+      .withRequiredArg
+      .describedAs("consumer group id")
+      .ofType(classOf[String])
 
     if (args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "The console consumer is a 
tool that reads data from Kafka and outputs it to standard output.")
@@ -456,10 +460,25 @@ object ConsoleConsumer extends Logging {
       KafkaMetricsReporter.startReporters(verifiableProps)
     }
 
-    //Provide the consumer with a randomly assigned group id
-    if (!consumerProps.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
-      consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, 
s"console-consumer-${new Random().nextInt(100000)}")
-      groupIdPassed = false
+    // if the group id is provided in more than place (through different 
means) all values must be the same
+    val groupIdsProvided = Set(
+        Option(options.valueOf(groupIdOpt)),                           // via 
--group
+        Option(consumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)),     // via 
--consumer-property
+        Option(extraConsumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)) // via 
--cosumer.config
+      ).flatten
+
+    if (groupIdsProvided.size > 1) {
+      CommandLineUtils.printUsageAndDie(parser, "The group ids provided in 
different places (directly using '--group', "
+                                              + "via '--consumer-property', or 
via '--consumer.config') do not match. "
+                                              + s"Detected group ids: 
${groupIdsProvided.mkString("'", "', '", "'")}")
+    }
+
+    groupIdsProvided.headOption match {
+      case Some(group) =>
+        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, group)
+      case None =>
+        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, 
s"console-consumer-${new Random().nextInt(100000)}")
+        groupIdPassed = false
     }
 
     def tryParse(parser: OptionParser, args: Array[String]): OptionSet = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/198302fe/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala 
b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 9fbd3df..364f6b3 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -368,7 +368,8 @@ class ConsoleConsumerTest {
   def shouldParseConfigsFromFile() {
     val propsFile = TestUtils.tempFile()
     val propsStream = new FileOutputStream(propsFile)
-    propsStream.write("request.timeout.ms=1000".getBytes())
+    propsStream.write("request.timeout.ms=1000\n".getBytes())
+    propsStream.write("group.id=group1".getBytes())
     propsStream.close()
     val args: Array[String] = Array(
       "--bootstrap-server", "localhost:9092",
@@ -379,5 +380,114 @@ class ConsoleConsumerTest {
     val config = new ConsoleConsumer.ConsumerConfig(args)
 
     assertEquals("1000", 
config.consumerProps.getProperty("request.timeout.ms"))
+    assertEquals("group1", config.consumerProps.getProperty("group.id"))
+  }
+
+  @Test
+  def groupIdsProvidedInDifferentPlacesMustMatch() {
+    Exit.setExitProcedure((_, message) => throw new 
IllegalArgumentException(message.orNull))
+
+    // different in all three places
+    var propsFile = TestUtils.tempFile()
+    var propsStream = new FileOutputStream(propsFile)
+    propsStream.write("group.id=group-from-file".getBytes())
+    propsStream.close()
+    var args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--group", "group-from-arguments",
+      "--consumer-property", "group.id=group-from-properties",
+      "--consumer.config", propsFile.getAbsolutePath
+    )
+
+    try {
+      new ConsoleConsumer.ConsumerConfig(args)
+      fail("Expected groups ids provided in different places to match")
+    } catch {
+      case e: IllegalArgumentException => //OK
+    }
+
+    // the same in all three places
+    propsFile = TestUtils.tempFile()
+    propsStream = new FileOutputStream(propsFile)
+    propsStream.write("group.id=test-group".getBytes())
+    propsStream.close()
+    args = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--group", "test-group",
+      "--consumer-property", "group.id=test-group",
+      "--consumer.config", propsFile.getAbsolutePath
+    )
+
+    var config = new ConsoleConsumer.ConsumerConfig(args)
+    var props = ConsoleConsumer.getNewConsumerProps(config)
+    assertEquals("test-group", props.getProperty("group.id"))
+
+    // different via --consumer-property and --consumer.config
+    propsFile = TestUtils.tempFile()
+    propsStream = new FileOutputStream(propsFile)
+    propsStream.write("group.id=group-from-file".getBytes())
+    propsStream.close()
+    args = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--consumer-property", "group.id=group-from-properties",
+      "--consumer.config", propsFile.getAbsolutePath
+    )
+
+    try {
+      new ConsoleConsumer.ConsumerConfig(args)
+      fail("Expected groups ids provided in different places to match")
+    } catch {
+      case e: IllegalArgumentException => //OK
+    }
+
+    // different via --consumer-property and --group
+    args = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--group", "group-from-arguments",
+      "--consumer-property", "group.id=group-from-properties"
+    )
+
+    try {
+      new ConsoleConsumer.ConsumerConfig(args)
+      fail("Expected groups ids provided in different places to match")
+    } catch {
+      case e: IllegalArgumentException => //OK
+    }
+
+    // different via --group and --consumer.config
+    propsFile = TestUtils.tempFile()
+    propsStream = new FileOutputStream(propsFile)
+    propsStream.write("group.id=group-from-file".getBytes())
+    propsStream.close()
+    args = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--group", "group-from-arguments",
+      "--consumer.config", propsFile.getAbsolutePath
+    )
+
+    try {
+      new ConsoleConsumer.ConsumerConfig(args)
+      fail("Expected groups ids provided in different places to match")
+    } catch {
+      case e: IllegalArgumentException => //OK
+    }
+
+    // via --group only
+    args = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--group", "group-from-arguments"
+    )
+
+    config = new ConsoleConsumer.ConsumerConfig(args)
+    props = ConsoleConsumer.getNewConsumerProps(config)
+    assertEquals("group-from-arguments", props.getProperty("group.id"))
+
+    Exit.resetExitProcedure()
   }
 }

Reply via email to