Hi Hongshun,

Thanks for updating the FLIP. I think that makes sense. A few comments
below:

1. If SplitFetcherManager becomes PublicEvolving, that also means
SplitFetcher needs to be PublicEvolving, because it is returned by the
protected method SplitFetcherManager.createSplitFetcher().

2. When checking the API of the classes to be marked as PublicEvolving,
I found a few method signatures that might be worth some discussion.

For SplitFetcherManager:
a) Currently the removeSplits() method takes a list of SplitT. I am wondering
if it should take a list of split ids instead. A SplitT actually contains two
parts of information: the static split id and some dynamically changing state
of the split (e.g. the Kafka consumer offset). The source of truth for the
dynamic state is SourceReaderBase. Currently we pass in the full source split,
including its dynamic state, for split removal, but it looks like only the
split id is needed.
Maybe this is intentional, as sometimes when a SplitReader removes a split,
it also wants to know the dynamic state of the split. If so, we can keep it
as is. But then the question is why
SplitFetcherManager.pauseOrResumeSplits() only takes split ids instead of
SplitT. Should we make them consistent?

For SplitFetcher:
a) The SplitFetcher.pauseOrResumeSplits() method takes collections of
SplitT as arguments. We may want to adjust that according to what we do to
the SplitFetcherManager. The current SplitFetcherManager basically looks up
the SplitT from the fetcher with the split Id, and immediately passes the
SplitT back to the fetcher, which is unnecessary.
b) Now that split-level pause and resume is supported, do we still need
fetcher-level pause and resume? If not, SplitFetcher.pause() and
SplitFetcher.resume() can be removed. In fact, they no longer seem to be used
anywhere.
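To make the consistency question in point 2 concrete, here is a minimal sketch of the id-only option, assuming both removeSplits() and pauseOrResumeSplits() take split ids and the manager routes each id to the fetcher that owns it. SketchFetcher and SketchFetcherManager are hypothetical, single-threaded stand-ins, not Flink's SplitFetcher / SplitFetcherManager.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a split fetcher that tracks its splits by id only;
// the dynamic split state (e.g. offsets) stays with the reader.
class SketchFetcher {
    final Set<String> ownedSplitIds = new HashSet<>();
    final Set<String> pausedSplitIds = new HashSet<>();

    void addSplits(Collection<String> splitIds) {
        ownedSplitIds.addAll(splitIds);
    }

    void removeSplits(Collection<String> splitIds) {
        ownedSplitIds.removeAll(splitIds);
        pausedSplitIds.removeAll(splitIds);
    }

    void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume) {
        pausedSplitIds.addAll(toPause);
        pausedSplitIds.removeAll(toResume);
    }
}

// Hypothetical manager: both operations take split ids, so there is no need
// to look up a SplitT only to hand it straight back to the fetcher.
class SketchFetcherManager {
    final Map<Integer, SketchFetcher> fetchers = new LinkedHashMap<>();

    void removeSplits(Collection<String> splitIds) {
        for (SketchFetcher f : fetchers.values()) {
            f.removeSplits(ownedBy(f, splitIds));
        }
    }

    void pauseOrResumeSplits(Collection<String> toPause, Collection<String> toResume) {
        for (SketchFetcher f : fetchers.values()) {
            List<String> pause = ownedBy(f, toPause);
            List<String> resume = ownedBy(f, toResume);
            if (!pause.isEmpty() || !resume.isEmpty()) {
                f.pauseOrResumeSplits(pause, resume);
            }
        }
    }

    // Keeps only the ids that belong to the given fetcher.
    private static List<String> ownedBy(SketchFetcher f, Collection<String> splitIds) {
        List<String> result = new ArrayList<>();
        for (String id : splitIds) {
            if (f.ownedSplitIds.contains(id)) {
                result.add(id);
            }
        }
        return result;
    }
}
```

If a SplitReader does need the dynamic state on removal (the case mentioned in 2a), the id-only signature would not be enough; the sketch only shows what the consistent id-based alternative could look like.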

Other than the above potential API adjustments before we mark the classes
PublicEvolving, the API looks fine to me.
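Point 1 generalizes: any type returned by a public or protected method of a PublicEvolving class is effectively exposed too. As a hedged illustration, the sketch below uses reflection to flag such a leak. The two annotations are local stand-ins for Flink's PublicEvolving / Internal so that it compiles without Flink, and both classes are simplified hypothetical versions; a real check would also cover parameter types, fields and constructors.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Local stand-ins for Flink's stability annotations, retained at runtime
// so that reflection can see them.
@Retention(RetentionPolicy.RUNTIME)
@interface PublicEvolving {}

@Retention(RetentionPolicy.RUNTIME)
@interface Internal {}

// Simplified stand-in for SplitFetcher, still marked Internal.
@Internal
class SplitFetcher {}

// Simplified stand-in for SplitFetcherManager, about to become PublicEvolving.
@PublicEvolving
abstract class SplitFetcherManager {
    // Connector subclasses can call and override this method,
    // so its return type is visible to them as well.
    protected SplitFetcher createSplitFetcher() {
        return new SplitFetcher();
    }
}

final class ApiLeakCheck {
    private ApiLeakCheck() {}

    // True if a PublicEvolving class declares a public or protected method
    // whose return type is annotated Internal.
    static boolean leaksInternalReturnType(Class<?> apiClass) {
        if (!apiClass.isAnnotationPresent(PublicEvolving.class)) {
            return false;
        }
        for (Method m : apiClass.getDeclaredMethods()) {
            int mod = m.getModifiers();
            boolean visible = Modifier.isPublic(mod) || Modifier.isProtected(mod);
            if (visible && m.getReturnType().isAnnotationPresent(Internal.class)) {
                return true;
            }
        }
        return false;
    }
}
```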

I think now is a good time for the deprecation. We will mark the impacted
constructors as deprecated in 1.19 and remove them in the 2.0 release.
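A minimal sketch of that deprecation pattern, assuming a hypothetical, heavily simplified manager class: the old constructor that accepts a caller-created queue is kept but marked @Deprecated, and a new constructor lets the manager create and own the queue. A plain BlockingQueue stands in for FutureCompletingBlockingQueue, and the queueCapacity parameter is illustrative only, not the signature proposed in the FLIP.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical, simplified manager illustrating the constructor migration.
class QueueOwningFetcherManager<T> {
    private final BlockingQueue<T> elementsQueue;

    /**
     * Old-style constructor: the caller creates the queue and shares it.
     *
     * @deprecated The queue is an implementation detail; use
     *     {@link #QueueOwningFetcherManager(int)} instead. Under the plan above,
     *     deprecated in 1.19 and removed in 2.0.
     */
    @Deprecated
    QueueOwningFetcherManager(BlockingQueue<T> elementsQueue) {
        this.elementsQueue = elementsQueue;
    }

    // New-style constructor: the manager creates and owns the queue,
    // so callers never need to see the queue type.
    QueueOwningFetcherManager(int queueCapacity) {
        this.elementsQueue = new ArrayBlockingQueue<>(queueCapacity);
    }

    // Wrapper methods replace direct queue access from other classes.
    boolean offer(T element) {
        return elementsQueue.offer(element);
    }

    T poll() {
        return elementsQueue.poll();
    }
}
```

Keeping the deprecated constructor for a release gives connectors time to migrate before it is removed.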

Thanks,

Jiangjie (Becket) Qin



On Thu, Nov 16, 2023 at 8:26 PM Hongshun Wang <loserwang1...@gmail.com>
wrote:

> Hi Devs,
>
> I have just modified the content of FLIP-389: Annotate
> SingleThreadFetcherManager as PublicEvolving[1].
>
> The FLIP now mainly does two things:
>
>    1. Annotate SingleThreadFetcherManager as PublicEvolving.
>    2. Remove all public constructors that take a
>    FutureCompletingBlockingQueue. This will mark many constructors as
>    @Deprecated.
>
> This may influence many connectors, so I am looking forward to hearing from
> you.
>
>
> Best regards,
> Hongshun
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498
>
> On Wed, Nov 15, 2023 at 7:57 AM Becket Qin <becket....@gmail.com> wrote:
>
> > Hi Hongshun,
> >
> > > However, it will be tricky because SplitFetcherManager includes
> > > <E, SplitT extends SourceSplit>, while FutureCompletingBlockingQueue
> > > includes <T>. This means that SplitFetcherManager would have to be
> > > modified to <T, E, SplitT extends SourceSplit>, which would affect the
> > > compatibility of the SplitFetcherManager class. I'm afraid this change
> > > will influence other sources.
> >
> > Although the FutureCompletingBlockingQueue class itself has a type
> > parameter <T>, in SourceReaderBase and SplitFetcherManager this <T> is
> > actually RecordsWithSplitIds<E>. So it looks like we can just let
> > SplitFetcherManager.poll() return a RecordsWithSplitIds<E>.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
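Becket's suggestion above (letting SplitFetcherManager.poll() return a RecordsWithSplitIds<E>) can be sketched as follows, under simplifying assumptions: SourceSplit and RecordsWithSplitIds here are minimal hypothetical stand-ins (Flink's real RecordsWithSplitIds has a different, iterator-style API), and a LinkedBlockingQueue stands in for FutureCompletingBlockingQueue. Because the queue element type is pinned to RecordsWithSplitIds<E>, the manager keeps its existing <E, SplitT> type parameters and no extra <T> is needed.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified stand-ins; Flink's real interfaces have richer APIs.
interface SourceSplit {
    String splitId();
}

interface RecordsWithSplitIds<E> {
    String splitId();

    List<E> records();
}

// A trivial batch of records fetched from a single split.
final class SingleSplitBatch<E> implements RecordsWithSplitIds<E> {
    private final String splitId;
    private final List<E> records;

    SingleSplitBatch(String splitId, List<E> records) {
        this.splitId = splitId;
        this.records = List.copyOf(records);
    }

    public String splitId() {
        return splitId;
    }

    public List<E> records() {
        return records;
    }
}

// The queue element type is fixed to RecordsWithSplitIds<E>, so the
// manager's <E, SplitT> type parameters are sufficient.
class SplitFetcherManagerSketch<E, SplitT extends SourceSplit> {
    private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue =
            new LinkedBlockingQueue<>();

    // In the real design fetcher threads would enqueue; exposed here for the demo.
    void enqueue(RecordsWithSplitIds<E> batch) {
        elementsQueue.add(batch);
    }

    // Non-blocking: returns null when no fetched batch is available.
    RecordsWithSplitIds<E> poll() {
        return elementsQueue.poll();
    }
}
```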
> > On Tue, Nov 14, 2023 at 8:11 PM Hongshun Wang <loserwang1...@gmail.com>
> > wrote:
> >
> > > Hi Becket,
> > > I agree with you and have tried to modify this FLIP[1], which includes
> > > these changes:
> > >
> > >    1. Mark the constructor of SingleThreadMultiplexSourceReaderBase as
> > >    @Deprecated.
> > >    2. Mark the constructor of SourceReaderBase as @Deprecated and
> > >    provide a new constructor without FutureCompletingBlockingQueue.
> > >    3. Mark the constructors of SplitFetcherManager and
> > >    SingleThreadFetcherManager as @Deprecated and provide new
> > >    constructors without FutureCompletingBlockingQueue. Mark
> > >    SplitFetcherManager and SingleThreadFetcherManager as
> > >    @PublicEvolving.
> > >    4. SplitFetcherManager provides wrapper methods for
> > >    FutureCompletingBlockingQueue to replace its usage in
> > >    SourceReaderBase. Then FutureCompletingBlockingQueue is used only in
> > >    SplitFetcherManager.
> > >
> > > However, it will be tricky because SplitFetcherManager includes
> > > <E, SplitT extends SourceSplit>, while FutureCompletingBlockingQueue
> > > includes <T>. This means that SplitFetcherManager would have to be
> > > modified to <T, E, SplitT extends SourceSplit>, which would affect the
> > > compatibility of the SplitFetcherManager class. I'm afraid this change
> > > will influence other sources.
> > >
> > > Looking forward to hearing from you.
> > >
> > > Best regards,
> > > Hongshun
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498
> > >
> > > On Sat, Nov 11, 2023 at 10:55 AM Becket Qin <becket....@gmail.com> wrote:
> > >
> > > > Hi Hongshun and Martijn,
> > > >
> > > > Sorry for the late reply, as I was traveling and am still catching up
> > > > with emails. Please allow me to provide more context.
> > > >
> > > > 1. The original design of SplitFetcherManager and its subclasses was to
> > > > make them public to the Source developers. The goal is to let us take
> > > > care of the threading model, while the Source developers can just focus
> > > > on the SplitReader implementation. Therefore, I think making
> > > > SplitFetcherManager / SingleThreadFetcherManager public aligns with the
> > > > original design. That is also why these classes are exposed in the
> > > > constructor of SourceReaderBase.
> > > >
> > > > 2. For FutureCompletingBlockingQueue, in hindsight, it might be better
> > > > not to expose it to the Source developers. They are unlikely to use it
> > > > anywhere other than just constructing it. The reason that
> > > > FutureCompletingBlockingQueue is currently exposed in the
> > > > SourceReaderBase constructor is that both the SplitFetcherManager and
> > > > SourceReaderBase need it. One way to hide the
> > > > FutureCompletingBlockingQueue from the public API is to make
> > > > SplitFetcherManager the only owner class of the queue, and expose some
> > > > of its methods via SplitFetcherManager. This way, the SourceReaderBase
> > > > can invoke the methods via SplitFetcherManager. I believe this also
> > > > makes the code slightly cleaner.
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
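A minimal sketch of the ownership change in point 2 above, assuming hypothetical simplified classes: the manager is the only owner of the element queue and exposes just poll() and an availability future, and the reader is constructed with the manager alone. The availability handling is a simplified, single-lock approximation of what FutureCompletingBlockingQueue provides, not a copy of it.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical manager that privately owns the handover queue.
class OwningFetcherManager<E> {
    private final Queue<E> queue = new ArrayDeque<>();
    private CompletableFuture<Void> available = new CompletableFuture<>();

    // Called by fetchers: enqueue an element and signal availability.
    synchronized void put(E element) {
        queue.add(element);
        available.complete(null);
    }

    // Called by the reader: take an element, or null if none is queued.
    synchronized E poll() {
        E element = queue.poll();
        if (queue.isEmpty() && available.isDone()) {
            // Nothing left: hand out a fresh, incomplete future from now on.
            available = new CompletableFuture<>();
        }
        return element;
    }

    // Completes once an element is available to poll.
    synchronized CompletableFuture<Void> getAvailabilityFuture() {
        return available;
    }
}

// Hypothetical reader: it never sees the queue, only the manager.
class ManagerOnlyReader<E> {
    private final OwningFetcherManager<E> fetcherManager;

    ManagerOnlyReader(OwningFetcherManager<E> fetcherManager) {
        this.fetcherManager = fetcherManager;
    }

    E pollNext() {
        return fetcherManager.poll();
    }

    CompletableFuture<Void> isAvailable() {
        return fetcherManager.getAvailabilityFuture();
    }
}
```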
> > > > On Fri, Nov 10, 2023 at 12:28 PM Hongshun Wang <loserwang1...@gmail.com>
> > > > wrote:
> > > >
> > > > > @Martijn, I agree with you.
> > > > >
> > > > >
> > > > > I also had two questions at the beginning:
> > > > >
> > > > >    - Why is an Internal class exposed as a constructor param of a
> > > > >    Public class?
> > > > >    - Should these classes be exposed as public?
> > > > >
> > > > > For the first question, I noticed that before the original Jira[1],
> > > > > none of these classes had a stability annotation, so it was not
> > > > > abnormal that FutureCompletingBlockingQueue and
> > > > > SingleThreadFetcherManager were constructor params of
> > > > > SingleThreadMultiplexSourceReaderBase. However, that Jira marked
> > > > > FutureCompletingBlockingQueue and SingleThreadFetcherManager as
> > > > > Internal, while marking SingleThreadMultiplexSourceReaderBase as
> > > > > Public. That was a good choice, but it overlooked that
> > > > > FutureCompletingBlockingQueue and SingleThreadFetcherManager had
> > > > > already been exposed by SingleThreadMultiplexSourceReaderBase. Thus,
> > > > > this problem occurs because we didn't clearly define the boundaries
> > > > > in the original design. We should pay more attention to this when
> > > > > creating a new class.
> > > > >
> > > > >
> > > > > For the second question, I think at least SplitFetcherManager
> > > > > should be Public. There are a few reasons:
> > > > >
> > > > >    - Connector developers want to decide their own threading model,
> > > > >    for example, whether to recycle idle fetchers by overriding
> > > > >    SplitFetcherManager#maybeShutdownFinishedFetchers. Sometimes,
> > > > >    developers want SplitFetcherManager to behave like a
> > > > >    FixedThreadPool, because each time a thread is recycled and then
> > > > >    recreated, the context resources need to be rebuilt. I ran into a
> > > > >    related issue in Flink CDC[2].
> > > > >    - KafkaSourceFetcherManager[3] also extends
> > > > >    SingleThreadFetcherManager to commit offsets. But now that the
> > > > >    Kafka source is no longer in the Flink repository, this is no
> > > > >    longer allowed, since an Internal class should not be extended
> > > > >    from outside the Flink repository.
> > > > >
> > > > > [1] https://issues.apache.org/jira/browse/FLINK-22358
> > > > >
> > > > > [2]
> > > > > https://github.com/ververica/flink-cdc-connectors/pull/2571#issuecomment-1797585418
> > > > >
> > > > > [3]
> > > > > https://github.com/apache/flink-connector-kafka/blob/979791c4c71e944c16c51419cf9a84aa1f8fea4c/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java#L52
> > > > >
> > > > > Looking forward to hearing from you.
> > > > >
> > > > > Best regards,
> > > > > Hongshun
> > > > >
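Hongshun's first reason above (choosing whether idle fetchers are recycled) can be sketched with hypothetical, single-threaded stand-ins. The base class shuts down and drops idle fetchers; the return contract (true when no fetchers remain) is an assumption of this sketch, not a quote of Flink's code. A subclass that keeps idle fetchers alive behaves more like a fixed-size pool, so per-fetcher context does not have to be rebuilt.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical fetcher with just the state this sketch needs.
class SimpleFetcher {
    boolean idle;
    boolean shutdown;

    void shutdown() {
        shutdown = true;
    }
}

// Hypothetical base manager with a recycle-when-idle default policy.
class RecyclingFetcherManager {
    protected final Map<Integer, SimpleFetcher> fetchers = new LinkedHashMap<>();

    // Shuts down and removes idle fetchers; returns true if none remain.
    boolean maybeShutdownFinishedFetchers() {
        Iterator<Map.Entry<Integer, SimpleFetcher>> it = fetchers.entrySet().iterator();
        while (it.hasNext()) {
            SimpleFetcher fetcher = it.next().getValue();
            if (fetcher.idle) {
                fetcher.shutdown();
                it.remove();
            }
        }
        return fetchers.isEmpty();
    }
}

// Connector-specific policy: keep idle fetchers (and any expensive context
// they hold, e.g. a database connection) alive, like a fixed thread pool.
class KeepAliveFetcherManager extends RecyclingFetcherManager {
    @Override
    boolean maybeShutdownFinishedFetchers() {
        return fetchers.isEmpty();
    }
}
```

Such an override is only possible if the manager class is an API that connectors are allowed to extend, which is the point being argued.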
> > > > > On Thu, Nov 9, 2023 at 11:46 PM Martijn Visser <martijnvis...@apache.org>
> > > > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I'm looking at the original Jira that introduced these stability
> > > > > > designations [1] and I'm just curious whether it was intended that
> > > > > > these Internal classes would be used directly, or whether we just
> > > > > > haven't created the right abstractions. The reason for asking is
> > > > > > that moving something from Internal to a public designation is an
> > > > > > easy fix, but I want to make sure that it's also the right fix. If
> > > > > > we are missing good abstractions, then I would rather invest in
> > > > > > those.
> > > > > >
> > > > > > Best regards,
> > > > > >
> > > > > > Martijn
> > > > > >
> > > > > > [1] https://issues.apache.org/jira/browse/FLINK-22358
> > > > > >
> > > > > > On Wed, Nov 8, 2023 at 12:40 PM Leonard Xu <xbjt...@gmail.com> wrote:
> > > > > > >
> > > > > > > Thanks Hongshun for starting this discussion.
> > > > > > >
> > > > > > > +1 from my side.
> > > > > > >
> > > > > > > IIRC, @Jiangjie(Becket) also mentioned this in a FLINK-31324
> > > > > > > comment[1].
> > > > > > >
> > > > > > > Best,
> > > > > > > Leonard
> > > > > > >
> > > > > > > [1]
> > > > > > > https://issues.apache.org/jira/browse/FLINK-31324?focusedCommentId=17696756&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17696756
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > > On Nov 8, 2023, at 5:42 PM, Hongshun Wang <loserwang1...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > Hi devs,
> > > > > > > >
> > > > > > > > I would like to start a discussion on FLIP-389: Annotate
> > > > > > > > SingleThreadFetcherManager and FutureCompletingBlockingQueue as
> > > > > > > > PublicEvolving[1].
> > > > > > > >
> > > > > > > > Though SingleThreadFetcherManager is annotated as Internal, it
> > > > > > > > actually acts as a public API to some degree and is widely used
> > > > > > > > in many connector projects: flink-cdc-connector
> > > > > > > > <https://github.com/ververica/flink-cdc-connectors/blob/release-2.3.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java#L93>,
> > > > > > > > flink-connector-mongodb
> > > > > > > > <https://github.com/apache/flink-connector-mongodb/blob/main/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/MongoSourceReader.java#L58>
> > > > > > > > and so on.
> > > > > > > >
> > > > > > > > Moreover, even the constructor of
> > > > > > > > SingleThreadMultiplexSourceReaderBase (which is PublicEvolving)
> > > > > > > > takes SingleThreadFetcherManager and FutureCompletingBlockingQueue
> > > > > > > > as parameters. That means that SingleThreadFetcherManager and
> > > > > > > > FutureCompletingBlockingQueue have already been exposed to users
> > > > > > > > for a long time and are widely used.
> > > > > > > >
> > > > > > > > Considering that all source implementations use them de facto,
> > > > > > > > why not annotate SingleThreadFetcherManager and
> > > > > > > > FutureCompletingBlockingQueue as PublicEvolving, so that
> > > > > > > > developers will modify them more carefully to avoid any
> > > > > > > > potential issues? As shown in FLINK-31324[2], FLINK-28853[3]
> > > > > > > > changed the default constructor of SingleThreadFetcherManager.
> > > > > > > > However, this had a wide impact. Finally, the former
> > > > > > > > constructor was added back and marked as Deprecated.
> > > > > > > >
> > > > > > > > In conclusion, the goal of this FLIP is to annotate
> > > > > > > > SingleThreadFetcherManager (including its parent class) and
> > > > > > > > FutureCompletingBlockingQueue as PublicEvolving.
> > > > > > > >
> > > > > > > > Looking forward to hearing from you.
> > > > > > > >
> > > > > > > >
> > > > > > > > [1]
> > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498
> > > > > > > >
> > > > > > > > [2] https://issues.apache.org/jira/browse/FLINK-31324
> > > > > > > >
> > > > > > > > [3] https://issues.apache.org/jira/browse/FLINK-28853
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
