Hi all,

Thanks for starting the discussion!

I think there are differences between start-position and checkpoint-state.
Checkpoint states
are actually intermediate progress-capture in reading data. Source clients
don't care about it
in coding phase, there are purely implementation details. While
start-position, on the other side,
is part of public API, say, paths for FileSource/HiveSource,
OffsetsInitializer for KafkaSource.
Given that, `setStartState` is actually a reposition operation for next
source to start in job runtime.

Besides above, I think this separation of start-position and
checkpoint-state leads to smooth
migration from no-switchable source to switchable source and relaxes source
writers. Source writers
will not have to take into account of switchable-or-not in design phase,
but a postponable decision.
Later enhancement will not break anything. I would be relative hard to
convert start-position to
checkpoint-state in restoring without changing to checkpoint-state
structure and/or serializer:

* `Paths[]` to `PendingSplitsCheckpoint<FileSourceSplit>`.
* `OffsetsInitializer` to `KafkaSourceEnumState`.

This conversion should be implementation detail of next source, not
converter function in my opinion.

With separated start-position concept, I think the `StartStateT` in
proposal should be `Path[]` for
`FileSource`, `OffsetsInitializer` for `KafkaSource`. Seems like it should
belong to source more
than split enumerator.

If we do not reuse checkpoint-state as start-position, then this feature
requires supports from Flink
side(that says it is awkward/tangled to be used as a third party libraries):
* It needs a post-mortem position to delivery possible useful information
for next source.
* It needs to reposition next source, such as `FileSource`, `KafkaSource`,
etc.

Back to the proposal, I think it shows three joints:
1. Separated start-position for next source.
2. Separated end-position from preceding source.
3. Converter from end-position to start-position.

I think some or all of the three should be optional. Let me detail:
1. No configurable start-position. In this situation combination of above
three is a nop,
   and `HybridSource` is just a chain of start-position pre-configured
sources. Current design seems
   overkilling for this use case. This case could also be realized with
help from `InputSelectable`
   and `MultipleInputStreamOperator`, but I think it fits this feature
quite well.
2. I am wonder whether end-position is a must and how it could be useful
for end users in
   a generic-enough source, say `FileSource` ? I could imagine that some
sources may have embedded
   next-position for next source to start. For most generic sources, they
actually have no meaningful
   next-position for other sources. Then would it be too risky to coin this
to generic sources' type ?
   Or we are actually doing this solely to fit type requirement of
`HybridSource.addSource` ?
3. Is it possible for converter function to do blocking operations ? How to
respond to checkpoint
   request when switching split enumerators cross sources ? Does
end-position or start-position
   need to be stored in checkpoint state or not ?

Last, for the name `HybridSource`, would it be possible to use this feature
to switch/chain multiple
homogeneous sources ? Say:

* Two file sources with different formats.
* Migrate from one kafka cluster to another as @Thomas has already pointed
out.



On February 4, 2021 at 10:48:21, Thomas Weise (t...@apache.org) wrote:

Thanks for initiating this discussion and creating the proposal!

I would like to contribute to this effort. Has there been related activity
since the FLIP was created?

If not, I would like to start work on a PoC to validate the design.

Questions/comments:

There could be more use cases for a hybrid source beyond predefined
sequence that is fixed at job submission time. For example, the source
connector could be used to migrate from one external system to another
(like Kafka1 .. KafkaN - based on external trigger/discovery).

I agree with @Aljoscha Krettek <aljos...@apache.org> that it would be
preferable to solve this without special "switchable" interfaces and have
it work with any FLIP-27 source as is. Performing the switch using the
enumerator checkpoint appears viable (not proven though unless coded 😉).
The actual FLIP-27 source reader would need to signal to the
"HybridSourceReader" (HSR) that they are done and then the HSR would send
the switch event to the coordinator?

To further confirm my understanding:

The actual split type that flows between enumerator and reader would be
"HybridSourceSplit" and it would wrap the specific split (in the example
either HDFS or Kafka)?

Switching relies on the previous source's end position to be communicated
as start position to the next source. The position(s) can be exchanged
through the checkpoint state, but "HybridSplitEnumerator" still needs a way
to extract them from the actual enumerator. That could be done by the
enumerator checkpoint state mapping function looking at the current split
assignments, which would not require modification of existing enumerators?

Cheers,
Thomas


On Fri, Jan 8, 2021 at 4:07 AM Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi Nicholas,
>
> Thanks for starting the discussion!
>
> I think we might be able to simplify this a bit and re-use existing
> functionality.
>
> There is already `Source.restoreEnumerator()` and
> `SplitEnumerator.snapshotState(). This seems to be roughly what the
> Hybrid Source needs. When the initial source finishes, we can take a
> snapshot (which should include data that the follow-up sources need for
> initialization). Then we need a function that maps the enumerator
> checkpoint types between initial source and new source and we are good
> to go. We wouldn't need to introduce any additional interfaces for
> sources to implement, which would fragment the ecosystem between sources
> that can be used in a Hybrid Source and sources that cannot be used in a
> Hybrid Source.
>
> What do you think?
>
> Best,
> Aljoscha
>
> On 2020/11/03 02:34, Nicholas Jiang wrote:
> >Hi devs,
> >
> >I'd like to start a new FLIP to introduce the Hybrid Source. The hybrid
> >source is a source that contains a list of concrete sources. The hybrid
> >source reads from each contained source in the defined order. It
switches
> >from source A to the next source B when source A finishes.
> >
> >In practice, many Flink jobs need to read data from multiple sources in
> >sequential order. Change Data Capture (CDC) and machine learning feature
> >backfill are two concrete scenarios of this consumption pattern. Users
may
> >have to either run two different Flink jobs or have some hacks in the
> >SourceFunction to address such use cases.
> >
> >To support above scenarios smoothly, the Flink jobs need to first read
> from
> >HDFS for historical data then switch to Kafka for real-time records. The
> >hybrid source has several benefits from the user's perspective:
> >
> >- Switching among multiple sources is easy based on the switchable
source
> >implementations of different connectors.
> >- This supports to automatically switching for user-defined switchable
> >source that constitutes hybrid source.
> >- There is complete and effective mechanism to support smooth source
> >migration between historical and real-time data.
> >
> >Therefore, in this discussion, we propose to introduce a “Hybrid Source”
> API
> >built on top of the new Source API (FLIP-27) to help users to smoothly
> >switch sources. For more detail, please refer to the FLIP design doc[1].
> >
> >I'm looking forward to your feedback.
> >
> >[1]
> >
>
https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
> ><
>
https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
> >
> >
> >Best,
> >Nicholas Jiang
> >
> >
> >
> >--
> >Sent from:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
>

Reply via email to