Hi everyone,

Thanks for your discussion and feedback!

Our discussion has been going on for a while, and no new concerns have been
raised for several days, so I would like to start the vote soon.

Best regards,

Weijie


On Tue, Mar 12, 2024 at 17:40, Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi Weijie,
>
> Thanks for your reply!
>
> Overall I'd be fine with the builder pattern, but it is a little bit long
> when carrying explicit 'build()' and declaring the builder. Keeping the
> StateDeclaration immutable is OK, but it is a little bit inconvenient for
> overriding the undefined options by job configuration at runtime. I'd
> suggest providing some methods responsible for rebuilding a new
> StateDeclaration with new configurable options, just like the
> ConfigOptions#defaultValue does. Well, this is just a suggestion, I'm not
> going to insist on it.
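
A minimal sketch of the "rebuild with new options" idea above, assuming a hypothetical `ListStateDecl` class with a single optional TTL option. None of these names come from the FLIP; they only illustrate how an immutable declaration could still be overlaid with job configuration:

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical stand-in for a state declaration; not the FLIP-433 interface.
final class ListStateDecl {
    private final String name;
    private final Duration ttl; // null means "not defined by the user"

    ListStateDecl(String name, Duration ttl) {
        this.name = name;
        this.ttl = ttl;
    }

    String name() {
        return name;
    }

    Optional<Duration> ttl() {
        return Optional.ofNullable(ttl);
    }

    // Returns a new declaration; the receiver is left untouched.
    ListStateDecl withTtl(Duration newTtl) {
        return new ListStateDecl(name, newTtl);
    }

    // Fills in the TTL only if the user did not define one,
    // e.g. with a value read from the job configuration.
    ListStateDecl withTtlIfUndefined(Duration fallback) {
        return ttl == null ? withTtl(fallback) : this;
    }
}
```

With something like this, the framework could call `declared.withTtlIfUndefined(ttlFromJobConfig)` at runtime and keep the user's original object unchanged.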
>
>
> Best,
> Zakelly
>
> On Tue, Mar 12, 2024 at 2:07 PM weijie guo <guoweijieres...@gmail.com>
> wrote:
>
> > Hi Zakelly,
> >
> > > But still, from a user's point of view, state can be characterized along
> > two relatively independent dimensions: how states redistribute and the data
> > structure. Thus I still suggest a chained-like configuration API that
> > configures one aspect on each call.
> >
> >
> > I think the chained-like style is a good suggestion. But I'm not going to
> > introduce any mutable-like API to StateDeclaration (even though we can
> > achieve immutability by returning a new object). For this reason, I decided
> > to use the builder pattern, which also has the benefit of chaining calls
> > and allows us to support further configurations such as setTTL in the
> > future. For ease of use, we'll also provide some shortcuts to avoid having
> > to go through a long build chain each time. Of course, I have updated
> > the FLIP about this part.
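
Roughly, the builder-plus-shortcut shape described above could look like the sketch below. All class and method names here (`StateDecl`, `Mode`, `redistributeBy`, `of`) are hypothetical placeholders, not the interfaces in the FLIP:

```java
// Hypothetical sketch of an immutable declaration created through a builder.
final class StateDecl {
    enum Mode { NONE, REDISTRIBUTABLE, IDENTICAL }

    private final String name;
    private final Mode mode;

    private StateDecl(String name, Mode mode) {
        this.name = name;
        this.mode = mode;
    }

    String name() {
        return name;
    }

    Mode mode() {
        return mode;
    }

    static Builder builder(String name) {
        return new Builder(name);
    }

    // Shortcut for the common case, so callers can skip the build chain.
    static StateDecl of(String name) {
        return builder(name).build();
    }

    static final class Builder {
        private final String name;
        private Mode mode = Mode.NONE; // default: state stays with its partition

        private Builder(String name) {
            this.name = name;
        }

        // Each call configures one aspect and returns the builder for chaining.
        Builder redistributeBy(Mode mode) {
            this.mode = mode;
            return this;
        }

        StateDecl build() {
            return new StateDecl(name, mode);
        }
    }
}
```

Only the builder is mutable; once `build()` returns, the declaration itself has no setters, and later options such as a TTL would be added as further builder methods.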
> >
> >
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > On Tue, Mar 12, 2024 at 14:00, weijie guo <guoweijieres...@gmail.com> wrote:
> >
> > > Hi Hangxiang,
> > >
> > > > So these operators only define all states they may use which could be
> > > explained by the caller, right ?
> > >
> > > Yes, you're right.
> > >
> > >
> > > Best regards,
> > >
> > > Weijie
> > >
> > >
> > > On Tue, Mar 12, 2024 at 13:59, weijie guo <guoweijieres...@gmail.com> wrote:
> > >
> > >> Hi Max,
> > >>
> > >> > In this thread it looks like the plan is to remove the old state
> > >> declaration API. I think we should consider keeping the old APIs to
> > >> avoid breaking too many jobs.
> > >>
> > >> We don't plan to remove any old APIs, which means that changes made in
> > >> V2 won't affect any V1 DataStream jobs. But V2 is limited to the new state
> > >> declaration API, and users who want to migrate to DataStream V2 will need
> > >> to rewrite their jobs anyway.
> > >>
> > >> Best regards,
> > >>
> > >> Weijie
> > >>
> > >>
> > >> On Tue, Mar 12, 2024 at 10:26, Hangxiang Yu <master...@gmail.com> wrote:
> > >>
> > >>> Hi, Weijie.
> > >>> Thanks for your answer!
> > >>>
> > >>> > No, Introducing and declaring new state
> > >>> > at runtime is something we want to explicitly disallow.
> > >>>
> > >>> I was just thinking about how some operators define their useState() when
> > >>> the states they actually use may change at runtime, e.g. different state
> > >>> types for different state sizes.
> > >>> So these operators only define all states they may use, which could be
> > >>> explained by the caller, right?
> > >>>
> > >>> On Mon, Mar 11, 2024 at 10:57 PM Maximilian Michels <m...@apache.org>
> > >>> wrote:
> > >>>
> > >>> > The FLIP mentions: "The contents described in this FLIP are all new
> > >>> > APIs and do not involve compatibility issues."
> > >>> >
> > >>> > In this thread it looks like the plan is to remove the old state
> > >>> > declaration API. I think we should consider keeping the old APIs to
> > >>> > avoid breaking too many jobs. The new APIs will still be beneficial
> > >>> > for new jobs, e.g. for SQL jobs.
> > >>> >
> > >>> > -Max
> > >>> >
> > >>> > On Fri, Mar 8, 2024 at 4:39 AM Zakelly Lan <zakelly....@gmail.com>
> > >>> wrote:
> > >>> > >
> > >>> > > Hi Weijie,
> > >>> > >
> > >>> > > Thanks for your answer! Well I get your point. Since partitions
> are
> > >>> > > first-class citizens, and redistribution means how states migrate
> > >>> when
> > >>> > > partitions change, I'd be fine with deemphasizing the concept of
> > >>> > > keyed/operator state if we highlight the definition of partition
> in
> > >>> the
> > >>> > > document. Keeping `RedistributionMode` under `StateDeclaration`
> is
> > >>> also
> > >>> > > fine with me, as I guess it is only for internal usage.
> > >>> > > But still, from a user's point of view,  state can be
> characterized
> > >>> along
> > >>> > > two relatively independent dimensions, how states redistribute
> and
> > >>> the
> > >>> > data
> > >>> > > structure. Thus I still suggest a chained-like configuration API
> > that
> > >>> > > configures one aspect on each call, such as:
> > >>> > > ```
> > >>> > > // Keyed stream, no redistribution mode specified: the state goes
> > >>> > > // with the partition (no redistribution). ---- Keyed state
> > >>> > > StateDeclaration a = States.declare(name).listState(type);
> > >>> > >
> > >>> > > // Keyed stream, redistribution strategy specified: the state follows
> > >>> > > // the specified redistribution strategy. ---- Operator state
> > >>> > > StateDeclaration b =
> > >>> > >     States.declare(name).listState(type).redistributeBy(strategy);
> > >>> > >
> > >>> > > // Non-keyed stream, redistribution strategy *must be* specified.
> > >>> > > StateDeclaration c =
> > >>> > >     States.declare(name).listState(type).redistributeBy(strategy);
> > >>> > >
> > >>> > > // Broadcast stream and state
> > >>> > > StateDeclaration d =
> > >>> > >     States.declare(name).mapState(typeK, typeV).broadcast();
> > >>> > > ```
> > >>> > > It can drive users to think about redistribution issues when needed.
> > >>> > > It also provides more flexibility to add more combinations, such as
> > >>> > > broadcasting list state, or to chain more configurable aspects, such
> > >>> > > as adding `withTtl()` in the future. WDYT?
> > >>> > >
> > >>> > >
> > >>> > > Best,
> > >>> > > Zakelly
> > >>> > >
> > >>> > > On Thu, Mar 7, 2024 at 6:04 PM weijie guo <
> > guoweijieres...@gmail.com
> > >>> >
> > >>> > wrote:
> > >>> > >
> > >>> > > > Hi Jinzhong,
> > >>> > > >
> > >>> > > > Thanks for the reply!
> > >>> > > >
> > >>> > > > > Overall, I think that the “Eager State Declaration” is a good
> > >>> > proposal,
> > >>> > > > which can enhance Flink's state management capabilities and
> > provide
> > >>> > > > possibilities for subsequent state optimizations.
> > >>> > > >
> > >>> > > > It's nice to see that people who are familiar with the state
> > stuff
> > >>> like
> > >>> > > > this proposal. :)
> > >>> > > >
> > >>> > > > >  When the user attempts to access an undeclared state at
> > >>> runtime, it
> > >>> > is
> > >>> > > > more reasonable to throw an exception rather than returning
> > >>> > Option#empty,
> > >>> > > > as Gyula mentioned above.
> > >>> > > >
> > >>> > > > Yes, I agree that this is better than a confusing empty value, and I
> > >>> > > > have modified the corresponding part of this FLIP.
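
To make the difference concrete, here is a minimal sketch of "throw on undeclared state" versus "empty value". The `SketchStateManager` class and its methods are hypothetical and only mimic the behaviour discussed here; they are not Flink's StateManager:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; names and types are assumptions for illustration.
final class SketchStateManager {
    private final Set<String> declared;
    private final Map<String, Object> values = new HashMap<>();

    SketchStateManager(Set<String> declared) {
        // Declarations are fixed up front and never change afterwards.
        this.declared = new HashSet<>(declared);
    }

    // Returns null for a declared state that has no value yet.
    Object get(String name) {
        requireDeclared(name);
        return values.get(name);
    }

    void put(String name, Object value) {
        requireDeclared(name);
        values.put(name, value);
    }

    // Illegal access fails loudly instead of looking like an empty state.
    private void requireDeclared(String name) {
        if (!declared.contains(name)) {
            throw new IllegalStateException("State '" + name + "' was not declared");
        }
    }
}
```

A declared but empty state and an undeclared state are then impossible to mix up: the first yields no value, the second raises an exception.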
> > >>> > > >
> > >>> > > > > In addition, I'm not quite sure whether all of the existing
> > >>> usage in
> > >>> > > > which states are registered at runtime dynamically can be
> > migrated
> > >>> to
> > >>> > the
> > >>> > > > "Eager State Declaration" style with minimal cost?
> > >>> > > >
> > >>> > > > I think for most user functions, this is fairly straightforward to
> > >>> > > > migrate. But states whose declarations depend on runtime information
> > >>> > > > (e.g. RuntimeContext) are, in principle, not supported in the new API.
> > >>> > > > Anyway, the old and new APIs are completely incompatible, so rewriting
> > >>> > > > jobs is inevitable. Users can think about how to write a good process
> > >>> > > > function that conforms to the eager declaration style.
> > >>> > > >
> > >>> > > > > For state TTL, should StateDeclaration also provide
> interfaces
> > >>> for
> > >>> > users
> > >>> > > > to declare state ttl?
> > >>> > > >
> > >>> > > > Of course, we can and we need to provide this one. But whether or not
> > >>> > > > it's in this FLIP isn't very important to me, because we're mainly
> > >>> > > > talking about the general principles and ways of declaring and
> > >>> > > > accessing state in this FLIP. I promise we won't leave it out in the
> > >>> > > > end. :D
> > >>> > > >
> > >>> > > >
> > >>> > > >
> > >>> > > > Best regards,
> > >>> > > >
> > >>> > > > Weijie
> > >>> > > >
> > >>> > > >
> > >>> > > > On Thu, Mar 7, 2024 at 17:34, Jinzhong Li <lijinzhong2...@gmail.com> wrote:
> > >>> > > >
> > >>> > > > > Hi Weijie,
> > >>> > > > >
> > >>> > > > > Thanks for driving this!
> > >>> > > > >
> > >>> > > > > 1. Overall, I think that the “Eager State Declaration” is a
> > good
> > >>> > > > proposal,
> > >>> > > > > which can enhance Flink's state management capabilities and
> > >>> provide
> > >>> > > > > possibilities for subsequent state optimizations.
> > >>> > > > >
> > >>> > > > > 2. When the user attempts to access an undeclared state at
> > >>> runtime,
> > >>> > it is
> > >>> > > > > more reasonable to throw an exception rather than returning
> > >>> > Option#empty,
> > >>> > > > > as Gyula mentioned above.
> > >>> > > > > In addition, I'm not quite sure whether all of the existing
> > >>> usage in
> > >>> > > > which
> > >>> > > > > states are registered at runtime dynamically can be migrated
> to
> > >>> the
> > >>> > > > "Eager
> > >>> > > > > State Declaration" style with minimal cost?
> > >>> > > > >
> > >>> > > > > 3. For state TTL, should StateDeclaration also provide
> > >>> interfaces for
> > >>> > > > users
> > >>> > > > > to declare state ttl?
> > >>> > > > >
> > >>> > > > > Best,
> > >>> > > > > Jinzhong Li
> > >>> > > > >
> > >>> > > > >
> > >>> > > > > On Thu, Mar 7, 2024 at 5:08 PM weijie guo <
> > >>> guoweijieres...@gmail.com
> > >>> > >
> > >>> > > > > wrote:
> > >>> > > > >
> > >>> > > > > > Hi Hangxiang,
> > >>> > > > > >
> > >>> > > > > > Thanks for your reply!
> > >>> > > > > >
> > >>> > > > > > > We have also discussed in FLIP-359/FLINK-32658 about
> > >>> limiting the
> > >>> > > > user
> > >>> > > > > > operation to avoid creating state when processElement.
> Could
> > >>> > current
> > >>> > > > > > interfaces also help this?
> > >>> > > > > >
> > >>> > > > > > I think so. It is illegal to create state at runtime in our
> > >>> > proposal.
> > >>> > > > > >
> > >>> > > > > >
> > >>> > > > > > > Could you provide more examples about how useStates()
> > works ?
> > >>> > Since
> > >>> > > > > some
> > >>> > > > > > operations may change their used states at runtime, the
> value
> > >>> this
> > >>> > > > method
> > >>> > > > > > returns will be modified at runtime, right?
> > >>> > > > > >
> > >>> > > > > >
> > >>> > > > > > No, introducing and declaring new state at runtime is something we
> > >>> > > > > > want to explicitly disallow. You can simply assume that useState is
> > >>> > > > > > only called when the JobGraph is generated, and any future changes
> > >>> > > > > > to it are invalid and illegal.
> > >>> > > > > >
> > >>> > > > > >
> > >>> > > > > > > IIUC, RedistributionMode/Strategy should not be used by
> > >>> users,
> > >>> > right
> > >>> > > > ?
> > >>> > > > > > If so, I'm +1 to move them to inner interfaces which seems
> a
> > >>> bit
> > >>> > > > > confusing
> > >>> > > > > > to users.
> > >>> > > > > >
> > >>> > > > > >
> > >>> > > > > > As for this question, I think my answer to Zakelly should
> be
> > >>> > helpful.
> > >>> > > > > >
> > >>> > > > > >
> > >>> > > > > >
> > >>> > > > > > Best regards,
> > >>> > > > > >
> > >>> > > > > > Weijie
> > >>> > > > > >
> > >>> > > > > >
> > >>> > > > > > On Thu, Mar 7, 2024 at 16:58, weijie guo <guoweijieres...@gmail.com> wrote:
> > >>> > > > > >
> > >>> > > > > > > Hi Zakelly,
> > >>> > > > > > >
> > >>> > > > > > > Thanks for your reply!
> > >>> > > > > > >
> > >>> > > > > > > > My advice would be to conceal
> RedistributionMode/Strategy
> > >>> from
> > >>> > the
> > >>> > > > > > > standard user interface, particularly within the helper
> > class
> > >>> > > > 'State'.
> > >>> > > > > > But
> > >>> > > > > > > I'm OK to keep it in `StateDeclaration` since its
> > interfaces
> > >>> are
> > >>> > > > > > basically
> > >>> > > > > > > used by the framework.
> > >>> > > > > > >
> > >>> > > > > > > I'm sorry, I didn't mention some of the details/concepts
> > >>> > introduced
> > >>> > > > by
> > >>> > > > > > the Umbrella FLIP and FLIP-409 in this FLIP. This might
> make
> > it
> > >>> > hard to
> > >>> > > > > > understand the motivation behind
> > >>> > > > > > > RedistributionMode, I'll add more context in FLIP then.
> > >>> > > > > > >
> > >>> > > > > > >
> > >>> > > > > > >
> > >>> > > > > > > Briefly, in V2, we explicitly define the concept of partition. In the
> > >>> > > > > > > case of KeyedPartitionStream, one key corresponds to one partition.
> > >>> > > > > > > For NonKeyedPartitionStream, one parallelism/subtask corresponds to
> > >>> > > > > > > one partition. All states are considered to be confined within the
> > >>> > > > > > > partition. On this basis, an obvious question is whether and how the
> > >>> > > > > > > state should be redistributed when the partition changes. So we
> > >>> > > > > > > divide the state into three categories:
> > >>> > > > > > >
> > >>> > > > > > >    - Don't need to redistribute states when the partition changes.
> > >>> > > > > > >    - Has to decide how to distribute states when the partition changes.
> > >>> > > > > > >    - Always has the same state across different partitions.
> > >>> > > > > > >
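
As a sketch, the three categories above map naturally onto an enum. The constant names follow the modes mentioned elsewhere in this thread (None, REDISTRIBUTABLE, IDENTICAL); the enum name and the helper method are hypothetical additions for illustration:

```java
// Hypothetical sketch of the three redistribution categories.
enum RedistributionModeSketch {
    // State is bound to its partition and never redistributed.
    NONE,
    // A strategy must decide how state is distributed when partitions change.
    REDISTRIBUTABLE,
    // State is always the same across all partitions (an invariant, not a strategy).
    IDENTICAL;

    // Only the redistributable category needs an explicit strategy.
    boolean needsStrategy() {
        return this == REDISTRIBUTABLE;
    }
}
```

Keeping IDENTICAL as its own constant is what distinguishes this from a plain optional strategy, which could only express two of the three cases.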
> > >>> > > > > > > After introducing the concept of partition, the redistribution
> > >>> > > > > > > pattern/mode of state is the more essential difference between
> > >>> > > > > > > states. For this reason, we don't want to emphasize keyed/operator
> > >>> > > > > > > state in the V2 API any more. Keep in mind, partitions are
> > >>> > > > > > > first-class citizens. And, even in V1, we have to let the user know
> > >>> > > > > > > that split/union are two different strategies for list state.
> > >>> > > > > > >
> > >>> > > > > > >
> > >>> > > > > > >
> > >>> > > > > > > As for whether or not to expose RedistributionMode to users, I have
> > >>> > > > > > > an open mind. But as I said just now, we still can't avoid this
> > >>> > > > > > > problem in the splitRedistributionListState and
> > >>> > > > > > > unionRedistributionListState. IMO, it's better to explain it at the
> > >>> > > > > > > API level instead of avoiding it. WDYT?
> > >>> > > > > > >
> > >>> > > > > > > Best regards,
> > >>> > > > > > >
> > >>> > > > > > > Weijie
> > >>> > > > > > >
> > >>> > > > > > >
> > >>> > > > > > > On Thu, Mar 7, 2024 at 16:39, weijie guo <guoweijieres...@gmail.com> wrote:
> > >>> > > > > > >
> > >>> > > > > > >> Hi Gyula,
> > >>> > > > > > >>
> > >>> > > > > > >>
> > >>> > > > > > >> Thanks for your reply!
> > >>> > > > > > >>
> > >>> > > > > > >>
> > >>> > > > > > >> Let me answer these questions:
> > >>> > > > > > >>
> > >>> > > > > > >>
> > >>> > > > > > >> > What is the semantics of the usesStates method? When
> is
> > it
> > >>> > called?
> > >>> > > > > Can
> > >>> > > > > > >> the used state change dynamically at runtime? Can the
> > logic
> > >>> > depend
> > >>> > > > on
> > >>> > > > > > something computed in open(..) for example?
> > >>> > > > > > >>
> > >>> > > > > > >>
> > >>> > > > > > >>
> > >>> > > > > > >> useStates is used to predefine all the states that the process
> > >>> > > > > > >> function needs to access. In other words, we want to avoid declaring
> > >>> > > > > > >> the state dynamically at runtime, and this allows the SQL planner and
> > >>> > > > > > >> JM to optimize the job better. As a result, this logic must be fully
> > >>> > > > > > >> available at compile time (when the JobGraph is generated), so it
> > >>> > > > > > >> can't rely on computations that are executed after deployment to the
> > >>> > > > > > >> TM.
> > >>> > > > > > >>
> > >>> > > > > > >>
> > >>> > > > > > >> >
> > >>> > > > > > >> Currently state access is pretty dynamic in Flink and I
> > >>> would
> > >>> > assume
> > >>> > > > > > many jobs create states on the fly based on some required
> > >>> logic.
> > >>> > Are we
> > >>> > > > > > planning to address these use-cases?
> > >>> > > > > > >>
> > >>> > > > > > >>
> > >>> > > > > > >> It depends on what type of context we need. If the type
> > and
> > >>> > number
> > >>> > > > of
> > >>> > > > > > states depend on runtime context, that's something we want
> to
> > >>> > avoid. If
> > >>> > > > > it
> > >>> > > > > > only depended on information available at compile time, I
> > >>> think we
> > >>> > > > could
> > >>> > > > > > support
> > >>> > > > > > >> it.
> > >>> > > > > > >>
> > >>> > > > > > >>
> > >>> > > > > > >> >
> > >>> > > > > > >> Are we planning to support deleting/dropping states that
> > >>> are not
> > >>> > > > > > required anymore?
> > >>> > > > > > >>
> > >>> > > > > > >>
> > >>> > > > > > >>
> > >>> > > > > > >> We really don't want the user to be able to dynamically
> > >>> > > > declare/delete
> > >>> > > > > > a state at runtime, but if you just want to clear/clean the
> > >>> value
> > >>> > of
> > >>> > > > > state,
> > >>> > > > > > the new API works the same as the old API.
> > >>> > > > > > >>
> > >>> > > > > > >>
> > >>> > > > > > >> > I think if a state is not declared or otherwise cannot be accessed,
> > >>> > > > > > >> an exception must be thrown. We cannot confuse an empty value with
> > >>> > > > > > >> something inaccessible.
> > >>> > > > > > >>
> > >>> > > > > > >>
> > >>> > > > > > >> After thinking about it a bit more, I think you have a point!
> > >>> > > > > > >> It's important to make a clear distinction between an empty state
> > >>> > > > > > >> and illegal access, especially since Flink currently discourages
> > >>> > > > > > >> setting a non-null default value for the state.
> > >>> > > > > > >> I will modify the proposal as you suggested then :)
> > >>> > > > > > >>
> > >>> > > > > > >>
> > >>> > > > > > >> > The RedistributionMode enum sounds a bit strange to
> me,
> > >>> as it
> > >>> > > > > doesn't
> > >>> > > > > > >> actually specify a mode of redistribution. It feels more
> > >>> like a
> > >>> > > > flag.
> > >>> > > > > > Can
> > >>> > > > > > >> we simply have an Optional<RedistributionStrategy>
> > instead?
> > >>> > > > > > >>
> > >>> > > > > > >>
> > >>> > > > > > >> We actually define three types of RedistributionMode instead of two
> > >>> > > > > > >> because we don't want to think of IDENTICAL as a redistribution
> > >>> > > > > > >> strategy; it's just an invariant: the State of that type is always
> > >>> > > > > > >> the same across partitions. If it only had None and REDISTRIBUTABLE,
> > >>> > > > > > >> I think your proposal would be feasible. But we don't want to confuse
> > >>> > > > > > >> these three semantics/modes.
> > >>> > > > > > >>
> > >>> > > > > > >>
> > >>> > > > > > >> > BroadcastStates are currently very limited by only
> > >>> Map-like
> > >>> > > > states,
> > >>> > > > > > and
> > >>> > > > > > >> the new interface also enforces that. Can we remove this
> > >>> > limitation?
> > >>> > > > > If
> > >>> > > > > > >> not, should broadcastState declaration extend mapstate
> > >>> > declaration?
> > >>> > > > > > >>
> > >>> > > > > > >>
> > >>> > > > > > >>
> > >>> > > > > > >> Personally, I don't want to make this restriction. This is also why
> > >>> > > > > > >> the method in StateManager to get BroadcastState has the parameter of
> > >>> > > > > > >> BroadcastStateDeclaration instead of MapStateDeclaration. In the
> > >>> > > > > > >> future, if the state backend supports other types of broadcast state,
> > >>> > > > > > >> we can add a corresponding method to the States utility class to get
> > >>> > > > > > >> the BroadcastStateDeclaration.
> > >>> > > > > > >>
> > >>> > > > > > >>
> > >>> > > > > > >>
> > >>> > > > > > >> Best regards,
> > >>> > > > > > >>
> > >>> > > > > > >> Weijie
> > >>> > > > > > >>
> > >>> > > > > > >>
> > >>> > > > > > >> On Thu, Mar 7, 2024 at 11:55, Hangxiang Yu <master...@gmail.com> wrote:
> > >>> > > > > > >>
> > >>> > > > > > >>> Hi, Weijie.
> > >>> > > > > > >>> Thanks for your proposal.
> > >>> > > > > > >>> I'd like to start the discussion with some questions:
> > >>> > > > > > >>> 1. We have also discussed in FLIP-359/FLINK-32658 about
> > >>> > limiting
> > >>> > > > the
> > >>> > > > > > user
> > >>> > > > > > >>> operation to avoid creating state when processElement.
> > >>> Could
> > >>> > > > current
> > >>> > > > > > >>> interfaces also help this?
> > >>> > > > > > >>>
> > >>> > > > > > >>> 2. Could you provide more examples about how
> useStates()
> > >>> works
> > >>> > ?
> > >>> > > > > Since
> > >>> > > > > > >>> some
> > >>> > > > > > >>> operations may change their used states at runtime, the
> > >>> value
> > >>> > this
> > >>> > > > > > method
> > >>> > > > > > >>> returns will be modified at runtime, right ?
> > >>> > > > > > >>> If so, I'm thinking if we could get some deterministic
> > >>> State
> > >>> > > > > > Declaration
> > >>> > > > > > >>> Set before running which could help a lot for some
> state
> > >>> > operations
> > >>> > > > > > e.g.
> > >>> > > > > > >>> pre-check schema compatibility, queryable schema.
> > >>> > > > > > >>>
> > >>> > > > > > >>> 3. IIUC, RedistributionMode/Strategy should not be used
> > by
> > >>> > users,
> > >>> > > > > > right ?
> > >>> > > > > > >>> If so, I'm +1 to move them to inner interfaces which
> > seems
> > >>> a
> > >>> > bit
> > >>> > > > > > >>> confusing
> > >>> > > > > > >>> to users.
> > >>> > > > > > >>>
> > >>> > > > > > >>> On Thu, Mar 7, 2024 at 11:39 AM Zakelly Lan <
> > >>> > zakelly....@gmail.com
> > >>> > > > >
> > >>> > > > > > >>> wrote:
> > >>> > > > > > >>>
> > >>> > > > > > >>> > Hi Weijie,
> > >>> > > > > > >>> >
> > >>> > > > > > >>> > Thanks for proposing this!
> > >>> > > > > > >>> >
> > >>> > > > > > >>> > Unifying and optimizing state definitions is a very
> > good
> > >>> > thing. I
> > >>> > > > > > like
> > >>> > > > > > >>> the
> > >>> > > > > > >>> > idea of 'definition goes before using', so overall +1
> > for
> > >>> > this
> > >>> > > > > > >>> proposal.
> > >>> > > > > > >>> >
> > >>> > > > > > >>> > However, I think the current definition is somewhat
> > >>> unclear.
> > >>> > > > From a
> > >>> > > > > > >>> user's
> > >>> > > > > > >>> > point of view, I believe that state can be
> > characterized
> > >>> > along
> > >>> > > > two
> > >>> > > > > > >>> > relatively independent axes: the scenario (keyed,
> > >>> non-keyed,
> > >>> > or
> > >>> > > > > > >>> broadcast)
> > >>> > > > > > >>> > and the data structure (single value, list, map). I
> > >>> recommend
> > >>> > > > that
> > >>> > > > > we
> > >>> > > > > > >>> fully
> > >>> > > > > > >>> > decouple these aspects, rather than linking the
> nature
> > >>> of the
> > >>> > > > > > >>> definition to
> > >>> > > > > > >>> > specific assumptions, such as equating broadcast
> states
> > >>> with
> > >>> > > > maps,
> > >>> > > > > or
> > >>> > > > > > >>> > considering list states could be non-keyed.
> > >>> > > > > > >>> > Furthermore, the concept of 'Redistribution' may
> > impose a
> > >>> > > > cognitive
> > >>> > > > > > >>> burden
> > >>> > > > > > >>> > on general users. My advice would be to conceal
> > >>> > > > > > >>> RedistributionMode/Strategy
> > >>> > > > > > >>> > from the standard user interface, particularly within
> > the
> > >>> > helper
> > >>> > > > > > class
> > >>> > > > > > >>> > 'State'. But I'm OK to keep it in `StateDeclaration`
> > >>> since
> > >>> > its
> > >>> > > > > > >>> interfaces
> > >>> > > > > > >>> > are basically used by the framework. My preferred
> > syntax
> > >>> > would
> > >>> > > > be:
> > >>> > > > > > >>> > ```
> > >>> > > > > > >>> > StateDeclaration a = State.declare(name).keyed().listState(type);
> > >>> > > > > > >>> > StateDeclaration b =
> > >>> > > > > > >>> >     State.declare(name).broadcast().mapState(typeK, typeV);
> > >>> > > > > > >>> > StateDeclaration c =
> > >>> > > > > > >>> >     State.declare(name).keyed().aggregatingState(type, function);
> > >>> > > > > > >>> > ```
> > >>> > > > > > >>> > WDYT?
> > >>> > > > > > >>> >
> > >>> > > > > > >>> >
> > >>> > > > > > >>> > Best,
> > >>> > > > > > >>> > Zakelly
> > >>> > > > > > >>> >
> > >>> > > > > > >>> > On Wed, Mar 6, 2024 at 11:04 PM Gyula Fóra <
> > >>> > gyula.f...@gmail.com
> > >>> > > > >
> > >>> > > > > > >>> wrote:
> > >>> > > > > > >>> >
> > >>> > > > > > >>> > > Hi Weijie!
> > >>> > > > > > >>> > >
> > >>> > > > > > >>> > > Thank you for the proposal.
> > >>> > > > > > >>> > >
> > >>> > > > > > >>> > > I have some initial questions to start the
> > discussion:
> > >>> > > > > > >>> > >
> > >>> > > > > > >>> > > 1. What is the semantics of the usesStates method?
> > >>> When is
> > >>> > it
> > >>> > > > > > >>> called? Can
> > >>> > > > > > >>> > > the used state change dynamically at runtime? Can
> the
> > >>> logic
> > >>> > > > > depend
> > >>> > > > > > on
> > >>> > > > > > >>> > > something computed in open(..) for example?
> > >>> > > > > > >>> > >
> > >>> > > > > > >>> > > Currently state access is pretty dynamic in Flink
> > and I
> > >>> > would
> > >>> > > > > > assume
> > >>> > > > > > >>> many
> > >>> > > > > > >>> > > jobs create states on the fly based on some
> required
> > >>> > logic. Are
> > >>> > > > > we
> > >>> > > > > > >>> > planning
> > >>> > > > > > >>> > > to address these use-cases?
> > >>> > > > > > >>> > >
> > >>> > > > > > >>> > > Are we planning to support deleting/dropping states
> > >>> that
> > >>> > are
> > >>> > > > not
> > >>> > > > > > >>> required
> > >>> > > > > > >>> > > anymore?
> > >>> > > > > > >>> > >
> > >>> > > > > > >>> > > 2. Get state now returns an optional, but you
> mention
> > >>> that:
> > >>> > > > > > >>> > > " If you want to get a state that is not declared
> or
> > >>> has no
> > >>> > > > > access,
> > >>> > > > > > >>> > > Option#empty is returned."
> > >>> > > > > > >>> > >
> > >>> > > > > > >>> > > I think if a state is not declared or otherwise cannot be accessed,
> > >>> > > > > > >>> > > an exception must be thrown. We cannot confuse an empty value with
> > >>> > > > > > >>> > > something inaccessible.
> > >>> > > > > > >>> > >
> > >>> > > > > > >>> > > 3. The RedistributionMode enum sounds a bit strange
> > to
> > >>> me,
> > >>> > as
> > >>> > > > it
> > >>> > > > > > >>> doesn't
> > >>> > > > > > >>> > > actually specify a mode of redistribution. It feels
> > >>> more
> > >>> > like a
> > >>> > > > > > >>> flag. Can
> > >>> > > > > > >>> > > we simply have an Optional<RedistributionStrategy>
> > >>> instead?
> > >>> > > > > > >>> > >
> > >>> > > > > > >>> > > 4. BroadcastStates are currently very limited by
> only
> > >>> > Map-like
> > >>> > > > > > >>> states,
> > >>> > > > > > >>> > and
> > >>> > > > > > >>> > > the new interface also enforces that.
> > >>> > > > > > >>> > > Can we remove this limitation? If not, should
> > >>> > broadcastState
> > >>> > > > > > >>> declaration
> > >>> > > > > > >>> > > extend mapstate declaration?
> > >>> > > > > > >>> > >
> > >>> > > > > > >>> > > Cheers,
> > >>> > > > > > >>> > > Gyula
> > >>> > > > > > >>> > >
> > >>> > > > > > >>> > > On Wed, Mar 6, 2024 at 11:18 AM weijie guo <
> > >>> > > > > > >>> guoweijieres...@gmail.com>
> > >>> > > > > > >>> > > wrote:
> > >>> > > > > > >>> > >
> > >>> > > > > > >>> > > > Hi devs,
> > >>> > > > > > >>> > > >
> > >>> > > > > > >>> > > > I'd like to start a discussion about FLIP-433:
> > State
> > >>> > Access
> > >>> > > > on
> > >>> > > > > > >>> > > > DataStream API V2
> > >>> > > > > > >>> > > > [1]. This is the third sub-FLIP of DataStream API
> > V2.
> > >>> > > > > > >>> > > >
> > >>> > > > > > >>> > > >
> > >>> > > > > > >>> > > > After FLIP-410 [2], we can already write a simple
> > >>> > stateless
> > >>> > > > job
> > >>> > > > > > >>> using
> > >>> > > > > > >>> > the
> > >>> > > > > > >>> > > > DataStream V2 API.  But as we all know, stateful
> > >>> > computing is
> > >>> > > > > > >>> Flink's
> > >>> > > > > > >>> > > trump
> > >>> > > > > > >>> > > > card. In this FLIP, we will discuss how to
> declare
> > >>> and
> > >>> > access
> > >>> > > > > > >>> state on
> > >>> > > > > > >>> > > > DataStream API V2 and we manage to avoid some of
> > the
> > >>> > > > > shortcomings
> > >>> > > > > > >>> of V1
> > >>> > > > > > >>> > > in
> > >>> > > > > > >>> > > > this regard.
> > >>> > > > > > >>> > > >
> > >>> > > > > > >>> > > > You can find more details in this FLIP. Its
> > >>> relationship
> > >>> > with
> > >>> > > > > > other
> > >>> > > > > > >>> > > > sub-FLIPs can be found in the umbrella FLIP
> > >>> > > > > > >>> > > > [3]. Looking forward to hearing from you, thanks!
> > >>> > > > > > >>> > > >
> > >>> > > > > > >>> > > >
> > >>> > > > > > >>> > > > Best regards,
> > >>> > > > > > >>> > > >
> > >>> > > > > > >>> > > > Weijie
> > >>> > > > > > >>> > > >
> > >>> > > > > > >>> > > >
> > >>> > > > > > >>> > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-433%3A+State+Access+on+DataStream+API+V2
> > >>> > > > > > >>> > > >
> > >>> > > > > > >>> > > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-410%3A++Config%2C+Context+and+Processing+Timer+Service+of+DataStream+API+V2
> > >>> > > > > > >>> > > >
> > >>> > > > > > >>> > > > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2
> > >>> > > > > > >>> > > >
> > >>> > > > > > >>> > >
> > >>> > > > > > >>> >
> > >>> > > > > > >>>
> > >>> > > > > > >>>
> > >>> > > > > > >>> --
> > >>> > > > > > >>> Best,
> > >>> > > > > > >>> Hangxiang.
> > >>> > > > > > >>>
> > >>> > > > > > >>
> > >>> > > > > >
> > >>> > > > >
> > >>> > > >
> > >>> >
> > >>>
> > >>>
> > >>> --
> > >>> Best,
> > >>> Hangxiang.
> > >>>
> > >>
> >
>
