[jira] [Created] (FLINK-35077) Add package license check for Flink CDC modules.

2024-04-10 Thread Xiqian YU (Jira)
Xiqian YU created FLINK-35077:
-

 Summary: Add package license check for Flink CDC modules.
 Key: FLINK-35077
 URL: https://issues.apache.org/jira/browse/FLINK-35077
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: Xiqian YU


Currently, the Flink project has CI scripts that check whether dependencies with 
incompatible licenses are introduced.

The Flink CDC modules rely heavily on external libraries (especially connectors), 
so running similar checks in every CI run would help prevent developers from 
accidentally introducing questionable dependencies.
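
A minimal sketch of such a check, assuming a plain-text report of 
{{dependency,license}} pairs is generated beforehand (the file format and 
denylist below are illustrative, not the actual Flink CI tooling):

{code:java}
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Illustrative license gate: fails the build if any dependency uses a denied license. */
public class LicenseCheck {

    // Example denylist of licenses that must not appear in the dependency tree.
    private static final Set<String> DENYLIST =
            Set.of("GPL-2.0", "GPL-3.0", "AGPL-3.0", "SSPL-1.0");

    public static void main(String[] args) throws Exception {
        // Each line of the report: "<groupId:artifactId>,<license name>"
        List<String> violations = new ArrayList<>();
        for (String line : Files.readAllLines(Paths.get(args[0]))) {
            String[] parts = line.split(",", 2);
            if (parts.length == 2 && DENYLIST.contains(parts[1].trim())) {
                violations.add(line);
            }
        }
        if (!violations.isEmpty()) {
            violations.forEach(v -> System.err.println("Incompatible license: " + v));
            System.exit(1); // non-zero exit fails the CI step
        }
    }
}
{code}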



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSSION] FLIP-442: General Improvement to Configuration for Flink 2.0

2024-04-10 Thread Xuannan Su
Hi Muhammet,

Thanks for the suggestion. I updated the FLIP to include the options class.

Best regards,
Xuannan

On Wed, Apr 10, 2024 at 1:56 PM Muhammet Orazov
 wrote:
>
> Hey Xuannan,
>
> Thanks for the FLIP and your efforts!
>
> Minor clarification from my side:
>
> > We will relocate these ConfigOptions to a class that is included
> > in the documentation generation.
>
> Would it make sense to define also in the FLIP the options class for
> these variables? For example, GPUDriverOptions?
>
> Best,
> Muhammet
>
> On 2024-04-09 08:20, Xuannan Su wrote:
> > Hi all,
> >
> > I'd like to start a discussion on FLIP-442: General Improvement to
> > Configuration for Flink 2.0 [1]. As Flink moves toward 2.0, we aim to
> > provide users with a better experience with the existing
> > configuration. This FLIP proposes several general improvements to the
> > current configuration.
> >
> > Looking forward to everyone's feedback and suggestions. Thank you!
> >
> > Best regards,
> > Xuannan
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-442%3A+General+Improvement+to+Configuration+for+Flink+2.0
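
A rough sketch of what such a relocated options class could look like (the 
class name GPUDriverOptions comes from Muhammet's example; the key, type, and 
description below are illustrative, not the FLIP's actual contents):

{code:java}
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/** Illustrative options class that the documentation generator can pick up. */
@PublicEvolving
public class GPUDriverOptions {

    /** Hypothetical option relocated from an internal class. */
    public static final ConfigOption<String> DISCOVERY_SCRIPT_PATH =
            ConfigOptions.key("gpu.discovery-script.path")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Path of the script used to discover GPU resources.");
}
{code}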


Re: [DISCUSSION] FLIP-442: General Improvement to Configuration for Flink 2.0

2024-04-10 Thread Xuannan Su
Hi Zakelly,

Thanks for the comments. I updated the FLIP accordingly.

Best regards,
Xuannan

On Wed, Apr 10, 2024 at 11:12 AM Zakelly Lan  wrote:
>
> Thanks Xuannan for driving this! +1 for cleaning these up.
>
> And a minor comment: it seems StateBackendOptions is already annotated
> with @PublicEvolving.
>
>
> Best,
> Zakelly
>
>
> On Tue, Apr 9, 2024 at 4:21 PM Xuannan Su  wrote:
>
> > Hi all,
> >
> > I'd like to start a discussion on FLIP-442: General Improvement to
> > Configuration for Flink 2.0 [1]. As Flink moves toward 2.0, we aim to
> > provide users with a better experience with the existing
> > configuration. This FLIP proposes several general improvements to the
> > current configuration.
> >
> > Looking forward to everyone's feedback and suggestions. Thank you!
> >
> > Best regards,
> > Xuannan
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-442%3A+General+Improvement+to+Configuration+for+Flink+2.0
> >


Re: [VOTE] FLIP-441: Show the JobType and remove Execution Mode on Flink WebUI

2024-04-10 Thread Muhammet Orazov

Hey Rui,

+1 (non-binding).

Thanks for driving it!

Best,
Muhammet

On 2024-04-10 04:36, Rui Fan wrote:

Hi devs,

Thank you to everyone for the feedback on FLIP-441: Show
the JobType and remove Execution Mode on Flink WebUI[1]
which has been discussed in this thread [2].

I would like to start a vote for it. The vote will be open for at least 72
hours unless there is an objection or not enough votes.

[1] https://cwiki.apache.org/confluence/x/agrPEQ
[2] https://lists.apache.org/thread/0s52w17w24x7m2zo6ogl18t1fy412vcd

Best,
Rui


Re: [DISCUSS] FLIP-435: Introduce a New Dynamic Table for Simplifying Data Pipelines

2024-04-10 Thread Jamie Grier
Sorry for coming very late to this thread.  I have not contributed much to
Flink publicly for quite some time but I have been involved with Flink,
daily, for years now and I'm keenly interested in where we take Flink SQL
going forward.

Thanks for the proposal!!  I think it's definitely a step in the right
direction and I'm thrilled this is happening.  The end state I have in mind
is that we get rid of execution modes as something users have to think
about and instead make sure the SQL a user writes completely describes
their intent.  In the case of this proposal the intent a user has is that
the system continually maintains an object (whatever we decide to call it)
that is the result of their query and further that these can be easily
chained together into declarative data pipelines.

I would think it would be very unsurprising to users to call this a
MATERIALIZED VIEW, except for the fact that this object can also accept
updates via one-off DML statements.  However, I don't think this object
*should* accept updates via one-off DML statements so I may be the odd man
out here.   I would like to dive into this a little more if at all
possible.  The reasoning I've seen mentioned is GDPR requirements, so can we
dig into that specifically?  I am not terribly familiar with the exact GDPR
requirements, but I should think that the solution to deleting data is to
delete it in the upstream tables, which would appropriately update any
downstream MVs (or whatever we call them).

So, with that context and the desire to explore the GDPR requirements a
little more I would vote like so:

(1) Materialized View, it should work as expected for SQL users, no new
concept, no direct updates, dig into GDPR requirements though.
(2) Dynamic Table, just follow the Snowflake precedent.

I'm actually against all of the other proposed names so I rank them equally
last.  I don't think we need yet another new concept for this.  I think
that will just add to users' confusion and learning curve which is already
substantial with Flink.  We need to make things easier rather than harder.

All of that said, I think these discussions may be a bit easier if they
were part of a shared longer term vision for Flink SQL overall.  You can
see this from the little bits of side discussion that come up even in this
thread.  I'm not quite sure how to address that though.  I will however
give an example.

I think that longer term the Flink SQL query text alone should dictate
everything the system should do and we shouldn't rely on things like
runtime execution modes at all.  This means, for example, that a SELECT
statement should always be a point-in-time query: immediately return
results over the current data set and terminate.   This also holds for an
INSERT INTO query for that matter, and CTAS.  A continuous query that
perpetually maintains some view in the background should really have a
distinct syntax.  Basically Flink SQL should behave in a way that is
unsurprising to users of existing database systems.
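
To make the mode-dependence concrete, here is a small sketch using the current
Table API (the API calls are real Flink; the table definitions are illustrative):
the same query text terminates in one environment and runs forever in the other.

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ModeDependentSelect {
    public static void main(String[] args) {
        TableEnvironment batchEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());
        TableEnvironment streamEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Bounded demo source for the batch environment so the query can finish.
        batchEnv.executeSql(
                "CREATE TABLE orders (user_id INT) WITH ("
                        + "'connector' = 'datagen', 'number-of-rows' = '100')");

        // Unbounded demo source for the streaming environment.
        streamEnv.executeSql(
                "CREATE TABLE orders (user_id INT) WITH ('connector' = 'datagen')");

        String query = "SELECT user_id, COUNT(*) FROM orders GROUP BY user_id";

        batchEnv.executeSql(query).print();  // point-in-time result, terminates
        streamEnv.executeSql(query).print(); // continuous changelog, never terminates
    }
}
{code}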

Anyway, the point is that maybe we need a high level sketch of where we're
going so we can make sure it all hangs together nicely.

That all said I do think CREATE MATERIALIZED is a step in the right
direction but we should figure out the GDPR stuff and the overall direction
for Flink SQL going forward as well.

On Wed, Apr 10, 2024 at 6:16 AM Dawid Wysakowicz 
wrote:

> Hi all,
> I thought I'd cast my vote as well to give extra data:
>
>
>1. Materialized Table
>2. Materialized View (generally speaking I am not too concerned with
>using a View here, but since there are concerns around updating a view I
>put it second)
>
> I think what is suggested in this FLIP is really close to what MATERIALIZED
> VIEWS do already, which is why I very much prefer either of the two options
> above over any of the remaining candidates, but if I were to order them it
> would be:
>
> 3. Refresh Table (it says what it does)
> 4. Live Table - a new concept to explain, "live" can be interpreted in many
> ways
> 5. Derived Table - does not say much
>
> Best,
> Dawid
>
> On Wed, 10 Apr 2024 at 04:50, Jark Wu  wrote:
>
> > I have been following up on the discussion, it's a great FLIP to further
> > unify stream and batch ETL pipelines. Thanks for the proposal!
> >
> > Here is my ranking:
> >
> > 1. Materialized Table  -> "The table materializes the results of a query
> > that you specify", this can reflect what we want and doesn't conflict
> with
> > any SQL standard.
> > 2. Derived Table -> easy to understand and write, but needs to extend the
> > standard
> > 3. Live Table -> looks too much like Databricks' Delta Live Tables.
> > 4. Materialized View -> looks weird to insert/update a view.
> >
> >
> > Best,
> > Jark
> >
> >
> >
> >
> > On Wed, 10 Apr 2024 at 09:57, Becket Qin  wrote:
> >
> > > Thanks for the proposal. I like the FLIP.
> > >
> > > My ranking:
> > >
> > > 1. Refresh(ing) / Live Table -> easy to understand and implies the
> > dynamic
> > > 

Re: [VOTE] FLIP-441: Show the JobType and remove Execution Mode on Flink WebUI

2024-04-10 Thread Ahmed Hamdy
+1 (non-binding)
Best Regards
Ahmed Hamdy


On Wed, 10 Apr 2024 at 06:39, Zhu Zhu  wrote:

> +1
>
> Thanks,
> Zhu
>
> gongzhongqiang  wrote on Wed, Apr 10, 2024 at 13:11:
>
> > +1 (non-binding)
> >
> >
> > Bests,
> >
> > Zhongqiang Gong
> >
> > Rui Fan <1996fan...@gmail.com> wrote on Wed, Apr 10, 2024 at 12:36:
> >
> > > Hi devs,
> > >
> > > Thank you to everyone for the feedback on FLIP-441: Show
> > > the JobType and remove Execution Mode on Flink WebUI[1]
> > > which has been discussed in this thread [2].
> > >
> > > I would like to start a vote for it. The vote will be open for at least
> > 72
> > > hours unless there is an objection or not enough votes.
> > >
> > > [1] https://cwiki.apache.org/confluence/x/agrPEQ
> > > [2] https://lists.apache.org/thread/0s52w17w24x7m2zo6ogl18t1fy412vcd
> > >
> > > Best,
> > > Rui
> > >
> >
>


Re: Support minibatch for TopNFunction

2024-04-10 Thread Roman Boyko
Finally I published the POC of the minibatch for the TopN function [1]. It 
covers all implementations of TopN functions because it buffers the records 
before passing them to the collector inside AbstractTopNFunction. To 
demonstrate the performance improvement I used Nexmark q19, enhanced by adding 
an INNER JOIN after the TopN function. Using an output buffer of only 1000 
records gives nice results: the number of output records of the TopN function 
decreased from 490,000 to 136,000 and job execution time decreased by a factor 
of 3. The details of the experiment and the result measurements can be found 
in the document [2] (under the Nexmark subtitle).

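A condensed sketch of the folding idea (hypothetical class and method names, 
not the actual POC code): revoke records (-U/-D) cancel a matching buffered 
accumulate record (+I/+U); everything else is appended, and the buffer is 
flushed to the collector once full (or on a minibatch trigger such as a timer 
or checkpoint).

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

/** Illustrative output buffer that folds accumulate/revoke pairs before emitting. */
class FoldingOutputBuffer {
    private final int capacity;
    private final List<Row> buffer = new ArrayList<>();
    private final Consumer<Row> out;

    FoldingOutputBuffer(int capacity, Consumer<Row> out) {
        this.capacity = capacity;
        this.out = out;
    }

    void collect(Row record) {
        RowKind kind = record.getKind();
        if (kind == RowKind.UPDATE_BEFORE || kind == RowKind.DELETE) {
            // A revoke cancels a buffered accumulate with the same content:
            // both records are dropped and never reach downstream operators.
            for (int i = buffer.size() - 1; i >= 0; i--) {
                RowKind bufferedKind = buffer.get(i).getKind();
                if ((bufferedKind == RowKind.INSERT || bufferedKind == RowKind.UPDATE_AFTER)
                        && sameFields(buffer.get(i), record)) {
                    buffer.remove(i);
                    return;
                }
            }
        }
        buffer.add(record);
        if (buffer.size() >= capacity) {
            flush();
        }
    }

    void flush() {
        buffer.forEach(out);
        buffer.clear();
    }

    private static boolean sameFields(Row a, Row b) {
        if (a.getArity() != b.getArity()) {
            return false;
        }
        for (int i = 0; i < a.getArity(); i++) {
            if (!Objects.equals(a.getField(i), b.getField(i))) {
                return false;
            }
        }
        return true;
    }
}
{code}
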
Looking forward to your feedback.

[1]
https://github.com/rovboyko/flink/tree/feature/topn-output-buffer
[2]
https://docs.google.com/document/d/1YPHwxKfiGSUOUOa6bc68fIJHO_UojTwZEC29VVEa-Uk

--
Best regards,
Roman Boyko
e.: ro.v.bo...@gmail.com
m.: +79059592443
telegram: @rboyko

On Thu, 28 Mar 2024 at 06:54, Roman Boyko  wrote:

> Hi  Xushuai!
>
> Thank you for your reply!
>
> 1. Yes, you are absolutely right - we can't fold the records inside the output
> buffer if the current record, which is provided to output, has accumulate
> type (+I or +U). Only revoke-type records (-U or -D, whether produced by the
> current TopN function or received by the TopN function as input) can cause the
> folding process inside the buffer. Thus an accumulate message which was provided
> to output in the previous batch would receive its revoke message in the next
> batch for sure. I added the description of the folding rules to the document [1]
> 2. Absolutely correct, the main purpose of this optimization is to reduce
> the workload of downstream operators. It wouldn't increase the
> performance of the current TopN operator itself.
> 3. Ok, I'll try to do it after I finish the POC code.
>
> [1]
>
> https://docs.google.com/document/d/1YPHwxKfiGSUOUOa6bc68fIJHO_UojTwZEC29VVEa-Uk
> 
>
>
> --
> Best regards,
> Roman Boyko
> e.: ro.v.bo...@gmail.com
> m.: +79059592443
> telegram: @rboyko
>
> On Thu, 28 Mar 2024 at 09:41, shuai xu  wrote:
>
>> Hi, Roman
>>
>> Thanks for your proposal. I think this is an interesting idea and it
>> might be useful when there are operators downstream of the TopN.
>> And I have some questions about your proposal after reading your doc.
>>
>> 1.  From the input-output perspective, only the accumulated data seems to
>> be sent. If the accumulated data +recordA has already been sent in the
>> previous batch, would -recordA be sent in this batch? Could you provide
>> a detailed rule about folding redundant records?
>>
>> 2. The Minibatch Join[1] reduces state access during the join process because
>> it folds redundant records before they enter the process. From your doc,
>> folding redundant records is implemented after the TopN process. In other
>> words, it does not reduce the pressure of state access on TopN itself, but
>> rather just folds the output results that could be redundant. Is that right?
>>
>> 3. For the optimization results, the metric of output rows may not be
>> persuasive. Could you offer a result with the metrics in Nexmark?
>>
>> [1]
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#minibatch-regular-joins
>>
>> Best,
>> Xushuai
>> > On Mar 26, 2024, at 00:00, Roman Boyko  wrote:
>> >
>> > Hi Ron,
>> > Thank you so much for your reply!
>> >
>> > 1. I added the description to the Motivation part of my document [1]
>> > 2. I plan to inject this functionality into AbstractTopNFunction, so it
>> > will work for all its implementations. It doesn't depend on the
>> > implementation (whether AppendOnlyTopNFunction or RetractableTopNFunction,
>> > except maybe FastTop1Function); it would have the most effect for
>> > functions with:
>> > - topN functions without the no-ranking optimization [2]
>> > - a high value of N (top1 has fewer opportunities for optimization here)
>> > - frequent input records which are placed at the top 1 position
>> > 3. I will do it in a week - I need to fix and recheck some parts
>> > 4. Unfortunately I don't have permissions for the Flink Confluence, and
>> > according to the contribution guide I first expressed my idea as a Google
>> > doc. I would be happy to transform this idea into a FLIP.
>> >
>> > [1]
>> >
>> https://docs.google.com/document/d/1YPHwxKfiGSUOUOa6bc68fIJHO_UojTwZEC29VVEa-Uk
>> > <
>> https://docs.google.com/document/d/1YPHwxKfiGSUOUOa6bc68fIJHO_UojTwZEC29VVEa-Uk/edit?usp=sharing
>> >
>> > [2]
>> >
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/topn/#no-ranking-output-optimization
>> >
>> > --
>> > Best regards,
>> > Roman Boyko
>> > e.: ro.v.bo...@gmail.com
>> > m.: +79059592443
>> > telegram: @rboyko
>> >
>> > On Mon, 25 Mar 2024 at 15:12, Ron liu  wrote:
>> >
>> >> Hi, Roman
>> >>
>> >> Thanks for your proposal, I intuitively feel that this optimization would
>> >> be very useful to reduce 

Re: [DISCUSS] FLIP-435: Introduce a New Dynamic Table for Simplifying Data Pipelines

2024-04-10 Thread Dawid Wysakowicz
Hi all,
I thought I'd cast my vote as well to give extra data:


   1. Materialized Table
   2. Materialized View (generally speaking I am not too concerned with
   using a View here, but since there are concerns around updating a view I
   put it second)

I think what is suggested in this FLIP is really close to what MATERIALIZED
VIEWS do already, which is why I very much prefer either of the two options
above over any of the remaining candidates, but if I were to order them it
would be:

3. Refresh Table (it says what it does)
4. Live Table - a new concept to explain, "live" can be interpreted in many
ways
5. Derived Table - does not say much

Best,
Dawid

On Wed, 10 Apr 2024 at 04:50, Jark Wu  wrote:

> I have been following up on the discussion, it's a great FLIP to further
> unify stream and batch ETL pipelines. Thanks for the proposal!
>
> Here is my ranking:
>
> 1. Materialized Table  -> "The table materializes the results of a query
> that you specify", this can reflect what we want and doesn't conflict with
> any SQL standard.
> 2. Derived Table -> easy to understand and write, but needs to extend the
> standard
> 3. Live Table -> looks too much like Databricks' Delta Live Tables.
> 4. Materialized View -> looks weird to insert/update a view.
>
>
> Best,
> Jark
>
>
>
>
> On Wed, 10 Apr 2024 at 09:57, Becket Qin  wrote:
>
> > Thanks for the proposal. I like the FLIP.
> >
> > My ranking:
> >
> > 1. Refresh(ing) / Live Table -> easy to understand and implies the
> dynamic
> > characteristic
> >
> > 2. Derived Table -> easy to understand.
> >
> > 3. Materialized Table -> sounds like just a table with physical data
> stored
> > somewhere.
> >
> > 4. Materialized View -> modifying a view directly is a little weird.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> > On Tue, Apr 9, 2024 at 5:46 AM Lincoln Lee 
> wrote:
> >
> > > Thanks Ron and Timo for your proposal!
> > >
> > > Here is my ranking:
> > >
> > > 1. Derived table -> extends the persistent semantics of derived table
> > > in the SQL standard, has a strong association with the query, and has
> > > industry precedents such as Google Looker.
> > >
> > > 2. Live Table ->  an alternative for 'dynamic table'
> > >
> > > 3. Materialized Table -> a combination of Materialized View and Table,
> > > but still a table which accepts data changes
> > >
> > > 4. Materialized View -> needs to extend the understanding of a view to
> > > accept data changes
> > >
> > > The reason for not adding 'Refresh Table' is I don't want to tell the
> > user
> > > to 'refresh a refresh table'.
> > >
> > >
> > > Best,
> > > Lincoln Lee
> > >
> > >
> > > Ron liu  wrote on Tue, Apr 9, 2024 at 20:11:
> > >
> > > > Hi, Dev
> > > >
> > > > My rankings are:
> > > >
> > > > 1. Derived Table
> > > > 2. Materialized Table
> > > > 3. Live Table
> > > > 4. Materialized View
> > > >
> > > > Best,
> > > > Ron
> > > >
> > > >
> > > >
> > > > Ron liu  wrote on Tue, Apr 9, 2024 at 20:07:
> > > >
> > > > > Hi, Dev
> > > > >
> > > > > After several rounds of discussion, there is currently no consensus
> > on
> > > > the
> > > > > name of the new concept. Timo has proposed that we decide the name
> > > > through
> > > > > a vote. This is a good solution when there is no clear preference,
> so
> > > we
> > > > > will adopt this approach.
> > > > >
> > > > > Regarding the name of the new concept, there are currently five
> > > > candidates:
> > > > > 1. Derived Table -> taken by SQL standard
> > > > > 2. Materialized Table -> similar to SQL materialized view but a
> table
> > > > > 3. Live Table -> similar to dynamic tables
> > > > > 4. Refresh Table -> states what it does
> > > > > 5. Materialized View -> needs to extend the standard to support
> > > modifying
> > > > > data
> > > > >
> > > > > For the above five candidates, everyone can give your rankings based
> > > > > on your preferences. You can choose up to five options or only choose
> > > > > some of them.
> > > > > We will use a scoring rule, where the *first rank gets 5 points,
> > > > > second rank gets 4 points, third rank gets 3 points, fourth rank
> > > > > gets 2 points, and fifth rank gets 1 point*.
> > > > > After the voting closes, I will score all the candidates based on
> > > > > everyone's votes, and the candidate with the highest score will be
> > > chosen
> > > > > as the name for the new concept.
> > > > >
> > > > > The voting will last up to 72 hours and is expected to close this
> > > Friday.
> > > > > I look forward to everyone voting on the name in this thread. Of
> > > course,
> > > > we
> > > > > also welcome new input regarding the name.
> > > > >
> > > > > Best,
> > > > > Ron
> > > > >
> > > > > Ron liu  wrote on Tue, Apr 9, 2024 at 19:49:
> > > > >
> > > > >> Hi, Dev
> > > > >>
> > > > >> Sorry, my previous statement was not quite accurate. We will hold a
> > > > >> vote for the name within this thread.
> > > > >>
> > > > >> Best,
> > > > >> Ron
> > > > >>
> > > > >>
> > > > >> Ron 

[jira] [Created] (FLINK-35076) Watermark alignment will cause data flow to experience serious shake

2024-04-10 Thread elon_X (Jira)
elon_X created FLINK-35076:
--

 Summary: Watermark alignment will cause data flow to experience 
serious shake
 Key: FLINK-35076
 URL: https://issues.apache.org/jira/browse/FLINK-35076
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.16.1
Reporter: elon_X
 Attachments: image-2024-04-10-20-13-14-752.png, 
image-2024-04-10-20-15-05-731.png, image-2024-04-10-20-23-13-872.png

In our company, there is a scenario that requires multi-stream join 
operations. We are building on Flink's watermark alignment, and I found that 
the final join output experiences severe fluctuations.

I analyzed the cause: an upstream topic has more than 300 partitions. This 
number of partitions is too large, causing some partitions to frequently 
experience intermittent writes with QPS=0. The phenomenon is most pronounced 
between 2 am and 5 am. Overall writing to the topic, however, is very smooth.

 

The resulting severe fluctuation of the join output is shown in the following 
diagram:

!image-2024-04-10-20-15-05-731.png!

Root cause:
 # {{SourceOperator#emitLatestWatermark}} reports the lastEmittedWatermark to 
the SourceCoordinator.
 # If writes to a partition are zero during a certain period, the 
lastEmittedWatermark sent by the subtask corresponding to that partition 
remains unchanged.
 # The SourceCoordinator aggregates the watermarks of all subtasks in the 
watermark group and takes the smallest one. The maxAllowedWatermark may 
therefore remain unchanged for some time even though the overall upstream 
data flow is moving forward; only when that minimum is finally updated does 
everything advance at once, which manifests as severe fluctuations in the 
output data stream.

I think choosing the global minimum might not be a good option; min/max 
aggregations are more likely to run into such edge cases. Perhaps a median 
would be more appropriate, or a more sophisticated selection strategy?

Replacing it with a median keeps the overall data flow very smooth:

!image-2024-04-10-20-23-13-872.png!
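
A minimal sketch of the suggested median aggregation over the per-subtask 
watermarks (illustrative only, not the actual SourceCoordinator code):

{code:java}
import java.util.Arrays;

/** Illustrative aggregation strategies over the last watermark reported per subtask. */
final class WatermarkAggregation {

    /** Current behavior: global minimum; stalls while any idle subtask lags behind. */
    static long min(long[] reported) {
        return Arrays.stream(reported).min().orElse(Long.MIN_VALUE);
    }

    /** Suggested alternative: median; tolerates a minority of stalled subtasks. */
    static long median(long[] reported) {
        long[] sorted = reported.clone();
        Arrays.sort(sorted);
        return sorted[sorted.length / 2];
    }
}
{code}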

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35075) Migrate TwoStageOptimizedAggregateRule

2024-04-10 Thread Jacky Lau (Jira)
Jacky Lau created FLINK-35075:
-

 Summary: Migrate TwoStageOptimizedAggregateRule
 Key: FLINK-35075
 URL: https://issues.apache.org/jira/browse/FLINK-35075
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Affects Versions: 1.20.0
Reporter: Jacky Lau
 Fix For: 1.20.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35074) SavepointITCase.testStopWithSavepointWithDrainGlobalFailoverIfSavepointAborted

2024-04-10 Thread Ryan Skraba (Jira)
Ryan Skraba created FLINK-35074:
---

 Summary: 
SavepointITCase.testStopWithSavepointWithDrainGlobalFailoverIfSavepointAborted
 Key: FLINK-35074
 URL: https://issues.apache.org/jira/browse/FLINK-35074
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.18.2
Reporter: Ryan Skraba


AdaptiveScheduler: Test (module: tests) 
https://github.com/apache/flink/actions/runs/8609297979/job/23593291616#step:10:7708

{code}
Error: 02:38:03 02:38:03.567 [ERROR] 
org.apache.flink.test.checkpointing.SavepointITCase.testStopWithSavepointWithDrainGlobalFailoverIfSavepointAborted
  Time elapsed: 0.62 s  <<< ERROR!
Apr 09 02:38:03 java.util.concurrent.ExecutionException: 
org.apache.flink.util.FlinkException: Stop with savepoint operation could not 
be completed.
Apr 09 02:38:03 at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
Apr 09 02:38:03 at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
Apr 09 02:38:03 at 
org.apache.flink.test.checkpointing.SavepointITCase.testStopWithSavepointWithDrainGlobalFailoverIfSavepointAborted(SavepointITCase.java:1072)
Apr 09 02:38:03 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
Apr 09 02:38:03 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
Apr 09 02:38:03 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
Apr 09 02:38:03 at java.lang.reflect.Method.invoke(Method.java:498)
Apr 09 02:38:03 at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
Apr 09 02:38:03 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
Apr 09 02:38:03 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
Apr 09 02:38:03 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
Apr 09 02:38:03 at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
Apr 09 02:38:03 at 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
Apr 09 02:38:03 at 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
Apr 09 02:38:03 at 
org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
Apr 09 02:38:03 at 
org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
Apr 09 02:38:03 at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
Apr 09 02:38:03 at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
Apr 09 02:38:03 at 
org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
Apr 09 02:38:03 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
Apr 09 02:38:03 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
Apr 09 02:38:03 at 
org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
Apr 09 02:38:03 at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
Apr 09 02:38:03 at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
Apr 09 02:38:03 at 
org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
Apr 09 02:38:03 at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
Apr 09 02:38:03 at 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
Apr 09 02:38:03 at org.junit.rules.RunRules.evaluate(RunRules.java:20)
Apr 09 02:38:03 at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
Apr 09 02:38:03 at 
org.junit.runners.ParentRunner.run(ParentRunner.java:413)
Apr 09 02:38:03 at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
Apr 09 02:38:03 at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
Apr 09 02:38:03 at 
org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
Apr 09 02:38:03 at 
org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
Apr 09 02:38:03 at 
org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
Apr 09 02:38:03 at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147)
Apr 09 02:38:03 at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127)
Apr 09 02:38:03 at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:90)
Apr 09 02:38:03 at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:55)
Apr 09 02:38:03 
{code}

RE: [DISCUSS] FLINK-34440 Support Debezium Protobuf Confluent Format

2024-04-10 Thread David Radley
Hi,
I notice in the draft PR that there is a schema id in the format config, and I 
was wondering why. In the Confluent Avro and existing Debezium formats there is 
no schema id in the config, but there is the ability to specify a complete 
schema. In the Protobuf format there is no schema id.

I assume the schema id would be used during serialization when there is 
already an existing registered schema and you have its id. I see in the docs 
https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-protobuf.html
 there is a serialization example where 2 schemas are registered.

I would suggest aiming to copy what the Confluent SerDes libraries do rather 
than having a schema id hard-coded in the config.

WDYT?
Kind regards, David.
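
For illustration, a rough sketch of the subject-based lookup suggested above 
(the SchemaRegistryClient calls are Confluent's public client API; the class 
and method names around them are hypothetical):

{code:java}
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

/** Hypothetical helper: resolve the schema id from a subject instead of config. */
class SubjectBasedSchemaResolver {

    private final SchemaRegistryClient client;

    SubjectBasedSchemaResolver(String registryUrl) {
        // The second argument is the identity-map capacity (number of cached schemas).
        this.client = new CachedSchemaRegistryClient(registryUrl, 100);
    }

    /** Looks up the latest registered schema for the subject and returns its id. */
    int resolveSchemaId(String subject) throws Exception {
        SchemaMetadata latest = client.getLatestSchemaMetadata(subject);
        // The schema text could also seed the Flink-to-Proto conversion so that
        // existing field numbers are evolved rather than renumbered.
        String latestSchemaText = latest.getSchema();
        return latest.getId();
    }
}
{code}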

From: Kevin Lam 
Date: Tuesday, 26 March 2024 at 20:06
To: dev@flink.apache.org 
Subject: [EXTERNAL] Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf 
Confluent Format
Thanks Anupam! Looking forward to it.

On Thu, Mar 14, 2024 at 1:50 AM Anupam Aggarwal 
wrote:

> Hi Kevin,
>
> Thanks, these are some great points.
> Just to clarify, I do agree that the subject should be an option (like in
> the case of RegistryAvroFormatFactory).
> We could fall back to the subject and auto-register schemas if the schema-Id
> is not provided explicitly.
> In general, I think it would be good to be more explicit about the schemas
> used (
>
> https://docs.confluent.io/platform/curren/schema-registry/schema_registry_onprem_tutorial.html#auto-schema-registration
> <
> https://docs.confluent.io/platform/current/schema-registry/schema_registry_onprem_tutorial.html#auto-schema-registration
> >
> ).
> This would also help prevent us from overriding the ids in incompatible
> ways.
>
> Under the current implementation of FlinkToProtoSchemaConverter we might
> end up overwriting the field-Ids.
> If we are able to locate a prior schema, the approach you outlined makes a
> lot of sense.
> Let me explore this a bit further and get back(in terms of feasibility).
>
> Thanks again!
> - Anupam
>
> On Wed, Mar 13, 2024 at 2:28 AM Kevin Lam 
> wrote:
>
> > Hi Anupam,
> >
> > Thanks again for your work on contributing this feature back.
> >
> > Sounds good re: the refactoring/re-organizing.
> >
> > Regarding the schema-id, in my opinion this should NOT be a configuration
> > option on the format. We should be able to deterministically map the
> Flink
> > type to the ProtoSchema and register that with the Schema Registry.
> >
> > I think it can make sense to provide the `subject` as a parameter, so
> that
> > the serialization format can look up existing schemas.
> >
> > This would be used in something I mentioned something related above
> >
> > > Another topic I had is Protobuf's field ids. Ideally in Flink it would
> be
> > > nice if we are idiomatic about not renumbering them in incompatible
> ways,
> > > similar to what's discussed on the Schema Registry issue here:
> > > https://github.com/confluentinc/schema-registry/issues/2551
> >
> >
> > When we construct the ProtobufSchema from the Flink LogicalType, we
> > shouldn't renumber the field ids in an incompatible way, so one approach
> > would be to use the subject to look up the most recent version, and use
> > that to evolve the field ids correctly.
> >
> >
> > On Tue, Mar 12, 2024 at 2:33 AM Anupam Aggarwal <
> anupam.aggar...@gmail.com
> > >
> > wrote:
> >
> > > Hi Kevin,
> > >
> > > Thanks for starting the discussion on this.
> > > I will be working on contributing this feature back (This was developed
> > by
> > > Dawid Wysakowicz and others at Confluent).
> > >
> > > I have opened a (very initial) draft PR here
> > > https://github.com/apache/flink/pull/24482  with our current
> > > implementation.
> > > Thanks for the feedback on the PR, I haven’t gotten around to
> > > re-organizing/refactoring the classes yet, but it would be inline with
> > some
> > > of your comments.
> > >
> > > On the overall approach there are some slight variations from the
> initial
> > > proposal.
> > > Our implementation relies on an explicit schema-id being passed through
> > the
> > > config. This could help in cases where one Flink type could potentially
> > map
> > > to multiple proto types.
> > > We could make the schema-Id optional and fall back to deriving it from
> > the
> > > rowType (during serialization) if not present?
> > >
> > > The message index handling is still TBD. I am thinking of replicating
> > logic
> > > in AbstractKafkaProtobufSerializer.java
> > > <
> > >
> >
> https://github.com/confluentinc/schema-registry/blob/342c8a9d3854d4253d785214f5dcfb1b6cc59a06/protobuf-serializer/src/main/java/io/confluent/kafka/serializers/protobuf/AbstractKafkaProtobufSerializer.java#L157
> > > >
> > >  (|Deserializer).
> > > Please let me know if this makes sense / or in case you have any other
> > > feedback.
> > >
> > > Thanks
> > > Anupam
> > >
> > > On Thu, Feb 29, 2024 at 8:54 PM Kevin Lam
>  > >
> > > wrote:
> > >
> > > > Hey Robert,
> 

[jira] [Created] (FLINK-35073) Deadlock in LocalBufferPool when NetworkBufferPool.internalRecycleMemorySegments is called concurrently

2024-04-10 Thread Julien Tournay (Jira)
Julien Tournay created FLINK-35073:
--

 Summary: Deadlock in LocalBufferPool when 
NetworkBufferPool.internalRecycleMemorySegments is called concurrently
 Key: FLINK-35073
 URL: https://issues.apache.org/jira/browse/FLINK-35073
 Project: Flink
  Issue Type: Bug
Reporter: Julien Tournay
 Attachments: deadlock_threaddump_extract.json

The reported issue is easy to reproduce in batch mode using hybrid shuffle and 
a somewhat large total number of slots in the cluster. Parallelism does not 
seem to matter much.

Note: I attached a partial thread dump to illustrate the issue.

When `NetworkBufferPool.internalRecycleMemorySegments` is called concurrently, 
the following chain of calls may happen:
{code:java}
NetworkBufferPool.internalRecycleMemorySegments -> 
LocalBufferPool.onGlobalPoolAvailable ->
LocalBufferPool.checkAndUpdateAvailability -> 
LocalBufferPool.requestMemorySegmentFromGlobalWhenAvailable{code}
`requestMemorySegmentFromGlobalWhenAvailable` can cause `onGlobalPoolAvailable` 
to be invoked on another `LocalBufferPool` instance, which triggers the same 
chain of actions.

The issue arises when 2 threads go through this specific code path at the same 
time.

Each thread will call `requestMemorySegmentFromGlobalWhenAvailable` and in the 
process try to acquire new locks on a series of `LocalBufferPool` instances.

As an example, assume there are 6 `LocalBufferPool` instances A, B, C, D, E and 
F:
Thread 1 locks A, B, C and tries to lock D
Thread 2 locks D, E, F and tries to lock A
==> Both threads 1 and 2 are blocked.

The attached thread dump captures this issue:
The first thread locked java.util.ArrayDeque@41d6a3bb and is blocked on 
java.util.ArrayDeque@e2b5e34.
The second thread locked java.util.ArrayDeque@e2b5e34 and is blocked on 
java.util.ArrayDeque@41d6a3bb.
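
A minimal standalone sketch of this lock-ordering inversion (plain Java, not 
Flink's actual classes) that reproduces the hang:

{code:java}
/** Two threads acquiring the same pair of monitors in opposite order. */
public class LockOrderDeadlock {

    public static void main(String[] args) {
        final Object poolA = new Object();
        final Object poolD = new Object();

        // Mirrors "Thread 1 locks A ... and tries to lock D".
        new Thread(() -> {
            synchronized (poolA) {
                sleep(100); // widen the race window
                synchronized (poolD) {
                }
            }
        }).start();

        // Mirrors "Thread 2 locks D ... and tries to lock A".
        new Thread(() -> {
            synchronized (poolD) {
                sleep(100);
                synchronized (poolA) {
                }
            }
        }).start();
        // Both threads block forever: each holds the monitor the other needs.
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ignored) {
        }
    }
}
{code}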

 

Note that I'm not familiar enough with Flink internals to know what the fix 
should be but I'm happy to submit a PR if someone tells me what the correct 
behaviour should be.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35072) Doris pipeline sink does not support applying AlterColumnTypeEvent

2024-04-10 Thread Xiqian YU (Jira)
Xiqian YU created FLINK-35072:
-

 Summary: Doris pipeline sink does not support applying 
AlterColumnTypeEvent
 Key: FLINK-35072
 URL: https://issues.apache.org/jira/browse/FLINK-35072
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: Xiqian YU


According to [Doris 
documentation|https://doris.apache.org/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-TABLE-COLUMN/],
 altering column types dynamically is supported (via the ALTER TABLE ... MODIFY 
COLUMN statement) when a lossless conversion is available. However, the Doris 
pipeline connector currently has no support for AlterColumnTypeEvent and always 
raises a RuntimeException.

It would be convenient for users to be able to sync compatible type conversions, 
and this could easily be implemented by extending Doris' SchemaChangeManager 
helper class.
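
A rough sketch of the idea (hypothetical names throughout; only the ALTER 
TABLE ... MODIFY COLUMN statement itself comes from the Doris documentation 
cited above):

{code:java}
/** Illustrative translation of a column-type change into Doris DDL. */
class AlterColumnTypeDdlBuilder {

    /**
     * Builds the DDL Doris supports for lossless type changes, e.g.
     * "ALTER TABLE db.orders MODIFY COLUMN amount DECIMAL(18, 2)".
     * Whether the conversion is lossless must be validated beforehand.
     */
    String buildDdl(String database, String table, String column, String newDorisType) {
        return String.format(
                "ALTER TABLE %s.%s MODIFY COLUMN %s %s",
                database, table, column, newDorisType);
    }
}
{code}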



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35071) Remove dependency on flink-shaded from cdc source connector

2024-04-10 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35071:
-

 Summary: Remove dependency on flink-shaded from cdc source 
connector
 Key: FLINK-35071
 URL: https://issues.apache.org/jira/browse/FLINK-35071
 Project: Flink
  Issue Type: Improvement
Reporter: Hongshun Wang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSSION] FLIP-442: General Improvement to Configuration for Flink 2.0

2024-04-10 Thread Rui Fan
Thanks Xuannan for driving this proposal!

> Ensure all the ConfigOptions are properly annotated as PublicEvolving

Could we add a specification directly, stating that all XxxOptions classes
are PublicEvolving by default? I'm afraid some new classes will still miss
PublicEvolving in the future.

If we have a specification, it will be clear, and we won't need to
add PublicEvolving to each XxxOptions.
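
For illustration, such a convention could be enforced with an ArchUnit rule 
(ArchUnit's API is real; the rule and package scope below are a sketch, not 
the FLIP's actual proposal):

{code:java}
import org.apache.flink.annotation.PublicEvolving;

import com.tngtech.archunit.core.domain.JavaClasses;
import com.tngtech.archunit.core.importer.ClassFileImporter;
import com.tngtech.archunit.lang.ArchRule;

import static com.tngtech.archunit.lang.syntax.ArchRuleDefinition.classes;

/** Illustrative architecture test: every *Options class must be @PublicEvolving. */
public class OptionsClassesAnnotationTest {

    public static void main(String[] args) {
        JavaClasses imported =
                new ClassFileImporter().importPackages("org.apache.flink");

        ArchRule rule =
                classes()
                        .that().haveSimpleNameEndingWith("Options")
                        .should().beAnnotatedWith(PublicEvolving.class);

        rule.check(imported); // throws an AssertionError listing violations, if any
    }
}
{code}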

Best,
Rui

On Wed, Apr 10, 2024 at 1:54 PM Muhammet Orazov
 wrote:

> Hey Xuannan,
>
> Thanks for the FLIP and your efforts!
>
> Minor clarification from my side:
>
> > We will relocate these ConfigOptions to a class that is included
> > in the documentation generation.
>
> Would it make sense to define also in the FLIP the options class for
> these variables? For example, GPUDriverOptions?
>
> Best,
> Muhammet
>
> On 2024-04-09 08:20, Xuannan Su wrote:
> > Hi all,
> >
> > I'd like to start a discussion on FLIP-442: General Improvement to
> > Configuration for Flink 2.0 [1]. As Flink moves toward 2.0, we aim to
> > provide users with a better experience with the existing
> > configuration. This FLIP proposes several general improvements to the
> > current configuration.
> >
> > Looking forward to everyone's feedback and suggestions. Thank you!
> >
> > Best regards,
> > Xuannan
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-442%3A+General+Improvement+to+Configuration+for+Flink+2.0
>