Re: [DISCUSS] FLIP-433: State Access on DataStream API V2

2024-03-07 Thread weijie guo
Hi Gyula,


Thanks for your reply!


Let me answer these questions:


> What are the semantics of the usesStates method? When is it called? Can
> the used state change dynamically at runtime? Can the logic depend on
> something computed in open(..) for example?


useStates is used to predefine all the states that the process
function needs to access. In other words, we want to avoid declaring
states dynamically at runtime, which allows the SQL planner and the
JM to optimize the job better. As a result, this logic must be fully
available at compile time (when the JobGraph is generated), so it
can't rely on computations that are executed after deployment to the TM.
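A minimal sketch of this contract, assuming hypothetical `StateDeclaration`, `ProcessFunction` and `JobGraphBuilder` types (none of these are the FLIP's actual classes, and the method is spelled both `usesStates` and `useStates` in this thread): the builder calls `useStates()` once while the job graph is generated and hands the runtime an immutable snapshot, so nothing computed later on the TM can add a state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical state declaration: a unique name plus a value type (not a Flink class). */
final class StateDeclaration {
    final String name;
    final Class<?> type;

    StateDeclaration(String name, Class<?> type) {
        this.name = name;
        this.type = type;
    }
}

/** Hypothetical V2-style function that lists every state it will ever access. */
interface ProcessFunction {
    /** Evaluated while the job graph is built, before anything is deployed to a TaskManager. */
    Set<StateDeclaration> useStates();
}

/** Hypothetical stand-in for job graph generation. */
final class JobGraphBuilder {
    /** Calls useStates() exactly once and returns an immutable name -> declaration snapshot. */
    static Map<String, StateDeclaration> collectDeclarations(ProcessFunction fn) {
        Map<String, StateDeclaration> byName = new HashMap<>();
        for (StateDeclaration d : fn.useStates()) {
            if (byName.putIfAbsent(d.name, d) != null) {
                throw new IllegalArgumentException("Duplicate state name: " + d.name);
            }
        }
        // The runtime only ever sees this frozen copy, so later changes cannot add states.
        return Map.copyOf(byName);
    }
}

/** Example: a per-key counter that declares a single value state up front. */
final class CountingFunction implements ProcessFunction {
    static final StateDeclaration COUNT = new StateDeclaration("count", Long.class);

    @Override
    public Set<StateDeclaration> useStates() {
        return Set.of(COUNT);
    }
}
```

Freezing the set at this point is what lets the planner and JM see every state before deployment.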


> Currently state access is pretty dynamic in Flink and I would assume
> many jobs create states on the fly based on some required logic. Are
> we planning to address these use-cases?


It depends on what type of context we need. If the type and number of
states depend on runtime context, that's something we want to avoid.
If they depend only on information available at compile time, I think
we could support it.
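To illustrate the distinction, here is a hypothetical function whose set of states is derived only from a constructor argument, which is already known when the job graph is built, so it fits the proposal; deriving a state name from an incoming record would not. `PerMetricFunction` and the `sum-` naming are purely illustrative.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical function whose states depend only on constructor input. */
final class PerMetricFunction {
    private final List<String> metrics;

    /** The metric list is fixed when the job is defined, i.e. before job graph generation. */
    PerMetricFunction(List<String> metrics) {
        this.metrics = List.copyOf(metrics);
    }

    /** One state name per configured metric; the result is known at compile time. */
    Set<String> useStates() {
        Set<String> names = new LinkedHashSet<>();
        for (String metric : metrics) {
            names.add("sum-" + metric);
        }
        return names;
    }

    // Not allowed under the proposal: deriving a new state name from a record at runtime,
    // e.g. declaring "sum-" + <metric read from the record> while processing it.
}
```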


> Are we planning to support deleting/dropping states that are not
> required anymore?


We really don't want the user to be able to dynamically declare/delete
a state at runtime, but if you just want to clear the value of a
state, the new API works the same as the old API.


> I think if a state is not declared or otherwise cannot be accessed, an
> exception must be thrown. We cannot confuse empty value with something
> inaccessible.


After thinking about it a bit more, I think you have a point!
It's important to make a clear distinction between an empty state and
illegal access, especially since Flink currently discourages setting a
non-null default value for the state.
I will modify the proposal as you suggested then :)
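A small sketch of that distinction, using a hypothetical name-keyed `SimpleStateManager` (the FLIP's `StateManager` works with typed declarations and its exact methods may differ): a declared state that holds no value reads as empty, while any access to an undeclared state fails fast with an exception.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Hypothetical per-partition state store (not Flink's StateManager). */
final class SimpleStateManager {
    private final Set<String> declared;
    private final Map<String, Object> values = new HashMap<>();

    SimpleStateManager(Set<String> declaredStateNames) {
        this.declared = Set.copyOf(declaredStateNames);
    }

    /** Declared but never written: empty Optional. Undeclared: IllegalStateException. */
    Optional<Object> get(String name) {
        checkDeclared(name);
        return Optional.ofNullable(values.get(name));
    }

    void put(String name, Object value) {
        checkDeclared(name);
        values.put(name, value);
    }

    private void checkDeclared(String name) {
        if (!declared.contains(name)) {
            throw new IllegalStateException("State '" + name + "' was not declared in useStates()");
        }
    }
}
```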


> The RedistributionMode enum sounds a bit strange to me, as it doesn't
> actually specify a mode of redistribution. It feels more like a flag. Can
> we simply have an Optional instead?


We actually define three RedistributionMode values instead of two because we
don't want to treat IDENTICAL as a redistribution strategy; it is just an
invariant, namely that a state of that type is always the same across
partitions. If there were only None and REDISTRIBUTABLE, I think your
proposal would be feasible. But we don't want to conflate these three semantics/modes.
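A sketch of why three values carry more information than an `Optional`: the constant names follow this thread (`None` written as `NONE`), while `ListRedistributionStrategy` and `asOptional` are hypothetical helpers. With an `Optional`-based design, `NONE` and `IDENTICAL` both collapse to `Optional.empty()`, and the invariant that `IDENTICAL` state is the same in every partition is no longer visible.

```java
import java.util.Optional;

/** The three categories discussed in this thread (illustrative, not the FLIP's source). */
enum RedistributionMode {
    /** Stays confined to its partition; nothing to redistribute when partitions change. */
    NONE,
    /** Must be redistributed; a strategy decides how (e.g. split or union for list state). */
    REDISTRIBUTABLE,
    /** Not a strategy but an invariant: every partition holds the same state. */
    IDENTICAL
}

/** Hypothetical strategy for redistributable list state. */
enum ListRedistributionStrategy { SPLIT, UNION }

final class RedistributionModes {
    /** What an Optional-based design would keep: NONE and IDENTICAL become indistinguishable. */
    static Optional<ListRedistributionStrategy> asOptional(
            RedistributionMode mode, ListRedistributionStrategy strategy) {
        return mode == RedistributionMode.REDISTRIBUTABLE ? Optional.of(strategy) : Optional.empty();
    }
}
```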


> BroadcastStates are currently very limited by only Map-like states, and
> the new interface also enforces that. Can we remove this limitation? If
> not, should broadcastState declaration extend mapstate declaration?


Personally, I don't want to impose this restriction. This is also why
the method in StateManager to get BroadcastState takes a
BroadcastStateDeclaration parameter instead of a MapStateDeclaration. In the
future, if the state backend supports other types of broadcast state,
we can add a corresponding method to the States utility class to get
the BroadcastStateDeclaration.
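A sketch of the typing argument, with hypothetical classes modeled on the names in this thread (not the FLIP's real signatures): because `BroadcastStateDeclaration` is a sibling of `MapStateDeclaration` rather than a subtype, a broadcast-state accessor can require it at compile time, and non-map broadcast declarations could later be added without changing that accessor.

```java
/** Hypothetical common supertype for declarations. */
interface StateDeclaration {
    String name();
}

/** Hypothetical keyed/partitioned map state declaration. */
final class MapStateDeclaration<K, V> implements StateDeclaration {
    private final String name;
    MapStateDeclaration(String name) { this.name = name; }
    public String name() { return name; }
}

/** A sibling of MapStateDeclaration, not a subtype, so other broadcast shapes can be added later. */
final class BroadcastStateDeclaration<K, V> implements StateDeclaration {
    private final String name;
    BroadcastStateDeclaration(String name) { this.name = name; }
    public String name() { return name; }
}

/** Hypothetical factory in the spirit of the States utility class. */
final class StatesSketch {
    static <K, V> MapStateDeclaration<K, V> mapState(String name) {
        return new MapStateDeclaration<>(name);
    }

    static <K, V> BroadcastStateDeclaration<K, V> mapBroadcastState(String name) {
        return new BroadcastStateDeclaration<>(name);
    }
}

/** Hypothetical accessor: passing a MapStateDeclaration here does not compile. */
final class StateManagerSketch {
    String describeBroadcastState(BroadcastStateDeclaration<?, ?> declaration) {
        return "broadcast:" + declaration.name();
    }
}
```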



Best regards,

Weijie


On Thu, Mar 7, 2024 at 11:55 AM Hangxiang Yu wrote:

> Hi, Weijie.
> Thanks for your proposal.
> I'd like to start the discussion with some questions:
> 1. We have also discussed in FLIP-359/FLINK-32658 limiting the user
> operations to avoid creating state in processElement. Could the current
> interfaces also help with this?
>
> 2. Could you provide more examples of how useStates() works? Since some
> operations may change their used states at runtime, the value this method
> returns will be modified at runtime, right?
> If so, I'm wondering whether we could get a deterministic State Declaration
> Set before running, which could help a lot for some state operations, e.g.
> pre-checking schema compatibility, queryable schema.
>
> 3. IIUC, RedistributionMode/Strategy should not be used by users, right?
> If so, I'm +1 on moving them to inner interfaces, since they seem a bit
> confusing to users.
> On Thu, Mar 7, 2024 at 11:39 AM Zakelly Lan  wrote:
>
> > Hi Weijie,
> >
> > Thanks for proposing this!
> >
> > Unifying and optimizing state definitions is a very good thing. I like the
> > idea of 'definition goes before using', so overall +1 for this proposal.
> >
> > However, I think the current definition is somewhat unclear. From a user's
> > point of view, I believe that state can be characterized along two
> > relatively independent axes: the scenario (keyed, non-keyed, or broadcast)
> > and the data structure (single value, list, map). I recommend that we fully
> > decouple these aspects, rather than linking the nature of the definition to
> > specific assumptions, such as equating broadcast states with maps, or
> > considering list states could be non-keyed.
> > Furthermore, the concept of 'Redistribution' may impose a cognitive burden
> > on general users. My advice would be to conceal RedistributionMode/Strategy
> > from the standard user interface, particularly within the helper class
> > 'State'. But I'm OK to keep it in `StateDeclaration` since its interfaces
> > are basically used by the fra

[DISCUSS] FLIP-424: Asynchronous State APIs

2024-03-07 Thread Zakelly Lan
Hi devs,

I'd like to start a discussion on a sub-FLIP of FLIP-423: Disaggregated
State Storage and Management[1], which is a joint work of Yuan Mei, Zakelly
Lan, Jinzhong Li, Hangxiang Yu, Yanfei Lei and Feng Wang:

 - FLIP-424: Asynchronous State APIs [2]

This FLIP introduces new APIs for asynchronous state access.
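To make "asynchronous state access" concrete, here is a rough sketch in which a state read returns a `CompletableFuture` and the update is chained onto it instead of blocking the task thread. `AsyncValueState` and `AsyncCounter` are hypothetical and are not the interfaces defined in FLIP-424[2]; an executor stands in for remote storage I/O.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

/** Hypothetical async value state backed by a simulated remote store. */
final class AsyncValueState<T> {
    private final ExecutorService io;
    private volatile T value;

    AsyncValueState(ExecutorService io) {
        this.io = io;
    }

    /** Returns immediately; the value is fetched later on the I/O executor. */
    CompletableFuture<T> asyncValue() {
        return CompletableFuture.supplyAsync(() -> value, io);
    }

    CompletableFuture<Void> asyncUpdate(T newValue) {
        return CompletableFuture.runAsync(() -> value = newValue, io);
    }
}

final class AsyncCounter {
    /** Read-modify-write expressed as a chain of futures instead of blocking calls. */
    static CompletableFuture<Long> increment(AsyncValueState<Long> state) {
        return state.asyncValue().thenCompose(current -> {
            long next = (current == null ? 0L : current) + 1;
            return state.asyncUpdate(next).thenApply(ignored -> next);
        });
    }
}
```

Two increments for the same key issued concurrently could interleave their reads and writes, which is the kind of ordering question the execution model in FLIP-425 is meant to address.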

Please make sure you have read FLIP-423[1] to know the whole story;
we'll discuss the details of FLIP-424[2] in this thread. For the
discussion of overall architecture or topics related to multiple
sub-FLIPs, please post in the previous thread[3].

Looking forward to hearing from you!

[1] https://cwiki.apache.org/confluence/x/R4p3EQ
[2] https://cwiki.apache.org/confluence/x/SYp3EQ
[3] https://lists.apache.org/thread/ct8smn6g9y0b8730z7rp9zfpnwmj8vf0


Best,
Zakelly


[DISCUSS] FLIP-426: Grouping Remote State Access

2024-03-07 Thread Jinzhong Li
Hi devs,


I'd like to start a discussion on a sub-FLIP of FLIP-423: Disaggregated
State Storage and Management[1], which is a joint work of Yuan Mei, Zakelly
Lan, Jinzhong Li, Hangxiang Yu, Yanfei Lei and Feng Wang:

- FLIP-426: Grouping Remote State Access [2]

This FLIP enables retrieval of remote state data in batches to avoid
unnecessary round-trip costs for remote access.
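The batching idea can be illustrated with a toy store that counts round trips; `FakeRemoteStore`, `multiGet` and `BatchingReader` are hypothetical and not FLIP-426's design, which is described in [2]. Three individual lookups cost three round trips, while the same lookups buffered and flushed together cost one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical remote store that counts network round trips. */
final class FakeRemoteStore {
    private final Map<String, String> data;
    int roundTrips = 0;

    FakeRemoteStore(Map<String, String> data) {
        this.data = data;
    }

    /** One round trip per key. */
    String get(String key) {
        roundTrips++;
        return data.get(key);
    }

    /** One round trip for the whole batch. */
    Map<String, String> multiGet(List<String> keys) {
        roundTrips++;
        Map<String, String> result = new HashMap<>();
        for (String k : keys) {
            result.put(k, data.get(k));
        }
        return result;
    }
}

/** Buffers pending lookups and resolves them with a single multiGet per flush. */
final class BatchingReader {
    private final FakeRemoteStore store;
    private final List<String> pending = new ArrayList<>();

    BatchingReader(FakeRemoteStore store) {
        this.store = store;
    }

    void request(String key) {
        pending.add(key);
    }

    Map<String, String> flush() {
        Map<String, String> result = store.multiGet(new ArrayList<>(pending));
        pending.clear();
        return result;
    }
}
```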

Please make sure you have read FLIP-423[1] to know the whole story;
we'll discuss the details of FLIP-426[2] in this thread. For the
discussion of overall architecture or topics related to multiple
sub-FLIPs, please post in the previous thread[3].

Looking forward to hearing from you!

[1] https://cwiki.apache.org/confluence/x/R4p3EQ

[2] https://cwiki.apache.org/confluence/x/TYp3EQ

[3] https://lists.apache.org/thread/ct8smn6g9y0b8730z7rp9zfpnwmj8vf0

Best,

Jinzhong Li


[DISCUSS] FLIP-428: Fault Tolerance/Rescale Integration for Disaggregated State

2024-03-07 Thread Jinzhong Li
Hi devs,

I'd like to start a discussion on a sub-FLIP of FLIP-423: Disaggregated
State Storage and Management[1], which is a joint work of Yuan Mei, Zakelly
Lan, Jinzhong Li, Hangxiang Yu, Yanfei Lei and Feng Wang:

- FLIP-428: Fault Tolerance/Rescale Integration for Disaggregated State [2]

This FLIP integrates checkpointing mechanisms with the disaggregated state
store for fault tolerance and fast rescaling.

Please make sure you have read FLIP-423[1] to know the whole story;
we'll discuss the details of FLIP-428[2] in this thread. For the
discussion of overall architecture or topics related to multiple
sub-FLIPs, please post in the previous thread[3].

Looking forward to hearing from you!

[1] https://cwiki.apache.org/confluence/x/R4p3EQ

[2] https://cwiki.apache.org/confluence/x/UYp3EQ

[3] https://lists.apache.org/thread/ct8smn6g9y0b8730z7rp9zfpnwmj8vf0

Best,

Jinzhong Li


Re: [DISCUSS] FLIP-433: State Access on DataStream API V2

2024-03-07 Thread weijie guo
Hi Zakelly,

Thanks for your reply!

> My advice would be to conceal RedistributionMode/Strategy from the
> standard user interface, particularly within the helper class 'State'. But
> I'm OK to keep it in `StateDeclaration` since its interfaces are basically
> used by the framework.

I'm sorry, I didn't mention some of the details/concepts introduced by
the umbrella FLIP and FLIP-409 in this FLIP. This might make it hard
to understand the motivation behind RedistributionMode; I'll add more
context to the FLIP.


Briefly, in V2, we explicitly define the concept of a partition. In the
case of KeyedPartitionStream, one key corresponds to one partition. For
NonKeyedPartitionStream, one parallel subtask corresponds to one
partition. All states are considered to be confined within their
partition. On this basis, an obvious question is whether and how the
state should be redistributed when the partitions change. So we
divide states into three categories:

   - States that don't need to be redistributed when the partitions change.
   - States for which we have to decide how to redistribute them when the partitions change.
   - States that are always the same across different partitions.

After introducing the concept of partition, the redistribution
pattern/mode of a state becomes the more essential difference between
states. For this reason, we no longer want to emphasize keyed/operator
state in the V2 API. Keep in mind that partitions are first-class
citizens. And even in V1, we have to let the user know that split/union
are two different strategies for list state.
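The difference between the two V1 list-state strategies mentioned above can be shown on plain lists. This is an illustrative model, not Flink's repartitioning code: `split` deals entries round-robin so each entry ends up on exactly one new subtask (Flink's actual even-split assignment may order entries differently), while `union` gives every new subtask all entries.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative rescaling of list state from the old subtasks to newParallelism subtasks. */
final class ListStateRedistribution {
    /** Split: each entry goes to exactly one new subtask, dealt out round-robin. */
    static <T> List<List<T>> split(List<List<T>> oldState, int newParallelism) {
        List<List<T>> result = newBuckets(newParallelism);
        int i = 0;
        for (List<T> subtaskState : oldState) {
            for (T entry : subtaskState) {
                result.get(i % newParallelism).add(entry);
                i++;
            }
        }
        return result;
    }

    /** Union: every new subtask receives all entries from all old subtasks. */
    static <T> List<List<T>> union(List<List<T>> oldState, int newParallelism) {
        List<T> all = new ArrayList<>();
        for (List<T> subtaskState : oldState) {
            all.addAll(subtaskState);
        }
        List<List<T>> result = newBuckets(newParallelism);
        for (List<T> bucket : result) {
            bucket.addAll(all);
        }
        return result;
    }

    private static <T> List<List<T>> newBuckets(int n) {
        List<List<T>> buckets = new ArrayList<>();
        for (int k = 0; k < n; k++) {
            buckets.add(new ArrayList<>());
        }
        return buckets;
    }
}
```

For two old subtasks holding `[a, b]` and `[c]`, splitting to three subtasks yields `[a]`, `[b]`, `[c]`, while a union to two subtasks gives both of them `[a, b, c]`.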


As for whether or not to expose RedistributionMode to users, I have an
open mind. But as I said just now, we still can't avoid this problem
in splitRedistributionListState and unionRedistributionListState.
IMO, it's better to explain it at the API level instead of avoiding
it. WDYT?

Best regards,

Weijie



[DISCUSS] FLIP-425: Asynchronous Execution Model

2024-03-07 Thread Yanfei Lei
Hi devs,

I'd like to start a discussion on FLIP-425: Asynchronous Execution
Model[1], which is a sub-FLIP of FLIP-423: Disaggregated State Storage
and Management[2].

FLIP-425 introduces a non-blocking execution model that leverages the
asynchronous APIs introduced in FLIP-424[3].
For the whole story, please read FLIP-423[2]; this thread is
aimed at discussing the details of "FLIP-425: Asynchronous Execution
Model".

Regarding the details of this FLIP, there have been some discussions
here[4], mainly focusing on framework overhead profiling, watermark
processing, etc. Please see link[4] for the context.
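As a rough sketch of how a non-blocking model can still keep records with the same key in order (the strict-order topic raised in the thread at [4]), each key's work can be chained onto that key's previous future. `PerKeyOrderedExecutor` is a hypothetical illustration, not FLIP-425's implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/**
 * Work for the same key is chained onto that key's previous future, so it runs in
 * submission order; work for different keys is not chained and may interleave.
 */
final class PerKeyOrderedExecutor {
    private final Executor executor;
    private final Map<Object, CompletableFuture<Void>> tails = new HashMap<>();

    PerKeyOrderedExecutor(Executor executor) {
        this.executor = executor;
    }

    /** Must be called from a single (task) thread, since the map is not synchronized. */
    CompletableFuture<Void> submit(Object key, Runnable work) {
        CompletableFuture<Void> previous =
                tails.getOrDefault(key, CompletableFuture.completedFuture(null));
        // If an earlier task for this key failed, later ones are skipped and fail too.
        CompletableFuture<Void> next = previous.thenRunAsync(work, executor);
        tails.put(key, next);
        return next;
    }
}
```

A real implementation would also remove completed entries from `tails`; this sketch keeps them for brevity.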

Looking forward to hearing from you!


[1] https://cwiki.apache.org/confluence/x/S4p3EQ
[2] https://cwiki.apache.org/confluence/x/R4p3EQ
[3] https://cwiki.apache.org/confluence/x/SYp3EQ
[4] https://lists.apache.org/thread/ct8smn6g9y0b8730z7rp9zfpnwmj8vf0

Best,
Yanfei


Re: Default scale and precision SQL data types

2024-03-07 Thread lorenzo . affetti
Hello Sergei!

The proposal makes a lot of sense, and Martijn is right as well.
Are you willing to drive the FLIP effort?
Do you need any assistance with that?
On Mar 4, 2024 at 01:48 +0100, Martijn Visser , wrote:
> Hi,
>
> I think it would first require a FLIP, given it touches on the core type
> system of SQL.
>
> Best regards,
>
> Martijn
>
> On Sat, Mar 2, 2024 at 5:34 PM Sergei Morozov  wrote:
>
> > Hi there,
> >
> > org.apache.flink.table.api.DataTypes allows the creation of temporal data
> > types by specifying precision (e.g. TIME(3)) or omitting it (TIME()). The
> > ability to omit precision for temporal types was introduced in
> > apache/flink@36fef44
> > <https://github.com/apache/flink/commit/36fef4457a7f1de47989c8a2485581bcf8633b32>.
> >
> > Unfortunately, this isn't possible for other data types (e.g. CHAR,
> > DECIMAL).
> > Even though they define defaults for length, precision, and scale, their
> > values have to be passed to the method explicitly.
> >
> > Would a PR be accepted which will introduce the methods for the remaining
> > types similar to the temporal ones?
> >
> > Thanks.
> >


Re: [DISCUSS] FLIP-433: State Access on DataStream API V2

2024-03-07 Thread weijie guo
Hi Hangxiang,

Thanks for your reply!

> We have also discussed in FLIP-359/FLINK-32658 limiting the user
> operations to avoid creating state in processElement. Could the current
> interfaces also help with this?

I think so. It is illegal to create state at runtime in our proposal.


> Could you provide more examples of how useStates() works? Since some
> operations may change their used states at runtime, the value this method
> returns will be modified at runtime, right?


No. Introducing and declaring new states at runtime is something we want
to explicitly disallow. You can simply assume that useStates is only
called when the JobGraph is generated, and any later changes to its
result are invalid and illegal.


> IIUC, RedistributionMode/Strategy should not be used by users, right?
> If so, I'm +1 on moving them to inner interfaces, since they seem a bit
> confusing to users.


As for this question, I think my answer to Zakelly should be helpful.



Best regards,

Weijie



[DISCUSS] FLIP-427: Disaggregated State Store

2024-03-07 Thread Hangxiang Yu
Hi devs,


I'd like to start a discussion on a sub-FLIP of FLIP-423: Disaggregated
State Storage and Management[1], which is a joint work of Yuan Mei, Zakelly
Lan, Jinzhong Li, Hangxiang Yu, Yanfei Lei and Feng Wang:

- FLIP-427: Disaggregated State Store

This FLIP introduces the initial version of the ForSt disaggregated state
store.

Please make sure you have read FLIP-423[1] to know the whole story;
we'll discuss the details of FLIP-427[2] in this thread. For the
discussion of overall architecture or topics related to multiple
sub-FLIPs, please post in the previous thread[3].

Looking forward to hearing from you!

[1] https://cwiki.apache.org/confluence/x/R4p3EQ

[2] https://cwiki.apache.org/confluence/x/T4p3EQ

[3] https://lists.apache.org/thread/ct8smn6g9y0b8730z7rp9zfpnwmj8vf0


Best,

Hangxiang.


Re: [DISCUSS] FLIP-423 ~FLIP-428: Introduce Disaggregated State Storage and Management in Flink 2.0

2024-03-07 Thread Zakelly Lan
Hi everyone,

Thank you all for the lively discussion here. Now is a good time to move
forward to more detailed discussions, so we have opened several threads for
the sub-FLIPs:

FLIP-424: https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864
FLIP-425: https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h
FLIP-426: https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf
FLIP-427: https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft
FLIP-428: https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b

If you want to talk about the overall architecture, roadmap, milestones or
anything related to multiple FLIPs, please post it here. Otherwise, you
can discuss the details in the separate threads. Let's try to avoid repeated
discussion in different threads. I will sync important messages here if
there are any in the above threads.

And in reply to @Jeyhun: we will ensure that the content of those FLIPs is
consistent.


Best,
Zakelly

On Thu, Mar 7, 2024 at 2:16 PM Yuan Mei  wrote:

> I have been a bit busy these few weeks; sorry for responding late.
>
> The original idea of keeping the discussion within one thread was for easier
> tracking and to avoid repeated discussion in different threads.
>
> For details, it might be good to start separate threads if needed.
>
> We will think of a way to better organize the discussion.
>
> Best
> Yuan
>
>
> On Thu, Mar 7, 2024 at 4:38 AM Jeyhun Karimov 
> wrote:
>
> > Hi,
> >
> > +1 for the suggestion.
> > Maybe we can start the discussion with the FLIPs that have minimum
> > dependencies (on the other new/proposed FLIPs).
> > Based on our discussion on a particular FLIP, the subsequent (or its
> > dependent) FLIP(s) can be updated accordingly?
> >
> > Regards,
> > Jeyhun
> >
> > On Wed, Mar 6, 2024 at 5:34 PM Gyula Fóra  wrote:
> >
> > > Hey all!
> > >
> > > This is a massive improvement / work. I just started going through the
> > > Flips and have a more or less meta comment.
> > >
> > > While it's good to keep the overall architecture discussion here, I think
> > > we should still have separate discussions for each FLIP where we can
> > > discuss interface details etc. With so much content if we start adding
> > > minor comments here that will lead to nowhere but those discussions are
> > > still important and we should have them in separate threads (one for each
> > > FLIP)
> > >
> > > What do you think?
> > > Gyula
> > >
> > > On Wed, Mar 6, 2024 at 8:50 AM Yanfei Lei  wrote:
> > >
> > > > Hi team,
> > > >
> > > > Thanks for your discussion. Regarding FLIP-425, we have supplemented
> > > > several updates to answer high-frequency questions:
> > > >
> > > > 1. We captured a flame graph of the Hashmap state backend in
> > > > "Synchronous execution with asynchronous APIs"[1], which reveals that
> > > > the framework overhead (including reference counting, future-related
> > > > code and so on) consumes about 9% of the keyed operator CPU time.
> > > > 2. We added a set of comparative experiments for watermark processing:
> > > > the performance of Out-Of-Order mode is 70% better than
> > > > strictly-ordered mode under ~140MB state size. Instructions on how to
> > > > run this test have also been added[2].
> > > > 3. Regarding the order of StreamRecords, whether or not they access
> > > > state, we added a new section, *Strict order of 'processElement'*[3].
> > > >
> > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-SynchronousexecutionwithasynchronousAPIs
> > > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Strictly-orderedmodevs.Out-of-ordermodeforwatermarkprocessing
> > > > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-ElementOrder
> > > >
> > > >
> > > > Best regards,
> > > > Yanfei
> > > >
> > > > On Tue, Mar 5, 2024 at 9:25 AM Yunfeng Zhou wrote:
> > > > >
> > > > > Hi Zakelly,
> > > > >
> > > > > > 5. I'm not very sure ... revisiting this later since it is not important.
> > > > >
> > > > > It seems that we still have some details to confirm about this
> > > > > question. Let's postpone this to after the critical parts of the
> > > > > design are settled.
> > > > >
> > > > > > 8. Yes, we had considered ... metrics should be like afterwards.
> > > > >
> > > > > Oh sorry, I missed FLIP-431. I'm fine with discussing this topic in milestone 2.
> > > > >
> > > > > Looking forward to the detailed design about the strict mode between
> > > > > same-key records and the benchmark results about the epoch mechanism.
> > > > >
> > > > > Best regards,
> > > > > Yunfeng
> > > > >
> > > > > On Mon, Mar 4, 2024 at 7:59 PM Zakelly Lan wrote:
> > > > > >
> > > > > > Hi Yunfeng,
> > > > > >
> > > > > > For 1:
> > > > >

Re: [DISCUSS] FLIP-433: State Access on DataStream API V2

2024-03-07 Thread Jinzhong Li
Hi Weijie,

Thanks for driving this!

1. Overall, I think that the “Eager State Declaration” is a good proposal,
which can enhance Flink's state management capabilities and provide
possibilities for subsequent state optimizations.

2. When the user attempts to access an undeclared state at runtime, it is
more reasonable to throw an exception rather than return Option#empty,
as Gyula mentioned above.
In addition, I'm not quite sure whether all existing usages in which
states are dynamically registered at runtime can be migrated to the "Eager
State Declaration" style at minimal cost.

3. For state TTL, should StateDeclaration also provide interfaces for users
to declare state TTL?

Best,
Jinzhong Li



[jira] [Created] (FLINK-34616) python dist doesn't clean when open method construct resource

2024-03-07 Thread Jacky Lau (Jira)
Jacky Lau created FLINK-34616:
-

 Summary: python dist doesn't clean when open method construct resource
 Key: FLINK-34616
 URL: https://issues.apache.org/jira/browse/FLINK-34616
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.20.0
Reporter: Jacky Lau
 Fix For: 1.20.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[VOTE] Release 1.19.0, release candidate #2

2024-03-07 Thread Lincoln Lee
Hi everyone,

Please review and vote on the release candidate #2 for the version 1.19.0,
as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

The complete staging area is available for your review, which includes:

* JIRA release notes [1], and the pull request adding release note for
users [2]
* the official Apache source release and binary convenience releases to be
deployed to dist.apache.org [3], which are signed with the key with
fingerprint E57D30ABEE75CA06  [4],
* all artifacts to be deployed to the Maven Central Repository [5],
* source code tag "release-1.19.0-rc2" [6],
* website pull request listing the new release and adding announcement blog
post [7].

The vote will be open for at least 72 hours. It is adopted by majority
approval, with at least 3 PMC affirmative votes.

[1]
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353282
[2] https://github.com/apache/flink/pull/24394
[3] https://dist.apache.org/repos/dist/dev/flink/flink-1.19.0-rc2/
[4] https://dist.apache.org/repos/dist/release/flink/KEYS
[5] https://repository.apache.org/content/repositories/orgapacheflink-1709
[6] https://github.com/apache/flink/releases/tag/release-1.19.0-rc2
[7] https://github.com/apache/flink-web/pull/721


Best,
Yun, Jing, Martijn and Lincoln


Re: [DISCUSS] FLIP-433: State Access on DataStream API V2

2024-03-07 Thread weijie guo
Hi Jinzhong,

Thanks for the reply!

> Overall, I think that the “Eager State Declaration” is a good proposal,
which can enhance Flink's state management capabilities and provide
possibilities for subsequent state optimizations.

It's nice to see that people who are familiar with the state stuff like
this proposal. :)

>  When the user attempts to access an undeclared state at runtime, it is
more reasonable to throw an exception rather than returning Option#empty,
as Gyula mentioned above.

Yes, I agree that this is better than a confusing empty value, and I have
modified the corresponding part of this FLIP.
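To illustrate the distinction agreed on here (accessing an undeclared state is an error, while a declared state may simply have no value yet), a minimal sketch follows. `DeclaredStateStore` and its methods are hypothetical and do not reflect the actual FLIP-433 API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
 * Hypothetical sketch (not the FLIP-433 API): a per-partition state store that
 * only knows the state names declared up front.
 */
public class DeclaredStateStore {
    private final Set<String> declared;
    private final Map<String, Object> values = new HashMap<>();

    public DeclaredStateStore(Set<String> declaredStateNames) {
        // Copy into an immutable set so the declarations cannot change later.
        this.declared = Set.copyOf(declaredStateNames);
    }

    /** A declared state that holds no value yields Optional.empty(). */
    public Optional<Object> get(String name) {
        checkDeclared(name);
        return Optional.ofNullable(values.get(name));
    }

    public void put(String name, Object value) {
        checkDeclared(name);
        values.put(name, value);
    }

    /** Accessing an undeclared state is an error, never an "empty" result. */
    private void checkDeclared(String name) {
        if (!declared.contains(name)) {
            throw new IllegalStateException("State '" + name + "' was not declared");
        }
    }

    public static void main(String[] args) {
        DeclaredStateStore store = new DeclaredStateStore(Set.of("count"));
        System.out.println(store.get("count").isPresent()); // false: declared but empty
        store.put("count", 1L);
        System.out.println(store.get("count").get()); // 1
        try {
            store.get("typo");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Keeping the two outcomes on different channels (a return value versus an exception) means a misspelled state name fails fast instead of silently looking like an empty state.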

> In addition, I'm not quite sure whether all of the existing usage in
which states are registered at runtime dynamically can be migrated to the
"Eager State Declaration" style with minimal cost?

I think for most user functions, this is fairly straightforward to migrate.
But states whose declarations depend on runtime information (e.g.
RuntimeContext) are, in principle, not supported in the new API. Anyway,
the old and new APIs are completely incompatible, so rewriting jobs is
inevitable. Users can think about how to write a good process function that
conforms to the eager declaration style.
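A minimal sketch of what "eager" declaration means in practice, under the assumption that the declared states are read exactly once while the JobGraph is built and frozen afterwards. `StatefulFunction`, `JobVertex` and `compile` are hypothetical stand-ins, not FLIP-433 or Flink runtime classes.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical sketch of eager state declaration: declarations are read once
 * while the job graph is built and frozen afterwards. All names are made up.
 */
public class EagerDeclarationSketch {

    /** Stand-in for a V2 process function that declares the states it uses. */
    interface StatefulFunction {
        Set<String> usesStates();
    }

    /** Stand-in for a job graph vertex; it keeps an immutable copy of the declarations. */
    record JobVertex(String name, Set<String> declaredStates) {
        JobVertex {
            declaredStates = Set.copyOf(declaredStates);
        }
    }

    /** Called once at "compile time"; later changes on the function side are not observed. */
    static JobVertex compile(String name, StatefulFunction fn) {
        return new JobVertex(name, fn.usesStates());
    }

    public static void main(String[] args) {
        Set<String> backing = new HashSet<>(Set.of("counter"));
        JobVertex vertex = compile("counting-op", () -> backing);
        backing.add("createdAtRuntime"); // too late: the graph has already been built
        System.out.println(vertex.declaredStates()); // [counter]
    }
}
```

A state that a V1 function would have registered on the fly therefore has to be part of the declared set from the start; anything added afterwards is simply invisible to the compiled graph.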

> For state TTL, should StateDeclaration also provide interfaces for users
to declare state ttl?

Of course, we can and need to provide this. But whether or not it's in
this FLIP isn't very important to me, because we're mainly talking about
the general principles and ways of declaring and accessing state in this
FLIP. I promise we won't leave it out in the end :D



Best regards,

Weijie


On Thu, Mar 7, 2024 at 5:34 PM Jinzhong Li wrote:

> Hi Weijie,
>
> Thanks for driving this!
>
> 1. Overall, I think that the “Eager State Declaration” is a good proposal,
> which can enhance Flink's state management capabilities and provide
> possibilities for subsequent state optimizations.
>
> 2. When the user attempts to access an undeclared state at runtime, it is
> more reasonable to throw an exception rather than returning Option#empty,
> as Gyula mentioned above.
> In addition, I'm not quite sure whether all of the existing usage in which
> states are registered at runtime dynamically can be migrated to the "Eager
> State Declaration" style with minimal cost?
>
> 3. For state TTL, should StateDeclaration also provide interfaces for users
> to declare state ttl?
>
> Best,
> Jinzhong Li
>
>
> On Thu, Mar 7, 2024 at 5:08 PM weijie guo 
> wrote:
>
> > Hi Hangxiang,
> >
> > Thanks for your reply!
> >
> > > We have also discussed in FLIP-359/FLINK-32658 about limiting the user
> > operation to avoid creating state when processElement. Could current
> > interfaces also help this?
> >
> > I think so. It is illegal to create state at runtime in our proposal.
> >
> >
> > > Could you provide more examples about how useStates() works? Since some
> > operations may change their used states at runtime, the value this method
> > returns will be modified at runtime, right?
> >
> >
> > No, introducing and declaring new state at runtime is something we want
> > to explicitly disallow. You can simply assume that useStates is only
> > called when the JobGraph is generated, and any later changes to it are
> > invalid and illegal.
> >
> >
> > > IIUC, RedistributionMode/Strategy should not be used by users, right?
> > If so, I'm +1 to move them to inner interfaces, since exposing them seems
> > a bit confusing to users.
> >
> >
> > As for this question, I think my answer to Zakelly should be helpful.
> >
> >
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > On Thu, Mar 7, 2024 at 4:58 PM weijie guo wrote:
> >
> > > Hi Zakelly,
> > >
> > > Thanks for your reply!
> > >
> > > > My advice would be to conceal RedistributionMode/Strategy from the
> > > standard user interface, particularly within the helper class 'State'.
> > > But I'm OK to keep it in `StateDeclaration` since its interfaces are
> > > basically used by the framework.
> > >
> > > I'm sorry, I didn't mention some of the details/concepts introduced by
> > > the Umbrella FLIP and FLIP-409 in this FLIP. This might make it hard to
> > > understand the motivation behind RedistributionMode. I'll add more
> > > context to the FLIP.
> > >
> > >
> > >
> > > Briefly, in V2, we explicitly define the concept of partition. In the
> > > case of KeyedPartitionStream, one key corresponds to one partition. For
> > > NonKeyedPartitionStream, one parallelism/subtask corresponds to one
> > > partition. All states are considered to be confined within the partition.
> > > On this basis, an obvious question is whether and how the state should be
> > > redistributed when the partition changes. So we divide the state into
> > > three categories:
> > >
> > > - States that don't need to be redistributed when the partition changes.
> > > - States whose distribution has to be decided when the partition changes.
> > > - States that are always the same across different partitions.
> > >
> > > After introducing the concept of partition, the redistribution
> > > pattern/mod
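For the second category (states whose distribution must be decided on rescaling), the thread mentions split and union list states. The sketch below illustrates the two policies, mirroring the semantics of Flink's existing operator list state (even split) and union list state. `ListStateRedistribution` is hypothetical, and assigning contiguous chunks to subtasks is an assumption; Flink's actual element-to-subtask assignment may differ.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of two ways to redistribute a list state on rescaling. */
public class ListStateRedistribution {

    /** Split: every element goes to exactly one new subtask (contiguous, as even as possible). */
    static <T> List<List<T>> split(List<T> all, int newParallelism) {
        List<List<T>> out = new ArrayList<>();
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < remainder ? 1 : 0); // first subtasks take the leftovers
            out.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return out;
    }

    /** Union: every new subtask receives the complete list. */
    static <T> List<List<T>> union(List<T> all, int newParallelism) {
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            out.add(new ArrayList<>(all));
        }
        return out;
    }

    public static void main(String[] args) {
        // State elements gathered from all old subtasks.
        List<String> merged = List.of("a", "b", "c", "d", "e");
        System.out.println(split(merged, 2)); // [[a, b, c], [d, e]]
        System.out.println(union(merged, 2)); // [[a, b, c, d, e], [a, b, c, d, e]]
    }
}
```

Keyed state (the first category) needs neither policy because it moves with its key, and broadcast state (the third category) is identical everywhere, so any copy will do.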

[jira] [Created] (FLINK-34617) Correct the Javadoc of org.apache.flink.api.common.time.Time

2024-03-07 Thread Yun Tang (Jira)
Yun Tang created FLINK-34617:


 Summary: Correct the Javadoc of org.apache.flink.api.common.time.Time
 Key: FLINK-34617
 URL: https://issues.apache.org/jira/browse/FLINK-34617
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.19.0
Reporter: Yun Tang
Assignee: Yun Tang


The current Javadoc of {{org.apache.flink.api.common.time.Time}} says it will 
fully replace {{org.apache.flink.streaming.api.windowing.time.Time}} in Flink 
2.0. However, the {{Time}} class has been deprecated, so we should remove this 
description.





[jira] [Created] (FLINK-34618) Migrate SplitPythonConditionFromJoinRule

2024-03-07 Thread Yongming Zhang (Jira)
Yongming Zhang created FLINK-34618:
--

 Summary: Migrate SplitPythonConditionFromJoinRule
 Key: FLINK-34618
 URL: https://issues.apache.org/jira/browse/FLINK-34618
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Affects Versions: 1.20.0
Reporter: Yongming Zhang
 Fix For: 1.20.0








[jira] [Created] (FLINK-34619) Do not wait for scaling completion in UPGRADE state with in-place scaling

2024-03-07 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-34619:
--

 Summary: Do not wait for scaling completion in UPGRADE state with in-place scaling
 Key: FLINK-34619
 URL: https://issues.apache.org/jira/browse/FLINK-34619
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.7.0
Reporter: Gyula Fora
Assignee: Gyula Fora
 Fix For: kubernetes-operator-1.8.0


The operator currently puts the resource into the upgrading state after 
triggering in-place scaling and keeps observing until the desired parallelism 
is reached before moving to deployed / stable. 

However, due to how the adaptive scheduler works, this parallelism may never 
be reached, and this is expected.

We should simplify the logic to consider scaling "done" once the resource 
requirements have been set correctly, and then leave the rest to the adaptive 
scheduler.
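The change in the completion criterion can be sketched as follows. This is a hypothetical illustration, not Kubernetes operator code: both checks operate on plain maps of vertex name to parallelism, and the method names are made up.

```java
import java.util.Map;

/**
 * Hypothetical sketch contrasting two ways to decide that in-place scaling is "done".
 * Maps are vertex name -> parallelism; none of these names come from the operator code.
 */
public class InPlaceScalingCheck {

    /** Current behavior: wait until every vertex actually runs at the desired parallelism. */
    static boolean doneByObservedParallelism(Map<String, Integer> desired, Map<String, Integer> observed) {
        return desired.entrySet().stream()
            .allMatch(e -> e.getValue().equals(observed.get(e.getKey())));
    }

    /** Proposed behavior: done once the requested resource requirements match the desired ones. */
    static boolean doneByRequirements(Map<String, Integer> desired, Map<String, Integer> requested) {
        return desired.equals(requested);
    }

    public static void main(String[] args) {
        Map<String, Integer> desired = Map.of("source", 4, "sink", 2);
        Map<String, Integer> requested = Map.of("source", 4, "sink", 2);
        // With fewer slots available, the adaptive scheduler may run below the requested parallelism.
        Map<String, Integer> observed = Map.of("source", 3, "sink", 2);
        System.out.println(doneByObservedParallelism(desired, observed)); // false, possibly forever
        System.out.println(doneByRequirements(desired, requested));       // true
    }
}
```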





[jira] [Created] (FLINK-34620) Process recursive protobuf schemas in PbToRowTypeUtil

2024-03-07 Thread Artem (Jira)
Artem created FLINK-34620:
-

 Summary: Process recursive protobuf schemas in PbToRowTypeUtil
 Key: FLINK-34620
 URL: https://issues.apache.org/jira/browse/FLINK-34620
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Artem


The current version of _PbToRowTypeUtil.generateRowType_ doesn't support 
recursive protobuf schemas like

    message RecursiveMessage {
      int32 id = 1;
      RecursiveMessage message = 2;
    }

It throws _java.lang.StackOverflowError_.

This could be implemented by trimming the recursion. The same approach is used 
in Apache Spark, for example 
(https://github.com/apache/spark/blob/master/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala#L178).

The output table schemas for the above proto schema would then look like this:

    recursiveFieldMaxDepth=0:  message: ROW< id: INTEGER >
    recursiveFieldMaxDepth=1:  message: ROW< id: INTEGER, message: ROW< id: INTEGER > >
    recursiveFieldMaxDepth=2:  message: ROW< id: INTEGER, message: ROW< id: INTEGER, message: ROW< id: INTEGER > > >
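A minimal sketch of the recursion trimming, assuming the depth is counted per message type along the current nesting path (so sibling branches are trimmed independently). It uses a tiny hand-written schema model instead of real protobuf descriptors, and `RecursiveSchemaTrimmer` is hypothetical; it only aims to reproduce the three example schemas above, and Spark's option may define the depth slightly differently.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: derive a ROW type string while trimming recursive message types. */
public class RecursiveSchemaTrimmer {

    /** A field is either scalar (scalarType set) or a nested message (messageType set). */
    record Field(String name, String scalarType, String messageType) {
        static Field scalar(String name, String type) { return new Field(name, type, null); }
        static Field message(String name, String type) { return new Field(name, null, type); }
    }

    private final Map<String, List<Field>> messages = new HashMap<>();
    private final int recursiveFieldMaxDepth;

    RecursiveSchemaTrimmer(int recursiveFieldMaxDepth) {
        this.recursiveFieldMaxDepth = recursiveFieldMaxDepth;
    }

    void define(String messageName, Field... fields) {
        messages.put(messageName, List.of(fields));
    }

    String toRowType(String messageName) {
        return toRowType(messageName, new HashMap<>());
    }

    /** seenOnPath counts how often each message type occurs on the current nesting path. */
    private String toRowType(String messageName, Map<String, Integer> seenOnPath) {
        seenOnPath.merge(messageName, 1, Integer::sum);
        List<String> parts = new ArrayList<>();
        for (Field f : messages.get(messageName)) {
            if (f.scalarType() != null) {
                parts.add(f.name() + ": " + f.scalarType());
            } else if (seenOnPath.getOrDefault(f.messageType(), 0) <= recursiveFieldMaxDepth) {
                parts.add(f.name() + ": " + toRowType(f.messageType(), seenOnPath));
            }
            // else: this type already recursed too deeply on this path, so the field is dropped
        }
        seenOnPath.merge(messageName, -1, Integer::sum); // leaving this nesting level
        return "ROW< " + String.join(", ", parts) + " >";
    }

    public static void main(String[] args) {
        for (int depth = 0; depth <= 2; depth++) {
            RecursiveSchemaTrimmer trimmer = new RecursiveSchemaTrimmer(depth);
            trimmer.define("RecursiveMessage",
                Field.scalar("id", "INTEGER"),
                Field.message("message", "RecursiveMessage"));
            System.out.println("recursiveFieldMaxDepth=" + depth + ":  message: "
                + trimmer.toRowType("RecursiveMessage"));
        }
    }
}
```

Running `main` prints the three schemas listed in the issue; because recursion stops once the per-path count exceeds the limit, the traversal always terminates instead of overflowing the stack.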
 





[VOTE] FLIP-402: Extend ZooKeeper Curator configurations

2024-03-07 Thread Alex Nitavsky
Hi everyone,

I'd like to start a vote on FLIP-402 [1]. It introduces new configuration
options for Apache Flink's ZooKeeper integration for high availability by
reflecting existing Apache Curator configuration options. It has been
discussed in this thread [2].

The vote will be open for at least 72 hours (until March 10th 18:00 GMT)
unless there is an objection or insufficient votes.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-402%3A+Extend+ZooKeeper+Curator+configurations
[2] https://lists.apache.org/thread/gqgs2jlq6bmg211gqtgdn8q5hp5v9l1z

Thanks
Alex


Re: [DISCUSS] FLIP-423 ~FLIP-428: Introduce Disaggregated State Storage and Management in Flink 2.0

2024-03-07 Thread Jing Ge
+1 for Gyula's suggestion. I just finished reading FLIP-423, which introduces
the intention of the big change and the high-level architecture. Great
content, btw! The only public interface change in this FLIP is one new config
option to use ForSt. It makes sense to have one dedicated discussion thread
for each concrete system design.

@Zakelly The links in your mail do not work, except the last one, because
the FLIP number has been appended to the URL, e.g.
https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864FLIP-425.

NIT fix:

FLIP-424: https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864

FLIP-425: https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h

FLIP-426: https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf

FLIP-427: https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft

FLIP-428: https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b

Best regards,
Jing




On Thu, Mar 7, 2024 at 10:14 AM Zakelly Lan  wrote:

> Hi everyone,
>
> Thank you all for a lively discussion here, and it is a good time to move
> forward to more detailed discussions. Thus we open several threads for
> sub-FLIPs:
>
> FLIP-424: https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864
> FLIP-425: https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h
> FLIP-426: https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf
> FLIP-427: https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft
> FLIP-428: https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b
>
> If you want to talk about the overall architecture, roadmap, milestones or
> something related to multiple FLIPs, please post it here. Otherwise, you
> can discuss details in the separate threads. Let's try to avoid repeated
> discussion in different threads. I will sync important messages here if
> there are any in the above threads.
>
> And reply to @Jeyhun: We will ensure the content between those FLIPs is
> consistent.
>
>
> Best,
> Zakelly
>
> On Thu, Mar 7, 2024 at 2:16 PM Yuan Mei  wrote:
>
> > I have been a bit busy these few weeks, sorry for responding late.
> >
> > The original idea of keeping the discussion within one thread was to make
> > tracking easier and to avoid repeated discussions in different threads.
> >
> > For details, it might be good to start different threads if needed.
> >
> > We will think of a way to better organize the discussion.
> >
> > Best
> > Yuan
> >
> >
> > On Thu, Mar 7, 2024 at 4:38 AM Jeyhun Karimov 
> > wrote:
> >
> > > Hi,
> > >
> > > +1 for the suggestion.
> > > Maybe we can start the discussion with the FLIPs that have minimal
> > > dependencies (on the other new/proposed FLIPs).
> > > Based on our discussion of a particular FLIP, the subsequent (or
> > > dependent) FLIP(s) can be updated accordingly?
> > >
> > > Regards,
> > > Jeyhun
> > >
> > > On Wed, Mar 6, 2024 at 5:34 PM Gyula Fóra 
> wrote:
> > >
> > > > Hey all!
> > > >
> > > > This is a massive improvement / work. I just started going through the
> > > > FLIPs and have a more or less meta comment.
> > > >
> > > > While it's good to keep the overall architecture discussion here, I think
> > > > we should still have separate discussions for each FLIP where we can
> > > > discuss interface details etc. With so much content, if we start adding
> > > > minor comments here, that will lead nowhere; but those discussions are
> > > > still important and we should have them in separate threads (one for each
> > > > FLIP).
> > > >
> > > > What do you think?
> > > > Gyula
> > > >
> > > > On Wed, Mar 6, 2024 at 8:50 AM Yanfei Lei 
> wrote:
> > > >
> > > > > Hi team,
> > > > >
> > > > > Thanks for your discussion. Regarding FLIP-425, we have added
> > > > > several updates to answer frequently asked questions:
> > > > >
> > > > > 1. We captured a flame graph of the Hashmap state backend in
> > > > > "Synchronous execution with asynchronous APIs"[1], which reveals that
> > > > > the framework overhead (including reference counting, future-related
> > > > > code and so on) consumes about 9% of the keyed operator CPU time.
> > > > > 2. We added a set of comparative experiments for watermark processing:
> > > > > the performance of Out-Of-Order mode is 70% better than
> > > > > strictly-ordered mode under ~140MB state size. Instructions on how to
> > > > > run this test have also been added[2].
> > > > > 3. Regarding the order of StreamRecords, whether they have state access
> > > > > or not, we added a new section *Strict order of 'processElement'*[3].
> > > > >
> > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP

Re: [DISCUSS] FLIP-399: Flink Connector Doris

2024-03-07 Thread Jing Ge
Thanks for the clarification. It makes sense to me.

Best regards,
Jing

On Wed, Mar 6, 2024 at 10:03 AM wudi <676366...@qq.com.invalid> wrote:

> Hi Jing Ge, thanks for your suggestions.
>
> 1. Currently, the Flink Doris Connector is compatible with Flink versions
> 1.15-1.18. SupportsCommitter[1] seems to have been introduced in Flink 1.19,
> and most users may not have upgraded their Flink environments to that
> version yet. Modifying it now could lead to incompatibilities. I think we
> can postpone the modification and do it together with the other connectors.
> What do you think?
>
> 2. Yes, currently DorisSource only supports batch reading, typically used
> for data synchronization and ETL. Streaming reading is not supported yet,
> since it requires the Doris Binlog capability (mentioned in the Doris 2024
> RoadMap[2]). Streaming reading can be used to capture incremental events
> from the database, making it more convenient for users to process real-time
> data newly added to Doris.
>
> 3. An E2ECase[3] for DorisSource has been added, and the TestPlan in the
> FLIP has been modified.
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Enhance+and+synchronize+Sink+API+to+match+the+Source+API
> [2] https://github.com/apache/doris/issues/30669
> [3] https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisDorisE2ECase.java
>
> Brs,
> di.wu
>
>
> > On Mar 6, 2024, at 06:12, Jing Ge wrote:
> >
> > Hi Di,
> >
> > Thanks for your proposal. +1 for the contribution. I'd like to know your
> > thoughts about the following questions:
> >
> > 1. According to your clarification of the exactly-once semantics (thanks
> > for it, BTW), no PreCommitTopology is required. Does it make sense to let
> > DorisSink[1] implement SupportsCommitter, since TwoPhaseCommittingSink is
> > deprecated[2], before turning the Doris connector into a Flink connector?
> > 2. OLAP engines are commonly used as the tail/downstream of a data pipeline
> > to support further processing, e.g. ad-hoc queries or cubes with feasible
> > pre-aggregation. Just out of curiosity, would you like to share some real
> > use cases that use OLAP engines as the source of a streaming data pipeline?
> > Or will it only be used as a batch source?
> > 3. The E2E test only covers the sink[3], if I am not mistaken. Would you
> > like to test the source in E2E too?
> >
> > [1] https://github.com/apache/doris-flink-connector/blob/43e0e5cf9b832854ea228fb093077872e3a311b6/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java#L55
> > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Enhance+and+synchronize+Sink+API+to+match+the+Source+API
> > [3] https://github.com/apache/doris-flink-connector/blob/43e0e5cf9b832854ea228fb093077872e3a311b6/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java#L96
> >
> > Best regards,
> > Jing
> >
> > On Tue, Mar 5, 2024 at 11:18 AM wudi <676366...@qq.com.invalid> wrote:
> >
> >> Hi, Jeyhun Karimov.
> >> Thanks for your question.
> >>
> >> - How to ensure Exactly-Once?
> >> 1. When the Checkpoint Barrier arrives, DorisSink triggers the precommit
> >> API of StreamLoad to persist the data in Doris (the data is not visible
> >> at this time), and passes this TxnID to the Committer.
> >> 2. When this Checkpoint of the entire Job is completed, the Committer
> >> calls the commit API of StreamLoad and commits the TxnID to make the
> >> transaction visible.
> >> 3. When the task is restarted, Txns whose precommit succeeded but whose
> >> commit failed are aborted based on the label-prefix, by calling Doris'
> >> abort API. (In addition, Doris itself aborts transactions that have not
> >> been committed for a long time.)
> >>
> >> ps: This part of the content has also been updated in the FLIP.
> >>
> >> - Because the default table model in Doris is Duplicate (
> >> https://doris.apache.org/docs/data-table/data-model/), which does not
> >> have a primary key, batch writing may cause data duplication. The Unique
> >> model, however, has a primary key, which makes writes idempotent and thus
> >> achieves Exactly-Once.
> >>
> >> Brs,
> >> di.wu
> >>
> >>
> >>> On Mar 2, 2024, at 17:50, Jeyhun Karimov wrote:
> >>>
> >>> Hi,
> >>>
> >>> Thanks for the proposal. +1 for the FLIP.
> >>> I have a few questions:
> >>>
> >>> - How exactly will the combination of the two (Stream Load's two-phase
> >>> commit and Flink's two-phase commit) ensure e2e exactly-once semantics?
> >>>
> >>> - The FLIP proposes to combine Doris's batch writing with the primary key
> >>> table to achieve Exactly-Once semantics. Could you elaborate more on that?
> >>> Why is it not the default behavior but a workaround?
> >>>
> >>> Regards,
> >>> Jeyhun
> >>>
> >>> On Sat, Mar 2, 2024 at 10:14 AM Yanquan Lv 
> wrote:
> >>>
>  Thank
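The three-step exactly-once flow described for DorisSink can be simulated in memory to show why data from an unfinished checkpoint never becomes visible. `FakeDoris` and its methods are hypothetical stand-ins, not the Doris Stream Load API; for simplicity the sketch keys transactions by a label and uses that label both for committing and for the prefix-based abort.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory simulation of the DorisSink two-phase commit flow. */
public class TwoPhaseCommitSketch {

    enum TxnState { PRECOMMITTED, COMMITTED, ABORTED }

    static final class FakeDoris {
        final Map<String, TxnState> txns = new LinkedHashMap<>();        // label -> state
        final Map<String, List<String>> pending = new LinkedHashMap<>(); // label -> invisible rows
        final List<String> visibleRows = new ArrayList<>();

        /** Step 1 (checkpoint barrier): persist rows, but keep them invisible. */
        void preCommit(String label, List<String> rows) {
            txns.put(label, TxnState.PRECOMMITTED);
            pending.put(label, new ArrayList<>(rows));
        }

        /** Step 2 (checkpoint completed): make a pre-committed transaction visible. */
        void commit(String label) {
            if (txns.get(label) == TxnState.PRECOMMITTED) {
                txns.put(label, TxnState.COMMITTED);
                visibleRows.addAll(pending.remove(label));
            }
        }

        /** Step 3 (restart): abort pre-committed transactions whose label has this prefix. */
        void abortByLabelPrefix(String labelPrefix) {
            for (Map.Entry<String, TxnState> e : txns.entrySet()) {
                if (e.getKey().startsWith(labelPrefix) && e.getValue() == TxnState.PRECOMMITTED) {
                    e.setValue(TxnState.ABORTED);
                    pending.remove(e.getKey());
                }
            }
        }
    }

    public static void main(String[] args) {
        FakeDoris doris = new FakeDoris();
        String prefix = "job42_sink_"; // hypothetical label prefix
        doris.preCommit(prefix + "ckpt1", List.of("r1", "r2"));
        doris.commit(prefix + "ckpt1"); // checkpoint 1 completed
        doris.preCommit(prefix + "ckpt2", List.of("r3"));
        // The job fails before checkpoint 2 completes, then restarts.
        doris.abortByLabelPrefix(prefix);
        // r3 stays invisible; after recovery from checkpoint 1 it is written again.
        System.out.println(doris.visibleRows); // [r1, r2]
    }
}
```

A commit that arrives for an already-aborted transaction is a no-op here, which is what keeps a late commit from resurrecting the discarded rows.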

[jira] [Created] (FLINK-34621) Bump com.google.guava:guava from 31.1-jre to 32.0.0-jre in /flink-connector-hbase-base

2024-03-07 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-34621:
--

 Summary: Bump com.google.guava:guava from 31.1-jre to 32.0.0-jre in /flink-connector-hbase-base
 Key: FLINK-34621
 URL: https://issues.apache.org/jira/browse/FLINK-34621
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / HBase
Reporter: Martijn Visser
Assignee: Martijn Visser








Re: [DISCUSS] FLIP-399: Flink Connector Doris

2024-03-07 Thread Jeyhun Karimov
Thanks for the explanation Di Wu.

Regards,
Jeyhun


Re: [DISCUSS] FLIP-433: State Access on DataStream API V2

2024-03-07 Thread Zakelly Lan
Hi Weijie,

Thanks for your answer! Well, I get your point. Since partitions are
first-class citizens, and redistribution means how states migrate when
partitions change, I'd be fine with deemphasizing the concept of
keyed/operator state if we highlight the definition of partition in the
document. Keeping `RedistributionMode` under `StateDeclaration` is also
fine with me, as I guess it is only for internal usage.
But still, from a user's point of view, state can be characterized along
two relatively independent dimensions: how states are redistributed, and the
data structure. Thus I still suggest a chained configuration API that
configures one aspect per call, such as:
```java
// Keyed stream, no redistribution mode specified: the state goes with the
// partition (no redistribution). This corresponds to keyed state.
StateDeclaration a = States.declare(name).listState(type);

// Keyed stream, redistribution strategy specified: the state follows the
// specified redistribution strategy. This corresponds to operator state.
StateDeclaration b =
    States.declare(name).listState(type).redistributeBy(strategy);

// Non-keyed stream: a redistribution strategy *must* be specified.
StateDeclaration c =
    States.declare(name).listState(type).redistributeBy(strategy);

// Broadcast stream and state.
StateDeclaration d = States.declare(name).mapState(typeK, typeV).broadcast();
```
It prompts users to think about redistribution issues when needed. It also
provides more flexibility to add more combinations, such as broadcast list
state, or to chain more configurable aspects, such as adding `withTtl()` in
the future. WDYT?
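To make the suggested chained style concrete, here is a self-contained sketch of how such a builder could be wired. None of these types are Flink classes: `States`, `NamedDeclaration`, `StateDeclaration`, the `Redistribution` enum and `validateFor` are hypothetical, and types are represented by `Class<?>` purely for illustration.

```java
import java.util.Objects;

/**
 * Hypothetical sketch of the chained declaration style. It only shows how the two
 * dimensions (data structure, redistribution) can be configured one call at a time.
 */
public class ChainedDeclarationSketch {

    enum Redistribution { WITH_PARTITION, SPLIT, UNION, BROADCAST }

    /** Immutable declaration; each chained call returns a modified copy. */
    record StateDeclaration(String name, String structure, Redistribution redistribution) {

        StateDeclaration redistributeBy(Redistribution strategy) {
            if (strategy != Redistribution.SPLIT && strategy != Redistribution.UNION) {
                throw new IllegalArgumentException("Expected SPLIT or UNION, got " + strategy);
            }
            return new StateDeclaration(name, structure, strategy);
        }

        StateDeclaration broadcast() {
            return new StateDeclaration(name, structure, Redistribution.BROADCAST);
        }

        /** Non-keyed streams have no key to follow, so a strategy must be chosen. */
        void validateFor(boolean keyedStream) {
            if (!keyedStream && redistribution == Redistribution.WITH_PARTITION) {
                throw new IllegalStateException(
                    "State '" + name + "' on a non-keyed stream needs redistributeBy(...)");
            }
        }
    }

    static final class States {
        static NamedDeclaration declare(String name) {
            return new NamedDeclaration(Objects.requireNonNull(name));
        }
    }

    /** Intermediate step: a name without a data structure yet. */
    record NamedDeclaration(String name) {
        StateDeclaration listState(Class<?> elementType) {
            return new StateDeclaration(name, "LIST<" + elementType.getSimpleName() + ">",
                Redistribution.WITH_PARTITION);
        }

        StateDeclaration mapState(Class<?> keyType, Class<?> valueType) {
            return new StateDeclaration(name,
                "MAP<" + keyType.getSimpleName() + ", " + valueType.getSimpleName() + ">",
                Redistribution.WITH_PARTITION);
        }
    }

    public static void main(String[] args) {
        StateDeclaration a = States.declare("a").listState(Long.class);
        StateDeclaration b = States.declare("b").listState(Long.class).redistributeBy(Redistribution.SPLIT);
        StateDeclaration d = States.declare("d").mapState(String.class, Long.class).broadcast();
        a.validateFor(true);  // keyed stream: following the partition is fine
        b.validateFor(false); // non-keyed stream: a strategy was given
        System.out.println(a + "\n" + b + "\n" + d);
    }
}
```

Since each call only touches one dimension, adding something like a TTL later would mean one more copy-returning method on the declaration rather than a new combination of factory methods.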


Best,
Zakelly


[jira] [Created] (FLINK-34622) Typo of execution_mode configuration name in Chinese document

2024-03-07 Thread Yu Chen (Jira)
Yu Chen created FLINK-34622:
---

 Summary: Typo of execution_mode configuration name in Chinese document
 Key: FLINK-34622
 URL: https://issues.apache.org/jira/browse/FLINK-34622
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Reporter: Yu Chen





