xinbinhuang commented on PR #20289: URL: https://github.com/apache/flink/pull/20289#issuecomment-1196291153
@tweise Thank you so much for reviewing the PR! I just realized that I might have misread the Jira issue as `HybridSource: Support dynamic stop position in HybridSource` instead of `HybridSource: Support dynamic stop position in FileSource`. So this PR actually aims to design a _**generic interface to allow any source to participate in dynamic source switch**_. With that in mind, let me explain how I came up with the current design & implementation.

After reviewing the current logic of the hybrid source (amazing work 🎉 🎉 !!), I understand that the current implementation supports transferring the end position to the next enumerator. However, it lacks a mechanism to know where the end position is (e.g. the offset for a Kafka partition). These "end positions" are probably unknown beforehand; otherwise this would be the same as the [fixed start position](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/#fixed-start-position-at-graph-construction-time) case. Therefore, I think the key is to transfer the "end position" from the source reader to the enumerator during source switch.

There are a few points to consider:

1. What would be the "end position" to transfer to the next source? I believe this varies by use case. Some may find it enough to use `file.path`, while others may need to derive the timestamp or offset from the content of the file (e.g. a Kafka archive, whose implementation can vary by company). Since we can't anticipate all use cases, passing all finished splits seems to be a reasonable solution here, letting the developer decide how to derive the position from them.
2. Where to make the changes? I aimed to implement this such that most existing sources can benefit from it out of the box, with minimal changes and no breaking changes to them.
3. How to store the "finished splits" before source switch? Per FLIP-27, the enumerator only knows which splits to consume but not the actual progress; only the source reader knows about it.
So we need to store them and transfer them to the enumerator during source switch. However, most existing sources extend `SourceReaderBase`, which [purges finished splits from state](https://github.com/apache/flink/blob/3e2620bc785b3b4d82f1f188eb7b1e0e129b14d3/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L193-L194) once they reach the end position. One naive solution would be to adjust `SourceReaderBase` to also store finished splits in the state. However, this would affect all sources running in production and is probably a significant backward-incompatible change. Therefore, I decided to store them only in the `HybridSourceReader`, and existing sources only need to implement one method (`DynamicHybridSourceReader::getFinishedSplits`) that allows the finished splits to be extracted during checkpoint and source switch. This process is transparent to all existing sources and only happens when they are used with the `HybridSource`.

With the above points, the current implementation works as follows:

- On each checkpoint, `HybridSourceReader` retrieves the finished split states (marked with `HybridSourceSplit.isFinished = true`) from the underlying reader and persists them in the checkpoint along with the unfinished states.
- Upon source switch, `HybridSourceReader` sends all the finished splits to the enumerator in `SourceReaderFinishedEvent`.
- The enumerator passes those finished splits to the next source via `SourceSwitchContext`, and the next source can then use the splits to derive its start positions.

#### Changes required on existing non-hybrid sources:

- Implement `DynamicHybridSourceReader::getFinishedSplits` on `SourceReaderBase`.

#### API changes on `HybridSource`

- Added `SourceSwitchContext::getPreviousSplits`, which returns the finished splits from the previous source.

That's a lot of words, so I really appreciate your patience in reading this.
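
To make the flow concrete, here is a minimal, self-contained sketch of how finished splits could travel from a reader to the next source. It deliberately does not use the real Flink classes: `FinishedSplit`, `ToyFileReader`, and `deriveStartOffset` are hypothetical names, and the two interfaces are simplified stand-ins that only borrow the method names `getFinishedSplits` and `getPreviousSplits` from this PR. The "highest end offset plus one" policy is just one assumed way a developer might derive a start position.

```java
import java.util.ArrayList;
import java.util.List;

public class HybridSwitchSketch {

    /** Hypothetical finished split: a file path plus the last offset read from it. */
    public static final class FinishedSplit {
        public final String path;
        public final long endOffset;

        public FinishedSplit(String path, long endOffset) {
            this.path = path;
            this.endOffset = endOffset;
        }
    }

    /** Simplified stand-in for the single method an existing reader would implement. */
    public interface DynamicHybridSourceReader {
        List<FinishedSplit> getFinishedSplits();
    }

    /** Simplified stand-in for the context handed to the next source on switch. */
    public interface SourceSwitchContext {
        List<FinishedSplit> getPreviousSplits();
    }

    /** Toy reader that remembers the splits it has finished instead of purging them. */
    public static final class ToyFileReader implements DynamicHybridSourceReader {
        private final List<FinishedSplit> finished = new ArrayList<>();

        public void finish(String path, long endOffset) {
            finished.add(new FinishedSplit(path, endOffset));
        }

        @Override
        public List<FinishedSplit> getFinishedSplits() {
            // Return a copy so callers cannot mutate the reader's own bookkeeping.
            return new ArrayList<>(finished);
        }
    }

    /** Assumed policy: resume right after the highest offset seen by the previous source. */
    public static long deriveStartOffset(SourceSwitchContext ctx) {
        long max = -1L;
        for (FinishedSplit split : ctx.getPreviousSplits()) {
            max = Math.max(max, split.endOffset);
        }
        return max + 1;
    }

    public static void main(String[] args) {
        ToyFileReader reader = new ToyFileReader();
        reader.finish("archive/part-0", 99L);
        reader.finish("archive/part-1", 249L);

        // On source switch, the finished splits are handed over to the next source.
        List<FinishedSplit> shipped = reader.getFinishedSplits();
        SourceSwitchContext ctx = () -> shipped;

        System.out.println(deriveStartOffset(ctx)); // prints 250
    }
}
```

In the actual PR the splits would be wrapped, serialized, and sent via `SourceReaderFinishedEvent`; the sketch skips all of that and only shows the direction of the data flow.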
Let me know if anything is unclear; I'm happy to chat more about this!