Repository: kafka
Updated Branches:
  refs/heads/trunk c35c47981 -> 1ad74f5b7


KAFKA-5629; ConsoleConsumer should respect auto.offset.reset if specified on 
the command line

This change addresses the case where the "auto.offset.reset" property is 
specified on the command line but overridden by the code during startup. 
Currently the ConsoleConsumer silently overrides that setting, which can 
create confusing behavior.

Author: Soenke Liebau <soenke.lie...@opencore.com>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #3566 from soenkeliebau/KAFKA-5629


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1ad74f5b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1ad74f5b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1ad74f5b

Branch: refs/heads/trunk
Commit: 1ad74f5b760d16f3990a5024e717f4826278675d
Parents: c35c479
Author: Soenke Liebau <soenke.lie...@opencore.com>
Authored: Wed Aug 9 13:22:05 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Aug 9 13:22:05 2017 -0700

----------------------------------------------------------------------
 .../scala/kafka/tools/ConsoleConsumer.scala     |  32 +++++-
 .../unit/kafka/tools/ConsoleConsumerTest.scala  | 111 ++++++++++++++++++-
 2 files changed, 140 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1ad74f5b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala 
b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index f86b28b..ad0844a 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -173,7 +173,7 @@ object ConsoleConsumer extends Logging {
 
     props.putAll(config.consumerProps)
     props.putAll(config.extraConsumerProps)
-    props.put("auto.offset.reset", if (config.fromBeginning) "smallest" else 
"largest")
+    setAutoOffsetResetValue(config, props)
     props.put("zookeeper.connect", config.zkConnectionStr)
 
     if (!config.options.has(config.deleteConsumerOffsetsOpt) && 
config.options.has(config.resetBeginningOpt) &&
@@ -196,7 +196,7 @@ object ConsoleConsumer extends Logging {
 
     props.putAll(config.consumerProps)
     props.putAll(config.extraConsumerProps)
-    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if 
(config.options.has(config.resetBeginningOpt)) "earliest" else "latest")
+    setAutoOffsetResetValue(config, props)
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer")
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer")
@@ -204,6 +204,34 @@ object ConsoleConsumer extends Logging {
     props
   }
 
+  /**
+    * Used by both getNewConsumerProps and getOldConsumerProps to retrieve the 
correct value for the
+    * consumer parameter 'auto.offset.reset'.
+    * Order of priority is:
+    *   1. Explicitly set parameter via --consumer.property command line 
parameter
+    *   2. Explicit --from-beginning given -> 'earliest'
+    *   3. Default value of 'latest'
+    *
+    * In case both --from-beginning and an explicit value are specified an 
error is thrown if these
+    * are conflicting.
+    */
+  def setAutoOffsetResetValue(config: ConsumerConfig, props: Properties) {
+    if (props.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) {
+      // auto.offset.reset parameter was specified on the command line
+      if (config.options.has(config.resetBeginningOpt) && 
"latest".equals(props.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))) {
+        // conflicting options - latest and earliest, throw an error
+        System.err.println("Can't simultaneously specify --from-beginning and 
'auto.offset.reset=latest', please remove one option")
+        Exit.exit(1)
+      }
+      // nothing to do, checking for valid parameter values happens later and 
the specified
+      // value was already copied during .putall operation
+    } else {
+      // no explicit value for auto.offset.reset was specified
+      // if --from-beginning was specified use "earliest", otherwise default 
to "latest"
+      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if 
(config.options.has(config.resetBeginningOpt)) "earliest" else "latest")
+    }
+  }
+
   class ConsumerConfig(args: Array[String]) {
     val parser = new OptionParser(false)
     val topicIdOpt = parser.accepts("topic", "The topic id to consume on.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ad74f5b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala 
b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index e0917a2..c188b41 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -21,7 +21,8 @@ import java.io.{PrintStream, FileOutputStream}
 
 import kafka.common.MessageFormatter
 import kafka.consumer.{BaseConsumer, BaseConsumerRecord}
-import kafka.utils.TestUtils
+import kafka.utils.{Exit, TestUtils}
+import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.Test
@@ -176,6 +177,114 @@ class ConsoleConsumerTest {
   }
 
   @Test
+  def shouldParseValidOldConsumerValidConfigWithAutoOffsetReset() {
+    //Given
+    val args: Array[String] = Array(
+      "--zookeeper", "localhost:2181",
+      "--topic", "test",
+      "--consumer-property", "auto.offset.reset=earliest")
+
+    //When
+    val config = new ConsoleConsumer.ConsumerConfig(args)
+    val consumerProperties = ConsoleConsumer.getOldConsumerProps(config)
+
+    //Then
+    assertTrue(config.useOldConsumer)
+    assertEquals("localhost:2181", config.zkConnectionStr)
+    assertEquals("test", config.topicArg)
+    assertEquals(false, config.fromBeginning)
+    assertEquals("earliest", 
consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
+  }
+
+  @Test
+  def shouldParseValidNewSimpleConsumerValidConfigWithAutoOffsetReset() {
+    //Given
+    val args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--consumer-property", "auto.offset.reset=latest")
+
+    //When
+    val config = new ConsoleConsumer.ConsumerConfig(args)
+    val consumerProperties = ConsoleConsumer.getNewConsumerProps(config)
+
+    //Then
+    assertFalse(config.useOldConsumer)
+    assertEquals("localhost:9092", config.bootstrapServer)
+    assertEquals("test", config.topicArg)
+    assertEquals(false, config.fromBeginning)
+    assertEquals("latest", 
consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
+  }
+
+  @Test
+  def 
shouldParseValidNewSimpleConsumerValidConfigWithAutoOffsetResetAndMatchingFromBeginning()
 {
+    //Given
+    val args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--consumer-property", "auto.offset.reset=earliest",
+      "--from-beginning")
+
+    //When
+    val config = new ConsoleConsumer.ConsumerConfig(args)
+    val consumerProperties = ConsoleConsumer.getNewConsumerProps(config)
+
+    //Then
+    assertFalse(config.useOldConsumer)
+    assertEquals("localhost:9092", config.bootstrapServer)
+    assertEquals("test", config.topicArg)
+    assertEquals(true, config.fromBeginning)
+    assertEquals("earliest", 
consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
+  }
+
+  @Test
+  def shouldParseValidNewSimpleConsumerValidConfigWithNoOffsetReset() {
+    //Given
+    val args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test")
+
+    //When
+    val config = new ConsoleConsumer.ConsumerConfig(args)
+    val consumerProperties = ConsoleConsumer.getNewConsumerProps(config)
+
+    //Then
+    assertFalse(config.useOldConsumer)
+    assertEquals("localhost:9092", config.bootstrapServer)
+    assertEquals("test", config.topicArg)
+    assertEquals(false, config.fromBeginning)
+    assertEquals("latest", 
consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def shouldStopWheValidConfigWithAutoOffsetResetAndConflictingFromBeginning() 
{
+
+    // Override exit procedure to throw an exception instead of exiting, so we 
can catch the exit
+    // properly for this test case
+    Exit.setExitProcedure((_, message) => throw new 
IllegalArgumentException(message.orNull))
+
+    //Given
+    val args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--consumer-property", "auto.offset.reset=latest",
+      "--from-beginning")
+
+    // Enclose test calls in try-finally to ensure the exit procedure is
+    // reset at the end
+    try {
+      val config = new ConsoleConsumer.ConsumerConfig(args)
+      val consumerProperties = ConsoleConsumer.getNewConsumerProps(config)
+    } finally
+    {
+      Exit.resetExitProcedure()
+    }
+
+    // Should have thrown an exception before here, if we reach this line we 
can fail the test
+    fail()
+  }
+
+  @Test
   def shouldParseConfigsFromFile() {
     val propsFile = TestUtils.tempFile()
     val propsStream = new FileOutputStream(propsFile)

Reply via email to