xinbinhuang commented on PR #20289:
URL: https://github.com/apache/flink/pull/20289#issuecomment-1196291153

   
   @tweise Thank you so much for reviewing the PR! I just realized that I may have misread the Jira issue as `HybridSource: Support dynamic stop position in HybridSource` instead of `HybridSource: Support dynamic stop position in FileSource`. So this PR actually aims to design a _**generic interface that allows any source to participate in a dynamic source switch**_. With that in mind, let me explain how I came up with the current design and implementation.
   
   After reviewing the current logic of the hybrid source (amazing work 🎉 🎉 !!), I understand that the current implementation supports transferring the end position to the next enumerator. However, it lacks a mechanism to determine what the end position actually is (e.g. the offset of a Kafka partition). These "end positions" are probably unknown beforehand; if they were known, this would be the same as the [fixed start position](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/#fixed-start-position-at-graph-construction-time) case.
   Therefore, I think the key is to transfer the "end position" from the source reader to the enumerator during the source switch.
   There are a few points to consider:
   
   1. What would be the "end position" to transfer to the next source? 
   
   I believe this varies by use case. Some may find `file.path` sufficient, while others may need to derive the timestamp or offset from the content of the file (e.g. a Kafka archive, whose format can vary from company to company). Since we can't anticipate every use case, passing all finished splits seems to be a reasonable solution here: it lets the developer decide how to derive the position from them.
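To make "let the developer decide" concrete, here is a minimal, hypothetical sketch of how a next source could derive Kafka start offsets from the paths of finished file splits. It assumes an archive naming scheme `<topic>-<partition>-<lastOffset>.parquet`, which is purely illustrative; the class and method names are made up, and each finished split is reduced to its path string.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical example: derive Kafka start offsets from the paths of finished file splits.
 * ASSUMPTION: archived files are named "<topic>-<partition>-<lastOffset>.parquet".
 * Real archives may encode positions differently, or only inside the file content.
 */
public class StartOffsetsFromFinishedSplits {

    // Hypothetical naming scheme: <topic>-<partition>-<lastOffset>.parquet
    private static final Pattern ARCHIVE_NAME =
            Pattern.compile("([^/]+)-(\\d+)-(\\d+)\\.parquet$");

    /** Returns, per "topic-partition" key, the offset right after the highest archived one. */
    public static Map<String, Long> deriveStartOffsets(List<String> finishedSplitPaths) {
        Map<String, Long> startOffsets = new HashMap<>();
        for (String path : finishedSplitPaths) {
            Matcher m = ARCHIVE_NAME.matcher(path);
            if (!m.find()) {
                continue; // skip files that do not follow the assumed naming scheme
            }
            String key = m.group(1) + "-" + m.group(2);
            long next = Long.parseLong(m.group(3)) + 1;
            // Keep the largest "next offset" seen for each partition.
            startOffsets.merge(key, next, Long::max);
        }
        return startOffsets;
    }

    public static void main(String[] args) {
        List<String> finished = List.of(
                "s3://archive/orders-0-99.parquet",
                "s3://archive/orders-0-199.parquet",
                "s3://archive/orders-1-49.parquet");
        System.out.println(deriveStartOffsets(finished));
    }
}
```

A file-path convention is only one option; a source that needs the content of the file would read it inside the same kind of function instead.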
   
   2. Where to make the changes?
   
   I aimed to implement this so that most existing sources can benefit from it out of the box, with minimal and non-breaking changes to them.
   
   
   3. How to store the "finished splits" before the source switch?
   
   Per FLIP-27, the enumerator only knows which splits to consume, not their actual progress; only the source reader knows that. So we need to store the finished splits and transfer them to the enumerator during the source switch. However, most existing sources build their readers on `SourceReaderBase`, which [purges finished splits from its state](https://github.com/apache/flink/blob/3e2620bc785b3b4d82f1f188eb7b1e0e129b14d3/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L193-L194) once they reach their end position. One naive solution would be to change `SourceReaderBase` to also keep finished splits in its state. However, this would affect all sources running in production and would likely be a large backward-incompatible change. Therefore, I decided to store them only in the `HybridSourceReader`; existing sources only need to implement one method (`DynamicHybridSourceReader::getFinishedSplits`) that allows the finished splits to be extracted during checkpoints and the source switch. This process is transparent to all existing sources and only happens when they are used with the `HybridSource`.
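A rough, self-contained sketch of the reader-side idea, under simplified assumptions: instead of purging a split when it finishes, the reader keeps the split's final state and exposes the retained splits through `getFinishedSplits()`. Only that method name comes from the proposal; the interface signature, `FileSplit`, and `ToyReader` are hypothetical stand-ins, not the code in this PR or Flink's `SourceReaderBase`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch, NOT the PR's code: a reader that keeps finished splits so that a
 * wrapper (e.g. HybridSourceReader) could pull them at checkpoint time and on source switch.
 */
public class FinishedSplitsSketch {

    /** ASSUMED shape of the opt-in interface; only getFinishedSplits() is from the proposal. */
    interface DynamicHybridSourceReader<SplitT> {
        List<SplitT> getFinishedSplits();
    }

    /** Toy stand-in for a split: an id plus the position reached so far. */
    record FileSplit(String id, long position) {}

    /** Toy reader that, unlike SourceReaderBase, remembers splits after they finish. */
    static class ToyReader implements DynamicHybridSourceReader<FileSplit> {
        private final Map<String, FileSplit> active = new LinkedHashMap<>();
        private final List<FileSplit> finished = new ArrayList<>();

        void addSplit(FileSplit split) {
            active.put(split.id(), split);
        }

        /** Called when a split reaches its end position. */
        void onSplitFinished(String splitId, long endPosition) {
            FileSplit split = active.remove(splitId);
            if (split != null) {
                // SourceReaderBase would just drop the split here; we keep its final state.
                finished.add(new FileSplit(splitId, endPosition));
            }
        }

        List<FileSplit> activeSplits() {
            return new ArrayList<>(active.values());
        }

        @Override
        public List<FileSplit> getFinishedSplits() {
            return List.copyOf(finished);
        }
    }

    public static void main(String[] args) {
        ToyReader reader = new ToyReader();
        reader.addSplit(new FileSplit("a.parquet", 0));
        reader.addSplit(new FileSplit("b.parquet", 0));
        reader.onSplitFinished("a.parquet", 1_000);
        System.out.println("active=" + reader.activeSplits()
                + ", finished=" + reader.getFinishedSplits());
    }
}
```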
   
   With the above points in mind, the current implementation works as follows:
   - On each checkpoint, `HybridSourceReader` retrieves the finished splits (marked with `HybridSourceSplit.isFinished = true`) from the underlying reader and checkpoints them for persistence along with the unfinished ones.
   - Upon the source switch, `HybridSourceReader` sends all finished splits to the enumerator in a `SourceReaderFinishedEvent`.
   - The enumerator passes those finished splits along to the next source in the `SourceSwitchContext`, and the next source can then use them to derive its start positions.
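The three steps above can be simulated without Flink in a few lines. This is a hypothetical, dependency-free sketch: the record types reuse names from this comment (`HybridSourceSplit`, `SourceReaderFinishedEvent`, `SourceSwitchContext`), but their fields and the helper methods are simplified assumptions, not Flink's actual classes or this PR's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical simulation of checkpoint -> source switch -> next source. Not Flink code. */
public class SourceSwitchFlowSketch {

    // Simplified stand-ins; field layouts are ASSUMPTIONS for illustration only.
    record HybridSourceSplit(String splitId, long position, boolean isFinished) {}

    record SourceReaderFinishedEvent(int sourceIndex, List<HybridSourceSplit> finishedSplits) {}

    record SourceSwitchContext(List<HybridSourceSplit> previousSplits) {
        List<HybridSourceSplit> getPreviousSplits() {
            return previousSplits;
        }
    }

    /** Step 1: the checkpointed state holds the unfinished splits plus the finished ones, flagged. */
    static List<HybridSourceSplit> snapshotState(List<HybridSourceSplit> unfinished,
                                                 List<HybridSourceSplit> finished) {
        List<HybridSourceSplit> state = new ArrayList<>(unfinished);
        for (HybridSourceSplit s : finished) {
            state.add(new HybridSourceSplit(s.splitId(), s.position(), true));
        }
        return state;
    }

    /** Step 2: on switch, the reader sends only the finished splits to the enumerator. */
    static SourceReaderFinishedEvent onSourceFinished(int sourceIndex,
                                                      List<HybridSourceSplit> state) {
        return new SourceReaderFinishedEvent(
                sourceIndex, state.stream().filter(HybridSourceSplit::isFinished).toList());
    }

    /** Step 3: the enumerator hands them to the next source's factory via the context. */
    static <T> T switchToNextSource(SourceReaderFinishedEvent event,
                                    Function<SourceSwitchContext, T> nextSourceFactory) {
        return nextSourceFactory.apply(new SourceSwitchContext(event.finishedSplits()));
    }

    public static void main(String[] args) {
        List<HybridSourceSplit> state = snapshotState(
                List.of(),
                List.of(new HybridSourceSplit("orders-0", 200, false)));
        SourceReaderFinishedEvent event = onSourceFinished(0, state);
        // The "next source" here just derives a start position: the largest finished position.
        Long start = switchToNextSource(event, ctx -> ctx.getPreviousSplits().stream()
                .mapToLong(HybridSourceSplit::position).max().orElse(0L));
        System.out.println("next source starts at " + start);
    }
}
```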
   
   #### Changes required on existing non-hybrid sources:
   - Implement `DynamicHybridSourceReader::getFinishedSplits` in `SourceReaderBase`.
   
   #### API changes on `HybridSource`
   - Added `SourceSwitchContext::getPreviousSplits`, which returns the finished splits from the previous source.
   
   That's a lot of words, so I really appreciate your patience in reading this. Let me know if anything is unclear; I'm happy to chat more about it!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
