Hi everyone,

Thanks for your discussion and feedback!

Our discussion has been going on for a while, and no new concerns have been raised for several days, so I would like to start the vote soon.

Best regards,
Weijie

On Tue, Mar 12, 2024 at 17:40 Zakelly Lan <zakelly....@gmail.com> wrote:

Hi Weijie,

Thanks for your reply!

Overall I'd be fine with the builder pattern, but it gets a little long when you have to declare the builder and call 'build()' explicitly. Keeping StateDeclaration immutable is OK, but it is a little inconvenient for overriding undefined options with the job configuration at runtime. I'd suggest providing some methods that create a new StateDeclaration with new configurable options, just like ConfigOptions#defaultValue does. Well, this is just a suggestion; I'm not going to insist on it.

Best,
Zakelly

On Tue, Mar 12, 2024 at 2:07 PM weijie guo <guoweijieres...@gmail.com> wrote:

Hi Zakelly,

> But still, from a user's point of view, state can be characterized along two relatively independent dimensions, how states redistribute and the data structure. Thus I still suggest a chained-like configuration API that configures one aspect on each call.

I think the chained style is a good suggestion. But I'm not going to introduce any mutable-style API to StateDeclaration (even though we could preserve immutability by returning a new object). For this reason, I decided to use the builder pattern, which also supports chained calls and allows us to add further configurations such as setTTL in the future. For ease of use, we'll also provide some shortcuts so that users don't have to go through a long build chain each time. I have updated this part of the FLIP accordingly.
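The following minimal Java sketch makes the builder-versus-chaining trade-off concrete. All names are hypothetical (`ListStateDeclaration`, its nested `Builder`, `withRedistribution`, and the `Redistribution` enum) and are not the FLIP-433 API.

- The declaration itself stays immutable.
- The builder provides the chained calls.
- `of(...)` plays the role of a shortcut.
- `withRedistribution` returns a modified copy, in the spirit of Zakelly's `ConfigOptions#defaultValue` suggestion.

```java
import java.util.Objects;

// Hypothetical redistribution choices for a list state (illustration only).
enum Redistribution { NONE, SPLIT, UNION }

// Hypothetical immutable declaration: every field is final and set exactly once.
final class ListStateDeclaration {
    private final String name;
    private final Class<?> elementType;
    private final Redistribution redistribution;

    private ListStateDeclaration(String name, Class<?> elementType, Redistribution redistribution) {
        this.name = Objects.requireNonNull(name, "name");
        this.elementType = Objects.requireNonNull(elementType, "elementType");
        this.redistribution = Objects.requireNonNull(redistribution, "redistribution");
    }

    String name() { return name; }
    Class<?> elementType() { return elementType; }
    Redistribution redistribution() { return redistribution; }

    // Copy-style method: returns a new declaration and leaves this one untouched,
    // so an option can be overridden later without making the class mutable.
    ListStateDeclaration withRedistribution(Redistribution newMode) {
        return new ListStateDeclaration(name, elementType, newMode);
    }

    // Shortcut for the common case, avoiding an explicit builder chain.
    static ListStateDeclaration of(String name, Class<?> elementType) {
        return builder(name, elementType).build();
    }

    static Builder builder(String name, Class<?> elementType) {
        return new Builder(name, elementType);
    }

    // Mutable builder; only build() produces the immutable declaration.
    static final class Builder {
        private final String name;
        private final Class<?> elementType;
        private Redistribution redistribution = Redistribution.NONE;

        private Builder(String name, Class<?> elementType) {
            this.name = name;
            this.elementType = elementType;
        }

        Builder redistributeBy(Redistribution mode) {
            this.redistribution = mode;
            return this;
        }

        ListStateDeclaration build() {
            return new ListStateDeclaration(name, elementType, redistribution);
        }
    }
}
```

For example, `ListStateDeclaration.builder("offsets", Long.class).redistributeBy(Redistribution.SPLIT).build()` declares a split-redistributed list state. Calling `withRedistribution(Redistribution.UNION)` on it yields a second declaration, while the first one stays unchanged.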
Best regards,
Weijie

On Tue, Mar 12, 2024 at 14:00 weijie guo <guoweijieres...@gmail.com> wrote:

Hi Hangxiang,

> So these operators just declare all the states they may use, which can then be interpreted by the caller, right?

Yes, you're right.

Best regards,
Weijie

On Tue, Mar 12, 2024 at 13:59 weijie guo <guoweijieres...@gmail.com> wrote:

Hi Max,

> In this thread it looks like the plan is to remove the old state declaration API. I think we should consider keeping the old APIs to avoid breaking too many jobs.

We're not planning to remove any old APIs, which means that changes made in V2 won't affect any V1 DataStream jobs. But V2 only supports the new state declaration API, and users who want to migrate to DataStream V2 will need to rewrite their jobs anyway.

Best regards,
Weijie

On Tue, Mar 12, 2024 at 10:26 Hangxiang Yu <master...@gmail.com> wrote:

Hi, Weijie.
Thanks for your answer!

> No, introducing and declaring new state at runtime is something we want to explicitly disallow.

I was just thinking about how some operators would define their usesStates() when the states they actually use may change at runtime, e.g. different state types for different state sizes.
So these operators just declare all the states they may use, which can then be interpreted by the caller, right?

On Mon, Mar 11, 2024 at 10:57 PM Maximilian Michels <m...@apache.org> wrote:

The FLIP mentions: "The contents described in this FLIP are all new APIs and do not involve compatibility issues."

In this thread it looks like the plan is to remove the old state declaration API.
I think we should consider keeping the old APIs to avoid breaking too many jobs. The new APIs will still be beneficial for new jobs, e.g. for SQL jobs.

-Max

On Fri, Mar 8, 2024 at 4:39 AM Zakelly Lan <zakelly....@gmail.com> wrote:

Hi Weijie,

Thanks for your answer! Well, I get your point. Since partitions are first-class citizens, and redistribution describes how states migrate when partitions change, I'd be fine with de-emphasizing the concept of keyed/operator state if we highlight the definition of partition in the document. Keeping `RedistributionMode` under `StateDeclaration` is also fine with me, as I guess it is only for internal use.
But still, from a user's point of view, state can be characterized along two relatively independent dimensions: how states are redistributed, and the data structure. Thus I still suggest a chained configuration API that configures one aspect per call, such as:
```
// Keyed stream, no redistribution mode specified: the state goes with the partition (no redistribution). This is keyed state.
StateDeclaration a = States.declare(name).listState(type);

// Keyed stream, redistribution strategy specified: the state follows that strategy. This is operator state.
StateDeclaration b = States.declare(name).listState(type).redistributeBy(strategy);

// Non-keyed stream: a redistribution strategy *must* be specified.
StateDeclaration c = States.declare(name).listState(type).redistributeBy(strategy);

// Broadcast stream and state
StateDeclaration d = States.declare(name).mapState(typeK, typeV).broadcast();
```
This drives users to think about redistribution when needed. It also provides the flexibility to add more combinations, such as broadcast list state, or to chain more configurable aspects, such as adding `withTtl()` in the future. WDYT?

Best,
Zakelly

On Thu, Mar 7, 2024 at 6:04 PM weijie guo <guoweijieres...@gmail.com> wrote:

Hi Jinzhong,

Thanks for the reply!

> Overall, I think that the "Eager State Declaration" is a good proposal, which can enhance Flink's state management capabilities and provide possibilities for subsequent state optimizations.

It's nice to see that people who are familiar with the state internals like this proposal. :)

> When the user attempts to access an undeclared state at runtime, it is more reasonable to throw an exception rather than returning Option#empty, as Gyula mentioned above.

Yes, I agree that this is better than a confusing empty value, and I have modified the corresponding part of this FLIP.

> In addition, I'm not quite sure whether all of the existing usage in which states are registered at runtime dynamically can be migrated to the "Eager State Declaration" style with minimal cost?
I think for most user functions, migration is fairly straightforward. But states whose declarations depend on runtime information (e.g. RuntimeContext) are, in principle, not supported in the new API. In any case, the old and new APIs are completely incompatible, so rewriting jobs is inevitable. Users can think about how to write a good process function that conforms to the eager declaration style.

> For state TTL, should StateDeclaration also provide interfaces for users to declare state TTL?

Of course, we can and need to provide this. But whether or not it's in this FLIP isn't very important to me, because this FLIP mainly discusses the general principles and ways of declaring and accessing state. I promise we won't leave it out in the end. :D

Best regards,
Weijie

On Thu, Mar 7, 2024 at 17:34 Jinzhong Li <lijinzhong2...@gmail.com> wrote:

Hi Weijie,

Thanks for driving this!

1. Overall, I think that the "Eager State Declaration" is a good proposal, which can enhance Flink's state management capabilities and provide possibilities for subsequent state optimizations.

2. When the user attempts to access an undeclared state at runtime, it is more reasonable to throw an exception rather than returning Option#empty, as Gyula mentioned above.
In addition, I'm not quite sure whether all existing usages in which states are registered dynamically at runtime can be migrated to the "Eager State Declaration" style at minimal cost.

3. For state TTL, should StateDeclaration also provide interfaces for users to declare state TTL?

Best,
Jinzhong Li

On Thu, Mar 7, 2024 at 5:08 PM weijie guo <guoweijieres...@gmail.com> wrote:

Hi Hangxiang,

Thanks for your reply!

> We have also discussed in FLIP-359/FLINK-32658 about limiting the user operation to avoid creating state in processElement. Could current interfaces also help this?

I think so. Creating state at runtime is illegal in our proposal.

> Could you provide more examples about how usesStates() works? Since some operations may change their used states at runtime, the value this method returns will be modified at runtime, right?

No, introducing and declaring new state at runtime is something we want to explicitly disallow. You can simply assume that usesStates is only called when the JobGraph is generated, and any later change to the declared states is invalid and illegal.
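The following minimal sketch makes the "declare first, access later" rule concrete, under assumed names. A function exposes its declarations through a `usesStates()` method before deployment. A toy state manager then rejects any access to an undeclared state by throwing an exception, rather than returning an empty `Optional`, which is the behavior agreed on in this thread. `StateDeclaration`, `DeclaringFunction`, and `ToyStateManager` are hypothetical, and the in-memory map stands in for a real state backend.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical declaration identified only by its name (illustration only).
final class StateDeclaration {
    final String name;

    StateDeclaration(String name) { this.name = name; }

    @Override public boolean equals(Object o) {
        return o instanceof StateDeclaration && ((StateDeclaration) o).name.equals(name);
    }

    @Override public int hashCode() { return name.hashCode(); }
}

// Hypothetical user function: its declarations must be computable without any runtime context.
interface DeclaringFunction {
    Set<StateDeclaration> usesStates();
}

// Toy manager: the declared set is captured once at construction
// (think "when the JobGraph is generated") and cannot grow afterwards.
final class ToyStateManager {
    private final Set<StateDeclaration> declared;
    private final Map<StateDeclaration, List<String>> lists = new HashMap<>();

    ToyStateManager(DeclaringFunction function) {
        this.declared = Collections.unmodifiableSet(new HashSet<>(function.usesStates()));
    }

    // Accessing an undeclared state is an error, not an empty result.
    List<String> getListState(StateDeclaration declaration) {
        if (!declared.contains(declaration)) {
            throw new IllegalStateException(
                    "State '" + declaration.name + "' was not declared in usesStates()");
        }
        return lists.computeIfAbsent(declaration, d -> new ArrayList<>());
    }
}
```

In this sketch, "declared but currently empty" (an empty list) is distinguishable from "never declared" (an exception). That is the distinction Gyula asked for.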
> IIUC, RedistributionMode/Strategy should not be used by users, right? If so, I'm +1 to moving them to internal interfaces, since they seem a bit confusing to users.

As for this question, I think my answer to Zakelly should help.

Best regards,
Weijie

On Thu, Mar 7, 2024 at 16:58 weijie guo <guoweijieres...@gmail.com> wrote:

Hi Zakelly,

Thanks for your reply!

> My advice would be to conceal RedistributionMode/Strategy from the standard user interface, particularly within the helper class 'State'. But I'm OK to keep it in `StateDeclaration` since its interfaces are basically used by the framework.

I'm sorry, I didn't mention in this FLIP some of the details/concepts introduced by the umbrella FLIP and FLIP-409. This might make it hard to understand the motivation behind RedistributionMode; I'll add more context to the FLIP.

Briefly, in V2, we explicitly define the concept of a partition. For a KeyedPartitionStream, one key corresponds to one partition. For a NonKeyedPartitionStream, one parallel instance (subtask) corresponds to one partition.
All states are considered to be confined within their partition. On this basis, an obvious question is whether and how state should be redistributed when the partitions change. So we divide state into three categories:

- State that does not need to be redistributed when the partitions change.
- State for which we have to decide how to redistribute it when the partitions change.
- State that is always the same across all partitions.

Once partitions are introduced, the redistribution pattern/mode is the more essential difference between states. For this reason, we don't want to emphasize keyed/operator state in the V2 API any more. Keep in mind that partitions are first-class citizens. And even in V1, we have to let the user know that split and union are two different redistribution strategies for list state.

As for whether or not to expose RedistributionMode to users, I have an open mind. But as I said just now, we still can't avoid this problem with splitRedistributionListState and unionRedistributionListState. IMO, it's better to explain it at the API level instead of avoiding it. WDYT?
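A small simulation can illustrate the three categories above, together with the split/union distinction for list state. The `RedistributionMode` values follow the names used in this thread (NONE, REDISTRIBUTABLE, IDENTICAL). The following are hypothetical simplifications, not Flink's actual rescaling code:

- `ListRedistribution`
- `ListStateRescaler`
- the contiguous-chunk assignment used for split

Given the list-state entries held by each old partition, the sketch computes what each new partition receives.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// The three categories described in this thread.
enum RedistributionMode { NONE, REDISTRIBUTABLE, IDENTICAL }

// Hypothetical strategies for REDISTRIBUTABLE list state.
enum ListRedistribution { SPLIT, UNION }

final class ListStateRescaler {
    private ListStateRescaler() {}

    static <T> List<List<T>> rescale(RedistributionMode mode, ListRedistribution strategy,
                                     List<List<T>> oldPartitions, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("newParallelism must be positive");
        }
        switch (mode) {
            case NONE:
                // Such state travels with its partition (e.g. its key); it is not reassigned as a list.
                throw new UnsupportedOperationException("NONE state moves with its partition");
            case IDENTICAL:
                return identical(oldPartitions, newParallelism);
            case REDISTRIBUTABLE:
                return strategy == ListRedistribution.SPLIT
                        ? split(oldPartitions, newParallelism)
                        : union(oldPartitions, newParallelism);
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }

    // Split: every entry ends up in exactly one new partition (contiguous, near-equal chunks).
    static <T> List<List<T>> split(List<List<T>> oldPartitions, int newParallelism) {
        List<T> all = flatten(oldPartitions);
        List<List<T>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    // Union: every new partition receives all old entries.
    static <T> List<List<T>> union(List<List<T>> oldPartitions, int newParallelism) {
        List<T> all = flatten(oldPartitions);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }

    // Identical: all old partitions hold the same state, so any one copy can be reused.
    static <T> List<List<T>> identical(List<List<T>> oldPartitions, int newParallelism) {
        List<T> copy = oldPartitions.isEmpty() ? Collections.<T>emptyList() : oldPartitions.get(0);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(copy));
        }
        return result;
    }

    private static <T> List<T> flatten(List<List<T>> partitions) {
        List<T> all = new ArrayList<>();
        for (List<T> partition : partitions) {
            all.addAll(partition);
        }
        return all;
    }
}
```

Take old partitions `[a, b, c]` and `[d, e]` rescaled to three partitions. Split yields `[a, b]`, `[c, d]`, `[e]`, whereas union hands all five entries to every new partition.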
Best regards,
Weijie

On Thu, Mar 7, 2024 at 16:39 weijie guo <guoweijieres...@gmail.com> wrote:

Hi Gyula,

Thanks for your reply!

Let me answer these questions:

> What is the semantics of the usesStates method? When is it called? Can the used state change dynamically at runtime? Can the logic depend on something computed in open(..) for example?

usesStates is used to predefine all the states that the process function needs to access. In other words, we want to avoid declaring state dynamically at runtime, which allows the SQL planner and the JM to optimize the job better. As a result, this logic must be fully available at compile time (when the JobGraph is generated), so it can't rely on computations that are executed after deployment to the TM.

> Currently state access is pretty dynamic in Flink and I would assume many jobs create states on the fly based on some required logic. Are we planning to address these use-cases?

It depends on what type of context we need.
If the type and number of states depend on the runtime context, that's something we want to avoid. If they only depend on information available at compile time, I think we could support it.

> Are we planning to support deleting/dropping states that are not required anymore?

We really don't want the user to be able to dynamically declare/delete a state at runtime, but if you just want to clear the value of a state, the new API works the same as the old API.

> I think if a state is not declared or otherwise cannot be accessed, an exception must be thrown. We cannot confuse empty value with something inaccessible.

After thinking about it a bit more, I think you have a point! It's important to make a clear distinction between an empty state and illegal access, especially since Flink currently discourages setting a non-null default value for state. I will modify the proposal as you suggested. :)

> The RedistributionMode enum sounds a bit strange to me, as it doesn't actually specify a mode of redistribution. It feels more like a flag.
> Can we simply have an Optional<RedistributionStrategy> instead?

We actually define three RedistributionMode values instead of two because we don't want to treat IDENTICAL as a redistribution strategy; it's just an invariant: state of that type is always the same across partitions. If there were only NONE and REDISTRIBUTABLE, I think your proposal would be feasible. But we don't want to conflate these three semantics/modes.

> BroadcastStates are currently very limited by only Map-like states, and the new interface also enforces that. Can we remove this limitation? If not, should broadcastState declaration extend mapstate declaration?

Personally, I don't want to impose this restriction. This is also why the StateManager method for getting a BroadcastState takes a BroadcastStateDeclaration parameter instead of a MapStateDeclaration. In the future, if the state backend supports other types of broadcast state, we can add a corresponding method to the States utility class that returns a BroadcastStateDeclaration.
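Weijie explains that the broadcast accessor takes a BroadcastStateDeclaration rather than a MapStateDeclaration. The following sketch illustrates that point. Everything here is hypothetical: `BroadcastStateDeclaration<S>`, `States.mapBroadcastState`, `States.listBroadcastState`, and `ToyBroadcastStateManager`. The list-shaped variant only shows where a future declaration could plug in. It does not claim that any state backend supports broadcast list state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical broadcast declaration: it carries a factory for its data structure,
// so the accessor below never assumes that broadcast state must be a map.
final class BroadcastStateDeclaration<S> {
    private final String name;
    private final Supplier<S> emptyStateFactory;

    BroadcastStateDeclaration(String name, Supplier<S> emptyStateFactory) {
        this.name = name;
        this.emptyStateFactory = emptyStateFactory;
    }

    String name() { return name; }

    S newEmptyState() { return emptyStateFactory.get(); }
}

// Hypothetical utility class, loosely modeled on the "States" helper mentioned above.
final class States {
    private States() {}

    // The shape supported today: map-like broadcast state.
    static <K, V> BroadcastStateDeclaration<Map<K, V>> mapBroadcastState(String name) {
        return new BroadcastStateDeclaration<Map<K, V>>(name, () -> new HashMap<K, V>());
    }

    // A shape a backend might support later; adding it requires no change to the accessor.
    static <T> BroadcastStateDeclaration<List<T>> listBroadcastState(String name) {
        return new BroadcastStateDeclaration<List<T>>(name, () -> new ArrayList<T>());
    }
}

// Toy manager: a single accessor typed on BroadcastStateDeclaration, not on a map declaration.
final class ToyBroadcastStateManager {
    private final Map<String, Object> states = new HashMap<>();

    @SuppressWarnings("unchecked")
    <S> S getBroadcastState(BroadcastStateDeclaration<S> declaration) {
        return (S) states.computeIfAbsent(declaration.name(), n -> declaration.newEmptyState());
    }
}
```

Because the accessor is generic over the declaration type, a new broadcast shape only needs a new factory method in `States`.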
Best regards,
Weijie

On Thu, Mar 7, 2024 at 11:55 Hangxiang Yu <master...@gmail.com> wrote:

Hi, Weijie.
Thanks for your proposal.
I'd like to start the discussion with some questions:

1. We have also discussed in FLIP-359/FLINK-32658 limiting user operations to avoid creating state in processElement. Could the current interfaces also help with this?

2. Could you provide more examples of how usesStates() works? Since some operations may change the states they use at runtime, the value this method returns will be modified at runtime, right?
If so, I'm wondering whether we could get a deterministic set of state declarations before running, which would help a lot with some state operations, e.g. pre-checking schema compatibility or a queryable schema.

3. IIUC, RedistributionMode/Strategy should not be used by users, right? If so, I'm +1 to moving them to internal interfaces, since they seem a bit confusing to users.
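Hangxiang's point about a deterministic declaration set can be illustrated with a deliberately simplified pre-deployment check. The sketch assumes each declaration reduces to a state name and a type-descriptor string. `DeclarationDiff` and `incompatibilities` are hypothetical names. Real state schema compatibility in Flink is resolved through serializer snapshots, not string equality.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical pre-deployment check: because declarations are known before the job runs,
// they can be compared against the declarations recorded for the previous run.
final class DeclarationDiff {
    private DeclarationDiff() {}

    // Each map goes from state name to a type descriptor (a plain string in this sketch).
    // Only states present in both runs with different types are reported;
    // newly added or dropped states are ignored here.
    static List<String> incompatibilities(Map<String, String> previous, Map<String, String> current) {
        List<String> problems = new ArrayList<>();
        // TreeMap gives a deterministic, name-sorted report.
        for (Map.Entry<String, String> entry : new TreeMap<>(current).entrySet()) {
            String oldType = previous.get(entry.getKey());
            if (oldType != null && !oldType.equals(entry.getValue())) {
                problems.add(entry.getKey() + ": " + oldType + " -> " + entry.getValue());
            }
        }
        return Collections.unmodifiableList(problems);
    }
}
```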
On Thu, Mar 7, 2024 at 11:39 AM Zakelly Lan <zakelly....@gmail.com> wrote:

Hi Weijie,

Thanks for proposing this!

Unifying and optimizing state definitions is a very good thing. I like the idea of 'definition goes before using', so overall +1 for this proposal.

However, I think the current definition is somewhat unclear. From a user's point of view, I believe that state can be characterized along two relatively independent axes: the scenario (keyed, non-keyed, or broadcast) and the data structure (single value, list, map). I recommend that we fully decouple these aspects, rather than tying the definition to specific assumptions, such as equating broadcast states with maps, or assuming that only list states can be non-keyed.
Furthermore, the concept of 'Redistribution' may impose a cognitive burden on general users. My advice would be to conceal RedistributionMode/Strategy from the standard user interface, particularly within the helper class 'State'.
But I'm OK to keep it in `StateDeclaration` since its interfaces are basically used by the framework. My preferred syntax would be:
```
StateDeclaration a = State.declare(name).keyed().listState(type);
StateDeclaration b = State.declare(name).broadcast().mapState(typeK, typeV);
StateDeclaration c = State.declare(name).keyed().aggregatingState(type, function);
```
WDYT?

Best,
Zakelly

On Wed, Mar 6, 2024 at 11:04 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

Hi Weijie!

Thank you for the proposal.

I have some initial questions to start the discussion:

1. What are the semantics of the usesStates method? When is it called? Can the used state change dynamically at runtime? Can the logic depend on something computed in open(..), for example?

Currently state access is pretty dynamic in Flink and I would assume many jobs create states on the fly based on some required logic. Are we planning to address these use cases?
Are we planning to support deleting/dropping states that are not required anymore?

2. Getting a state now returns an Optional, but you mention that: "If you want to get a state that is not declared or has no access, Option#empty is returned."

I think if a state is not declared or otherwise cannot be accessed, an exception must be thrown. We cannot confuse an empty value with something inaccessible.

3. The RedistributionMode enum sounds a bit strange to me, as it doesn't actually specify a mode of redistribution. It feels more like a flag. Can we simply have an Optional<RedistributionStrategy> instead?

4. BroadcastStates are currently limited to Map-like states, and the new interface also enforces that. Can we remove this limitation? If not, should the broadcast state declaration extend the map state declaration?
Cheers,
Gyula

On Wed, Mar 6, 2024 at 11:18 AM weijie guo <guoweijieres...@gmail.com> wrote:

Hi devs,

I'd like to start a discussion about FLIP-433: State Access on DataStream API V2 [1]. This is the third sub-FLIP of DataStream API V2.

After FLIP-410 [2], we can already write a simple stateless job using the DataStream V2 API. But as we all know, stateful computing is Flink's trump card. In this FLIP, we will discuss how to declare and access state in DataStream API V2, and we manage to avoid some of the shortcomings of V1 in this regard.

You can find more details in this FLIP. Its relationship with the other sub-FLIPs can be found in the umbrella FLIP [3]. Looking forward to hearing from you, thanks!
Best regards,
Weijie

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-433%3A+State+Access+on+DataStream+API+V2
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-410%3A++Config%2C+Context+and+Processing+Timer+Service+of+DataStream+API+V2
[3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2

--
Best,
Hangxiang.