[DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-11-08 Thread Hongshun Wang
Hi devs,

I would like to start a discussion on FLIP-389: Annotate
SingleThreadFetcherManager and FutureCompletingBlockingQueue as
PublicEvolving [1].

Though SingleThreadFetcherManager is annotated as Internal, it effectively
acts as a public API and is widely used in many connector projects:
flink-cdc-connector, flink-connector-mongodb, and so on.

Moreover, even the constructor of SingleThreadMultiplexSourceReaderBase
(which is PublicEvolving) takes SingleThreadFetcherManager and
FutureCompletingBlockingQueue as parameters. That means that
SingleThreadFetcherManager and FutureCompletingBlockingQueue have already
been exposed to users for a long time and are widely used.

Considering that all source implementations are using them de facto, why
not annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue
as PublicEvolving, so that developers will modify them more carefully and
avoid potential issues? As shown in FLINK-31324 [2], FLINK-28853 [3]
changed the default constructor of SingleThreadFetcherManager. However,
the change affected a lot of existing code. In the end, the former
constructor was added back and marked as Deprecated.
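
The compatibility pattern mentioned above (adding the old constructor back
and marking it deprecated) can be sketched roughly as follows. This is only
an illustration: ExampleFetcherManager and ExampleConfig are hypothetical
stand-ins, not the actual Flink classes or signatures.

```java
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for an extra argument introduced by a later change.
final class ExampleConfig {
    final int queueCapacity;

    ExampleConfig(int queueCapacity) {
        this.queueCapacity = queueCapacity;
    }
}

// Hypothetical stand-in for a widely used class whose constructor changed.
class ExampleFetcherManager<E> {
    private final BlockingQueue<E> queue;
    private final ExampleConfig config;

    // New constructor introduced by the (hypothetical) signature change.
    ExampleFetcherManager(BlockingQueue<E> queue, ExampleConfig config) {
        this.queue = Objects.requireNonNull(queue);
        this.config = Objects.requireNonNull(config);
    }

    // Old signature added back so existing connectors keep compiling.
    // It delegates to the new constructor with an assumed default.
    @Deprecated
    ExampleFetcherManager(BlockingQueue<E> queue) {
        this(queue, new ExampleConfig(2));
    }

    int queueCapacity() {
        return config.queueCapacity;
    }

    int pendingElements() {
        return queue.size();
    }
}

final class DeprecatedConstructorDemo {
    public static void main(String[] args) {
        // A connector written against the old signature still works.
        ExampleFetcherManager<String> legacy =
                new ExampleFetcherManager<>(new LinkedBlockingQueue<>());
        System.out.println("capacity via old constructor: " + legacy.queueCapacity());
    }
}
```

With a PublicEvolving annotation, such a signature change would normally go
through this deprecation step first instead of breaking callers directly.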

In conclusion, the goal of this FLIP is to annotate
SingleThreadFetcherManager (including its parent class) and
FutureCompletingBlockingQueue as PublicEvolving.

Looking forward to hearing from you.


[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498

[2] https://issues.apache.org/jira/browse/FLINK-31324

[3] https://issues.apache.org/jira/browse/FLINK-28853


Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-11-09 Thread Hongshun Wang
@Martijn, I agree with you.


I also had two questions at the beginning:

   - Why is an Internal class exposed as a constructor param of a Public
   class?
   - Should these classes be exposed as public?

For the first question, I noticed that before the original Jira [1], none
of these classes had a stability annotation, so it was not unusual that
FutureCompletingBlockingQueue and SingleThreadFetcherManager were
constructor params of SingleThreadMultiplexSourceReaderBase. However,
that Jira marked FutureCompletingBlockingQueue and
SingleThreadFetcherManager as Internal, while marking
SingleThreadMultiplexSourceReaderBase as Public. That was a reasonable
choice, but it overlooked that FutureCompletingBlockingQueue and
SingleThreadFetcherManager had already been exposed by
SingleThreadMultiplexSourceReaderBase. Thus, this problem occurs because
we didn't clearly define the boundaries in the original design. We should
pay more attention to this when creating a new class.


For the second question, I think at least SplitFetcherManager
should be Public. There are a few reasons:

   - Connector developers want to decide their own threading model, for
   example whether to recycle idle fetchers, by overriding
   SplitFetcherManager#maybeShutdownFinishedFetchers. Sometimes developers
   want SplitFetcherManager to behave like a FixedThreadPool, because each
   time a thread is recycled and then recreated, the context resources
   need to be rebuilt. I met a related issue in Flink CDC [2].
   - KafkaSourceFetcherManager [3] also extends SingleThreadFetcherManager
   in order to commit offsets. But now that the Kafka source is no longer
   in the Flink repository, this is not allowed any more.
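
To illustrate the first point, here is a rough, self-contained sketch of a
manager that keeps idle fetchers alive instead of recycling them. The
classes SimpleFetcher, SimpleFetcherManager and FixedPoolFetcherManager are
hypothetical simplifications; only the method name
maybeShutdownFinishedFetchers is taken from the discussion above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, heavily simplified fetcher: it only tracks idle/shutdown flags.
final class SimpleFetcher {
    private boolean idle;
    private boolean shutDown;

    void markIdle() { idle = true; }
    boolean isIdle() { return idle; }
    void shutdown() { shutDown = true; }
    boolean isShutDown() { return shutDown; }
}

// Hypothetical simplified manager that recycles idle fetchers by default.
class SimpleFetcherManager {
    protected final List<SimpleFetcher> fetchers = new ArrayList<>();

    SimpleFetcher startFetcher() {
        SimpleFetcher fetcher = new SimpleFetcher();
        fetchers.add(fetcher);
        return fetcher;
    }

    // Shuts down and removes idle fetchers; returns true if none remain.
    boolean maybeShutdownFinishedFetchers() {
        Iterator<SimpleFetcher> it = fetchers.iterator();
        while (it.hasNext()) {
            SimpleFetcher fetcher = it.next();
            if (fetcher.isIdle()) {
                fetcher.shutdown();
                it.remove();
            }
        }
        return fetchers.isEmpty();
    }

    int numFetchers() { return fetchers.size(); }
}

// Connector-specific variant that behaves like a fixed pool: idle fetchers
// stay alive, so their context resources do not have to be rebuilt.
class FixedPoolFetcherManager extends SimpleFetcherManager {
    @Override
    boolean maybeShutdownFinishedFetchers() {
        return fetchers.isEmpty();
    }
}

final class FetcherRecyclingDemo {
    public static void main(String[] args) {
        SimpleFetcherManager recycling = new SimpleFetcherManager();
        recycling.startFetcher().markIdle();
        recycling.maybeShutdownFinishedFetchers();
        System.out.println("recycling manager fetchers: " + recycling.numFetchers());

        SimpleFetcherManager fixed = new FixedPoolFetcherManager();
        fixed.startFetcher().markIdle();
        fixed.maybeShutdownFinishedFetchers();
        System.out.println("fixed-pool manager fetchers: " + fixed.numFetchers());
    }
}
```

This kind of override is only possible for external connectors if the
manager class is part of the supported API surface.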

[1] https://issues.apache.org/jira/browse/FLINK-22358

[2]
https://github.com/ververica/flink-cdc-connectors/pull/2571#issuecomment-1797585418

[3]
https://github.com/apache/flink-connector-kafka/blob/979791c4c71e944c16c51419cf9a84aa1f8fea4c/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java#L52

Looking forward to hearing from you.

Best regards,
Hongshun

On Thu, Nov 9, 2023 at 11:46 PM Martijn Visser 
wrote:

> Hi all,
>
> I'm looking at the original Jira that introduced these stability
> designations [1] and I'm just curious if it was intended that these
> Internal classes would be used directly, or if we just haven't created
> the right abstractions? The reason for asking is because moving
> something from Internal to a public designation is an easy fix, but I
> want to make sure that it's also the right fix. If we are missing good
> abstractions, then I would rather invest in those.
>
> Best regards,
>
> Martijn
>
> [1] https://issues.apache.org/jira/browse/FLINK-22358
>
> On Wed, Nov 8, 2023 at 12:40 PM Leonard Xu  wrote:
> >
> > Thanks Hongshun for starting this discussion.
> >
> > +1 from my side.
> >
> > IIRC, @Jiangjie(Becket) also mentioned this in FLINK-31324 comment[1].
> >
> > Best,
> > Leonard
> >
> > [1]
> https://issues.apache.org/jira/browse/FLINK-31324?focusedCommentId=17696756&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17696756
> >
> >
> >
> > > 2023年11月8日 下午5:42,Hongshun Wang  写道:
> > >
> > > Hi devs,
> > >
> > > I would like to start a discussion on FLIP-389: Annotate
> > > SingleThreadFetcherManager and FutureCompletingBlockingQueue as
> > > PublicEvolving.[
> > > <
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498
> >
> > > 1].
> > >
> > > Though the SingleThreadFetcherManager is annotated as Internal, it
> actually
> > > acts as some-degree public API, which is widely used in many connector
> > > projects: flink-cdc-connector
> > > <
> https://github.com/ververica/flink-cdc-connectors/blob/release-2.3.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java#L93
> >
> > > , flink-connector-mongodb
> > > <
> https://github.com/apache/flink-connector-mongodb/blob/main/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/MongoSourceReader.java#L58
> >
> > > > and so on.
> > >
> > > Moreover, even the constructor of SingleThreadMultiplexSourceReaderBase
> > > (which is PublicEvolving) includes the params of
> SingleThreadFetcherManager
> > > and FutureCompletingBlockingQueue.  That means that the
> > > SingleThreadFetcherManager  and FutureCompletingBlockingQueue have
> already
> > > been exposed to users for a long time and are widely used.
> > >
> > > Considering that all source implement

Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-11-14 Thread Hongshun Wang
Hi Becket,
I agree with you and have tried to modify this FLIP [1] to include these
changes:

   1. Mark the constructor of SingleThreadMultiplexSourceReaderBase as
   @Deprecated.
   2. Mark the constructor of SourceReaderBase as @Deprecated and provide a
   new constructor without FutureCompletingBlockingQueue.
   3. Mark the constructors of SplitFetcherManager and
   SingleThreadFetcherManager as @Deprecated and provide new constructors
   without FutureCompletingBlockingQueue. Mark SplitFetcherManager and
   SingleThreadFetcherManager as @PublicEvolving.
   4. SplitFetcherManager provides wrapper methods for
   FutureCompletingBlockingQueue to replace its usage in SourceReaderBase.
   Then we can use FutureCompletingBlockingQueue only in
   SplitFetcherManager.

However, it will be tricky because SplitFetcherManager is declared with
<E, SplitT extends SourceSplit>, while FutureCompletingBlockingQueue is
declared with <T>. This means that SplitFetcherManager would need an
additional type parameter, which would affect the compatibility of the
SplitFetcherManager class. I'm afraid this change will influence other
sources.
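
For reference, item 4 could look roughly like the sketch below. It assumes
the queue's element type is fixed to one batch type, in which case the
manager keeps its existing type parameters. RecordBatch,
QueueOwningFetcherManager and QueueFreeReader are hypothetical names used
only for illustration, and a plain ConcurrentLinkedQueue stands in for the
real queue class.

```java
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for a batch of records that belongs to one split.
final class RecordBatch<E> {
    final String splitId;
    final List<E> records;

    RecordBatch(String splitId, List<E> records) {
        this.splitId = splitId;
        this.records = records;
    }
}

// Hypothetical manager that is the sole owner of the hand-over queue.
class QueueOwningFetcherManager<E> {
    // The queue is private: it appears in no constructor or method signature.
    private final Queue<RecordBatch<E>> elementsQueue = new ConcurrentLinkedQueue<>();

    // Called by fetcher code to hand over a fetched batch.
    void enqueue(RecordBatch<E> batch) {
        elementsQueue.add(batch);
    }

    // Wrapper the reader calls instead of touching the queue directly.
    // Returns null when no batch is available.
    RecordBatch<E> poll() {
        return elementsQueue.poll();
    }

    boolean hasPendingRecords() {
        return !elementsQueue.isEmpty();
    }
}

// Hypothetical reader that only knows about the manager, not the queue.
class QueueFreeReader<E> {
    private final QueueOwningFetcherManager<E> manager;

    QueueFreeReader(QueueOwningFetcherManager<E> manager) {
        this.manager = manager;
    }

    // Returns the records of the next batch, or an empty list if none is ready.
    List<E> pollNext() {
        RecordBatch<E> batch = manager.poll();
        return batch == null ? Collections.emptyList() : batch.records;
    }
}
```

In this shape the reader's constructor no longer needs the queue at all,
which is what allows the old constructors to be deprecated.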



Looking forward to hearing from you.

Best regards,
Hongshun

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498

On Sat, Nov 11, 2023 at 10:55 AM Becket Qin  wrote:

> Hi Hongshun and Martijn,
>
> Sorry for the late reply, as I was traveling and am still catching up on
> emails. Please allow me to provide more context.
>
> 1. The original design of SplitFetcherManager and its subclasses was to
> make them public to the Source developers. The goal is to let us take care
> of the threading model, while the Source developers can just focus on the
> SplitReader implementation. Therefore, I think making SplitFetcherManager /
> SingleThreadFetcherManager public aligns with the original design. That is
> also why these classes are exposed in the constructor of SourceReaderBase.
>
> 2. For FutureCompletingBlockingQueue, in hindsight, it might be better to
> not expose it to the Source developers. They are unlikely to use it
> anywhere other than just constructing it. The reason that
> FutureCompletingBlockingQueue is currently exposed in the SourceReaderBase
> constructor is because both the SplitFetcherManager and SourceReaderBase
> need it. One way to hide the FutureCompletingBlockingQueue from the public
> API is to make SplitFetcherManager the only owner class of the queue, and
> expose some of its methods via SplitFetcherManager. This way, the
> SourceReaderBase can invoke the methods via SplitFetcherManager. I believe
> this also makes the code slightly cleaner.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>

Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-11-16 Thread Hongshun Wang
Hi Devs,

I have just modified the content of FLIP-389: Annotate
SingleThreadFetcherManager as PublicEvolving [1].

Now this FLIP mainly does two things:

   1. Annotate SingleThreadFetcherManager as PublicEvolving.
   2. Remove all public constructors which use
   FutureCompletingBlockingQueue. This will mark many constructors as
   @Deprecated.

This may affect many connectors, so I am looking forward to hearing from
you.


Best regards,
Hongshun

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498

On Wed, Nov 15, 2023 at 7:57 AM Becket Qin  wrote:

> Hi Hongshun,
> >
> >
> > However, it will be tricky because SplitFetcherManager is declared with
> > <E, SplitT extends SourceSplit>, while FutureCompletingBlockingQueue is
> > declared with <T>. This means that SplitFetcherManager would need an
> > additional type parameter, which would affect the compatibility of the
> > SplitFetcherManager class. I'm afraid this change will influence other
> > sources.
>
> Although the FutureCompletingBlockingQueue class itself has a type
> parameter <T>, in SourceReaderBase and SplitFetcherManager this <T> is
> actually RecordsWithSplitIds<E>. So it looks like we can just let
> SplitFetcherManager.poll() return a RecordsWithSplitIds<E>.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>

Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-11-17 Thread Hongshun Wang
Hi, Jiangjie(Becket) ,
Thank you for your advice. I have learned a lot.

> If SplitFetcherManager becomes PublicEvolving, that also means
> SplitFetcher needs to be PublicEvolving, because it is returned by the
> protected method SplitFetcherManager.createSplitFetcher().

I completely agree with you. However, if SplitFetcher becomes
PublicEvolving, SplitFetcherTask also needs to be PublicEvolving
because it is exposed by the public method SplitFetcher#enqueueTask.
Additionally, SplitFetcherTask requires FutureCompletingBlockingQueue
as a constructor parameter, which is no longer allowed. Therefore, I
propose changing SplitFetcher to a public interface and moving its
implementation details to an implementation class (e.g.,
SplitFetcherImpl or another suitable name). SplitFetcherImpl will be
marked as Internal, managed by SplitFetcherManager, and will put data
into the queue. Subclasses of SplitFetcherManager can only use the
SplitFetcher interface, which also ensures that the current subclasses
are not affected.
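
A rough sketch of this split between a public interface and an internal
implementation is shown below. All names (ExampleSplitFetcher,
ExampleSplitFetcherImpl, ExampleSplitFetcherManager) and the method set are
hypothetical; they only illustrate how the queue can stay hidden behind the
implementation class.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical public-facing interface: no queue type appears in it.
interface ExampleSplitFetcher {
    void addSplit(String splitId);

    List<String> assignedSplitIds();

    void shutdown();
}

// Hypothetical internal implementation; only this class sees the queue.
final class ExampleSplitFetcherImpl implements ExampleSplitFetcher {
    private final Queue<String> elementsQueue;
    private final List<String> splits = new ArrayList<>();
    private boolean running = true;

    ExampleSplitFetcherImpl(Queue<String> elementsQueue) {
        this.elementsQueue = elementsQueue;
    }

    @Override
    public void addSplit(String splitId) {
        splits.add(splitId);
    }

    @Override
    public List<String> assignedSplitIds() {
        return new ArrayList<>(splits);
    }

    @Override
    public void shutdown() {
        running = false;
    }

    // Internal only: simulates fetching one record per split into the queue.
    void fetchOnce() {
        if (!running) {
            return;
        }
        for (String split : splits) {
            elementsQueue.add("record-from-" + split);
        }
    }
}

// Hypothetical manager: creates implementations, hands out the interface.
class ExampleSplitFetcherManager {
    private final Queue<String> elementsQueue = new ArrayDeque<>();
    private final List<ExampleSplitFetcherImpl> fetchers = new ArrayList<>();

    // Subclasses only ever see the interface type.
    protected ExampleSplitFetcher createSplitFetcher() {
        ExampleSplitFetcherImpl fetcher = new ExampleSplitFetcherImpl(elementsQueue);
        fetchers.add(fetcher);
        return fetcher;
    }

    void runFetchers() {
        fetchers.forEach(ExampleSplitFetcherImpl::fetchOnce);
    }

    // Returns the next fetched record, or null if the queue is empty.
    String poll() {
        return elementsQueue.poll();
    }
}
```

The sketch is single-threaded for brevity; a real fetcher runs on its own
thread and would need a thread-safe queue.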



> The current SplitFetcherManager basically looks up
> the SplitT from the fetcher with the split Id, and immediately passes the
> SplitT back to the fetcher, which is unnecessary.

I infer that this is because SplitReader#pauseOrResumeSplits
requires SplitT instead of split ids. Perhaps some external sources
require more information to pause. However, SplitReader doesn't store
all its split data, while SplitFetcherManager does.
CC @Dawid Wysakowicz



> If not, SplitFetcher.pause() and
> SplitFetcher.resume() can be removed. In fact, they seem no longer used
> anywhere.

They do seem to be unused now. CC @Arvid Heise



Thanks,
Hongshun Wang

On Fri, Nov 17, 2023 at 11:42 AM Becket Qin  wrote:

> Hi Hongshun,
>
> Thanks for updating the FLIP. I think that makes sense. A few comments
> below:
>
> 1. If SplitFetcherManager becomes PublicEvolving, that also means
> SplitFetcher needs to be PublicEvolving, because it is returned by the
> protected method SplitFetcherManager.createSplitFetcher().
>
> 2. When checking the API of the classes to be marked as PublicEvolving,
> there might be a few methods' signatures worth some discussion.
>
> For SplitFetcherManager:
> a) Currently the removeSplits() method takes a list of SplitT. I am wondering
> if it should be a list of splitIds. SplitT actually contains two parts of
> information, the static split Id and some dynamically changing state of the
> split (e.g. Kafka consumer offset). The source of truth for the dynamic
> state is SourceReaderBase. Currently we are passing in the full source
> split with the dynamic state for split removal. But it looks like only
> split id is needed for the split removal.
> Maybe this is intentional, as sometimes when a SplitReader removes a split,
> it also wants to know the dynamic state of the split. If so, we can keep it
> as is. But then the question is why
> SplitFetcherManager.pauseAndResumeSplits() only takes split ids instead of
> SplitT. Should we make them consistent?
>
> For SplitFetcher:
> a) The SplitFetcher.pauseOrResumeSplits() method takes collections of
> SplitT as arguments. We may want to adjust that according to what we do to
> the SplitFetcherManager. The current SplitFetcherManager basically looks up
> the SplitT from the fetcher with the split Id, and immediately passes the
> SplitT back to the fetcher, which is unnecessary.
> b) After supporting split level pause and resume, do we still need split
> fetcher level pause and resume? If not, SplitFetcher.pause() and
> SplitFetcher.resume() can be removed. In fact, they seem no longer used
> anywhere.
>
> Other than the above potential API adjustment before we mark the classes
> PublicEvolving, the API looks fine to me.
>
> I think now is good timing for the deprecation. We will mark the impacted
> constructors as deprecated in 1.19, and remove them in the 2.0 release.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>

RE: [DISCUSS] Enabling dynamic partition discovery by default in Kafka source

2023-03-15 Thread Hongshun Wang
Hi devs,
I’d like to join this discussion. CC: Qingsheng
As discussed above, new partitions found after the first discovery should
be consumed from the EARLIEST offset.

However, when KafkaSourceEnumerator restarts after a job failure, it cannot
tell whether an unassigned partition was first-discovered or is new,
because the snapshot state currently only contains the assignedPartitions
collection (the assigned partitions). We can solve this by adding an
unAssignedInitialPartitons collection to the snapshot state, which
represents the first-discovered partitions that have not yet been
assigned. Alternatively, we can combine these two collections into a
single collection if we add a status to each item.

Besides, there is also a problem, which often occurs in pattern mode, of
distinguishing between the following two cases:

   1. Case 1: The first partition discovery is so slow that a checkpoint
   completes before it finishes, and then the job is restarted. At this
   time, the restored unAssignedInitialPartitons is an empty collection,
   which means no discovery has happened yet. The next discovery will be
   treated as the first discovery.
   2. Case 2: The first partition discovery returns an empty result, and
   new partitions are only found after several more discoveries. If a
   restart occurs in between, the restored unAssignedInitialPartitons is
   also an empty collection, which here means the discovery found nothing.
   However, the next discovery should be treated as a new discovery.

We can solve this problem by adding a boolean value (firstDiscoveryDone)
to the snapshot state, which represents whether the first discovery has
been done.
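
The decision this state enables can be sketched as below. This is a
simplified illustration, not the KafkaSourceEnumerator code: EnumStateSketch,
DiscoverySketch and OffsetChoice are hypothetical names, partitions are
plain strings, and the field names use a corrected spelling of the ones
proposed above.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Which starting offset a partition should get.
enum OffsetChoice { CONFIGURED_INITIAL, EARLIEST }

// Hypothetical snapshot state holding the three pieces discussed above.
final class EnumStateSketch {
    final Set<String> assignedPartitions;
    final Set<String> unassignedInitialPartitions;
    final boolean firstDiscoveryDone;

    EnumStateSketch(
            Set<String> assignedPartitions,
            Set<String> unassignedInitialPartitions,
            boolean firstDiscoveryDone) {
        this.assignedPartitions = Collections.unmodifiableSet(new HashSet<>(assignedPartitions));
        this.unassignedInitialPartitions =
                Collections.unmodifiableSet(new HashSet<>(unassignedInitialPartitions));
        this.firstDiscoveryDone = firstDiscoveryDone;
    }
}

final class DiscoverySketch {
    // Decides the offset strategy for a not-yet-assigned partition after restore.
    static OffsetChoice offsetFor(String partition, EnumStateSketch restored) {
        if (!restored.firstDiscoveryDone) {
            // Case 1: the first discovery never completed before the checkpoint,
            // so this discovery still counts as the first one.
            return OffsetChoice.CONFIGURED_INITIAL;
        }
        if (restored.unassignedInitialPartitions.contains(partition)) {
            // Found by the first discovery but not assigned before the restart.
            return OffsetChoice.CONFIGURED_INITIAL;
        }
        // Case 2 and every later discovery: a genuinely new partition.
        return OffsetChoice.EARLIEST;
    }
}
```

Without the boolean, both cases restore two empty sets and cannot be told
apart.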

There are also two rejected alternatives:

   1. Change the KafkaSourceEnumerator's snapshotState method to a blocking
   one, which returns only after the first-discovered partitions have been
   successfully assigned to KafkaSourceReader. The advantage of this
   approach is that the snapshot state does not need to change. However,
   if first-discovered partitions are not assigned before checkpointing,
   the SourceCoordinator's event-loop thread will be blocked, but partition
   assignment also requires the event-loop thread to execute, so the
   thread would deadlock on itself.
   2. An alternative to the firstDiscoveryDone variable: if we change the
   first discovery to a synchronous method, we can ensure that Case 1
   never happens, because when the event-loop thread starts, it first adds
   a discovery event to the blocking queue. By the time it executes the
   checkpoint event, the partitions have already been discovered
   successfully. However, if partition discovery is a heavily
   time-consuming operation, the SourceCoordinator cannot process other
   events, such as reader registration, while waiting, which is wasteful.

Best regards,
Hongshun

On 2023/01/13 03:31:20 Qingsheng Ren wrote:
> Hi devs,
>
> I’d like to start a discussion about enabling the dynamic partition
> discovery feature by default in Kafka source. Dynamic partition discovery
> [1] is a useful feature in Kafka source especially under the scenario when
> the consuming Kafka topic scales out, or the source subscribes to multiple
> Kafka topics with a pattern. Users don’t have to restart the Flink job to
> consume messages in the new partition with this feature enabled. Currently,
> dynamic partition discovery is disabled by default and users have to
> explicitly specify the interval of discovery in order to turn it on.
>
> # Breaking changes
>
> For Kafka table source:
>
> - “scan.topic-partition-discovery.interval” will be set to 30 seconds by
> default.
> - As we need to provide a way for users to disable the feature,
> “scan.topic-partition-discovery.interval” = “0” will be used to turn off
> the discovery. Before this proposal, “0” means to enable partition
> discovery with interval = 0, which is a bit senseless in practice.
> Unfortunately we can't use negative values as the type of this option is
> Duration.
>
> For KafkaSource (DataStream API)
>
> - Dynamic partition discovery in Kafka source will be enabled by default,
> with discovery interval set to 30 seconds.
> - To align with table source, only a positive value for option “
> partition.discovery.interval.ms” could be used to specify the discovery
> interval. Both negative and zero will be interpreted as disabling the
> feature.
>
> # Overhead of partition discovery
>
> Partition discovery is made on KafkaSourceEnumerator, which asynchronously
> fetches topic metadata from Kafka cluster and checks if there’s any new
> topic and partition. This shouldn’t introduce performance issues on the
> Flink side.
>
> On the Kafka broker side, partition discovery makes MetadataRequest to
> Kafka broker for fetching topic infos. Considering Kafka broker has its
> metadata cache and the default request frequency is relatively low (per 30
> seconds), this is not a heavy operation and the performance of the broker
> won’t be affected a lot. It'll also be grea

Re: [DISCUSS] Enabling dynamic partition discovery by default in Kafka source

2023-03-15 Thread Hongshun Wang
Thanks for your advice! I will do this later.


Best, Hongshun

On Wed, Mar 15, 2023 at 5:15 PM Etienne Chauchot 
wrote:

> Hi,
>
> Why not track this in a FLIP and a ticket, and link this discussion thread?
>
> My 2 cents
>
> Etienne
>

[DISCUSS] FLIP-288:Enable Dynamic Partition Discovery by Default in Kafka Source

2023-03-17 Thread Hongshun Wang
Hi everyone,

I would like to start a discussion on FLIP-288:Enable Dynamic Partition
Discovery by Default in Kafka Source[1].

As described in the mail thread [2], dynamic partition discovery is
disabled by default and users have to explicitly specify the interval of
discovery in order to turn it on. Besides, if the initial offset strategy
is LATEST, the same strategy is used for new partitions, which can lose
data: a new partition may only be discovered by the Kafka source several
minutes after it is created, and the messages produced into the partition
within that gap might be dropped if we use, for example, "latest" as the
initial offset strategy.

The goals of this FLIP are as follows:

   1. Enable partition discovery by default.
   2. Use earliest as the offset strategy for new partitions after the
   first discovery.
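
As a rough illustration of the first goal, the sketch below resolves the
effective discovery interval from a plain java.util.Properties object,
following the rules proposed in the earlier thread for the DataStream
option partition.discovery.interval.ms (30 seconds by default, zero or a
negative value disables discovery). DiscoveryIntervalSketch and its members
are hypothetical helpers, not part of the Kafka connector.

```java
import java.util.Properties;

final class DiscoveryIntervalSketch {
    // Option key as named in the earlier discussion thread.
    static final String KEY = "partition.discovery.interval.ms";

    // Proposed default: discover new partitions every 30 seconds.
    static final long PROPOSED_DEFAULT_MS = 30_000L;

    // Marker returned by this sketch when discovery is disabled.
    static final long DISABLED = -1L;

    // Returns the effective interval in milliseconds, or DISABLED.
    static long effectiveIntervalMs(Properties props) {
        String raw = props.getProperty(KEY);
        if (raw == null) {
            // Nothing configured: discovery is on with the proposed default.
            return PROPOSED_DEFAULT_MS;
        }
        long value = Long.parseLong(raw.trim());
        // Only positive values enable discovery; zero and negatives turn it off.
        return value > 0 ? value : DISABLED;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println("unset: " + effectiveIntervalMs(props));
        props.setProperty(KEY, "0");
        System.out.println("zero: " + effectiveIntervalMs(props));
    }
}
```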

Looking forward to hearing from you.


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source

[2]  
https://lists.apache.org/thread/d7zy46gj3sw0zwzq2rj3fmc0hx8ojtln


Best,

Hongshun


Re: [DISCUSS] FLIP-288:Enable Dynamic Partition Discovery by Default in Kafka Source

2023-03-21 Thread Hongshun Wang
Hi, Hang,

Thanks for your advice.

When will the second case occur? Currently, there are three ways to specify
partitions in Kafka: by topic, by partition, and by matching topics with
a regular expression. If the initial partition number is 0, an error will
occur for the first two methods. However, when using a regular expression
to match topics, it is allowed to have 0 matched topics.

> I don't know when the second case will occur


Why prefer the field `firstDiscoveryDone`? When a regular expression
initially matches 0 topics, it should consume all messages of the new
topic. If unassignedInitialPartitons and unassignedTopLevelPartitions are
used instead of firstDiscoveryDone, any new topics created during (5
minutes discovery + job restart time) will be treated as the first
discovery, causing data loss.

> Then when will we get the empty partition list? I think it should be
treated as the first initial discovery if both `unassignedInitialPartitons`
and `assignedPartitons` are empty without `firstDiscoveryDone`.
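
The difference between the two approaches can be sketched as follows. This is a simplified stand-in written for this thread, not the real KafkaSourceEnumerator: the class, the `Strategy` enum and the method names are assumptions, and the sets only mimic the state described in the FLIP.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Simplified stand-in for the enumerator state; not the real KafkaSourceEnumerator. */
final class DiscoveryState {

    enum Strategy { STARTUP, EARLIEST }

    private final Set<String> assignedPartitions = new HashSet<>();
    private final Set<String> unassignedInitialPartitions = new HashSet<>();
    // Assumed to be checkpointed together with the two sets above.
    private boolean firstDiscoveryDone = false;

    /** Variant with the flag: strategy for partitions found by the next discovery. */
    Strategy strategyWithFlag() {
        return firstDiscoveryDone ? Strategy.EARLIEST : Strategy.STARTUP;
    }

    /** Variant without the flag: "first discovery" is inferred from both sets being empty. */
    Strategy strategyWithoutFlag() {
        boolean looksLikeFirst =
                assignedPartitions.isEmpty() && unassignedInitialPartitions.isEmpty();
        return looksLikeFirst ? Strategy.STARTUP : Strategy.EARLIEST;
    }

    /** Records one discovery round; the result may be empty in pattern mode. */
    void onDiscovery(Set<String> discovered) {
        if (firstDiscoveryDone) {
            assignedPartitions.addAll(discovered);
        } else {
            unassignedInitialPartitions.addAll(discovered);
            firstDiscoveryDone = true;
        }
    }

    public static void main(String[] args) {
        DiscoveryState state = new DiscoveryState();
        // The regular expression matches zero topics on the first discovery.
        state.onDiscovery(Collections.<String>emptySet());
        System.out.println("with flag:    " + state.strategyWithFlag());
        System.out.println("without flag: " + state.strategyWithoutFlag());
    }
}
```

After an empty first discovery, the flag variant applies EARLIEST to the topic that appears later, while the variant that only inspects the sets falls back to the startup strategy, which loses data if that strategy is LATEST.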

Best

Hongshun

On Tue, Mar 21, 2023 at 5:56 PM Hang Ruan  wrote:

> Hi, Hongshun,
>
> Thank you for starting this discussion.  I have some questions about the
> field `firstDiscoveryDone`.
>
> In the FLIP, the reason why we need firstDiscoveryDone is given as follows.
> > Why do we need firstDiscoveryDone? Only relying on the
> unAssignedInitialPartitons attribute cannot distinguish between the
> following two cases (which often occur in pattern mode):
> > The first partition discovery is so slow that a checkpoint is taken
> before it completes, and then the job is restarted. At this time, the restored
> unAssignedInitialPartitons is an empty set, which means non-discovery. The
> next discovery will be treated as the first discovery.
> > The first partition discovery returns an empty result, and new partitions
> can only be found after multiple partition discoveries. If a restart occurs
> during this period, the restored unAssignedInitialPartitons is also an
> empty set, which means empty-discovery. The next discovery will be treated
> as a new discovery.
>
> I don't know when the second case will occur. The number of partitions must
> be greater than 0 when a topic is created. And I have read this note in the FLIP.
> > Note: The current design only applies to cases where all existing
> partitions can be discovered at once. If all old partitions cannot be
> discovered at once, the subsequent old partitions discovered will be
> treated as new partitions, leading to message duplication. Therefore, this
> point needs to be particularly noted.
>
> Then when will we get the empty partition list? I think it should be
> treated as the first initial discovery if both `unassignedInitialPartitons`
> and `assignedPartitons` are empty without `firstDiscoveryDone`.
>
> Besides that, I think `unAssignedInitialPartitons` would be better
> named `unassignedInitialPartitons`.
>
> Best,
> Hang


Re: [DISCUSS] FLIP-288:Enable Dynamic Partition Discovery by Default in Kafka Source

2023-03-23 Thread Hongshun Wang
Hi Shammon,

Thanks for your advice!  I learned a lot about TIMESTAMP/SPECIFIC_OFFSET.
That's interesting.

However, I have a different opinion.

If a user employs the SPECIFIC_OFFSET strategy and enables auto-discovery,
they will be able to find new partitions beyond those covered by the
specified offsets. Otherwise, enabling auto-discovery makes no sense.

When it comes to the TIMESTAMP strategy, the issue seems trivial. I
understand your concern; however, filtering by time is the role of a time
window rather than of partition discovery. The TIMESTAMP strategy means that
the consumer starts from the first record whose timestamp is greater than or
equal to the given timestamp, not that it only consumes records whose
timestamp is greater than or equal to the given timestamp. *Thus, whether
auto-discovery is disabled or new partitions are discovered with the
TIMESTAMP strategy, the same problem still occurs.*
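
The distinction between "start position" and "filter" can be sketched in plain Java. The class, the method names and the sample timestamps are hypothetical; the only behaviour taken from the paragraph above is that consumption begins at the first record whose timestamp is greater than or equal to the given one and then simply continues.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model: record timestamps of one partition, listed in offset order. */
final class TimestampStartVsFilter {

    /** Offset of the first record whose timestamp is >= ts, or the log size if there is none. */
    static int startIndex(long[] timestamps, long ts) {
        for (int i = 0; i < timestamps.length; i++) {
            if (timestamps[i] >= ts) {
                return i;
            }
        }
        return timestamps.length;
    }

    /** Everything from the start offset onward is consumed, whatever its timestamp. */
    static List<Long> consumedFrom(long[] timestamps, long ts) {
        List<Long> consumed = new ArrayList<>();
        for (int i = startIndex(timestamps, ts); i < timestamps.length; i++) {
            consumed.add(timestamps[i]);
        }
        return consumed;
    }

    public static void main(String[] args) {
        // Hypothetical log in which one late record (150) follows a newer one (200).
        long[] log = {100L, 105L, 200L, 150L, 210L};
        System.out.println(consumedFrom(log, 180L));
    }
}
```

In this example the record with timestamp 150 is still consumed although it is older than the requested 180, because the timestamp only selects the starting offset.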

Above all, why use the EARLIEST strategy? I believe that the strategy
specified for startup should only apply at the moment of startup. *So
there is no difference between new partitions and new messages in old
partitions.* Therefore, all the new-partition issues that you care about
will still appear even if you disable partition discovery, in the form of
new messages in old partitions. If all new messages in old partitions should
be consumed, all new messages in old partitions should also be consumed.


Best,
Hongshun

On Thu, Mar 23, 2023 at 8:34 PM Shammon FY  wrote:

> Hi Hongshun
>
> Thanks for driving this discussion. Automatically discovering partitions
> without losing data sounds great!
>
> Currently Flink supports the Kafka source with different startup modes, such
> as EARLIEST, LATEST, TIMESTAMP, SPECIFIC_OFFSETS and GROUP_OFFSET.
>
> If I understand correctly, you will set the offset of new partitions with
> EARLIEST? Please correct me if I'm wrong, but I think the EARLIEST startup
> mode for new partitions is not suitable if users set
> TIMESTAMP/SPECIFIC_OFFSET for Kafka in their jobs.
>
> For an extreme example, the current time is 2023-03-23 15:00:00 and users
> set the TIMESTAMP with 2023-03-23 16:00:00 for their jobs. If a partition
> is added during this period, jobs will generate “surprising” data. What do
> you think of it?
>
>
> Best,
> Shammon FY

Re: [DISCUSS] FLIP-288:Enable Dynamic Partition Discovery by Default in Kafka Source

2023-03-23 Thread Hongshun Wang
"If all new messages in old partitions should be consumed, all new messages
in new partitions should also be consumed."

Sorry, I wrote the last sentence incorrectly.


Re: [DISCUSS] FLIP-288:Enable Dynamic Partition Discovery by Default in Kafka Source

2023-03-27 Thread Hongshun Wang
Hi Shammon,


Thanks a lot for your advice. I agree with your opinion now. It seems that
I forgot to consider that the starting position may be at a certain point in
the future.


I will modify OffsetsInitializer to provide a different strategy for
partitions discovered later, so that users can also customize the strategies
for new and old partitions.
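
As a rough sketch of what such a per-strategy choice could look like, the snippet below encodes the mapping listed in Shammon's quoted message further down: a LATEST startup switches to EARLIEST for newly discovered partitions, and every other startup strategy is kept. The class and enum are hypothetical and are not part of Flink's OffsetsInitializer API.

```java
/** Hypothetical mapping from the startup strategy to the strategy for later partitions. */
final class NewPartitionStrategy {

    enum Startup { EARLIEST, LATEST, TIMESTAMP, SPECIFIC_OFFSET }

    /** Only LATEST needs special handling; the other strategies are reused as they are. */
    static Startup forNewPartitions(Startup startup) {
        return startup == Startup.LATEST ? Startup.EARLIEST : startup;
    }

    public static void main(String[] args) {
        for (Startup startup : Startup.values()) {
            System.out.println(startup + " -> " + forNewPartitions(startup));
        }
    }
}
```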

 WDYT?


Yours

Hongshun

On Tue, Mar 28, 2023 at 9:00 AM Shammon FY  wrote:

> Hi Hongshun
>
> Thanks for your answer.
>
> I think the startup offset of Kafka, such as timestamp or
> specific_offset, has no relationship with the `Window Operator`. Users can
> freely set the starting position according to their needs; it may be before
> the latest Kafka data, or it may be at a certain point in the future.
>
> The offsets set by users in Kafka can be divided into four types at the
> moment: EARLIEST, LATEST, TIMESTAMP, SPECIFIC_OFFSET. The newly discovered
> partitions may need to be handled with different strategies for these four
> types:
>
> 1. EARLIEST, use EARLIEST for the newly discovered partitions
> 2. LATEST, use EARLIEST for the newly discovered partitions
> 3. TIMESTAMP, use TIMESTAMP for the newly discovered partitions
> 4. SPECIFIC_OFFSET, use SPECIFIC_OFFSET for the newly discovered partitions
>
> From the above, it seems that we only need to do special processing for
> LATEST. What do you think of it?
>
> Best,
> Shammon FY

Re: [DISCUSS] FLIP-288:Enable Dynamic Partition Discovery by Default in Kafka Source

2023-03-30 Thread Hongshun Wang
Hi everyone,
Thanks for your participation.

@Gordon, I looked at the several questions you raised:

   1. Should we use the firstDiscovery flag or two separate
   OffsetsInitializers? Actually, I have considered the latter. If we follow
   my initial idea, we can provide a default EARLIEST OffsetsInitializer for
   new partitions. However, according to @Shammon's suggestion, each of
   Flink's built-in startup OffsetsInitializers corresponds to a different
   post-startup OffsetsInitializer.
   2. "Future-time" TIMESTAMP OffsetsInitializer. I looked at the code
   again, and it seems that neither @Shammon nor I had figured it out.
   TimestampOffsetsInitializer#getPartitionOffsets has a comment: "First
   get the current end offsets of the partitions. This is going to be used in
   case we cannot find a suitable offset based on the timestamp, i.e., the
   message meeting the requirement of the timestamp has not been produced to
   Kafka yet. *In this case, we just use the latest offset*." Therefore,
   the TimestampOffsetsInitializer will always yield an offset at
   startup.
   3. Clarification on coupling SPECIFIC-OFFSET startup with SPECIFIC-OFFSET
   post-startup. The SPECIFIC-OFFSET strategy already uses the
   "auto.offset.reset" position for partitions that are not hit.
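
The fallback described in point 2 can be sketched as follows. This is not the code of TimestampOffsetsInitializer; it is a plain-Java model under two assumptions of mine: timestamps within the partition are strictly increasing (so a TreeMap keyed by timestamp is enough), and `endOffset` is the offset the next produced record would get.

```java
import java.util.Map;
import java.util.TreeMap;

/** Toy model of "use the latest offset if no record matches the timestamp yet". */
final class TimestampFallback {

    /**
     * timestampToOffset maps record timestamp to record offset for one partition.
     * Returns the offset of the first record with timestamp >= startTs, or
     * endOffset when no such record has been produced yet.
     */
    static long resolve(TreeMap<Long, Long> timestampToOffset, long endOffset, long startTs) {
        Map.Entry<Long, Long> hit = timestampToOffset.ceilingEntry(startTs);
        return hit != null ? hit.getValue() : endOffset;
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> log = new TreeMap<>();
        log.put(1000L, 0L);
        log.put(2000L, 1L);
        log.put(3000L, 2L);
        long endOffset = 3L;
        System.out.println("existing timestamp: " + resolve(log, endOffset, 2500L));
        System.out.println("future timestamp:   " + resolve(log, endOffset, 9999L));
    }
}
```

A timestamp in the future therefore resolves to the end offset, which is the same position the LATEST strategy would pick.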

@Gordon, @Shammon, @Leonard, the core issue we are concerned about is
whether the offsets specified at the beginning may include non-existent
partitions. The previous design allows a SPECIFIC-OFFSET startup that names
future partitions. However, since different strategies are now used for
partitions found by the first discovery and partitions discovered later, I
think the offsets specified at startup should only refer to partitions that
have been confirmed to exist; if not, an error will be thrown. Offsets for
partitions that do not exist yet should be specified in the post-startup
OffsetsInitializer (default EARLIEST).

Best
Hongshun


On Thu, Mar 30, 2023 at 1:43 PM Shammon FY  wrote:

> Thanks Gordon and Leonard
>
> I'm sorry that there is no specific case from my side, but I consider the
> issue as follows
>
> 1. Users may set an offset later than the current time because Flink does
> not limit it
> 2. If we use EARLIEST for a newly discovered partition with different
> OFFSETs, this may differ from the previous strategy. I think it's
> best to keep the same strategy as before if it does not cause data loss
> 3. I think supporting different OFFSETs in the FLIP will not make the
> implementation more complex
>
> Of course, if it is confirmed that this is an illegal Timestamp OFFSET and
> Flink validates it, then we can apply the same strategy to the newly
> discovered partition. I think this will be nice too
>
> Best,
> Shammon FY
>
>
> On Thu, Mar 30, 2023 at 12:29 PM Leonard Xu  wrote:
>
> > Thanks Hongshun and Shammon for driving the FLIP!
> >
> >
> > > *2. Clarification on "future-time" TIMESTAMP OffsetsInitializer*
> > > *3. Clarification on coupling SPECIFIC-OFFSET startup with
> > SPECIFIC-OFFSET
> > > post-startup*
> >
> > Gordon raised a good point about the future TIMESTAMP and SPECIFIC-OFFSET:
> > the timestamp/offset of a newly added partition is undetermined when the
> > job starts (the partition has not been created yet), and it is a
> > timestamp/offset in the future.
> >
> > I have used many message queue systems like Kafka, Pulsar, xxMQ. In my past
> > experience, the TIMESTAMP and SPECIFIC-OFFSET startup modes are usually used
> > to specify existing timestamps/offsets, for business scenarios such as
> > backfilling data and re-refreshing data. At present, it's hard to imagine
> > a user scenario that specifies a future timestamp to filter data in the
> > current topic of a message queue system. Is it overthinking to consider
> > future TIMESTAMP and SPECIFIC-OFFSET?
> >
> >
> > Best,
> > Leonard
>


Re: [DISCUSS] FLIP-288:Enable Dynamic Partition Discovery by Default in Kafka Source

2023-04-11 Thread Hongshun Wang
Hi everyone,

I have already modified FLIP-288 to provide a
newDiscoveryOffsetsInitializer in the KafkaSourceBuilder and
KafkaSourceEnumerator. Users can use
KafkaSourceBuilder#setNewDiscoveryOffsets to change the strategy for new
partitions.

Of course, enabling partition discovery by default and changing the offset
strategy for new partitions should be brought to the user's attention.
Therefore, it will be explained in the 1.18 release notes.

WDYT? CC: Ruan, Shammon, Gordon and Leonard.


Best,

Hongshun



Re: [DISCUSS] FLIP-288:Enable Dynamic Partition Discovery by Default in Kafka Source

2023-04-18 Thread Hongshun Wang
Hi Shammon,

Thank you for your advice. I have carefully considered whether to expose this
in SQL DDL, and recently studied whether it is feasible.

However, after reading the corresponding code more thoroughly, it appears
that SpecifiedOffsetsInitializer and TimestampOffsetsInitializer do not
work as we initially thought. In the end, I have decided to only use
"EARLIEST" instead of allowing the user to make a free choice.

Now, let me show my new understanding.

How SpecifiedOffsetsInitializer and TimestampOffsetsInitializer actually
work:


   - *SpecifiedOffsetsInitializer*: Uses the *specified offset* for specified
   partitions and *EARLIEST* for unspecified partitions. The offset specified
   for a partition should be less than the latest offset; otherwise
   consumption starts from *EARLIEST*.
   - *TimestampOffsetsInitializer*: Initializes the offsets based on a
   timestamp. If a message meeting the requirement of the timestamp has not
   been produced to Kafka yet, it just uses the *LATEST* offset.
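
The SpecifiedOffsetsInitializer behaviour described above can be modelled as follows. This is a plain-Java sketch, not Flink code: the class and method are hypothetical, partitions are identified by a simple string, and treating only offsets outside [beginningOffset, endOffset] as out of range is my assumption about where the reset to EARLIEST kicks in.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of resolving a start offset from user-specified offsets. */
final class SpecifiedOffsetsSketch {

    static long resolve(Map<String, Long> specified, String partition,
                        long beginningOffset, long endOffset) {
        Long wanted = specified.get(partition);
        if (wanted == null) {
            // Partition not mentioned by the user: fall back to EARLIEST.
            return beginningOffset;
        }
        if (wanted < beginningOffset || wanted > endOffset) {
            // Assumed out-of-range handling: reset to EARLIEST.
            return beginningOffset;
        }
        return wanted;
    }

    public static void main(String[] args) {
        Map<String, Long> specified = new HashMap<>();
        specified.put("topic-0", 50L);   // existing partition, valid offset
        specified.put("topic-2", 500L);  // offset that does not exist yet
        System.out.println(resolve(specified, "topic-0", 0L, 100L));
        System.out.println(resolve(specified, "topic-1", 0L, 30L));
        System.out.println(resolve(specified, "topic-2", 0L, 40L));
    }
}
```

Under these assumptions, a newly discovered partition ends up at EARLIEST whether it is missing from the specified offsets or was given an offset that has not been reached yet.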

So, some problems will occur when new partitions use
SpecifiedOffsetsInitializer or TimestampOffsetsInitializer. You can find
more information in the "Rejected Alternatives" section of FLIP-288, which
includes details of the code and the reasoning.
All these problems can be reproduced in the current version. The reason
why they haven't been exposed is probably that users usually specify an
existing offset or timestamp, so the behavior appears as earliest in
production.

WDYT?
CC: Ruan, Shammon, Gordon, Leonard and Qingsheng.

Yours

Hongshun




On Fri, Apr 14, 2023 at 5:48 PM Shammon FY  wrote:

> Hi Hongshun
>
> Thanks for updating the FLIP, it totally sounds good to me.
>
> I just have one comment: how does a SQL job set the new-discovery offsets
> initializer?
> I found that `DataStream` jobs can set a different offsets initializer for
> newly discovered partitions in `KafkaSourceBuilder.setNewDiscoveryOffsets`.
> Do SQL jobs need to support this feature?
>
> Best,
> Shammon FY
Re: [DISCUSS] FLIP-288:Enable Dynamic Partition Discovery by Default in Kafka Source

2023-04-18 Thread Hongshun Wang
Hi Shammon,

I agree with you. Since only EARLIEST is used, it's better not to mislead
users through the interface.


Yours

Hongshun

On Tue, Apr 18, 2023 at 7:12 PM Shammon FY  wrote:

> Hi Hongshun
>
> Thanks for your explanation, I have got your point. I reviewed the FLIP again
> and only have one minor comment, which won't block this FLIP: do we still need
> `OffsetsInitializer newDiscoveryOffsetsInitializer` in the constructor
> of `KafkaSourceEnumerator`?  I think we can remove it if we always use
> EARLIEST for newly discovered partitions.
>
> Best,
> Shammon FY
>
> On Tue, Apr 18, 2023 at 4:59 PM Hongshun Wang 
> wrote:
>
> > Hi Shammon,
> >
> > Thank you for your advice.I have carefully considered whether to show
> this
> > in SQL DDL. Therefore, I carefully studied whether it is feasible
> Recently
> >
> > However,  after reading the corresponding code more thoroughly, it
> appears
> > that SpecifiedOffsetsInitializer and TimestampOffsetsInitializer do not
> > work as we initially thought. Finally, I have decided to only use
> > "EARLIEST" instead of allowing the user to make a free choice.
> >
> > Now, let me show my new understanding.
> >
> > The actual work of SpecifiedOffsetsInitializer and
> > TimestampOffsetsInitializer:
> >
> >
> >- *SpecifiedOffsetsInitializer*: Use *Specified offset* for specified
> >partitions while use *EARLIEST* for unspecified partitions. Specified
> >partitions offset should be less than the latest offset, otherwise it
> > will
> >start from the *EARLIEST*.
> >- *TimestampOffsetsInitializer*: Initialize the offsets based on a
> >timestamp. If the message meeting the requirement of the timestamp
> have
> > not
> >been produced to Kafka yet, just use the *LATEST* offset.
> >
> > So, problems will occur when a new partition uses
> > SpecifiedOffsetsInitializer or TimestampOffsetsInitializer. You can find
> > more information in the "Rejected Alternatives" section of FLIP-288, which
> > includes details of the code and the reasoning.
> > All these problems can be reproduced in the current version. They
> > probably haven't surfaced because users usually specify an offset or
> > timestamp that already exists, so the behavior looks like EARLIEST in
> > production.
> >
> > WDYT?
> > CC:Ruan, Shammon, Gordon, Leonard and Qingsheng.
> >
> > Yours
> >
> > Hongshun
> >
> >
> >
> >
> > On Fri, Apr 14, 2023 at 5:48 PM Shammon FY  wrote:
> >
> > > Hi Hongshun
> > >
> > > Thanks for updating the FLIP, it totally sounds good to me.
> > >
> > > I just have one comment: how does a SQL job set the offsets initializer
> > > for newly discovered partitions?
> > > I found that `DataStream` jobs can set different offsets initializers for
> > > newly discovered partitions via
> > > `KafkaSourceBuilder.setNewDiscoveryOffsets`. Do SQL jobs need to support
> > > this feature?
> > >
> > > Best,
> > > Shammon FY
> > >
> > > On Wed, Apr 12, 2023 at 2:27 PM Hongshun Wang  >
> > > wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > I have already modified FLIP-288 to provide a
> > > > newDiscoveryOffsetsInitializer in the KafkaSourceBuilder and
> > > > KafkaSourceEnumerator. Users can use
> > > > KafkaSourceBuilder#setNewDiscoveryOffsets to change the strategy for
> > > > new partitions.
> > > >
> > > > Enabling partition discovery by default and changing the offset
> > > > strategy for new partitions should certainly be brought to the user's
> > > > attention. Therefore, it will be explained in the 1.18 release notes.
> > > >
> > > > WDYT? CC: Ruan, Shammon, Gordon and Leonard.
> > > >
> > > >
> > > > Best,
> > > >
> > > > Hongshun
> > > >
> > > > On Fri, Mar 31, 2023 at 2:56 PM Hongshun Wang <
> loserwang1...@gmail.com
> > >
> > > > wrote:
> > > >
> > > > > Hi everyone,
> > > > > Thanks for your participation.
> > > > >
> > > > > @Gordon, I looked at the several questions you raised:
> > > > >
> > > > >1. Should we use the firstDiscovery flag or two separate
> > > > >OffsetsInitializers? Actually, I have considered later. If

[VOTE] FLIP-288: Enable Dynamic Partition Discovery by Default in Kafka Source

2023-04-20 Thread Hongshun Wang
Dear Flink Developers,


Thank you for providing feedback on FLIP-288: Enable Dynamic Partition
Discovery by Default in Kafka Source[1] on the discussion thread[2].

The goal of the FLIP is to enable partition discovery by default and set
EARLIEST offset strategy for later discovered partitions.
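
To make the second goal concrete, here is a minimal sketch of how the start offset could be chosen. It is a simplified model, not the Kafka connector code: the class DiscoveryOffsetSketch, the Strategy enum, and the startOffset method are hypothetical names used only for illustration.

```java
// Hypothetical model of the FLIP-288 goal; not the Flink Kafka connector API.
final class DiscoveryOffsetSketch {
    enum Strategy { EARLIEST, LATEST }

    /**
     * Picks the start offset for a partition.
     * Partitions found by the first discovery use the configured strategy;
     * partitions found later always start from EARLIEST.
     */
    static long startOffset(boolean initialDiscoveryFinished, Strategy configured,
                            long earliestOffset, long latestOffset) {
        Strategy effective = initialDiscoveryFinished ? Strategy.EARLIEST : configured;
        return effective == Strategy.EARLIEST ? earliestOffset : latestOffset;
    }

    public static void main(String[] args) {
        // A partition that already holds 5 records (offsets 0..4) when it is found.
        long earliest = 0L;
        long latest = 5L;
        // Found by the first discovery with LATEST configured: starts at the end.
        System.out.println(startOffset(false, Strategy.LATEST, earliest, latest));
        // Found by a later discovery: starts at the beginning, so the 5 records
        // written before the partition was discovered are still read.
        System.out.println(startOffset(true, Strategy.LATEST, earliest, latest));
    }
}
```

With LATEST applied to a late-discovered partition, every record written between partition creation and discovery would be skipped, which is the data loss the FLIP aims to avoid.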


I am initiating a vote for this FLIP. The vote will be open for at least 72
hours, unless there is an objection or insufficient votes.


[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source
[2] https://lists.apache.org/thread/581f2xq5d1tlwc8gcr27gwkp3zp0wrg6


Best regards,
Hongshun


Re: [VOTE] FLIP-288: Enable Dynamic Partition Discovery by Default in Kafka Source

2023-04-26 Thread Hongshun Wang
Thanks, everyone, I'm closing this vote now. I'll follow up with the result
in a separate email.


Best

Hongshun

On Tue, Apr 25, 2023 at 1:54 PM Jing Ge  wrote:

> +1(binding)
>
> Best regards,
> Jing
>
> On Tue, Apr 25, 2023 at 5:17 AM Rui Fan <1996fan...@gmail.com> wrote:
>
> > +1 (binding)
> >
> > Best,
> > Rui Fan
> >
> > On Tue, Apr 25, 2023 at 10:06 AM Biao Geng  wrote:
> >
> > > +1 (non-binding)
> > > Best,
> > > Biao Geng
> > >
> > > On Mon, Apr 24, 2023 at 8:20 PM Martijn Visser wrote:
> > >
> > > > +1 (binding)
> > > >
> > > > On Mon, Apr 24, 2023 at 4:10 AM Feng Jin 
> > wrote:
> > > >
> > > > > +1 (non-binding)
> > > > >
> > > > >
> > > > > Best,
> > > > > Feng
> > > > >
> > > > > On Mon, Apr 24, 2023 at 9:55 AM Hang Ruan 
> > > > wrote:
> > > > >
> > > > > > +1 (non-binding)
> > > > > >
> > > > > > Best,
> > > > > > Hang
> > > > > >
> > > > > > On Sun, Apr 23, 2023 at 11:58 AM Paul Lam wrote:
> > > > > >
> > > > > > > +1 (non-binding)
> > > > > > >
> > > > > > > Best,
> > > > > > > Paul Lam
> > > > > > >
> > > > > > > > On Apr 23, 2023, at 10:57, Shammon FY wrote:
> > > > > > > >
> > > > > > > > +1 (non-binding)
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Shammon FY
> > > > > > > >
> > > > > > > > On Sun, Apr 23, 2023 at 10:35 AM Qingsheng Ren <
> > > renqs...@gmail.com
> > > > > > > <mailto:renqs...@gmail.com>> wrote:
> > > > > > > >
> > > > > > > >> Thanks for pushing this FLIP forward, Hongshun!
> > > > > > > >>
> > > > > > > >> +1 (binding)
> > > > > > > >>
> > > > > > > >> Best,
> > > > > > > >> Qingsheng
> > > > > > > >>
> > > > > > > >> On Fri, Apr 21, 2023 at 2:52 PM Hongshun Wang <
> > > > > > loserwang1...@gmail.com>
> > > > > > > >> wrote:
> > > > > > > >>
> > > > > > > >>> Dear Flink Developers,
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>> Thank you for providing feedback on FLIP-288: Enable
> Dynamic
> > > > > > Partition
> > > > > > > >>> Discovery by Default in Kafka Source[1] on the discussion
> > > > > thread[2].
> > > > > > > >>>
> > > > > > > >>> The goal of the FLIP is to enable partition discovery by
> > > default
> > > > > and
> > > > > > > set
> > > > > > > >>> EARLIEST offset strategy for later discovered partitions.
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>> I am initiating a vote for this FLIP. The vote will be open
> > for
> > > > at
> > > > > > > least
> > > > > > > >> 72
> > > > > > > >>> hours, unless there is an objection or insufficient votes.
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>> [1]: [
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source](https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source)
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source%5D(https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source)>
> > <
&

[RESULT][VOTE] FLIP-288: Enable Dynamic Partition Discovery by Default in Kafka Source

2023-04-26 Thread Hongshun Wang
Hi everyone,

Happy to announce that FLIP-288[1] has been approved unanimously!

Voting included 11 votes, of which 6 were binding, and no disapproving
votes.[2]

   - Tzu-Li (Gordon) Tai (binding)
   - Leonard Xu (binding)
   - Qingsheng Ren (binding)
   - Shammon FY
   - Paul Lam
   - Hang Ruan
   - Feng Jin
   - Martijn Visser (binding)
   - Biao Geng
   - Rui Fan (binding)
   - Jing Ge (binding)

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source
[2] https://lists.apache.org/thread/opbg0k2v2kdsyt6jt8c334kgv7mo8rk5

Best,

Hongshun


Re: [DISCUSS] FLIP-288:Enable Dynamic Partition Discovery by Default in Kafka Source

2023-04-27 Thread Hongshun Wang
Hi Raman Verma,

Generally, Kafka responds quickly. However, because partition discovery is
an asynchronous operation, we cannot guarantee that it always completes
promptly; a temporary network issue, for example, can delay it. The two
cases I mentioned are special situations meant to illustrate the concept.


Best

Hongshun

On Fri, Apr 28, 2023 at 9:48 AM Raman Verma  wrote:

> Hello Hongshun Wang,
>
> You have mentioned that the first partition discovery can be very slow
> (section: Why do we need initialDiscoveryFinished?)
>
> Do you mean that Kafka can be slow to respond? If so, any idea under what
> conditions Kafka would be slow?
> Or is it just a matter of bad timing, where this call does not return
> before a checkpoint?
>
> Thanks,
> Raman Verma
>
> On 2023/03/17 10:41:40 Hongshun Wang wrote:
> > Hi everyone,
> >
> > I would like to start a discussion on FLIP-288:Enable Dynamic Partition
> > Discovery by Default in Kafka Source[1].
> >
> > As described in the mail thread[2], dynamic partition discovery is disabled
> > by default, and users have to explicitly specify the discovery interval in
> > order to turn it on. Besides, if the initial offset strategy is LATEST, the
> > same strategy is used for new partitions, which can lose data: a new
> > partition might be discovered by the Kafka source several minutes after it
> > is created, and the messages produced into the partition within that gap
> > are dropped if, for example, "latest" is the initial offset strategy.
> >
> > The goals of this FLIP are as follows:
> >
> > 1. Enable partition discovery by default.
> > 2. Use earliest as the offset strategy for new partitions after the
> > first discovery.
> >
> > Looking forward to hearing from you.
> >
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source
> >
> > [2] <https://lists.apache.org/thread/d7zy46gj3sw0zwzq2rj3fmc0hx8ojtln>
> > https://lists.apache.org/thread/d7zy46gj3sw0zwzq2rj3fmc0hx8ojtln
> >
> >
> > Best,
> >
> > Hongshun
> >
>
>


[DISCUSS] Whether to handle removed partitions in KafkaSourceEnumerator ?

2023-05-09 Thread Hongshun Wang
Hi Devs,

The KafkaSourceEnumerator contains to-do comments and variables related to
the handling of removed partitions (such as
PartitionSplitChange#removedPartitions), but no actual handling has been
implemented, even though they have been present for years.

I am wondering whether we still need to implement this, or whether handling
removed partitions is no longer necessary. If it is no longer necessary, why
not remove the corresponding code?


Best,

Hongshun


Whether to handle removed partitions in KafkaSourceEnumerator ?

2023-05-09 Thread Hongshun Wang
Hi Devs,

There are some to-do comments and variables related to handling removed
partitions (such as PartitionSplitChange#removedPartitions) in
KafkaSourceEnumerator that have been around for years. I am wondering
whether we still need to implement this, or whether handling removed
partitions is no longer necessary. If it is no longer necessary, why not
remove them?

Best,

Hongshun


Re: Whether to handle removed partitions in KafkaSourceEnumerator ?

2023-05-09 Thread Hongshun Wang
Hi Ran Tao,

Thank you for your response. It's very helpful.

I still have some questions:

Currently, if a partition is removed, the reader will not be notified about
it. Will this cause an exception in the split reader? If it does, the job
will fail repeatedly and never be able to restart successfully.

Perhaps this issue has not been considered because Kafka does not allow
scaling down partitions?


Yours

Hongshun


Re: Whether to handle removed partitions in KafkaSourceEnumerator ?

2023-05-10 Thread Hongshun Wang
Hi Ran Tao,

> task-side just continues to consume the removed partitions to the end.

I am curious about how task-side consumes removed partitions. Are you
saying that the task-side can still run without exceptions even if the
partition is removed from metadata?

If so, "removed partitions" only affect the unassigned partitions. If a
partition has already been successfully assigned, then even if it no longer
exists, the task side is not affected. Therefore, I suggest removing such
partitions from the pendingPartitionSplitAssignment so that partitions that
were removed before being assigned will not be assigned to the task side.
Assigning them would be redundant.
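
The suggestion above can be sketched as follows. This is only an illustration under assumptions: PendingAssignmentSketch and dropRemoved are hypothetical names, partitions are modeled as plain strings, and the map merely mimics the shape of a reader-id-to-pending-splits structure rather than the enumerator's real field.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model; not the KafkaSourceEnumerator implementation.
final class PendingAssignmentSketch {

    /**
     * Removes partitions that no longer exist from the splits that are still
     * waiting to be assigned, and returns how many pending splits were dropped.
     * Splits that were already assigned are not touched here.
     */
    static int dropRemoved(Map<Integer, Set<String>> pendingByReader,
                           Set<String> removedPartitions) {
        int dropped = 0;
        for (Set<String> pending : pendingByReader.values()) {
            int before = pending.size();
            pending.removeAll(removedPartitions);
            dropped += before - pending.size();
        }
        // Forget readers that have nothing left to be assigned.
        pendingByReader.values().removeIf(Set::isEmpty);
        return dropped;
    }

    public static void main(String[] args) {
        Map<Integer, Set<String>> pending = new HashMap<>();
        pending.put(0, new HashSet<>(Arrays.asList("topic-0", "topic-2")));
        pending.put(1, new HashSet<>(Arrays.asList("topic-1")));
        Set<String> removed = new HashSet<>(Arrays.asList("topic-1", "topic-2"));

        int dropped = dropRemoved(pending, removed);
        System.out.println(dropped + " dropped, still pending: " + pending);
    }
}
```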

By the way, I wonder under what situations Kafka removes a partition?

Best

Hongshun


Re: Questions on checkpointing mechanism for FLIP-27 Source API

2023-05-24 Thread Hongshun Wang
Hi Hong,

The checkpoint is triggered by the timer executor of CheckpointCoordinator.
It triggers the checkpoint in SourceCoordinator (which is passed to
SplitEnumerator) and then in SourceOperator. The checkpoint event is put in
SplitEnumerator's event loop to be executed. You can see the details here.
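
As a rough illustration of why running the checkpoint in the event loop matters, the sketch below models the enumerator with a single-threaded executor. It is an assumption-laden toy, not Flink code: EnumeratorEventLoopSketch and its methods are hypothetical. It shows only that a snapshot queued between two events sees exactly the events handled before it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical model of an enumerator event loop; not the SourceCoordinator.
final class EnumeratorEventLoopSketch {
    // One thread runs everything, so events and snapshots never interleave.
    private final ExecutorService loop = Executors.newSingleThreadExecutor();
    // State is only read or written by the loop thread.
    private final List<String> state = new ArrayList<>();

    void handleEvent(String event) {
        loop.execute(() -> state.add(event));
    }

    Future<List<String>> snapshot() {
        Callable<List<String>> copy = () -> new ArrayList<>(state);
        return loop.submit(copy);
    }

    /** Queues event-1, a snapshot, then event-2, and returns the snapshot. */
    static List<String> snapshotBetweenEvents() {
        EnumeratorEventLoopSketch enumerator = new EnumeratorEventLoopSketch();
        try {
            enumerator.handleEvent("event-1");
            Future<List<String>> snapshot = enumerator.snapshot();
            enumerator.handleEvent("event-2");
            return snapshot.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            enumerator.loop.shutdown();
        }
    }

    public static void main(String[] args) {
        // The snapshot contains event-1 but not event-2.
        System.out.println(snapshotBetweenEvents());
    }
}
```

This only shows ordering on the enumerator side; whether an event sent by a reader after the enumerator's snapshot is replayed or otherwise reconciled is a separate question that the sketch does not answer.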

Yours
Hongshun

On Wed, May 17, 2023 at 11:39 PM Teoh, Hong 
wrote:

> Hi all,
>
> I’m writing a new source based on the FLIP-27 Source API, and I had some
> questions on the checkpointing mechanisms and associated guarantees. Would
> appreciate if someone more familiar with the API would be able to provide
> insights here!
>
> In FLIP-27 Source, we now have a SplitEnumerator (running on JM) and a
> SourceReader (running on TM). However, the SourceReader can send events to
> the SplitEnumerator. Given this, we have introduced a “loopback”
> communication mechanism from TM to JM, and I wonder if/how we handle this
> during checkpoints.
>
>
> Example of how data might be lost:
> 1. Checkpoint 123 triggered
> 2. SplitEnumerator takes checkpoint of state for checkpoint 123
> 3. SourceReader sends OperatorEvent 1 and mutates state to reflect this
> 4. SourceReader takes checkpoint of state for checkpoint 123
> …
> 5. Checkpoint 123 completes
>
> Let’s assume OperatorEvent 1 would mutate SplitEnumerator state once
> processed. There is now inconsistent state between the SourceReader and
> the SplitEnumerator (SourceReader assumes OperatorEvent 1 is processed,
> whereas SplitEnumerator has not processed OperatorEvent 1).
>
> Do we have any mechanisms for mitigating this issue? For example, does the
> SplitEnumerator re-take the snapshot of state for a checkpoint if an
> OperatorEvent is sent before the checkpoint is complete?
>
> Regards,
> Hong


Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-11-20 Thread Hongshun Wang
Hi Becket,

> Additionally, SplitFetcherTask requires FutureCompletingBlockingQueue as
> a constructor parameter, which is not allowed now.
Sorry, that was a writing mistake on my part. What I meant is that
*SplitFetcher* requires FutureCompletingBlockingQueue as a constructor
parameter. SplitFetcher is a class rather than an interface. Therefore, I
want to change SplitFetcher to a public interface and move its
implementation details to an implementing subclass.

Thanks,
Hongshun Wang

On Fri, Nov 17, 2023 at 6:21 PM Becket Qin  wrote:

> Hi Hongshun,
>
> SplitFetcher.enqueueTask() returns void, right? SplitFetcherTask is already
> an interface, and we need to make that as a PublicEvolving API as well.
>
> So overall, a source developer can potentially do a few things in the
> SplitFetcherManager.
> 1. for customized logic including split-to-fetcher assignment, threading
> model, etc.
> 2. create their own SplitFetcherTask for the SplitFetcher / SplitReader to
> execute in a coordinated manner.
>
> It should be powerful enough for the vast majority of the source
> implementation, if not all.
>
>
> > Additionally, SplitFetcherTask requires FutureCompletingBlockingQueue
> > as a constructor parameter, which is not allowed now.
>
> Are you referring to FetchTask which implements SplitFetcherTask? That
> class will remain internal.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Nov 17, 2023 at 5:23 PM Hongshun Wang 
> wrote:
>
> > Hi, Jiangjie(Becket) ,
> > Thank you for your advice. I have learned a lot.
> >
> > > If SplitFetcherManager becomes PublicEvolving, that also means
> > > SplitFetcher needs to be PublicEvolving, because it is returned by the
> > > protected method SplitFetcherManager.createSplitFetcher().
> >
> > I completely agree with you. However, if SplitFetcher becomes
> > PublicEvolving, SplitFetcherTask also needs to be PublicEvolving
> > because it is returned by the public method SplitFetcher#enqueueTask.
> > Additionally, SplitFetcherTask requires FutureCompletingBlockingQueue
> > as a constructor parameter, which is not allowed now. Therefore, I
> > propose changing SplitFetcher to a public interface and moving its
> > implementation details to an implementation class (e.g.,
> > SplitFetcherImpl or another suitable name). SplitFetcherImpl will be
> > marked as internal, managed by SplitFetcherManager, and will put data
> > into the queue.
> > Subclasses of SplitFetcherManager can only use the SplitFetcher
> > interface, which also ensures that the current subclasses are not
> > affected.
> >
> >
> >
> > > The current SplitFetcherManager basically looks up
> > > the SplitT from the fetcher with the split Id, and immediately passes
> > > the SplitT back to the fetcher, which is unnecessary.
> >
> > I infer that this is because SplitReader#pauseOrResumeSplits
> > requires SplitT instead of SplitId. Perhaps some external sources
> > require more information to pause. However, SplitReader doesn't store
> > all its split data, while SplitFetcherManager does.
> > CC: @Dawid Wysakowicz
> >
> >
> >
> > > If not, SplitFetcher.pause() and
> > > SplitFetcher.resume() can be removed. In fact, they seem no longer used
> > > anywhere.
> >
> > They seem to be unused now. CC: @Arvid Heise
> >
> >
> >
> > Thanks,
> > Hongshun Wang
> >
> > On Fri, Nov 17, 2023 at 11:42 AM Becket Qin 
> wrote:
> >
> > > Hi Hongshun,
> > >
> > > Thanks for updating the FLIP. I think that makes sense. A few comments
> > > below:
> > >
> > > 1. If SplitFetcherManager becomes PublicEvolving, that also means
> > > SplitFetcher needs to be PublicEvolving, because it is returned by the
> > > protected method SplitFetcherManager.createSplitFetcher().
> > >
> > > 2. When checking the API of the classes to be marked as PublicEvolving,
> > > there might be a few methods' signatures worth some discussion.
> > >
> > > For SplitFetcherManager:
> > > a) Currently removeSplits() methods takes a list of SplitT. I am
> > wondering
> > > if it should be a list of splitIds. SplitT actually contains two parts
> of
> > > information, the static split Id and some dynamically changing state of
> > the
> > > split (e.g. Kafka consumer offset). The source of truth for the dynamic
> > > state is SourceReaderBase. Currently we are passing in the full source
> > > split with the dynamic state for split removal. But it looks like only
> > &g

Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-11-21 Thread Hongshun Wang
Hi Becket,

> If SplitFetcherManager becomes PublicEvolving, that also means SplitFetcher
> needs to be PublicEvolving, because it is returned by the protected method
> SplitFetcherManager.createSplitFetcher().

> it looks like there is no need to expose the constructor of SplitFetcher
> to the end users. Having an interface of SplitFetcher is also fine, but
> might not be necessary in this case.

How can we make SplitFetcher PublicEvolving without exposing its
constructor to end users?

Thanks,
Hongshun Wang

On Tue, Nov 21, 2023 at 7:23 PM Becket Qin  wrote:

> Hi Hongshun,
>
> Do we need to expose the constructor of SplitFetcher to the users? Ideally,
> users should always get a new fetcher instance by calling
> SplitFetcherManager.createSplitFetcher(). Or, they can get an existing
> SplitFetcher by looking up in the SplitFetcherManager.fetchers map. I think
> this makes sense because a SplitFetcher should always belong to a
> SplitFetcherManager. Therefore, it should be created via a
> SplitFetcherManager as well. So, it looks like there is no need to expose
> the constructor of SplitFetcher to the end users.
>
> Having an interface of SplitFetcher is also fine, but might not be
> necessary in this case.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Nov 21, 2023 at 10:36 AM Hongshun Wang 
> wrote:
>
> > Hi Becket,
> >
> > > Additionally, SplitFetcherTask requires FutureCompletingBlockingQueue
> > > as a constructor parameter, which is not allowed now.
> > Sorry, that was a writing mistake on my part. What I meant is that
> > *SplitFetcher* requires FutureCompletingBlockingQueue as a constructor
> > parameter. SplitFetcher is a class rather than an interface. Therefore, I
> > want to change SplitFetcher to a public interface and move its
> > implementation details to an implementing subclass.
> >
> > Thanks,
> > Hongshun Wang
> >
> > On Fri, Nov 17, 2023 at 6:21 PM Becket Qin  wrote:
> >
> > > Hi Hongshun,
> > >
> > > SplitFetcher.enqueueTask() returns void, right? SplitFetcherTask is
> > already
> > > an interface, and we need to make that as a PublicEvolving API as well.
> > >
> > > So overall, a source developer can potentially do a few things in the
> > > SplitFetcherManager.
> > > 1. for customized logic including split-to-fetcher assignment,
> threading
> > > model, etc.
> > > 2. create their own SplitFetcherTask for the SplitFetcher / SplitReader
> > to
> > > execute in a coordinated manner.
> > >
> > > It should be powerful enough for the vast majority of the source
> > > implementation, if not all.
> > >
> > >
> > > Additionally, SplitFetcherTask requires FutureCompletingBlockingQueue
> > > > as a
> > > > constructor parameter, which is not allowed
> > > > now.
> > >
> > > Are you referring to FetchTask which implements SplitFetcherTask? That
> > > class will remain internal.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Fri, Nov 17, 2023 at 5:23 PM Hongshun Wang  >
> > > wrote:
> > >
> > > > Hi, Jiangjie(Becket) ,
> > > > Thank you for your advice. I have learned a lot.
> > > >
> > > > If SplitFetcherManager becomes PublicEvolving, that also means
> > > > > SplitFetcher needs to be PublicEvolving, because it is returned by
> > the
> > > > > protected method SplitFetcherManager.createSplitFetcher().
> > > >
> > > > I completely agree with you. However, if SplitFetcher becomes
> > > > PublicEvolving, SplitFetcherTask also needs to be PublicEvolving
> > > > because it is returned by the public method SplitFetcher#enqueueTask.
> > > > Additionally, SplitFetcherTask requires FutureCompletingBlockingQueue
> > > > as a
> > > > constructor parameter, which is not allowed
> > > > now. Therefore, I propose changing SplitFetcher to a public Interface
> > > > and moving its implementation details to an implement class (e.g.,
> > > > SplitFetcherImpl or another suitable name). SplitFetcherImpl will be
> > > > marked as internal and managed by SplitFetcherManager,
> > > > and put data in the queue.
> > > > Subclasses of SplitFetcherManager can only use the SplitFetcher
> > > interface,
> > > > also ensuring that the current subclasses are not affected.
> > > >
> > > >

Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-11-21 Thread Hongshun Wang
Hi Becket,

Thanks a lot, I have no further questions. I have also made further
modifications to FLIP-389[1].
In summary, this FLIP has 2 goals:

   - Annotate SingleThreadFetcherManager as PublicEvolving.
   - Shield FutureCompletingBlockingQueue from users and confine all
   operations on FutureCompletingBlockingQueue to SplitFetcherManager.

All the changes are listed below:

   - Mark the constructors of SourceReaderBase and
   SingleThreadMultiplexSourceReaderBase as @Deprecated and provide new
   constructors without FutureCompletingBlockingQueue.
   - Mark SplitFetcherManager and SingleThreadFetcherManager as
   `@PublicEvolving`, mark the constructors of SplitFetcherManager and
   SingleThreadFetcherManager as @Deprecated, and provide new constructors
   without FutureCompletingBlockingQueue.
   - SplitFetcherManager provides wrapper methods for
   FutureCompletingBlockingQueue to replace its usage in SourceReaderBase.
   - Mark SplitFetcher and SplitFetcherTask as PublicEvolving.
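
The constructor and wrapper changes listed above follow a common pattern, sketched below under assumptions. FetcherManagerSketch is a hypothetical stand-in, java.util.concurrent.LinkedBlockingQueue takes the place of FutureCompletingBlockingQueue, and the wrapper method names are illustrative rather than the ones proposed in the FLIP.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a fetcher manager; not Flink's SplitFetcherManager.
class FetcherManagerSketch<E> {
    private final BlockingQueue<E> elementsQueue;

    /** Old style: the caller creates the queue and passes it in. */
    @Deprecated
    FetcherManagerSketch(BlockingQueue<E> elementsQueue) {
        this.elementsQueue = elementsQueue;
    }

    /** New style: the queue is created internally and never handed out. */
    FetcherManagerSketch(int queueCapacity) {
        this.elementsQueue = new LinkedBlockingQueue<>(queueCapacity);
    }

    // Wrapper methods: callers use these instead of touching the queue itself.

    /** Adds an element if capacity allows; returns false when the queue is full. */
    boolean offer(E element) {
        return elementsQueue.offer(element);
    }

    /** Returns the next element, or null if the queue is empty. */
    E poll() {
        return elementsQueue.poll();
    }

    boolean isEmpty() {
        return elementsQueue.isEmpty();
    }

    public static void main(String[] args) {
        FetcherManagerSketch<String> manager = new FetcherManagerSketch<>(2);
        manager.offer("batch-1");
        System.out.println(manager.poll());
        System.out.println(manager.isEmpty());
    }
}
```

Keeping the deprecated constructor lets existing subclasses compile unchanged, while new code never sees the queue type and so is unaffected if that type changes later.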


Any additional questions regarding this FLIP? Looking forward to hearing
from you.

Thanks,
Hongshun Wang

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498




On Wed, Nov 22, 2023 at 10:15 AM Becket Qin  wrote:

> Hi Hongshun,
>
> The constructor of the SplitFetcher is already package private. So it can
> only be accessed from the classes in the package
> org.apache.flink.connector.base.source.reader.fetcher. And apparently, user
> classes should not be in this package. Therefore, even if we mark the
> SplitFetcher class as PublicEvolving, the constructor is not available to
> the users. Only the public and protected methods are considered public API
> in this case. Private / package private methods and fields are still
> internal.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, Nov 22, 2023 at 9:46 AM Hongshun Wang 
> wrote:
>
> > Hi Becket,
> >
> > > If SplitFetcherManager becomes PublicEvolving, that also means
> > > SplitFetcher needs to be PublicEvolving, because it is returned by the
> > > protected method SplitFetcherManager.createSplitFetcher().
> >
> > > it looks like there is no need to expose the constructor of
> > > SplitFetcher to the end users. Having an interface of SplitFetcher is
> > > also fine, but might not be necessary in this case.
> >
> > How can we make SplitFetcher PublicEvolving without exposing its
> > constructor to end users?
> >
> > Thanks,
> > Hongshun Wang
> >
> > On Tue, Nov 21, 2023 at 7:23 PM Becket Qin  wrote:
> >
> > > Hi Hongshun,
> > >
> > > Do we need to expose the constructor of SplitFetcher to the users?
> > Ideally,
> > > users should always get a new fetcher instance by calling
> > > SplitFetcherManager.createSplitFetcher(). Or, they can get an existing
> > > SplitFetcher by looking up in the SplitFetcherManager.fetchers map. I
> > think
> > > this makes sense because a SplitFetcher should always belong to a
> > > SplitFetcherManager. Therefore, it should be created via a
> > > SplitFetcherManager as well. So, it looks like there is no need to
> expose
> > > the constructor of SplitFetcher to the end users.
> > >
> > > Having an interface of SplitFetcher is also fine, but might not be
> > > necessary in this case.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Tue, Nov 21, 2023 at 10:36 AM Hongshun Wang <
> loserwang1...@gmail.com>
> > > wrote:
> > >
> > > > Hi Becket,
> > > >
> > > > > Additionally, SplitFetcherTask requires
> FutureCompletingBlockingQueue
> > > as
> > > > a constructor parameter, which is not allowed  now.
> > > > Sorry, it was my writing mistake. What I meant is that *SplitFetcher*
> > > > requires FutureCompletingBlockingQueue as a constructor parameter.
> > > > SplitFetcher
> > > > is a class rather than Interface. Therefore, I want to  change
> > > > SplitFetcher to a public Interface and moving its implementation
> > > > details to an implement
> > > > subclass .
> > > >
> > > > Thanks,
> > > > Hongshun Wang
> > > >
> > > > On Fri, Nov 17, 2023 at 6:21 PM Becket Qin 
> > wrote:
> > > >
> > > > > Hi Hongshun,
> > > > >
> > > > > SplitFetcher.enqueueTask() returns void, right? SplitFetcherTask is
> > > > already
> > > > > an inter

Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-11-30 Thread Hongshun Wang
Hi all,
Are there any additional questions or concerns regarding FLIP-389[1]? Looking
forward to hearing from you.

Thanks,
Hongshun Wang

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498

On Wed, Nov 22, 2023 at 3:44 PM Hongshun Wang 
wrote:

> Hi Becket,
>
> Thanks a lot, I have no further questions. I have also made further
> modifications to FLIP-389[1].
> In summary, this FLIP has 2 goals:
>
>    - Annotate SingleThreadFetcherManager as PublicEvolving.
>    - Shield FutureCompletingBlockingQueue from users and confine all
>    operations on FutureCompletingBlockingQueue to SplitFetcherManager.
>
> All the changes are listed below:
>
>    - Mark the constructors of SourceReaderBase and
>    SingleThreadMultiplexSourceReaderBase as @Deprecated and provide new
>    constructors without FutureCompletingBlockingQueue.
>    - Mark SplitFetcherManager and SingleThreadFetcherManager as
>    `@PublicEvolving`, mark the constructors of SplitFetcherManager and
>    SingleThreadFetcherManager as @Deprecated, and provide new constructors
>    without FutureCompletingBlockingQueue.
>    - SplitFetcherManager provides wrapper methods for
>    FutureCompletingBlockingQueue to replace its usage in SourceReaderBase.
>    - Mark SplitFetcher and SplitFetcherTask as PublicEvolving.
>
>
> Any additional questions regarding this FLIP? Looking forward to hearing
> from you.
>
> Thanks,
> Hongshun Wang
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498
>
>
>
>
> On Wed, Nov 22, 2023 at 10:15 AM Becket Qin  wrote:
>
>> Hi Hongshun,
>>
>> The constructor of the SplitFetcher is already package private. So it can
>> only be accessed from the classes in the package
>> org.apache.flink.connector.base.source.reader.fetcher. And apparently,
>> user
>> classes should not be in this package. Therefore, even if we mark the
>> SplitFetcher class as PublicEvolving, the constructor is not available to
>> the users. Only the public and protected methods are considered public API
>> in this case. Private / package private methods and fields are still
>> internal.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Wed, Nov 22, 2023 at 9:46 AM Hongshun Wang 
>> wrote:
>>
>> > Hi Becket,
>> >
>> > > If SplitFetcherManager becomes PublicEvolving, that also means
>> > > SplitFetcher needs to be PublicEvolving, because it is returned by the
>> > > protected method SplitFetcherManager.createSplitFetcher().
>> >
>> > > it looks like there is no need to expose the constructor of
>> > > SplitFetcher to the end users. Having an interface of SplitFetcher is
>> > > also fine, but might not be necessary in this case.
>> >
>> > How can we make SplitFetcher PublicEvolving without exposing its
>> > constructor to end users?
>> >
>> > Thanks,
>> > Hongshun Wang
>> >
>> > On Tue, Nov 21, 2023 at 7:23 PM Becket Qin 
>> wrote:
>> >
>> > > Hi Hongshun,
>> > >
>> > > Do we need to expose the constructor of SplitFetcher to the users?
>> > Ideally,
>> > > users should always get a new fetcher instance by calling
>> > > SplitFetcherManager.createSplitFetcher(). Or, they can get an existing
>> > > SplitFetcher by looking up in the SplitFetcherManager.fetchers map. I
>> > think
>> > > this makes sense because a SplitFetcher should always belong to a
>> > > SplitFetcherManager. Therefore, it should be created via a
>> > > SplitFetcherManager as well. So, it looks like there is no need to
>> expose
>> > > the constructor of SplitFetcher to the end users.
>> > >
>> > > Having an interface of SplitFetcher is also fine, but might not be
>> > > necessary in this case.
>> > >
>> > > Thanks,
>> > >
>> > > Jiangjie (Becket) Qin
>> > >
>> > > On Tue, Nov 21, 2023 at 10:36 AM Hongshun Wang <
>> loserwang1...@gmail.com>
>> > > wrote:
>> > >
>> > > > Hi Becket,
>> > > >
>> > > > > Additionally, SplitFetcherTask requires
>> FutureCompletingBlockingQueue
>> > > as
>> > > > a constructor parameter, which is not allowed  now.
>> > > > Sorry, it was my writing mistake. What I meant is that
>> *SplitFe

Re: [PROPOSAL] Contribute Flink CDC Connectors project to Apache Flink

2023-12-06 Thread Hongshun Wang
So cool! Big +1 for this exciting work.

Best
Hongshun

On Thu, Dec 7, 2023 at 12:20 PM Qingsheng Ren  wrote:

> Thanks for kicking this off, Leonard!
>
> As one of the contributors of the CDC project, I'm truly honored to be part
> of the community and so excited to hear the news. CDC project was born from
> and developed together with Apache Flink, and we are so proud to be
> accepted by more and more users around the world.
>
> To put my Flink hat on, I believe having the CDC project as a part of
> Apache Flink will broadly expand our ecosystem and the usage scenarios.
> Both two projects will benefit from closer cooperation, so +1 from my side.
>
> Best,
> Qingsheng
>
> On Thu, Dec 7, 2023 at 12:06 PM Samrat Deb  wrote:
>
> > That's really cool :)
> > +1 for the great addition
> >
> > Bests,
> > Samrat
> >
> > On Thu, 7 Dec 2023 at 9:20 AM, Jingsong Li 
> wrote:
> >
> >> Wow, Cool, Nice
> >>
> >> CDC is playing an increasingly important role.
> >>
> >> +1
> >>
> >> Best,
> >> Jingsong
> >>
> >> On Thu, Dec 7, 2023 at 11:25 AM Leonard Xu  wrote:
> >> >
> >> > Dear Flink devs,
> >> >
> >> > As you may have heard, we at Alibaba (Ververica) are planning to
> donate
> >> CDC Connectors for the Apache Flink project[1] to the Apache Flink
> >> community.
> >> >
> >> > CDC Connectors for Apache Flink comprise a collection of source
> >> connectors designed specifically for Apache Flink. These connectors[2]
> >> enable the ingestion of changes from various databases using Change Data
> >> Capture (CDC), most of these CDC connectors are powered by Debezium[3].
> >> They support both the DataStream API and the Table/SQL API, facilitating
> >> the reading of database snapshots and continuous reading of transaction
> >> logs with exactly-once processing, even in the event of failures.
> >> >
> >> >
> >> > Additionally, in the latest version 3.0, we have introduced many
> >> long-awaited features. Starting from CDC version 3.0, we've built a
> >> Streaming ELT Framework available for streaming data integration. This
> >> framework allows users to write their data synchronization logic in a
> >> simple YAML file, which will automatically be translated into a Flink
> >> DataStreaming job. It emphasizes optimizing the task submission process
> and
> >> offers advanced functionalities such as whole database synchronization,
> >> merging sharded tables, and schema evolution[4].
> >> >
> >> >
> >> > I believe this initiative is a perfect match for both sides. For the
> >> Flink community, it presents an opportunity to enhance Flink's
> competitive
> >> advantage in streaming data integration, promoting the healthy growth
> and
> >> prosperity of the Apache Flink ecosystem. For the CDC Connectors
> project,
> >> becoming a sub-project of Apache Flink means being part of a neutral
> >> open-source community, which can attract a more diverse pool of
> >> contributors.
> >> >
> >> > Please note that the aforementioned points represent only some of our
> >> motivations and vision for this donation. Specific future operations
> need
> >> to be further discussed in this thread. For example, the sub-project
> name
> >> after the donation; we hope to name it Flink-CDC aiming to streaming
> data
> >> integration through Apache Flink, following the naming convention of
> >> Flink-ML; And this project is managed by a total of 8 maintainers,
> >> including 3 Flink PMC members and 1 Flink Committer. The remaining 4
> >> maintainers are also highly active contributors to the Flink community,
> >> donating this project to the Flink community implies that their
> permissions
> >> might be reduced. Therefore, we may need to bring up this topic for
> further
> >> discussion within the Flink PMC. Additionally, we need to discuss how to
> >> migrate existing users and documents. We have a user group of nearly
> 10,000
> >> people and a multi-version documentation site need to migrate. We also
> need
> >> to plan for the migration of CI/CD processes and other specifics.
> >> >
> >> >
> >> > While there are many intricate details that require implementation, we
> >> are committed to progressing and finalizing this donation process.
> >> >
> >> >
> >> > Despite being Flink’s most active ecological project (as evaluated by
> >> GitHub metrics), it also boasts a significant user base. However, I
> believe
> >> it's essential to commence discussions on future operations only after
> the
> >> community reaches a consensus on whether they desire this donation.
> >> >
> >> >
> >> > Really looking forward to hearing what you think!
> >> >
> >> >
> >> > Best,
> >> > Leonard (on behalf of the Flink CDC Connectors project maintainers)
> >> >
> >> > [1] https://github.com/ververica/flink-cdc-connectors
> >> > [2]
> >>
> https://ververica.github.io/flink-cdc-connectors/master/content/overview/cdc-connectors.html
> >> > [3] https://debezium.io
> >> > [4]
> >>
> https://ververica.github.io/flink-cdc-connectors/master/content/overview/cdc-pipeline.html
> >>
> >

Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-12-19 Thread Hongshun Wang
Hi Becket,


It has been a long time since we last discussed this. Are there any other
problems with this FLIP from your side? I am looking forward to hearing
from you.


Thanks,
Hongshun Wang


[VOTE] FLIP-389: Annotate SingleThreadFetcherManager as PublicEvolving

2024-01-03 Thread Hongshun Wang
Dear Flink Developers,

Thank you for providing feedback on FLIP-389: Annotate
SingleThreadFetcherManager as PublicEvolving[1] on the discussion
thread[2]. The goal of the FLIP is as follows:

   - To expose the SplitFetcherManager / SingleThreadFetcherManager as
   Public, allowing connector developers to easily create their own threading
   models in the SourceReaderBase by implementing addSplits(), removeSplits(),
   maybeShutdownFinishedFetchers() and other functions.
   - To hide the element queue from the connector developers and simplify
   the SourceReaderBase to consist of only SplitFetcherManager and
   RecordEmitter as major components.
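
The two goals above can be illustrated with a minimal sketch in plain Java. It is not Flink code: ToySplit, ToyFetcherManager, ToySingleThreadManager, emit() and drain() are hypothetical stand-ins, and only the names addSplits(), removeSplits() and getQueue() are borrowed from this discussion. The manager creates and owns its element queue, so a subclass only decides how splits are assigned.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class FetcherManagerSketch {

    /** Hypothetical stand-in for a source split; not a Flink type. */
    static final class ToySplit {
        final String id;
        final List<String> records;

        ToySplit(String id, String... records) {
            this.id = id;
            this.records = Arrays.asList(records);
        }
    }

    /** Toy manager that creates and owns the element queue itself. */
    abstract static class ToyFetcherManager {
        // Created internally, so no constructor takes a queue parameter.
        private final BlockingQueue<String> elementQueue = new LinkedBlockingQueue<>();

        /** Threading-model hook: decide how new splits are assigned. */
        abstract void addSplits(List<ToySplit> splits);

        /** Threading-model hook: stop reading the given splits. */
        abstract void removeSplits(List<ToySplit> splits);

        /** Lets subclasses hand over records without touching the queue. */
        protected final void emit(String record) {
            elementQueue.add(record);
        }

        /** Package-private accessor, standing in for an internal getQueue(). */
        final BlockingQueue<String> getQueue() {
            return elementQueue;
        }
    }

    /** Simplest threading model: everything is read inline by the caller. */
    static final class ToySingleThreadManager extends ToyFetcherManager {
        private final List<ToySplit> assigned = new ArrayList<>();

        @Override
        void addSplits(List<ToySplit> splits) {
            assigned.addAll(splits);
            // A real manager would hand the splits to a fetcher thread instead.
            for (ToySplit split : splits) {
                for (String record : split.records) {
                    emit(split.id + ":" + record);
                }
            }
        }

        @Override
        void removeSplits(List<ToySplit> splits) {
            assigned.removeAll(splits);
        }

        int assignedCount() {
            return assigned.size();
        }
    }

    /** Plays the reader's role: drains whatever the manager has queued. */
    static List<String> drain(ToyFetcherManager manager) {
        List<String> out = new ArrayList<>();
        manager.getQueue().drainTo(out);
        return out;
    }

    public static void main(String[] args) {
        ToySingleThreadManager manager = new ToySingleThreadManager();
        ToySplit split = new ToySplit("s1", "a", "b");
        manager.addSplits(Arrays.asList(split));
        System.out.println(drain(manager));
        manager.removeSplits(Arrays.asList(split));
        System.out.println(manager.assignedCount());
    }
}
```

In this sketch a connector author would subclass the manager and never construct or see the queue; only the reader-side helper reaches it through the package-private accessor.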


Any additional questions regarding this FLIP? Looking forward to hearing
from you.


Thanks,
Hongshun Wang


[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498

[2] https://lists.apache.org/thread/b8f509878ptwl3kmmgg95tv8sb1j5987


Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2024-01-10 Thread Hongshun Wang
 and RecordEmitter
>> as major components.
>>
>> In short, the public interface section answers the question of "what". We
>> should list all the user-sensible changes in the public interface section,
>> without verbose explanation. The proposed changes section answers "how",
>> where we can add more details to explain the changes listed in the public
>> interface section.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>>
>>
>> On Wed, Dec 20, 2023 at 10:07 AM Hongshun Wang 
>> wrote:
>>
>> > Hi Becket,
>> >
>> >
>> > It has been a long time since we last discussed. Are there any other
>> > problems with this Flip from your side? I am looking forward to hearing
>> > from you.
>> >
>> >
>> > Thanks,
>> > Hongshun Wang
>> >
>>
>


Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2024-01-18 Thread Hongshun Wang
Hi Devs,
  Thanks for all your advice, it helps a lot. I have already revised
the document[1] and started a vote[2].

Thanks,

Hongshun


[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498

[2] https://lists.apache.org/thread/t1zff21z440pvv48jyhm8pgtqsyplchn

On Fri, Jan 12, 2024 at 1:00 AM Becket Qin  wrote:

> Hi Qingsheng,
>
> Thanks for the comment. I think the initial idea is to hide the queue
> completely from the users, i.e. make FutureCompletingBlockingQueue class
> internal. If it is OK to expose the class to the users, then just returning
> the queue sounds reasonable to me.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, Jan 10, 2024 at 10:39 PM Hongshun Wang 
> wrote:
>
>> Hi Qingsheng,
>>
>>
>> I agree with you that it would be clearer to have a new interface that
>> extracts the SplitFetcher creation and management logic from the current
>> SplitFetcherManager. However, extensive modifications to the interface may
>> have a wide impact and cause compatibility issues. Perhaps we can consider
>> doing it later, rather than in this FLIP.
>>
>>
>> Adding a new internal method, SplitFetcherManager#getQueue(), to
>> SourceReaderBase seems to be a better option than exposing methods like
>> poll and notifyAvailable on SplitFetcherManager.
>>
>>
>> I have taken this valuable suggestion and updated the FLIP accordingly.
>>
>>
>> Thanks,
>>
>> Hongshun
>>
>> On Thu, Jan 11, 2024 at 2:09 PM Qingsheng Ren  wrote:
>>
>>> Hi Hongshun and Becket,
>>>
>>> Sorry for being late in the discussion! I went through the entire FLIP
>>> but I still have some concerns about the new SplitFetcherManager.
>>>
>>> First of all I agree that we should hide the elementQueue from connector
>>> developers. This could simplify the interface exposed to developers so that
>>> they can focus on the interaction with external systems.
>>>
>>> However in the current FLIP, SplitFetcherManager exposes 4 more methods,
>>> poll / getAvailabilityFuture / notifyAvailable / noAvailableElement, which
>>> are tightly coupled with the implementation of the elementQueue. The naming
>>> of these methods looks weird to me, like what does it mean to "poll from a
>>> SplitFetcherManager" / "notify a SplitFetcherManager available"? To clarify
>>> these methods we have to explain to developers that "well we hide a queue
>>> inside SplitFetcherManager and the poll method is actually polling from the
>>> queue". I'm afraid these methods will implicitly expose the concept and the
>>> implementation of the queue to developers.
>>>
>>> I think a cleaner solution would be having a new interface that extracts
>>> SplitFetcher creating and managing logic from the current
>>> SplitFetcherManager, but having too many concepts might make the entire
>>> Source API even harder to understand. To make a compromise, I'm considering
>>> only exposing constructors of SplitFetcherManager as public APIs, and
>>> adding a new internal method SplitFetcherManager#getQueue() for
>>> SourceReaderBase (well it's a bit hacky I admit but I think exposing
>>> methods like poll and notifyAvailable on SplitFetcherManager is even
>>> worse). WDYT?
>>>
>>> Thanks,
>>> Qingsheng
>>>
>>> On Thu, Dec 21, 2023 at 8:36 AM Becket Qin  wrote:
>>>
>>>> Hi Hongshun,
>>>>
>>>> I think the proposal in the FLIP is basically fine. A few minor
>>>> comments:
>>>>
>>>> 1. In FLIPs, we define all the user-sensible changes as public
>>>> interfaces.
>>>> The public interface section should list all of them. So, the code
>>>> blocks
>>>> currently in the proposed changes section should be put into the public
>>>> interface section instead.
>>>>
>>>> 2. It would be good to put all the changes of one class together. For
>>>> example, for SplitFetcherManager, we can say:
>>>> - Change SplitFetcherManager from Internal to PublicEvolving.
>>>> - Deprecate the old constructor exposing the
>>>> FutureCompletingBlockingQueue, and add new constructors as replacements
>>>> which creates the FutureCompletingBlockingQueue instance internally.
>>>> - Add a few new methods to expose the functionality of the internal
>>>> FutureCompletingBlockingQueue via the SplitFetcherManager.
>>>>And th

[RESULT][VOTE] FLIP-389 Annotate SingleThreadFetcherManager as PublicEvolving.

2024-01-22 Thread Hongshun Wang
Hi devs,

I'm glad to announce that FLIP-389[1] has been accepted. The voting
thread is here[2].

The proposal received 4 approving votes, three of which are binding:
 - Leonard Xu (binding)
 - Qingsheng Ren (binding)
 - Xuyang (non-binding)
 - Ron liu (binding)

There were no disapproving votes.

Thanks to all participants for discussion and voting!

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498
[2] https://lists.apache.org/thread/t1zff21z440pvv48jyhm8pgtqsyplchn

Best,
Hongshun Wang


Re: [DISCUSS] Release new version of Flink's Kafka connector

2024-01-25 Thread Hongshun Wang
Hi Martijn,

Thank you for your invitation. The idea of adding new improvements to
either version V3.1 or V4.0 sounds appealing to me.

> if the functionality was added in a
> backwards compatible manner (meaning a new minor version would be
> sufficient).

It seems there is no backward compatibility between the new
DynamicKafkaSource interface and the previous KafkaSource interface. As
the FLIP shows, the source state is incompatible between KafkaSource and
DynamicKafkaSource, so it is recommended to reset all state, or to reset
partial state by setting a different uid and starting the application
from non-restored state.[1]
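
As a rough illustration of why a different uid resets only the source state: Flink maps savepoint state to operators by their uid, and state left without a matching operator has to be skipped explicitly (for example with the --allowNonRestoredState option of flink run). The sketch below is a simplified plain-Java model of that matching, not Flink's implementation; restore() and every uid string in it are made up for the example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class UidRestoreSketch {

    /**
     * Returns the uids whose snapshot state is picked up by the new job.
     * Throws if some snapshot state has no matching operator and skipping
     * it was not explicitly allowed.
     */
    static Set<String> restore(
            Map<String, String> snapshotByUid,
            Set<String> jobUids,
            boolean allowNonRestoredState) {
        Set<String> orphaned = new TreeSet<>(snapshotByUid.keySet());
        orphaned.removeAll(jobUids);
        if (!orphaned.isEmpty() && !allowNonRestoredState) {
            throw new IllegalStateException("No operator for snapshot state: " + orphaned);
        }
        Set<String> restored = new TreeSet<>(snapshotByUid.keySet());
        restored.retainAll(jobUids);
        return restored;
    }

    public static void main(String[] args) {
        // Snapshot taken while the job still used the old source (uids are illustrative).
        Map<String, String> snapshot = new HashMap<>();
        snapshot.put("kafka-source", "KafkaSource offsets");
        snapshot.put("window-agg", "window contents");

        // The new job gives the replacement source a different uid.
        Set<String> newJob = new TreeSet<>(Arrays.asList("dynamic-kafka-source", "window-agg"));

        // Only the downstream operator keeps its state; the source starts fresh.
        System.out.println(restore(snapshot, newJob, true));
    }
}
```

Calling restore() with allowNonRestoredState set to false fails in this model, which mirrors why the old source state has to be dropped deliberately rather than silently.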

However, this will not affect existing jobs built on the previous
version. For DataStream jobs, there should be no impact because
they will not call the new interface unless changes are made in the
code. For Table jobs, the new FLIP-246 DynamicKafkaSource is not yet
being used.


We should pay closer attention if we later decide to migrate to the new
DynamicKafkaSource for the Table API.


Yours

Hongshun

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217389320




On Fri, Jan 26, 2024 at 6:16 AM Martijn Visser 
wrote:

> Hi everyone,
>
> The latest version of the Flink Kafka connector that's available is
> currently v3.0.2, which is compatible with both Flink 1.17 and Flink 1.18.
>
> I would like to propose to create a release which is either v3.1, or v4.0
> (see below), with compatibility for Flink 1.17 and Flink 1.18. This newer
> version would contain many improvements [1] [2] like:
>
> * FLIP-246 Dynamic Kafka Source
> * FLIP-288 Dynamic Partition Discovery
> * Rack Awareness support
> * Kafka Record support for KafkaSink
> * Misc bug fixes and CVE issues
>
> If there are no objections, I would like to volunteer as release manager.
>
> The only thing why I'm not sure if this should be a v3.1 or a v4.0, is
> because I'm not 100% sure if FLIP-246 introduces incompatible API changes
> (requiring a new major version), or if the functionality was added in a
> backwards compatible manner (meaning a new minor version would be
> sufficient). I'm looping in Hongshun Wang and Leonard Xu to help clarify
> this.
>
> There's also a discussion happening in an open PR [3] on dropping support
> for Flink 1.18 afterwards (since this PR would add support for
> RecordEvaluator, which only exists in Flink 1.19). My proposal would be
> that after either v3.1 or v4.0 is released, we would indeed drop support
> for Flink 1.18 with that PR and the next Flink Kafka connector would be
> either v4.0 (if v3.1 is the next release) or v5.0 (if v4.0 is the next
> release).
>
> Best regards,
>
> Martijn
>
> [1]
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353135
> [2]
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352917
> [3]
>
> https://github.com/apache/flink-connector-kafka/pull/76#pullrequestreview-1844645464
>


Re: [ANNOUNCE] New Apache Flink Committer - Jiabao Sun

2024-02-20 Thread Hongshun Wang
Congratulations, Jiabao :)
Congratulations Jiabao!

Best,
Hongshun
Best regards,

Weijie

On Tue, Feb 20, 2024 at 2:19 PM Runkang He  wrote:

> Congratulations Jiabao!
>
> Best,
> Runkang He
>
> On Tue, Feb 20, 2024 at 2:18 PM Jane Chan  wrote:
>
> > Congrats, Jiabao!
> >
> > Best,
> > Jane
> >
> > On Tue, Feb 20, 2024 at 10:32 AM Paul Lam  wrote:
> >
> > > Congrats, Jiabao!
> > >
> > > Best,
> > > Paul Lam
> > >
> > > > 2024年2月20日 10:29,Zakelly Lan  写道:
> > > >
> > > >> Congrats! Jiabao!
> > >
> > >
> >
>


Re: [ANNOUNCE] Apache Flink 1.19.0 released

2024-03-20 Thread Hongshun Wang
Congratulations!

Best,
Hongshun

On Tue, Mar 19, 2024 at 3:12 PM Shawn Huang  wrote:

> Congratulations!
>
> Best,
> Shawn Huang
>
>
> On Tue, Mar 19, 2024 at 2:40 PM Xuannan Su  wrote:
>
> > Congratulations! Thanks for all the great work!
> >
> > Best regards,
> > Xuannan
> >
> > On Tue, Mar 19, 2024 at 1:31 PM Yu Li  wrote:
> > >
> > > Congrats and thanks all for the efforts!
> > >
> > > Best Regards,
> > > Yu
> > >
> > > On Tue, 19 Mar 2024 at 11:51, gongzhongqiang <
> gongzhongqi...@apache.org>
> > wrote:
> > > >
> > > > Congrats! Thanks to everyone involved!
> > > >
> > > > Best,
> > > > Zhongqiang Gong
> > > >
> > > > > On Mon, Mar 18, 2024 at 4:27 PM Lincoln Lee  wrote:
> > > >>
> > > >> The Apache Flink community is very happy to announce the release of
> > Apache
> > > > >> Flink 1.19.0, which is the first release for the Apache Flink 1.19
> > series.
> > > >>
> > > >> Apache Flink® is an open-source stream processing framework for
> > > >> distributed, high-performing, always-available, and accurate data
> > streaming
> > > >> applications.
> > > >>
> > > >> The release is available for download at:
> > > >> https://flink.apache.org/downloads.html
> > > >>
> > > >> Please check out the release blog post for an overview of the
> > improvements
> > > >> for this bugfix release:
> > > >>
> >
> https://flink.apache.org/2024/03/18/announcing-the-release-of-apache-flink-1.19/
> > > >>
> > > >> The full release notes are available in Jira:
> > > >>
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353282
> > > >>
> > > >> We would like to thank all contributors of the Apache Flink
> community
> > who
> > > >> made this release possible!
> > > >>
> > > >>
> > > >> Best,
> > > >> Yun, Jing, Martijn and Lincoln
> >
>


Re: [NOTICE] Flink CDC is importing GitHub issues into Apache Jira

2024-03-20 Thread Hongshun Wang
Congratulations. Thanks to Qingsheng for driving this job forward.

Best,
Hongshun

On Wed, Mar 20, 2024 at 9:08 PM Qingsheng Ren  wrote:

> FYI: The auto-import has completed. In total, 137 issues were migrated from
> GitHub issues to Apache Jira. Sorry again for flooding your inbox!
>
> Best,
> Qingsheng
>
> On Wed, Mar 20, 2024 at 3:47 PM Qingsheng Ren  wrote:
>
> > Hi devs,
> >
> > We are in the process of finalizing the donation of Flink CDC as a
> > sub-project of Apache Flink. Given that all issues in the Flink project
> are
> > tracked via Jira, we will be migrating open tickets for Flink CDC from
> > GitHub issues to Jira.
> >
> > We plan to trigger the auto-import around 5pm (UTC+8), Mar 20, 2024.
> > During this import process, you may receive automated emails regarding
> the
> > creation of Jira tickets. We apologize for any inconvenience and
> potential
> > overflow in your inbox!
> >
> > We have made efforts to eliminate 500+ outdated and invalid issues.
> > However, we estimate that there are still around 100 issues that are
> > significant and worth retaining.
> >
> > Thanks for your understanding and support! And special thanks to community
> > volunteers helping with issue cleanup: Hang Ruan, Zhongqiang Gong, He
> Wang,
> > Xin Gong, Yanquan Lyu and Jiabao Sun
> >
> > Best,
> > Qingsheng
> >
>


Re: [DISCUSS] Flink Website Menu Adjustment

2024-03-25 Thread Hongshun Wang
+1 for the proposal

Best Regards,
Hongshun Wang

On Tue, Mar 26, 2024 at 11:37 AM gongzhongqiang 
wrote:

> Hi Martijn,
>
> Thank you for your feedback.
>
> I agree with your point that we should make a one-time update to the menu,
> rather than continuously updating it. This will be done unless some
> sub-projects are moved or archived.
>
> Best regards,
>
> Zhongqiang Gong
>
>
> On Mon, Mar 25, 2024 at 11:35 PM Martijn Visser  wrote:
>
> > Hi Zhongqiang Gong,
> >
> > Are you suggesting to continuously update the menu based on the number of
> > releases, or just this one time? I wouldn't be in favor of continuously
> > updating: returning customers expect a certain order in the menu, and I
> > don't see a lot of value in continuously changing that. I do think that
> the
> > order that you have currently proposed is better then the one we have
> right
> > now, so I would +1 a one-time update but not a continuously updating
> order.
> >
> > Best regards,
> >
> > Martijn
> >
> > On Mon, Mar 25, 2024 at 4:15 PM Yanquan Lv  wrote:
> >
> > > +1 for this proposal.
> > >
> > > On Mon, Mar 25, 2024 at 3:49 PM gongzhongqiang  wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > I'd like to start a discussion on adjusting the Flink website [1]
> menu
> > to
> > > > improve accuracy and usability. While migrating Flink CDC
> documentation
> > > > to the website, I found outdated links, so we need to review and update
> menus
> > > > for the most relevant information for our users.
> > > >
> > > >
> > > > Proposal:
> > > >
> > > > - Remove Paimon [2] from the "Getting Started" and "Documentation"
> > menus:
> > > > Paimon [2] is now an independent top project of ASF. CC: jingsong
> lees
> > > >
> > > > - Sort the projects in the subdirectory by the activity of the
> > projects.
> > > > Here I list the number of releases for each project in the past year.
> > > >
> > > > Flink Kubernetes Operator : 7
> > > > Flink CDC : 5
> > > > Flink ML  : 2
> > > > Flink Stateful Functions : 1
> > > >
> > > >
> > > > Expected Outcome :
> > > >
> > > > - Menu "Getting Started"
> > > >
> > > > Before:
> > > >
> > > > With Flink
> > > >
> > > > With Flink Stateful Functions
> > > >
> > > > With Flink ML
> > > >
> > > > With Flink Kubernetes Operator
> > > >
> > > > With Paimon(incubating) (formerly Flink Table Store)
> > > >
> > > > With Flink CDC
> > > >
> > > > Training Course
> > > >
> > > >
> > > > After:
> > > >
> > > > With Flink
> > > > With Flink Kubernetes Operator
> > > >
> > > > With Flink CDC
> > > >
> > > > With Flink ML
> > > >
> > > > With Flink Stateful Functions
> > > >
> > > > Training Course
> > > >
> > > >
> > > > - Menu "Documentation" will be the same as "Getting Started"
> > > >
> > > >
> > > > I look forward to hearing your thoughts and suggestions on this
> > proposal.
> > > >
> > > > [1] https://flink.apache.org/
> > > > [2] https://github.com/apache/incubator-paimon
> > > > [3] https://github.com/apache/flink-statefun
> > > >
> > > >
> > > >
> > > > Best regards,
> > > >
> > > > Zhongqiang Gong
> > > >
> > >
> >
>


Re: [DISCUSS] FLIP-453: Promote Unified Sink API V2 to Public and Deprecate SinkFunction

2024-05-07 Thread Hongshun Wang
Hi Martijn,
Thanks for the proposal, +1 from me. Some sinks still use
SinkFunction; it's time to take a step forward.

Best,
Hongshun

On Mon, May 6, 2024 at 5:44 PM Leonard Xu  wrote:

> +1 from my side, thanks Martijn for the effort.
>
> Best,
> Leonard
>
> > On May 4, 2024, at 7:41 PM, Ahmed Hamdy  wrote:
> >
> > Hi Martijn
> > Thanks for the proposal +1 from me.
> > Should this change take place in 1.20, what are the planned release steps
> > for connectors that only offer a deprecated interface in this case (i.e.
> > RabbitMQ, Cassandra, Pub/Sub, HBase)? Are we going to refrain from
> releases
> > that support 1.20+ till the blockers are implemented?
> > Best Regards
> > Ahmed Hamdy
> >
> >
> > On Fri, 3 May 2024 at 14:32, Péter Váry 
> wrote:
> >
> >>> With regards to FLINK-35149, the fix version indicates a change at
> Flink
> >> CDC; is that indeed correct, or does it require a change in the SinkV2
> >> interface?
> >>
> >> The fix doesn't need change in SinkV2, so we are good there.
> >> The issue is that the new SinkV2
> SupportsCommitter/SupportsPreWriteTopology
> >> doesn't work with the CDC yet.
> >>
> >> On Fri, May 3, 2024 at 2:06 PM Martijn Visser  wrote:
> >>
> >>> Hi Ferenc,
> >>>
> >>> You're right, 1.20 it is :)
> >>>
> >>> I've assigned the HBase one to you!
> >>>
> >>> Thanks,
> >>>
> >>> Martijn
> >>>
> >>> On Fri, May 3, 2024 at 1:55 PM Ferenc Csaky  >
> >>> wrote:
> >>>
>  Hi Martijn,
> 
>  +1 for the proposal.
> 
> > targeted for Flink 1.19
> 
>  I guess you meant Flink 1.20 here.
> 
>  Also, I volunteer to take updating the HBase sink, feel free to assign
>  that task to me.
> 
>  Best,
>  Ferenc
> 
> 
> 
> 
>  On Friday, May 3rd, 2024 at 10:20, Martijn Visser <
>  martijnvis...@apache.org> wrote:
> 
> >
> >
> > Hi Peter,
> >
> > I'll add it for completeness, thanks!
> > With regards to FLINK-35149, the fix version indicates a change at
> >>> Flink
> > CDC; is that indeed correct, or does it require a change in the
> >> SinkV2
> > interface?
> >
> > Best regards,
> >
> > Martijn
> >
> >
> > On Fri, May 3, 2024 at 7:47 AM Péter Váry
> >> peter.vary.apa...@gmail.com
> >
> > wrote:
> >
> >> Hi Martijn,
> >>
> >> We might want to add FLIP-371 [1] to the list. (Or we aim only for
>  higher
> >> level FLIPs?)
> >>
> >> We are in the process of using the new API in Iceberg connector
> >> [2] -
>  so
> >> far, so good.
> >>
> >> I know of one minor known issue about the sink [3], which should be
>  ready
> >> for the release.
> >>
> >> All-in-all, I think we are in good shape, and we could move forward
>  with
> >> the promotion.
> >>
> >> Thanks,
> >> Peter
> >>
> >> [1] -
> >>
> >>
> 
> >>>
> >>
> https://cwiki.apache.org/confluence/plugins/servlet/mobile?contentId=263430387
> >> [2] - https://github.com/apache/iceberg/pull/10179
> >> [3] - https://issues.apache.org/jira/browse/FLINK-35149
> >>
> >> On Thu, May 2, 2024, 09:47 Muhammet Orazov
>  mor+fl...@morazow.com.invalid
> >> wrote:
> >>
> >>> Got it, thanks!
> >>>
> >>> On 2024-05-02 06:53, Martijn Visser wrote:
> >>>
>  Hi Muhammet,
> 
>  Thanks for joining the discussion! The changes in this FLIP
> >> would
>  be
>  targeted for Flink 1.19, since it's only a matter of changing
> >> the
>  annotation.
> 
>  Best regards,
> 
>  Martijn
> 
>  On Thu, May 2, 2024 at 7:26 AM Muhammet Orazov
>  mor+fl...@morazow.com
>  wrote:
> 
> > Hello Martijn,
> >
> > Thanks for the FLIP and detailed history of changes, +1.
> >
> > Would FLIP changes target for 2.0? I think it would be good
> > to have clear APIs on 2.0 release.
> >
> > Best,
> > Muhammet
> >
> > On 2024-05-01 15:30, Martijn Visser wrote:
> >
> >> Hi everyone,
> >>
> >> I would like to start a discussion on FLIP-453: Promote
>  Unified Sink
> >> API V2
> >> to Public and Deprecate SinkFunction
> >> https://cwiki.apache.org/confluence/x/rIobEg
> >>
> >> This FLIP proposes to promote the Unified Sink API V2 from
> >> PublicEvolving
> >> to Public and to mark the SinkFunction as Deprecated.
> >>
> >> I'm looking forward to your thoughts.
> >>
> >> Best regards,
> >>
> >> Martijn
> 
> >>>
> >>
>
>


Re: [DISCUSS] Flink CDC 3.2 Release Planning

2024-05-08 Thread Hongshun Wang
Thanks Qingsheng for driving,
+1 from my side.

Best,
Hongshun

On Wed, May 8, 2024 at 11:41 PM Leonard Xu  wrote:

> +1 for the proposal code freeze date and RM candidate.
>
> Best,
> Leonard
>
> > On May 8, 2024, at 10:27 PM, gongzhongqiang  wrote:
> >
> > Hi Qingsheng
> >
> > Thank you for driving the release.
> > Agree with the goal and I'm willing to help.
> >
> > Best,
> > Zhongqiang Gong
> >
> > On Wed, May 8, 2024 at 2:22 PM Qingsheng Ren  wrote:
> >
> >> Hi devs,
> >>
> >> As we are in the midst of the release voting process for Flink CDC
> 3.1.0, I
> >> think it's a good time to kick off the upcoming Flink CDC 3.2 release
> >> cycle.
> >>
> >> In this release cycle I would like to focus on the stability of Flink
> CDC,
> >> especially for the newly introduced YAML-based data integration
> >> framework. To ensure we can iterate and improve swiftly, I propose to
> make
> >> 3.2 a relatively short release cycle, targeting a feature freeze by May
> 24,
> >> 2024.
> >>
> >> For developers that are interested in participating and contributing new
> >> features in this release cycle, please feel free to list your planning
> >> features in the wiki page [1].
> >>
> >> I'm happy to volunteer as a release manager and of course open to work
> >> together with someone on this.
> >>
> >> What do you think?
> >>
> >> Best,
> >> Qingsheng
> >>
> >> [1]
> >> https://cwiki.apache.org/confluence/display/FLINK/Flink+CDC+3.2+Release
> >>
>
>


Re: [DISCUSS] Connector releases for Flink 1.19

2024-05-12 Thread Hongshun Wang
Hello Danny,
Thanks for pushing this forward.  I am available to assist with the CDC
connector[1].

[1] https://github.com/apache/flink-cdc

Best
Hongshun

On Sun, May 12, 2024 at 8:48 PM Sergey Nuyanzin  wrote:

> I'm in a process of preparation of RC for OpenSearch connector
>
> however it seems I need PMC help: need to create opensearch-2.0.0 on jira
> since as it was proposed in another ML[1] to have 1.x for OpenSearch
> v1 and 2.x for OpenSearch v2
>
> would be great if someone from PMC could help here
>
> [1] https://lists.apache.org/thread/3w1rnjp5y612xy5k9yv44hy37zm9ph15
>
> On Wed, Apr 17, 2024 at 12:42 PM Ferenc Csaky
>  wrote:
> >
> > Thank you Danny and Sergey for pushing this!
> >
> > I can help with the HBase connector if necessary, will comment the
> > details to the relevant Jira ticket.
> >
> > Best,
> > Ferenc
> >
> >
> >
> >
> > On Wednesday, April 17th, 2024 at 11:17, Danny Cranmer <
> dannycran...@apache.org> wrote:
> >
> > >
> > >
> > > Hello all,
> > >
> > > I have created a parent Jira to cover the releases [1]. I have
> assigned AWS
> > > and MongoDB to myself and OpenSearch to Sergey. Please assign the
> > > relevant issue to yourself as you pick up the tasks.
> > >
> > > Thanks!
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-35131
> > >
> > > On Tue, Apr 16, 2024 at 2:41 PM Muhammet Orazov
> > > mor+fl...@morazow.com.invalid wrote:
> > >
> > > > Thanks Sergey and Danny for clarifying, indeed it
> > > > requires committer to go through the process.
> > > >
> > > > Anyway, please let me know if I can be any help.
> > > >
> > > > Best,
> > > > Muhammet
> > > >
> > > > On 2024-04-16 11:19, Danny Cranmer wrote:
> > > >
> > > > > Hello,
> > > > >
> > > > > I have opened the VOTE thread for the AWS connectors release [1].
> > > > >
> > > > > > > If I'm not mistaken (please correct me if I'm wrong) this
> request is
> > > > > > not
> > > > > > about version update it is about new releases for connectors
> > > > >
> > > > > Yes, correct. If there are any other code changes required then
> help
> > > > > would be appreciated.
> > > > >
> > > > > > Are you going to create an umbrella issue for it?
> > > > >
> > > > > We do not usually create JIRA issues for releases. That being said
> it
> > > > > sounds like a good idea to have one place to track the status of
> the
> > > > > connector releases and pre-requisite code changes.
> > > > >
> > > > > > I would like to work on this task, thanks for initiating it!
> > > > >
> > > > > The actual release needs to be performed by a committer. However,
> help
> > > > > getting the connectors building against Flink 1.19 and testing the
> RC
> > > > > is
> > > > > appreciated.
> > > > >
> > > > > Thanks,
> > > > > Danny
> > > > >
> > > > > [1]
> https://lists.apache.org/thread/0nw9smt23crx4gwkf6p1dd4jwvp1g5s0
> > > > >
> > > > > On Tue, Apr 16, 2024 at 6:34 AM Sergey Nuyanzin
> snuyan...@gmail.com
> > > > > wrote:
> > > > >
> > > > > > Thanks for volunteering Muhammet!
> > > > > > And thanks Danny for starting the activity.
> > > > > >
> > > > > > > If I'm not mistaken (please correct me if I'm wrong)
> > > > > >
> > > > > > this request is not about version update it is about new
> releases for
> > > > > > connectors
> > > > > > btw for jdbc connector support of 1.19 and 1.20-SNAPSHOT is
> already
> > > > > > done
> > > > > >
> > > > > > I would volunteer for Opensearch connector since currently I'm
> working
> > > > > > on
> > > > > > support of Opensearch v2
> > > > > > and I think it would make sense to have a release after it is
> done
> > > > > >
> > > > > > On Tue, Apr 16, 2024 at 4:29 AM Muhammet Orazov
> > > > > > mor+fl...@morazow.com.invalid wrote:
> > > > > >
> > > > > > > Hello Danny,
> > > > > > >
> > > > > > > I would like to work on this task, thanks for initiating it!
> > > > > > >
> > > > > > > I could update the versions on JDBC and Pulsar connectors.
> > > > > > >
> > > > > > > Are you going to create an umbrella issue for it?
> > > > > > >
> > > > > > > Best,
> > > > > > > Muhammet
> > > > > > >
> > > > > > > On 2024-04-15 13:44, Danny Cranmer wrote:
> > > > > > >
> > > > > > > > Hello all,
> > > > > > > >
> > > > > > > > Flink 1.19 was released on 2024-03-18 [1] and the connectors
> have not
> > > > > > > > yet
> > > > > > > > caught up. I propose we start releasing the connectors with
> support
> > > > > > > > for
> > > > > > > > Flink 1.19 as per the connector support guidelines [2].
> > > > > > > >
> > > > > > > > I will make a start on flink-connector-aws, then pickup
> others in the
> > > > > > > > coming days. Please respond to the thread if you are/want to
> work on
> > > > > > > > a
> > > > > > > > particular connector to avoid duplicate work.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Danny
> > > > > > > >
> > > > > > > > [1]
> > > >
> > > >
> https://flink.apache.org/2024/03/18/announcing-the-release-of-apache-flink-1.19/
> > > >
> > > > > > > > [2]
> > > >
> > > >
> https://cwiki.apache.org/confl

Re: [ANNOUNCE] New Apache Flink Committer - Zhongqiang Gong

2024-06-17 Thread Hongshun Wang
Congrats Zhongqiang !

Best,
Hongshun

On Mon, Jun 17, 2024 at 12:35 PM Geng Biao  wrote:

> Congratulations, Zhongqiang!
> Best,
> Biao Geng
>
> Sent from Outlook for iOS
> 
> From: Zakelly Lan 
> Sent: Monday, June 17, 2024 12:11:47 PM
> To: dev@flink.apache.org 
> Subject: Re: [ANNOUNCE] New Apache Flink Committer - Zhongqiang Gong
>
> Congratulations, Zhongqiang!
>
>
> Best,
> Zakelly
>
> On Mon, Jun 17, 2024 at 12:05 PM Shawn Huang  wrote:
>
> > Congratulations !
> >
> > Best,
> > Shawn Huang
> >
> >
> > On Mon, Jun 17, 2024 at 12:03, Yuepeng Pan wrote:
> >
> > > Congratulations ! Best regards Yuepeng Pan
> > >
> > >
> > >
> > >
> > >
> > > At 2024-06-17 11:20:30, "Leonard Xu"  wrote:
> > > >Hi everyone,
> > > >On behalf of the PMC, I'm happy to announce that Zhongqiang Gong has
> > > become a new Flink Committer!
> > > >
> > > >Zhongqiang has been an active Flink community member since November
> > 2021,
> > > contributing numerous PRs to both the Flink and Flink CDC repositories.
> > As
> > > a core contributor to Flink CDC, he developed the Oracle and SQL Server
> > CDC
> > > Connectors and managed essential website and CI migrations during the
> > > donation of Flink CDC to Apache Flink.
> > > >
> > > >Beyond his technical contributions, Zhongqiang actively participates
> in
> > > discussions on the Flink dev mailing list and responds to threads on
> the
> > > user and user-zh mailing lists. As an Apache StreamPark (incubating)
> > > Committer, he promotes Flink SQL and Flink CDC technologies at meetups
> > and
> > > within the StreamPark community.
> > > >
> > > >Please join me in congratulating Zhongqiang Gong for becoming an
> Apache
> > > Flink committer!
> > > >
> > > >Best,
> > > >Leonard (on behalf of the Flink PMC)
> > >
> >
>


Re: [ANNOUNCE] New Apache Flink Committer - Hang Ruan

2024-06-17 Thread Hongshun Wang
Congratulations Hang!

Best,
Hongshun

On Mon, Jun 17, 2024 at 5:49 PM Ron Liu  wrote:

> Congratulations, Hang!
>
> Best,
> Ron
>
> On Mon, Jun 17, 2024 at 12:35, Geng Biao wrote:
>
> > Congrats, Hang!
> > Best,
> > Biao Geng
> >
> > Sent from Outlook for iOS
> > 
> > From: Zakelly Lan 
> > Sent: Monday, June 17, 2024 12:12:10 PM
> > To: dev@flink.apache.org 
> > Subject: Re: [ANNOUNCE] New Apache Flink Committer - Hang Ruan
> >
> > Congratulations, Hang!
> >
> >
> > Best,
> > Zakelly
> >
> > On Mon, Jun 17, 2024 at 12:07 PM Yanquan Lv 
> wrote:
> >
> > > Congratulations, Hang!
> > >
> > > On Mon, Jun 17, 2024 at 11:32, Samrat Deb wrote:
> > >
> > > > Congratulations Hang Ruan !
> > > >
> > > > Bests,
> > > > Samrat
> > > >
> > > > On Mon, Jun 17, 2024 at 8:47 AM Leonard Xu 
> wrote:
> > > >
> > > > > Hi everyone,
> > > > > On behalf of the PMC, I'm happy to let you know that Hang Ruan has
> > > become
> > > > > a new Flink Committer !
> > > > >
> > > > > Hang Ruan has been continuously contributing to the Flink project
> > since
> > > > > August 2021. Since then, he has continuously contributed to Flink,
> > > Flink
> > > > > CDC, and various Flink connector repositories, including
> > > > > flink-connector-kafka, flink-connector-elasticsearch,
> > > > flink-connector-aws,
> > > > > flink-connector-rabbitmq, flink-connector-pulsar, and
> > > > > flink-connector-mongodb. Hang Ruan focuses on the improvements
> > related
> > > to
> > > > > connectors and catalogs and initiated FLIP-274. He is most
> recognized
> > > as
> > > > a
> > > > > core contributor and maintainer for the Flink CDC project,
> > contributing
> > > > > many features such as MySQL CDC newly table addition and the Schema
> > > > > Evolution feature.
> > > > >
> > > > > Beyond his technical contributions, Hang Ruan is an active member
> of
> > > the
> > > > > Flink community. He regularly engages in discussions on the Flink
> dev
> > > > > mailing list and the user-zh and user mailing lists, participates
> in
> > > FLIP
> > > > > discussions, assists with user Q&A, and consistently volunteers for
> > > > release
> > > > > verifications.
> > > > >
> > > > > Please join me in congratulating Hang Ruan for becoming an Apache
> > Flink
> > > > > committer!
> > > > >
> > > > > Best,
> > > > > Leonard (on behalf of the Flink PMC)
> > > >
> > >
> >
>


Re: Flink CDC insert-only changelog mode

2024-08-21 Thread Hongshun Wang
Hi Daniel,
>
>  There's no way to convert a different changelog stream to an insertonly
> stream so I'm pushing this upstream to the connector.

I wonder whether you want only the insert-type changelog entries, ignoring the
other kinds, or whether you want to read every kind of changelog entry as an
insert in Flink row data. If you want the latter, we need an `op_type` column
to distinguish the changelog types [1], so that users can filter as needed.

Best,
Hongshun
[1] https://issues.apache.org/jira/browse/FLINK-35067




On Thu, Aug 22, 2024 at 9:29 AM Daniel Henneberger 
wrote:

> Hey,
>
> I'd like to consume the postgres-cdc connector with an insert-only
> changelog. I'm uninterested in any retraction messages since our use case
> doesn't require it, and we can greatly benefit from the flink operations
> that insert-only changelog provides. E.g. I want to do a tumble window
> aggregation on the stream of events. We're using the table api exclusively
> and there's no way to convert a different changelog stream to an insert
> only stream so I'm pushing this upstream to the connector. I threw up a PR
> to showcase the implementation. Let me know what you think of this.
>
> Thanks,
> Daniel Henneberger
>
> https://github.com/apache/flink-cdc/pull/3562
>


Re: Flink CDC insert-only changelog mode

2024-08-25 Thread Hongshun Wang
Hi Daniel,
>  The changelog type needs to be
insert-only to invoke the correct streaming planner rules.

I mean that we can read all kinds of changes from MySQL (insert, upsert,
delete) as +I[op_type, xxx] in Flink (such as +I[insert, 1, 'a'] and
+I[delete, 2, null]). If downstream only needs +I[insert, xx], a filter on
op_type is enough.

Besides that, users can also do other things:
1. Count the total number of records for each op_type.
2. Perform a logical delete in the sink with Flink SQL (update the value of
is_delete in the sink table if op_type is delete).
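
The idea above can be sketched in plain Java without any Flink dependency. Every change travels as an insert-only row that carries its original operation type as data, and downstream code filters or counts on that field. The `ChangeRow` class and the `insert`/`delete` labels are hypothetical names used only for illustration; this is not the connector's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch; ChangeRow is a hypothetical stand-in, not a Flink or Flink CDC type.
class OpTypeFilterSketch {

    // An insert-only row that carries the original operation type as a regular field.
    static final class ChangeRow {
        final String opType;
        final int id;
        final String value;

        ChangeRow(String opType, int id, String value) {
            this.opType = opType;
            this.id = id;
            this.value = value;
        }
    }

    // Keep only the rows whose original operation was an insert.
    static List<ChangeRow> insertsOnly(List<ChangeRow> rows) {
        List<ChangeRow> result = new ArrayList<>();
        for (ChangeRow row : rows) {
            if ("insert".equals(row.opType)) {
                result.add(row);
            }
        }
        return result;
    }

    // Count how many rows were seen for each operation type.
    static Map<String, Long> countByOpType(List<ChangeRow> rows) {
        Map<String, Long> counts = new TreeMap<>();
        for (ChangeRow row : rows) {
            counts.merge(row.opType, 1L, Long::sum);
        }
        return counts;
    }

    // Sample data mirroring the +I[insert, 1, 'a'] and +I[delete, 2, null] rows above.
    static List<ChangeRow> sample() {
        List<ChangeRow> rows = new ArrayList<>();
        rows.add(new ChangeRow("insert", 1, "a"));
        rows.add(new ChangeRow("delete", 2, null));
        rows.add(new ChangeRow("insert", 3, "c"));
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(insertsOnly(sample()).size());
        System.out.println(countByOpType(sample()));
    }
}
```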

Best,
Hongshun



On Fri, Aug 23, 2024 at 2:45 AM Daniel Henneberger 
wrote:

> Unfortunately no, this would not be helpful. The changelog type needs to be
> insert-only to invoke the correct streaming planner rules.
>
> On Wed, Aug 21, 2024 at 9:01 PM Hongshun Wang 
> wrote:
>
> > Hi Daniel,
> > >
> > >  There's no way to convert a different changelog stream to an
> insertonly
> > > stream so I'm pushing this upstream to the connector.
> >
> > I wonder whether you just want the insert type changelog and ignore other
> > kinds of changelog, or just want to read all kinds of changelog as insert
> > in flink row data? If you want later, we need an `op_type` to distinguish
> > the type of changelog,  and users can filter depending on their need.
> >
> > Best,
> > Hongshun
> > [1] https://issues.apache.org/jira/browse/FLINK-35067
> >
> >
> >
> >
> > On Thu, Aug 22, 2024 at 9:29 AM Daniel Henneberger <
> > m...@danielhenneberger.com>
> > wrote:
> >
> > > Hey,
> > >
> > > I'd like to consume the postgres-cdc connector with an insert-only
> > > changelog. I'm uninterested in any retraction messages since our use
> case
> > > doesn't require it, and we can greatly benefit from the flink
> operations
> > > that insert-only changelog provides. E.g. I want to do a tumble window
> > > aggregation on the stream of events. We're using the table api
> > exclusively
> > > and there's no way to convert a different changelog stream to an insert
> > > only stream so I'm pushing this upstream to the connector. I threw up a
> > PR
> > > to showcase the implementation. Let me know what you think of this.
> > >
> > > Thanks,
> > > Daniel Henneberger
> > >
> > > https://github.com/apache/flink-cdc/pull/3562
> > >
> >
>


[jira] [Created] (FLINK-32591) Update document of Kafka Source: Enable Dynamic Partition Discovery by Default in Kafka Source

2023-07-14 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-32591:
-

 Summary: Update document of Kafka Source: Enable Dynamic Partition 
Discovery by Default in Kafka Source
 Key: FLINK-32591
 URL: https://issues.apache.org/jira/browse/FLINK-32591
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Reporter: Hongshun Wang


Based on FLIP-288, dynamic partition discovery is now enabled by default in the 
Kafka Source. The corresponding documentation, in both Chinese and English, 
should be updated:
 * "Partition discovery is *disabled* by default. You need to explicitly set 
the partition discovery interval to enable this feature" in 
[https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/]
 * scan.topic-partition-discovery.interval is (none) in 
[https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32668) fix up watchdog timeout bug in common.sh(e2e test) ?

2023-07-25 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-32668:
-

 Summary: fix up watchdog timeout bug in common.sh(e2e test) ?
 Key: FLINK-32668
 URL: https://issues.apache.org/jira/browse/FLINK-32668
 Project: Flink
  Issue Type: Improvement
  Components: Build System / CI
Affects Versions: 1.17.1
Reporter: Hongshun Wang
 Fix For: 1.17.2
 Attachments: image-2023-07-25-15-27-37-441.png

When running the e2e tests, an error like this occurs:

!image-2023-07-25-15-27-37-441.png|width=733,height=115!

I then found a problem in the corresponding code:

 
{code:java}
kill_test_watchdog() {
    local watchdog_pid=$(cat $TEST_DATA_DIR/job_watchdog.pid)
    echo "Stopping job timeout watchdog (with pid=$watchdog_pid)"
    kill $watchdog_pid
}

internal_run_with_timeout() {
    local timeout_in_seconds="$1"
    local on_failure="$2"
    local command_label="$3"
    local command="${@:4}"

    on_exit kill_test_watchdog
    (
        command_pid=$BASHPID
        (sleep "${timeout_in_seconds}" # set a timeout for this command
        echo "${command_label:-"The command '${command}'"} (pid: $command_pid) did not finish after $timeout_in_seconds seconds."
        eval "${on_failure}"
        kill "$command_pid") & watchdog_pid=$!
        echo $watchdog_pid > $TEST_DATA_DIR/job_watchdog.pid
        # invoke
        $command
    )
}
{code}
 

When {{$command}} completes before the timeout, the watchdog process is killed 
successfully. However, when {{$command}} times out, the watchdog process kills 
{{$command}} and then exits itself, so the later {{kill $watchdog_pid}} targets 
a process that no longer exists and leaves behind an error message.

 

So I propose to modify it like this:

 
{code:java}
kill_test_watchdog() {
    local watchdog_pid=$(cat $TEST_DATA_DIR/job_watchdog.pid)
    if kill -0 $watchdog_pid > /dev/null 2>&1; then
        echo "Stopping job timeout watchdog (with pid=$watchdog_pid)"
        kill $watchdog_pid
    else
        echo "watchdog (with pid=$watchdog_pid) does not exist now"
    fi
}
{code}
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33465) Make SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving.

2023-11-06 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-33465:
-

 Summary: Make SingleThreadFetcherManager and 
FutureCompletingBlockingQueue as PublicEvolving.
 Key: FLINK-33465
 URL: https://issues.apache.org/jira/browse/FLINK-33465
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Parent
Affects Versions: 1.18.0
Reporter: Hongshun Wang
 Fix For: 1.19.0


As discussed in FLINK-31324, though the {{SingleThreadFetcherManager}} is 
annotated as {{Internal}}, it actually acts to some degree as a public API 
and is widely used in many connector projects:
[flink-cdc-connector|https://github.com/ververica/flink-cdc-connectors/blob/release-2.3.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java#L93],
 
[flink-connector-mongodb|https://github.com/apache/flink-connector-mongodb/blob/main/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/MongoSourceReader.java#L58]
 and so on.

Moreover, even the constructor of 
`SingleThreadMultiplexSourceReaderBase` (which is PublicEvolving) takes 
`SingleThreadFetcherManager` and `FutureCompletingBlockingQueue` as parameters. 
That means that `SingleThreadFetcherManager` and 
`FutureCompletingBlockingQueue` have already been exposed to users for a long 
time and are widely used.
```java
public SingleThreadMultiplexSourceReaderBase(
        FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
        SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
        RecordEmitter<E, T, SplitStateT> recordEmitter,
        Configuration config,
        SourceReaderContext context) {
    super(elementsQueue, splitFetcherManager, recordEmitter, config, context);
}
```
 
Thus, why not make SingleThreadFetcherManager and FutureCompletingBlockingQueue 
PublicEvolving?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31208) KafkaSourceReader overrides meaninglessly a method(pauseOrResumeSplits)

2023-02-23 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-31208:
-

 Summary: KafkaSourceReader overrides meaninglessly a 
method(pauseOrResumeSplits)
 Key: FLINK-31208
 URL: https://issues.apache.org/jira/browse/FLINK-31208
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Reporter: Hongshun Wang


KafkaSourceReader overrides the method pauseOrResumeSplits for no apparent 
reason: the override is identical to the implementation in its parent class 
(SourceReaderBase). Why not remove this override?

 

The relevant code is below; the two implementations show no difference:
{code:java}
// org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#pauseOrResumeSplits
@Override
public void pauseOrResumeSplits(
        Collection<String> splitsToPause, Collection<String> splitsToResume) {
    splitFetcherManager.pauseOrResumeSplits(splitsToPause, splitsToResume);
}

// org.apache.flink.connector.base.source.reader.SourceReaderBase#pauseOrResumeSplits
@Override
public void pauseOrResumeSplits(
        Collection<String> splitsToPause, Collection<String> splitsToResume) {
    splitFetcherManager.pauseOrResumeSplits(splitsToPause, splitsToResume);
}
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31378) Documentation fails to build due to lack of package

2023-03-09 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-31378:
-

 Summary: Documentation fails to build due to lack of package
 Key: FLINK-31378
 URL: https://issues.apache.org/jira/browse/FLINK-31378
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Hongshun Wang


The [Project Configuration 
Section|https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/configuration/overview/#running-and-packaging]
 says: "If you want to run your job by simply executing the main class, 
you will need {{flink-runtime}} in your classpath". 

However, when I add only flink-runtime to my classpath, an error like this is 
thrown: "No ExecutorFactory found to execute the application".

It seems that flink-clients is also needed to supply an executor through the 
Java service loader.

Could you please add this to the official documentation for beginners like me?

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31953) FLIP-288: Enable Dynamic Partition Discovery by Default in Kafka Source

2023-04-27 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-31953:
-

 Summary: FLIP-288: Enable Dynamic Partition Discovery by Default 
in Kafka Source
 Key: FLINK-31953
 URL: https://issues.apache.org/jira/browse/FLINK-31953
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Reporter: Hongshun Wang
 Fix For: kafka-4.0.0


This improvement implements 
[FLIP-288|https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source]
 to enable partition discovery by default and set the EARLIEST offset strategy 
for partitions discovered later.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32019) EARLIEST offset strategy for partitions discoveried later based on FLIP-288

2023-05-05 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-32019:
-

 Summary: EARLIEST offset strategy for partitions discoveried later 
based on FLIP-288
 Key: FLINK-32019
 URL: https://issues.apache.org/jira/browse/FLINK-32019
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Kafka
Affects Versions: kafka-3.0.0
Reporter: Hongshun Wang
 Fix For: kafka-4.0.0


As described in 
[FLIP-288|https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source],
 the strategy currently used for new partitions is the same as the initial 
offset strategy, which is not reasonable.

According to the semantics, if the startup strategy is latest, the consumed 
data should include all data from the moment of startup, which also includes 
all messages from newly created partitions. However, the latest strategy 
may currently be used for new partitions, leading to the loss of some data: 
a new partition might be discovered by the Kafka source several minutes after 
it is created, and the messages produced into the partition within that gap 
might be dropped if we use, for example, "latest" as the initial offset 
strategy. If the data from all new partitions is not read, it does not meet the 
user's expectations.

For other problems, see the final section of 
[FLIP-288|https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source]:
 {{User specifies OffsetsInitializer for new partition}}.

Therefore, it's better to provide an *EARLIEST* strategy for partitions 
discovered later.
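
The data-loss gap described above can be illustrated with a small plain-Java sketch. It assumes a freshly created partition whose first offset is 0 and models only the choice of start offset at discovery time; `LateDiscoverySketch` and its methods are hypothetical names, not part of the Kafka connector.

```java
// Plain-Java sketch of the start-offset choice for a partition discovered after job startup.
// Assumption: the partition is new, so its earliest offset is 0.
class LateDiscoverySketch {

    enum Strategy { EARLIEST, LATEST }

    // Offset from which the newly discovered partition is read.
    static long startOffset(Strategy strategy, long endOffsetAtDiscovery) {
        return strategy == Strategy.EARLIEST ? 0L : endOffsetAtDiscovery;
    }

    // Records produced between partition creation and discovery that are never consumed.
    static long skippedRecords(Strategy strategy, long endOffsetAtDiscovery) {
        return startOffset(strategy, endOffsetAtDiscovery);
    }

    public static void main(String[] args) {
        long producedBeforeDiscovery = 120L; // records written during the discovery gap
        System.out.println(skippedRecords(Strategy.LATEST, producedBeforeDiscovery));
        System.out.println(skippedRecords(Strategy.EARLIEST, producedBeforeDiscovery));
    }
}
```

With LATEST, every record written during the gap is skipped; with EARLIEST, none are.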



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32020) Enable Dynamic Partition Discovery by Default in Kafka Source based on FLIP-288

2023-05-05 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-32020:
-

 Summary: Enable Dynamic Partition Discovery by Default in Kafka 
Source based on FLIP-288
 Key: FLINK-32020
 URL: https://issues.apache.org/jira/browse/FLINK-32020
 Project: Flink
  Issue Type: Sub-task
Reporter: Hongshun Wang


As described in 
[FLIP-288|https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source],
 dynamic partition discovery is currently disabled by default, and users have 
to specify the discovery interval in order to turn it on.

This subtask is to enable Dynamic Partition Discovery by Default in Kafka 
Source.

Partition discovery is performed on the KafkaSourceEnumerator, which 
asynchronously fetches topic metadata from the Kafka cluster and checks if 
there are any new topics and partitions. This should not cause performance 
issues on the Flink side.

On the Kafka broker side, partition discovery sends a MetadataRequest to the 
Kafka broker to fetch topic information. Considering that the Kafka broker has 
its metadata cache and the default request frequency is relatively low (once 
every 30 seconds), this is not a heavy operation, and the broker's performance 
will not be significantly affected.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32021) Improvement the Javadoc for SpecifiedOffsetsInitializer and TimestampOffsetsInitializer.

2023-05-05 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-32021:
-

 Summary: Improvement the Javadoc for SpecifiedOffsetsInitializer 
and TimestampOffsetsInitializer.
 Key: FLINK-32021
 URL: https://issues.apache.org/jira/browse/FLINK-32021
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Kafka
Affects Versions: kafka-3.0.0
Reporter: Hongshun Wang
 Fix For: kafka-4.0.0


As described in 
[FLIP-288|https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source],
 the current JavaDoc does not fully explain the behavior of the 
OffsetsInitializers. When a partition does not meet the condition, a different 
offset strategy is applied. This may lead to misunderstandings in design and 
usage.

 

Add to SpecifiedOffsetsInitializer: "Use the specified offset for specified 
partitions, and use the committed offset or earliest for unspecified partitions. 
A specified partition offset should be less than the latest offset; otherwise 
it will start from the earliest."

 

Add to TimestampOffsetsInitializer: "Initialize the offsets based on a 
timestamp. If messages meeting the timestamp requirement have not been produced 
to Kafka yet, just use the latest offset."
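
The timestamp fallback described above can be sketched in plain Java as follows. This is only an illustration of the documented behavior under the assumption that record timestamps are ascending and offsets start at 0; `TimestampOffsetSketch` is a hypothetical name and does not reproduce the connector's implementation.

```java
// Plain-Java sketch of "start from the first record at or after a timestamp,
// otherwise fall back to the latest offset".
class TimestampOffsetSketch {

    // timestamps[i] is the timestamp of the record at offset i, in ascending order.
    static long offsetForTimestamp(long[] timestamps, long target) {
        for (int offset = 0; offset < timestamps.length; offset++) {
            if (timestamps[offset] >= target) {
                return offset;
            }
        }
        // No record meets the timestamp yet: use the latest offset,
        // i.e. the offset of the next record to be written.
        return timestamps.length;
    }

    public static void main(String[] args) {
        long[] timestamps = {100L, 200L, 300L};
        System.out.println(offsetForTimestamp(timestamps, 150L));
        System.out.println(offsetForTimestamp(timestamps, 400L));
    }
}
```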



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34129) correct writing: MiniBatchGlobalGroupAggFunction will make -D as +I then make +I as -U when state expired

2024-01-17 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-34129:
-

 Summary: correct writing: MiniBatchGlobalGroupAggFunction will 
make -D as +I then make +I as -U when state expired 
 Key: FLINK-34129
 URL: https://issues.apache.org/jira/browse/FLINK-34129
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Affects Versions: 1.18.1
Reporter: Hongshun Wang
 Fix For: 1.19.0


Take sum as an example:
When the state has expired and an update operation then arrives from the 
source, MiniBatchGlobalGroupAggFunction takes -U[1, 20] and +U[1, 20] as input 
but emits +I[1, -20] and -D[1, -20]. The sink will then delete the data from 
the external database.

Let's see why this happens:

 # When the state has expired and -U[1, 20] arrives, 
MiniBatchGlobalGroupAggFunction creates a new sum accumulator and sets 
firstRow to true.
{code:java}
if (stateAcc == null) {
    stateAcc = globalAgg.createAccumulators();
    firstRow = true;
}
{code}

 # The sum accumulator then retracts the value, so the sum becomes -20.
 # Because this is the first row, MiniBatchGlobalGroupAggFunction changes -U 
to +I and emits it downstream.
{code:java}
if (!recordCounter.recordCountIsZero(acc)) {
    // if this was not the first row and we have to emit retractions
    if (!firstRow) {
        // ignore
    } else {
        // update acc to state
        accState.update(acc);

        // this is the first, output new result
        // prepare INSERT message for new row
        resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.INSERT);
        out.collect(resultRow);
    }
{code}

 # When the next +U[1, 20] arrives, the sum accumulator brings the sum back to 
0, so RetractionRecordCounter#recordCountIsZero returns true. Because firstRow 
is false now, the +U is changed to -D and emitted downstream.
{code:java}
if (!recordCounter.recordCountIsZero(acc)) {
    // ignore
} else {
    // we retracted the last record for this key
    // if this is not first row sent out a DELETE message
    if (!firstRow) {
        // prepare DELETE message for previous row
        resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.DELETE);
        out.collect(resultRow);
    }
{code}
 
So the sink receives +I and -D after a source update operation, and the data 
is deleted.
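
The sequence above can be reproduced with a simplified plain-Java model. This is a sketch, not the Flink operator: it processes one record at a time instead of a mini-batch, keeps a single key, and the `ExpiredStateAggSketch` class is a hypothetical name. The -U/+U emission in the non-first-row branch is an assumption, since the issue elides that branch.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified single-key model of a retractable SUM aggregation whose state has expired.
class ExpiredStateAggSketch {

    // acc[0] = running sum, acc[1] = record count; null models expired or missing state.
    private long[] acc;

    // Changelog rows emitted downstream, rendered as text.
    final List<String> emitted = new ArrayList<>();

    // isRetract is true for -U/-D input and false for +I/+U input.
    void process(boolean isRetract, int key, long value) {
        boolean firstRow = false;
        if (acc == null) {
            // State expired: start from a fresh accumulator and treat this as the first row.
            acc = new long[] {0L, 0L};
            firstRow = true;
        }
        long prevSum = acc[0];
        if (isRetract) {
            acc[0] -= value;
            acc[1] -= 1;
        } else {
            acc[0] += value;
            acc[1] += 1;
        }
        if (acc[1] != 0) {
            if (firstRow) {
                emitted.add("+I[" + key + ", " + acc[0] + "]");
            } else {
                // Assumed behavior for the branch elided in the issue.
                emitted.add("-U[" + key + ", " + prevSum + "]");
                emitted.add("+U[" + key + ", " + acc[0] + "]");
            }
        } else {
            if (!firstRow) {
                emitted.add("-D[" + key + ", " + prevSum + "]");
            }
            acc = null; // the record count reached zero, so the state is cleared
        }
    }

    public static void main(String[] args) {
        ExpiredStateAggSketch agg = new ExpiredStateAggSketch();
        agg.process(true, 1, 20);  // -U[1, 20] arriving after state expiry
        agg.process(false, 1, 20); // +U[1, 20]
        System.out.println(agg.emitted);
    }
}
```

In this model the update pair is turned into an insert of -20 followed by a delete, which matches the behavior reported in the issue.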



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34196) FLIP-389 Annotate SingleThreadFetcherManager as PublicEvolving.

2024-01-22 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-34196:
-

 Summary: FLIP-389 Annotate SingleThreadFetcherManager as 
PublicEvolving.
 Key: FLINK-34196
 URL: https://issues.apache.org/jira/browse/FLINK-34196
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Common
Affects Versions: 1.18.1
Reporter: Hongshun Wang
 Fix For: 1.19.0


This improvement implements 
[FLIP-389|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498].

This FLIP has 2 goals:
 * To expose the SplitFetcherManager / SingleThreadFetcherManager as Public, 
allowing connector developers to easily create their own threading models in 
the SourceReaderBase.
 * To hide the element queue from the connector developers and make 
SplitFetcherManager the only owner class of the queue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34634) Restarting the job will not read the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed

2024-03-08 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-34634:
-

 Summary: Restarting the job will not read the changelog anymore if 
it stops before the synchronization of meta information is complete and some 
table is removed
 Key: FLINK-34634
 URL: https://issues.apache.org/jira/browse/FLINK-34634
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: Hongshun Wang
 Fix For: cdc-3.1.0
 Attachments: image-2024-03-09-15-25-26-187.png, 
image-2024-03-09-15-27-46-073.png

Once, I removed a table from the option and then restarted the job from the 
savepoint, but the job couldn't read the binlog anymore. When I checked the 
logs, I found an Error level log stating:

' The enumerator received invalid request meta group id 6, the valid meta group 
id range is [0, 4].'

It appears that the Reader is requesting more splits than the Enumerator is 
aware of.

However, the code should indeed remove redundant split information from the 
Reader as seen in 
[https://github.com/ververica/flink-cdc-connectors/pull/2292]. So why does this 
issue occur?

!image-2024-03-09-15-25-26-187.png!

Upon examining the code, I discovered the cause. If the job stops before 
completing all the split meta information and then restarts, this issue occurs. 
Suppose that the totalFinishedSplitSize of binlogSplit in the Reader is 6, and 
no meta information has been synchronized, leaving the 
finishedSnapshotSplitInfos of binlogSplit in the Reader empty. After 
restarting, the totalFinishedSplitSize of binlogSplit in the Reader equals 
(6 - (0 - 0)), which is still 6, but in the Enumerator it is only 4 (the 
removed table has two splits). This can lead to an out-of-range request.
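
The arithmetic above can be written out as a small plain-Java sketch. The method and parameter names are hypothetical and only mirror the formula in the description; they are not taken from the Flink CDC code.

```java
// Plain-Java sketch of the reader-side adjustment of totalFinishedSplitSize after a restart.
class FinishedSplitSizeSketch {

    // The reader subtracts only the finished-split infos it had already synchronized
    // and then dropped because their table was removed.
    static int adjustedTotal(int oldTotal, int syncedInfosBefore, int syncedInfosAfter) {
        return oldTotal - (syncedInfosBefore - syncedInfosAfter);
    }

    public static void main(String[] args) {
        // Nothing synchronized before the stop: the reader keeps 6 while the enumerator has 4.
        System.out.println(adjustedTotal(6, 0, 0));
        // All 6 infos synchronized, 2 of them removed: the reader agrees with the enumerator.
        System.out.println(adjustedTotal(6, 6, 4));
    }
}
```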

!image-2024-03-09-15-27-46-073.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34666) Keep assigned splits in order to fix wrong meta group calculation

2024-03-14 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-34666:
-

 Summary: Keep assigned splits in order to fix wrong meta group 
calculation
 Key: FLINK-34666
 URL: https://issues.apache.org/jira/browse/FLINK-34666
 Project: Flink
  Issue Type: Improvement
Reporter: Hongshun Wang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34688) CDC framework split snapshot chunks asynchronously

2024-03-14 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-34688:
-

 Summary: CDC framework split snapshot chunks asynchronously
 Key: FLINK-34688
 URL: https://issues.apache.org/jira/browse/FLINK-34688
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: Hongshun Wang
 Fix For: cdc-3.1.0


In MySQL CDC, MysqlSnapshotSplitAssigner splits snapshot chunks 
asynchronously ([https://github.com/apache/flink-cdc/pull/931]), but the CDC 
framework lacks this.

If a table is very large, splitting it blocks the enumerator and checkpointing 
is affected (sometimes a checkpoint timeout occurs).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34690) If data from upstream is decimal and primary key , starrocks sink will not support.

2024-03-14 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-34690:
-

 Summary: If data from upstream is decimal and primary key , 
starrocks sink will not support.
 Key: FLINK-34690
 URL: https://issues.apache.org/jira/browse/FLINK-34690
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: Hongshun Wang
 Fix For: cdc-3.1.0


If an upstream column is both a decimal and a primary key, the StarRocks sink 
does not support it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34724) Fix broken web link in document of legacy Flink CDC Sources

2024-03-18 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-34724:
-

 Summary: Fix broken web link in document of  legacy Flink CDC 
Sources
 Key: FLINK-34724
 URL: https://issues.apache.org/jira/browse/FLINK-34724
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Flink CDC
Reporter: Hongshun Wang
 Fix For: cdc-3.1.0


In the current documentation, many links are broken due to directory structure 
changes, such as:
 * 
http://localhost:1313/flink/flink-cdc-docs-master/docs/connectors/legacy-flink-cdc-sources/overview/mongodb-cdc.md
 * 
https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mongodb-cdc/3.0-SNAPSHOT/flink-sql-connector-mongodb-cdc-3.0-SNAPSHOT.jar



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34952) Flink CDC pipeline supports SourceFunction

2024-03-27 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-34952:
-

 Summary: Flink CDC pipeline supports SourceFunction 
 Key: FLINK-34952
 URL: https://issues.apache.org/jira/browse/FLINK-34952
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: Hongshun Wang
 Fix For: cdc-3.1.0


The current Flink CDC pipeline defines 
com.ververica.cdc.common.sink.FlinkSinkFunctionProvider to provide a Flink 
SinkFunction for writing events to external systems. However, 
com.ververica.cdc.runtime.operators.sink.DataSinkWriterOperator doesn't support 
SourceFunction, which means a sink that implements SinkFunction cannot use the 
CDC pipeline.
Why not support SourceFunction in the Flink CDC pipeline?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35067) Support metadata 'op_type' virtual column for Postgres CDC Connector.

2024-04-09 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35067:
-

 Summary:  Support metadata 'op_type' virtual column for Postgres 
CDC Connector. 
 Key: FLINK-35067
 URL: https://issues.apache.org/jira/browse/FLINK-35067
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: Hongshun Wang


Like [https://github.com/apache/flink-cdc/pull/2913], support the metadata 
'op_type' virtual column for the Postgres CDC Connector. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35071) Remove dependency on flink-shaded from cdc source connector

2024-04-09 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35071:
-

 Summary: Remove dependency on flink-shaded from cdc source 
connector
 Key: FLINK-35071
 URL: https://issues.apache.org/jira/browse/FLINK-35071
 Project: Flink
  Issue Type: Improvement
Reporter: Hongshun Wang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35093) Postgres source connector support SPECIFIC_OFFSETS start up mode from an existed replication slot.

2024-04-11 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35093:
-

 Summary: Postgres source connector support SPECIFIC_OFFSETS start 
up mode from an existed replication slot.
 Key: FLINK-35093
 URL: https://issues.apache.org/jira/browse/FLINK-35093
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: Hongshun Wang


Currently, the Postgres source connector only supports the INITIAL and LATEST
startup modes.

However, users sometimes want to restart from an existing replication slot.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35121) CDC pipeline connector should verify requiredOptions and optionalOptions

2024-04-16 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35121:
-

 Summary: CDC pipeline connector should verify requiredOptions and 
optionalOptions
 Key: FLINK-35121
 URL: https://issues.apache.org/jira/browse/FLINK-35121
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: Hongshun Wang


At present, although we provide
org.apache.flink.cdc.common.factories.Factory#requiredOptions and
org.apache.flink.cdc.common.factories.Factory#optionalOptions, neither is used
anywhere. This means that requiredOptions and optionalOptions are never verified.

Thus, like DynamicTableFactory does, provide a FactoryHelper to help verify
requiredOptions and optionalOptions.
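
The kind of check such a helper could perform might look like the following minimal sketch. It assumes options are plain string keys; `OptionValidator` and `validate` are hypothetical names for illustration and are not the actual Flink CDC FactoryHelper API.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper for illustration; not the actual Flink CDC FactoryHelper. */
final class OptionValidator {

    /**
     * Throws if a required key is missing, or if a provided key is neither
     * required nor optional.
     */
    static void validate(Set<String> required, Set<String> optional, Set<String> provided) {
        // Required keys that the user did not provide
        Set<String> missing = new HashSet<>(required);
        missing.removeAll(provided);
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("Missing required options: " + missing);
        }

        // Provided keys that the factory does not declare at all
        Set<String> unsupported = new HashSet<>(provided);
        unsupported.removeAll(required);
        unsupported.removeAll(optional);
        if (!unsupported.isEmpty()) {
            throw new IllegalArgumentException("Unsupported options: " + unsupported);
        }
    }
}
```

Rejecting undeclared keys as well as missing ones also catches misspelled option names early.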
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35128) Re-calculate the starting binlog offset after the new table added

2024-04-16 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35128:
-

 Summary: Re-calculate the starting binlog offset after the new 
table added
 Key: FLINK-35128
 URL: https://issues.apache.org/jira/browse/FLINK-35128
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: Hongshun Wang
 Fix For: 3.1.0


In MySQL CDC, MySqlBinlogSplit#appendFinishedSplitInfos re-calculates the
starting binlog offset after a new table is added, but
StreamSplit#appendFinishedSplitInfos lacks the same step. This causes data loss
if the high watermark of any snapshot split of a newly added table is smaller
than the current starting offset.
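
The re-calculation amounts to taking the minimum over the current starting offset and the new high watermarks. The sketch below assumes, for illustration only, that an offset can be reduced to a single `long` position; `StartingOffsetCalculator` is a hypothetical name, not a class in Flink CDC.

```java
import java.util.List;

/** Hypothetical model: an offset is reduced to a single long position. */
final class StartingOffsetCalculator {

    /**
     * Returns the smallest position among the current starting offset and the
     * high watermarks of the newly finished snapshot splits.
     */
    static long recalculate(long currentStartingOffset, List<Long> newHighWatermarks) {
        long result = currentStartingOffset;
        for (long highWatermark : newHighWatermarks) {
            // Start earlier if any new split finished before the current start
            result = Math.min(result, highWatermark);
        }
        return result;
    }
}
```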



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35129) Postgres source commits the offset after every multiple checkpoint cycles.

2024-04-16 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35129:
-

 Summary: Postgres source commits the offset after every multiple 
checkpoint cycles.
 Key: FLINK-35129
 URL: https://issues.apache.org/jira/browse/FLINK-35129
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: Hongshun Wang


After entering the stream phase, the offset consumed by the global slot is
committed upon the completion of each checkpoint. This prevents WAL files from
accumulating without being recycled, which could lead to insufficient disk
space.

However, this means the job can only restart from the latest checkpoint or
savepoint. If it is restored from an earlier state, the WAL may already have
been recycled.

The way to solve this is to commit the offset only once every several
checkpoint cycles. The number of checkpoint cycles is determined by a connector
option, and the default value is 3.
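
A minimal sketch of the counting logic, under the assumption that the connector is notified once per completed checkpoint. `OffsetCommitPolicy` is a hypothetical name, and the sketch deliberately leaves open which offset is committed when the counter fires.

```java
/** Hypothetical counter deciding when a completed checkpoint should trigger a commit. */
final class OffsetCommitPolicy {
    private final int commitEveryNCheckpoints;
    private int completedSinceLastCommit = 0;

    OffsetCommitPolicy(int commitEveryNCheckpoints) {
        if (commitEveryNCheckpoints < 1) {
            throw new IllegalArgumentException("commitEveryNCheckpoints must be >= 1");
        }
        this.commitEveryNCheckpoints = commitEveryNCheckpoints;
    }

    /** Call once per completed checkpoint; returns true when the offset should be committed. */
    boolean onCheckpointComplete() {
        completedSinceLastCommit++;
        if (completedSinceLastCommit >= commitEveryNCheckpoints) {
            completedSinceLastCommit = 0;
            return true;
        }
        return false;
    }
}
```

With the default of 3 described above, two out of every three completed checkpoints would not trigger a commit.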



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35143) Expose newly added tables capture in mysql pipeline connector

2024-04-17 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35143:
-

 Summary: Expose newly added tables capture in mysql pipeline 
connector
 Key: FLINK-35143
 URL: https://issues.apache.org/jira/browse/FLINK-35143
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: Hongshun Wang


Currently, the MySQL pipeline connector still does not allow capturing newly
added tables.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35149) Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink

2024-04-17 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35149:
-

 Summary: Fix DataSinkTranslator#sinkTo ignoring pre-write topology 
if not TwoPhaseCommittingSink
 Key: FLINK-35149
 URL: https://issues.apache.org/jira/browse/FLINK-35149
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: Hongshun Wang
 Fix For: 3.1.0


Currently, when the sink is not an instance of TwoPhaseCommittingSink,
input.transform is used rather than stream.transform. This means that the
pre-write topology is ignored.
{code:java}
private void sinkTo(
        DataStream input,
        Sink sink,
        String sinkName,
        OperatorID schemaOperatorID) {
    DataStream stream = input;
    // Pre write topology
    if (sink instanceof WithPreWriteTopology) {
        stream = ((WithPreWriteTopology) sink).addPreWriteTopology(stream);
    }

    if (sink instanceof TwoPhaseCommittingSink) {
        addCommittingTopology(sink, stream, sinkName, schemaOperatorID);
    } else {
        // Uses input instead of stream, so the pre-write topology is skipped
        input.transform(
                SINK_WRITER_PREFIX + sinkName,
                CommittableMessageTypeInfo.noOutput(),
                new DataSinkWriterOperatorFactory<>(sink, schemaOperatorID));
    }
} {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35234) Fix NullPointerException of org.apache.flink.cdc.common.configuration.ConfigurationUtils#convertToString

2024-04-24 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35234:
-

 Summary: Fix NullPointerException of 
org.apache.flink.cdc.common.configuration.ConfigurationUtils#convertToString
 Key: FLINK-35234
 URL: https://issues.apache.org/jira/browse/FLINK-35234
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.1.0
Reporter: Hongshun Wang
 Fix For: cdc-3.2.0


An exception like the following is thrown:
{code:java}
Caused by: java.lang.NullPointerException
    at org.apache.flink.cdc.common.configuration.ConfigurationUtils.convertToString(ConfigurationUtils.java:133) ~[?:?]
    at org.apache.flink.cdc.common.configuration.Configuration.toMap(Configuration.java:138) ~[?:?] {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35281) FlinkEnvironmentUtils#addJar add each jar only once

2024-05-01 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35281:
-

 Summary: FlinkEnvironmentUtils#addJar add each jar only once
 Key: FLINK-35281
 URL: https://issues.apache.org/jira/browse/FLINK-35281
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.1.0
Reporter: Hongshun Wang
 Fix For: cdc-3.2.0


Currently, org.apache.flink.cdc.composer.flink.FlinkEnvironmentUtils#addJar is
invoked for each source and sink.
{code:java}
public static void addJar(StreamExecutionEnvironment env, URL jarUrl) {
    try {
        Class<StreamExecutionEnvironment> envClass = StreamExecutionEnvironment.class;
        Field field = envClass.getDeclaredField("configuration");
        field.setAccessible(true);
        Configuration configuration = ((Configuration) field.get(env));
        List<String> jars =
                configuration.getOptional(PipelineOptions.JARS).orElse(new ArrayList<>());
        // The URL is appended unconditionally, with no check for duplicates
        jars.add(jarUrl.toString());
        configuration.set(PipelineOptions.JARS, jars);
    } catch (Exception e) {
        throw new RuntimeException("Failed to add JAR to Flink execution environment", e);
    }
} {code}
If multiple sources or sinks share the same jar, the jar path is added
repeatedly.
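
One possible way to add each jar only once is to check the list before appending. This is a minimal sketch on a plain list of strings; `JarListUtil` and `addIfAbsent` are hypothetical names, not part of FlinkEnvironmentUtils.

```java
import java.util.List;

/** Hypothetical helper for illustration; not part of FlinkEnvironmentUtils. */
final class JarListUtil {

    /** Adds jarUrl to jars only if it is not already present; returns true if it was added. */
    static boolean addIfAbsent(List<String> jars, String jarUrl) {
        if (jars.contains(jarUrl)) {
            return false;
        }
        jars.add(jarUrl);
        return true;
    }
}
```

Keeping a list rather than switching to a set preserves the order in which jars were first added.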



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35327) SQL Explain show push down condition

2024-05-09 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35327:
-

 Summary: SQL Explain show push down condition 
 Key: FLINK-35327
 URL: https://issues.apache.org/jira/browse/FLINK-35327
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.19.0
Reporter: Hongshun Wang
 Fix For: 1.20.0


Currently, we cannot determine from the EXPLAIN output whether a
filter/limit/partition condition has been pushed down to the source. For
example, we can only infer that a filter condition has been pushed down from
the fact that it no longer appears in the Filter node.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35344) Move same code from multiple subclasses to JdbcSourceChunkSplitter

2024-05-13 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35344:
-

 Summary: Move same code from multiple subclasses to 
JdbcSourceChunkSplitter
 Key: FLINK-35344
 URL: https://issues.apache.org/jira/browse/FLINK-35344
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.1.0
Reporter: Hongshun Wang
 Fix For: cdc-3.2.0


Currently, the subclasses of JdbcSourceChunkSplitter share almost the same
code, but each keeps its own copy, which makes later maintenance hard.

Thus, this Jira aims to move the shared code from the subclasses into
JdbcSourceChunkSplitter, just like what has been done for
AbstractScanFetchTask (https://github.com/apache/flink-cdc/issues/2690).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35349) Use connection in openJdbcConnection of SqlServerDialect/Db2Dialect/OracleDialect

2024-05-13 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35349:
-

 Summary: Use connection in openJdbcConnection of 
SqlServerDialect/Db2Dialect/OracleDialect
 Key: FLINK-35349
 URL: https://issues.apache.org/jira/browse/FLINK-35349
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.1.0
Reporter: Hongshun Wang
 Fix For: cdc-3.2.0


Currently, some dialects' `openJdbcConnection` creates a connection without
using a connection pool, which means that a new connection is created on every
call.

However, openJdbcConnection is now used in generateSplits, which means that the
enumerator creates a new connection each time it generates splits. A big table
will therefore create connections again and again.
{code:java}
public Collection generateSplits(TableId tableId) {
    try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {

        LOG.info("Start splitting table {} into chunks...", tableId);
        long start = System.currentTimeMillis();

        Table table =
                Objects.requireNonNull(dialect.queryTableSchema(jdbc, tableId)).getTable();
        Column splitColumn = getSplitColumn(table, sourceConfig.getChunkKeyColumn());
        final List chunks; {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35385) upgrader flink dependency version to 1.19

2024-05-16 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35385:
-

 Summary: upgrader flink dependency version to 1.19
 Key: FLINK-35385
 URL: https://issues.apache.org/jira/browse/FLINK-35385
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.1.0
Reporter: Hongshun Wang
 Fix For: cdc-3.2.0


Flink 1.19 was released on 2024-03-18  and the connectors have not yet
caught up. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35387) PG CDC source support heart beat

2024-05-16 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35387:
-

 Summary: PG CDC source support heart beat
 Key: FLINK-35387
 URL: https://issues.apache.org/jira/browse/FLINK-35387
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.1.0
Reporter: Hongshun Wang
 Fix For: cdc-3.2.0


Although the PG CDC documentation [1] lists heartbeat.interval.ms, the option
does not take effect. The reason is described below.

The Debezium docs say: For the connector to detect and process events from a
heartbeat table, you must add the table to the PostgreSQL publication specified
by the
[publication.name|https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-publication-name]
property. If this publication predates your Debezium deployment, the connector
uses the publications as defined. If the publication is not already configured
to automatically replicate changes {{FOR ALL TABLES}} in the database, you must
explicitly add the heartbeat table to the publication[2].

Thus, if you want to use the heartbeat in CDC:

1. Add a heartbeat table to the publication: ALTER PUBLICATION <publication_name>
ADD TABLE <heartbeat_table>;

2. Set heartbeatInterval.

3. Add
debezium.[{{heartbeat.action.query}}|https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-heartbeat-action-query]
[3]

 

However, when I use it in CDC, an exception occurs:
{code:java}
Caused by: java.lang.NullPointerException
    at io.debezium.heartbeat.HeartbeatFactory.createHeartbeat(HeartbeatFactory.java:55)
    at io.debezium.pipeline.EventDispatcher.<init>(EventDispatcher.java:127)
    at io.debezium.pipeline.EventDispatcher.<init>(EventDispatcher.java:94){code}

 

It seems that CDC does not add a HeartbeatConnectionProvider when configuring
PostgresEventDispatcher:
{code:java}
//org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext#configure
this.postgresDispatcher =
                new PostgresEventDispatcher<>(
                        dbzConfig,
                        topicSelector,
                        schema,
                        queue,
                        dbzConfig.getTableFilters().dataCollectionFilter(),
                        DataChangeEvent::new,
                        metadataProvider,
                        schemaNameAdjuster); {code}
In Debezium, PostgresConnectorTask does this when it starts:
{code:java}
//io.debezium.connector.postgresql.PostgresConnectorTask#start
final PostgresEventDispatcher dispatcher = new PostgresEventDispatcher<>(
        connectorConfig,
        topicNamingStrategy,
        schema,
        queue,
        connectorConfig.getTableFilters().dataCollectionFilter(),
        DataChangeEvent::new,
        PostgresChangeRecordEmitter::updateSchema,
        metadataProvider,
        connectorConfig.createHeartbeat(
                topicNamingStrategy,
                schemaNameAdjuster,
                () -> new PostgresConnection(connectorConfig.getJdbcConfig(), PostgresConnection.CONNECTION_GENERAL),
                exception -> {
                    String sqlErrorId = exception.getSQLState();
                    switch (sqlErrorId) {
                        case "57P01":
                            // Postgres error admin_shutdown, see https://www.postgresql.org/docs/12/errcodes-appendix.html
                            throw new DebeziumException("Could not execute heartbeat action query (Error: " + sqlErrorId + ")", exception);
                        case "57P03":
                            // Postgres error cannot_connect_now, see https://www.postgresql.org/docs/12/errcodes-appendix.html
                            throw new RetriableException("Could not execute heartbeat action query (Error: " + sqlErrorId + ")", exception);
                        default:
                            break;
                    }
                }),
        schemaNameAdjuster,
        signalProcessor); {code}
Thus, this Jira will add the same heartbeat setup to CDC.

 

 [1] 
https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/legacy-flink-cdc-sources/postgres-cdc/

[2] 
https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql

[jira] [Created] (FLINK-35524) Clear connections pools when reader exist.

2024-06-05 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35524:
-

 Summary: Clear connections pools when reader exist.
 Key: FLINK-35524
 URL: https://issues.apache.org/jira/browse/FLINK-35524
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.1.1
Reporter: Hongshun Wang
 Fix For: cdc-3.2.0


Currently, JdbcConnectionPools is a static instance, so the data source pools
in it are not recycled when the reader closes. This causes a memory leak.

```java
public class JdbcConnectionPools implements ConnectionPools {

    private static final Logger LOG = LoggerFactory.getLogger(JdbcConnectionPools.class);

    private static JdbcConnectionPools instance;
    private final Map pools = new HashMap<>();
    private static final Map POOL_FACTORY_MAP = new HashMap<>();
```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35567) CDC BinaryWriter cast NullableSerializerWrapper error

2024-06-11 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35567:
-

 Summary: CDC BinaryWriter cast NullableSerializerWrapper error 
 Key: FLINK-35567
 URL: https://issues.apache.org/jira/browse/FLINK-35567
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.2.0
Reporter: Hongshun Wang
 Fix For: cdc-3.1.1


Currently, data type serializers are generated by
org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator#BinaryRecordDataGenerator(org.apache.flink.cdc.common.types.DataType[]),
which wraps each of them in a NullableSerializerWrapper.
{code:java}
public BinaryRecordDataGenerator(DataType[] dataTypes) {
    this(
            dataTypes,
            Arrays.stream(dataTypes)
                    .map(InternalSerializers::create)
                    .map(NullableSerializerWrapper::new)
                    .toArray(TypeSerializer[]::new));
} {code}
However, when they are used in BinaryWriter#write and the type is ARRAY/MAP/ROW,
the NullableSerializerWrapper is cast to
ArrayDataSerializer/TypeSerializer/TypeSerializer.
An exception is thrown:
{code:java}
java.lang.ClassCastException: org.apache.flink.cdc.runtime.serializer.NullableSerializerWrapper cannot be cast to org.apache.flink.cdc.runtime.serializer.data.ArrayDataSerializer
    at org.apache.flink.cdc.runtime.serializer.data.writer.BinaryWriter.write(BinaryWriter.java:134)
    at org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator.generate(BinaryRecordDataGenerator.java:89)
 {code}
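
The failure mode and one possible remedy can be reproduced with toy classes. Everything in the sketch below is an illustrative stand-in (`Serializer`, `ArraySerializer`, `NullableWrapper`, `SerializerUnwrapper`, and the `getWrappedSerializer` accessor are assumptions), not the actual Flink CDC classes: a wrapper cannot be cast to the type it wraps, so the caller has to unwrap it first.

```java
/** Toy stand-in for a serializer type; illustrative only. */
interface Serializer {}

/** Toy stand-in for a concrete serializer such as an array serializer. */
final class ArraySerializer implements Serializer {}

/** Toy stand-in for a wrapper that adds null handling around another serializer. */
final class NullableWrapper implements Serializer {
    private final Serializer inner;

    NullableWrapper(Serializer inner) {
        this.inner = inner;
    }

    Serializer getWrappedSerializer() {
        return inner;
    }
}

final class SerializerUnwrapper {

    /** Returns the wrapped serializer if the argument is a wrapper, else the argument itself. */
    static Serializer unwrap(Serializer serializer) {
        if (serializer instanceof NullableWrapper) {
            return ((NullableWrapper) serializer).getWrappedSerializer();
        }
        return serializer;
    }
}
```

Casting the result of `unwrap` to the concrete serializer type succeeds, whereas casting the wrapper directly raises a ClassCastException as in the trace above.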



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35693) Change variable specificOffsetFile, specificOffsetPos and startupTimestampMillis to Offset in StartupOptions

2024-06-25 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35693:
-

 Summary: Change variable specificOffsetFile, specificOffsetPos and 
startupTimestampMillis to Offset in StartupOptions
 Key: FLINK-35693
 URL: https://issues.apache.org/jira/browse/FLINK-35693
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.1.1
Reporter: Hongshun Wang
 Fix For: cdc-3.2.0


Currently, StartupOptions uses specificOffsetFile, specificOffsetPos and
startupTimestampMillis to describe a specific offset or timestamp. However,
these fields suit MySQL, but not Postgres's LSN or Oracle's RedoLogOffset.
{code:java}
public final class StartupOptions implements Serializable {
    private static final long serialVersionUID = 1L;

    public final StartupMode startupMode;
    public final String specificOffsetFile;
    public final Integer specificOffsetPos;
    public final Long startupTimestampMillis;
} {code}
Now that we have already abstracted Offset to represent a position in the log,
why not use Offset in StartupOptions?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35715) Mysql Source support schema cache to deserialize record

2024-06-27 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35715:
-

 Summary: Mysql Source support schema cache to deserialize record
 Key: FLINK-35715
 URL: https://issues.apache.org/jira/browse/FLINK-35715
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.1.1
Reporter: Hongshun Wang
 Fix For: cdc-3.2.0


 

Currently, DebeziumEventDeserializationSchema deserializes each record with a
schema inferred from that record.

{code:java}
private RecordData extractDataRecord(Struct value, Schema valueSchema) throws Exception {
    DataType dataType = schemaDataTypeInference.infer(value, valueSchema);
    return (RecordData) getOrCreateConverter(dataType).convert(value, valueSchema);
}
 {code}
There are some issues:
 # Inferring the type and looking up or creating a converter for every arriving
record incurs additional cost.
 # Inferring from a record might not reflect the real table schema accurately.
For instance, a timestamp column with precision 6 in MySQL might hold a value
whose fractional-second part is 0. When inferred, it will appear to have a
precision of 0.

{code:java}
protected DataType inferString(Object value, Schema schema) {
    if (ZonedTimestamp.SCHEMA_NAME.equals(schema.name())) {
        int nano =
                Optional.ofNullable((String) value)
                        .map(s -> ZonedTimestamp.FORMATTER.parse(s, Instant::from))
                        .map(Instant::getNano)
                        .orElse(0);

        int precision;
        if (nano == 0) {
            precision = 0;
        } else if (nano % 1000 > 0) {
            precision = 9;
        } else if (nano % 1000_000 > 0) {
            precision = 6;
        } else if (nano % 1000_000_000 > 0) {
            precision = 3;
        } else {
            precision = 0;
        }
        return DataTypes.TIMESTAMP_LTZ(precision);
    }
    return DataTypes.STRING();
} {code}
However, timestamps with different precisions will have different data formats 
in BinaryRecordData. Placing data with a timestamp of 0 precision and then 
parsing it with a precision of 6 will result in an exception being thrown.
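
To make the value-dependence concrete, the branch structure of the inferString snippet above can be isolated into a small function. This is only a sketch: `PrecisionInference` is a hypothetical name and the input is assumed to be the nanosecond-of-second of the parsed value.

```java
/** Hypothetical isolation of the precision branches shown in inferString. */
final class PrecisionInference {

    /** Infers a timestamp precision purely from the nanosecond-of-second of one value. */
    static int inferPrecision(int nanoOfSecond) {
        if (nanoOfSecond == 0) {
            return 0; // no fractional part at all
        } else if (nanoOfSecond % 1_000 > 0) {
            return 9; // non-zero digits below one microsecond
        } else if (nanoOfSecond % 1_000_000 > 0) {
            return 6; // non-zero digits below one millisecond
        } else if (nanoOfSecond % 1_000_000_000 > 0) {
            return 3; // only whole milliseconds
        } else {
            return 0;
        }
    }
}
```

Two values from the same TIMESTAMP(6) column, one with a zero fractional part and one with microseconds, are inferred as precision 0 and precision 6 respectively, which is the mismatch described above.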

 
{code:java}
//org.apache.flink.cdc.common.data.binary.BinaryRecordData#getTimestamp
@Override
public TimestampData getTimestamp(int pos, int precision) {
    assertIndexIsValid(pos);

    if (TimestampData.isCompact(precision)) {
        return TimestampData.fromMillis(segments[0].getLong(getFieldOffset(pos)));
    }

    int fieldOffset = getFieldOffset(pos);
    final long offsetAndNanoOfMilli = segments[0].getLong(fieldOffset);
    return BinarySegmentUtils.readTimestampData(segments, offset, offsetAndNanoOfMilli);
} {code}
Thus, I think we should cache the table schema in the source and only update
it when a SchemaChangeRecord arrives. That way, the schema used by
SourceRecordEventDeserializer always matches the database.
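
A minimal sketch of such a cache, under the simplifying assumption that a table is identified by a string and a schema is a list of column type names. `TableSchemaCache` and its methods are hypothetical names, not existing Flink CDC classes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical per-table schema cache; a schema is modeled as a list of column type names. */
final class TableSchemaCache {
    private final Map<String, List<String>> schemas = new HashMap<>();

    /** Called only when a schema change record arrives for the table. */
    void onSchemaChange(String tableId, List<String> columnTypes) {
        schemas.put(tableId, List.copyOf(columnTypes));
    }

    /** Data records look the schema up instead of inferring it from their own values. */
    List<String> schemaFor(String tableId) {
        List<String> schema = schemas.get(tableId);
        if (schema == null) {
            throw new IllegalStateException("No schema cached for table " + tableId);
        }
        return schema;
    }
}
```

Because data records never write to the cache, a value with a zero fractional second cannot change the precision recorded for its column.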



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35759) [docs] Add 'scan.incremental.snapshot.backfill.skip' to docs.

2024-07-04 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35759:
-

 Summary:  [docs] Add 'scan.incremental.snapshot.backfill.skip' to 
docs. 
 Key: FLINK-35759
 URL: https://issues.apache.org/jira/browse/FLINK-35759
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.1.1
Reporter: Hongshun Wang
 Fix For: cdc-3.2.0


scan.incremental.snapshot.backfill.skip has been exposed for a long time, but
lacks docs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35760) [docs] Add scan.newly-added-table.enabled to docs.

2024-07-04 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35760:
-

 Summary: [docs] Add scan.newly-added-table.enabled to docs.
 Key: FLINK-35760
 URL: https://issues.apache.org/jira/browse/FLINK-35760
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.1.1
Reporter: Hongshun Wang
 Fix For: cdc-3.2.0


scan.newly-added-table.enabled has been exposed for a long time, but lacks
docs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35849) [flink-cdc] Use expose_snapshot to read snapshot of postgres cdc connector.

2024-07-15 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35849:
-

 Summary: [flink-cdc] Use expose_snapshot to read snapshot of 
postgres cdc connector.
 Key: FLINK-35849
 URL: https://issues.apache.org/jira/browse/FLINK-35849
 Project: Flink
  Issue Type: New Feature
  Components: Flink CDC
Affects Versions: cdc-3.1.1
Reporter: Hongshun Wang
 Fix For: cdc-3.3.0


In the current Postgres CDC connector, we use the incremental snapshot
framework to read data[1], which includes the following steps:
 # Create a global slot so that the WAL is not recycled.
 # The enumerator splits the table into multiple chunks (named "snapshot
splits" in CDC), then assigns these snapshot splits to the readers.
 # Each reader reads the snapshot data of its snapshot split and then the
backfill log. Each reader needs a temporary slot to read the log.
 # When all snapshot splits are finished, the enumerator sends a stream split
to a reader. That single reader then reads the log.

However, reading the backfill log also increases the burden on the source
database. For example, the Postgres CDC connector establishes many logical
replication connections to the Postgres database, which can easily reach the
max_wal_senders or max_replication_slots limit. Assuming there are 10 Postgres
CDC sources and each runs with a parallelism of 4, a total of 10*(4+1) = 50
replication connections will be created. In many situations, the sink database
provides idempotence. Therefore, we can also support at-least-once semantics by
skipping the backfill period, which reduces the burden on the source database.
Users can choose between at-least-once and exactly-once based on their
demands.[2]

The two methods make a tradeoff between semantics and performance. Is there any
other method that does well in both?

It seems expose_snapshot[3] can do both. When creating the global slot, we can
save the snapshot name and use it when reading snapshot splits (so there is no
need to read the backfill log). Then we just read the WAL based on the global
slot. This also provides exactly-once semantics.
expose_snapshot is also the default behavior when creating a new replication
slot, so it should not cause other side effects.
 
 
 
 

 

[1] [https://github.com/apache/flink-cdc/pull/2216]

 [2][https://github.com/apache/flink-cdc/issues/2553]

 [3] [https://www.postgresql.org/docs/14/protocol-replication.html]

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35859) [flink-cdc] Fix: The assigner is not ready to offer finished split information, this should not be called

2024-07-17 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35859:
-

 Summary: [flink-cdc] Fix: The assigner is not ready to offer 
finished split information, this should not be called
 Key: FLINK-35859
 URL: https://issues.apache.org/jira/browse/FLINK-35859
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.1.1
Reporter: Hongshun Wang
 Fix For: cdc-3.2.0


When using CDC with newly added tables, an error occurs:
{code:java}
The assigner is not ready to offer finished split information, this should not be called. {code}
This happens because:
1. When the job is stopped and then restarted, the status is
NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED.

2. The enumerator then sends a BinlogSplitUpdateRequestEvent to each reader to
update the binlog split (see
org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator#syncWithReaders).
3. The reader suspends its binlog reader and then sends a
BinlogSplitMetaRequestEvent to the enumerator.
4. The enumerator finds that some tables have not been sent, and an error occurs:
{code:java}
private void sendBinlogMeta(int subTask, BinlogSplitMetaRequestEvent requestEvent) {
    // initialize once
    if (binlogSplitMeta == null) {
        final List finishedSnapshotSplitInfos = splitAssigner.getFinishedSplitInfos();
        if (finishedSnapshotSplitInfos.isEmpty()) {
            LOG.error(
                    "The assigner offers empty finished split information, this should not happen");
            throw new FlinkRuntimeException(
                    "The assigner offers empty finished split information, this should not happen");
        }
        binlogSplitMeta =
                Lists.partition(
                        finishedSnapshotSplitInfos, sourceConfig.getSplitMetaGroupSize());
    }
}{code}
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35968) Remove flink-cdc-runtime depedency from connectors

2024-08-02 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35968:
-

 Summary: Remove flink-cdc-runtime depedency from connectors
 Key: FLINK-35968
 URL: https://issues.apache.org/jira/browse/FLINK-35968
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.1.1
Reporter: Hongshun Wang
 Fix For: cdc-3.2.0


Currently, flink-cdc-source-connectors and flink-cdc-pipeline-connectors depend
on flink-cdc-runtime, which is poor design and redundant.

This issue aims to remove that dependency.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35994) CDC pipeline use SavepointRestoreSettings in flink config flile as a fallback strategy.

2024-08-07 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35994:
-

 Summary: CDC pipeline use SavepointRestoreSettings in flink config 
flile as a fallback strategy.
 Key: FLINK-35994
 URL: https://issues.apache.org/jira/browse/FLINK-35994
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.2.0
Reporter: Hongshun Wang
 Fix For: cdc-3.1.1


Currently, we only use the SavepointRestoreSettings from the command line; if
they are not set there, SavepointRestoreSettings.none() is used. Falling back
to the Flink config file would be better.
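
The proposed precedence could be sketched as follows. The sketch models a restore setting as an optional savepoint path only; `SavepointPathResolver` is a hypothetical name and the real SavepointRestoreSettings carries more than a path.

```java
import java.util.Optional;

/** Hypothetical resolver; models a restore setting as an optional savepoint path. */
final class SavepointPathResolver {

    /**
     * Prefers the command-line value, then the value from the Flink config file.
     * An empty result corresponds to starting without a savepoint.
     */
    static Optional<String> resolve(
            Optional<String> fromCommandLine, Optional<String> fromConfigFile) {
        return fromCommandLine.isPresent() ? fromCommandLine : fromConfigFile;
    }
}
```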



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36007) cdc load and register factory in once search

2024-08-08 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-36007:
-

 Summary: cdc load and register factory in once search
 Key: FLINK-36007
 URL: https://issues.apache.org/jira/browse/FLINK-36007
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.1.1
Reporter: Hongshun Wang
 Fix For: cdc-3.2.0
 Attachments: image-2024-08-08-17-02-34-553.png, 
image-2024-08-08-17-03-20-893.png

Currently, CDC searches for the factory to create the source and sink, and
then searches again to get the URL, which is unnecessary.

 

!image-2024-08-08-17-03-20-893.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36023) Flink K8S Native Application Mode add wrong jar url.

2024-08-09 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-36023:
-

 Summary: Flink K8S Native Application Mode add wrong jar url.
 Key: FLINK-36023
 URL: https://issues.apache.org/jira/browse/FLINK-36023
 Project: Flink
  Issue Type: Improvement
Reporter: Hongshun Wang


In https://issues.apache.org/jira/browse/FLINK-34853 , great work was done to
submit CDC jobs in Flink K8S Native Application Mode. However, some of the
changes cause problems:
 # If the user submits a job outside Flink K8S Native Application Mode and puts
the jar in /mypackage/usrlib, the path is rewritten to ${FLINK_HOME}/usrlib.
 # If the user submits a job in Flink K8S Native Application Mode, the jars in
local://usrlib have already been added to the classpath of the JM and TM, so
there is no need to add them again.

{code:java}
public static Optional getJarPathByIdentifier(
        String identifier, Class factoryClass) {
    try {
        ...
        if (urlString.contains("usrlib")) {
            String flinkHome = System.getenv("FLINK_HOME");
            urlString = urlString.replace("usrlib", flinkHome + "/usrlib");
        }
        ...
        return Optional.of(url);
    } catch (Exception e) {
        throw new RuntimeException(
                String.format("Failed to search JAR by factory identifier \"%s\"", identifier),
                e);
    }
}
 {code}
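
The effect of the replacement on a path outside the Flink home can be checked in isolation. The sketch below applies the same `String#replace` call as the snippet above, with the Flink home passed in as a parameter instead of read from the environment; `UsrlibRewriteDemo` and the example paths are hypothetical.

```java
/** Hypothetical demo applying the same string replacement as the snippet above. */
final class UsrlibRewriteDemo {

    /** Rewrites every "usrlib" occurrence in urlString, regardless of where the jar lives. */
    static String rewrite(String urlString, String flinkHome) {
        if (urlString.contains("usrlib")) {
            return urlString.replace("usrlib", flinkHome + "/usrlib");
        }
        return urlString;
    }
}
```

For a jar at `file:/mypackage/usrlib/connector.jar` and a Flink home of `/opt/flink`, the rewritten path no longer points at the user's directory.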
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36036) CDC remove unused dependency jar from calcite-core

2024-08-12 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-36036:
-

 Summary: CDC remove unused dependency jar from calcite-core
 Key: FLINK-36036
 URL: https://issues.apache.org/jira/browse/FLINK-36036
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.1.0
Reporter: Hongshun Wang
 Fix For: cdc-3.2.0


Remove unused dependency jar from calcite-core.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36076) Hotfix: Set isSchemaChangeApplying as isSchemaChangeApplying for thread safe.

2024-08-16 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-36076:
-

 Summary: Hotfix: Set isSchemaChangeApplying as 
isSchemaChangeApplying for thread safe.
 Key: FLINK-36076
 URL: https://issues.apache.org/jira/browse/FLINK-36076
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.2.0
Reporter: Hongshun Wang
 Fix For: cdc-3.3.0


Currently, we apply schema changes asynchronously.
SchemaChangeThreadPool sets isSchemaChangeApplying to true when it applies a
schema change event, and the main thread, which handles operator events,
checks isSchemaChangeApplying.
 
For thread safety, this field should be volatile.
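A minimal sketch of the pattern described above, assuming a single-threaded pool and a flag that is reset once the change finishes. Only the names isSchemaChangeApplying and schemaChangeThreadPool come from the issue text; the class, the methods, and the reset-to-false behaviour are hypothetical and not the actual Flink CDC implementation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative sketch only; not the actual Flink CDC handler.
class SchemaChangeFlagSketch {

    // volatile makes a write by the pool thread visible to the thread that reads the flag.
    private volatile boolean isSchemaChangeApplying = false;

    private final ExecutorService schemaChangeThreadPool = Executors.newSingleThreadExecutor();

    // Runs the change on the pool thread; the flag is true only while it runs.
    Future<?> applySchemaChange(Runnable change) {
        return schemaChangeThreadPool.submit(() -> {
            isSchemaChangeApplying = true;
            try {
                change.run();
            } finally {
                isSchemaChangeApplying = false;
            }
        });
    }

    // Called from the main thread that handles operator events.
    boolean isSchemaChangeApplying() {
        return isSchemaChangeApplying;
    }

    void shutdown() {
        schemaChangeThreadPool.shutdown();
    }

    // Returns the flag as observed during and after a schema change.
    static boolean[] demo() {
        SchemaChangeFlagSketch handler = new SchemaChangeFlagSketch();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            Future<?> done = handler.applySchemaChange(() -> {
                started.countDown();
                try {
                    release.await(); // keep the "schema change" running until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            started.await();
            boolean during = handler.isSchemaChangeApplying();
            release.countDown();
            done.get();
            boolean after = handler.isSchemaChangeApplying();
            return new boolean[] {during, after};
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            handler.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] observed = demo();
        System.out.println("during=" + observed[0] + ", after=" + observed[1]);
        // prints during=true, after=false
    }
}
```

The latches in the demo exist only to make the output deterministic. In real code the reader has no such synchronization with the writer, which is why the field itself needs to be volatile.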





[jira] [Created] (FLINK-36094) CDC SchemaRegistryRequestHandler should throw exception which is not SchemaEvolveException

2024-08-19 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-36094:
-

 Summary: CDC SchemaRegistryRequestHandler should throw exception 
which is not SchemaEvolveException
 Key: FLINK-36094
 URL: https://issues.apache.org/jira/browse/FLINK-36094
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.1.1
Reporter: Hongshun Wang
 Fix For: cdc-3.2.0


Currently, SchemaRegistryRequestHandler only throws SchemaEvolveException,
so other failures (such as network errors, OOM, and so on) are not handled.
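One possible shape of the fix, sketched under assumptions: the handler records any Throwable raised while applying a schema change, not only SchemaEvolveException, and rethrows it later on the calling thread. The SchemaEvolveException class below is a local stand-in, and every other name is hypothetical; this is not the actual Flink CDC code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative sketch only; not the actual SchemaRegistryRequestHandler.
class SchemaChangeFailureSketch {

    // Local stand-in for the exception type named in the issue.
    static class SchemaEvolveException extends RuntimeException {
        SchemaEvolveException(String message) {
            super(message);
        }
    }

    private final AtomicReference<Throwable> failure = new AtomicReference<>();

    // Records the first failure of any kind instead of only SchemaEvolveException.
    void apply(Runnable schemaChange) {
        try {
            schemaChange.run();
        } catch (Throwable t) {
            failure.compareAndSet(null, t);
        }
    }

    // Surfaces the recorded failure to the caller.
    void rethrowIfFailed() {
        Throwable t = failure.get();
        if (t != null) {
            throw new IllegalStateException("Schema change failed", t);
        }
    }

    // Helper for the demo: name of the failure type that surfaced, or "none".
    static String describeFailure(Runnable schemaChange) {
        SchemaChangeFailureSketch handler = new SchemaChangeFailureSketch();
        handler.apply(schemaChange);
        try {
            handler.rethrowIfFailed();
            return "none";
        } catch (IllegalStateException e) {
            return e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeFailure(() -> { }));
        // prints none
        System.out.println(describeFailure(() -> {
            throw new SchemaEvolveException("column type not supported");
        }));
        // prints SchemaEvolveException
        System.out.println(describeFailure(() -> {
            throw new UncheckedIOException("network down", new IOException("timeout"));
        }));
        // prints UncheckedIOException
    }
}
```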





[jira] [Created] (FLINK-36150) tables.exclude is not valid if scan.binlog.newly-added-table.enabled is true.

2024-08-25 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-36150:
-

 Summary: tables.exclude is not valid if 
scan.binlog.newly-added-table.enabled is true.
 Key: FLINK-36150
 URL: https://issues.apache.org/jira/browse/FLINK-36150
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.1.1
Reporter: Hongshun Wang
 Fix For: cdc-3.2.0


Currently, `tables.exclude` lets users exclude some tables: the tables passed
to the DataSource are filtered when MySqlDataSourceFactory creates the
DataSource.
However, when scan.binlog.newly-added-table.enabled is true, DDL for new
tables is read from the binlog and is not filtered by `tables.exclude`.
 
This Jira aims to pass `tables.exclude` to MySqlSourceConfig and use it when
discovering tables from the MySQL database.
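The intended behaviour can be sketched as a filter applied at table discovery time, so that tables found later go through the same include and exclude rules as the initial set. The class, the method, and the table names are hypothetical, and plain Java regular expressions are used here as a simplification of the connector's real table pattern syntax.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Illustrative sketch only; not the actual MySqlSourceConfig logic.
class TableExcludeFilterSketch {

    // Keeps tables that match includeRegex and do not match excludeRegex.
    // A null excludeRegex means nothing is excluded.
    static List<String> captureTables(
            List<String> discovered, String includeRegex, String excludeRegex) {
        Pattern include = Pattern.compile(includeRegex);
        Pattern exclude = excludeRegex == null ? null : Pattern.compile(excludeRegex);
        return discovered.stream()
                .filter(table -> include.matcher(table).matches())
                .filter(table -> exclude == null || !exclude.matcher(table).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // app_db.orders_tmp stands for a table that shows up after the job started.
        List<String> discovered =
                Arrays.asList("app_db.orders", "app_db.orders_tmp", "app_db.users");

        System.out.println(captureTables(discovered, "app_db\\..*", "app_db\\..*_tmp"));
        // prints [app_db.orders, app_db.users]
    }
}
```

Applying the exclude rule inside the discovery step, rather than only once in the factory, is what keeps newly added tables from bypassing it.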
 
 





[jira] [Created] (FLINK-36163) Kafka cdc pipeline source supports protobuf format.

2024-08-27 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-36163:
-

 Summary: Kafka cdc pipeline source supports protobuf format.
 Key: FLINK-36163
 URL: https://issues.apache.org/jira/browse/FLINK-36163
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.2.0
Reporter: Hongshun Wang
 Fix For: cdc-3.3.0








[jira] [Created] (FLINK-36174) CDC yaml without pipeline will throw NullPointException

2024-08-29 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-36174:
-

 Summary: CDC yaml without pipeline will throw NullPointException
 Key: FLINK-36174
 URL: https://issues.apache.org/jira/browse/FLINK-36174
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.1.1
Reporter: Hongshun Wang
 Fix For: cdc-3.2.0





