showuon commented on code in PR #12753:
URL: https://github.com/apache/kafka/pull/12753#discussion_r1019962773


##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -694,6 +694,76 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(outOfRangePos.toLong, outOfRangePartitions.get(tp))
   }
 
+  @Test
+  def testFetchInvalidOffsetResetConfigEarliest(): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest")
+    val consumer = createConsumer(configOverrides = this.consumerConfig)
+    val totalRecords = 10L
+
+    val producer = createProducer()
+    val startingTimestamp = 0
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = 
startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 
totalRecords.toInt, startingOffset =0)
+    // seek to out of range position
+    val outOfRangePos = totalRecords + 1
+    consumer.seek(tp, outOfRangePos)
+    // assert that poll resets to the beginning position
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 1, 
startingOffset = 0)
+  }
+
+  @Test
+  def testFetchInvalidOffsetResetConfigLatest(): Unit = {
+
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"latest")
+    val consumer = createConsumer(configOverrides = this.consumerConfig)

Review Comment:
   Some explanation:
   To reduce consumer/follower fetch overhead, the broker holds a FETCH request
   for a while when there are no records at the requested offset, which cuts
   down the network traffic between client and broker. In your example, you
   send 10 records to the broker. After the consumer has received those 10
   records (offsets 0-9), the next consumer poll tries to fetch offset 10, but
   the broker has no record at offset 10 yet. So when the broker receives that
   FETCH request, it waits on the broker side for up to `fetch.max.wait.ms`
   (default 500 ms). If records arrive within those 500 ms, it responds to the
   client with them; as you can imagine, that reduces the network traffic a
   lot. If no records have arrived after 500 ms, it responds with an empty
   result. So your current test runs like this:
   
   1. send 10 records to the broker
   2. consumer polls 10 records, then sends one more FETCH for offset 10
   3. consumer seeks to 17
   4. consumer polls, but does not send a FETCH request because there is
      already an in-flight FETCH request that has not been responded to yet
   5. send another 10 records
   6. consumer polls; the previous FETCH responds with 10 records starting
      from offset 10, but they are discarded because the fetcher finds that
      the starting offset is not the expected one, so it sends another FETCH
      starting from offset 17
   7. consumer receives records starting from offset 17
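   
   The broker-side decision behind these steps can be sketched as a toy model.
   This is only an illustration, not Kafka's actual code: `FetchModel`,
   `classify`, and the outcome names are hypothetical, and the model ignores
   details such as `fetch.min.bytes` and the log start offset.

   ```scala
   // Toy model (hypothetical names) of how a broker treats a FETCH request,
   // based only on the fetch offset and the partition's log end offset.
   object FetchModel {
     sealed trait FetchOutcome
     case object ReturnRecords extends FetchOutcome           // data is already available
     case object WaitThenRespond extends FetchOutcome         // parked for up to fetch.max.wait.ms
     case object RespondEmptyImmediately extends FetchOutcome // nothing to return, no waiting
     case object OffsetOutOfRange extends FetchOutcome        // offset is beyond the log end

     def classify(fetchOffset: Long, logEndOffset: Long, maxWaitMs: Int): FetchOutcome = {
       if (fetchOffset > logEndOffset) OffsetOutOfRange
       else if (fetchOffset < logEndOffset) ReturnRecords
       else if (maxWaitMs > 0) WaitThenRespond
       else RespondEmptyImmediately
     }
   }
   ```

   With 10 records in the log (log end offset 10), a fetch at offset 10 is
   parked when `fetch.max.wait.ms` is 500, while a fetch at offset 17 would be
   rejected as out of range once it is actually sent.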
   
   If we set `fetch.max.wait.ms` to 0, it runs like this:
   1. send 10 records to the broker
   2. consumer polls 10 records, then sends one more FETCH for offset 10,
      **which is responded to with an empty result immediately**
   3. consumer seeks to 17
   //4. consumer polls, but does not send a FETCH request because there is
      already an in-flight FETCH request that has not been responded to yet
   4. consumer polls and sends a FETCH request, gets an out-of-range error,
      and resets the position to latest, i.e. 10
   5. send another 10 records
   //6. consumer polls and gets 10 records, but they are discarded because the
      fetcher finds that the starting offset is not the expected one, so it
      sends another FETCH starting from offset 17
   6. consumer polls and gets 10 records
   7. received records start from offset **10**
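   
   A minimal sketch of the overrides for that second scenario, written against
   plain `java.util.Properties` so it runs without Kafka on the classpath. The
   object and method names are hypothetical. In the test itself, the
   equivalent would presumably be a `setProperty` call on `this.consumerConfig`
   using `ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG`.

   ```scala
   import java.util.Properties

   // Hypothetical helper: builds the consumer overrides discussed above.
   object FetchWaitConfigSketch {
     def consumerOverrides(): Properties = {
       val props = new Properties()
       // Reset to the log end when the requested offset is out of range.
       props.setProperty("auto.offset.reset", "latest")
       // Do not let the broker hold an empty FETCH; respond immediately.
       props.setProperty("fetch.max.wait.ms", "0")
       props
     }

     def main(args: Array[String]): Unit =
       println(consumerOverrides())
   }
   ```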
   
   Hope that's clear



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
