Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2024-01-18 Thread Hongshun Wang
Hi Devs,
  Thanks for all your advice; it helped a lot. I have revised
the document[1] and started a vote[2].

Thanks,

Hongshun


[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498

[2] https://lists.apache.org/thread/t1zff21z440pvv48jyhm8pgtqsyplchn

On Fri, Jan 12, 2024 at 1:00 AM Becket Qin  wrote:

> Hi Qingsheng,
>
> Thanks for the comment. I think the initial idea is to hide the queue
> completely from the users, i.e. make FutureCompletingBlockingQueue class
> internal. If it is OK to expose the class to the users, then just returning
> the queue sounds reasonable to me.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, Jan 10, 2024 at 10:39 PM Hongshun Wang 
> wrote:
>
>> Hi Qingsheng,
>>
>>
>> I agree with you that it would be clearer to have a new interface that
>> extracts the SplitFetcher creation and management logic from the current
>> SplitFetcherManager. However, extensive modifications to the interface may
>> influence a lot and cause compatibility issues. Perhaps we can consider
>> doing it later, rather than in this FLIP.
>>
>>
>> Adding a new internal method, SplitFetcherManager#getQueue(), to
>> SourceReaderBase seems to be a better option than exposing methods like
>> poll and notifyAvailable on SplitFetcherManager.
>>
>>
>> I have taken this valuable suggestion and updated the FLIP accordingly.
>>
>>
>> Thanks,
>>
>> Hongshun
>>
>> On Thu, Jan 11, 2024 at 2:09 PM Qingsheng Ren  wrote:
>>
>>> Hi Hongshun and Becket,
>>>
>>> Sorry for being late in the discussion! I went through the entire FLIP
>>> but I still have some concerns about the new SplitFetcherManager.
>>>
>>> First of all I agree that we should hide the elementQueue from connector
>>> developers. This could simplify the interface exposed to developers so that
>>> they can focus on the interaction with external systems.
>>>
>>> However in the current FLIP, SplitFetcherManager exposes 4 more methods,
>>> poll / getAvailabilityFuture / notifyAvailable / noAvailableElement, which
>>> are tightly coupled with the implementation of the elementQueue. The naming
>>> of these methods looks weird to me, like what does it mean to "poll from a
>>> SplitFetcherManager" / "notify a SplitFetcherManager available"? To clarify
>>> these methods we have to explain to developers that "well we hide a queue
>>> inside SplitFetcherManager and the poll method is actually polling from the
>>> queue". I'm afraid these methods will implicitly expose the concept and the
>>> implementation of the queue to developers.
>>>
>>> I think a cleaner solution would be having a new interface that extracts
>>> SplitFetcher creating and managing logic from the current
>>> SplitFetcherManager, but having too many concepts might make the entire
>>> Source API even harder to understand. To make a compromise, I'm considering
>>> only exposing constructors of SplitFetcherManager as public APIs, and
>>> adding a new internal method SplitFetcherManager#getQueue() for
>>> SourceReaderBase (well it's a bit hacky I admit but I think exposing
>>> methods like poll and notifyAvailable on SplitFetcherManager is even
>>> worse). WDYT?
>>>
>>> Thanks,
>>> Qingsheng
>>>
>>> On Thu, Dec 21, 2023 at 8:36 AM Becket Qin  wrote:
>>>
>>>> Hi Hongshun,
>>>>
>>>> I think the proposal in the FLIP is basically fine. A few minor
>>>> comments:
>>>>
>>>> 1. In FLIPs, we define all the user-sensible changes as public
>>>> interfaces. The public interface section should list all of them. So,
>>>> the code blocks currently in the proposed changes section should be put
>>>> into the public interface section instead.
>>>>
>>>> 2. It would be good to put all the changes of one class together. For
>>>> example, for SplitFetcherManager, we can say:
>>>> - Change SplitFetcherManager from Internal to PublicEvolving.
>>>> - Deprecate the old constructor exposing the
>>>> FutureCompletingBlockingQueue, and add new constructors as replacements
>>>> which create the FutureCompletingBlockingQueue instance internally.
>>>> - Add a few new methods to expose the functionality of the internal
>>>> FutureCompletingBlockingQueue via the SplitFetcherManager.
>>>> And then follows the code block containing all the changes above.
>>>> Ideally, the changes should come with something like "// <-- New", so
>>>> that they are easier to find.
>>>>
>>>> 3. In the proposed changes section, it would be good to add some more
>>>> detailed explanation of the idea behind the public interface changes. So
>>>> even people new to Flink can understand better how exactly the interface
>>>> changes will help fulfill the motivation. For example, regarding the
>>>> constructor signature change, we can say the following. We can mention a
>>>> few things in this section:
>>>> - By exposing the SplitFetcherManager / SingleThreadFetcherManager, by
>>>> implementing addSplits() and removeSplits(), connector developers can
>>>> easily create their own

Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2024-01-11 Thread Becket Qin
Hi Qingsheng,

Thanks for the comment. I think the initial idea is to hide the queue
completely from the users, i.e. make FutureCompletingBlockingQueue class
internal. If it is OK to expose the class to the users, then just returning
the queue sounds reasonable to me.

Thanks,

Jiangjie (Becket) Qin

On Wed, Jan 10, 2024 at 10:39 PM Hongshun Wang 
wrote:

> Hi Qingsheng,
>
>
> I agree with you that it would be clearer to have a new interface that
> extracts the SplitFetcher creation and management logic from the current
> SplitFetcherManager. However, extensive modifications to the interface may
> influence a lot and cause compatibility issues. Perhaps we can consider
> doing it later, rather than in this FLIP.
>
>
> Adding a new internal method, SplitFetcherManager#getQueue(), to
> SourceReaderBase seems to be a better option than exposing methods like
> poll and notifyAvailable on SplitFetcherManager.
>
>
> I have taken this valuable suggestion and updated the FLIP accordingly.
>
>
> Thanks,
>
> Hongshun
>
> On Thu, Jan 11, 2024 at 2:09 PM Qingsheng Ren  wrote:
>
>> Hi Hongshun and Becket,
>>
>> Sorry for being late in the discussion! I went through the entire FLIP
>> but I still have some concerns about the new SplitFetcherManager.
>>
>> First of all I agree that we should hide the elementQueue from connector
>> developers. This could simplify the interface exposed to developers so that
>> they can focus on the interaction with external systems.
>>
>> However in the current FLIP, SplitFetcherManager exposes 4 more methods,
>> poll / getAvailabilityFuture / notifyAvailable / noAvailableElement, which
>> are tightly coupled with the implementation of the elementQueue. The naming
>> of these methods looks weird to me, like what does it mean to "poll from a
>> SplitFetcherManager" / "notify a SplitFetcherManager available"? To clarify
>> these methods we have to explain to developers that "well we hide a queue
>> inside SplitFetcherManager and the poll method is actually polling from the
>> queue". I'm afraid these methods will implicitly expose the concept and the
>> implementation of the queue to developers.
>>
>> I think a cleaner solution would be having a new interface that extracts
>> SplitFetcher creating and managing logic from the current
>> SplitFetcherManager, but having too many concepts might make the entire
>> Source API even harder to understand. To make a compromise, I'm considering
>> only exposing constructors of SplitFetcherManager as public APIs, and
>> adding a new internal method SplitFetcherManager#getQueue() for
>> SourceReaderBase (well it's a bit hacky I admit but I think exposing
>> methods like poll and notifyAvailable on SplitFetcherManager is even
>> worse). WDYT?
>>
>> Thanks,
>> Qingsheng
>>
>> On Thu, Dec 21, 2023 at 8:36 AM Becket Qin  wrote:
>>
>>> Hi Hongshun,
>>>
>>> I think the proposal in the FLIP is basically fine. A few minor comments:
>>>
>>> 1. In FLIPs, we define all the user-sensible changes as public
>>> interfaces.
>>> The public interface section should list all of them. So, the code blocks
>>> currently in the proposed changes section should be put into the public
>>> interface section instead.
>>>
>>> 2. It would be good to put all the changes of one class together. For
>>> example, for SplitFetcherManager, we can say:
>>> - Change SplitFetcherManager from Internal to PublicEvolving.
>>> - Deprecate the old constructor exposing the
>>> FutureCompletingBlockingQueue, and add new constructors as replacements
>>> which create the FutureCompletingBlockingQueue instance internally.
>>> - Add a few new methods to expose the functionality of the internal
>>> FutureCompletingBlockingQueue via the SplitFetcherManager.
>>> And then follows the code block containing all the changes above.
>>> Ideally, the changes should come with something like "// <-- New", so
>>> that they are easier to find.
>>>
>>> 3. In the proposed changes section, it would be good to add some more
>>> detailed explanation of the idea behind the public interface changes. So
>>> even people new to Flink can understand better how exactly the interface
>>> changes will help fulfill the motivation. For example, regarding the
>>> constructor signature change, we can say the following. We can mention a
>>> few things in this section:
>>> - By exposing the SplitFetcherManager / SingleThreadFetcherManager, by
>>> implementing addSplits() and removeSplits(), connector developers can
>>> easily create their own threading models in the SourceReaderBase.
>>> - Note that the SplitFetcher constructor is package private, so users
>>> can only create SplitFetchers via
>>> SplitFetcherManager.createSplitFetcher().
>>> This ensures each SplitFetcher is always owned by the
>>> SplitFetcherManager.
>>> - This FLIP essentially embedded the element queue (a
>>> FutureCompletingBlockingQueue) instance into the SplitFetcherManager.
>>> This
>>> hides the element queue from the connector 

Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2024-01-10 Thread Hongshun Wang
Hi Qingsheng,


I agree with you that it would be clearer to have a new interface that
extracts the SplitFetcher creation and management logic from the current
SplitFetcherManager. However, extensive modifications to the interface may
have a wide impact and cause compatibility issues. Perhaps we can consider
doing it later, rather than in this FLIP.


Adding a new internal method, SplitFetcherManager#getQueue(), for
SourceReaderBase seems to be a better option than exposing methods like poll
and notifyAvailable on SplitFetcherManager.


I have taken this valuable suggestion and updated the FLIP accordingly.


Thanks,

Hongshun

On Thu, Jan 11, 2024 at 2:09 PM Qingsheng Ren  wrote:

> Hi Hongshun and Becket,
>
> Sorry for being late in the discussion! I went through the entire FLIP but
> I still have some concerns about the new SplitFetcherManager.
>
> First of all I agree that we should hide the elementQueue from connector
> developers. This could simplify the interface exposed to developers so that
> they can focus on the interaction with external systems.
>
> However in the current FLIP, SplitFetcherManager exposes 4 more methods,
> poll / getAvailabilityFuture / notifyAvailable / noAvailableElement, which
> are tightly coupled with the implementation of the elementQueue. The naming
> of these methods looks weird to me, like what does it mean to "poll from a
> SplitFetcherManager" / "notify a SplitFetcherManager available"? To clarify
> these methods we have to explain to developers that "well we hide a queue
> inside SplitFetcherManager and the poll method is actually polling from the
> queue". I'm afraid these methods will implicitly expose the concept and the
> implementation of the queue to developers.
>
> I think a cleaner solution would be having a new interface that extracts
> SplitFetcher creating and managing logic from the current
> SplitFetcherManager, but having too many concepts might make the entire
> Source API even harder to understand. To make a compromise, I'm considering
> only exposing constructors of SplitFetcherManager as public APIs, and
> adding a new internal method SplitFetcherManager#getQueue() for
> SourceReaderBase (well it's a bit hacky I admit but I think exposing
> methods like poll and notifyAvailable on SplitFetcherManager is even
> worse). WDYT?
>
> Thanks,
> Qingsheng
>
> On Thu, Dec 21, 2023 at 8:36 AM Becket Qin  wrote:
>
>> Hi Hongshun,
>>
>> I think the proposal in the FLIP is basically fine. A few minor comments:
>>
>> 1. In FLIPs, we define all the user-sensible changes as public interfaces.
>> The public interface section should list all of them. So, the code blocks
>> currently in the proposed changes section should be put into the public
>> interface section instead.
>>
>> 2. It would be good to put all the changes of one class together. For
>> example, for SplitFetcherManager, we can say:
>> - Change SplitFetcherManager from Internal to PublicEvolving.
>> - Deprecate the old constructor exposing the
>> FutureCompletingBlockingQueue, and add new constructors as replacements
>> which create the FutureCompletingBlockingQueue instance internally.
>> - Add a few new methods to expose the functionality of the internal
>> FutureCompletingBlockingQueue via the SplitFetcherManager.
>> And then follows the code block containing all the changes above.
>> Ideally, the changes should come with something like "// <-- New", so
>> that they are easier to find.
>>
>> 3. In the proposed changes section, it would be good to add some more
>> detailed explanation of the idea behind the public interface changes. So
>> even people new to Flink can understand better how exactly the interface
>> changes will help fulfill the motivation. For example, regarding the
>> constructor signature change, we can say the following. We can mention a
>> few things in this section:
>> - By exposing the SplitFetcherManager / SingleThreadFetcherManager, by
>> implementing addSplits() and removeSplits(), connector developers can
>> easily create their own threading models in the SourceReaderBase.
>> - Note that the SplitFetcher constructor is package private, so users
>> can only create SplitFetchers via
>> SplitFetcherManager.createSplitFetcher().
>> This ensures each SplitFetcher is always owned by the SplitFetcherManager.
>> - This FLIP essentially embedded the element queue (a
>> FutureCompletingBlockingQueue) instance into the SplitFetcherManager. This
>> hides the element queue from the connector developers and simplifies the
>> SourceReaderBase to consist of only SplitFetcherManager and RecordEmitter
>> as major components.
>>
>> In short, the public interface section answers the question of "what". We
>> should list all the user-sensible changes in the public interface section,
>> without verbose explanation. The proposed changes section answers "how",
>> where we can add more details to explain the changes listed in the public
>> interface section.
>>
>> Thanks,

Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2024-01-10 Thread Qingsheng Ren
Hi Hongshun and Becket,

Sorry for being late in the discussion! I went through the entire FLIP but
I still have some concerns about the new SplitFetcherManager.

First of all I agree that we should hide the elementQueue from connector
developers. This could simplify the interface exposed to developers so that
they can focus on the interaction with external systems.

However in the current FLIP, SplitFetcherManager exposes 4 more methods,
poll / getAvailabilityFuture / notifyAvailable / noAvailableElement, which
are tightly coupled with the implementation of the elementQueue. The naming
of these methods looks weird to me, like what does it mean to "poll from a
SplitFetcherManager" / "notify a SplitFetcherManager available"? To clarify
these methods we have to explain to developers that "well we hide a queue
inside SplitFetcherManager and the poll method is actually polling from the
queue". I'm afraid these methods will implicitly expose the concept and the
implementation of the queue to developers.

I think a cleaner solution would be having a new interface that extracts
SplitFetcher creating and managing logic from the current
SplitFetcherManager, but having too many concepts might make the entire
Source API even harder to understand. To make a compromise, I'm considering
only exposing constructors of SplitFetcherManager as public APIs, and
adding a new internal method SplitFetcherManager#getQueue() for
SourceReaderBase (well it's a bit hacky I admit but I think exposing
methods like poll and notifyAvailable on SplitFetcherManager is even
worse). WDYT?
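
The compromise described above can be sketched roughly as follows. This is a minimal, hypothetical illustration and not Flink's actual code: ElementQueue, FetcherManagerSketch, ReaderBaseSketch and simulateFetch are stand-in names invented for this sketch, and a plain ArrayDeque replaces the FutureCompletingBlockingQueue. It only shows the shape of the idea: the manager creates the queue itself and hands it to the reader base through one internal accessor, instead of mirroring poll / notifyAvailable on the manager.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Entry point for the sketch; all names in this file are hypothetical.
class GetQueueDemo {
    public static void main(String[] args) {
        FetcherManagerSketch<String> manager = new FetcherManagerSketch<>();
        ReaderBaseSketch<String> reader = new ReaderBaseSketch<>(manager);
        manager.simulateFetch("record-1");
        System.out.println(reader.pollNext());
    }
}

// Stand-in for FutureCompletingBlockingQueue (assumption: heavily simplified).
final class ElementQueue<E> {
    private final Queue<E> elements = new ArrayDeque<>();

    void put(E element) {
        elements.add(element);
    }

    E poll() {
        return elements.poll();
    }
}

// Stand-in for SplitFetcherManager.
class FetcherManagerSketch<E> {
    private final ElementQueue<E> elementQueue;

    // Public API: the queue is created here, so connector developers
    // never construct or pass one in.
    public FetcherManagerSketch() {
        this.elementQueue = new ElementQueue<>();
    }

    // Internal accessor meant only for the reader base. It is package-private
    // in this sketch; across packages it would presumably have to be public
    // and marked as internal.
    ElementQueue<E> getQueue() {
        return elementQueue;
    }

    // Simulates a fetcher thread handing over one fetched element.
    public void simulateFetch(E element) {
        elementQueue.put(element);
    }
}

// Stand-in for SourceReaderBase: it talks to the queue directly, so the
// manager needs no poll / notifyAvailable wrapper methods.
class ReaderBaseSketch<E> {
    private final ElementQueue<E> elementQueue;

    ReaderBaseSketch(FetcherManagerSketch<E> manager) {
        this.elementQueue = manager.getQueue();
    }

    E pollNext() {
        return elementQueue.poll();
    }
}
```

The design choice illustrated here is that the queue stays an implementation detail shared by two framework classes, while the surface a connector developer sees is limited to the manager's constructors.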

Thanks,
Qingsheng

On Thu, Dec 21, 2023 at 8:36 AM Becket Qin  wrote:

> Hi Hongshun,
>
> I think the proposal in the FLIP is basically fine. A few minor comments:
>
> 1. In FLIPs, we define all the user-sensible changes as public interfaces.
> The public interface section should list all of them. So, the code blocks
> currently in the proposed changes section should be put into the public
> interface section instead.
>
> 2. It would be good to put all the changes of one class together. For
> example, for SplitFetcherManager, we can say:
> - Change SplitFetcherManager from Internal to PublicEvolving.
> - Deprecate the old constructor exposing the
> FutureCompletingBlockingQueue, and add new constructors as replacements
> which create the FutureCompletingBlockingQueue instance internally.
> - Add a few new methods to expose the functionality of the internal
> FutureCompletingBlockingQueue via the SplitFetcherManager.
> And then follows the code block containing all the changes above.
> Ideally, the changes should come with something like "// <-- New", so
> that they are easier to find.
>
> 3. In the proposed changes section, it would be good to add some more
> detailed explanation of the idea behind the public interface changes. So
> even people new to Flink can understand better how exactly the interface
> changes will help fulfill the motivation. For example, regarding the
> constructor signature change, we can say the following. We can mention a
> few things in this section:
> - By exposing the SplitFetcherManager / SingleThreadFetcherManager, by
> implementing addSplits() and removeSplits(), connector developers can
> easily create their own threading models in the SourceReaderBase.
> - Note that the SplitFetcher constructor is package private, so users
> can only create SplitFetchers via SplitFetcherManager.createSplitFetcher().
> This ensures each SplitFetcher is always owned by the SplitFetcherManager.
> - This FLIP essentially embedded the element queue (a
> FutureCompletingBlockingQueue) instance into the SplitFetcherManager. This
> hides the element queue from the connector developers and simplifies the
> SourceReaderBase to consist of only SplitFetcherManager and RecordEmitter
> as major components.
>
> In short, the public interface section answers the question of "what". We
> should list all the user-sensible changes in the public interface section,
> without verbose explanation. The proposed changes section answers "how",
> where we can add more details to explain the changes listed in the public
> interface section.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Wed, Dec 20, 2023 at 10:07 AM Hongshun Wang 
> wrote:
>
> > Hi Becket,
> >
> >
> > It has been a long time since we last discussed. Are there any other
> > problems with this Flip from your side? I am looking forward to hearing
> > from you.
> >
> >
> > Thanks,
> > Hongshun Wang
> >
>


Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-12-20 Thread Becket Qin
Hi Hongshun,

I think the proposal in the FLIP is basically fine. A few minor comments:

1. In FLIPs, we define all the user-visible changes as public interfaces.
The public interface section should list all of them. So, the code blocks
currently in the proposed changes section should be put into the public
interface section instead.

2. It would be good to put all the changes of one class together. For
example, for SplitFetcherManager, we can say:
- Change SplitFetcherManager from Internal to PublicEvolving.
- Deprecate the old constructor exposing the
FutureCompletingBlockingQueue, and add new constructors as replacements
which create the FutureCompletingBlockingQueue instance internally.
- Add a few new methods to expose the functionality of the internal
FutureCompletingBlockingQueue via the SplitFetcherManager.
   And then follows the code block containing all the changes above.
Ideally, the changes should come with something like "// <-- New", so
that they are easier to find.
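
As an illustration of this suggestion, a change listing with "// <-- New" markers might look like the sketch below. It is hypothetical and heavily simplified: ManagerChangeSketch stands in for SplitFetcherManager, java.util.Queue stands in for FutureCompletingBlockingQueue, and queuedElements() exists only so the sketch can be run; none of these are taken from the FLIP itself.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, simplified stand-in for SplitFetcherManager. The real class
// has more type parameters, fields, and constructor arguments.
class ManagerChangeSketch<E> {
    // Stand-in for the FutureCompletingBlockingQueue field.
    private final Queue<E> elementQueue;

    /** Old constructor that exposes the queue to callers. */
    @Deprecated                                          // <-- New
    public ManagerChangeSketch(Queue<E> elementQueue) {
        this.elementQueue = elementQueue;
    }

    /** Replacement constructor that creates the queue internally. */
    public ManagerChangeSketch() {                       // <-- New
        this(new ArrayDeque<E>());
    }

    // Helper for this sketch only: number of elements currently queued.
    int queuedElements() {
        return elementQueue.size();
    }

    public static void main(String[] args) {
        ManagerChangeSketch<String> manager = new ManagerChangeSketch<>();
        System.out.println(manager.queuedElements());
    }
}
```

Keeping the deprecated constructor next to its replacement in one block lets a reader see the old and new signatures side by side, which is the point of grouping all changes of one class together.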

3. In the proposed changes section, it would be good to add some more
detailed explanation of the idea behind the public interface changes. So
even people new to Flink can understand better how exactly the interface
changes will help fulfill the motivation. For example, we can mention a
few things in this section:
- By exposing the SplitFetcherManager / SingleThreadFetcherManager,
connector developers can implement addSplits() and removeSplits() to
easily create their own threading models in the SourceReaderBase.
- Note that the SplitFetcher constructor is package private, so users
can only create SplitFetchers via SplitFetcherManager.createSplitFetcher().
This ensures each SplitFetcher is always owned by the SplitFetcherManager.
- This FLIP essentially embeds the element queue (a
FutureCompletingBlockingQueue) instance into the SplitFetcherManager. This
hides the element queue from the connector developers and simplifies the
SourceReaderBase to consist of only SplitFetcherManager and RecordEmitter
as major components.

In short, the public interface section answers the question of "what". We
should list all the user-visible changes in the public interface section,
without verbose explanation. The proposed changes section answers "how",
where we can add more details to explain the changes listed in the public
interface section.

Thanks,

Jiangjie (Becket) Qin



On Wed, Dec 20, 2023 at 10:07 AM Hongshun Wang 
wrote:

> Hi Becket,
>
>
> It has been a long time since we last discussed. Are there any other
> problems with this Flip from your side? I am looking forward to hearing
> from you.
>
>
> Thanks,
> Hongshun Wang
>


Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-12-19 Thread Hongshun Wang
Hi Becket,


It has been a long time since we last discussed this. Do you have any other
concerns about this FLIP? I am looking forward to hearing
from you.


Thanks,
Hongshun Wang


Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-11-30 Thread Hongshun Wang
Hi all,
Are there any additional questions or concerns regarding FLIP-389[1]? Looking
forward to hearing from you.

Thanks,
Hongshun Wang

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498

On Wed, Nov 22, 2023 at 3:44 PM Hongshun Wang 
wrote:

> Hi Becket,
>
> Thanks a lot, I have no problem any more. And I have made further
> modifications to FLIP-389[1].
> In summary, this flip has 2 goals:
>
>    - Annotate SingleThreadFetcherManager as PublicEvolving.
>    - Shield FutureCompletingBlockingQueue from users and limit all
>    operations on FutureCompletingBlockingQueue in SplitFetcherManager.
>
> All the changes are listed below:
>
>    - Mark the constructors of SourceReaderBase and
>    SingleThreadMultiplexSourceReaderBase as @Deprecated and provide a new
>    constructor without FutureCompletingBlockingQueue.
>    - Mark SplitFetcherManager and SingleThreadFetcherManager as
>    `@PublicEvolving`, mark the constructors of SplitFetcherManager and
>    SingleThreadFetcherManager as @Deprecated and provide a new constructor
>    without FutureCompletingBlockingQueue.
>    - SplitFetcherManager provides wrapper methods for
>    FutureCompletingBlockingQueue to replace its usage in SourceReaderBase.
>    - Mark SplitFetcher and SplitFetcherTask as PublicEvolving.
>
>
> Any additional questions regarding this FLIP? Looking forward to hearing
> from you.
>
> Thanks,
> Hongshun Wang
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498
>
>
>
>
> On Wed, Nov 22, 2023 at 10:15 AM Becket Qin  wrote:
>
>> Hi Hongshun,
>>
>> The constructor of the SplitFetcher is already package private. So it can
>> only be accessed from the classes in the package
>> org.apache.flink.connector.base.source.reader.fetcher. And apparently, user
>> classes should not be in this package. Therefore, even if we mark the
>> SplitFetcher class as PublicEvolving, the constructor is not available to
>> the users. Only the public and protected methods are considered public API
>> in this case. Private / package private methods and fields are still
>> internal.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Wed, Nov 22, 2023 at 9:46 AM Hongshun Wang 
>> wrote:
>>
>> > Hi Becket,
>> >
>> > If SplitFetcherManager becomes PublicEvolving, that also means
>> SplitFetcher
>> > > needs to be PublicEvolving, because it is returned by the protected
>> > method
>> > > SplitFetcherManager.createSplitFetcher().
>> >
>> >
>> >
>> > > it looks like there is no need to expose the constructor of
>> SplitFetcher
>> > > to the end users. Having an interface of SplitFetcher is also fine,
>> but
>> > > might not be necessary in this case.
>> >
>> >
>> >
>> > I don't know how to mark SplitFetcher as PublicEvolving without exposing
>> > the constructor of SplitFetcher to the end users.
>> >
>> > Thanks,
>> > Hongshun Wang
>> >
>> > On Tue, Nov 21, 2023 at 7:23 PM Becket Qin 
>> wrote:
>> >
>> > > Hi Hongshun,
>> > >
>> > > Do we need to expose the constructor of SplitFetcher to the users?
>> > Ideally,
>> > > users should always get a new fetcher instance by calling
>> > > SplitFetcherManager.createSplitFetcher(). Or, they can get an existing
>> > > SplitFetcher by looking up in the SplitFetcherManager.fetchers map. I
>> > think
>> > > this makes sense because a SplitFetcher should always belong to a
>> > > SplitFetcherManager. Therefore, it should be created via a
>> > > SplitFetcherManager as well. So, it looks like there is no need to
>> expose
>> > > the constructor of SplitFetcher to the end users.
>> > >
>> > > Having an interface of SplitFetcher is also fine, but might not be
>> > > necessary in this case.
>> > >
>> > > Thanks,
>> > >
>> > > Jiangjie (Becket) Qin
>> > >
>> > > On Tue, Nov 21, 2023 at 10:36 AM Hongshun Wang <
>> loserwang1...@gmail.com>
>> > > wrote:
>> > >
>> > > > Hi Becket,
>> > > >
>> > > > > Additionally, SplitFetcherTask requires
>> FutureCompletingBlockingQueue
>> > > as
>> > > > a constructor parameter, which is not allowed  now.
>> > > > Sorry, it was my writing mistake. What I meant is that
>> *SplitFetcher*
>> > > > requires FutureCompletingBlockingQueue as a constructor parameter.
>> > > > SplitFetcher
>> > > > is a class rather than an interface. Therefore, I want to change
>> > > > SplitFetcher to a public interface and move its implementation
>> > > > details to an implementing subclass.
>> > > >
>> > > > Thanks,
>> > > > Hongshun Wang
>> > > >
>> > > > On Fri, Nov 17, 2023 at 6:21 PM Becket Qin 
>> > wrote:
>> > > >
>> > > > > Hi Hongshun,
>> > > > >
>> > > > > SplitFetcher.enqueueTask() returns void, right? SplitFetcherTask
>> is
>> > > > already
>> > > > > an interface, and we need to make that as a PublicEvolving API as
>> > well.
>> > > > >
>> > > > > So overall, a source developer can potentially do a few things in
>> the
>> > > > > SplitFetcherManager.
>> > > > > 1. for customized logic including split-to-fetcher assignment,
>> > > threading
>> > > > 

Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-11-21 Thread Hongshun Wang
Hi Becket,

Thanks a lot, I have no further questions. I have made further
modifications to FLIP-389[1].
In summary, this FLIP has 2 goals:

   - Annotate SingleThreadFetcherManager as PublicEvolving.
   - Shield FutureCompletingBlockingQueue from users and limit all
   operations on FutureCompletingBlockingQueue in SplitFetcherManager.

All the changes are listed below:

   - Mark the constructors of SourceReaderBase and
   SingleThreadMultiplexSourceReaderBase as @Deprecated and provide a new
   constructor without FutureCompletingBlockingQueue.
   - Mark SplitFetcherManager and SingleThreadFetcherManager as
   `@PublicEvolving`, mark the constructors of SplitFetcherManager and
   SingleThreadFetcherManager as @Deprecated and provide a new constructor
   without FutureCompletingBlockingQueue.
   - SplitFetcherManager provides wrapper methods for
   FutureCompletingBlockingQueue to replace its usage in SourceReaderBase.
   - Mark SplitFetcher and SplitFetcherTask as PublicEvolving.


Any additional questions regarding this FLIP? Looking forward to hearing
from you.

Thanks,
Hongshun Wang

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498




On Wed, Nov 22, 2023 at 10:15 AM Becket Qin  wrote:

> Hi Hongshun,
>
> The constructor of the SplitFetcher is already package private. So it can
> only be accessed from the classes in the package
> org.apache.flink.connector.base.source.reader.fetcher. And apparently, user
> classes should not be in this package. Therefore, even if we mark the
> SplitFetcher class as PublicEvolving, the constructor is not available to
> the users. Only the public and protected methods are considered public API
> in this case. Private / package private methods and fields are still
> internal.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, Nov 22, 2023 at 9:46 AM Hongshun Wang 
> wrote:
>
> > Hi Becket,
> >
> > If SplitFetcherManager becomes PublicEvolving, that also means
> SplitFetcher
> > > needs to be PublicEvolving, because it is returned by the protected
> > method
> > > SplitFetcherManager.createSplitFetcher().
> >
> >
> >
> > > it looks like there is no need to expose the constructor of
> SplitFetcher
> > > to the end users. Having an interface of SplitFetcher is also fine, but
> > > might not be necessary in this case.
> >
> >
> >
> > I don't know how to mark SplitFetcher as PublicEvolving without exposing
> > the constructor of SplitFetcher to the end users.
> >
> > Thanks,
> > Hongshun Wang
> >
> > On Tue, Nov 21, 2023 at 7:23 PM Becket Qin  wrote:
> >
> > > Hi Hongshun,
> > >
> > > Do we need to expose the constructor of SplitFetcher to the users?
> > Ideally,
> > > users should always get a new fetcher instance by calling
> > > SplitFetcherManager.createSplitFetcher(). Or, they can get an existing
> > > SplitFetcher by looking up in the SplitFetcherManager.fetchers map. I
> > think
> > > this makes sense because a SplitFetcher should always belong to a
> > > SplitFetcherManager. Therefore, it should be created via a
> > > SplitFetcherManager as well. So, it looks like there is no need to
> expose
> > > the constructor of SplitFetcher to the end users.
> > >
> > > Having an interface of SplitFetcher is also fine, but might not be
> > > necessary in this case.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Tue, Nov 21, 2023 at 10:36 AM Hongshun Wang <
> loserwang1...@gmail.com>
> > > wrote:
> > >
> > > > Hi Becket,
> > > >
> > > > > Additionally, SplitFetcherTask requires
> FutureCompletingBlockingQueue
> > > as
> > > > a constructor parameter, which is not allowed  now.
> > > > Sorry, it was my writing mistake. What I meant is that *SplitFetcher*
> > > > requires FutureCompletingBlockingQueue as a constructor parameter.
> > > > SplitFetcher
> > > > is a class rather than Interface. Therefore, I want to  change
> > > > SplitFetcher to a public Interface and moving its implementation
> > > > details to an implement
> > > > subclass .
> > > >
> > > > Thanks,
> > > > Hongshun Wang
> > > >
> > > > On Fri, Nov 17, 2023 at 6:21 PM Becket Qin 
> > wrote:
> > > >
> > > > > Hi Hongshun,
> > > > >
> > > > > SplitFetcher.enqueueTask() returns void, right? SplitFetcherTask is
> > > > already
> > > > > an interface, and we need to make that as a PublicEvolving API as
> > well.
> > > > >
> > > > > So overall, a source developer can potentially do a few things in
> the
> > > > > SplitFetcherManager.
> > > > > 1. for customized logic including split-to-fetcher assignment,
> > > threading
> > > > > model, etc.
> > > > > 2. create their own SplitFetcherTask for the SplitFetcher /
> > SplitReader
> > > > to
> > > > > execute in a coordinated manner.
> > > > >
> > > > > It should be powerful enough for the vast majority of the source
> > > > > implementation, if not all.
> > > > >
> > > > >
> > > > > Additionally, SplitFetcherTask requires
> FutureCompletingBlockingQueue
> > > > > > as a
> > > > > > constructor parameter, which is not 

Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-11-21 Thread Becket Qin
Hi Hongshun,

The constructor of SplitFetcher is already package-private, so it can only be
accessed from classes in the package
org.apache.flink.connector.base.source.reader.fetcher. User classes should
not be in this package. Therefore, even if we mark the SplitFetcher class as
PublicEvolving, the constructor is not available to users. Only the public
and protected methods are considered public API in this case. Private and
package-private methods and fields remain internal.
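
To illustrate the visibility argument, here is a minimal sketch. The classes SketchFetcher, SketchFetcherManager and VisibilitySketch are hypothetical stand-ins, not the real Flink classes, and everything sits in one file (so the fetcher class itself is not public here, unlike the real SplitFetcher). It only shows that a constructor without an access modifier is package-private, while a protected factory method remains reachable for subclasses.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;

// Hypothetical stand-in for a class that would be annotated PublicEvolving.
class SketchFetcher {
    private final int id;

    // No access modifier: package-private, so code outside this package cannot call it.
    SketchFetcher(int id) {
        this.id = id;
    }

    // Public method: this is the part that counts as API surface.
    public int fetcherId() {
        return id;
    }
}

// Hypothetical stand-in for the manager, living in the same package as the fetcher.
abstract class SketchFetcherManager {
    private int nextId = 0;

    // Protected factory: the intended way for subclasses to obtain a fetcher.
    protected SketchFetcher createSplitFetcher() {
        return new SketchFetcher(nextId++);
    }
}

class VisibilitySketch {
    // True when the constructor is neither public, protected, nor private.
    static boolean isPackagePrivate(Constructor<?> c) {
        int m = c.getModifiers();
        return !Modifier.isPublic(m) && !Modifier.isProtected(m) && !Modifier.isPrivate(m);
    }

    public static void main(String[] args) {
        Constructor<?> ctor = SketchFetcher.class.getDeclaredConstructors()[0];
        System.out.println("package-private constructor: " + isPackagePrivate(ctor));
    }
}
```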

Thanks,

Jiangjie (Becket) Qin

On Wed, Nov 22, 2023 at 9:46 AM Hongshun Wang 
wrote:

> Hi Becket,
>
> If SplitFetcherManager becomes PublicEvolving, that also means SplitFetcher
> > needs to be PublicEvolving, because it is returned by the protected
> method
> > SplitFetcherManager.createSplitFetcher().
>
>
>
> > it looks like there is no need to expose the constructor of SplitFetcher
> > to the end users. Having an interface of SplitFetcher is also fine, but
> > might not be necessary in this case.
>
>
>
> How can we mark SplitFetcher as PublicEvolving without exposing its
> constructor to the end users?
>
> Thanks,
> Hongshun Wang
>
> On Tue, Nov 21, 2023 at 7:23 PM Becket Qin  wrote:
>
> > Hi Hongshun,
> >
> > Do we need to expose the constructor of SplitFetcher to the users?
> Ideally,
> > users should always get a new fetcher instance by calling
> > SplitFetcherManager.createSplitFetcher(). Or, they can get an existing
> > SplitFetcher by looking up in the SplitFetcherManager.fetchers map. I
> think
> > this makes sense because a SplitFetcher should always belong to a
> > SplitFetcherManager. Therefore, it should be created via a
> > SplitFetcherManager as well. So, it looks like there is no need to expose
> > the constructor of SplitFetcher to the end users.
> >
> > Having an interface of SplitFetcher is also fine, but might not be
> > necessary in this case.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Tue, Nov 21, 2023 at 10:36 AM Hongshun Wang 
> > wrote:
> >
> > > Hi Becket,
> > >
> > > > Additionally, SplitFetcherTask requires FutureCompletingBlockingQueue
> > as
> > > a constructor parameter, which is not allowed  now.
> > > Sorry, it was my writing mistake. What I meant is that *SplitFetcher*
> > > requires FutureCompletingBlockingQueue as a constructor parameter.
> > > SplitFetcher
> > > is a class rather than Interface. Therefore, I want to  change
> > > SplitFetcher to a public Interface and moving its implementation
> > > details to an implement
> > > subclass .
> > >
> > > Thanks,
> > > Hongshun Wang
> > >
> > > On Fri, Nov 17, 2023 at 6:21 PM Becket Qin 
> wrote:
> > >
> > > > Hi Hongshun,
> > > >
> > > > SplitFetcher.enqueueTask() returns void, right? SplitFetcherTask is
> > > already
> > > > an interface, and we need to make that as a PublicEvolving API as
> well.
> > > >
> > > > So overall, a source developer can potentially do a few things in the
> > > > SplitFetcherManager.
> > > > 1. for customized logic including split-to-fetcher assignment,
> > threading
> > > > model, etc.
> > > > 2. create their own SplitFetcherTask for the SplitFetcher /
> SplitReader
> > > to
> > > > execute in a coordinated manner.
> > > >
> > > > It should be powerful enough for the vast majority of the source
> > > > implementation, if not all.
> > > >
> > > >
> > > > Additionally, SplitFetcherTask requires FutureCompletingBlockingQueue
> > > > > as a
> > > > > constructor parameter, which is not allowed
> > > > > now.
> > > >
> > > > Are you referring to FetchTask which implements SplitFetcherTask?
> That
> > > > class will remain internal.
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > > On Fri, Nov 17, 2023 at 5:23 PM Hongshun Wang <
> loserwang1...@gmail.com
> > >
> > > > wrote:
> > > >
> > > > > Hi, Jiangjie(Becket) ,
> > > > > Thank you for your advice. I have learned a lot.
> > > > >
> > > > > If SplitFetcherManager becomes PublicEvolving, that also means
> > > > > > SplitFetcher needs to be PublicEvolving, because it is returned
> by
> > > the
> > > > > > protected method SplitFetcherManager.createSplitFetcher().
> > > > >
> > > > > I completely agree with you. However, if SplitFetcher becomes
> > > > > PublicEvolving, SplitFetcherTask also needs to be PublicEvolving
> > > > > because it is returned by the public method
> SplitFetcher#enqueueTask.
> > > > > Additionally, SplitFetcherTask requires
> FutureCompletingBlockingQueue
> > > > > as a
> > > > > constructor parameter, which is not allowed
> > > > > now. Therefore, I propose changing SplitFetcher to a public
> Interface
> > > > > and moving its implementation details to an implement class (e.g.,
> > > > > SplitFetcherImpl or another suitable name). SplitFetcherImpl will
> be
> > > > > marked as internal and managed by SplitFetcherManager,
> > > > > and put data in the queue.
> > > > > Subclasses of SplitFetcherManager can only use the SplitFetcher
> > > > interface,
> > 

Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-11-21 Thread Hongshun Wang
Hi Becket,

> If SplitFetcherManager becomes PublicEvolving, that also means SplitFetcher
> needs to be PublicEvolving, because it is returned by the protected method
> SplitFetcherManager.createSplitFetcher().



> it looks like there is no need to expose the constructor of SplitFetcher
> to the end users. Having an interface of SplitFetcher is also fine, but
> might not be necessary in this case.



How can we mark SplitFetcher as PublicEvolving without exposing its
constructor to the end users?

Thanks,
Hongshun Wang

On Tue, Nov 21, 2023 at 7:23 PM Becket Qin  wrote:

> Hi Hongshun,
>
> Do we need to expose the constructor of SplitFetcher to the users? Ideally,
> users should always get a new fetcher instance by calling
> SplitFetcherManager.createSplitFetcher(). Or, they can get an existing
> SplitFetcher by looking up in the SplitFetcherManager.fetchers map. I think
> this makes sense because a SplitFetcher should always belong to a
> SplitFetcherManager. Therefore, it should be created via a
> SplitFetcherManager as well. So, it looks like there is no need to expose
> the constructor of SplitFetcher to the end users.
>
> Having an interface of SplitFetcher is also fine, but might not be
> necessary in this case.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Nov 21, 2023 at 10:36 AM Hongshun Wang 
> wrote:
>
> > Hi Becket,
> >
> > > Additionally, SplitFetcherTask requires FutureCompletingBlockingQueue
> as
> > a constructor parameter, which is not allowed  now.
> > Sorry, it was my writing mistake. What I meant is that *SplitFetcher*
> > requires FutureCompletingBlockingQueue as a constructor parameter.
> > SplitFetcher
> > is a class rather than Interface. Therefore, I want to  change
> > SplitFetcher to a public Interface and moving its implementation
> > details to an implement
> > subclass .
> >
> > Thanks,
> > Hongshun Wang
> >
> > On Fri, Nov 17, 2023 at 6:21 PM Becket Qin  wrote:
> >
> > > Hi Hongshun,
> > >
> > > SplitFetcher.enqueueTask() returns void, right? SplitFetcherTask is
> > already
> > > an interface, and we need to make that as a PublicEvolving API as well.
> > >
> > > So overall, a source developer can potentially do a few things in the
> > > SplitFetcherManager.
> > > 1. for customized logic including split-to-fetcher assignment,
> threading
> > > model, etc.
> > > 2. create their own SplitFetcherTask for the SplitFetcher / SplitReader
> > to
> > > execute in a coordinated manner.
> > >
> > > It should be powerful enough for the vast majority of the source
> > > implementation, if not all.
> > >
> > >
> > > Additionally, SplitFetcherTask requires FutureCompletingBlockingQueue
> > > > as a
> > > > constructor parameter, which is not allowed
> > > > now.
> > >
> > > Are you referring to FetchTask which implements SplitFetcherTask? That
> > > class will remain internal.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Fri, Nov 17, 2023 at 5:23 PM Hongshun Wang  >
> > > wrote:
> > >
> > > > Hi, Jiangjie(Becket) ,
> > > > Thank you for your advice. I have learned a lot.
> > > >
> > > > If SplitFetcherManager becomes PublicEvolving, that also means
> > > > > SplitFetcher needs to be PublicEvolving, because it is returned by
> > the
> > > > > protected method SplitFetcherManager.createSplitFetcher().
> > > >
> > > > I completely agree with you. However, if SplitFetcher becomes
> > > > PublicEvolving, SplitFetcherTask also needs to be PublicEvolving
> > > > because it is returned by the public method SplitFetcher#enqueueTask.
> > > > Additionally, SplitFetcherTask requires FutureCompletingBlockingQueue
> > > > as a
> > > > constructor parameter, which is not allowed
> > > > now. Therefore, I propose changing SplitFetcher to a public Interface
> > > > and moving its implementation details to an implement class (e.g.,
> > > > SplitFetcherImpl or another suitable name). SplitFetcherImpl will be
> > > > marked as internal and managed by SplitFetcherManager,
> > > > and put data in the queue.
> > > > Subclasses of SplitFetcherManager can only use the SplitFetcher
> > > interface,
> > > > also ensuring that the current subclasses are not affected.
> > > >
> > > >
> > > >
> > > > The current SplitFetcherManager basically looks up
> > > > > the SplitT from the fetcher with the split Id, and immediately
> passes
> > > the
> > > > > SplitT back to the fetcher, which is unnecessary.
> > > >
> > > > I inferred that this is because SplitReader#pauseOrResumeSplits
> > > > requires SplitT instead of the split ID. Perhaps some external source
> > > > requires more information to pause. However, SplitReader doesn't
> store
> > > > all its split data, while SplitFetcherManager saves them.
> > > > CC, @Dawid Wysakowicz
> > > >
> > > >
> > > >
> > > >  If not, SplitFetcher.pause() and
> > > > > SplitFetcher.resume() can be removed. In fact, they seem no longer
> > used
> > > > > anywhere.
> > > >
> > > > It seems no use any more. CC, @Arvid Heise
> > > >
> > > 

Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-11-21 Thread Becket Qin
Hi Hongshun,

Do we need to expose the constructor of SplitFetcher to the users? Ideally,
users should always get a new fetcher instance by calling
SplitFetcherManager.createSplitFetcher(). Or, they can get an existing
SplitFetcher by looking it up in the SplitFetcherManager.fetchers map. I think
this makes sense because a SplitFetcher should always belong to a
SplitFetcherManager. Therefore, it should be created via a
SplitFetcherManager as well. So, it looks like there is no need to expose
the constructor of SplitFetcher to the end users.
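
As a rough sketch of that usage pattern (all class names are hypothetical stand-ins, and the single-fetcher policy is only an assumed example), a manager subclass either reuses a fetcher from the fetchers map or asks the base class to create one. It never calls the fetcher constructor itself.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a fetcher; not the real Flink SplitFetcher.
class DemoFetcher {
    final int id;
    final List<String> assignedSplitIds = new ArrayList<>();

    // Package-private on purpose: only the manager's package creates fetchers.
    DemoFetcher(int id) {
        this.id = id;
    }
}

// Hypothetical stand-in for the base manager.
abstract class DemoFetcherManager {
    // Fetchers keyed by fetcher id, visible to subclasses for lookups.
    protected final Map<Integer, DemoFetcher> fetchers = new HashMap<>();
    private int nextFetcherId = 0;

    // The only way for subclasses to obtain a new fetcher.
    protected DemoFetcher createSplitFetcher() {
        DemoFetcher fetcher = new DemoFetcher(nextFetcherId++);
        fetchers.put(fetcher.id, fetcher);
        return fetcher;
    }

    public abstract void addSplits(List<String> splitIds);
}

// Assumed policy: reuse the single existing fetcher, or create it on first use.
class DemoSingleFetcherManager extends DemoFetcherManager {
    @Override
    public void addSplits(List<String> splitIds) {
        DemoFetcher fetcher = fetchers.isEmpty()
                ? createSplitFetcher()
                : fetchers.values().iterator().next();
        fetcher.assignedSplitIds.addAll(splitIds);
    }
}
```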

Having an interface of SplitFetcher is also fine, but might not be
necessary in this case.

Thanks,

Jiangjie (Becket) Qin

On Tue, Nov 21, 2023 at 10:36 AM Hongshun Wang 
wrote:

> Hi Becket,
>
> > Additionally, SplitFetcherTask requires FutureCompletingBlockingQueue  as
> a constructor parameter, which is not allowed  now.
> Sorry, it was my writing mistake. What I meant is that *SplitFetcher*
> requires FutureCompletingBlockingQueue as a constructor parameter.
> SplitFetcher
> is a class rather than Interface. Therefore, I want to  change
> SplitFetcher to a public Interface and moving its implementation
> details to an implement
> subclass .
>
> Thanks,
> Hongshun Wang
>
> On Fri, Nov 17, 2023 at 6:21 PM Becket Qin  wrote:
>
> > Hi Hongshun,
> >
> > SplitFetcher.enqueueTask() returns void, right? SplitFetcherTask is
> already
> > an interface, and we need to make that as a PublicEvolving API as well.
> >
> > So overall, a source developer can potentially do a few things in the
> > SplitFetcherManager.
> > 1. for customized logic including split-to-fetcher assignment, threading
> > model, etc.
> > 2. create their own SplitFetcherTask for the SplitFetcher / SplitReader
> to
> > execute in a coordinated manner.
> >
> > It should be powerful enough for the vast majority of the source
> > implementation, if not all.
> >
> >
> > Additionally, SplitFetcherTask requires FutureCompletingBlockingQueue
> > > as a
> > > constructor parameter, which is not allowed
> > > now.
> >
> > Are you referring to FetchTask which implements SplitFetcherTask? That
> > class will remain internal.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Fri, Nov 17, 2023 at 5:23 PM Hongshun Wang 
> > wrote:
> >
> > > Hi, Jiangjie(Becket) ,
> > > Thank you for your advice. I have learned a lot.
> > >
> > > If SplitFetcherManager becomes PublicEvolving, that also means
> > > > SplitFetcher needs to be PublicEvolving, because it is returned by
> the
> > > > protected method SplitFetcherManager.createSplitFetcher().
> > >
> > > I completely agree with you. However, if SplitFetcher becomes
> > > PublicEvolving, SplitFetcherTask also needs to be PublicEvolving
> > > because it is returned by the public method SplitFetcher#enqueueTask.
> > > Additionally, SplitFetcherTask requires FutureCompletingBlockingQueue
> > > as a
> > > constructor parameter, which is not allowed
> > > now. Therefore, I propose changing SplitFetcher to a public Interface
> > > and moving its implementation details to an implement class (e.g.,
> > > SplitFetcherImpl or another suitable name). SplitFetcherImpl will be
> > > marked as internal and managed by SplitFetcherManager,
> > > and put data in the queue.
> > > Subclasses of SplitFetcherManager can only use the SplitFetcher
> > interface,
> > > also ensuring that the current subclasses are not affected.
> > >
> > >
> > >
> > > The current SplitFetcherManager basically looks up
> > > > the SplitT from the fetcher with the split Id, and immediately passes
> > the
> > > > SplitT back to the fetcher, which is unnecessary.
> > >
> > > I inferred that this is because SplitReader#pauseOrResumeSplits
> > > requires SplitT instead of the split ID. Perhaps some external source
> > > requires more information to pause. However, SplitReader doesn't store
> > > all its split data, while SplitFetcherManager saves them.
> > > CC, @Dawid Wysakowicz
> > >
> > >
> > >
> > >  If not, SplitFetcher.pause() and
> > > > SplitFetcher.resume() can be removed. In fact, they seem no longer
> used
> > > > anywhere.
> > >
> > > It seems no use any more. CC, @Arvid Heise
> > >
> > >
> > >
> > > Thanks,
> > > Hongshun Wang
> > >
> > > On Fri, Nov 17, 2023 at 11:42 AM Becket Qin 
> > wrote:
> > >
> > > > Hi Hongshun,
> > > >
> > > > Thanks for updating the FLIP. I think that makes sense. A few
> comments
> > > > below:
> > > >
> > > > 1. If SplitFetcherManager becomes PublicEvolving, that also means
> > > > SplitFetcher needs to be PublicEvolving, because it is returned by
> the
> > > > protected method SplitFetcherManager.createSplitFetcher().
> > > >
> > > > 2. When checking the API of the classes to be marked as
> PublicEvolving,
> > > > there might be a few methods' signatures worth some discussion.
> > > >
> > > > For SplitFetcherManager:
> > > > a) Currently removeSplits() methods takes a list of SplitT. I am
> > > wondering
> > > > if it should be a list of splitIds. SplitT actually contains two
> 

Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-11-20 Thread Hongshun Wang
Hi Becket,

> Additionally, SplitFetcherTask requires FutureCompletingBlockingQueue as
> a constructor parameter, which is not allowed now.

Sorry, that was a writing mistake on my part. What I meant is that
*SplitFetcher* requires FutureCompletingBlockingQueue as a constructor
parameter. SplitFetcher is a class rather than an interface. Therefore, I
want to change SplitFetcher to a public interface and move its
implementation details into an implementing subclass.

Thanks,
Hongshun Wang

On Fri, Nov 17, 2023 at 6:21 PM Becket Qin  wrote:

> Hi Hongshun,
>
> SplitFetcher.enqueueTask() returns void, right? SplitFetcherTask is already
> an interface, and we need to make that as a PublicEvolving API as well.
>
> So overall, a source developer can potentially do a few things in the
> SplitFetcherManager.
> 1. for customized logic including split-to-fetcher assignment, threading
> model, etc.
> 2. create their own SplitFetcherTask for the SplitFetcher / SplitReader to
> execute in a coordinated manner.
>
> It should be powerful enough for the vast majority of the source
> implementation, if not all.
>
>
> Additionally, SplitFetcherTask requires FutureCompletingBlockingQueue
> > as a
> > constructor parameter, which is not allowed
> > now.
>
> Are you referring to FetchTask which implements SplitFetcherTask? That
> class will remain internal.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Nov 17, 2023 at 5:23 PM Hongshun Wang 
> wrote:
>
> > Hi, Jiangjie(Becket) ,
> > Thank you for your advice. I have learned a lot.
> >
> > If SplitFetcherManager becomes PublicEvolving, that also means
> > > SplitFetcher needs to be PublicEvolving, because it is returned by the
> > > protected method SplitFetcherManager.createSplitFetcher().
> >
> > I completely agree with you. However, if SplitFetcher becomes
> > PublicEvolving, SplitFetcherTask also needs to be PublicEvolving
> > because it is returned by the public method SplitFetcher#enqueueTask.
> > Additionally, SplitFetcherTask requires FutureCompletingBlockingQueue
> > as a
> > constructor parameter, which is not allowed
> > now. Therefore, I propose changing SplitFetcher to a public Interface
> > and moving its implementation details to an implement class (e.g.,
> > SplitFetcherImpl or another suitable name). SplitFetcherImpl will be
> > marked as internal and managed by SplitFetcherManager,
> > and put data in the queue.
> > Subclasses of SplitFetcherManager can only use the SplitFetcher
> interface,
> > also ensuring that the current subclasses are not affected.
> >
> >
> >
> > The current SplitFetcherManager basically looks up
> > > the SplitT from the fetcher with the split Id, and immediately passes
> the
> > > SplitT back to the fetcher, which is unnecessary.
> >
> > I inferred that this is because SplitReader#pauseOrResumeSplits
> > requires SplitT instead of the split ID. Perhaps some external source
> > requires more information to pause. However, SplitReader doesn't store
> > all its split data, while SplitFetcherManager saves them.
> > CC, @Dawid Wysakowicz
> >
> >
> >
> >  If not, SplitFetcher.pause() and
> > > SplitFetcher.resume() can be removed. In fact, they seem no longer used
> > > anywhere.
> >
> > It seems no use any more. CC, @Arvid Heise
> >
> >
> >
> > Thanks,
> > Hongshun Wang
> >
> > On Fri, Nov 17, 2023 at 11:42 AM Becket Qin 
> wrote:
> >
> > > Hi Hongshun,
> > >
> > > Thanks for updating the FLIP. I think that makes sense. A few comments
> > > below:
> > >
> > > 1. If SplitFetcherManager becomes PublicEvolving, that also means
> > > SplitFetcher needs to be PublicEvolving, because it is returned by the
> > > protected method SplitFetcherManager.createSplitFetcher().
> > >
> > > 2. When checking the API of the classes to be marked as PublicEvolving,
> > > there might be a few methods' signatures worth some discussion.
> > >
> > > For SplitFetcherManager:
> > > a) Currently removeSplits() methods takes a list of SplitT. I am
> > wondering
> > > if it should be a list of splitIds. SplitT actually contains two parts
> of
> > > information, the static split Id and some dynamically changing state of
> > the
> > > split (e.g. Kafka consumer offset). The source of truth for the dynamic
> > > state is SourceReaderBase. Currently we are passing in the full source
> > > split with the dynamic state for split removal. But it looks like only
> > > split id is needed for the split removal.
> > > Maybe this is intentional, as sometimes when a SplitReader removes a
> > split,
> > > it also wants to know the dynamic state of the split. If so, we can
> keep
> > it
> > > as is. But then the question is why
> > > SplitFetcherManager.pauseAndResumeSplits() only takes split ids instead
> > of
> > > SplitT. Should we make them consistent?
> > >
> > > For SplitFetcher:
> > > a) The SplitFetcher.pauseOrResumeSplits() method takes collections of
> > > SplitT as arguments. We may want to adjust that according to what we do
> > to
> > > the SplitFetcherManager. The current 

Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-11-17 Thread Becket Qin
Hi Hongshun,

SplitFetcher.enqueueTask() returns void, right? SplitFetcherTask is already
an interface, and we need to make it a PublicEvolving API as well.

So overall, a source developer can potentially do a few things in the
SplitFetcherManager:
1. Customize logic such as split-to-fetcher assignment, the threading
model, etc.
2. Create their own SplitFetcherTask for the SplitFetcher / SplitReader to
execute in a coordinated manner.

That should be powerful enough for the vast majority of source
implementations, if not all.
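
A minimal sketch of point 2, assuming simplified stand-in types (DemoTask, DemoTaskFetcher and CommitOffsetsTask are hypothetical and only loosely modeled on the task/fetcher relationship described here): a developer-defined task is handed to the fetcher through enqueueTask(), which returns void, and the fetcher later runs it.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified task interface.
interface DemoTask {
    // Returns true when the task is finished and does not need to run again.
    boolean run();

    // Asks a possibly blocking task to return early.
    void wakeUp();
}

// Hypothetical fetcher that executes queued tasks one at a time.
class DemoTaskFetcher {
    private final Deque<DemoTask> taskQueue = new ArrayDeque<>();

    // Returns void: the caller gets nothing back, it only hands the task over.
    public void enqueueTask(DemoTask task) {
        taskQueue.addLast(task);
    }

    // Runs the task at the head of the queue; an unfinished task goes back to the tail.
    // Returns false when there was nothing to run.
    public boolean runOnce() {
        DemoTask task = taskQueue.pollFirst();
        if (task == null) {
            return false;
        }
        if (!task.run()) {
            taskQueue.addLast(task);
        }
        return true;
    }
}

// Example of a developer-defined task (assumed use case: committing offsets).
class CommitOffsetsTask implements DemoTask {
    boolean committed = false;

    @Override
    public boolean run() {
        committed = true;
        return true;
    }

    @Override
    public void wakeUp() {
        // Nothing blocking to interrupt in this sketch.
    }
}
```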


> Additionally, SplitFetcherTask requires FutureCompletingBlockingQueue as a
> constructor parameter, which is not allowed now.

Are you referring to FetchTask which implements SplitFetcherTask? That
class will remain internal.

Thanks,

Jiangjie (Becket) Qin

On Fri, Nov 17, 2023 at 5:23 PM Hongshun Wang 
wrote:

> Hi, Jiangjie(Becket) ,
> Thank you for your advice. I have learned a lot.
>
> If SplitFetcherManager becomes PublicEvolving, that also means
> > SplitFetcher needs to be PublicEvolving, because it is returned by the
> > protected method SplitFetcherManager.createSplitFetcher().
>
> I completely agree with you. However, if SplitFetcher becomes
> PublicEvolving, SplitFetcherTask also needs to be PublicEvolving
> because it is returned by the public method SplitFetcher#enqueueTask.
> Additionally, SplitFetcherTask requires FutureCompletingBlockingQueue
> as a
> constructor parameter, which is not allowed
> now. Therefore, I propose changing SplitFetcher to a public Interface
> and moving its implementation details to an implement class (e.g.,
> SplitFetcherImpl or another suitable name). SplitFetcherImpl will be
> marked as internal and managed by SplitFetcherManager,
> and put data in the queue.
> Subclasses of SplitFetcherManager can only use the SplitFetcher interface,
> also ensuring that the current subclasses are not affected.
>
>
>
> The current SplitFetcherManager basically looks up
> > the SplitT from the fetcher with the split Id, and immediately passes the
> > SplitT back to the fetcher, which is unnecessary.
>
> I inferred that this is because SplitReader#pauseOrResumeSplits
> requires SplitT instead of the split ID. Perhaps some external source
> requires more information to pause. However, SplitReader doesn't store
> all its split data, while SplitFetcherManager saves them.
> CC, @Dawid Wysakowicz
>
>
>
>  If not, SplitFetcher.pause() and
> > SplitFetcher.resume() can be removed. In fact, they seem no longer used
> > anywhere.
>
> It seems no use any more. CC, @Arvid Heise
>
>
>
> Thanks,
> Hongshun Wang
>
> On Fri, Nov 17, 2023 at 11:42 AM Becket Qin  wrote:
>
> > Hi Hongshun,
> >
> > Thanks for updating the FLIP. I think that makes sense. A few comments
> > below:
> >
> > 1. If SplitFetcherManager becomes PublicEvolving, that also means
> > SplitFetcher needs to be PublicEvolving, because it is returned by the
> > protected method SplitFetcherManager.createSplitFetcher().
> >
> > 2. When checking the API of the classes to be marked as PublicEvolving,
> > there might be a few methods' signatures worth some discussion.
> >
> > For SplitFetcherManager:
> > a) Currently removeSplits() methods takes a list of SplitT. I am
> wondering
> > if it should be a list of splitIds. SplitT actually contains two parts of
> > information, the static split Id and some dynamically changing state of
> the
> > split (e.g. Kafka consumer offset). The source of truth for the dynamic
> > state is SourceReaderBase. Currently we are passing in the full source
> > split with the dynamic state for split removal. But it looks like only
> > split id is needed for the split removal.
> > Maybe this is intentional, as sometimes when a SplitReader removes a
> split,
> > it also wants to know the dynamic state of the split. If so, we can keep
> it
> > as is. But then the question is why
> > SplitFetcherManager.pauseAndResumeSplits() only takes split ids instead
> of
> > SplitT. Should we make them consistent?
> >
> > For SplitFetcher:
> > a) The SplitFetcher.pauseOrResumeSplits() method takes collections of
> > SplitT as arguments. We may want to adjust that according to what we do
> to
> > the SplitFetcherManager. The current SplitFetcherManager basically looks
> up
> > the SplitT from the fetcher with the split Id, and immediately passes the
> > SplitT back to the fetcher, which is unnecessary.
> > b) After supporting split level pause and resume, do we still need split
> > fetcher level pause and resume? If not, SplitFetcher.pause() and
> > SplitFetcher.resume() can be removed. In fact, they seem no longer used
> > anywhere.
> >
> > Other than the above potential API adjustment before we mark the classes
> > PublicEvolving, the API looks fine to me.
> >
> > I think it is good timing for deprecation now. We will mark the impacted
> > constructors as deprecated in 1.19, and remove them in release of 2.0.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> > On Thu, Nov 16, 

Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-11-17 Thread Hongshun Wang
Hi, Jiangjie(Becket) ,
Thank you for your advice. I have learned a lot.

> If SplitFetcherManager becomes PublicEvolving, that also means
> SplitFetcher needs to be PublicEvolving, because it is returned by the
> protected method SplitFetcherManager.createSplitFetcher().

I completely agree with you. However, if SplitFetcher becomes
PublicEvolving, SplitFetcherTask also needs to be PublicEvolving
because it is returned by the public method SplitFetcher#enqueueTask.
Additionally, SplitFetcherTask requires FutureCompletingBlockingQueue
as a constructor parameter, which is not allowed now. Therefore, I propose
changing SplitFetcher to a public interface and moving its implementation
details to an implementing class (e.g., SplitFetcherImpl or another
suitable name). SplitFetcherImpl will be marked as internal, managed by
SplitFetcherManager, and will put data into the queue.
Subclasses of SplitFetcherManager can only use the SplitFetcher interface,
which also ensures that the current subclasses are not affected.
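
Roughly, the proposed split could look like the sketch below. All names (FetcherApi, FetcherImpl, ManagerSketch) are hypothetical, and java.util.concurrent.BlockingQueue is used only as a stand-in for FutureCompletingBlockingQueue. The point is that the queue appears solely in the internal implementation's constructor, never in the interface that manager subclasses see.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// What subclasses of the manager would see (hypothetical public interface).
interface FetcherApi {
    void addSplits(List<String> splitIds);

    void shutdown();
}

// Internal implementation: the only class whose constructor knows about the queue.
final class FetcherImpl implements FetcherApi {
    private final BlockingQueue<String> elementQueue;
    private boolean running = true;

    FetcherImpl(BlockingQueue<String> elementQueue) {
        this.elementQueue = elementQueue;
    }

    @Override
    public void addSplits(List<String> splitIds) {
        if (!running) {
            return;
        }
        // For illustration, "fetching" a split just emits one record per split id.
        for (String splitId : splitIds) {
            elementQueue.offer("record-from-" + splitId);
        }
    }

    @Override
    public void shutdown() {
        running = false;
    }
}

// The manager owns the queue and hands out fetchers typed as the interface.
abstract class ManagerSketch {
    private final BlockingQueue<String> elementQueue = new LinkedBlockingQueue<>();

    protected FetcherApi createSplitFetcher() {
        return new FetcherImpl(elementQueue);
    }

    // Returns the next record, or null if the queue is empty.
    public String poll() {
        return elementQueue.poll();
    }
}
```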



> The current SplitFetcherManager basically looks up
> the SplitT from the fetcher with the split Id, and immediately passes the
> SplitT back to the fetcher, which is unnecessary.

I infer that this is because SplitReader#pauseOrResumeSplits
requires SplitT instead of the split ID. Perhaps some external sources
require more information to pause. However, SplitReader doesn't store
all its split data, while SplitFetcherManager does.
CC, @Dawid Wysakowicz



> If not, SplitFetcher.pause() and
> SplitFetcher.resume() can be removed. In fact, they seem no longer used
> anywhere.

They seem to be unused now. CC, @Arvid Heise



Thanks,
Hongshun Wang

On Fri, Nov 17, 2023 at 11:42 AM Becket Qin  wrote:

> Hi Hongshun,
>
> Thanks for updating the FLIP. I think that makes sense. A few comments
> below:
>
> 1. If SplitFetcherManager becomes PublicEvolving, that also means
> SplitFetcher needs to be PublicEvolving, because it is returned by the
> protected method SplitFetcherManager.createSplitFetcher().
>
> 2. When checking the API of the classes to be marked as PublicEvolving,
> there might be a few methods' signatures worth some discussion.
>
> For SplitFetcherManager:
> a) Currently removeSplits() methods takes a list of SplitT. I am wondering
> if it should be a list of splitIds. SplitT actually contains two parts of
> information, the static split Id and some dynamically changing state of the
> split (e.g. Kafka consumer offset). The source of truth for the dynamic
> state is SourceReaderBase. Currently we are passing in the full source
> split with the dynamic state for split removal. But it looks like only
> split id is needed for the split removal.
> Maybe this is intentional, as sometimes when a SplitReader removes a split,
> it also wants to know the dynamic state of the split. If so, we can keep it
> as is. But then the question is why
> SplitFetcherManager.pauseAndResumeSplits() only takes split ids instead of
> SplitT. Should we make them consistent?
>
> For SplitFetcher:
> a) The SplitFetcher.pauseOrResumeSplits() method takes collections of
> SplitT as arguments. We may want to adjust that according to what we do to
> the SplitFetcherManager. The current SplitFetcherManager basically looks up
> the SplitT from the fetcher with the split Id, and immediately passes the
> SplitT back to the fetcher, which is unnecessary.
> b) After supporting split level pause and resume, do we still need split
> fetcher level pause and resume? If not, SplitFetcher.pause() and
> SplitFetcher.resume() can be removed. In fact, they seem no longer used
> anywhere.
>
> Other than the above potential API adjustment before we mark the classes
> PublicEvolving, the API looks fine to me.
>
> I think it is good timing for deprecation now. We will mark the impacted
> constructors as deprecated in 1.19, and remove them in release of 2.0.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Thu, Nov 16, 2023 at 8:26 PM Hongshun Wang 
> wrote:
>
> > Hi Devs,
> >
> > I have just modified the content of FLIP-389: Annotate
> > SingleThreadFetcherManager as PublicEvolving[1].
> >
> > This FLIP now mainly does two things:
> >
> >1. Annotate SingleThreadFetcherManager as PublicEvolving
> >2. Remove all public constructors which use
> >FutureCompletingBlockingQueue. This will mark many constructors as
> >@Deprecated.
> >
> > This may influence many connectors, so I am looking forward to hearing
> from
> > you.
> >
> >
> > Best regards,
> > Hongshun
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498
> >
> > On Wed, Nov 15, 2023 at 7:57 AM Becket Qin  wrote:
> >
> > > Hi Hongshun,
> > > >
> > > >
> > > > However, it will be tricky because SplitFetcherManager includes  > > SplitT
> > > > extends SourceSplit>, while FutureCompletingBlockingQueue includes
> .
> > > > This means that SplitFetcherManager would have to be modified to  E,
> > > > SplitT extends SourceSplit>, which would affect the 

Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-11-16 Thread Becket Qin
Hi Hongshun,

Thanks for updating the FLIP. I think that makes sense. A few comments
below:

1. If SplitFetcherManager becomes PublicEvolving, that also means
SplitFetcher needs to be PublicEvolving, because it is returned by the
protected method SplitFetcherManager.createSplitFetcher().

2. When checking the API of the classes to be marked as PublicEvolving,
there might be a few methods' signatures worth some discussion.

For SplitFetcherManager:
a) Currently the removeSplits() method takes a list of SplitT. I am wondering
if it should be a list of split ids. SplitT actually contains two parts of
information: the static split id and some dynamically changing state of the
split (e.g. the Kafka consumer offset). The source of truth for the dynamic
state is SourceReaderBase. Currently we are passing in the full source
split with the dynamic state for split removal, but it looks like only the
split id is needed for the removal.
Maybe this is intentional, as sometimes when a SplitReader removes a split,
it also wants to know the dynamic state of the split. If so, we can keep it
as is. But then the question is why
SplitFetcherManager.pauseOrResumeSplits() only takes split ids instead of
SplitT. Should we make them consistent?

For SplitFetcher:
a) The SplitFetcher.pauseOrResumeSplits() method takes collections of
SplitT as arguments. We may want to adjust that according to what we do to
the SplitFetcherManager. The current SplitFetcherManager basically looks up
the SplitT from the fetcher with the split Id, and immediately passes the
SplitT back to the fetcher, which is unnecessary.
b) After supporting split level pause and resume, do we still need split
fetcher level pause and resume? If not, SplitFetcher.pause() and
SplitFetcher.resume() can be removed. In fact, they seem no longer used
anywhere.
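
To make the removeSplits() question concrete, here is a minimal Java sketch.
It is not Flink code: OffsetSplit, SketchFetcherRegistry and all of their
methods are hypothetical names made up for illustration. It models a split as
a static id plus a mutable offset, and shows that a removal keyed only by
split id can still hand back the latest dynamic state if the caller wants it.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical split: a static id plus dynamic state (an offset). Not a Flink class. */
final class OffsetSplit {
    private final String splitId;
    private long offset; // dynamic state, changes while the split is being read

    OffsetSplit(String splitId, long offset) {
        this.splitId = splitId;
        this.offset = offset;
    }

    String splitId() { return splitId; }
    long offset() { return offset; }
    void advanceTo(long newOffset) { this.offset = newOffset; }
}

/** Hypothetical registry standing in for a fetcher's view of its assigned splits. */
final class SketchFetcherRegistry {
    private final Map<String, OffsetSplit> assigned = new LinkedHashMap<>();

    void addSplit(OffsetSplit split) { assigned.put(split.splitId(), split); }

    /** Removal keyed by id only; returns the removed splits with their latest state. */
    List<OffsetSplit> removeSplits(Collection<String> splitIds) {
        List<OffsetSplit> removed = new ArrayList<>();
        for (String id : splitIds) {
            OffsetSplit split = assigned.remove(id);
            if (split != null) {
                removed.add(split);
            }
        }
        return removed;
    }

    int size() { return assigned.size(); }
}

final class RemoveByIdDemo {
    public static void main(String[] args) {
        SketchFetcherRegistry registry = new SketchFetcherRegistry();
        OffsetSplit split = new OffsetSplit("topic-0", 0L);
        registry.addSplit(split);
        split.advanceTo(42L); // state moves on after assignment
        List<OffsetSplit> removed = registry.removeSplits(List.of("topic-0"));
        System.out.println(removed.get(0).splitId() + " @ " + removed.get(0).offset());
    }
}
```

In this sketch the caller never has to look up a split object just to pass it
back, which is the redundancy described in the SplitFetcher item a) above.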

Other than the above potential API adjustment before we mark the classes
PublicEvolving, the API looks fine to me.

I think now is a good time for the deprecation. We will mark the impacted
constructors as deprecated in 1.19, and remove them in the 2.0 release.

Thanks,

Jiangjie (Becket) Qin



On Thu, Nov 16, 2023 at 8:26 PM Hongshun Wang 
wrote:

> Hi Devs,
>
> I have just modified the content of FLIP-389: Annotate
> SingleThreadFetcherManager as PublicEvolving[1].
>
> Now this Flip mainly do two thing:
>
>1. Annotate SingleThreadFetcherManager as PublicEvolving
>2. Remove all public constructors which use
>FutureCompletingBlockingQueue. This will make many constructors as
>@Depricated.
>
> This may influence many connectors, so I am looking forward to hearing from
> you.
>
>
> Best regards,
> Hongshun
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498
>
> On Wed, Nov 15, 2023 at 7:57 AM Becket Qin  wrote:
>
> > Hi Hongshun,
> > >
> > >
> > > > However, it will be tricky because SplitFetcherManager includes <E,
> > > > SplitT extends SourceSplit>, while FutureCompletingBlockingQueue
> > > > includes <T>. This means that SplitFetcherManager would have to be
> > > > modified to <T, E, SplitT extends SourceSplit>, which would affect the
> > > > compatibility of the SplitFetcherManager class. I'm afraid this change
> > > > will influence other sources.
> >
> > > Although the FutureCompletingBlockingQueue class itself has a template
> > > class <T>. In the SourceReaderBase and SplitFetcherManager, this <T> is
> > > actually RecordsWithSplitIds<E>. So it looks like we can just let
> > > SplitFetcherManager.poll() return a RecordsWithSplitIds<E>.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Tue, Nov 14, 2023 at 8:11 PM Hongshun Wang 
> > wrote:
> >
> > > Hi Becket,
> > >   I agree with you and try to modify this Flip[1], which include
> > these
> > > changes:
> > >
> > >1. Mark constructor of SingleThreadMultiplexSourceReaderBase as
> > >@Depricated
> > >2.
> > >
> > >Mark constructor of SourceReaderBase as *@Depricated* and provide a
> > new
> > >constructor without
> > >
> > >FutureCompletingBlockingQueue
> > >3.
> > >
> > >Mark constructor of SplitFetcherManager
> andSingleThreadFetcherManager
> > >as  *@Depricated* and provide a new constructor
> > >without FutureCompletingBlockingQueue. Mark SplitFetcherManager
> > >andSingleThreadFetcherManager as *@PublicEvolving*
> > >4.
> > >
> > >SplitFetcherManager provides  wrapper methods for
> > >FutureCompletingBlockingQueue  to replace its usage in
> > SourceReaderBase.
> > >Then we can use FutureCompletingBlockingQueue only in
> > >SplitFetcherManager.
> > >
> > > > However, it will be tricky because SplitFetcherManager includes <E,
> > > > SplitT extends SourceSplit>, while FutureCompletingBlockingQueue
> > > > includes <T>. This means that SplitFetcherManager would have to be
> > > > modified to <T, E, SplitT extends SourceSplit>, which would affect the
> > > > compatibility of the SplitFetcherManager class. I'm afraid this change
> > > > will influence other sources.
> > >
> > >
> > >
> > > Looking forward 

Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-11-16 Thread Hongshun Wang
Hi Devs,

I have just modified the content of FLIP-389: Annotate
SingleThreadFetcherManager as PublicEvolving[1].

This FLIP now mainly does two things:

   1. Annotate SingleThreadFetcherManager as PublicEvolving.
   2. Remove all public constructors that use
   FutureCompletingBlockingQueue. This will mark many constructors as
   @Deprecated.

This may influence many connectors, so I am looking forward to hearing from
you.


Best regards,
Hongshun

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498

On Wed, Nov 15, 2023 at 7:57 AM Becket Qin  wrote:

> Hi Hongshun,
> >
> >
> > > However, it will be tricky because SplitFetcherManager includes <E,
> > > SplitT extends SourceSplit>, while FutureCompletingBlockingQueue
> > > includes <T>. This means that SplitFetcherManager would have to be
> > > modified to <T, E, SplitT extends SourceSplit>, which would affect the
> > > compatibility of the SplitFetcherManager class. I'm afraid this change
> > > will influence other sources.
>
> > Although the FutureCompletingBlockingQueue class itself has a template
> > class <T>. In the SourceReaderBase and SplitFetcherManager, this <T> is
> > actually RecordsWithSplitIds<E>. So it looks like we can just let
> > SplitFetcherManager.poll() return a RecordsWithSplitIds<E>.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Nov 14, 2023 at 8:11 PM Hongshun Wang 
> wrote:
>
> > Hi Becket,
> >   I agree with you and try to modify this Flip[1], which include
> these
> > changes:
> >
> >1. Mark constructor of SingleThreadMultiplexSourceReaderBase as
> >@Depricated
> >2.
> >
> >Mark constructor of SourceReaderBase as *@Depricated* and provide a
> new
> >constructor without
> >
> >FutureCompletingBlockingQueue
> >3.
> >
> >Mark constructor of SplitFetcherManager andSingleThreadFetcherManager
> >as  *@Depricated* and provide a new constructor
> >without FutureCompletingBlockingQueue. Mark SplitFetcherManager
> >andSingleThreadFetcherManager as *@PublicEvolving*
> >4.
> >
> >SplitFetcherManager provides  wrapper methods for
> >FutureCompletingBlockingQueue  to replace its usage in
> SourceReaderBase.
> >Then we can use FutureCompletingBlockingQueue only in
> >SplitFetcherManager.
> >
> > > However, it will be tricky because SplitFetcherManager includes <E,
> > > SplitT extends SourceSplit>, while FutureCompletingBlockingQueue
> > > includes <T>. This means that SplitFetcherManager would have to be
> > > modified to <T, E, SplitT extends SourceSplit>, which would affect the
> > > compatibility of the SplitFetcherManager class. I'm afraid this change
> > > will influence other sources.
> >
> >
> >
> > Looking forward to hearing from you.
> >
> > Best regards,
> > Hongshun
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498
> >
> > On Sat, Nov 11, 2023 at 10:55 AM Becket Qin 
> wrote:
> >
> > > Hi Hongshun and Martijn,
> > >
> > > Sorry for the late reply as I was on travel and still catching up with
> > the
> > > emails. Please allow me to provide more context.
> > >
> > > 1. The original design of SplitFetcherManager and its subclasses was to
> > > make them public to the Source developers. The goal is to let us take
> > care
> > > of the threading model, while the Source developers can just focus on
> the
> > > SplitReader implementation. Therefore, I think making
> > SplitFetcherManater /
> > > SingleThreadFetcherManager public aligns with the original design. That
> > is
> > > also why these classes are exposed in the constructor of
> > SourceReaderBase.
> > >
> > > 2. For FutureCompletingBlockingQueue, as a hindsight, it might be
> better
> > to
> > > not expose it to the Source developers. They are unlikely to use it
> > > anywhere other than just constructing it. The reason that
> > > FutureCompletingBlockingQueue is currently exposed in the
> > SourceReaderBase
> > > constructor is because both the SplitFetcherManager and
> SourceReaderBase
> > > need it. One way to hide the FutureCompletingBlockingQueue from the
> > public
> > > API is to make SplitFetcherManager the only owner class of the queue,
> and
> > > expose some of its methods via SplitFetcherManager. This way, the
> > > SourceReaderBase can invoke the methods via SplitFetcherManager. I
> > believe
> > > this also makes the code slightly cleaner.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Fri, Nov 10, 2023 at 12:28 PM Hongshun Wang <
> loserwang1...@gmail.com>
> > > wrote:
> > >
> > > > @Martijn, I agree with you.
> > > >
> > > >
> > > > I also have two questions at the beginning:
> > > >
> > > >- Why is an Internal class
> > > >exposed as a constructor param of a Public class?
> > > >- Should these classes be exposed as public?
> > > >
> > > > For the first question,  I noticed that before the original Jira[1] ,
> > > > all these classes missed the annotate , so it was not abnormal that
> > > > FutureCompletingBlockingQueue and SingleThreadFetcherManager were
> > > > 

Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-11-14 Thread Becket Qin
Hi Hongshun,
>
>
> However, it will be tricky because SplitFetcherManager includes <E, SplitT
> extends SourceSplit>, while FutureCompletingBlockingQueue includes <T>.
> This means that SplitFetcherManager would have to be modified to <T, E,
> SplitT extends SourceSplit>, which would affect the compatibility of the
> SplitFetcherManager class. I'm afraid this change will influence other
> sources.

Although the FutureCompletingBlockingQueue class itself has a type
parameter <T>, in SourceReaderBase and SplitFetcherManager this <T> is
actually RecordsWithSplitIds<E>. So it looks like we can just let
SplitFetcherManager.poll() return a RecordsWithSplitIds<E>.
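
As a rough Java sketch of that point (hypothetical names throughout:
SketchSplit, SketchRecords, SketchQueue and SketchManager are stand-ins made
up for illustration, not the Flink classes), the queue keeps its own type
parameter T, while the manager fixes T to a records type built from its
existing parameter E, so the manager's signature gains no new type parameter:

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical split marker; stands in for a source split type. */
interface SketchSplit {
    String splitId();
}

/** Hypothetical batch of records of type E tagged with a split id. */
final class SketchRecords<E> {
    final String splitId;
    final E record;

    SketchRecords(String splitId, E record) {
        this.splitId = splitId;
        this.record = record;
    }
}

/** Hypothetical generic queue with its own type parameter T. */
final class SketchQueue<T> {
    private final Queue<T> elements = new ArrayDeque<>();

    void put(T element) { elements.add(element); }

    /** Returns the next element, or null if the queue is empty. */
    T poll() { return elements.poll(); }
}

/** The manager keeps <E, SplitT>; the queue's T is bound to SketchRecords<E>. */
class SketchManager<E, SplitT extends SketchSplit> {
    private final SketchQueue<SketchRecords<E>> queue = new SketchQueue<>();

    void enqueue(SketchRecords<E> batch) { queue.put(batch); }

    /** Wrapper around the queue; callers never see the queue type itself. */
    SketchRecords<E> poll() { return queue.poll(); }
}
```

The design choice illustrated here is only that binding T inside the manager
avoids a signature change; whether the real classes should do exactly this is
what the thread is discussing.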

Thanks,

Jiangjie (Becket) Qin

On Tue, Nov 14, 2023 at 8:11 PM Hongshun Wang 
wrote:

> Hi Becket,
>   I agree with you and try to modify this Flip[1], which include these
> changes:
>
>1. Mark constructor of SingleThreadMultiplexSourceReaderBase as
>@Depricated
>2.
>
>Mark constructor of SourceReaderBase as *@Depricated* and provide a new
>constructor without
>
>FutureCompletingBlockingQueue
>3.
>
>Mark constructor of SplitFetcherManager andSingleThreadFetcherManager
>as  *@Depricated* and provide a new constructor
>without FutureCompletingBlockingQueue. Mark SplitFetcherManager
>andSingleThreadFetcherManager as *@PublicEvolving*
>4.
>
>SplitFetcherManager provides  wrapper methods for
>FutureCompletingBlockingQueue  to replace its usage in SourceReaderBase.
>Then we can use FutureCompletingBlockingQueue only in
>SplitFetcherManager.
>
> However, it will be tricky because SplitFetcherManager includes <E, SplitT
> extends SourceSplit>, while FutureCompletingBlockingQueue includes <T>.
> This means that SplitFetcherManager would have to be modified to <T, E,
> SplitT extends SourceSplit>, which would affect the compatibility of the
> SplitFetcherManager class. I'm afraid this change will influence other
> sources.
>
>
>
> Looking forward to hearing from you.
>
> Best regards,
> Hongshun
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498
>
> On Sat, Nov 11, 2023 at 10:55 AM Becket Qin  wrote:
>
> > Hi Hongshun and Martijn,
> >
> > Sorry for the late reply as I was on travel and still catching up with
> the
> > emails. Please allow me to provide more context.
> >
> > 1. The original design of SplitFetcherManager and its subclasses was to
> > make them public to the Source developers. The goal is to let us take
> care
> > of the threading model, while the Source developers can just focus on the
> > SplitReader implementation. Therefore, I think making
> SplitFetcherManater /
> > SingleThreadFetcherManager public aligns with the original design. That
> is
> > also why these classes are exposed in the constructor of
> SourceReaderBase.
> >
> > 2. For FutureCompletingBlockingQueue, as a hindsight, it might be better
> to
> > not expose it to the Source developers. They are unlikely to use it
> > anywhere other than just constructing it. The reason that
> > FutureCompletingBlockingQueue is currently exposed in the
> SourceReaderBase
> > constructor is because both the SplitFetcherManager and SourceReaderBase
> > need it. One way to hide the FutureCompletingBlockingQueue from the
> public
> > API is to make SplitFetcherManager the only owner class of the queue, and
> > expose some of its methods via SplitFetcherManager. This way, the
> > SourceReaderBase can invoke the methods via SplitFetcherManager. I
> believe
> > this also makes the code slightly cleaner.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Fri, Nov 10, 2023 at 12:28 PM Hongshun Wang 
> > wrote:
> >
> > > @Martijn, I agree with you.
> > >
> > >
> > > I also have two questions at the beginning:
> > >
> > >- Why is an Internal class
> > >exposed as a constructor param of a Public class?
> > >- Should these classes be exposed as public?
> > >
> > > For the first question,  I noticed that before the original Jira[1] ,
> > > all these classes missed the annotate , so it was not abnormal that
> > > FutureCompletingBlockingQueue and SingleThreadFetcherManager were
> > > constructor params of SingleThreadMultiplexSourceReaderBase.
> > >  However,
> > > this jira marked FutureCompletingBlockingQueue and
> > > SingleThreadFetcherManager as Internal, while marked
> > > SingleThreadMultiplexSourceReaderBase as Public. It's a good choice,
> > > but also forget that FutureCompletingBlockingQueue and
> > > SingleThreadFetcherManager have already been  exposed by
> > > SingleThreadMultiplexSourceReaderBase.
> > >  Thus, this problem occurs because we didn't
> > > clearly define the boundaries at the origin design. We should pay more
> > > attention to it when creating a new class.
> > >
> > >
> > > For the second question, I think at least SplitFetcherManager
> > > should be Public. There are few reasons:
> > >
> > >-  Connector developers want to decide their own
> > >thread mode. For example, Whether to recycle fetchers by 

Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-11-14 Thread Hongshun Wang
Hi Becket,
  I agree with you and have tried to modify this FLIP[1], which now includes
these changes:

   1. Mark the constructor of SingleThreadMultiplexSourceReaderBase as
   @Deprecated.
   2. Mark the constructor of SourceReaderBase as @Deprecated and provide a
   new constructor without FutureCompletingBlockingQueue.
   3. Mark the constructors of SplitFetcherManager and
   SingleThreadFetcherManager as @Deprecated and provide new constructors
   without FutureCompletingBlockingQueue. Mark SplitFetcherManager and
   SingleThreadFetcherManager as @PublicEvolving.
   4. SplitFetcherManager provides wrapper methods for
   FutureCompletingBlockingQueue to replace its usage in SourceReaderBase.
   Then we can use FutureCompletingBlockingQueue only in
   SplitFetcherManager.

However, it will be tricky because SplitFetcherManager includes <E, SplitT
extends SourceSplit>, while FutureCompletingBlockingQueue includes <T>.
This means that SplitFetcherManager would have to be modified to <T, E,
SplitT extends SourceSplit>, which would affect the compatibility of the
SplitFetcherManager class. I'm afraid this change will influence other
sources.
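
A minimal Java sketch of the deprecation pattern in the list above, under
stated assumptions: SketchReaderBase and its methods are hypothetical and do
not mirror the real Flink constructor signatures, and a plain
java.util.concurrent.BlockingQueue stands in for
FutureCompletingBlockingQueue. The old constructor that takes a queue stays
but is marked @Deprecated; the new one creates the queue internally.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical reader base; illustrates the deprecation pattern only. */
class SketchReaderBase<E> {
    private final BlockingQueue<E> elementsQueue;
    private final boolean queueSuppliedByCaller;

    /** Old style: the caller builds the queue. Kept for compatibility only. */
    @Deprecated
    SketchReaderBase(BlockingQueue<E> elementsQueue) {
        this(elementsQueue, true);
    }

    /** New style: the queue is an internal detail and is absent from the signature. */
    SketchReaderBase() {
        this(new LinkedBlockingQueue<E>(), false);
    }

    private SketchReaderBase(BlockingQueue<E> elementsQueue, boolean queueSuppliedByCaller) {
        this.elementsQueue = elementsQueue;
        this.queueSuppliedByCaller = queueSuppliedByCaller;
    }

    boolean queueSuppliedByCaller() { return queueSuppliedByCaller; }

    boolean offer(E element) { return elementsQueue.offer(element); }

    /** Returns the next element, or null if none is queued. */
    E poll() { return elementsQueue.poll(); }
}
```

Existing callers keep compiling (with a deprecation warning), while new code
has no reason to construct the queue at all.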



Looking forward to hearing from you.

Best regards,
Hongshun

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498

On Sat, Nov 11, 2023 at 10:55 AM Becket Qin  wrote:

> Hi Hongshun and Martijn,
>
> Sorry for the late reply as I was on travel and still catching up with the
> emails. Please allow me to provide more context.
>
> 1. The original design of SplitFetcherManager and its subclasses was to
> make them public to the Source developers. The goal is to let us take care
> of the threading model, while the Source developers can just focus on the
> SplitReader implementation. Therefore, I think making SplitFetcherManater /
> SingleThreadFetcherManager public aligns with the original design. That is
> also why these classes are exposed in the constructor of SourceReaderBase.
>
> 2. For FutureCompletingBlockingQueue, as a hindsight, it might be better to
> not expose it to the Source developers. They are unlikely to use it
> anywhere other than just constructing it. The reason that
> FutureCompletingBlockingQueue is currently exposed in the SourceReaderBase
> constructor is because both the SplitFetcherManager and SourceReaderBase
> need it. One way to hide the FutureCompletingBlockingQueue from the public
> API is to make SplitFetcherManager the only owner class of the queue, and
> expose some of its methods via SplitFetcherManager. This way, the
> SourceReaderBase can invoke the methods via SplitFetcherManager. I believe
> this also makes the code slightly cleaner.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Nov 10, 2023 at 12:28 PM Hongshun Wang 
> wrote:
>
> > @Martijn, I agree with you.
> >
> >
> > I also have two questions at the beginning:
> >
> >- Why is an Internal class
> >exposed as a constructor param of a Public class?
> >- Should these classes be exposed as public?
> >
> > For the first question,  I noticed that before the original Jira[1] ,
> > all these classes missed the annotate , so it was not abnormal that
> > FutureCompletingBlockingQueue and SingleThreadFetcherManager were
> > constructor params of SingleThreadMultiplexSourceReaderBase.
> >  However,
> > this jira marked FutureCompletingBlockingQueue and
> > SingleThreadFetcherManager as Internal, while marked
> > SingleThreadMultiplexSourceReaderBase as Public. It's a good choice,
> > but also forget that FutureCompletingBlockingQueue and
> > SingleThreadFetcherManager have already been  exposed by
> > SingleThreadMultiplexSourceReaderBase.
> >  Thus, this problem occurs because we didn't
> > clearly define the boundaries at the origin design. We should pay more
> > attention to it when creating a new class.
> >
> >
> > For the second question, I think at least SplitFetcherManager
> > should be Public. There are few reasons:
> >
> >-  Connector developers want to decide their own
> >thread mode. For example, Whether to recycle fetchers by overriding
> >SplitFetcherManager#maybeShutdownFinishedFetchers
> >when idle. Sometimes, developers want SplitFetcherManager react as a
> >FixedThreadPool, because
> >each time a thread is recycled then recreated, the context
> > resources need to be rebuilt. I met a related issue in flink cdc[2].
> >-
> >KafkaSourceFetcherManager[3] also  extends
> > SingleThreadFetcherManager to commitOffsets. But now kafka souce is
> > not in Flink repository, so it's not allowed any more.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-22358
> >
> > [2]
> >
> >
> https://github.com/ververica/flink-cdc-connectors/pull/2571#issuecomment-1797585418
> >
> > [3]
> >
> >
> https://github.com/apache/flink-connector-kafka/blob/979791c4c71e944c16c51419cf9a84aa1f8fea4c/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java#L52
> >
> > Looking forward to hearing from you.
> >
> > Best regards,
> > 

Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-11-10 Thread Becket Qin
Hi Hongshun and Martijn,

Sorry for the late reply; I was traveling and am still catching up on
emails. Please allow me to provide more context.

1. The original design of SplitFetcherManager and its subclasses was to
make them public to the Source developers. The goal is to let us take care
of the threading model, while the Source developers can just focus on the
SplitReader implementation. Therefore, I think making SplitFetcherManager /
SingleThreadFetcherManager public aligns with the original design. That is
also why these classes are exposed in the constructor of SourceReaderBase.

2. For FutureCompletingBlockingQueue, in hindsight, it might be better not
to expose it to the Source developers. They are unlikely to use it
anywhere other than just constructing it. FutureCompletingBlockingQueue is
currently exposed in the SourceReaderBase constructor because both the
SplitFetcherManager and SourceReaderBase need it. One way to hide the
FutureCompletingBlockingQueue from the public
API is to make SplitFetcherManager the only owner class of the queue, and
expose some of its methods via SplitFetcherManager. This way, the
SourceReaderBase can invoke the methods via SplitFetcherManager. I believe
this also makes the code slightly cleaner.
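
To illustrate the ownership idea in point 2, here is a small Java sketch. It
is an assumption-laden stand-in, not the Flink implementation:
OwningFetcherManager and its put(), poll() and getAvailabilityFuture()
methods are hypothetical names, and the queue is a plain ArrayDeque guarded
by a lock. The manager is the only class that holds the queue; the reader
side sees just two wrapper methods.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical manager that is the sole owner of the hand-over queue. */
final class OwningFetcherManager<T> {
    private final Object lock = new Object();
    private final Queue<T> queue = new ArrayDeque<>();
    private CompletableFuture<Void> available = new CompletableFuture<>();

    /** Called by fetcher threads to hand over an element. */
    void put(T element) {
        synchronized (lock) {
            queue.add(element);
            available.complete(null); // no-op if already completed
        }
    }

    /** Called by the reader; returns null if nothing is queued. */
    T poll() {
        synchronized (lock) {
            T next = queue.poll();
            // Once drained, swap in a fresh future so callers can wait again.
            if (queue.isEmpty() && available.isDone()) {
                available = new CompletableFuture<>();
            }
            return next;
        }
    }

    /** Completed while at least one element is waiting to be polled. */
    CompletableFuture<Void> getAvailabilityFuture() {
        synchronized (lock) {
            return available;
        }
    }
}
```

With this shape, the reader base class would depend only on the manager, and
the queue type would not need to appear in any public constructor.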

Thanks,

Jiangjie (Becket) Qin

On Fri, Nov 10, 2023 at 12:28 PM Hongshun Wang 
wrote:

> @Martijn, I agree with you.
>
>
> I also have two questions at the beginning:
>
>- Why is an Internal class
>exposed as a constructor param of a Public class?
>- Should these classes be exposed as public?
>
> For the first question,  I noticed that before the original Jira[1] ,
> all these classes missed the annotate , so it was not abnormal that
> FutureCompletingBlockingQueue and SingleThreadFetcherManager were
> constructor params of SingleThreadMultiplexSourceReaderBase.
>  However,
> this jira marked FutureCompletingBlockingQueue and
> SingleThreadFetcherManager as Internal, while marked
> SingleThreadMultiplexSourceReaderBase as Public. It's a good choice,
> but also forget that FutureCompletingBlockingQueue and
> SingleThreadFetcherManager have already been  exposed by
> SingleThreadMultiplexSourceReaderBase.
>  Thus, this problem occurs because we didn't
> clearly define the boundaries at the origin design. We should pay more
> attention to it when creating a new class.
>
>
> For the second question, I think at least SplitFetcherManager
> should be Public. There are few reasons:
>
>-  Connector developers want to decide their own
>thread mode. For example, Whether to recycle fetchers by overriding
>SplitFetcherManager#maybeShutdownFinishedFetchers
>when idle. Sometimes, developers want SplitFetcherManager react as a
>FixedThreadPool, because
>each time a thread is recycled then recreated, the context
> resources need to be rebuilt. I met a related issue in flink cdc[2].
>-
>KafkaSourceFetcherManager[3] also  extends
> SingleThreadFetcherManager to commitOffsets. But now kafka souce is
> not in Flink repository, so it's not allowed any more.
>
> [1] https://issues.apache.org/jira/browse/FLINK-22358
>
> [2]
>
> https://github.com/ververica/flink-cdc-connectors/pull/2571#issuecomment-1797585418
>
> [3]
>
> https://github.com/apache/flink-connector-kafka/blob/979791c4c71e944c16c51419cf9a84aa1f8fea4c/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java#L52
>
> Looking forward to hearing from you.
>
> Best regards,
> Hongshun
>
> On Thu, Nov 9, 2023 at 11:46 PM Martijn Visser 
> wrote:
>
> > Hi all,
> >
> > I'm looking at the original Jira that introduced these stability
> > designations [1] and I'm just curious if it was intended that these
> > Internal classes would be used directly, or if we just haven't created
> > the right abstractions? The reason for asking is because moving
> > something from Internal to a public designation is an easy fix, but I
> > want to make sure that it's also the right fix. If we are missing good
> > abstractions, then I would rather invest in those.
> >
> > Best regards,
> >
> > Martijn
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-22358
> >
> > On Wed, Nov 8, 2023 at 12:40 PM Leonard Xu  wrote:
> > >
> > > Thanks Hongshun for starting this discussion.
> > >
> > > +1 from my side.
> > >
> > > IIRC, @Jiangjie(Becket) also mentioned this in FLINK-31324 comment[1].
> > >
> > > Best,
> > > Leonard
> > >
> > > [1]
> >
> https://issues.apache.org/jira/browse/FLINK-31324?focusedCommentId=17696756&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17696756
> > >
> > >
> > >
> > > > 2023年11月8日 下午5:42,Hongshun Wang  写道:
> > > >
> > > > Hi devs,
> > > >
> > > > I would like to start a discussion on FLIP-389: Annotate
> > > > SingleThreadFetcherManager and FutureCompletingBlockingQueue as
> > > > PublicEvolving.[
> > > > <
> >
> 

Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-11-09 Thread Hongshun Wang
@Martijn, I agree with you.


I also had two questions at the beginning:

   - Why is an Internal class exposed as a constructor param of a Public
   class?
   - Should these classes be exposed as public?

For the first question, I noticed that before the original Jira[1], none of
these classes carried a stability annotation, so it was not unusual that
FutureCompletingBlockingQueue and SingleThreadFetcherManager were
constructor params of SingleThreadMultiplexSourceReaderBase. However, this
Jira marked FutureCompletingBlockingQueue and SingleThreadFetcherManager as
Internal, while marking SingleThreadMultiplexSourceReaderBase as Public.
That was a reasonable choice, but it overlooked that
FutureCompletingBlockingQueue and SingleThreadFetcherManager had already
been exposed by SingleThreadMultiplexSourceReaderBase. Thus, this problem
occurs because we didn't clearly define the boundaries in the original
design. We should pay more attention to this when creating a new class.


For the second question, I think at least SplitFetcherManager should be
Public. There are a few reasons:

   - Connector developers want to decide their own threading model, for
   example whether to recycle idle fetchers, by overriding
   SplitFetcherManager#maybeShutdownFinishedFetchers. Sometimes developers
   want SplitFetcherManager to behave like a FixedThreadPool, because each
   time a thread is recycled and then recreated, the context resources need
   to be rebuilt. I met a related issue in flink cdc[2].
   - KafkaSourceFetcherManager[3] also extends SingleThreadFetcherManager
   to commit offsets. But now the Kafka source is no longer in the Flink
   repository, so this is no longer allowed.
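
The first reason can be sketched in Java as follows. Everything here is
hypothetical and simplified for illustration (SketchFetcher,
SketchBaseManager and KeepAliveManager are not Flink classes, and the method
only borrows the name maybeShutdownFinishedFetchers): a base manager shuts
down idle fetchers by default, and a connector-specific subclass overrides
that hook to keep them alive, similar to a fixed thread pool.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical fetcher that holds an expensive context (e.g. a connection). */
final class SketchFetcher {
    private boolean idle;
    private boolean closed;

    void markIdle() { idle = true; }
    boolean isIdle() { return idle; }
    void close() { closed = true; }
    boolean isClosed() { return closed; }
}

/** Hypothetical base manager: by default, idle fetchers are closed and dropped. */
class SketchBaseManager {
    protected final List<SketchFetcher> fetchers = new ArrayList<>();

    SketchFetcher startFetcher() {
        SketchFetcher fetcher = new SketchFetcher();
        fetchers.add(fetcher);
        return fetcher;
    }

    /** Closes idle fetchers; returns true if no fetcher is left running. */
    boolean maybeShutdownFinishedFetchers() {
        fetchers.removeIf(fetcher -> {
            if (fetcher.isIdle()) {
                fetcher.close();
                return true;
            }
            return false;
        });
        return fetchers.isEmpty();
    }

    int runningFetchers() { return fetchers.size(); }
}

/** Connector-specific manager that keeps idle fetchers, like a fixed thread pool. */
final class KeepAliveManager extends SketchBaseManager {
    @Override
    boolean maybeShutdownFinishedFetchers() {
        // Never close idle fetchers, so their context is not rebuilt later.
        return fetchers.isEmpty();
    }
}
```

An override like this is only possible for external connectors if the base
class and the hook are part of a supported API, which is the argument being
made above.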

[1] https://issues.apache.org/jira/browse/FLINK-22358

[2]
https://github.com/ververica/flink-cdc-connectors/pull/2571#issuecomment-1797585418

[3]
https://github.com/apache/flink-connector-kafka/blob/979791c4c71e944c16c51419cf9a84aa1f8fea4c/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java#L52

Looking forward to hearing from you.

Best regards,
Hongshun

On Thu, Nov 9, 2023 at 11:46 PM Martijn Visser 
wrote:

> Hi all,
>
> I'm looking at the original Jira that introduced these stability
> designations [1] and I'm just curious if it was intended that these
> Internal classes would be used directly, or if we just haven't created
> the right abstractions? The reason for asking is because moving
> something from Internal to a public designation is an easy fix, but I
> want to make sure that it's also the right fix. If we are missing good
> abstractions, then I would rather invest in those.
>
> Best regards,
>
> Martijn
>
> [1] https://issues.apache.org/jira/browse/FLINK-22358
>
> On Wed, Nov 8, 2023 at 12:40 PM Leonard Xu  wrote:
> >
> > Thanks Hongshun for starting this discussion.
> >
> > +1 from my side.
> >
> > IIRC, @Jiangjie(Becket) also mentioned this in FLINK-31324 comment[1].
> >
> > Best,
> > Leonard
> >
> > [1]
> https://issues.apache.org/jira/browse/FLINK-31324?focusedCommentId=17696756&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17696756
> >
> >
> >
> > > On Nov 8, 2023, at 5:42 PM, Hongshun Wang wrote:
> > >
> > > Hi devs,
> > >
> > > I would like to start a discussion on FLIP-389: Annotate
> > > SingleThreadFetcherManager and FutureCompletingBlockingQueue as
> > > PublicEvolving.[
> > > <
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498
> >
> > > 1].
> > >
> > > Though the SingleThreadFetcherManager is annotated as Internal, it
> actually
> > > acts as some-degree public API, which is widely used in many connector
> > > projects: flink-cdc-connector
> > > <
> https://github.com/ververica/flink-cdc-connectors/blob/release-2.3.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java#L93
> >
> > > , flink-connector-mongodb
> > > <
> https://github.com/apache/flink-connector-mongodb/blob/main/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/MongoSourceReader.java#L58
> >
> > > and
> > > soon.
> > >
> > > Moreover, even the constructor of SingleThreadMultiplexSourceReaderBase
> > > (which is PublicEvolving) includes the params of
> SingleThreadFetcherManager
> > > and FutureCompletingBlockingQueue.  That means that the
> > > SingleThreadFetcherManager  and FutureCompletingBlockingQueue have
> already
> > > been exposed to users for a long time and are widely used.
> > >
> > > Considering that all source implementations are using them de facto,
> why
> > > not annotate SingleThreadFetcherManager and
> FutureCompletingBlockingQueue
> > > as PublicEvolving so that developers will modify it more carefully to
> avoid
> > > any potential issues.  As shown in FLINK-31324[2], FLINK-28853[3] used
> > > to change the default constructor of SingleThreadFetcherManager.
> However,
> > > it influenced a lot. Finally, the former constructor was added back and
> > > marked as 

Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-11-09 Thread Martijn Visser
Hi all,

I'm looking at the original Jira that introduced these stability
designations [1] and I'm just curious if it was intended that these
Internal classes would be used directly, or if we just haven't created
the right abstractions? The reason for asking is because moving
something from Internal to a public designation is an easy fix, but I
want to make sure that it's also the right fix. If we are missing good
abstractions, then I would rather invest in those.

Best regards,

Martijn

[1] https://issues.apache.org/jira/browse/FLINK-22358

On Wed, Nov 8, 2023 at 12:40 PM Leonard Xu  wrote:
>
> Thanks Hongshun for starting this discussion.
>
> +1 from my side.
>
> IIRC, @Jiangjie(Becket) also mentioned this in FLINK-31324 comment[1].
>
> Best,
> Leonard
>
> [1] 
> https://issues.apache.org/jira/browse/FLINK-31324?focusedCommentId=17696756&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17696756
>
>
>
> > On Nov 8, 2023, at 5:42 PM, Hongshun Wang wrote:
> >
> > Hi devs,
> >
> > I would like to start a discussion on FLIP-389: Annotate
> > SingleThreadFetcherManager and FutureCompletingBlockingQueue as
> > PublicEvolving [1].
> >
> > Though the SingleThreadFetcherManager is annotated as Internal, it actually
> > acts to some degree as a public API, which is widely used in many connector
> > projects: flink-cdc-connector, flink-connector-mongodb and so on.
> >
> > Moreover, even the constructor of SingleThreadMultiplexSourceReaderBase
> > (which is PublicEvolving) takes SingleThreadFetcherManager and
> > FutureCompletingBlockingQueue as parameters. This means that
> > SingleThreadFetcherManager and FutureCompletingBlockingQueue have already
> > been exposed to users for a long time and are widely used.
> >
> > Since all source implementations use them de facto, why not annotate
> > SingleThreadFetcherManager and FutureCompletingBlockingQueue as
> > PublicEvolving, so that developers modify them more carefully and avoid
> > potential issues? As shown in FLINK-31324 [2], FLINK-28853 [3] once changed
> > the default constructor of SingleThreadFetcherManager. However, the change
> > had a wide impact. In the end, the former constructor was added back and
> > marked as Deprecated.
> >
> > In conclusion, the goal of this FLIP is to annotate
> > SingleThreadFetcherManager (including its parent class) and
> > FutureCompletingBlockingQueue as PublicEvolving.
> >
> > Looking forward to hearing from you.
> >
> >
> > [1]
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498
> >
> > [2] https://issues.apache.org/jira/browse/FLINK-31324
> >
> > [3] https://issues.apache.org/jira/browse/FLINK-28853
>


Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-11-08 Thread Leonard Xu
Thanks Hongshun for starting this discussion. 

+1 from my side.

IIRC, @Jiangjie(Becket) also mentioned this in FLINK-31324 comment[1].

Best,
Leonard

[1] 
https://issues.apache.org/jira/browse/FLINK-31324?focusedCommentId=17696756&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17696756



> On Nov 8, 2023, at 5:42 PM, Hongshun Wang wrote:
> 
> Hi devs,
> 
> I would like to start a discussion on FLIP-389: Annotate
> SingleThreadFetcherManager and FutureCompletingBlockingQueue as
> PublicEvolving [1].
> 
> Though SingleThreadFetcherManager is annotated as Internal, it effectively
> acts as a public API to some degree, and it is widely used in many connector
> projects: flink-cdc-connector, flink-connector-mongodb, and so on.
> 
> Moreover, even the constructor of SingleThreadMultiplexSourceReaderBase
> (which is PublicEvolving) takes SingleThreadFetcherManager and
> FutureCompletingBlockingQueue as parameters. This means that
> SingleThreadFetcherManager and FutureCompletingBlockingQueue have already
> been exposed to users for a long time and are widely used.
> 
> Since all source implementations use them de facto, why not annotate
> SingleThreadFetcherManager and FutureCompletingBlockingQueue as
> PublicEvolving, so that developers modify them more carefully and avoid
> potential issues? As shown in FLINK-31324 [2], FLINK-28853 [3] once changed
> the default constructor of SingleThreadFetcherManager. However, the change
> had a wide impact. In the end, the former constructor was added back and
> marked as Deprecated.
> 
> In conclusion, the goal of this FLIP is to annotate
> SingleThreadFetcherManager (including its parent class) and
> FutureCompletingBlockingQueue as PublicEvolving.
> 
> Looking forward to hearing from you.
> 
> 
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498
> 
> [2] https://issues.apache.org/jira/browse/FLINK-31324
> 
> [3] https://issues.apache.org/jira/browse/FLINK-28853