> Converter function relies on the specific enumerator capabilities to set
> the new start position (e.g.
> fileSourceEnumerator.getEndTimestamp() and
> kafkaSourceEnumerator.setTimestampOffsetsInitializer(..))

I guess the premise is that a converter is for a specific tuple of
(upstream source, downstream source). We don't have to define generic
EndStateT and SwitchableEnumerator interfaces. That should work.
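A minimal sketch of a converter bound to one specific (upstream, downstream) pair. It assumes hypothetical stub classes `FileEnumeratorStub` and `KafkaEnumeratorStub` standing in for the real enumerators; their methods are illustrative only and are not the actual Flink connector API. Because the converter is typed on the concrete enumerators, it can call their specific methods directly:

```java
import java.util.function.BiConsumer;

// Hypothetical stand-in for a file source enumerator that knows where it
// stopped. NOT a Flink class; only models the capability named above.
class FileEnumeratorStub {
    private final long endTimestamp;

    FileEnumeratorStub(long endTimestamp) {
        this.endTimestamp = endTimestamp;
    }

    long getEndTimestamp() {
        return endTimestamp;
    }
}

// Hypothetical stand-in for a Kafka source enumerator whose start position
// can still be set before it starts. NOT a Flink class.
class KafkaEnumeratorStub {
    private Long startTimestamp; // null until the converter sets it

    void setStartTimestamp(long ts) {
        this.startTimestamp = ts;
    }

    Long getStartTimestamp() {
        return startTimestamp;
    }
}

class FileToKafkaConverter {
    // Converter typed on the concrete (current, next) enumerator pair: it
    // reads the end timestamp from the finished file enumerator and uses it
    // as the start position of the not-yet-started Kafka enumerator.
    static final BiConsumer<FileEnumeratorStub, KafkaEnumeratorStub> CONVERTER =
        (current, next) -> next.setStartTimestamp(current.getEndTimestamp());
}
```

The trade-off is that each new pair of sources needs its own converter.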

The benefit of defining EndStateT and SwitchableEnumerator interfaces is
probably that they promote uniformity across sources that support being
used in a hybrid/switchable source.
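For contrast, a rough sketch of what generic interfaces could look like. `EndStateProvider`, `StartPositionAcceptor` and `SwitchSketch.transfer` are hypothetical names chosen for illustration; they are neither the interfaces defined in FLIP-150 nor existing Flink APIs:

```java
import java.util.function.Function;

// Hypothetical contract: an upstream enumerator that can report its end
// state once it has finished. Not an existing Flink interface.
interface EndStateProvider<EndStateT> {
    EndStateT getEndState();
}

// Hypothetical contract: a downstream enumerator that accepts a start
// position derived from the previous source's end state.
interface StartPositionAcceptor<StartT> {
    void setStartPosition(StartT start);
}

final class SwitchSketch {
    private SwitchSketch() {}

    // Generic switch step shared by every source pair that implements the
    // two interfaces; only the end-state -> start-position mapping varies.
    static <E, S> void transfer(
            EndStateProvider<E> previous,
            StartPositionAcceptor<S> next,
            Function<E, S> convert) {
        next.setStartPosition(convert.apply(previous.getEndState()));
    }
}
```

Any source pair implementing both interfaces could then share the same switching code, which is the uniformity argument above.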

On Sun, Jun 6, 2021 at 10:22 AM Thomas Weise <t...@apache.org> wrote:

> Hi Steven,
>
> Thank you for the thorough review of the PR and for bringing this back
> to the mailing list.
>
> All,
>
> I updated the FLIP-150 page to highlight aspects in which the PR
> deviates from the original proposal [1]. The goal would be to update
> the FLIP soon and bring it to a vote, as previously suggested offline
> by Nicholas.
>
> A few minor issues in the PR are outstanding and I'm working on test
> coverage for the recovery behavior, which should be completed soon.
>
> The dynamic position transfer needs to be concluded before we can move
> forward however.
>
> There have been various ideas, including the special
> "SwitchableEnumerator" interface, using enumerator checkpoint state or
> an enumerator interface extension to extract the end state.
>
> One goal in the FLIP is to "Reuse the existing Source connectors built
> with FLIP-27 without any change." and I think it is important to honor
> that goal given that fixed start positions do not require interface
> changes.
>
> Based on the feedback the following might be a good solution for
> runtime position transfer:
>
> * User supplies the optional converter function (not applicable for
> fixed positions).
> * Instead of relying on the enumerator checkpoint state [2], the
> converter function will be supplied with the current and next
> enumerator (source.createEnumerator).
> * Converter function relies on the specific enumerator capabilities to
> set the new start position (e.g.
> fileSourceEnumerator.getEndTimestamp() and
> kafkaSourceEnumerator.setTimestampOffsetsInitializer(..))
> * HybridSourceSplitEnumerator starts the new underlying enumerator
>
> With this approach, there is no need to augment FLIP-27 interfaces and
> custom source capabilities are easier to integrate. Removing the
> mandate to rely on enumerator checkpoint state also avoids potential
> upgrade/compatibility issues.
>
> Thoughts?
>
> Thanks,
> Thomas
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source#FLIP150:IntroduceHybridSource-Prototypeimplementation
> [2]
> https://github.com/apache/flink/pull/15924/files#diff-e07478b3cad9810925ec784b61ec0026396839cc5b27bd6d337a1dea05e999eaR281
>
>
> On Tue, Jun 1, 2021 at 3:10 PM Steven Wu <stevenz...@gmail.com> wrote:
> >
> > Discussed the PR with Thomas offline. Thomas, please correct me if I
> > missed anything.
> >
> > Right now, the PR differs from the FLIP-150 doc regarding the converter:
> > * The current PR uses the enumerator checkpoint state type as the input
> > for the converter.
> > * FLIP-150 defines a new EndStateT interface.
> > It seems that the FLIP-150 approach of EndStateT is more flexible, as
> > the transition EndStateT doesn't have to be included in the upstream
> > source checkpoint state.
> >
> > Let's look at two use cases:
> > 1) static cutover time at 5 pm. The file source reads all data between
> > 9 am and 5 pm, then the Kafka source starts with an initial position of
> > 5 pm. In this case, there is no need for a converter or EndStateT since
> > the starting time for the Kafka source is known and fixed.
> > 2) dynamic cutover time at 1 hour before now. This is useful when the
> > bootstrap of historic data takes a long time (like days or weeks) and we
> > don't know the exact time of cutover when a job is launched. Instead, we
> > are instructing the file source to stop when it gets close to live data.
> > In this case, hybrid source construction will specify a relative time
> > (now - 1 hour), and the EndStateT (of the file source) will be resolved
> > to an absolute time for cutover. We probably don't need to include
> > EndStateT (end timestamp) in the file source checkpoint state. Hence,
> > the separate EndStateT is probably more desirable.
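A minimal sketch of resolving the relative cutover (now - 1 hour) into an absolute timestamp. `RelativeCutover` is a hypothetical class, and passing `now` in explicitly (rather than reading the system clock inside) is an assumption made so the resolution is deterministic and testable:

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical end state for a file source whose cutover is configured
// relative to "now". Only the relative lag is fixed at construction time;
// the absolute cutover instant is computed when resolve() is called.
final class RelativeCutover {
    private final Duration lag;

    RelativeCutover(Duration lag) {
        this.lag = lag;
    }

    // Resolve "now - lag" against the supplied clock reading, producing the
    // absolute timestamp that would be handed to the next source.
    Instant resolve(Instant now) {
        return now.minus(lag);
    }
}
```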
> >
> > We also discussed the converter for the Kafka source. The Kafka source
> > supports different OffsetsInitializer impls (including
> > TimestampOffsetsInitializer). To support the dynamic cutover time (use
> > case #2 above), we can plug in a SupplierTimestampOffsetInitializer,
> > where the starting timestamp is not set during source/job construction.
> > Rather, it is a supplier model where the starting timestamp value is set
> > to the resolved absolute timestamp during the switch.
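A minimal sketch of the supplier model, assuming a hypothetical `SupplierTimestampInitializerSketch` class (this is not Flink's `OffsetsInitializer` API). The start timestamp is read from a `LongSupplier` only when the downstream side asks for it, so it can be filled in after job construction:

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of the "SupplierTimestampOffsetInitializer" idea:
// instead of a timestamp fixed at job construction, it holds a supplier
// that is evaluated lazily, i.e. when the start position is actually needed.
final class SupplierTimestampInitializerSketch {
    private final LongSupplier startTimestamp;

    SupplierTimestampInitializerSketch(LongSupplier startTimestamp) {
        this.startTimestamp = startTimestamp;
    }

    // Called by the downstream side when it starts, after the switch has
    // resolved the absolute cutover timestamp.
    long resolveStartTimestamp() {
        return startTimestamp.getAsLong();
    }
}
```

For example, the supplier could read from an `AtomicLong` that the switch logic sets once the file source's end timestamp has been resolved.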
> >
> > Thanks,
> > Steven
> >
> >
> >
> > On Thu, May 20, 2021 at 8:59 PM Thomas Weise <t...@apache.org> wrote:
> >
> > > Hi Nicholas,
> > >
> > > Thanks for taking a look at the PR!
> > >
> > > 1. Regarding switching mechanism:
> > >
> > > There has been previous discussion in this thread regarding the pros
> > > and cons of how the switching can be exposed to the user.
> > >
> > > With fixed start positions, no special switching interface to transfer
> > > information between enumerators is required. Sources are configured as
> > > they would be when used standalone and just plugged into HybridSource.
> > > I expect that to be a common use case. You can find an example for
> > > this in the ITCase:
> > >
> > > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR101
> > >
> > > For dynamic start position, the checkpoint state is used to transfer
> > > information from old to new enumerator. An example for that can be
> > > found here:
> > >
> > > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR112-R136
> > >
> > > That may look verbose, but the code to convert from one state to
> > > another can be factored out into a utility and the function becomes a
> > > one-liner.
> > >
> > > For common sources like files and Kafka we can potentially (later)
> > > implement the conversion logic as part of the respective connector's
> > > checkpoint and split classes.
> > >
> > > I hope that with the PR up for review, we can soon reach a conclusion
> > > on how we want to expose this to the user.
> > >
> > > Following is an example for Files -> Files -> Kafka that I'm using for
> > > e2e testing. It exercises both ways of setting the start position.
> > >
> > > https://gist.github.com/tweise/3139d66461e87986f6eddc70ff06ef9a
> > >
> > >
> > > 2. Regarding the events used to implement the actual switch between
> > > enumerator and readers: I updated the PR with javadoc to clarify the
> > > intent. Please let me know if that helps, or should we continue to
> > > discuss those details on the PR?
> > >
> > >
> > > Thanks,
> > > Thomas
> > >
> > >
> > > On Mon, May 17, 2021 at 1:03 AM Nicholas Jiang <programg...@163.com> wrote:
> > > >
> > > > Hi Thomas,
> > > >
> > > >    Sorry for the late reply on your POC. I have reviewed the base
> > > > abstract implementation in your pull request:
> > > > https://github.com/apache/flink/pull/15924. IMO, for the switching
> > > > mechanism, this level of abstraction is not concise enough and
> > > > doesn't make connector contribution easier. In theory, it is
> > > > necessary to introduce a set of interfaces to support the switching
> > > > mechanism. The SwitchableSource and SwitchableSplitEnumerator
> > > > interfaces are needed for connector extensibility.
> > > >    In other words, the whole switching process in the above-mentioned
> > > > PR is different from the one described in FLIP-150. In the above
> > > > implementation, the source reading switch is executed after receiving
> > > > the SwitchSourceEvent, which could happen before the
> > > > SourceReaderFinishEvent is sent. This timeline of the source reading
> > > > switch could be discussed here.
> > > >    @Stephan @Becket, if you are available, please help to review the
> > > > abstract implementation and compare it with the interfaces mentioned
> > > > in FLIP-150.
> > > >
> > > > Thanks,
> > > > Nicholas Jiang
> > > >
> > > >
> > > >
> > >
>
