Repository: kafka Updated Branches: refs/heads/trunk c4b8a7eab -> 198302fee
KAFKA-4416; Add a `--group` option to console consumer This simplifies running a console consumer as part of a custom group. Note that group id can be provided via only one of the three possible means: directly using `--group ` option (as part of this PR), as property via `--consumer-property` option, or inside a config file via `--consumer.config` option. Author: Vahid Hashemian <vahidhashem...@us.ibm.com> Reviewers: Ismael Juma <ism...@juma.me.uk>, Apurva Mehta <apu...@confluent.io>, Jason Gustafson <ja...@confluent.io> Closes #2150 from vahidhashemian/KAFKA-4416 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/198302fe Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/198302fe Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/198302fe Branch: refs/heads/trunk Commit: 198302feecf17cd1eff38e6d93e807d7b04922f2 Parents: c4b8a7e Author: Vahid Hashemian <vahidhashem...@us.ibm.com> Authored: Wed Oct 4 08:50:56 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Wed Oct 4 08:50:56 2017 -0700 ---------------------------------------------------------------------- .../scala/kafka/tools/ConsoleConsumer.scala | 27 ++++- .../unit/kafka/tools/ConsoleConsumerTest.scala | 112 ++++++++++++++++++- 2 files changed, 134 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/198302fe/core/src/main/scala/kafka/tools/ConsoleConsumer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index c014caf..8b20601 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -340,6 +340,10 @@ object ConsoleConsumer extends Logging { .ofType(classOf[String]) .defaultsTo("read_uncommitted") + val groupIdOpt = parser.accepts("group", "The consumer group id of the consumer.") + .withRequiredArg + .describedAs("consumer group id") + .ofType(classOf[String]) if (args.length == 0) CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.") @@ -456,10 +460,25 @@ object ConsoleConsumer extends Logging { KafkaMetricsReporter.startReporters(verifiableProps) } - //Provide the consumer with a randomly assigned group id - if (!consumerProps.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { - consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"console-consumer-${new Random().nextInt(100000)}") - groupIdPassed = false + // if the group id is provided in more than place (through different means) all values must be the same + val groupIdsProvided = Set( + Option(options.valueOf(groupIdOpt)), // via --group + Option(consumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)), // via --consumer-property + Option(extraConsumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)) // via --cosumer.config + ).flatten + + if (groupIdsProvided.size > 1) { + CommandLineUtils.printUsageAndDie(parser, "The group ids provided in different places (directly using '--group', " + + "via '--consumer-property', or via '--consumer.config') do not match. " + + s"Detected group ids: ${groupIdsProvided.mkString("'", "', '", "'")}") + } + + groupIdsProvided.headOption match { + case Some(group) => + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, group) + case None => + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"console-consumer-${new Random().nextInt(100000)}") + groupIdPassed = false } def tryParse(parser: OptionParser, args: Array[String]): OptionSet = { http://git-wip-us.apache.org/repos/asf/kafka/blob/198302fe/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala index 9fbd3df..364f6b3 100644 --- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala @@ -368,7 +368,8 @@ class ConsoleConsumerTest { def shouldParseConfigsFromFile() { val propsFile = TestUtils.tempFile() val propsStream = new FileOutputStream(propsFile) - propsStream.write("request.timeout.ms=1000".getBytes()) + propsStream.write("request.timeout.ms=1000\n".getBytes()) + propsStream.write("group.id=group1".getBytes()) propsStream.close() val args: Array[String] = Array( "--bootstrap-server", "localhost:9092", @@ -379,5 +380,114 @@ class ConsoleConsumerTest { val config = new ConsoleConsumer.ConsumerConfig(args) assertEquals("1000", config.consumerProps.getProperty("request.timeout.ms")) + assertEquals("group1", config.consumerProps.getProperty("group.id")) + } + + @Test + def groupIdsProvidedInDifferentPlacesMustMatch() { + Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull)) + + // different in all three places + var propsFile = TestUtils.tempFile() + var propsStream = new FileOutputStream(propsFile) + propsStream.write("group.id=group-from-file".getBytes()) + propsStream.close() + var args: Array[String] = Array( + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "group-from-arguments", + "--consumer-property", "group.id=group-from-properties", + "--consumer.config", propsFile.getAbsolutePath + ) + + try { + new ConsoleConsumer.ConsumerConfig(args) + fail("Expected groups ids provided in different places to match") + } catch { + case e: IllegalArgumentException => //OK + } + + // the same in all three places + propsFile = TestUtils.tempFile() + propsStream = new FileOutputStream(propsFile) + propsStream.write("group.id=test-group".getBytes()) + propsStream.close() + args = Array( + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "test-group", + "--consumer-property", "group.id=test-group", + "--consumer.config", propsFile.getAbsolutePath + ) + + var config = new ConsoleConsumer.ConsumerConfig(args) + var props = ConsoleConsumer.getNewConsumerProps(config) + assertEquals("test-group", props.getProperty("group.id")) + + // different via --consumer-property and --consumer.config + propsFile = TestUtils.tempFile() + propsStream = new FileOutputStream(propsFile) + propsStream.write("group.id=group-from-file".getBytes()) + propsStream.close() + args = Array( + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--consumer-property", "group.id=group-from-properties", + "--consumer.config", propsFile.getAbsolutePath + ) + + try { + new ConsoleConsumer.ConsumerConfig(args) + fail("Expected groups ids provided in different places to match") + } catch { + case e: IllegalArgumentException => //OK + } + + // different via --consumer-property and --group + args = Array( + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "group-from-arguments", + "--consumer-property", "group.id=group-from-properties" + ) + + try { + new ConsoleConsumer.ConsumerConfig(args) + fail("Expected groups ids provided in different places to match") + } catch { + case e: IllegalArgumentException => //OK + } + + // different via --group and --consumer.config + propsFile = TestUtils.tempFile() + propsStream = new FileOutputStream(propsFile) + propsStream.write("group.id=group-from-file".getBytes()) + propsStream.close() + args = Array( + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "group-from-arguments", + "--consumer.config", propsFile.getAbsolutePath + ) + + try { + new ConsoleConsumer.ConsumerConfig(args) + fail("Expected groups ids provided in different places to match") + } catch { + case e: IllegalArgumentException => //OK + } + + // via --group only + args = Array( + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "group-from-arguments" + ) + + config = new ConsoleConsumer.ConsumerConfig(args) + props = ConsoleConsumer.getNewConsumerProps(config) + assertEquals("group-from-arguments", props.getProperty("group.id")) + + Exit.resetExitProcedure() } }