I discussed the PR with Thomas offline. Thomas, please correct me if I
missed anything.

Right now, the PR differs from the FLIP-150 doc regarding the converter:
* The current PR uses the enumerator checkpoint state type as the input for
the converter.
* FLIP-150 defines a new EndStateT interface.
The FLIP-150 approach with EndStateT seems more flexible, as the
transition EndStateT doesn't have to be included in the upstream source
checkpoint state.

Let's look at two use cases:
1) static cutover time at 5 pm. File source reads all data between 9 am and 5 pm,
then Kafka source starts with initial position of 5 pm. In this case, there
is no need for converter or EndStateT since the starting time for Kafka
source is known and fixed.
2) dynamic cutover time at 1 hour before now. This is useful when the
bootstrap of historic data takes a long time (like days or weeks) and we
don't know the exact time of cutover when a job is launched. Instead, we
are instructing the file source to stop when it gets close to live data. In
this case, hybrid source construction will specify a relative time (now - 1
hour), and the EndStateT (of the file source) will be resolved to an absolute
time for the cutover. We probably don't need to include the EndStateT (end
timestamp) in the file source checkpoint state. Hence, the separate EndStateT
is probably more desirable.
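
To make use case 2 concrete, here is a rough sketch of how a relative cutover could be resolved to an absolute end timestamp. The class RelativeCutover and its method are hypothetical names for illustration only; they are not part of the PR or of FLIP-150, and the sketch uses only java.time.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

/** Hypothetical helper: a cutover specified relative to "now" at job construction. */
final class RelativeCutover {
    private final Duration lag;

    RelativeCutover(Duration lag) {
        this.lag = lag;
    }

    /**
     * Resolves the relative spec (now - lag) to an absolute end timestamp
     * in epoch milliseconds, using the given clock as the notion of "now".
     */
    long resolveEndTimestampMillis(Clock clock) {
        return Instant.now(clock).minus(lag).toEpochMilli();
    }
}
```

The resolved value would play the role of the EndStateT (end timestamp) handed to the next source, so it does not have to live in the file source checkpoint state.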

We also discussed the converter for the Kafka source. Kafka source supports
different OffsetsInitializer impls (including TimestampOffsetsInitializer).
To support the dynamic cutover time (use case #2 above), we can plug in a
SupplierTimestampOffsetInitializer, where the starting timestamp is not set
during source/job construction. Rather, it is a supplier model where the
starting timestamp value is set to the resolved absolute timestamp during
the switch.
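
A minimal sketch of the supplier model, under stated assumptions: the classes below are hypothetical stand-ins and do not implement the Kafka connector's OffsetsInitializer interface. They only show the starting timestamp being left unset at construction and filled in at switch time.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical holder that is filled in with the resolved cutover time at switch. */
final class CutoverHolder implements Supplier<Long> {
    private final AtomicReference<Long> value = new AtomicReference<>();

    /** Called during the switch with the resolved absolute timestamp. */
    void resolve(long timestampMillis) {
        value.set(timestampMillis);
    }

    @Override
    public Long get() {
        return value.get(); // null until resolve() has been called
    }
}

/** Hypothetical stand-in for a supplier-based timestamp initializer. */
final class SupplierTimestampInitializer {
    private final Supplier<Long> startTimestampMillis;

    SupplierTimestampInitializer(Supplier<Long> startTimestampMillis) {
        this.startTimestampMillis = startTimestampMillis;
    }

    /** Reads the starting timestamp lazily, i.e. only when the source starts. */
    long startingTimestampMillis() {
        Long ts = startTimestampMillis.get();
        if (ts == null) {
            throw new IllegalStateException("cutover timestamp not resolved yet");
        }
        return ts;
    }
}
```

With this shape, the Kafka source can be constructed up front with the holder, and the converter only needs to call resolve(...) with the end timestamp of the file source before the Kafka enumerator starts.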

Thanks,
Steven



On Thu, May 20, 2021 at 8:59 PM Thomas Weise <t...@apache.org> wrote:

> Hi Nicholas,
>
> Thanks for taking a look at the PR!
>
> 1. Regarding switching mechanism:
>
> There has been previous discussion in this thread regarding the pros
> and cons of how the switching can be exposed to the user.
>
> With fixed start positions, no special switching interface to transfer
> information between enumerators is required. Sources are configured as
> they would be when used standalone and just plugged into HybridSource.
> I expect that to be a common use case. You can find an example for
> this in the ITCase:
>
>
> https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR101
>
> For dynamic start position, the checkpoint state is used to transfer
> information from old to new enumerator. An example for that can be
> found here:
>
>
> https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR112-R136
>
> That may look verbose, but the code to convert from one state to
> another can be factored out into a utility and the function becomes a
> one-liner.
>
> For common sources like files and Kafka we can potentially (later)
> implement the conversion logic as part of the respective connector's
> checkpoint and split classes.
>
> I hope that with the PR up for review, we can soon reach a conclusion
> on how we want to expose this to the user.
>
> Following is an example for Files -> Files -> Kafka that I'm using for
> e2e testing. It exercises both ways of setting the start position.
>
> https://gist.github.com/tweise/3139d66461e87986f6eddc70ff06ef9a
>
>
> 2. Regarding the events used to implement the actual switch between
> enumerator and readers: I updated the PR with javadoc to clarify the
> intent. Please let me know if that helps or let's continue to discuss
> those details on the PR?
>
>
> Thanks,
> Thomas
>
>
> On Mon, May 17, 2021 at 1:03 AM Nicholas Jiang <programg...@163.com>
> wrote:
> >
> > Hi Thomas,
> >
> >    Sorry for the late reply to your POC. I have reviewed the base abstract
> > implementation of your pull request:
> > https://github.com/apache/flink/pull/15924. IMO, for the switching
> > mechanism, this level of abstraction is not concise enough, which doesn't
> > make connector contribution easier. In theory, it is necessary to
> > introduce a set of interfaces to support the switching mechanism. The
> > SwitchableSource and SwitchableSplitEnumerator interfaces are needed for
> > connector extensibility.
> >    In other words, the whole switching process of the above-mentioned PR
> > is different from that described in FLIP-150. In the above
> > implementation, the source reading switch is executed after receiving the
> > SwitchSourceEvent, which could be before the SourceReaderFinishEvent is
> > sent. This timeline of source reading switching could be discussed here.
> >    @Stephan @Becket, if you are available, please help to review the
> > abstract implementation, and compare with the interfaces mentioned in
> > FLIP-150.
> >
> > Thanks,
> > Nicholas Jiang
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
>
