Hi, Weijie.

Thanks for your proposal. I'd like to start the discussion with some questions:

1. In FLIP-359/FLINK-32658 we also discussed limiting user operations so that state cannot be created during processElement. Could the current interfaces help with this as well?
2. Could you provide more examples of how usesStates() works? Since some operators may change the states they use at runtime, the value this method returns would also change at runtime, right? If so, I wonder whether we could obtain a deterministic set of state declarations before the job runs, which would help a lot with some state operations, e.g. pre-checking schema compatibility or queryable schema.

3. IIUC, RedistributionMode/Strategy should not be used by users, right? If so, I'm +1 to move them to internal interfaces, since they seem a bit confusing to users.

On Thu, Mar 7, 2024 at 11:39 AM Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi Weijie,
>
> Thanks for proposing this!
>
> Unifying and optimizing state definitions is a very good thing. I like the idea of 'definition goes before using', so overall +1 for this proposal.
>
> However, I think the current definition is somewhat unclear. From a user's point of view, I believe that state can be characterized along two relatively independent axes: the scenario (keyed, non-keyed, or broadcast) and the data structure (single value, list, map). I recommend that we fully decouple these aspects, rather than linking the nature of the definition to specific assumptions, such as equating broadcast states with maps, or considering that list states could be non-keyed.
> Furthermore, the concept of 'Redistribution' may impose a cognitive burden on general users. My advice would be to conceal RedistributionMode/Strategy from the standard user interface, particularly within the helper class 'State'. But I'm OK to keep it in `StateDeclaration` since its interfaces are basically used by the framework. My preferred syntax would be:
> ```
> StateDeclaration a = State.declare(name).keyed().listState(type);
> StateDeclaration b = State.declare(name).broadcast().mapState(typeK, typeV);
> StateDeclaration c = State.declare(name).keyed().aggregatingState(type, function);
> ```
> WDYT?
>
> Best,
> Zakelly
>
> On Wed, Mar 6, 2024 at 11:04 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>
> > Hi Weijie!
> >
> > Thank you for the proposal.
> >
> > I have some initial questions to start the discussion:
> >
> > 1. What are the semantics of the usesStates method? When is it called? Can the used state change dynamically at runtime? Can the logic depend on something computed in open(..) for example?
> >
> > Currently state access is pretty dynamic in Flink and I would assume many jobs create states on the fly based on some required logic. Are we planning to address these use-cases?
> >
> > Are we planning to support deleting/dropping states that are not required anymore?
> >
> > 2. Get state now returns an optional, but you mention that:
> > " If you want to get a state that is not declared or has no access, Option#empty is returned."
> >
> > I think if a state is not declared or otherwise cannot be accessed, an exception must be thrown. We cannot confuse an empty value with something inaccessible.
> >
> > 3. The RedistributionMode enum sounds a bit strange to me, as it doesn't actually specify a mode of redistribution. It feels more like a flag. Can we simply have an Optional<RedistributionStrategy> instead?
> >
> > 4. BroadcastStates are currently limited to Map-like states, and the new interface also enforces that.
> > Can we remove this limitation? If not, should the broadcastState declaration extend the mapState declaration?
> >
> > Cheers,
> > Gyula
> >
> > On Wed, Mar 6, 2024 at 11:18 AM weijie guo <guoweijieres...@gmail.com> wrote:
> >
> > > Hi devs,
> > >
> > > I'd like to start a discussion about FLIP-433: State Access on DataStream API V2 [1]. This is the third sub-FLIP of DataStream API V2.
> > >
> > > After FLIP-410 [2], we can already write a simple stateless job using the DataStream V2 API.
> > > But as we all know, stateful computing is Flink's trump card. In this FLIP, we will discuss how to declare and access state on DataStream API V2 and we manage to avoid some of the shortcomings of V1 in this regard.
> > >
> > > You can find more details in this FLIP. Its relationship with other sub-FLIPs can be found in the umbrella FLIP [3]. Looking forward to hearing from you, thanks!
> > >
> > > Best regards,
> > >
> > > Weijie
> > >
> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-433%3A+State+Access+on+DataStream+API+V2
> > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-410%3A++Config%2C+Context+and+Processing+Timer+Service+of+DataStream+API+V2
> > > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2

--
Best,
Hangxiang.
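
P.S. To make a few of the points in this thread concrete, here is a rough, self-contained sketch. It is not the FLIP-433 API: every type and method in it (StateDeclarationSketch, Scope, Structure, DeclaredStates, require, and so on) is a hypothetical name chosen only for illustration. It assumes three things raised in the thread: the scenario and the data structure are chosen independently, the set of declarations is fixed before the job runs, and asking for an undeclared state throws instead of returning an empty Optional.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch only: none of these types are the FLIP-433 API.
final class StateDeclarationSketch {

    enum Scope { KEYED, NON_KEYED, BROADCAST }

    enum Structure { VALUE, LIST, MAP }

    /** Immutable description of one state: name, scope, structure, element types. */
    static final class Declaration {
        final String name;
        final Scope scope;
        final Structure structure;
        final List<Class<?>> types;

        Declaration(String name, Scope scope, Structure structure, Class<?>... types) {
            this.name = name;
            this.scope = scope;
            this.structure = structure;
            this.types = Collections.unmodifiableList(Arrays.asList(types));
        }
    }

    /** Second builder step: choose the data structure, independent of the scope. */
    static final class StructureStep {
        private final String name;
        private final Scope scope;

        StructureStep(String name, Scope scope) {
            this.name = name;
            this.scope = scope;
        }

        Declaration valueState(Class<?> type) {
            return new Declaration(name, scope, Structure.VALUE, type);
        }

        Declaration listState(Class<?> elementType) {
            return new Declaration(name, scope, Structure.LIST, elementType);
        }

        Declaration mapState(Class<?> keyType, Class<?> valueType) {
            return new Declaration(name, scope, Structure.MAP, keyType, valueType);
        }
    }

    /** First builder step: choose the scope (keyed, non-keyed, or broadcast). */
    static final class ScopeStep {
        private final String name;

        ScopeStep(String name) { this.name = name; }

        StructureStep keyed() { return new StructureStep(name, Scope.KEYED); }
        StructureStep nonKeyed() { return new StructureStep(name, Scope.NON_KEYED); }
        StructureStep broadcast() { return new StructureStep(name, Scope.BROADCAST); }
    }

    static ScopeStep declare(String name) { return new ScopeStep(name); }

    /** Declaration set fixed up front; looking up an undeclared name throws. */
    static final class DeclaredStates {
        private final Map<String, Declaration> byName = new LinkedHashMap<>();

        DeclaredStates(Declaration... declarations) {
            for (Declaration d : declarations) {
                if (byName.putIfAbsent(d.name, d) != null) {
                    throw new IllegalArgumentException("Duplicate state name: " + d.name);
                }
            }
        }

        Declaration require(String name) {
            Declaration d = byName.get(name);
            if (d == null) {
                throw new IllegalStateException("State not declared: " + name);
            }
            return d;
        }
    }

    public static void main(String[] args) {
        DeclaredStates states = new DeclaredStates(
                declare("counts").keyed().listState(Long.class),
                declare("rules").broadcast().mapState(String.class, String.class),
                declare("latest").broadcast().valueState(String.class));
        Declaration latest = states.require("latest");
        System.out.println(latest.scope + " " + latest.structure);
        try {
            states.require("missing");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch a broadcast state is not forced to be a map, and because DeclaredStates is built once from a fixed list, something like a schema-compatibility pre-check could walk it before any element is processed. Whether the real interfaces can offer the same guarantees is exactly what the questions above are asking.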