Repository: kafka Updated Branches: refs/heads/trunk c35c47981 -> 1ad74f5b7
KAFKA-5629; ConsoleConsumer should respect auto.offset.reset if specified on the command line when "auto.offset.reset" property is specified on the command line but overridden by the code during startup. Currently the ConsoleConsumer silently overrides that setting, which can create confusing behavior. Author: Soenke Liebau <soenke.lie...@opencore.com> Reviewers: Jason Gustafson <ja...@confluent.io> Closes #3566 from soenkeliebau/KAFKA-5629 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1ad74f5b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1ad74f5b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1ad74f5b Branch: refs/heads/trunk Commit: 1ad74f5b760d16f3990a5024e717f4826278675d Parents: c35c479 Author: Soenke Liebau <soenke.lie...@opencore.com> Authored: Wed Aug 9 13:22:05 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Wed Aug 9 13:22:05 2017 -0700 ---------------------------------------------------------------------- .../scala/kafka/tools/ConsoleConsumer.scala | 32 +++++- .../unit/kafka/tools/ConsoleConsumerTest.scala | 111 ++++++++++++++++++- 2 files changed, 140 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/1ad74f5b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index f86b28b..ad0844a 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -173,7 +173,7 @@ object ConsoleConsumer extends Logging { props.putAll(config.consumerProps) props.putAll(config.extraConsumerProps) - props.put("auto.offset.reset", if (config.fromBeginning) "smallest" else "largest") + setAutoOffsetResetValue(config, props) props.put("zookeeper.connect", config.zkConnectionStr) if (!config.options.has(config.deleteConsumerOffsetsOpt) && config.options.has(config.resetBeginningOpt) && @@ -196,7 +196,7 @@ object ConsoleConsumer extends Logging { props.putAll(config.consumerProps) props.putAll(config.extraConsumerProps) - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (config.options.has(config.resetBeginningOpt)) "earliest" else "latest") + setAutoOffsetResetValue(config, props) props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer) props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer") props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer") @@ -204,6 +204,34 @@ object ConsoleConsumer extends Logging { props } + /** + * Used by both getNewConsumerProps and getOldConsumerProps to retrieve the correct value for the + * consumer parameter 'auto.offset.reset'. + * Order of priority is: + * 1. Explicitly set parameter via --consumer.property command line parameter + * 2. Explicit --from-beginning given -> 'earliest' + * 3. Default value of 'latest' + * + * In case both --from-beginning and an explicit value are specified an error is thrown if these + * are conflicting. + */ + def setAutoOffsetResetValue(config: ConsumerConfig, props: Properties) { + if (props.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) { + // auto.offset.reset parameter was specified on the command line + if (config.options.has(config.resetBeginningOpt) && "latest".equals(props.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))) { + // conflicting options - latest und earliest, throw an error + System.err.println("Can't simultaneously specify --from-beginning and 'auto.offset.reset=latest', please remove one option") + Exit.exit(1) + } + // nothing to do, checking for valid parameter values happens later and the specified + // value was already copied during .putall operation + } else { + // no explicit value for auto.offset.reset was specified + // if --from-beginning was specified use "earliest", otherwise default to "latest" + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (config.options.has(config.resetBeginningOpt)) "earliest" else "latest") + } + } + class ConsumerConfig(args: Array[String]) { val parser = new OptionParser(false) val topicIdOpt = parser.accepts("topic", "The topic id to consume on.") http://git-wip-us.apache.org/repos/asf/kafka/blob/1ad74f5b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala index e0917a2..c188b41 100644 --- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala @@ -21,7 +21,8 @@ import java.io.{PrintStream, FileOutputStream} import kafka.common.MessageFormatter import kafka.consumer.{BaseConsumer, BaseConsumerRecord} -import kafka.utils.TestUtils +import kafka.utils.{Exit, TestUtils} +import org.apache.kafka.clients.consumer.ConsumerConfig import org.easymock.EasyMock import org.junit.Assert._ import org.junit.Test @@ -176,6 +177,114 @@ class ConsoleConsumerTest { } @Test + def shouldParseValidOldConsumerValidConfigWithAutoOffsetReset() { + //Given + val args: Array[String] = Array( + "--zookeeper", "localhost:2181", + "--topic", "test", + "--consumer-property", "auto.offset.reset=earliest") + + //When + val config = new ConsoleConsumer.ConsumerConfig(args) + val consumerProperties = ConsoleConsumer.getOldConsumerProps(config) + + //Then + assertTrue(config.useOldConsumer) + assertEquals("localhost:2181", config.zkConnectionStr) + assertEquals("test", config.topicArg) + assertEquals(false, config.fromBeginning) + assertEquals("earliest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) + } + + @Test + def shouldParseValidNewSimpleConsumerValidConfigWithAutoOffsetReset() { + //Given + val args: Array[String] = Array( + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--consumer-property", "auto.offset.reset=latest") + + //When + val config = new ConsoleConsumer.ConsumerConfig(args) + val consumerProperties = ConsoleConsumer.getNewConsumerProps(config) + + //Then + assertFalse(config.useOldConsumer) + assertEquals("localhost:9092", config.bootstrapServer) + assertEquals("test", config.topicArg) + assertEquals(false, config.fromBeginning) + assertEquals("latest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) + } + + @Test + def shouldParseValidNewSimpleConsumerValidConfigWithAutoOffsetResetAndMatchingFromBeginning() { + //Given + val args: Array[String] = Array( + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--consumer-property", "auto.offset.reset=earliest", + "--from-beginning") + + //When + val config = new ConsoleConsumer.ConsumerConfig(args) + val consumerProperties = ConsoleConsumer.getNewConsumerProps(config) + + //Then + assertFalse(config.useOldConsumer) + assertEquals("localhost:9092", config.bootstrapServer) + assertEquals("test", config.topicArg) + assertEquals(true, config.fromBeginning) + assertEquals("earliest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) + } + + @Test + def shouldParseValidNewSimpleConsumerValidConfigWithNoOffsetReset() { + //Given + val args: Array[String] = Array( + "--bootstrap-server", "localhost:9092", + "--topic", "test") + + //When + val config = new ConsoleConsumer.ConsumerConfig(args) + val consumerProperties = ConsoleConsumer.getNewConsumerProps(config) + + //Then + assertFalse(config.useOldConsumer) + assertEquals("localhost:9092", config.bootstrapServer) + assertEquals("test", config.topicArg) + assertEquals(false, config.fromBeginning) + assertEquals("latest", consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) + } + + @Test(expected = classOf[IllegalArgumentException]) + def shouldStopWheValidConfigWithAutoOffsetResetAndConflictingFromBeginning() { + + // Override exit procedure to throw an exception instead of exiting, so we can catch the exit + // properly for this test case + Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull)) + + //Given + val args: Array[String] = Array( + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--consumer-property", "auto.offset.reset=latest", + "--from-beginning") + + // Enclose test calls in try-finally to ensure the exit procedure is + // reset at the end + try { + val config = new ConsoleConsumer.ConsumerConfig(args) + val consumerProperties = ConsoleConsumer.getNewConsumerProps(config) + } finally + { + Exit.resetExitProcedure() + } + + // Should have thrown an exception before here, if we reach this line we can fail the test + fail() + } + + @Test def shouldParseConfigsFromFile() { val propsFile = TestUtils.tempFile() val propsStream = new FileOutputStream(propsFile)