Scanteianu commented on code in PR #12753:
URL: https://github.com/apache/kafka/pull/12753#discussion_r1007760016


##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -694,6 +694,73 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(outOfRangePos.toLong, outOfRangePartitions.get(tp))
   }
 
+  @Test
+  def testFetchInvalidOffsetResetConfigEarliest(): Unit = {
+    val consumer = createConsumer()
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest")
+    val totalRecords = 10L
+
+    val producer = createProducer()
+    val startingTimestamp = 0
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = 
startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 
totalRecords.toInt, startingOffset =0)
+    // seek to out of range position
+    val outOfRangePos = totalRecords + 1
+    consumer.seek(tp, outOfRangePos)
+    // assert that poll resets to the beginning position
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 1, 
startingOffset = 0)
+  }
+
+  @Test
+  def testFetchInvalidOffsetResetConfigLatest(): Unit = {
+    val consumer = createConsumer()
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"latest")
+    val totalRecords = 10L
+
+    val producer = createProducer()
+    val startingTimestamp = 0
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = 
startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 
totalRecords.toInt, startingOffset = 0)
+    // seek to out of range position
+    val outOfRangePos = totalRecords + 7 //arbitrary, much higher offset
+    consumer.seek(tp, outOfRangePos)
+    // assert that poll resets to the ending position
+    assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty)
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = 
totalRecords)
+    // ensure that new records are consumed only if they're after the seek 
position
+    val nextRecord = consumer.poll(Duration.ofMillis(50)).iterator().next()
+    assertEquals(nextRecord.offset(),outOfRangePos)
+
+  }
+
+  @Test
+  def testFetchInvalidOffsetResetConfigLatestPastEnd(): Unit = {
+    // todo: this seems broken - it definitely shouldn't seek to the 
beginning; see notes below
+    // one valid behaviour might be for it to keep returning empty until the 
seek offset is reached
+    // another might be for it to always seek to the latest new record (ie: 
the one at totalRecords position, or
+    // position 10). But since we've already consumed the first 10 records, it 
really shouldn't rewind.
+    val consumer = createConsumer()
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"latest")
+    val totalRecords = 10L
+
+    val producer = createProducer()
+    val startingTimestamp = 0
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = 
startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 
totalRecords.toInt, startingOffset = 0)
+    // seek to out of range position
+    val outOfRangePos = totalRecords + 17 //arbitrary, much higher offset
+    consumer.seek(tp, outOfRangePos)
+    // assert that poll resets to the ending position
+    assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty)
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = 
totalRecords)
+    // why is this resetting to 0? this seems like a bug
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 1, 
startingOffset = 0, startingTimestamp = 0)

Review Comment:
   This behaviour definitely seems wrong. I left the test in to demonstrate 
that it happens (the test passes as written). Please review.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to