showuon commented on code in PR #12753:
URL: https://github.com/apache/kafka/pull/12753#discussion_r1027448552


##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -694,6 +694,77 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(outOfRangePos.toLong, outOfRangePartitions.get(tp))
   }
 
+  @Test
+  def testFetchInvalidOffsetResetConfigEarliest(): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    val consumer = createConsumer(configOverrides = this.consumerConfig)
+    val totalRecords = 10L
+
+    val producer = createProducer()
+    val startingTimestamp = 0
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = totalRecords.toInt, startingOffset =0)
+    // seek to out of range position
+    val outOfRangePos = totalRecords + 1
+    consumer.seek(tp, outOfRangePos)
+    // assert that poll resets to the beginning position
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 1, startingOffset = 0)
+  }
+
+  @Test
+  def testFetchInvalidOffsetResetConfigLatestRecordsBeforeTimeout(): Unit = {
+
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
+    // long wait time should (theoretically) mean that more records show up before this expires
+    this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "3000")
+    val consumer = createConsumer(configOverrides = this.consumerConfig)
+    val totalRecords = 10L
+
+    val producer = createProducer()
+    val startingTimestamp = 0
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumer.seek(tp,0)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = totalRecords.toInt, startingOffset = 0)
+    // seek to out of range position
+    val outOfRangePos = totalRecords + 7 //arbitrary, much higher offset
+    consumer.seek(tp, outOfRangePos)
+    // assert that poll resets to the ending position
+    assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty)
+    //some new records show up before fetch max wait time
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = totalRecords)
+    // ensure that new records start with the offset that was passed to seek(), as new records have arrived
+    val nextRecord = consumer.poll(Duration.ofMillis(50)).iterator().next()
+    assertEquals(outOfRangePos,nextRecord.offset())
+
+  }
+
+  @Test
+  def testFetchInvalidOffsetResetConfigLatestNoRecordsBeforeTimeout(): Unit = {

Review Comment:
   If you agree to remove the above test, this test could be simply renamed to `testFetchOutOfRangeOffsetResetConfigLatest()`. WDYT?



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -694,6 +694,77 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(outOfRangePos.toLong, outOfRangePartitions.get(tp))
   }
 
+  @Test
+  def testFetchInvalidOffsetResetConfigEarliest(): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    val consumer = createConsumer(configOverrides = this.consumerConfig)
+    val totalRecords = 10L
+
+    val producer = createProducer()
+    val startingTimestamp = 0
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = totalRecords.toInt, startingOffset =0)
+    // seek to out of range position
+    val outOfRangePos = totalRecords + 1
+    consumer.seek(tp, outOfRangePos)
+    // assert that poll resets to the beginning position
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 1, startingOffset = 0)
+  }
+
+  @Test
+  def testFetchInvalidOffsetResetConfigLatestRecordsBeforeTimeout(): Unit = {

Review Comment:
   I don't think this test is testing the offset reset logic. Could we remove this test?



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -694,6 +694,77 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(outOfRangePos.toLong, outOfRangePartitions.get(tp))
   }
 
+  @Test
+  def testFetchInvalidOffsetResetConfigEarliest(): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    val consumer = createConsumer(configOverrides = this.consumerConfig)

Review Comment:
   Although everything works well now, I think we should still add `this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0")` for the earliest test, to make sure the offset reset is working as expected.



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -694,6 +694,77 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(outOfRangePos.toLong, outOfRangePartitions.get(tp))
   }
 
+  @Test
+  def testFetchInvalidOffsetResetConfigEarliest(): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    val consumer = createConsumer(configOverrides = this.consumerConfig)
+    val totalRecords = 10L
+
+    val producer = createProducer()
+    val startingTimestamp = 0
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = totalRecords.toInt, startingOffset =0)
+    // seek to out of range position
+    val outOfRangePos = totalRecords + 1
+    consumer.seek(tp, outOfRangePos)
+    // assert that poll resets to the beginning position
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 1, startingOffset = 0)
+  }
+
+  @Test
+  def testFetchInvalidOffsetResetConfigLatestRecordsBeforeTimeout(): Unit = {
+
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
+    // long wait time should (theoretically) mean that more records show up before this expires
+    this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "3000")
+    val consumer = createConsumer(configOverrides = this.consumerConfig)
+    val totalRecords = 10L
+
+    val producer = createProducer()
+    val startingTimestamp = 0
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumer.seek(tp,0)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = totalRecords.toInt, startingOffset = 0)
+    // seek to out of range position
+    val outOfRangePos = totalRecords + 7 //arbitrary, much higher offset
+    consumer.seek(tp, outOfRangePos)
+    // assert that poll resets to the ending position
+    assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty)
+    //some new records show up before fetch max wait time
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = totalRecords)
+    // ensure that new records start with the offset that was passed to seek(), as new records have arrived
+    val nextRecord = consumer.poll(Duration.ofMillis(50)).iterator().next()
+    assertEquals(outOfRangePos,nextRecord.offset())
+
+  }
+
+  @Test
+  def testFetchInvalidOffsetResetConfigLatestNoRecordsBeforeTimeout(): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
+    this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0")
+    val consumer = createConsumer(configOverrides = this.consumerConfig)
+    val totalRecords = 10L
+
+    val producer = createProducer()
+    val startingTimestamp = 0
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumer.seek(tp, 0)
+    //consume some, but not all of the records
+    consumeAndVerifyRecords(consumer = consumer, numRecords = totalRecords.toInt/2, startingOffset = 0)
+    // seek to out of range position
+    val outOfRangePos = totalRecords + 17 //arbitrary, much higher offset
+    consumer.seek(tp, outOfRangePos)
+    // assert that poll resets to the ending position
+    assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty)
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = totalRecords)
+    val nextRecord = consumer.poll(Duration.ofMillis(50)).iterator().next()
+    //ensure the seek went to the last known record at the time of the previous poll
+    assertEquals(totalRecords,nextRecord.offset())

Review Comment:
   Please add a space after the comma, i.e. `assertEquals(totalRecords, nextRecord.offset())`



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -1581,6 +1581,23 @@ public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, Of
     * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
     * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
     *
+     * The next Consumer Record which will be retrieved when poll() is invoked will either have the offset specified or,
+     * by default, with {@link ConsumerConfig AUTO_RESET_CONFIG} set to "latest"
+     * a higher numbered offset. A higher numbered offset will be returned if there is no consumer record with the offset
+     * specified but there is one with a higher offset.
+     * seek(0) is equivalent to seek to beginning for a topic with beginning offset 0.

Review Comment:
   This is not always correct. The start offset might not be 0 if records got deleted manually, exceeded the retention time/size, etc.
   * {@link #seekToBeginning(Collection)} is equivalent to seek to beginning ...



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -1581,6 +1581,23 @@ public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, Of
     * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
     * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
     *
+     * The next Consumer Record which will be retrieved when poll() is invoked will either have the offset specified or,
+     * by default, with {@link ConsumerConfig AUTO_RESET_CONFIG} set to "latest"
+     * a higher numbered offset. A higher numbered offset will be returned if there is no consumer record with the offset
+     * specified but there is one with a higher offset.
+     * seek(0) is equivalent to seek to beginning for a topic with beginning offset 0.
+     * seekToEnd() is equivalent to seeking to the highest known offset + 1.
+     *
+     * Seeking past the end of the highest known offset means an invalid offset is reached.
+     * Invalid offset behaviour is controlled by
+     * the {@link ConsumerConfig AUTO_RESET_CONFIG} property. If this is set to "earliest", the next poll() will seek to the beginning
+     * before returning a record. If it is set to "latest", it will seek to the last known record (similar to seekToEnd()).

Review Comment:
   The seek happens right after you call `seek`; it doesn't happen after calling `poll`. How about this:
   `If this is set to "earliest", the next poll will return records from the starting offset.`



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -694,6 +694,77 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(outOfRangePos.toLong, outOfRangePartitions.get(tp))
   }
 
+  @Test
+  def testFetchInvalidOffsetResetConfigEarliest(): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    val consumer = createConsumer(configOverrides = this.consumerConfig)
+    val totalRecords = 10L
+
+    val producer = createProducer()
+    val startingTimestamp = 0
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = totalRecords.toInt, startingOffset =0)
+    // seek to out of range position
+    val outOfRangePos = totalRecords + 1
+    consumer.seek(tp, outOfRangePos)
+    // assert that poll resets to the beginning position
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 1, startingOffset = 0)
+  }
+
+  @Test
+  def testFetchInvalidOffsetResetConfigLatestRecordsBeforeTimeout(): Unit = {
+
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
+    // long wait time should (theoretically) mean that more records show up before this expires
+    this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "3000")
+    val consumer = createConsumer(configOverrides = this.consumerConfig)
+    val totalRecords = 10L
+
+    val producer = createProducer()
+    val startingTimestamp = 0
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumer.seek(tp,0)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = totalRecords.toInt, startingOffset = 0)
+    // seek to out of range position
+    val outOfRangePos = totalRecords + 7 //arbitrary, much higher offset
+    consumer.seek(tp, outOfRangePos)
+    // assert that poll resets to the ending position
+    assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty)
+    //some new records show up before fetch max wait time
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = totalRecords)
+    // ensure that new records start with the offset that was passed to seek(), as new records have arrived
+    val nextRecord = consumer.poll(Duration.ofMillis(50)).iterator().next()
+    assertEquals(outOfRangePos,nextRecord.offset())
+
+  }
+
+  @Test
+  def testFetchInvalidOffsetResetConfigLatestNoRecordsBeforeTimeout(): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
+    this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0")
+    val consumer = createConsumer(configOverrides = this.consumerConfig)
+    val totalRecords = 10L
+
+    val producer = createProducer()
+    val startingTimestamp = 0
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumer.seek(tp, 0)
+    //consume some, but not all of the records
+    consumeAndVerifyRecords(consumer = consumer, numRecords = totalRecords.toInt/2, startingOffset = 0)
+    // seek to out of range position
+    val outOfRangePos = totalRecords + 17 //arbitrary, much higher offset

Review Comment:
   Please add a space after `//`



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -694,6 +694,77 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(outOfRangePos.toLong, outOfRangePartitions.get(tp))
   }
 
+  @Test
+  def testFetchInvalidOffsetResetConfigEarliest(): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    val consumer = createConsumer(configOverrides = this.consumerConfig)
+    val totalRecords = 10L
+
+    val producer = createProducer()
+    val startingTimestamp = 0
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = totalRecords.toInt, startingOffset =0)
+    // seek to out of range position
+    val outOfRangePos = totalRecords + 1
+    consumer.seek(tp, outOfRangePos)
+    // assert that poll resets to the beginning position
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 1, startingOffset = 0)
+  }
+
+  @Test
+  def testFetchInvalidOffsetResetConfigLatestRecordsBeforeTimeout(): Unit = {
+
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
+    // long wait time should (theoretically) mean that more records show up before this expires
+    this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "3000")
+    val consumer = createConsumer(configOverrides = this.consumerConfig)
+    val totalRecords = 10L
+
+    val producer = createProducer()
+    val startingTimestamp = 0
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumer.seek(tp,0)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = totalRecords.toInt, startingOffset = 0)
+    // seek to out of range position
+    val outOfRangePos = totalRecords + 7 //arbitrary, much higher offset
+    consumer.seek(tp, outOfRangePos)
+    // assert that poll resets to the ending position
+    assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty)
+    //some new records show up before fetch max wait time
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = totalRecords)
+    // ensure that new records start with the offset that was passed to seek(), as new records have arrived
+    val nextRecord = consumer.poll(Duration.ofMillis(50)).iterator().next()
+    assertEquals(outOfRangePos,nextRecord.offset())
+
+  }
+
+  @Test
+  def testFetchInvalidOffsetResetConfigLatestNoRecordsBeforeTimeout(): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
+    this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0")
+    val consumer = createConsumer(configOverrides = this.consumerConfig)
+    val totalRecords = 10L
+
+    val producer = createProducer()
+    val startingTimestamp = 0
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumer.seek(tp, 0)
+    //consume some, but not all of the records
+    consumeAndVerifyRecords(consumer = consumer, numRecords = totalRecords.toInt/2, startingOffset = 0)
+    // seek to out of range position
+    val outOfRangePos = totalRecords + 17 //arbitrary, much higher offset
+    consumer.seek(tp, outOfRangePos)
+    // assert that poll resets to the ending position
+    assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty)
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = totalRecords)
+    val nextRecord = consumer.poll(Duration.ofMillis(50)).iterator().next()
+    //ensure the seek went to the last known record at the time of the previous poll

Review Comment:
   Please add a space after `//`



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -694,6 +694,77 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(outOfRangePos.toLong, outOfRangePartitions.get(tp))
   }
 
+  @Test
+  def testFetchInvalidOffsetResetConfigEarliest(): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    val consumer = createConsumer(configOverrides = this.consumerConfig)
+    val totalRecords = 10L
+
+    val producer = createProducer()
+    val startingTimestamp = 0
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = totalRecords.toInt, startingOffset =0)
+    // seek to out of range position
+    val outOfRangePos = totalRecords + 1
+    consumer.seek(tp, outOfRangePos)
+    // assert that poll resets to the beginning position
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 1, startingOffset = 0)
+  }
+
+  @Test
+  def testFetchInvalidOffsetResetConfigLatestRecordsBeforeTimeout(): Unit = {
+
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
+    // long wait time should (theoretically) mean that more records show up before this expires
+    this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "3000")
+    val consumer = createConsumer(configOverrides = this.consumerConfig)
+    val totalRecords = 10L
+
+    val producer = createProducer()
+    val startingTimestamp = 0
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumer.seek(tp,0)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = totalRecords.toInt, startingOffset = 0)
+    // seek to out of range position
+    val outOfRangePos = totalRecords + 7 //arbitrary, much higher offset
+    consumer.seek(tp, outOfRangePos)
+    // assert that poll resets to the ending position
+    assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty)
+    //some new records show up before fetch max wait time
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = totalRecords)
+    // ensure that new records start with the offset that was passed to seek(), as new records have arrived
+    val nextRecord = consumer.poll(Duration.ofMillis(50)).iterator().next()
+    assertEquals(outOfRangePos,nextRecord.offset())
+
+  }
+
+  @Test
+  def testFetchInvalidOffsetResetConfigLatestNoRecordsBeforeTimeout(): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
+    this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0")

Review Comment:
   Could we add a comment above this line to mention why we need to set `FETCH_MAX_WAIT_MS_CONFIG` to 0?



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -694,6 +694,77 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(outOfRangePos.toLong, outOfRangePartitions.get(tp))
   }
 
+  @Test
+  def testFetchInvalidOffsetResetConfigEarliest(): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    val consumer = createConsumer(configOverrides = this.consumerConfig)
+    val totalRecords = 10L
+
+    val producer = createProducer()
+    val startingTimestamp = 0
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = totalRecords.toInt, startingOffset =0)
+    // seek to out of range position
+    val outOfRangePos = totalRecords + 1
+    consumer.seek(tp, outOfRangePos)
+    // assert that poll resets to the beginning position
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 1, startingOffset = 0)
+  }
+
+  @Test
+  def testFetchInvalidOffsetResetConfigLatestRecordsBeforeTimeout(): Unit = {
+
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
+    // long wait time should (theoretically) mean that more records show up before this expires
+    this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "3000")
+    val consumer = createConsumer(configOverrides = this.consumerConfig)
+    val totalRecords = 10L
+
+    val producer = createProducer()
+    val startingTimestamp = 0
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumer.seek(tp,0)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = totalRecords.toInt, startingOffset = 0)
+    // seek to out of range position
+    val outOfRangePos = totalRecords + 7 //arbitrary, much higher offset
+    consumer.seek(tp, outOfRangePos)
+    // assert that poll resets to the ending position
+    assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty)
+    //some new records show up before fetch max wait time
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = totalRecords)
+    // ensure that new records start with the offset that was passed to seek(), as new records have arrived
+    val nextRecord = consumer.poll(Duration.ofMillis(50)).iterator().next()
+    assertEquals(outOfRangePos,nextRecord.offset())
+
+  }
+
+  @Test
+  def testFetchInvalidOffsetResetConfigLatestNoRecordsBeforeTimeout(): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
+    this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0")
+    val consumer = createConsumer(configOverrides = this.consumerConfig)
+    val totalRecords = 10L
+
+    val producer = createProducer()
+    val startingTimestamp = 0
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumer.seek(tp, 0)
+    //consume some, but not all of the records

Review Comment:
   Please add a space after `//`



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -694,6 +694,77 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(outOfRangePos.toLong, outOfRangePartitions.get(tp))
   }
 
+  @Test
+  def testFetchInvalidOffsetResetConfigEarliest(): Unit = {

Review Comment:
   nit: Since it's not an invalid offset but an out-of-range offset, could we rename this to `testFetchOutOfRangeOffsetResetConfigEarliest`?



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -672,7 +672,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   @Test
   def testFetchInvalidOffset(): Unit = {
    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
-    val consumer = createConsumer()
+    val consumer = createConsumer(configOverrides = this.consumerConfig)

Review Comment:
   Nice fix!



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -1581,6 +1581,23 @@ public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, Of
     * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
     * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
     *
+     * The next Consumer Record which will be retrieved when poll() is invoked will either have the offset specified or,
+     * by default, with {@link ConsumerConfig AUTO_RESET_CONFIG} set to "latest"
+     * a higher numbered offset. A higher numbered offset will be returned if there is no consumer record with the offset
+     * specified but there is one with a higher offset.
+     * seek(0) is equivalent to seek to beginning for a topic with beginning offset 0.
+     * seekToEnd() is equivalent to seeking to the highest known offset + 1.
+     *
+     * Seeking past the end of the highest known offset means an invalid offset is reached.

Review Comment:
   Seeking to an offset earlier than the start offset will also return an out-of-range exception. So, maybe update to:
   `Seeking to an offset smaller than the log start offset, or larger than the log end offset or high watermark, means an invalid offset is reached`
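
   As a toy illustration of the out-of-range rule discussed above, the following sketch models how a seek target outside `[logStartOffset, logEndOffset]` is resolved under each `auto.offset.reset` policy. All names here (`OffsetResetSketch`, `resolveFetchPosition`, the offsets) are hypothetical illustration-only assumptions; this is not KafkaConsumer code.

```java
// Toy model of auto.offset.reset handling for an out-of-range seek target.
// Illustration only; not the KafkaConsumer implementation.
public class OffsetResetSketch {
    public enum ResetPolicy { EARLIEST, LATEST, NONE }

    // A seek target is out of range when it is smaller than the log start
    // offset or larger than the log end offset.
    public static long resolveFetchPosition(long seekTarget, long logStartOffset,
                                            long logEndOffset, ResetPolicy policy) {
        boolean outOfRange = seekTarget < logStartOffset || seekTarget > logEndOffset;
        if (!outOfRange) {
            return seekTarget; // valid position: used as-is by the next fetch
        }
        switch (policy) {
            case EARLIEST:
                return logStartOffset; // reset to the beginning of the log
            case LATEST:
                return logEndOffset;   // reset to the end of the log
            default:
                // auto.offset.reset=none surfaces the problem to the caller
                throw new IllegalStateException("Offset out of range: " + seekTarget);
        }
    }

    public static void main(String[] args) {
        // Log holds offsets [0, 10): start = 0, end = 10.
        System.out.println(resolveFetchPosition(11, 0, 10, ResetPolicy.EARLIEST)); // reset to start
        System.out.println(resolveFetchPosition(17, 0, 10, ResetPolicy.LATEST));   // reset to end
        System.out.println(resolveFetchPosition(5, 0, 10, ResetPolicy.LATEST));    // in range: unchanged
    }
}
```

   This mirrors what the tests in this PR assert: `earliest` resets to the beginning, `latest` resets to the end, and `none` raises an error.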



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -1581,6 +1581,23 @@ public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, Of
     * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
     * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
     *
+     * The next Consumer Record which will be retrieved when poll() is invoked will either have the offset specified or,
+     * by default, with {@link ConsumerConfig AUTO_RESET_CONFIG} set to "latest"
+     * a higher numbered offset. A higher numbered offset will be returned if there is no consumer record with the offset
+     * specified but there is one with a higher offset.

Review Comment:
   These 4 lines are hard to read. I don't know why we should mention the "latest" reset config here.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -1581,6 +1581,23 @@ public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, Of
     * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
     * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
     *
+     * The next Consumer Record which will be retrieved when poll() is invoked will either have the offset specified or,
+     * by default, with {@link ConsumerConfig AUTO_RESET_CONFIG} set to "latest"
+     * a higher numbered offset. A higher numbered offset will be returned if there is no consumer record with the offset
+     * specified but there is one with a higher offset.
+     * seek(0) is equivalent to seek to beginning for a topic with beginning offset 0.
+     * seekToEnd() is equivalent to seeking to the highest known offset + 1.
+     *
+     * Seeking past the end of the highest known offset means an invalid offset is reached.
+     * Invalid offset behaviour is controlled by
+     * the {@link ConsumerConfig AUTO_RESET_CONFIG} property. If this is set to "earliest", the next poll() will seek to the beginning
+     * before returning a record. If it is set to "latest", it will seek to the last known record (similar to seekToEnd()).
+     * Note that, in the "latest" if any new records show up before {@link ConsumerConfig FETCH_MAX_WAIT_MS_CONFIG}, then
+     * the next poll() may succeed in seeking to the requested offset, or otherwise it will seek to the new latest known offset.

Review Comment:
   I'm thinking we don't need to mention it here. First, it's not just the `latest` case; `earliest` will also have this behavior. Second, it's too much detail for users to know about, and it would make users more confused: so, what should they do about it? Third, back to the point you tried to make, I think you're trying to explain this case:
   1. current log end offset is 10
   2. seek to 17
   3. poll -> got nothing
   4. produce 10 more records
   5. poll -> got records offset starting from 17
   
   Is that right? I think that's expected from the user's point of view. The user tried to seek to offset 17 and records starting from 17 were returned; everything works fine, doesn't it?
   
   I think if you really want to talk about it, maybe you can say: the seek offset won't change the in-flight fetch request; it will take effect in the next fetch request.
   
   WDYT?
   
   IMO, we don't need to mention it.
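
   The five-step scenario above can be mimicked with a minimal in-memory log. Everything here (`SeekScenarioSketch`, `produce`, `poll`) is a hypothetical stand-in for the observable behavior, not the consumer's internals, and it does not model the reset path at all; it only shows why the sequence looks unsurprising to a user.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal in-memory stand-in for the 5-step scenario above; not Kafka code.
public class SeekScenarioSketch {
    private final List<String> log = new ArrayList<>(); // list index doubles as the offset
    private long position = 0;

    // "produce n records": append to the log; the log end offset grows by n.
    public void produce(int n) {
        for (int i = 0; i < n; i++) {
            log.add("record-" + log.size());
        }
    }

    // seek() only moves the fetch position; nothing is validated here.
    public void seek(long offset) {
        position = offset;
    }

    // poll() returns the record at the current position, or null while the
    // position is still past the log end (no offset reset modeled here).
    public String poll() {
        if (position >= log.size()) {
            return null;
        }
        return log.get((int) position++);
    }

    public static void main(String[] args) {
        SeekScenarioSketch consumer = new SeekScenarioSketch();
        consumer.produce(10);                // 1. log end offset is 10
        consumer.seek(17);                   // 2. seek to 17
        System.out.println(consumer.poll()); // 3. poll -> got nothing (null)
        consumer.produce(10);                // 4. produce 10 more records
        System.out.println(consumer.poll()); // 5. poll -> record at offset 17
    }
}
```

   In this sketch the position set by seek() simply survives until records exist at it, which matches the reviewer's point that the outcome looks like "everything works fine" from the user's side.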



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
