> Converter function relies on the specific enumerator capabilities to set the new start position (e.g. fileSourceEnumerator.getEndTimestamp() and kafkaSourceEnumerator.setTimestampOffsetsInitializer(..))
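To make the quoted idea concrete, here is a minimal Java sketch of a converter written for one specific (file, Kafka) pair. The stub classes and the names FileEnumeratorStub, KafkaEnumeratorStub, setStartTimestamp and switchOver are hypothetical stand-ins, not Flink or FLIP-150 APIs. Only getEndTimestamp() echoes the method name used in the quote, and whether the real enumerators expose such methods is an assumption.

```java
import java.util.function.BiConsumer;

class ConverterSketch {

    /** Hypothetical stand-in for a file enumerator that can report where it stopped. */
    static class FileEnumeratorStub {
        private final long endTimestamp;

        FileEnumeratorStub(long endTimestamp) {
            this.endTimestamp = endTimestamp;
        }

        long getEndTimestamp() {
            return endTimestamp;
        }
    }

    /** Hypothetical stand-in for a Kafka enumerator whose start position is set before it starts. */
    static class KafkaEnumeratorStub {
        private long startTimestamp = -1L;

        void setStartTimestamp(long timestamp) {
            this.startTimestamp = timestamp;
        }

        long getStartTimestamp() {
            return startTimestamp;
        }
    }

    /** Converter for exactly one (upstream, downstream) pair; no generic end-state type is involved. */
    static final BiConsumer<FileEnumeratorStub, KafkaEnumeratorStub> FILE_TO_KAFKA =
            (file, kafka) -> kafka.setStartTimestamp(file.getEndTimestamp());

    /** Mimics the switch: hand the current and the next enumerator to the converter. */
    static long switchOver(long fileEndTimestamp) {
        FileEnumeratorStub current = new FileEnumeratorStub(fileEndTimestamp);
        KafkaEnumeratorStub next = new KafkaEnumeratorStub();
        FILE_TO_KAFKA.accept(current, next);
        return next.getStartTimestamp();
    }

    public static void main(String[] args) {
        System.out.println(switchOver(1_622_998_800_000L));
    }
}
```

The converter only knows the two concrete enumerator types, which is why no generic end-state interface appears anywhere in the sketch.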
I guess the premise is that a converter is for a specific tuple of
(upstream source, downstream source). We don't have to define generic
EndStateT and SwitchableEnumerator interfaces. That should work. The
benefit of defining EndStateT and SwitchableEnumerator interfaces is
probably promoting uniformity across sources that support
hybrid/switchable source.

On Sun, Jun 6, 2021 at 10:22 AM Thomas Weise <t...@apache.org> wrote:

> Hi Steven,
>
> Thank you for the thorough review of the PR and for bringing this back
> to the mailing list.
>
> All,
>
> I updated the FLIP-150 page to highlight aspects in which the PR
> deviates from the original proposal [1]. The goal would be to update
> the FLIP soon and bring it to a vote, as previously suggested offline
> by Nicholas.
>
> A few minor issues in the PR are outstanding and I'm working on test
> coverage for the recovery behavior, which should be completed soon.
>
> The dynamic position transfer needs to be concluded before we can move
> forward however.
>
> There have been various ideas, including the special
> "SwitchableEnumerator" interface, using enumerator checkpoint state or
> an enumerator interface extension to extract the end state.
>
> One goal in the FLIP is to "Reuse the existing Source connectors built
> with FLIP-27 without any change." and I think it is important to honor
> that goal given that fixed start positions do not require interface
> changes.
>
> Based on the feedback the following might be a good solution for
> runtime position transfer:
>
> * User supplies the optional converter function (not applicable for
> fixed positions).
> * Instead of relying on the enumerator checkpoint state [2], the
> converter function will be supplied with the current and next
> enumerator (source.createEnumerator).
> * Converter function relies on the specific enumerator capabilities to
> set the new start position (e.g.
> fileSourceEnumerator.getEndTimestamp() and
> kafkaSourceEnumerator.setTimestampOffsetsInitializer(..))
> * HybridSourceSplitEnumerator starts new underlying enumerator
>
> With this approach, there is no need to augment FLIP-27 interfaces and
> custom source capabilities are easier to integrate. Removing the
> mandate to rely on enumerator checkpoint state also avoids potential
> upgrade/compatibility issues.
>
> Thoughts?
>
> Thanks,
> Thomas
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source#FLIP150:IntroduceHybridSource-Prototypeimplementation
> [2]
> https://github.com/apache/flink/pull/15924/files#diff-e07478b3cad9810925ec784b61ec0026396839cc5b27bd6d337a1dea05e999eaR281
>
>
> On Tue, Jun 1, 2021 at 3:10 PM Steven Wu <stevenz...@gmail.com> wrote:
> >
> > Discussed the PR with Thomas offline. Thomas, please correct me if I
> > missed anything.
> >
> > Right now, the PR differs from the FLIP-150 doc regarding the converter.
> > * Current PR uses the enumerator checkpoint state type as the input for
> > the converter
> > * FLIP-150 defines a new EndStateT interface.
> > It seems that the FLIP-150 approach of EndStateT is more flexible, as
> > transition EndStateT doesn't have to be included in the upstream source
> > checkpoint state.
> >
> > Let's look at two use cases:
> > 1) static cutover time at 5 pm. File source reads all data between 9 am
> > and 5 pm, then Kafka source starts with initial position of 5 pm. In
> > this case, there is no need for converter or EndStateT since the
> > starting time for Kafka source is known and fixed.
> > 2) dynamic cutover time at 1 hour before now. This is useful when the
> > bootstrap of historic data takes a long time (like days or weeks) and we
> > don't know the exact time of cutover when a job is launched. Instead, we
> > are instructing the file source to stop when it gets close to live data.
> > In this case, hybrid source construction will specify a relative time
> > (now - 1 hour), the EndStateT (of file source) will be resolved to an
> > absolute time for cutover. We probably don't need to include EndStateT
> > (end timestamp) as the file source checkpoint state. Hence, the separate
> > EndStateT is probably more desirable.
> >
> > We also discussed the converter for the Kafka source. Kafka source
> > supports different OffsetsInitializer impls (including
> > TimestampOffsetsInitializer). To support the dynamic cutover time (use
> > case #2 above), we can plug in a SupplierTimestampOffsetInitializer,
> > where the starting timestamp is not set during source/job construction.
> > Rather it is a supplier model where the starting timestamp value is set
> > to the resolved absolute timestamp during switch.
> >
> > Thanks,
> > Steven
> >
> >
> > On Thu, May 20, 2021 at 8:59 PM Thomas Weise <t...@apache.org> wrote:
> >
> > > Hi Nicholas,
> > >
> > > Thanks for taking a look at the PR!
> > >
> > > 1. Regarding switching mechanism:
> > >
> > > There has been previous discussion in this thread regarding the pros
> > > and cons of how the switching can be exposed to the user.
> > >
> > > With fixed start positions, no special switching interface to transfer
> > > information between enumerators is required. Sources are configured as
> > > they would be when used standalone and just plugged into HybridSource.
> > > I expect that to be a common use case. You can find an example for
> > > this in the ITCase:
> > >
> > > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR101
> > >
> > > For dynamic start position, the checkpoint state is used to transfer
> > > information from old to new enumerator.
> > > An example for that can be found here:
> > >
> > > https://github.com/apache/flink/pull/15924/files#diff-fe1407d135d7b7b3a72aeb4471ab53ccd2665a58ff0129a83db1ec19cea06f1bR112-R136
> > >
> > > That may look verbose, but the code to convert from one state to
> > > another can be factored out into a utility and the function becomes a
> > > one-liner.
> > >
> > > For common sources like files and Kafka we can potentially (later)
> > > implement the conversion logic as part of the respective connector's
> > > checkpoint and split classes.
> > >
> > > I hope that with the PR up for review, we can soon reach a conclusion
> > > on how we want to expose this to the user.
> > >
> > > Following is an example for Files -> Files -> Kafka that I'm using for
> > > e2e testing. It exercises both ways of setting the start position.
> > >
> > > https://gist.github.com/tweise/3139d66461e87986f6eddc70ff06ef9a
> > >
> > >
> > > 2. Regarding the events used to implement the actual switch between
> > > enumerator and readers: I updated the PR with javadoc to clarify the
> > > intent. Please let me know if that helps or let's continue to discuss
> > > those details on the PR?
> > >
> > >
> > > Thanks,
> > > Thomas
> > >
> > >
> > > On Mon, May 17, 2021 at 1:03 AM Nicholas Jiang <programg...@163.com>
> > > wrote:
> > >
> > > >
> > > > Hi Thomas,
> > > >
> > > > Sorry for the late reply on your POC. I have reviewed the base
> > > > abstract implementation of your pull request:
> > > > https://github.com/apache/flink/pull/15924. IMO, for the switching
> > > > mechanism, this level of abstraction is not concise enough, which
> > > > doesn't make connector contribution easier. In theory, it is
> > > > necessary to introduce a set of interfaces to support the switching
> > > > mechanism. The SwitchableSource and SwitchableSplitEnumerator
> > > > interfaces are needed for connector extensibility.
> > > > In other words, the whole switching process of the above-mentioned
> > > > PR is different from that described in FLIP-150. In the above
> > > > implementation, the source reading switch is executed after
> > > > receiving the SwitchSourceEvent, which could be before the
> > > > SourceReaderFinishEvent is sent. This timeline of source reading
> > > > switching could be discussed here.
> > > > @Stephan @Becket, if you are available, please help to review the
> > > > abstract implementation, and compare with the interfaces mentioned in
> > > > FLIP-150.
> > > >
> > > > Thanks,
> > > > Nicholas Jiang
> > > >
> > > >
> > > >
> > > > --
> > > > Sent from:
> > > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
> > > >
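
For reference, the supplier model behind the SupplierTimestampOffsetInitializer idea quoted above could be sketched roughly as follows. This is only an illustration under stated assumptions: SupplierTimestampInitializer, resolveStartTimestamp and resolveCutover are hypothetical names, and the class does not implement Kafka's actual OffsetsInitializer interface.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

class SupplierInitializerSketch {

    /** Marker meaning "the cutover time has not been resolved yet". */
    static final long UNSET = Long.MIN_VALUE;

    /** Hypothetical initializer: the start timestamp is looked up lazily, not fixed at construction. */
    static class SupplierTimestampInitializer {
        private final LongSupplier timestampSupplier;

        SupplierTimestampInitializer(LongSupplier timestampSupplier) {
            this.timestampSupplier = timestampSupplier;
        }

        long resolveStartTimestamp() {
            return timestampSupplier.getAsLong();
        }
    }

    /** Use case 2: a relative cutover (now - lag) is resolved to an absolute time at switch. */
    static long resolveCutover(long nowMillis, long lagMillis) {
        AtomicLong resolvedCutover = new AtomicLong(UNSET);

        // Built at job construction time, when the absolute cutover is still unknown.
        SupplierTimestampInitializer initializer = new SupplierTimestampInitializer(() -> {
            long timestamp = resolvedCutover.get();
            if (timestamp == UNSET) {
                throw new IllegalStateException("cutover timestamp not resolved yet");
            }
            return timestamp;
        });

        // At switch time the upstream side resolves the relative time to an absolute one.
        resolvedCutover.set(nowMillis - lagMillis);

        // Only now does the downstream side read its start position.
        return initializer.resolveStartTimestamp();
    }

    public static void main(String[] args) {
        System.out.println(resolveCutover(System.currentTimeMillis(), 3_600_000L));
    }
}
```

The supplier keeps job construction free of any absolute timestamp; the value only has to exist by the time the downstream source asks for it.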