Sorry for the late reply. Starting from a specific connector sounds
reasonable to me.

That said, I would suggest keeping the door open for future generalization
as much as possible. We have already seen several source combinations from
different users: HDFS + Kafka, S3 + Kafka, S3 + SQL binlog, etc. So it would
be good if we could reuse some base abstraction in the future instead of
having to write each combination from scratch.
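
To make that a bit more concrete, here is a rough sketch of what a reusable
base abstraction for the simplest case (fixed switch points) might look like.
It is only an illustration: `ChainedSource`, `open_at`, `end_ts`, and
`read_chain` are hypothetical names, not Flink APIs, and plain Python
iterables stand in for real sources.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Iterator, List, Optional


@dataclass
class ChainedSource:
    """One link in a source chain (hypothetical type, not a Flink class)."""
    name: str
    # Opens the source at a start timestamp (None means the source default).
    open_at: Callable[[Optional[int]], Iterable[dict]]
    # Fixed switch point: stop before the first record with ts >= end_ts.
    # None means this source is unbounded (typically the last in the chain).
    end_ts: Optional[int] = None


def read_chain(chain: List[ChainedSource]) -> Iterator[dict]:
    """Read the sources in order, switching at the pre-configured bounds.

    Assumes each source yields records in timestamp order, each carrying
    a "ts" field.
    """
    start: Optional[int] = None
    for source in chain:
        for record in source.open_at(start):
            if source.end_ts is not None and record["ts"] >= source.end_ts:
                break
            yield record
        # The next source starts where this one was configured to end.
        start = source.end_ts
```

With something along these lines, HDFS + Kafka or S3 + Kafka would differ
only in the `open_at` callables passed in, not in the switching logic.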

Thanks,

Jiangjie (Becket) Qin

On Sat, Apr 17, 2021 at 7:34 PM Stephan Ewen <se...@apache.org> wrote:

> Thanks, Thomas!
>
> @Becket and @Nicholas - would you be ok with that approach?
>
>
> On Thu, Apr 15, 2021 at 6:30 PM Thomas Weise <t...@apache.org> wrote:
>
> > Hi Stephan,
> >
> > Thanks for the feedback!
> >
> > I agree with the approach of starting with a simple implementation
> > that can address a well understood, significant portion of use cases.
> >
> > I'm planning to continue work on the prototype that I had shared.
> > There is production level usage waiting for it fairly soon. I expect
> > to open a PR in the coming weeks.
> >
> > Thomas
> >
> >
> >
> >
> >
> > On Tue, Apr 13, 2021 at 12:15 PM Stephan Ewen <se...@apache.org> wrote:
> > >
> > > > Thanks all for this discussion. Looks like there are lots of ideas and
> > > > folks that are eager to do things, so let's see how we can get this
> > > > moving.
> > >
> > > My take on this is the following:
> > >
> > > > There will probably not be one Hybrid source, but possibly multiple
> > > > ones, because of different strategies/requirements.
> > > >     - One may be very simple, with switching points known up-front.
> > > >       Would be good to have this in a very simple implementation.
> > > >     - There may be one where the switch is dynamic and the readers need
> > > >       to report back where they left off.
> > > >     - There may be one that switches back and forth multiple times
> > > >       during a job, for example Kafka going to DFS whenever it falls
> > > >       behind retention, in order to catch up again.
> > >
> > > > This also seems hard to "design on paper"; I expect there are nuances
> > > > in a production setup that affect some details of the design. So I'd
> > > > feel most comfortable in adding a variant of the hybrid source to Flink
> > > > that has been used already in a real use case (not necessarily in
> > > > production, but maybe in a testing/staging environment, so it seems to
> > > > meet all requirements).
> > >
> > >
> > > > What do you think about the following approach?
> > > >   - If there is a tested PoC, let's try to get it contributed to Flink
> > > >     without trying to make it much more general.
> > > >   - When we see similar but a bit different requirements for another
> > > >     hybrid source, then let's try to evolve the contributed one.
> > > >   - If we see new requirements that are so different that they don't
> > > >     fit well with the existing hybrid source, then let us look at
> > > >     building a second hybrid source for those requirements.
> > >
> > > > We need to make connector contributions easier in general, and I think
> > > > it is not a bad thing to end up with different approaches and see how
> > > > these play out against each other when being used by users. For example
> > > > switching with known boundaries, dynamic switching,
> > > > back-and-forth-switching, etc. (I know some committers are planning to
> > > > do some work on making connector contributions easier, with
> > > > standardized testing frameworks, decoupled CI, etc.)
> > >
> > > Best,
> > > Stephan
> > >
> > >
> > > On Thu, Mar 25, 2021 at 4:41 AM Thomas Weise <t...@apache.org> wrote:
> > >
> > > > Hi,
> > > >
> > > > > As mentioned in my previous email, I had been working on a prototype
> > > > > for the hybrid source.
> > > >
> > > > You can find it at https://github.com/tweise/flink/pull/1
> > > >
> > > > It contains:
> > > > * Switching with configurable chain of sources
> > > > * Fixed or dynamic start positions
> > > > * Test with MockSource and FileSource
> > > >
> > > > > The purpose of the above PR is to gather feedback and help drive
> > > > > consensus on the FLIP.
> > > >
> > > > * How to support a dynamic start position within the source chain?
> > > >
> > > > > Relevant in those (few?) cases where start positions are not known
> > > > > upfront. You can find an example of what that might look like in the
> > > > > tests:
> > > > >
> > > > > https://github.com/tweise/flink/pull/1/files#diff-8eda4e21a8a53b70c46d30ceecfbfd8ffdb11c14580ca048fa4210564f63ada3R62
> > > > >
> > > > > https://github.com/tweise/flink/pull/1/files#diff-3a5658515bb213f9a66db88d45a85971d8c68f64cdc52807622acf27fa703255R132
> > > >
> > > > > When switching, the enumerator of the previous source needs to
> > > > > supply information about consumed splits that allows setting the
> > > > > start position for the next source. That could be something like the
> > > > > last processed file, timestamp, etc. (Currently
> > > > > StaticFileSplitEnumerator doesn't track finished splits.)
> > > > >
> > > > > See previous discussion regarding start/end position. The prototype
> > > > > shows the use of checkpoint state with a converter function.
> > > >
> > > > * Should readers be deployed dynamically?
> > > >
> > > > > The prototype assumes a static source chain that is fixed at job
> > > > > submission time. Conceivably there could be use cases that require
> > > > > more flexibility, such as switching one KafkaSource for another. A
> > > > > step in that direction would be to deploy the actual readers
> > > > > dynamically, at the time of switching source.
> > > >
> > > > Looking forward to feedback and suggestions for next steps!
> > > >
> > > > Thomas
> > > >
> > > > > On Sun, Mar 14, 2021 at 11:17 AM Thomas Weise <t...@apache.org> wrote:
> > > >
> > > > > Hi Nicholas,
> > > > >
> > > > > > Thanks for the reply. I had implemented a small PoC. It switches a
> > > > > > configurable sequence of sources with predefined bounds. I'm using
> > > > > > the unmodified MockSource for illustration. It does not require a
> > > > > > "Switchable" interface. I looked at the code you shared and the
> > > > > > delegation and signaling work quite similarly. That's a good
> > > > > > validation.
> > > > >
> > > > > Hi Kezhu,
> > > > >
> > > > > > Thanks for bringing up the more detailed discussion regarding the
> > > > > > start/end position. I think in most cases the start and end
> > > > > > positions will be known when the job is submitted. If we take a
> > > > > > File -> Kafka source chain as an example, there would most likely
> > > > > > be a timestamp at which we want to transition from files to reading
> > > > > > from Kafka. So we would either set the start position for Kafka
> > > > > > based on that timestamp or provide the offsets directly. (Note that
> > > > > > I'm skipping a few related nuances here. In order to achieve an
> > > > > > exact switch without duplication or gap, we may also need some
> > > > > > overlap and filtering, but that's a separate issue.)
> > > > >
> > > > > > The point is that the start positions can be configured by the
> > > > > > user; there is no need to transfer any information from one source
> > > > > > to another as part of switching.
> > > > >
> > > > > > It gets more complicated if we want to achieve a dynamic switch
> > > > > > where the transition timestamp isn't known when the job starts. For
> > > > > > example, consider a bootstrap scenario where the time taken to
> > > > > > process historic data exceeds the Kafka retention. Here, we would
> > > > > > need to dynamically resolve the Kafka start position based on where
> > > > > > the file readers left off, when the switching occurs. The file
> > > > > > source enumerator would determine at runtime when it is done
> > > > > > handing splits to its readers, maybe when the max file timestamp
> > > > > > reaches (processing time - X). This information needs to be
> > > > > > transferred to the Kafka source.
> > > > >
> > > > > > The timestamp would need to be derived from the file enumerator
> > > > > > state, either by looking at the last splits or explicitly. The
> > > > > > natural way to do that is to introspect the enumerator state which
> > > > > > gets checkpointed. Any other form of "end position" via a special
> > > > > > interface would need to be derived in the same manner.
> > > > >
> > > > > > The converter that will be provided by the user would look at the
> > > > > > file enumerator state, derive the timestamp and then supply the
> > > > > > "start position" to the Kafka source. The Kafka source was created
> > > > > > when the job started. It needs to be augmented with the new start
> > > > > > position. That can be achieved via a special enumerator interface
> > > > > > like SwitchableSplitEnumerator#setStartState or by using
> > > > > > restoreEnumerator with the checkpoint state constructed by the
> > > > > > converter function. I'm leaning towards the latter as long as there
> > > > > > is a convenient way to construct the state from a position (like
> > > > > > enumStateForTimestamp). The converter would map one enum state to
> > > > > > another and can be made very simple by providing a few utility
> > > > > > functions instead of mandating a new interface that enumerators
> > > > > > need to implement to become switchable.
> > > > >
> > > > > > Again, a converter is only required when sources need to be
> > > > > > switched based on positions not known at graph construction time.
> > > > >
> > > > > > I'm planning to add such deferred switching to the PoC for
> > > > > > illustration and will share the experiment when that's done.
> > > > >
> > > > > Cheers,
> > > > > Thomas
> > > > >
> > > > >
> > > > > > On Mon, Mar 8, 2021 at 1:46 AM Nicholas Jiang <programg...@163.com> wrote:
> > > > >
> > > > >> Hi Kezhu,
> > > > >>
> > > > > >> Thanks for your detailed points on the Hybrid Source. I went
> > > > > >> through them and respond to each as follows:
> > > > >>
> > > > > >> 1. Would it be possible to use the Hybrid Source to switch/chain
> > > > > >> multiple homogeneous sources?
> > > > > >>
> > > > > >> "HybridSource" supports switching/chaining multiple homogeneous
> > > > > >> sources that each implement "SwitchableSource" and
> > > > > >> "SwitchableSplitEnumerator". "HybridSource" doesn't restrict
> > > > > >> whether the sources it consists of are homogeneous. From the
> > > > > >> user's perspective, the user only adds the "SwitchableSource" to
> > > > > >> the "HybridSource" and leaves the smooth migration to
> > > > > >> "HybridSource".
> > > > >>
> > > > >> 2."setStartState" is actually a reposition operation for next
> > source to
> > > > >> start in job runtime?
> > > > >>
> > > > >> IMO, "setStartState" is used to determine the initial position of
> > the
> > > > new
> > > > >> source for smooth migration, not reposition operation. More
> > importantly,
> > > > >> the
> > > > >> "State" mentioned here refers to the start and end positions of
> > reading
> > > > >> source.
> > > > >>
> > > > > >> 3. Shouldn't this conversion be an implementation detail of the
> > > > > >> next source rather than a converter function?
> > > > > >>
> > > > > >> The state conversion is of course an implementation detail and
> > > > > >> part of the switching mechanism, which should provide users with
> > > > > >> an interface for the conversion; that interface is the converter
> > > > > >> function. What's more, once users have implemented a
> > > > > >> "SwitchableSource" and added it to the Hybrid Source, they don't
> > > > > >> need to implement another "SwitchableSource" for a different
> > > > > >> conversion. From the user's perspective, users can define
> > > > > >> different converter functions and create the "SwitchableSource" to
> > > > > >> add to the "HybridSource"; there is no need to implement a Source
> > > > > >> for each converter function.
> > > > >>
> > > > > >> 4. No configurable start-position. In this situation the
> > > > > >> combination of the above three joints is a nop, and "HybridSource"
> > > > > >> is a chain of start-position pre-configured sources?
> > > > > >>
> > > > > >> Indeed there is no configurable start-position, and this
> > > > > >> configuration could be included in the feature. Users could use
> > > > > >> the "SwitchableSplitEnumerator#setStartState" interface or
> > > > > >> configuration parameters to configure the start-position.
> > > > >>
> > > > > >> 5. I wonder whether end-position is a must and how it could be
> > > > > >> useful for end users in a generic-enough source?
> > > > > >>
> > > > > >> The "getEndState" interface is used for the smooth migration
> > > > > >> scenario and can return null if it is not needed. In the Hybrid
> > > > > >> Source mechanism, this interface is required for switching between
> > > > > >> the constituent sources; otherwise there is no way to get the
> > > > > >> end-position of the upstream source. In summary, Hybrid Source
> > > > > >> needs to be able to set the start position and get the end
> > > > > >> position of each Source; otherwise there is no point in building a
> > > > > >> Hybrid Source.
> > > > >>
> > > > > >> 6. Is it possible for the converter function to do blocking
> > > > > >> operations? How to respond to a checkpoint request when switching
> > > > > >> split enumerators across sources? Does end-position or
> > > > > >> start-position need to be stored in checkpoint state or not?
> > > > > >>
> > > > > >> The converter function simply converts the state of the upstream
> > > > > >> source to the state of the downstream source and performs no
> > > > > >> blocking operations. The way to respond to a checkpoint request
> > > > > >> when switching split enumerators across sources is to send the
> > > > > >> corresponding "SourceEvent" to the coordinator. The end-position
> > > > > >> and start-position don't need to be stored in checkpoint state;
> > > > > >> the source only needs to implement the "getEndState" interface for
> > > > > >> the end-position.
> > > > >>
> > > > >> Best,
> > > > >> Nicholas Jiang
> > > > >>
> > > > >>
> > > > >>
> > > > >> --
> > > > >> Sent from:
> > > > >> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
> > > > >>
> > > > >
> > > >
> >
>
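
For readers of the quoted thread, the converter idea discussed there
(deriving the Kafka start position from the file enumerator state) can be
sketched as follows. This is a minimal sketch under stated assumptions:
`FileSplit`, `FileEnumState`, `KafkaEnumState`, and `file_to_kafka` are
hypothetical stand-ins, `enum_state_for_timestamp` only mirrors the
enumStateForTimestamp utility suggested in the thread, and the file
enumerator is assumed to track finished splits, which the thread notes
StaticFileSplitEnumerator did not do at the time. Python is used only for
brevity.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class FileSplit:
    path: str
    max_event_ts: int  # largest event timestamp in the file (epoch millis)


@dataclass(frozen=True)
class FileEnumState:
    """Hypothetical checkpointed state of the file enumerator."""
    finished_splits: List[FileSplit]


@dataclass(frozen=True)
class KafkaEnumState:
    """Hypothetical enumerator state that starts Kafka at a timestamp."""
    start_timestamp: int


def enum_state_for_timestamp(ts: int) -> KafkaEnumState:
    # Utility in the spirit of enumStateForTimestamp from the thread.
    return KafkaEnumState(start_timestamp=ts)


def file_to_kafka(state: FileEnumState, overlap_ms: int = 0) -> KafkaEnumState:
    """Converter: map the file enumerator state to a Kafka start state.

    overlap_ms rewinds the start position so the two sources overlap; any
    resulting duplicates would have to be filtered downstream.
    """
    if not state.finished_splits:
        raise ValueError("no finished splits; cannot derive a start position")
    last_ts = max(split.max_event_ts for split in state.finished_splits)
    return enum_state_for_timestamp(max(0, last_ts - overlap_ms))
```

The resulting state would then be handed to the next source's enumerator at
switch time, in the manner of the restoreEnumerator approach that the thread
leans towards.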
