Thanks for the suggestions and feedback on the PR.

A variation of hybrid source that can switch back and forth was
brought up before and it is something that will be eventually
required. It was also suggested by Stephan that in the future there
may be more than one implementation of hybrid source for different
requirements.

I want to bring back the topic of how enumerator end state can be
converted into start position from the PR [1]. We started in the FLIP
page with "switchable" interfaces, the prototype had checkpoint
conversion and now the PR has a function that allows to augment the
source. Each of these has pros and cons but we will need to converge.

1. Switchable interfaces
* unified solution
* requires sources to implement a special interface to participate in
HybridSource, even when no dynamic conversion is needed

2. Checkpoint state
* unified solution
* no interface changes
* requires implementation change to existing enumerators to include
end state (like a timestamp) into their checkpoint state
* existing sources work as is for fixed start position

3. Source modification at switch time to set start position
* can be solved per source, least restrictive
* no interface changes
* requires enumerator to expose end state (as a getter) and source to
be either mutable or source to be copied and augmented with the start
position.
* existing sources work as is for fixed start position

I think more eyes might help to finalize the approach.

[1] https://github.com/apache/flink/pull/15924#discussion_r649929865

On Mon, Jun 7, 2021 at 11:18 PM Steven Wu <stevenz...@gmail.com> wrote:
>
> > hybrid sounds to me more like the source would constantly switch back and 
> > forth
>
> Initially, the focus of hybrid source is more like a sequenced chain.
>
> But in the future it would be cool that hybrid sources can intelligently 
> switch back and forth between historical data source (like Iceberg) and live 
> data source (like Kafka). E.g.,
> - if the Flink job is lagging behind Kafka retention, automatically switch to 
> Iceberg source
> - once job caught up, switch back to Kafka source
>
> That can simplify operational aspects of manually switching.
>
>
> On Mon, Jun 7, 2021 at 8:07 AM Arvid Heise <ar...@apache.org> wrote:
>>
>> Sorry for joining the party so late, but it's such an interesting FLIP with
>> a huge impact that I wanted to add my 2 cents. [1]
>> I'm mirroring some basic question from the PR review to this thread because
>> it's about the name:
>>
>> We could rename the thing to ConcatenatedSource(s), SourceSequence, or
>> similar.
>> Hybrid has the connotation of 2 for me (maybe because I'm a non-native) and
>> does not carry the concatentation concept as well (hybrid sounds to me more
>> like the source would constantly switch back and forth).
>>
>> Could we take a few minutes to think if this is the most intuitive name for
>> new users? I'm especially hoping that natives might give some ideas (or
>> declare that Hybrid is perfect).
>>
>> [1] https://github.com/apache/flink/pull/15924#pullrequestreview-677376664
>>
>> On Sun, Jun 6, 2021 at 7:47 PM Steven Wu <stevenz...@gmail.com> wrote:
>>
>> > > Converter function relies on the specific enumerator capabilities to set
>> > the new start position (e.g.
>> > fileSourceEnumerator.getEndTimestamp() and
>> > kafkaSourceEnumerator.setTimestampOffsetsInitializer(..)
>> >
>> > I guess the premise is that a converter is for a specific tuple of
>> > (upstream source, downstream source) . We don't have to define generic
>> > EndtStateT and SwitchableEnumerator interfaces. That should work.
>> >
>> > The benefit of defining EndtStateT and SwitchableEnumerator interfaces is
>> > probably promoting uniformity across sources that support hybrid/switchable
>> > source.
>> >
>> > On Sun, Jun 6, 2021 at 10:22 AM Thomas Weise <t...@apache.org> wrote:
>> >
>> > > Hi Steven,
>> > >
>> > > Thank you for the thorough review of the PR and for bringing this back
>> > > to the mailing list.
>> > >
>> > > All,
>> > >
>> > > I updated the FLIP-150 page to highlight aspects in which the PR
>> > > deviates from the original proposal [1]. The goal would be to update
>> > > the FLIP soon and bring it to a vote, as previously suggested offline
>> > > by Nicholas.
>> > >
>> > > A few minor issues in the PR are outstanding and I'm working on test
>> > > coverage for the recovery behavior, which should be completed soon.
>> > >
>> > > The dynamic position transfer needs to be concluded before we can move
>> > > forward however.
>> > >
>> > > There have been various ideas, including the special
>> > > "SwitchableEnumerator" interface, using enumerator checkpoint state or
>> > > an enumerator interface extension to extract the end state.
>> > >
>> > > One goal in the FLIP is to "Reuse the existing Source connectors built
>> > > with FLIP-27 without any change." and I think it is important to honor
>> > > that goal given that fixed start positions do not require interface
>> > > changes.
>> > >
>> > > Based on the feedback the following might be a good solution for
>> > > runtime position transfer:
>> > >
>> > > * User supplies the optional converter function (not applicable for
>> > > fixed positions).
>> > > * Instead of relying on the enumerator checkpoint state [2], the
>> > > converter function will be supplied with the current and next
>> > > enumerator (source.createEnumerator).
>> > > * Converter function relies on the specific enumerator capabilities to
>> > > set the new start position (e.g.
>> > > fileSourceEnumerator.getEndTimestamp() and
>> > > kafkaSourceEnumerator.setTimestampOffsetsInitializer(..)
>> > > * HybridSourceSplitEnumerator starts new underlying enumerator
>> > >
>> > > With this approach, there is no need to augment FLIP-27 interfaces and
>> > > custom source capabilities are easier to integrate. Removing the
>> > > mandate to rely on enumerator checkpoint state also avoids potential
>> > > upgrade/compatibility issues.
>> > >
>> > > Thoughts?
>> > >
>> > > Thanks,
>> > > Thomas
>> > >
>> > > [1]
>> > >
>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source#FLIP150:IntroduceHybridSource-Prototypeimplementation
>> > > [2]
>> > >
>> > https://github.com/apache/flink/pull/15924/files#diff-e07478b3cad9810925ec784b61ec0026396839cc5b27bd6d337a1dea05e999eaR281
>> > >
>> > >
>> > > On Tue, Jun 1, 2021 at 3:10 PM Steven Wu <stevenz...@gmail.com> wrote:
>> > > >
>> > > > discussed the PR with Thosmas offline. Thomas, please correct me if I
>> > > > missed anything.
>> > > >
>> > > > Right now, the PR differs from the FLIP-150 doc regarding the
>> > converter.
>> > > > * Current PR uses the enumerator checkpoint state type as the input for
>> > > the
>> > > > converter
>> > > > * FLIP-150 defines a new EndStateT interface.
>> > > > It seems that the FLIP-150 approach of EndStateT is more flexible, as
>> > > > transition EndStateT doesn't have to be included in the upstream source
>> > > > checkpoint state.
>> > > >
>> > > > Let's look at two use cases:
>> > > > 1) static cutover time at 5 pm. File source reads all data btw 9 am - 5
>> > > pm,
>> > > > then Kafka source starts with initial position of 5 pm. In this case,
>> > > there
>> > > > is no need for converter or EndStateT since the starting time for Kafka
>> > > > source is known and fixed.
>> > > > 2) dynamic cutover time at 1 hour before now. This is useful when the
>> > > > bootstrap of historic data takes a long time (like days or weeks) and
>> > we
>> > > > don't know the exact time of cutover when a job is launched. Instead,
>> > we
>> > > > are instructing the file source to stop when it gets close to live
>> > data.
>> > > In
>> > > > this case, hybrid source construction will specify a relative time (now
>> > > - 1
>> > > > hour), the EndStateT (of file source) will be resolved to an absolute
>> > > time
>> > > > for cutover. We probably don't need to include EndStateT (end
>> > timestamp)
>> > > as
>> > > > the file source checkpoint state. Hence, the separate EndStateT is
>> > > probably
>> > > > more desirable.
>> > > >
>> > > > We also discussed the converter for the Kafka source. Kafka source
>> > > supports
>> > > > different OffsetsInitializer impls (including
>> > > TimestampOffsetsInitializer).
>> > > > To support the dynamic cutover time (use case #2 above), we can plug
>> > in a
>> > > > SupplierTimestampOffsetInitializer, where the starting timestamp is not
>> > > set
>> > > > during source/job construction. Rather it is a supplier model where the
>> > > > starting timestamp value is set to the resolved absolute timestamp
>> > during
>> > > > switch.
>> > > >
>> > > > Thanks,
>> > > > Steven
>> > > >
>> > > >
>> > > >
>> > > > On Thu, May 20, 2021 at 8:59 PM Thomas Weise <t...@apache.org> wrote:
>> > > >
>> > > > > Hi Nicholas,
>> > > > >
>> > > > > Thanks for taking a look at the PR!
>> > > > >
>> > > > > 1. Regarding switching mechanism:
>> > > > >
>> > > > > There has been previous discussion in this thread regarding the pros
>> > > > > and cons of how the switching can be exposed to the user.
>> > > > >
>> > > > > With fixed start positions, no special switching interface to
>> > transfer
>> > > > > information between enumerators is required. Sources are configured
>> > as
>> > > > > they would be when used standalone and just plugged into
>> > HybridSource.
>> > > > > I expect that to be a common use case. You can find an example for
>> > > > > this in the ITCase:
>> > > > >
>> > > > >
>> > > > >
>> > >
>> > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR101
>> > > > >
>> > > > > For dynamic start position, the checkpoint state is used to transfer
>> > > > > information from old to new enumerator. An example for that can be
>> > > > > found here:
>> > > > >
>> > > > >
>> > > > >
>> > >
>> > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR112-R136
>> > > > >
>> > > > > That may look verbose, but the code to convert from one state to
>> > > > > another can be factored out into a utility and the function becomes a
>> > > > > one-liner.
>> > > > >
>> > > > > For common sources like files and Kafka we can potentially (later)
>> > > > > implement the conversion logic as part of the respective connector's
>> > > > > checkpoint and split classes.
>> > > > >
>> > > > > I hope that with the PR up for review, we can soon reach a conclusion
>> > > > > on how we want to expose this to the user.
>> > > > >
>> > > > > Following is an example for Files -> Files -> Kafka that I'm using
>> > for
>> > > > > e2e testing. It exercises both ways of setting the start position.
>> > > > >
>> > > > > https://gist.github.com/tweise/3139d66461e87986f6eddc70ff06ef9a
>> > > > >
>> > > > >
>> > > > > 2. Regarding the events used to implement the actual switch between
>> > > > > enumerator and readers: I updated the PR with javadoc to clarify the
>> > > > > intent. Please let me know if that helps or let's continue to discuss
>> > > > > those details on the PR?
>> > > > >
>> > > > >
>> > > > > Thanks,
>> > > > > Thomas
>> > > > >
>> > > > >
>> > > > > On Mon, May 17, 2021 at 1:03 AM Nicholas Jiang <programg...@163.com>
>> > > > > wrote:
>> > > > > >
>> > > > > > Hi Thomas,
>> > > > > >
>> > > > > >    Sorry for later reply for your POC. I have reviewed the based
>> > > abstract
>> > > > > > implementation of your pull request:
>> > > > > > https://github.com/apache/flink/pull/15924. IMO, for the switching
>> > > > > > mechanism, this level of abstraction is not concise enough, which
>> > > doesn't
>> > > > > > make connector contribution easier. In theory, it is necessary to
>> > > > > introduce
>> > > > > > a set of interfaces to support the switching mechanism. The
>> > > > > SwitchableSource
>> > > > > > and SwitchableSplitEnumerator interfaces are needed for connector
>> > > > > > expansibility.
>> > > > > >    In other words, the whole switching process of above mentioned
>> > PR
>> > > is
>> > > > > > different from that mentioned in FLIP-150. In the above
>> > > implementation,
>> > > > > the
>> > > > > > source reading switching is executed after receving the
>> > > > > SwitchSourceEvent,
>> > > > > > which could be before the sending SourceReaderFinishEvent. This
>> > > timeline
>> > > > > of
>> > > > > > source reading switching could be discussed here.
>> > > > > >    @Stephan @Becket, if you are available, please help to review
>> > the
>> > > > > > abstract implementation, and compare with the interfaces mentioned
>> > in
>> > > > > > FLIP-150.
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Nicholas Jiang
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > --
>> > > > > > Sent from:
>> > > > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
>> > > > >
>> > >
>> >

Reply via email to