Re: [DISCUSS] FLIP-433: State Access on DataStream API V2

2024-03-19 Thread weijie guo
Hi everyone,

Thanks for your discussion and feedback!

Our discussion has been going on for a while, and no new concerns have
been raised for several days, so I would like to start the vote soon.

Best regards,

Weijie



Re: [DISCUSS] FLIP-433: State Access on DataStream API V2

2024-03-12 Thread Zakelly Lan
Hi Weijie,

Thanks for your reply!

Overall I'd be fine with the builder pattern, but it gets a little verbose
with the explicit 'build()' call and the builder declaration. Keeping the
StateDeclaration immutable is OK, but it makes it a little inconvenient to
override undefined options from the job configuration at runtime. I'd
suggest providing some methods that rebuild a new StateDeclaration with
new configurable options, just like ConfigOptions#defaultValue does. Well,
this is just a suggestion; I'm not going to insist on it.
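
To make the suggestion above concrete, here is a minimal sketch of an
immutable declaration that can still be "rebuilt" with options coming from
the job configuration. All names in it (`RebuildSketch`,
`ListStateDeclaration`, `withTtl`, `withTtlIfUndefined`) are hypothetical
and are not taken from the FLIP; it only illustrates the idea of returning
a new object instead of mutating the existing one.

```java
import java.time.Duration;
import java.util.Optional;

public final class RebuildSketch {

    /** Hypothetical immutable declaration; not the FLIP's actual interface. */
    static final class ListStateDeclaration {
        private final String name;
        private final Duration ttl; // null means "not defined by the user"

        private ListStateDeclaration(String name, Duration ttl) {
            this.name = name;
            this.ttl = ttl;
        }

        static ListStateDeclaration of(String name) {
            return new ListStateDeclaration(name, null);
        }

        String name() {
            return name;
        }

        Optional<Duration> ttl() {
            return Optional.ofNullable(ttl);
        }

        /** Returns a new declaration; the receiver is left untouched. */
        ListStateDeclaration withTtl(Duration newTtl) {
            return new ListStateDeclaration(name, newTtl);
        }

        /** Applies a fallback (e.g. from job configuration) only if no TTL was set. */
        ListStateDeclaration withTtlIfUndefined(Duration fallback) {
            return ttl == null ? withTtl(fallback) : this;
        }
    }

    public static void main(String[] args) {
        ListStateDeclaration declared = ListStateDeclaration.of("buffer");
        ListStateDeclaration effective = declared.withTtlIfUndefined(Duration.ofHours(1));
        System.out.println("declared has ttl: " + declared.ttl().isPresent());
        System.out.println("effective has ttl: " + effective.ttl().isPresent());
    }
}
```

With this shape the runtime could fill in undefined options without the
user-facing object ever changing after it is created.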


Best,
Zakelly


Re: [DISCUSS] FLIP-433: State Access on DataStream API V2

2024-03-12 Thread weijie guo
Hi Zakelly,

> But still, from a user's point of view, state can be characterized along
> two relatively independent dimensions, how states redistribute and the data
> structure. Thus I still suggest a chained-like configuration API that
> configures one aspect on each call.


I think the chained-like style is a good suggestion. But I'm not going to
introduce any mutable-like API to StateDeclaration (even though we could
achieve immutability by returning a new object). For this reason, I decided
to use the builder pattern, which also has the benefit of chaining calls
and allows us to support further configurations such as setTTL in the
future. For ease of use, we'll also provide some shortcuts to avoid having
to go through a long build chain each time. I have updated the FLIP
accordingly.
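
As a rough illustration of what a builder plus a shortcut could look like,
here is a minimal sketch. The class and method names (`BuilderSketch`,
`ListStateDeclaration`, `redistributeWith`, `listState`) and the enum
constants are assumptions made for this example only; the FLIP page is the
authoritative source for the real API.

```java
import java.util.Objects;

public final class BuilderSketch {

    // Illustrative constants only; the thread does not list the real ones.
    enum RedistributionMode { NONE, REDISTRIBUTABLE, IDENTICAL }

    /** Hypothetical immutable declaration produced by a builder. */
    static final class ListStateDeclaration {
        private final String name;
        private final Class<?> elementType;
        private final RedistributionMode mode;

        private ListStateDeclaration(Builder builder) {
            this.name = builder.name;
            this.elementType = builder.elementType;
            this.mode = builder.mode;
        }

        String name() {
            return name;
        }

        Class<?> elementType() {
            return elementType;
        }

        RedistributionMode redistributionMode() {
            return mode;
        }

        static Builder builder(String name, Class<?> elementType) {
            return new Builder(name, elementType);
        }

        /** Mutable only while building; the built declaration has no setters. */
        static final class Builder {
            private final String name;
            private final Class<?> elementType;
            private RedistributionMode mode = RedistributionMode.NONE;

            private Builder(String name, Class<?> elementType) {
                this.name = Objects.requireNonNull(name);
                this.elementType = Objects.requireNonNull(elementType);
            }

            Builder redistributeWith(RedistributionMode mode) {
                this.mode = Objects.requireNonNull(mode);
                return this;
            }

            ListStateDeclaration build() {
                return new ListStateDeclaration(this);
            }
        }
    }

    /** Shortcut for the common case, so callers can skip the build chain. */
    static ListStateDeclaration listState(String name, Class<?> elementType) {
        return ListStateDeclaration.builder(name, elementType).build();
    }

    public static void main(String[] args) {
        ListStateDeclaration viaShortcut = listState("events", String.class);
        ListStateDeclaration viaBuilder =
                ListStateDeclaration.builder("offsets", Long.class)
                        .redistributeWith(RedistributionMode.REDISTRIBUTABLE)
                        .build();
        System.out.println(viaShortcut.name() + " -> " + viaShortcut.redistributionMode());
        System.out.println(viaBuilder.name() + " -> " + viaBuilder.redistributionMode());
    }
}
```

New options such as a TTL would become additional builder methods, leaving
existing call sites and the shortcut untouched.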



Best regards,

Weijie



Re: [DISCUSS] FLIP-433: State Access on DataStream API V2

2024-03-12 Thread weijie guo
Hi Hangxiang,

> So these operators only define all states they may use which could be
> explained by the caller, right?

Yes, you're right.


Best regards,

Weijie



Re: [DISCUSS] FLIP-433: State Access on DataStream API V2

2024-03-12 Thread weijie guo
Hi Max,

> In this thread it looks like the plan is to remove the old state
> declaration API. I think we should consider keeping the old APIs to
> avoid breaking too many jobs.

We don't plan to remove any old APIs, which means that changes made in V2
won't affect any V1 DataStream jobs. But V2 is limited to the new state
declaration API, and users who want to migrate to DataStream V2 will need
to rewrite their jobs anyway.

Best regards,

Weijie



Re: [DISCUSS] FLIP-433: State Access on DataStream API V2

2024-03-11 Thread Hangxiang Yu
Hi, Weijie.
Thanks for your answer!

> No, introducing and declaring new state
> at runtime is something we want to explicitly disallow.

I was just thinking about how some operators define their useState() when
the states they actually use may change at runtime, e.g. different state
types for different state sizes.
So these operators just declare all the states they may use, which the
caller can then interpret, right?
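
A minimal sketch of this reading, under stated assumptions: the operator
declares every state it might need up front, picks one at runtime, and any
access to a state outside the declared set fails with an exception. The
names `EagerDeclarationSketch`, `DeclaredStates`, `SizeAwareOperator`,
`usesStates` and `chooseState`, as well as the size threshold, are invented
for illustration and are not the FLIP's API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public final class EagerDeclarationSketch {

    /** Hypothetical guard that only hands out states declared before the job runs. */
    static final class DeclaredStates {
        private final Set<String> declared;

        DeclaredStates(Set<String> declared) {
            this.declared = Collections.unmodifiableSet(new HashSet<>(declared));
        }

        String access(String stateName) {
            if (!declared.contains(stateName)) {
                // Fail loudly instead of returning an empty result.
                throw new IllegalStateException(
                        "State '" + stateName + "' was not declared up front");
            }
            return stateName;
        }
    }

    /** Declares both candidate states, then chooses between them at runtime. */
    static final class SizeAwareOperator {
        static final String SMALL = "small-value-state";
        static final String LARGE = "large-list-state";
        static final long THRESHOLD_BYTES = 1024; // arbitrary cut-off for the example

        Set<String> usesStates() {
            return new HashSet<>(Arrays.asList(SMALL, LARGE));
        }

        String chooseState(long sizeInBytes) {
            return sizeInBytes < THRESHOLD_BYTES ? SMALL : LARGE;
        }
    }

    public static void main(String[] args) {
        SizeAwareOperator operator = new SizeAwareOperator();
        DeclaredStates states = new DeclaredStates(operator.usesStates());
        System.out.println(states.access(operator.chooseState(10)));
        System.out.println(states.access(operator.chooseState(1_000_000)));
    }
}
```

The set of declared states is fixed before execution; only the choice among
them depends on runtime information.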

On Mon, Mar 11, 2024 at 10:57 PM Maximilian Michels  wrote:

> The FLIP mentions: "The contents described in this FLIP are all new
> APIs and do not involve compatibility issues."
>
> In this thread it looks like the plan is to remove the old state
> declaration API. I think we should consider keeping the old APIs to
> avoid breaking too many jobs. The new APIs will still be beneficial
> for new jobs, e.g. for SQL jobs.
>
> -Max
>
> On Fri, Mar 8, 2024 at 4:39 AM Zakelly Lan  wrote:
> >
> > Hi Weijie,
> >
> > Thanks for your answer! Well I get your point. Since partitions are
> > first-class citizens, and redistribution means how states migrate when
> > partitions change, I'd be fine with deemphasizing the concept of
> > keyed/operator state if we highlight the definition of partition in the
> > document. Keeping `RedistributionMode` under `StateDeclaration` is also
> > fine with me, as I guess it is only for internal usage.
> > But still, from a user's point of view, state can be characterized along
> > two relatively independent dimensions: how states redistribute and the
> > data structure. Thus I still suggest a chained-like configuration API
> > that configures one aspect on each call, such as:
> > ```
> > // Keyed stream, no redistribution mode specified: the state goes with
> > // the partition (no redistribution). Keyed state.
> > StateDeclaration a = States.declare(name).listState(type);
> >
> > // Keyed stream, redistribution strategy specified: the state follows
> > // the specified redistribution strategy. Operator state.
> > StateDeclaration b =
> >     States.declare(name).listState(type).redistributeBy(strategy);
> >
> > // Non-keyed stream: a redistribution strategy *must be* specified.
> > StateDeclaration c =
> >     States.declare(name).listState(type).redistributeBy(strategy);
> >
> > // Broadcast stream and state.
> > StateDeclaration d =
> >     States.declare(name).mapState(typeK, typeV).broadcast();
> > ```
> > It can drive users to think about redistribution issues when needed. It
> > also provides more flexibility to add more combinations, such as
> > broadcasting list state, or to chain more configurable aspects, such as
> > adding `withTtl()` in the future. WDYT?
> >
> >
> > Best,
> > Zakelly
> >
> > On Thu, Mar 7, 2024 at 6:04 PM weijie guo wrote:
> >
> > > Hi Jinzhong,
> > >
> > > Thanks for the reply!
> > >
> > > > Overall, I think that the “Eager State Declaration” is a good
> > > > proposal, which can enhance Flink's state management capabilities
> > > > and provide possibilities for subsequent state optimizations.
> > >
> > > It's nice to see that people who are familiar with the state stuff like
> > > this proposal. :)
> > >
> > > > When the user attempts to access an undeclared state at runtime, it
> > > > is more reasonable to throw an exception rather than returning
> > > > Option#empty, as Gyula mentioned above.
> > >
> > > Yes, I agree that this is better than a confusing empty result, and I
> > > have modified the corresponding part of this FLIP.
> > >
> > > > In addition, I'm not quite sure whether all of the existing usage in
> > > > which states are registered at runtime dynamically can be migrated
> > > > to the "Eager State Declaration" style with minimal cost?
> > >
> > > I think for most user functions, this is fairly straightforward to
> > > migrate. But states whose declarations depend on runtime information
> > > (e.g. RuntimeContext) are, in principle, not supported in the new API.
> > > Anyway, the old and new APIs are completely incompatible, so rewriting
> > > jobs is inevitable. Users can think about how to write a good process
> > > function that conforms to the eager declaration style.
> > >
> > > > For state TTL, should StateDeclaration also provide interfaces for
> > > > users to declare state ttl?
> > >
> > > Of course, we can and we need to provide this one. But whether or not
> > > it's in this FLIP isn't very important to me, because we're mainly
> > > talking about the general principles and ways of declaring and
> > > accessing state in this FLIP. I promise we won't leave it out in the
> > > end. :D
> > >
> > >
> > >
> > > Best regards,
> > >
> > > Weijie
> > >
> > >
> > > On Thu, Mar 7, 2024 at 17:34, Jinzhong Li wrote:
> > >
> > > > Hi Weijie,
> > > >
> > > > Thanks for driving this!
> > > >
> > > > 1. Overall, I think that the “Eager State Declaration” is a good
> > > > proposal, which can enhance Flink's state management capabilities
> > > > and provide possibilities for subsequent state optimizations.
> > > >
> > > > 2. When the user attempts to access an 

Re: [DISCUSS] FLIP-433: State Access on DataStream API V2

2024-03-11 Thread Maximilian Michels
The FLIP mentions: "The contents described in this FLIP are all new
APIs and do not involve compatibility issues."

In this thread it looks like the plan is to remove the old state
declaration API. I think we should consider keeping the old APIs to
avoid breaking too many jobs. The new APIs will still be beneficial
for new jobs, e.g. for SQL jobs.

-Max

On Fri, Mar 8, 2024 at 4:39 AM Zakelly Lan  wrote:
>
> Hi Weijie,
>
> Thanks for your answer! Well I get your point. Since partitions are
> first-class citizens, and redistribution means how states migrate when
> partitions change, I'd be fine with deemphasizing the concept of
> keyed/operator state if we highlight the definition of partition in the
> document. Keeping `RedistributionMode` under `StateDeclaration` is also
> fine with me, as I guess it is only for internal usage.
> But still, from a user's point of view,  state can be characterized along
> two relatively independent dimensions, how states redistribute and the data
> structure. Thus I still suggest a chained-like configuration API that
> configures one aspect on each call, such as:
> ```
> // Keyed stream, no redistribution mode specified: the state goes with the
> // partition (no redistribution), i.e. keyed state.
> StateDeclaration a = States.declare(name).listState(type);
>
> // Keyed stream, redistribution strategy specified: the state follows the
> // specified redistribution strategy, i.e. operator state.
> StateDeclaration b =
>     States.declare(name).listState(type).redistributeBy(strategy);
>
> // Non-keyed stream: a redistribution strategy *must be* specified.
> StateDeclaration c =
>     States.declare(name).listState(type).redistributeBy(strategy);
>
> // Broadcast stream and state.
> StateDeclaration d =
>     States.declare(name).mapState(typeK, typeV).broadcast();
> ```
> This can drive users to think about redistribution issues when needed. It
> also leaves room to add more combinations, such as a broadcast list state,
> or to chain more configurable aspects, such as adding `withTtl()` in the
> future. WDYT?
>
>
> Best,
> Zakelly
>

Re: [DISCUSS] FLIP-433: State Access on DataStream API V2

2024-03-07 Thread Zakelly Lan
Hi Weijie,

Thanks for your answer! Well I get your point. Since partitions are
first-class citizens, and redistribution means how states migrate when
partitions change, I'd be fine with deemphasizing the concept of
keyed/operator state if we highlight the definition of partition in the
document. Keeping `RedistributionMode` under `StateDeclaration` is also
fine with me, as I guess it is only for internal usage.
But still, from a user's point of view, state can be characterized along
two relatively independent dimensions: how states redistribute and the data
structure. Thus I still suggest a chained-style configuration API that
configures one aspect on each call, such as:
```
// Keyed stream, no redistribution mode specified: the state goes with the
// partition (no redistribution), i.e. keyed state.
StateDeclaration a = States.declare(name).listState(type);

// Keyed stream, redistribution strategy specified: the state follows the
// specified redistribution strategy, i.e. operator state.
StateDeclaration b =
    States.declare(name).listState(type).redistributeBy(strategy);

// Non-keyed stream: a redistribution strategy *must be* specified.
StateDeclaration c =
    States.declare(name).listState(type).redistributeBy(strategy);

// Broadcast stream and state.
StateDeclaration d =
    States.declare(name).mapState(typeK, typeV).broadcast();
```
This can drive users to think about redistribution issues when needed. It
also leaves room to add more combinations, such as a broadcast list state,
or to chain more configurable aspects, such as adding `withTtl()` in the
future. WDYT?
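
To illustrate how such a chain could stay immutable, here is a minimal,
self-contained sketch. Every name in it (SketchStates, SketchDeclaration,
SketchStructure, SketchRedistribution) is hypothetical and is not the
FLIP-433 API; the only point is that each call configures one dimension and
returns a new object.

```java
import java.util.Objects;

// Hypothetical names throughout; this is not the FLIP-433 API.
enum SketchStructure { VALUE, LIST, MAP }

enum SketchRedistribution { NONE, BY_STRATEGY, BROADCAST }

/** Immutable declaration: every chained call returns a new object. */
final class SketchDeclaration {
    final String name;
    final SketchStructure structure;
    final SketchRedistribution redistribution;

    SketchDeclaration(String name, SketchStructure structure,
                      SketchRedistribution redistribution) {
        this.name = Objects.requireNonNull(name);
        this.structure = structure;
        this.redistribution = redistribution;
    }

    // Second dimension: how the state moves when partitions change.
    SketchDeclaration redistributeBy(SketchRedistribution mode) {
        return new SketchDeclaration(name, structure, mode);
    }

    SketchDeclaration broadcast() {
        return new SketchDeclaration(name, structure, SketchRedistribution.BROADCAST);
    }
}

/** Entry point; the first dimension chosen is the data structure. */
final class SketchStates {
    private final String name;

    private SketchStates(String name) { this.name = name; }

    static SketchStates declare(String name) { return new SketchStates(name); }

    SketchDeclaration listState() {
        return new SketchDeclaration(name, SketchStructure.LIST, SketchRedistribution.NONE);
    }

    SketchDeclaration mapState() {
        return new SketchDeclaration(name, SketchStructure.MAP, SketchRedistribution.NONE);
    }
}
```

With this shape, `SketchStates.declare("buffer").listState()` keeps the
default (no redistribution), and calling `redistributeBy(...)` on it yields a
second, independent declaration rather than mutating the first.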


Best,
Zakelly

On Thu, Mar 7, 2024 at 6:04 PM weijie guo  wrote:

> Hi Jinzhong,
>
> Thanks for the reply!
>
> > Overall, I think that the “Eager State Declaration” is a good proposal,
> > which can enhance Flink's state management capabilities and provide
> > possibilities for subsequent state optimizations.
>
> It's nice to see that people who are familiar with the state stuff like
> this proposal. :)
>
> > When the user attempts to access an undeclared state at runtime, it is
> > more reasonable to throw an exception rather than returning Option#empty,
> > as Gyula mentioned above.
>
> Yes, I agree that this is better than a confusing empty result, and I have
> modified the corresponding part of this FLIP.
>
> > In addition, I'm not quite sure whether all of the existing usage in
> > which states are registered at runtime dynamically can be migrated to the
> > "Eager State Declaration" style with minimal cost?
>
> I think for most user functions, this is fairly straightforward to migrate.
> But states whose declarations depend on runtime information (e.g.
> RuntimeContext) are, in principle, not supported in the new API. Anyway,
> the old and new APIs are completely incompatible, so rewriting jobs is
> inevitable. Users can think about how to write a good process function that
> conforms to the eager declaration style.
>
> > For state TTL, should StateDeclaration also provide interfaces for users
> > to declare state TTL?
>
> Of course, we can and we need to provide this one. But whether or not it's
> in this FLIP isn't very important to me, because we're mainly talking
> about the general principles and ways of declaring and accessing state in
> this FLIP. I promise we won't leave it out in the end :D
>
>
>
> Best regards,
>
> Weijie
>
>

Re: [DISCUSS] FLIP-433: State Access on DataStream API V2

2024-03-07 Thread weijie guo
Hi Jinzhong,

Thanks for the reply!

> Overall, I think that the “Eager State Declaration” is a good proposal,
> which can enhance Flink's state management capabilities and provide
> possibilities for subsequent state optimizations.

It's nice to see that people who are familiar with the state stuff like
this proposal. :)

> When the user attempts to access an undeclared state at runtime, it is
> more reasonable to throw an exception rather than returning Option#empty,
> as Gyula mentioned above.

Yes, I agree that this is better than a confusing empty result, and I have
modified the corresponding part of this FLIP.
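
The difference between "declared but empty" and "not declared" can be
sketched as follows. This is only an illustration under assumed names:
SketchStateManager and its methods are hypothetical stand-ins, not the
interface defined in the FLIP.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for a state manager; not the FLIP-433 interface.
final class SketchStateManager {
    private final Set<String> declared;
    private final Map<String, String> values = new HashMap<>();

    SketchStateManager(Set<String> declaredNames) {
        this.declared = new HashSet<>(declaredNames);
    }

    /** Declared but unset yields Optional.empty(); undeclared throws. */
    Optional<String> get(String name) {
        requireDeclared(name);
        return Optional.ofNullable(values.get(name));
    }

    void put(String name, String value) {
        requireDeclared(name);
        values.put(name, value);
    }

    private void requireDeclared(String name) {
        if (!declared.contains(name)) {
            throw new IllegalStateException("State '" + name + "' was not declared");
        }
    }
}
```

An empty Optional then always means "no value yet", while a typo in the
state name or a missing declaration fails loudly instead of looking like an
empty state.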

> In addition, I'm not quite sure whether all of the existing usage in
> which states are registered at runtime dynamically can be migrated to the
> "Eager State Declaration" style with minimal cost?

I think for most user functions, this is fairly straightforward to migrate.
But states whose declarations depend on runtime information (e.g.
RuntimeContext) are, in principle, not supported in the new API. Anyway,
the old and new APIs are completely incompatible, so rewriting jobs is
inevitable. Users can think about how to write a good process function that
conforms to the eager declaration style.

> For state TTL, should StateDeclaration also provide interfaces for users
> to declare state TTL?

Of course, we can and we need to provide this one. But whether or not it's
in this FLIP isn't very important to me, because we're mainly talking
about the general principles and ways of declaring and accessing state in
this FLIP. I promise we won't leave it out in the end :D



Best regards,

Weijie


On Thu, Mar 7, 2024 at 5:34 PM Jinzhong Li wrote:

> Hi Weijie,
>
> Thanks for driving this!
>
> 1. Overall, I think that the “Eager State Declaration” is a good proposal,
> which can enhance Flink's state management capabilities and provide
> possibilities for subsequent state optimizations.
>
> 2. When the user attempts to access an undeclared state at runtime, it is
> more reasonable to throw an exception rather than returning Option#empty,
> as Gyula mentioned above.
> In addition, I'm not quite sure whether all of the existing usage in which
> states are registered at runtime dynamically can be migrated to the "Eager
> State Declaration" style with minimal cost?
>
> 3. For state TTL, should StateDeclaration also provide interfaces for users
> to declare state TTL?
>
> Best,
> Jinzhong Li
>
>

Re: [DISCUSS] FLIP-433: State Access on DataStream API V2

2024-03-07 Thread Jinzhong Li
Hi Weijie,

Thanks for driving this!

1. Overall, I think that the “Eager State Declaration” is a good proposal,
which can enhance Flink's state management capabilities and provide
possibilities for subsequent state optimizations.

2. When the user attempts to access an undeclared state at runtime, it is
more reasonable to throw an exception rather than returning Option#empty,
as Gyula mentioned above.
In addition, I'm not quite sure whether all of the existing usage in which
states are registered at runtime dynamically can be migrated to the "Eager
State Declaration" style with minimal cost?

3. For state TTL, should StateDeclaration also provide interfaces for users
to declare state TTL?

Best,
Jinzhong Li


On Thu, Mar 7, 2024 at 5:08 PM weijie guo  wrote:

> Hi Hangxiang,
>
> Thanks for your reply!
>
> > We have also discussed in FLIP-359/FLINK-32658 about limiting user
> > operations to avoid creating state during processElement. Could the current
> > interfaces also help with this?
>
> I think so. It is illegal to create state at runtime in our proposal.
>
>
> > Could you provide more examples about how useStates() works? Since some
> > operations may change their used states at runtime, the value this method
> > returns will be modified at runtime, right?
>
>
> No. Introducing and declaring new state at runtime is something we want to
> explicitly disallow. You can simply assume that useStates() is only called
> when the JobGraph is generated, and any later changes to it are invalid
> and illegal.
>
>
> > IIUC, RedistributionMode/Strategy should not be used by users, right?
> > If so, I'm +1 to move them to inner interfaces, since they seem a bit
> > confusing to users.
>
>
> As for this question, I think my answer to Zakelly should be helpful.
>
>
>
> Best regards,
>
> Weijie
>
>

Re: [DISCUSS] FLIP-433: State Access on DataStream API V2

2024-03-07 Thread weijie guo
Hi Hangxiang,

Thanks for your reply!

> We have also discussed in FLIP-359/FLINK-32658 about limiting user
> operations to avoid creating state during processElement. Could the current
> interfaces also help with this?

I think so. It is illegal to create state at runtime in our proposal.


> Could you provide more examples about how useStates() works? Since some
> operations may change their used states at runtime, the value this method
> returns will be modified at runtime, right?


No. Introducing and declaring new state at runtime is something we want to
explicitly disallow. You can simply assume that useStates() is only called
when the JobGraph is generated, and any later changes to it are invalid
and illegal.
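
As a rough illustration of "called once when the JobGraph is generated",
consider the sketch below. The types SketchProcessFunction and
SketchCompiler are hypothetical, and state declarations are reduced to plain
names; the real interfaces in the FLIP may look different.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical types; the names only mirror the discussion.
interface SketchProcessFunction {
    /** Names of all states this function will ever access. */
    default Set<String> usesStates() {
        return Collections.emptySet();
    }
}

final class SketchCompiler {
    /** Called once while the job graph is built; the result is frozen. */
    static Set<String> collectDeclaredStates(SketchProcessFunction fn) {
        // Defensive copy: later changes made by the function are not seen.
        return Collections.unmodifiableSet(new HashSet<>(fn.usesStates()));
    }
}
```

Because the framework keeps its own immutable copy, anything the function
adds to its set afterwards has no effect, which matches the intent that
later changes are invalid.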


> IIUC, RedistributionMode/Strategy should not be used by users, right?
> If so, I'm +1 to move them to inner interfaces, since they seem a bit
> confusing to users.


As for this question, I think my answer to Zakelly should be helpful.



Best regards,

Weijie


On Thu, Mar 7, 2024 at 4:58 PM weijie guo wrote:

> Hi Zakelly,
>
> Thanks for your reply!
>
> > My advice would be to conceal RedistributionMode/Strategy from the
> > standard user interface, particularly within the helper class 'State'. But
> > I'm OK to keep it in `StateDeclaration` since its interfaces are basically
> > used by the framework.
>
> I'm sorry, I didn't mention some of the details/concepts introduced by the
> Umbrella FLIP and FLIP-409 in this FLIP. This might make it hard to
> understand the motivation behind RedistributionMode. I'll add more context
> to the FLIP.
>
>
>
> Briefly, in V2, we explicitly define the concept of partition. In the case of
> KeyedPartitionStream, one key corresponds to one partition. For
> NonKeyedPartitionStream, one parallelism/subtask corresponds to one partition.
> All states are considered to be confined within the partition. On this basis,
> an obvious question is whether and how the state should be redistributed
> when the partition changes. So we divide the state into three categories:
>
>    - Don't need to redistribute states when the partition changes.
>    - Has to decide how to distribute states when the partition changes.
>    - Always has the same state across different partitions.
>
> After introducing the concept of partition, the redistribution pattern/mode
> of state is the more essential difference between states. For this reason, we
> don't want to emphasize keyed/operator state in the V2 API any more. Keep in
> mind, partitions are first-class citizens. And, even in V1, we have to let
> the user know that split/union are two different strategies for list state.
>
>
>
> As for whether or not to expose RedistributionMode to users, I have an open
> mind. But as I said just now, we still can't avoid this problem in the
> splitRedistributionListState and unionRedistributionListState. IMO, it's
> better to explain it at the API level instead of avoiding it. WDYT?
>
> Best regards,
>
> Weijie
>
>

Re: [DISCUSS] FLIP-433: State Access on DataStream API V2

2024-03-07 Thread weijie guo
Hi Zakelly,

Thanks for your reply!

> My advice would be to conceal RedistributionMode/Strategy from the
> standard user interface, particularly within the helper class 'State'. But
> I'm OK to keep it in `StateDeclaration` since its interfaces are basically
> used by the framework.

I'm sorry, I didn't mention some of the details/concepts introduced by
the Umbrella FLIP and FLIP-409 in this FLIP. This might make it hard
to understand the motivation behind RedistributionMode. I'll add more
context to the FLIP.


Briefly, in V2, we explicitly define the concept of partition. In the
case of KeyedPartitionStream, one key corresponds to one partition. For
NonKeyedPartitionStream, one parallelism/subtask corresponds to one
partition. All states are considered to be confined within the
partition. On this basis, an obvious question is whether and how the
state should be redistributed when the partition changes. So we
divide the state into three categories:

   - Don't need to redistribute states when the partition changes.
   - Has to decide how to distribute states when the partition changes.
   - Always has the same state across different partitions.
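
The three categories above map naturally onto a three-valued enum. The
sketch below is illustrative only: the constant names follow the ones used
elsewhere in this thread (NONE, REDISTRIBUTABLE, IDENTICAL), while the type
name SketchRedistributionMode and the helper method are assumptions.

```java
// Constant names follow those mentioned in this thread; the enum itself
// and needsStrategy() are only a sketch.
enum SketchRedistributionMode {
    NONE,            // state stays with its partition, nothing to redistribute
    REDISTRIBUTABLE, // a strategy decides how state moves when partitions change
    IDENTICAL;       // every partition always holds the same state

    /** Only the second category needs a user-visible strategy. */
    boolean needsStrategy() {
        return this == REDISTRIBUTABLE;
    }
}
```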

After introducing the concept of partition, the redistribution
pattern/mode of state is the more essential difference between states.
For this reason, we don't want to emphasize keyed/operator state in
the V2 API any more. Keep in mind, partitions are first-class citizens.
And, even in V1, we have to let the user know that split/union are two
different strategies for list state.
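
For readers less familiar with the two list-state strategies, here is a
small sketch of the difference when the number of partitions changes. The
class SketchListRedistribution is hypothetical, and the round-robin
assignment in split() is an assumption chosen for simplicity; it is not
meant to reproduce Flink's actual assignment logic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; only the split-versus-union contrast matters here.
final class SketchListRedistribution {

    /** Split: every element ends up in exactly one new partition. */
    static <T> List<List<T>> split(List<List<T>> oldPartitions, int newCount) {
        List<List<T>> result = emptyPartitions(newCount);
        int i = 0;
        for (List<T> part : oldPartitions) {
            for (T element : part) {
                result.get(i++ % newCount).add(element); // round-robin (assumed)
            }
        }
        return result;
    }

    /** Union: every new partition receives all elements. */
    static <T> List<List<T>> union(List<List<T>> oldPartitions, int newCount) {
        List<T> all = new ArrayList<>();
        for (List<T> part : oldPartitions) {
            all.addAll(part);
        }
        List<List<T>> result = new ArrayList<>();
        for (int p = 0; p < newCount; p++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }

    private static <T> List<List<T>> emptyPartitions(int count) {
        List<List<T>> result = new ArrayList<>();
        for (int p = 0; p < count; p++) {
            result.add(new ArrayList<>());
        }
        return result;
    }
}
```

Either way the user has to pick one, which is why the choice cannot be
hidden completely behind the keyed/operator distinction.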


As for whether or not to expose RedistributionMode to users, I have an
open mind. But as I said just now, we still can't avoid this problem
in the splitRedistributionListState and unionRedistributionListState.
IMO, it's better to explain it at the API level instead of avoiding
it. WDYT?

Best regards,

Weijie


On Thu, Mar 7, 2024 at 4:39 PM weijie guo wrote:

> Hi Gyula,
>
>
> Thanks for your reply!
>
>
> Let me answer these questions:
>
>
> > What is the semantics of the usesStates method? When is it called? Can
> > the used state change dynamically at runtime? Can the logic depend on
> > something computed in open(..) for example?
>
>
>
> useStates is used to predefine all the states that the process function needs
> to access. In other words, we want to avoid declaring the state dynamically
> at runtime, and this allows the SQL planner and JM to optimize the job better.
> As a result, this logic must be fully available at compile time (when the
> JobGraph is generated), so it can't rely on computations that are executed
> after deployment to the TM.
>
>
> > Currently state access is pretty dynamic in Flink and I would assume many
> > jobs create states on the fly based on some required logic. Are we planning
> > to address these use-cases?
>
>
> It depends on what type of context we need. If the type and number of states
> depend on runtime context, that's something we want to avoid. If they only
> depend on information available at compile time, I think we could support
> it.
>
>
> > Are we planning to support deleting/dropping states that are not required
> > anymore?
>
>
>
> We really don't want the user to be able to dynamically declare/delete a
> state at runtime, but if you just want to clear/clean the value of state, the
> new API works the same as the old API.
>
>
> > I think if a state is not declared or otherwise cannot be accessed, an
> > exception must be thrown. We cannot confuse empty value with something
> > inaccessible.
>
>
> After thinking about it a bit more, I think you have a point!
> It's important to make a clear distinction between an empty state and illegal
> access, especially since Flink currently discourages setting a non-null
> default value for the state.
> I will modify the proposal as you suggested then :)
>
>
> > The RedistributionMode enum sounds a bit strange to me, as it doesn't
> > actually specify a mode of redistribution. It feels more like a flag. Can
> > we simply have an Optional instead?
>
>
> We actually define three types of RedistributionMode instead of two because
> we don't want to think of IDENTICAL as a redistribution strategy; it's just
> an invariant: the state of that type is always the same across partitions.
> If it only had NONE and REDISTRIBUTABLE, I think your proposal would be
> feasible. But we don't want to confuse these three semantics/modes.
>
>
> > BroadcastStates are currently very limited by only Map-like states, and
> > the new interface also enforces that. Can we remove this limitation? If
> > not, should broadcastState declaration extend mapstate declaration?
>
>
>
> Personally, I don't want to make this restriction. This is also why the
> method in StateManager to get BroadcastState has the parameter of
> BroadcastStateDeclaration instead of MapStateDeclaration. In the future, if
> the state backend supports other types of broadcast state, we can add a
> corresponding method to the States utility class to get the
> BroadcastStateDeclaration.
>
>

Re: [DISCUSS] FLIP-433: State Access on DataStream API V2

2024-03-07 Thread weijie guo
Hi Gyula,


Thanks for your reply!


Let me answer these questions:


> What is the semantics of the usesStates method? When is it called? Can
> the used state change dynamically at runtime? Can the logic depend on
> something computed in open(..) for example?


useStates is used to predefine all the states that the process
function needs to access. In other words, we want to avoid declaring
the state dynamically at runtime, and this allows the SQL planner and
JM to optimize the job better. As a result, this logic must be fully
available at compile time (when the JobGraph is generated), so it
can't rely on computations that are executed after deployment to the TM.


> Currently state access is pretty dynamic in Flink and I would assume
> many jobs create states on the fly based on some required logic. Are
> we planning to address these use-cases?


It depends on what type of context we need. If the type and number of
states depend on runtime context, that's something we want to avoid.
If they only depend on information available at compile time, I think
we could support it.
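
As an example of the supportable case, the number of states below depends
only on a constructor argument that is fixed when the job is defined. The
class SketchShardedFunction and the state naming scheme are hypothetical,
and declarations are again reduced to plain names.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical function whose state set is known at compile time.
final class SketchShardedFunction {
    private final int shards; // fixed when the job is defined, not at runtime

    SketchShardedFunction(int shards) {
        this.shards = shards;
    }

    /** Deterministic: depends only on the constructor argument. */
    Set<String> usesStates() {
        Set<String> names = new LinkedHashSet<>();
        for (int i = 0; i < shards; i++) {
            names.add("shard-" + i);
        }
        return names;
    }
}
```

By contrast, a set of states that depends on which records arrive, or on
something computed in open(..), could not be expressed this way.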


> Are we planning to support deleting/dropping states that are not
> required anymore?


We really don't want the user to be able to dynamically declare/delete
a state at runtime, but if you just want to clear/clean the value of
state, the new API works the same as the old API.


> I think if a state is not declared or otherwise cannot be accessed, an
> exception must be thrown. We cannot confuse empty value with something
> inaccessible.


After thinking about it a bit more, I think you have a point!
It's important to make a clear distinction between an empty state and
illegal access, especially since Flink currently discourages setting a
non-null default value for the state.
I will modify the proposal as you suggested then :)


> The RedistributionMode enum sounds a bit strange to me, as it doesn't
> actually specify a mode of redistribution. It feels more like a flag. Can
> we simply have an Optional instead?


We actually define three types of RedistributionMode instead of two because
we don't want to think of IDENTICAL as a redistribution strategy; it's just
an invariant: the state of that type is always the same across partitions.
If it only had NONE and REDISTRIBUTABLE, I think your proposal would be
feasible. But we don't want to confuse these three semantics/modes.


> BroadcastStates are currently very limited by only Map-like states, and
> the new interface also enforces that. Can we remove this limitation? If
> not, should broadcastState declaration extend mapstate declaration?


Personally, I don't want to make this restriction. This is also why
the method in StateManager to get BroadcastState has the parameter of
BroadcastStateDeclaration instead of MapStateDeclaration. In the
future, if the state backend supports other types of broadcast state,
we can add a corresponding method to the States utility class to get
the BroadcastStateDeclaration.



Best regards,

Weijie


On Thu, Mar 7, 2024 at 11:55 AM Hangxiang Yu wrote:

> Hi, Weijie.
> Thanks for your proposal.
> I'd like to start the discussion with some questions:
> 1. We have also discussed in FLIP-359/FLINK-32658 about limiting the user
> operation to avoid creating state when processElement. Could current
> interfaces also help this?
>
> 2. Could you provide more examples about how useStates() works ? Since some
> operations may change their used states at runtime, the value this method
> returns will be modified at runtime, right ?
> If so, I'm thinking if we could get some deterministic State Declaration
> Set before running which could help a lot for some state operations e.g.
> pre-check schema compatibility, queryable schema.
>
> 3. IIUC, RedistributionMode/Strategy should not be used by users, right ?
> If so, I'm +1 to move them to inner interfaces which seems a bit confusing
> to users.
>
> On Thu, Mar 7, 2024 at 11:39 AM Zakelly Lan  wrote:
>
> > Hi Weijie,
> >
> > Thanks for proposing this!
> >
> > Unifying and optimizing state definitions is a very good thing. I like
> the
> > idea of 'definition goes before using', so overall +1 for this proposal.
> >
> > However, I think the current definition is somewhat unclear. From a
> user's
> > point of view, I believe that state can be characterized along two
> > relatively independent axes: the scenario (keyed, non-keyed, or
> broadcast)
> > and the data structure (single value, list, map). I recommend that we
> fully
> > decouple these aspects, rather than linking the nature of the definition
> to
> > specific assumptions, such as equating broadcast states with maps, or
> > considering list states could be non-keyed.
> > Furthermore, the concept of 'Redistribution' may impose a cognitive
> burden
> > on general users. My advice would be to conceal
> RedistributionMode/Strategy
> > from the standard user interface, particularly within the helper class
> > 'State'. But I'm OK to keep it in `StateDeclaration` since its interfaces
> > are basically used by the 

Re: [DISCUSS] FLIP-433: State Access on DataStream API V2

2024-03-06 Thread Hangxiang Yu
Hi, Weijie.
Thanks for your proposal.
I'd like to start the discussion with some questions:
1. In FLIP-359/FLINK-32658 we also discussed limiting user operations to
avoid creating state in processElement. Could the current interfaces also
help with this?

2. Could you provide more examples of how useStates() works? Since some
operations may change the states they use at runtime, the value this method
returns will be modified at runtime, right?
If so, I'm wondering whether we could get a deterministic set of state
declarations before running, which would help a lot with state operations
such as pre-checking schema compatibility or queryable schema.

3. IIUC, RedistributionMode/Strategy should not be used by users, right?
If so, I'm +1 to moving them to internal interfaces, since they seem a bit
confusing to users.
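
As a rough illustration of point 2, the sketch below shows what a
pre-run check could look like if the set of declarations were
deterministic. Everything here is an assumption for illustration:
the ProcessFunction interface, the StateDeclaration record with a
type name encoded as a String, and the restoredTypes map standing in for
schema information read from a snapshot. The method name usesStates is
borrowed from this thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Sketch only: hypothetical types, not the real Flink API. */
class DeclarationPreCheckSketch {

    /** Hypothetical declaration: a state name plus its value type name. */
    record StateDeclaration(String name, String valueType) {}

    /** Hypothetical function that exposes its states before it runs. */
    interface ProcessFunction {
        Set<StateDeclaration> usesStates();
    }

    /**
     * Returns the names of declared states whose type differs from the type
     * recorded for the same name in a (hypothetical) restored snapshot.
     */
    static List<String> incompatibleStates(
            ProcessFunction function, Map<String, String> restoredTypes) {
        List<String> problems = new ArrayList<>();
        for (StateDeclaration d : function.usesStates()) {
            String restored = restoredTypes.get(d.name());
            if (restored != null && !restored.equals(d.valueType())) {
                problems.add(d.name());
            }
        }
        Collections.sort(problems); // Set iteration order is unspecified
        return problems;
    }

    public static void main(String[] args) {
        ProcessFunction function = () -> Set.of(
                new StateDeclaration("count", "Long"),
                new StateDeclaration("names", "String"));
        Map<String, String> restoredTypes =
                Map.of("count", "Integer", "names", "String");

        // The check runs without processing a single record.
        System.out.println(incompatibleStates(function, restoredTypes));
    }
}
```

A check like this is only possible when the declared set cannot change once
the job is running, which is the property the question asks about.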


Re: [DISCUSS] FLIP-433: State Access on DataStream API V2

2024-03-06 Thread Zakelly Lan
Hi Weijie,

Thanks for proposing this!

Unifying and optimizing state definitions is a very good thing. I like the
idea of 'definition goes before using', so overall +1 for this proposal.

However, I think the current definition is somewhat unclear. From a user's
point of view, I believe that state can be characterized along two
relatively independent axes: the scenario (keyed, non-keyed, or broadcast)
and the data structure (single value, list, map). I recommend that we fully
decouple these aspects, rather than linking the nature of the definition to
specific assumptions, such as equating broadcast states with maps, or
considering list states could be non-keyed.
Furthermore, the concept of 'Redistribution' may impose a cognitive burden
on general users. My advice would be to conceal RedistributionMode/Strategy
from the standard user interface, particularly within the helper class
'State'. But I'm OK to keep it in `StateDeclaration` since its interfaces
are basically used by the framework. My preferred syntax would be:
```
StateDeclaration a = State.declare(name).keyed().listState(type);
StateDeclaration b = State.declare(name).broadcast().mapState(typeK, typeV);
StateDeclaration c = State.declare(name).keyed().aggregatingState(type,
function);
```
WDYT?
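
For reference, one way such a chained syntax could be backed is a staged
builder, where each call fixes one axis and returns the next step. This is
only a sketch of the idea under simplifying assumptions: all class and
method names below are hypothetical, type arguments such as `type`, `typeK`
and `typeV` are omitted for brevity, and the result is an immutable record.

```java
/** Sketch only: a staged builder for the two independent axes. */
class ChainedDeclarationSketch {

    enum Scenario { KEYED, NON_KEYED, BROADCAST }

    enum Structure { VALUE, LIST, MAP }

    /** Immutable result of the chain. */
    record StateDeclaration(String name, Scenario scenario, Structure structure) {}

    /** Second step: the scenario is fixed, only the structure is left. */
    static final class StructureStep {
        private final String name;
        private final Scenario scenario;

        StructureStep(String name, Scenario scenario) {
            this.name = name;
            this.scenario = scenario;
        }

        StateDeclaration valueState() {
            return new StateDeclaration(name, scenario, Structure.VALUE);
        }

        StateDeclaration listState() {
            return new StateDeclaration(name, scenario, Structure.LIST);
        }

        StateDeclaration mapState() {
            return new StateDeclaration(name, scenario, Structure.MAP);
        }
    }

    /** First step: choose the scenario. */
    static final class ScenarioStep {
        private final String name;

        ScenarioStep(String name) {
            this.name = name;
        }

        StructureStep keyed() {
            return new StructureStep(name, Scenario.KEYED);
        }

        StructureStep nonKeyed() {
            return new StructureStep(name, Scenario.NON_KEYED);
        }

        StructureStep broadcast() {
            return new StructureStep(name, Scenario.BROADCAST);
        }
    }

    /** Entry point of the chain. */
    static ScenarioStep declare(String name) {
        return new ScenarioStep(name);
    }

    public static void main(String[] args) {
        StateDeclaration a = declare("a").keyed().listState();
        StateDeclaration b = declare("b").broadcast().mapState();
        System.out.println(a);
        System.out.println(b);
    }
}
```

Because each step is its own type, the compiler enforces the order of the
calls, and no intermediate object is ever a half-configured declaration.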


Best,
Zakelly



Re: [DISCUSS] FLIP-433: State Access on DataStream API V2

2024-03-06 Thread Gyula Fóra
Hi Weijie!

Thank you for the proposal.

I have some initial questions to start the discussion:

1. What are the semantics of the usesStates method? When is it called? Can
the used state change dynamically at runtime? Can the logic depend on
something computed in open(..) for example?

Currently state access is pretty dynamic in Flink and I would assume many
jobs create states on the fly based on some required logic. Are we planning
to address these use-cases?

Are we planning to support deleting/dropping states that are not required
anymore?

2. Get state now returns an optional, but you mention that:
" If you want to get a state that is not declared or has no access,
Option#empty is returned."

I think if a state is not declared or otherwise cannot be accessed, an
exception must be thrown. We cannot confuse empty value with something
inaccessible.

3. The RedistributionMode enum sounds a bit strange to me, as it doesn't
actually specify a mode of redistribution. It feels more like a flag. Can
we simply have an Optional instead?

4. BroadcastStates are currently limited to Map-like states, and
the new interface also enforces that.
Can we remove this limitation? If not, should broadcastState declaration
extend mapstate declaration?

Cheers,
Gyula



[DISCUSS] FLIP-433: State Access on DataStream API V2

2024-03-06 Thread weijie guo
Hi devs,

I'd like to start a discussion about FLIP-433: State Access on DataStream
API V2 [1]. This is the third sub-FLIP of DataStream API V2.


After FLIP-410 [2], we can already write a simple stateless job using the
DataStream V2 API. But as we all know, stateful computing is Flink's trump
card. In this FLIP, we will discuss how to declare and access state on
DataStream API V2, and how we manage to avoid some of the shortcomings of V1
in this regard.

You can find more details in this FLIP. Its relationship with other
sub-FLIPs can be found in the umbrella FLIP [3]. Looking forward to hearing
from you, thanks!


Best regards,

Weijie


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-433%3A+State+Access+on+DataStream+API+V2

[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-410%3A++Config%2C+Context+and+Processing+Timer+Service+of+DataStream+API+V2

[3]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2