Hi all,

Thanks for starting the discussion!

I think start-position and checkpoint-state are different things. Checkpoint state is an intermediate capture of read progress. Users of a source don't care about it when writing a job; it is purely an implementation detail. Start-position, on the other hand, is part of the public API: for example, paths for FileSource/HiveSource and OffsetsInitializer for KafkaSource. Given that, `setStartState` is really a reposition operation that tells the next source where to start at job runtime.

Besides the above, I think separating start-position from checkpoint-state allows a smooth migration from non-switchable to switchable sources and takes pressure off source writers. Source writers do not have to decide switchable-or-not in the design phase; the decision can be postponed, and a later enhancement will not break anything. It would be relatively hard to convert a start-position into checkpoint-state on restore without changing the checkpoint-state structure and/or serializer:

* `Path[]` to `PendingSplitsCheckpoint<FileSourceSplit>`.
* `OffsetsInitializer` to `KafkaSourceEnumState`.

In my opinion this conversion should be an implementation detail of the next source, not a converter function.

With a separate start-position concept, I think the `StartStateT` in the proposal should be `Path[]` for `FileSource` and `OffsetsInitializer` for `KafkaSource`. It seems to belong to the source more than to the split enumerator.

If we do not reuse checkpoint-state as start-position, then this feature requires support from the Flink side (that is, it would be awkward/tangled to build as a third-party library):

* It needs a post-mortem position to deliver possibly useful information to the next source.
* It needs to reposition the next source, such as `FileSource`, `KafkaSource`, etc.

Back to the proposal, I think it shows three joints:

1. Separated start-position for the next source.
2. Separated end-position from the preceding source.
3. Converter from end-position to start-position.

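To make the three joints concrete, here is a minimal sketch in plain Java. Everything in it is hypothetical and for illustration only: `StartPositionAware`, `EndPositionAware`, `switchOver`, and the two fake sources are not part of Flink or of the FLIP, and the positions are simplified to `Long` values rather than `Path[]` or `OffsetsInitializer`.

```java
import java.util.function.Function;

/** Hypothetical sketch of the three joints; none of these types exist in Flink. */
public class HybridJointsSketch {

    /** Joint 1: the next source accepts a start-position through its public API. */
    interface StartPositionAware<StartT> {
        void setStartPosition(StartT start);
    }

    /** Joint 2: the preceding source reports where it stopped. */
    interface EndPositionAware<EndT> {
        EndT getEndPosition();
    }

    /** Stand-in for a file-like source that finished at some timestamp. */
    static final class FakeFileSource implements EndPositionAware<Long> {
        private final long lastTimestamp;

        FakeFileSource(long lastTimestamp) {
            this.lastTimestamp = lastTimestamp;
        }

        @Override
        public Long getEndPosition() {
            return lastTimestamp;
        }
    }

    /** Stand-in for a log-like source whose start-position is a single offset. */
    static final class FakeLogSource implements StartPositionAware<Long> {
        private long startOffset = 0L; // pre-configured default start-position

        @Override
        public void setStartPosition(Long start) {
            this.startOffset = start;
        }

        long startOffset() {
            return startOffset;
        }
    }

    /** Joint 3: convert the end-position and use it to reposition the next source. */
    static <EndT, StartT> void switchOver(
            EndPositionAware<EndT> previous,
            Function<EndT, StartT> converter,
            StartPositionAware<StartT> next) {
        next.setStartPosition(converter.apply(previous.getEndPosition()));
    }

    public static void main(String[] args) {
        FakeFileSource files = new FakeFileSource(1_000L);
        FakeLogSource log = new FakeLogSource();
        // Hypothetical converter: start one past the last timestamp that was read.
        switchOver(files, ts -> ts + 1, log);
        System.out.println(log.startOffset()); // prints 1001
    }
}
```

In this sketch each joint is a separate, independently implementable piece, which is why it is fair to ask which of them a given source actually needs.
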
I think some or all of the three should be optional. Let me go into detail:

1. No configurable start-position. In this situation the combination of the above three is a no-op, and `HybridSource` is just a chain of sources with pre-configured start-positions. The current design seems like overkill for this use case. This case could also be realized with help from `InputSelectable` and `MultipleInputStreamOperator`, but I think it fits this feature quite well.
2. I wonder whether end-position is a must, and how it could be useful for end users of a generic-enough source, say `FileSource`. I can imagine that some sources have an embedded next-position for the next source to start from. Most generic sources, however, have no meaningful next-position for other sources. Would it then be too risky to bake this into the types of generic sources? Or are we doing this solely to satisfy the type requirements of `HybridSource.addSource`?
3. Is it possible for the converter function to do blocking operations? How do we respond to a checkpoint request while switching split enumerators across sources? Do end-position and start-position need to be stored in checkpoint state or not?

Last, regarding the name `HybridSource`: would it be possible to use this feature to switch/chain multiple homogeneous sources? Say:

* Two file sources with different formats.
* Migration from one Kafka cluster to another, as @Thomas has already pointed out.

On February 4, 2021 at 10:48:21, Thomas Weise (t...@apache.org) wrote:

Thanks for initiating this discussion and creating the proposal!

I would like to contribute to this effort. Has there been related activity since the FLIP was created? If not, I would like to start work on a PoC to validate the design.

Questions/comments:

There could be more use cases for a hybrid source beyond a predefined sequence that is fixed at job submission time. For example, the source connector could be used to migrate from one external system to another (like Kafka1 .. KafkaN - based on external trigger/discovery).

I agree with @Aljoscha Krettek <aljos...@apache.org> that it would be preferable to solve this without special "switchable" interfaces and have it work with any FLIP-27 source as is. Performing the switch using the enumerator checkpoint appears viable (not proven though unless coded 😉).

The actual FLIP-27 source reader would need to signal to the "HybridSourceReader" (HSR) that they are done and then the HSR would send the switch event to the coordinator?

To further confirm my understanding:

The actual split type that flows between enumerator and reader would be "HybridSourceSplit" and it would wrap the specific split (in the example either HDFS or Kafka)?

Switching relies on the previous source's end position to be communicated as start position to the next source. The position(s) can be exchanged through the checkpoint state, but "HybridSplitEnumerator" still needs a way to extract them from the actual enumerator. That could be done by the enumerator checkpoint state mapping function looking at the current split assignments, which would not require modification of existing enumerators?

Cheers,
Thomas

On Fri, Jan 8, 2021 at 4:07 AM Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi Nicholas,
>
> Thanks for starting the discussion!
>
> I think we might be able to simplify this a bit and re-use existing functionality.
>
> There is already `Source.restoreEnumerator()` and `SplitEnumerator.snapshotState()`. This seems to be roughly what the Hybrid Source needs. When the initial source finishes, we can take a snapshot (which should include data that the follow-up sources need for initialization). Then we need a function that maps the enumerator checkpoint types between initial source and new source and we are good to go. We wouldn't need to introduce any additional interfaces for sources to implement, which would fragment the ecosystem between sources that can be used in a Hybrid Source and sources that cannot be used in a Hybrid Source.
>
> What do you think?
>
> Best,
> Aljoscha
>
> On 2020/11/03 02:34, Nicholas Jiang wrote:
> >Hi devs,
> >
> >I'd like to start a new FLIP to introduce the Hybrid Source. The hybrid source is a source that contains a list of concrete sources. The hybrid source reads from each contained source in the defined order. It switches from source A to the next source B when source A finishes.
> >
> >In practice, many Flink jobs need to read data from multiple sources in sequential order. Change Data Capture (CDC) and machine learning feature backfill are two concrete scenarios of this consumption pattern. Users may have to either run two different Flink jobs or have some hacks in the SourceFunction to address such use cases.
> >
> >To support the above scenarios smoothly, the Flink jobs need to first read from HDFS for historical data, then switch to Kafka for real-time records. The hybrid source has several benefits from the user's perspective:
> >
> >- Switching among multiple sources is easy based on the switchable source implementations of different connectors.
> >- This supports automatic switching for user-defined switchable sources that constitute the hybrid source.
> >- There is a complete and effective mechanism to support smooth source migration between historical and real-time data.
> >
> >Therefore, in this discussion, we propose to introduce a “Hybrid Source” API built on top of the new Source API (FLIP-27) to help users to smoothly switch sources. For more detail, please refer to the FLIP design doc[1].
> >
> >I'm looking forward to your feedback.
> >
> >[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
> >
> >Best,
> >Nicholas Jiang
> >
> >--
> >Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/