Hi Jiangjie (Becket),
Thank you for your advice. I have learned a lot.

> If SplitFetcherManager becomes PublicEvolving, that also means
> SplitFetcher needs to be PublicEvolving, because it is returned by the
> protected method SplitFetcherManager.createSplitFetcher().

I completely agree with you. However, if SplitFetcher becomes
PublicEvolving, SplitFetcherTask also needs to be PublicEvolving,
because it appears in the signature of the public method
SplitFetcher#enqueueTask. Additionally, SplitFetcherTask requires
FutureCompletingBlockingQueue as a constructor parameter, which is not
allowed now. Therefore, I propose changing SplitFetcher to a public
interface and moving its implementation details to an implementation
class (e.g., SplitFetcherImpl or another suitable name).
SplitFetcherImpl will be marked as internal, will be managed by
SplitFetcherManager, and will put data into the queue.
Subclasses of SplitFetcherManager can only use the SplitFetcher
interface, which also ensures that the current subclasses are not
affected.
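
To make the idea concrete, here is a rough sketch of the split I have in
mind. All types below are simplified, hypothetical stand-ins and not the
real Flink signatures (for example, the queue just carries String
records and runOnce() fakes a fetch). It only shows that the manager
owns the queue, the internal implementation writes to it, and subclasses
see nothing but the interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified stand-ins; these are NOT the real Flink signatures.

/** Public-facing view: the only fetcher type that manager subclasses would see. */
interface SplitFetcher<SplitT> {
    void addSplits(List<SplitT> splits);

    List<SplitT> assignedSplits();
}

/** Internal implementation: the only fetcher class that touches the element queue. */
final class SplitFetcherImpl<SplitT> implements SplitFetcher<SplitT> {
    private final BlockingQueue<String> elementsQueue;
    private final List<SplitT> assigned = new ArrayList<>();

    SplitFetcherImpl(BlockingQueue<String> elementsQueue) {
        this.elementsQueue = elementsQueue;
    }

    @Override
    public void addSplits(List<SplitT> splits) {
        assigned.addAll(splits);
    }

    @Override
    public List<SplitT> assignedSplits() {
        return new ArrayList<>(assigned); // defensive copy
    }

    /** Internal only: emulates one fetch by emitting one record per assigned split. */
    void runOnce() {
        for (SplitT split : assigned) {
            elementsQueue.add("record-from-" + split);
        }
    }
}

/** The manager owns the queue and hands out fetchers typed as the interface. */
class SplitFetcherManager<SplitT> {
    private final BlockingQueue<String> elementsQueue = new LinkedBlockingQueue<>();
    private final List<SplitFetcherImpl<SplitT>> fetchers = new ArrayList<>();

    protected SplitFetcher<SplitT> createSplitFetcher() {
        SplitFetcherImpl<SplitT> fetcher = new SplitFetcherImpl<>(elementsQueue);
        fetchers.add(fetcher);
        return fetcher;
    }

    /** Stand-in for the fetcher threads: runs every fetcher once. */
    void runFetchersOnce() {
        fetchers.forEach(SplitFetcherImpl::runOnce);
    }

    /** Wrapper so that callers never see the queue; returns null when it is empty. */
    public String poll() {
        return elementsQueue.poll();
    }
}

class SplitFetcherSketch {
    public static void main(String[] args) {
        SplitFetcherManager<String> manager = new SplitFetcherManager<>();
        SplitFetcher<String> fetcher = manager.createSplitFetcher();
        fetcher.addSplits(List.of("split-0"));
        manager.runFetchersOnce();
        System.out.println(manager.poll());
    }
}
```

With this shape, neither SplitFetcherTask nor the queue would need to
appear in any public signature.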



> The current SplitFetcherManager basically looks up
> the SplitT from the fetcher with the split Id, and immediately passes the
> SplitT back to the fetcher, which is unnecessary.

I infer that this is because SplitReader#pauseOrResumeSplits
requires SplitT instead of the split ID. Perhaps some external sources
require more information to pause. However, SplitReader doesn't store
all of its split data, while SplitFetcherManager saves it.
CC @Dawid Wysakowicz
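
A rough sketch of the lookup I am describing, again with hypothetical,
simplified types (DemoSplit, DemoSplitReader, and DemoFetcher are made
up for illustration and are not Flink classes). The caller only knows
split IDs, the reader wants the full split object, so something in
between has to resolve IDs to splits:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins; these are NOT the real Flink signatures.

/** A split: a static ID plus extra data the reader may need in order to pause. */
final class DemoSplit {
    final String splitId;
    final String topicPartition;

    DemoSplit(String splitId, String topicPartition) {
        this.splitId = splitId;
        this.topicPartition = topicPartition;
    }
}

/** Reader side: pauses and resumes by full split object, not by ID. */
final class DemoSplitReader {
    final List<String> pausedPartitions = new ArrayList<>();

    void pauseOrResumeSplits(Collection<DemoSplit> toPause, Collection<DemoSplit> toResume) {
        for (DemoSplit split : toPause) {
            pausedPartitions.add(split.topicPartition);
        }
        for (DemoSplit split : toResume) {
            pausedPartitions.remove(split.topicPartition);
        }
    }
}

/** Fetcher side: remembers assigned splits by ID so that IDs can be resolved. */
final class DemoFetcher {
    private final Map<String, DemoSplit> assignedSplits = new HashMap<>();
    final DemoSplitReader reader = new DemoSplitReader();

    void addSplit(DemoSplit split) {
        assignedSplits.put(split.splitId, split);
    }

    /** Accepts IDs, resolves them to full splits, and forwards them to the reader. */
    void pauseOrResumeSplitIds(Collection<String> idsToPause, Collection<String> idsToResume) {
        reader.pauseOrResumeSplits(resolve(idsToPause), resolve(idsToResume));
    }

    /** Unknown IDs are skipped rather than treated as errors (an assumption of this sketch). */
    private List<DemoSplit> resolve(Collection<String> ids) {
        List<DemoSplit> resolved = new ArrayList<>();
        for (String id : ids) {
            DemoSplit split = assignedSplits.get(id);
            if (split != null) {
                resolved.add(split);
            }
        }
        return resolved;
    }
}
```

If the resolution lives in the fetcher like this, the manager could pass
split IDs straight through without looking up SplitT first.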



> If not, SplitFetcher.pause() and
> SplitFetcher.resume() can be removed. In fact, they seem no longer used
> anywhere.

They seem to be no longer used. CC @Arvid Heise



Thanks,
Hongshun Wang

On Fri, Nov 17, 2023 at 11:42 AM Becket Qin <becket....@gmail.com> wrote:

> Hi Hongshun,
>
> Thanks for updating the FLIP. I think that makes sense. A few comments
> below:
>
> 1. If SplitFetcherManager becomes PublicEvolving, that also means
> SplitFetcher needs to be PublicEvolving, because it is returned by the
> protected method SplitFetcherManager.createSplitFetcher().
>
> 2. When checking the API of the classes to be marked as PublicEvolving,
> there might be a few methods' signatures worth some discussion.
>
> For SplitFetcherManager:
> a) Currently the removeSplits() method takes a list of SplitT. I am wondering
> if it should be a list of splitIds. SplitT actually contains two parts of
> information, the static split Id and some dynamically changing state of the
> split (e.g. Kafka consumer offset). The source of truth for the dynamic
> state is SourceReaderBase. Currently we are passing in the full source
> split with the dynamic state for split removal. But it looks like only
> split id is needed for the split removal.
> Maybe this is intentional, as sometimes when a SplitReader removes a split,
> it also wants to know the dynamic state of the split. If so, we can keep it
> as is. But then the question is why
> SplitFetcherManager.pauseOrResumeSplits() only takes split ids instead of
> SplitT. Should we make them consistent?
>
> For SplitFetcher:
> a) The SplitFetcher.pauseOrResumeSplits() method takes collections of
> SplitT as arguments. We may want to adjust that according to what we do to
> the SplitFetcherManager. The current SplitFetcherManager basically looks up
> the SplitT from the fetcher with the split Id, and immediately passes the
> SplitT back to the fetcher, which is unnecessary.
> b) After supporting split level pause and resume, do we still need split
> fetcher level pause and resume? If not, SplitFetcher.pause() and
> SplitFetcher.resume() can be removed. In fact, they seem no longer used
> anywhere.
>
> Other than the above potential API adjustment before we mark the classes
> PublicEvolving, the API looks fine to me.
>
> I think it is good timing for deprecation now. We will mark the impacted
> constructors as deprecated in 1.19, and remove them in the 2.0 release.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Thu, Nov 16, 2023 at 8:26 PM Hongshun Wang <loserwang1...@gmail.com>
> wrote:
>
> > Hi Devs,
> >
> > I have just modified the content of FLIP-389: Annotate
> > SingleThreadFetcherManager as PublicEvolving[1].
> >
> > Now this FLIP mainly does two things:
> >
> >    1. Annotate SingleThreadFetcherManager as PublicEvolving
> >    2. Remove all public constructors which use
> >    FutureCompletingBlockingQueue. This will mark many constructors as
> >    @Deprecated.
> >
> > This may influence many connectors, so I am looking forward to hearing from
> > you.
> >
> >
> > Best regards,
> > Hongshun
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498
> >
> > On Wed, Nov 15, 2023 at 7:57 AM Becket Qin <becket....@gmail.com> wrote:
> >
> > > Hi Hongshun,
> > > >
> > > >
> > > > > However, it will be tricky because SplitFetcherManager includes <E,
> > > > > SplitT extends SourceSplit>, while FutureCompletingBlockingQueue includes
> > > > > <T>. This means that SplitFetcherManager would have to be modified to <T,
> > > > > E, SplitT extends SourceSplit>, which would affect the compatibility of
> > > > > the SplitFetcherManager class. I'm afraid this change will influence
> > > > > other sources.
> > >
> > > Although the FutureCompletingBlockingQueue class itself has a type
> > > parameter <T>, in SourceReaderBase and SplitFetcherManager this <T> is
> > > actually RecordsWithSplitIds<E>. So it looks like we can just let
> > > SplitFetcherManager.poll() return a RecordsWithSplitIds<E>.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Tue, Nov 14, 2023 at 8:11 PM Hongshun Wang <loserwang1...@gmail.com
> >
> > > wrote:
> > >
> > > > Hi Becket,
> > > > I agree with you and have tried to modify this FLIP[1], which includes
> > > > these changes:
> > > >
> > > >    1. Mark the constructor of SingleThreadMultiplexSourceReaderBase as
> > > >    @Deprecated.
> > > >    2. Mark the constructor of SourceReaderBase as @Deprecated and provide
> > > >    a new constructor without FutureCompletingBlockingQueue.
> > > >    3. Mark the constructors of SplitFetcherManager and
> > > >    SingleThreadFetcherManager as @Deprecated and provide new constructors
> > > >    without FutureCompletingBlockingQueue. Mark SplitFetcherManager and
> > > >    SingleThreadFetcherManager as @PublicEvolving.
> > > >    4. SplitFetcherManager provides wrapper methods for
> > > >    FutureCompletingBlockingQueue to replace its usage in SourceReaderBase.
> > > >    Then we can use FutureCompletingBlockingQueue only in
> > > >    SplitFetcherManager.
> > > >
> > > > However, it will be tricky because SplitFetcherManager includes <E,
> > > > SplitT extends SourceSplit>, while FutureCompletingBlockingQueue includes
> > > > <T>. This means that SplitFetcherManager would have to be modified to <T,
> > > > E, SplitT extends SourceSplit>, which would affect the compatibility of
> > > > the SplitFetcherManager class. I'm afraid this change will influence
> > > > other sources.
> > > >
> > > >
> > > >
> > > > Looking forward to hearing from you.
> > > >
> > > > Best regards,
> > > > Hongshun
> > > >
> > > > [1]
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498
> > > >
> > > > On Sat, Nov 11, 2023 at 10:55 AM Becket Qin <becket....@gmail.com>
> > > wrote:
> > > >
> > > > > Hi Hongshun and Martijn,
> > > > >
> > > > > Sorry for the late reply, as I was traveling and am still catching up
> > > > > with the emails. Please allow me to provide more context.
> > > > >
> > > > > 1. The original design of SplitFetcherManager and its subclasses was to
> > > > > make them public to the Source developers. The goal is to let us take
> > > > > care of the threading model, while the Source developers can just focus
> > > > > on the SplitReader implementation. Therefore, I think making
> > > > > SplitFetcherManager / SingleThreadFetcherManager public aligns with the
> > > > > original design. That is also why these classes are exposed in the
> > > > > constructor of SourceReaderBase.
> > > > >
> > > > > 2. For FutureCompletingBlockingQueue, in hindsight, it might be better
> > > > > not to expose it to the Source developers. They are unlikely to use it
> > > > > anywhere other than just constructing it. The reason that
> > > > > FutureCompletingBlockingQueue is currently exposed in the
> > > > > SourceReaderBase constructor is that both the SplitFetcherManager and
> > > > > SourceReaderBase need it. One way to hide the
> > > > > FutureCompletingBlockingQueue from the public API is to make
> > > > > SplitFetcherManager the only owner class of the queue, and expose some
> > > > > of its methods via SplitFetcherManager. This way, the SourceReaderBase
> > > > > can invoke the methods via SplitFetcherManager. I believe this also
> > > > > makes the code slightly cleaner.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > > On Fri, Nov 10, 2023 at 12:28 PM Hongshun Wang <
> > > loserwang1...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > @Martijn, I agree with you.
> > > > > >
> > > > > >
> > > > > > I also have two questions at the beginning:
> > > > > >
> > > > > >    - Why is an Internal class
> > > > > >    exposed as a constructor param of a Public class?
> > > > > >    - Should these classes be exposed as public?
> > > > > >
> > > > > > For the first question, I noticed that before the original Jira[1],
> > > > > > all these classes were missing annotations, so it was not abnormal
> > > > > > that FutureCompletingBlockingQueue and SingleThreadFetcherManager were
> > > > > > constructor params of SingleThreadMultiplexSourceReaderBase. However,
> > > > > > this Jira marked FutureCompletingBlockingQueue and
> > > > > > SingleThreadFetcherManager as Internal, while marking
> > > > > > SingleThreadMultiplexSourceReaderBase as Public. It was a good choice,
> > > > > > but it overlooked that FutureCompletingBlockingQueue and
> > > > > > SingleThreadFetcherManager had already been exposed by
> > > > > > SingleThreadMultiplexSourceReaderBase. Thus, this problem occurs
> > > > > > because we didn't clearly define the boundaries in the original
> > > > > > design. We should pay more attention to this when creating a new class.
> > > > > >
> > > > > >
> > > > > > For the second question, I think at least SplitFetcherManager
> > > > > > should be Public. There are a few reasons:
> > > > > >
> > > > > >    - Connector developers want to decide their own threading model,
> > > > > >    for example, whether to recycle fetchers when idle by overriding
> > > > > >    SplitFetcherManager#maybeShutdownFinishedFetchers. Sometimes,
> > > > > >    developers want SplitFetcherManager to act as a FixedThreadPool,
> > > > > >    because each time a thread is recycled and then recreated, the
> > > > > >    context resources need to be rebuilt. I met a related issue in
> > > > > >    Flink CDC[2].
> > > > > >    - KafkaSourceFetcherManager[3] also extends
> > > > > >    SingleThreadFetcherManager to commit offsets. But now the Kafka
> > > > > >    source is no longer in the Flink repository, so this is not
> > > > > >    allowed any more.
> > > > > >
> > > > > > [1] https://issues.apache.org/jira/browse/FLINK-22358
> > > > > >
> > > > > > [2]
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/ververica/flink-cdc-connectors/pull/2571#issuecomment-1797585418
> > > > > >
> > > > > > [3]
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink-connector-kafka/blob/979791c4c71e944c16c51419cf9a84aa1f8fea4c/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java#L52
> > > > > >
> > > > > > Looking forward to hearing from you.
> > > > > >
> > > > > > Best regards,
> > > > > > Hongshun
> > > > > >
> > > > > > On Thu, Nov 9, 2023 at 11:46 PM Martijn Visser <
> > > > martijnvis...@apache.org
> > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > > I'm looking at the original Jira that introduced these stability
> > > > > > > > designations [1] and I'm just curious if it was intended that these
> > > > > > > > Internal classes would be used directly, or if we just haven't
> > > > > > > > created the right abstractions? The reason for asking is because
> > > > > > > > moving something from Internal to a public designation is an easy
> > > > > > > > fix, but I want to make sure that it's also the right fix. If we are
> > > > > > > > missing good abstractions, then I would rather invest in those.
> > > > > > >
> > > > > > > Best regards,
> > > > > > >
> > > > > > > Martijn
> > > > > > >
> > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-22358
> > > > > > >
> > > > > > > On Wed, Nov 8, 2023 at 12:40 PM Leonard Xu <xbjt...@gmail.com>
> > > > wrote:
> > > > > > > >
> > > > > > > > Thanks Hongshun for starting this discussion.
> > > > > > > >
> > > > > > > > +1 from my side.
> > > > > > > >
> > > > > > > > > IIRC, @Jiangjie(Becket) also mentioned this in a FLINK-31324
> > > > > > > > > comment[1].
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Leonard
> > > > > > > >
> > > > > > > > [1]
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://issues.apache.org/jira/browse/FLINK-31324?focusedCommentId=17696756&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17696756
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > > 2023年11月8日 下午5:42,Hongshun Wang <loserwang1...@gmail.com>
> > 写道:
> > > > > > > > >
> > > > > > > > > Hi devs,
> > > > > > > > >
> > > > > > > > > > I would like to start a discussion on FLIP-389: Annotate
> > > > > > > > > > SingleThreadFetcherManager and FutureCompletingBlockingQueue as
> > > > > > > > > > PublicEvolving[1].
> > > > > > > > >
> > > > > > > > > > Though the SingleThreadFetcherManager is annotated as Internal, it
> > > > > > > > > > actually acts as a public API to some degree and is widely used in
> > > > > > > > > > many connector projects: flink-cdc-connector
> > > > > > > > > > <https://github.com/ververica/flink-cdc-connectors/blob/release-2.3.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java#L93>,
> > > > > > > > > > flink-connector-mongodb
> > > > > > > > > > <https://github.com/apache/flink-connector-mongodb/blob/main/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/MongoSourceReader.java#L58>,
> > > > > > > > > > and so on.
> > > > > > > > >
> > > > > > > > > > Moreover, even the constructor of
> > > > > > > > > > SingleThreadMultiplexSourceReaderBase (which is PublicEvolving)
> > > > > > > > > > includes SingleThreadFetcherManager and
> > > > > > > > > > FutureCompletingBlockingQueue as params. That means that
> > > > > > > > > > SingleThreadFetcherManager and FutureCompletingBlockingQueue have
> > > > > > > > > > already been exposed to users for a long time and are widely used.
> > > > > > > > >
> > > > > > > > > > Considering that all source implementations are using them de
> > > > > > > > > > facto, why not annotate SingleThreadFetcherManager and
> > > > > > > > > > FutureCompletingBlockingQueue as PublicEvolving so that developers
> > > > > > > > > > will modify them more carefully to avoid any potential issues? As
> > > > > > > > > > shown in FLINK-31324[2], FLINK-28853[3] once changed the default
> > > > > > > > > > constructor of SingleThreadFetcherManager. However, that change had
> > > > > > > > > > a wide impact. Finally, the former constructor was added back and
> > > > > > > > > > marked as Deprecated.
> > > > > > > > >
> > > > > > > > > > In conclusion, the goal of this FLIP is to annotate
> > > > > > > > > > SingleThreadFetcherManager (including its parent class) and
> > > > > > > > > > FutureCompletingBlockingQueue as PublicEvolving.
> > > > > > > > >
> > > > > > > > > Looking forward to hearing from you.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > [1]
> > > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498
> > > > > > > > >
> > > > > > > > > [2] https://issues.apache.org/jira/browse/FLINK-31324
> > > > > > > > >
> > > > > > > > > [3] https://issues.apache.org/jira/browse/FLINK-28853
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
