FrankChen021 opened a new issue, #18282:
URL: https://github.com/apache/druid/issues/18282

   This problem has existed for a long time. See
https://github.com/apache/druid/issues/11658
   
   When `OffsetOutOfRangeException` is raised and auto reset is not enabled (the default), the behavior has been fixed by https://github.com/apache/druid/pull/18226: all data ingested before the exception WILL be handled, and then the task will FAIL. This is similar to the behavior of the Kafka consumer when `auto.offset.reset` is set to `none`.
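   A minimal sketch of that behavior, assuming a simplified task loop: the task keeps everything it ingested before the out-of-range error, hands it off, and then fails instead of resetting. `NoAutoResetSketch`, `OffsetOutOfRange`, `run` and the `published` list (a stand-in for committing/publishing data) are hypothetical names for illustration, not Druid classes.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.LongFunction;

   /**
    * Hypothetical sketch (not Druid's task runner) of the behavior when auto
    * reset is disabled: keep what was ingested, then fail.
    */
   class NoAutoResetSketch {
       enum Status { SUCCESS, FAILED }

       /** Stand-in for the stream client's offset-out-of-range error. */
       static class OffsetOutOfRange extends RuntimeException {
           OffsetOutOfRange(long offset) {
               super("offset " + offset + " is no longer available");
           }
       }

       /**
        * Reads offsets [start, end) through the given reader. Records read
        * before an out-of-range error are still added to the published list.
        */
       static Status run(long start, long end, LongFunction<String> reader, List<String> published) {
           List<String> ingested = new ArrayList<>();
           Status status = Status.SUCCESS;
           try {
               for (long offset = start; offset < end; offset++) {
                   ingested.add(reader.apply(offset));
               }
           } catch (OffsetOutOfRange e) {
               status = Status.FAILED; // no auto reset: the task fails ...
           }
           published.addAll(ingested); // ... but data ingested before the error is still handed off
           return status;
       }
   }
   ```

   For example, a reader that throws from offset 3 onward leaves records 0 to 2 published and the task in `FAILED` state.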
   
   However, when auto reset is enabled, the current behavior does not work as expected.
   There are several bugs:
   1. The task resets the offset using the expired offset, which is wrong; the earliest available offset should be used instead.
   2. The overlord kills the task after the offset is reset, so all data ingested by that task is lost.
   3. The overlord can also reset the offset when it starts a new task. In that case, the overlord CLEARS the offset stored in the metadata store (which is wrong) and throws an exception, which delays the start of the task.
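   To make bug 1 concrete, here is a minimal sketch assuming that an offset is readable only when it lies between the partition's earliest and latest offsets. `ResetTargetSketch`, `isAvailable`, `buggyResetTarget` and `resetTarget` are hypothetical names, not Druid code; the example values (expired offset 100, earliest 500, latest 900) are made up for illustration.

   ```java
   /**
    * Hypothetical sketch (not Druid code): choosing the offset to reset to
    * after the stored offset has expired, for example because of retention.
    */
   class ResetTargetSketch {
       /** Assumption: a position is readable only within [earliest, latest]. */
       static boolean isAvailable(long offset, long earliest, long latest) {
           return offset >= earliest && offset <= latest;
       }

       /** What bug 1 does: "reset" to the expired offset, which is still unreadable. */
       static long buggyResetTarget(long expiredOffset, long earliest) {
           return expiredOffset;
       }

       /** Expected behavior: reset to the earliest offset the stream still holds. */
       static long resetTarget(long expiredOffset, long earliest) {
           return earliest;
       }

       public static void main(String[] args) {
           long earliest = 500, latest = 900, expired = 100;
           System.out.println(isAvailable(buggyResetTarget(expired, earliest), earliest, latest)); // false
           System.out.println(isAvailable(resetTarget(expired, earliest), earliest, latest));      // true
       }
   }
   ```

   Resetting to the expired offset leaves the partition exactly as unreadable as before, so the next read fails again; only the earliest offset yields a valid position.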
   
   As we can see, with auto reset enabled, both the task and the overlord take part in deciding how to reset the offset, which makes the logic overcomplicated.
   
   The following sequence diagram shows how the task and the overlord interact.
   
   ```mermaid
   sequenceDiagram
       participant Task
       participant Overlord
       participant MetadataStorage
   
       alt Reset offset from tasks
         Task->>Overlord: 1. Reset offset with the expired offset (WRONG)
         Overlord->>MetadataStorage: 2. Read offset from metadata storage
         Overlord->>Overlord: 3. Check if offset is available
         Overlord->>Overlord: 4. Reset offset if NOT available (current offset PLUS offset from metadata storage)
         Overlord->>Overlord: 5. Throw exception and wait for next retry if offset is available
         Overlord->>Task: Kill tasks
       end
   
       alt Starting a new task
         Overlord->>Overlord: 6. Start a new task
         Overlord->>MetadataStorage: 7. Read offset from metadata storage
         Overlord->>Overlord: 8. Check if offset is available
         Overlord->>Overlord: 9. Reset offset if NOT available (current offset MINUS offset from metadata storage, which is WRONG)
         Overlord->>Overlord: 10. Throw exception and wait for next retry to start a task (introduces delay)
       end
   ```
   
   
   1. Tasks reset the offset using `nextOffset`, which at this point is the expired offset. This is wrong.
   
https://github.com/apache/druid/blob/59d257816b85dbeeca336b8e25d341d67bbc5697/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java#L134-L155
   
   2. The overlord throws an exception after the reset, which delays the start of a task.
   
https://github.com/apache/druid/blob/ec25d8706cb63672c9da8dec255b79cba46288cc/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java#L3996-L4006
   
   3. The overlord clears the offset when resetting it.
   
https://github.com/apache/druid/blob/ec25d8706cb63672c9da8dec255b79cba46288cc/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java#L1900-L1906
   
   In the code block above, the supervisor calls the `minus` method instead of the `plus` method, whereas `resetOffsetsInternal`, which is used for resets requested by tasks, correctly calls `plus`.
   On the next run, the overlord finds that there is no offset for this partition and resets it to the earliest or latest offset, depending on the `useEarliestOffset` property.
   
https://github.com/apache/druid/blob/ec25d8706cb63672c9da8dec255b79cba46288cc/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java#L1965-L1975
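   The difference between the two calls can be sketched with plain partition-to-offset maps. This assumes, as we read the linked code, that `plus` keeps every stored partition and overrides the ones being reset, while `minus` drops every stored partition that appears in its argument. `OffsetMergeSketch` and its methods are hypothetical stand-ins, not Druid's metadata classes.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   /**
    * Hypothetical sketch of combining stored offsets with reset offsets,
    * modeled on partition -> offset maps. Not Druid's actual classes.
    */
   class OffsetMergeSketch {
       /** "plus": keep all stored partitions, overriding those present in the reset map. */
       static Map<Integer, Long> plus(Map<Integer, Long> stored, Map<Integer, Long> reset) {
           Map<Integer, Long> merged = new HashMap<>(stored);
           merged.putAll(reset);
           return merged;
       }

       /** "minus": drop every stored partition that appears in the reset map. */
       static Map<Integer, Long> minus(Map<Integer, Long> stored, Map<Integer, Long> reset) {
           Map<Integer, Long> result = new HashMap<>(stored);
           result.keySet().removeAll(reset.keySet());
           return result;
       }

       public static void main(String[] args) {
           Map<Integer, Long> stored = Map.of(0, 100L, 1, 250L);
           Map<Integer, Long> reset = Map.of(0, 500L); // partition 0 is being reset
           System.out.println(plus(stored, reset));  // {0=500, 1=250}
           System.out.println(minus(stored, reset)); // {1=250}: partition 0 is cleared
       }
   }
   ```

   With `minus`, the reset partition simply disappears from the stored offsets, which is why the next run sees no offset at all and falls back to `useEarliestOffset`.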
   
   As a consequence, when `OffsetOutOfRangeException` is raised, in extreme cases new tasks are continuously created but all of them fail.
   
   # Proposed Change
   To keep it simple and clear, the key point is that ONLY the overlord is responsible for resetting the offset.
   
   1. Remove the offset reset logic from the task. When `OffsetOutOfRangeException` is raised, the task fails but commits all ingested data, as fixed by https://github.com/apache/druid/pull/18226.
   2. When the supervisor tries to start a new task, it detects that the last stored offset is no longer available and resets it:
       1. Use the EARLIEST offset from the stream (for example, Kafka) as the reset offset.
       2. Correctly merge the reset offset with the stored offsets before updating them in the metadata storage. This ensures that the overlord always resets the offset to the earliest one, so that less data is lost.
       3. Issue an alert from the overlord that the offset has been reset.
       4. After the reset, return the reset offset so that the supervisor can immediately start the task from it.
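   The proposed flow can be sketched as a single method the supervisor would call while starting a task. This is a minimal sketch under stated assumptions: the metadata storage is modeled as an in-memory partition-to-offset map, the alert is a log line, and `SupervisorResetSketch` and `startingOffset` are hypothetical names rather than Druid's API. The step numbers in the comments refer to the sub-steps of item 2.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   /**
    * Hypothetical sketch (not Druid's API) of the proposed flow: only the
    * supervisor resets offsets, and it does so while starting a task.
    */
   class SupervisorResetSketch {
       /** Stand-in for the partition -> offset map kept in the metadata storage. */
       final Map<Integer, Long> metadataStore = new HashMap<>();

       /**
        * Returns the offset a new task should start reading from.
        * earliest/latest are the partition's current bounds in the stream.
        */
       long startingOffset(int partition, long earliest, long latest, boolean useEarliestOffset) {
           Long stored = metadataStore.get(partition);
           if (stored == null) {
               // Nothing stored yet: choose a start position from useEarliestOffset (not a reset).
               return useEarliestOffset ? earliest : latest;
           }
           if (stored >= earliest && stored <= latest) {
               return stored; // still available, nothing to reset
           }
           // 2.1 Reset to the EARLIEST offset still held by the stream.
           long resetOffset = earliest;
           // 2.2 Merge instead of clearing: only this partition's entry changes
           //     (equivalent to "plus" with a single-entry map).
           metadataStore.put(partition, resetOffset);
           // 2.3 Alert that the offset was reset (a log line stands in for a real alert).
           System.err.printf("ALERT: partition %d offset reset from %d to %d%n", partition, stored, resetOffset);
           // 2.4 Return the reset offset so the task starts now instead of after a retry.
           return resetOffset;
       }
   }
   ```

   In this sketch a partition whose stored offset has expired is reset, persisted, and used as the task's starting offset in the same call, so the supervisor never has to throw and wait for its next run, and the other partitions' stored offsets are left untouched.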
   