FrankChen021 opened a new issue, #18282: URL: https://github.com/apache/druid/issues/18282
This problem has been there for a very long time. See https://github.com/apache/druid/issues/11658 When `OffsetOutOfRangeException` is raised, and if auto reset is not enabled(which is the default one), the behavior is fixed(see https://github.com/apache/druid/pull/18226) so that all data ingested before the exception WILL be handled, and task will FAIL. This is similar to kafka client behavior if 'auto.offset.reset' is not given. However, when auto reset is enabled, current behaviour does not work as expected. There're several bugs there: 1. task resets the offset by using the expired offset, which is wrong, instead, the earliest offset should be used 2. overlord kills task after offset is reset, and all ingested data lost 3. overlord can reset the offset when it starts a new task. and this time, overlord will CLEAR(which is wrong) the offset stored in the metadatastore, and throws a exception, which delays the start of task as we can see that, for the auto reset, both task and overlord are involved in making decision of offset reset which makes it overcomplicated. The following sequence diagram shows how the task and overlord works. ```mermaid sequenceDiagram participant Task participant Overlord participant Metdatastorage alt Reset Offset from tasks Task->>Overlord: 1.Reset Offset with expired offset (WRONG) Overlord->>Metdatastorage: 2.Read offset from metadata storage Overlord->>Overlord: 3.Check if offset is available Overlord->>Overlord: 4.Reset offset if NOT available (with current offset PLUS offset from metadata storage) Overlord->>Overlord: 5.Throw exception and wait for next retry if offset is available Overlord->>Task: Kill tasks end alt Starting a new task Overlord->>Overlord: 6.Start a new task Overlord->>Metdatastorage: 7.Read offset from metadata storage Overlord->>Overlord: 8.Check if offset is available Overlord->>Overlord: 9.Reset offset if NOT available (with current offset MINUS(which is WRONG) offset from metadata storage) Overlord->>Overlord: 10.Throw exception and wait for next retry to start a task(introducing delay) end ``` 1. Tasks reset the offset by using 'nextOffset' which is currently expired offset. this is wrong https://github.com/apache/druid/blob/59d257816b85dbeeca336b8e25d341d67bbc5697/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java#L134-L155 2. overlord throws an exception after reset, which delays the start of a task https://github.com/apache/druid/blob/ec25d8706cb63672c9da8dec255b79cba46288cc/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java#L3996-L4006 3. overlord clears the offset when reseting offset. https://github.com/apache/druid/blob/ec25d8706cb63672c9da8dec255b79cba46288cc/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java#L1900-L1906 in above code block, it call 'minus' method instead of 'plus' method. the `resetOffsetsInternal` used by task correctly calls the `plus` method. In next turn, overlord find that there's no offset for this partition, it reset the offset to earliest or latest which is based on `useEarliestOffset` property. https://github.com/apache/druid/blob/ec25d8706cb63672c9da8dec255b79cba46288cc/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java#L1965-L1975 As a subsquence, when `OffsetOutOfRangeException` is raised, in extreme cases, new tasks are continously forked but all fail. # Proposed Change To make it simple and clear, the key point is that, ONLY overlord is responsible for reseting the offset. 1. remove offset reset logic from task. when OffsetOutOfRangeException is raised, it fail but commits all ingested data as fixed by https://github.com/apache/druid/pull/18226 2. when supervisor tries to start a new task, it will know that last stored offset is not available, and tries to reset 1. use the EARLIEST offset from stream(like Kafka) to reset offset 2. correctly merge the reset offset and stored offset before update offset in metadata storage. this ensure that the overlord always resets the offset to earliest so that less data are lost. 3. issue an alert from overlord that offset has been reset 4. after reset, return the this offset so that supervisor can immediately start task from the reset offset -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
