Hi Hangxiang,

> So these operators only define all states they may use which could be explained by the caller, right?

Yes, you're right.

Best regards,

Weijie


weijie guo <guoweijieres...@gmail.com> wrote on Tue, Mar 12, 2024 at 13:59:

> Hi Max,
>
> > In this thread it looks like the plan is to remove the old state declaration API. I think we should consider keeping the old APIs to avoid breaking too many jobs.
>
> We're not planning to remove any old APIs, which means that changes made in V2 won't affect any V1 DataStream jobs. But V2 is limited to the new state declaration API, and users who want to migrate to DataStream V2 will need to rewrite their jobs anyway.
>
> Best regards,
>
> Weijie
>
>
> Hangxiang Yu <master...@gmail.com> wrote on Tue, Mar 12, 2024 at 10:26:
>
>> Hi, Weijie.
>> Thanks for your answer!
>>
>> > No, introducing and declaring new state at runtime is something we want to explicitly disallow.
>>
>> I was just thinking about how some operators define their useState() when the states they actually use may change at runtime, e.g. different state types for different state sizes.
>> So these operators only define all states they may use which could be explained by the caller, right?
>>
>> On Mon, Mar 11, 2024 at 10:57 PM Maximilian Michels <m...@apache.org> wrote:
>>
>> > The FLIP mentions: "The contents described in this FLIP are all new APIs and do not involve compatibility issues."
>> >
>> > In this thread it looks like the plan is to remove the old state declaration API. I think we should consider keeping the old APIs to avoid breaking too many jobs. The new APIs will still be beneficial for new jobs, e.g. for SQL jobs.
>> >
>> > -Max
>> >
>> > On Fri, Mar 8, 2024 at 4:39 AM Zakelly Lan <zakelly....@gmail.com> wrote:
>> > >
>> > > Hi Weijie,
>> > >
>> > > Thanks for your answer! Well I get your point.
>> > > Since partitions are first-class citizens, and redistribution means how states migrate when partitions change, I'd be fine with deemphasizing the concept of keyed/operator state if we highlight the definition of partition in the document. Keeping `RedistributionMode` under `StateDeclaration` is also fine with me, as I guess it is only for internal usage.
>> > > But still, from a user's point of view, state can be characterized along two relatively independent dimensions: how states redistribute and the data structure. Thus I still suggest a chained configuration API that configures one aspect on each call, such as:
>> > > ```
>> > > // Keyed stream, no redistribution mode specified: the state goes with the partition (no redistribution). ---- Keyed state
>> > > StateDeclaration a = States.declare(name).listState(type);
>> > >
>> > > // Keyed stream, redistribution strategy specified: the state follows the specified redistribution strategy. ---- Operator state
>> > > StateDeclaration b = States.declare(name).listState(type).redistributeBy(strategy);
>> > >
>> > > // Non-keyed stream: a redistribution strategy *must be* specified.
>> > > StateDeclaration c = States.declare(name).listState(type).redistributeBy(strategy);
>> > >
>> > > // Broadcast stream and state
>> > > StateDeclaration d = States.declare(name).mapState(typeK, typeV).broadcast();
>> > > ```
>> > > It can drive users to think about redistribution issues when needed. And it also provides more flexibility to add more combinations such as broadcasting list state, or to chain more configurable aspects such as adding `withTtl()` in the future. WDYT?
>> > >
>> > >
>> > > Best,
>> > > Zakelly
>> > >
>> > > On Thu, Mar 7, 2024 at 6:04 PM weijie guo <guoweijieres...@gmail.com> wrote:
>> > >
>> > > > Hi Jinzhong,
>> > > >
>> > > > Thanks for the reply!
>> > > >
>> > > > > Overall, I think that the “Eager State Declaration” is a good proposal, which can enhance Flink's state management capabilities and provide possibilities for subsequent state optimizations.
>> > > >
>> > > > It's nice to see that people who are familiar with the state stuff like this proposal. :)
>> > > >
>> > > > > When the user attempts to access an undeclared state at runtime, it is more reasonable to throw an exception rather than returning Option#empty, as Gyula mentioned above.
>> > > >
>> > > > Yes, I agree that this is better than a confusing empty result, and I have modified the corresponding part of this FLIP.
>> > > >
>> > > > > In addition, I'm not quite sure whether all of the existing usage in which states are registered at runtime dynamically can be migrated to the "Eager State Declaration" style with minimal cost?
>> > > >
>> > > > I think for most user functions, this is fairly straightforward to migrate. But states whose declarations depend on runtime information (e.g. RuntimeContext) are, in principle, not supported in the new API. Anyway, the old and new APIs are completely incompatible, so rewriting jobs is inevitable. Users can think about how to write a good process function that conforms to the eager declaration style.
>> > > >
>> > > > > For state TTL, should StateDeclaration also provide interfaces for users to declare state ttl?
>> > > >
>> > > > Of course, we can and we need to provide this one. But whether or not it's in this FLIP isn't very important to me, because we're mainly talking about the general principles and ways of declaring and accessing state in this FLIP. I promise we won't leave it out in the end :D
>> > > >
>> > > > Best regards,
>> > > >
>> > > > Weijie
>> > > >
>> > > >
>> > > > Jinzhong Li <lijinzhong2...@gmail.com> wrote on Thu, Mar 7, 2024 at 17:34:
>> > > >
>> > > > > Hi Weijie,
>> > > > >
>> > > > > Thanks for driving this!
>> > > > >
>> > > > > 1. Overall, I think that the “Eager State Declaration” is a good proposal, which can enhance Flink's state management capabilities and provide possibilities for subsequent state optimizations.
>> > > > >
>> > > > > 2. When the user attempts to access an undeclared state at runtime, it is more reasonable to throw an exception rather than returning Option#empty, as Gyula mentioned above.
>> > > > > In addition, I'm not quite sure whether all of the existing usage in which states are registered at runtime dynamically can be migrated to the "Eager State Declaration" style with minimal cost?
>> > > > >
>> > > > > 3. For state TTL, should StateDeclaration also provide interfaces for users to declare state ttl?
>> > > > >
>> > > > > Best,
>> > > > > Jinzhong Li
>> > > > >
>> > > > >
>> > > > > On Thu, Mar 7, 2024 at 5:08 PM weijie guo <guoweijieres...@gmail.com> wrote:
>> > > > >
>> > > > > > Hi Hangxiang,
>> > > > > >
>> > > > > > Thanks for your reply!
>> > > > > >
>> > > > > > > We have also discussed in FLIP-359/FLINK-32658 about limiting the user operation to avoid creating state when processElement. Could current interfaces also help this?
>> > > > > >
>> > > > > > I think so. It is illegal to create state at runtime in our proposal.
>> > > > > >
>> > > > > >
>> > > > > > > Could you provide more examples about how useStates() works?
>> > > > > > > Since some operations may change their used states at runtime, the value this method returns will be modified at runtime, right?
>> > > > > >
>> > > > > >
>> > > > > > No, introducing and declaring new state at runtime is something we want to explicitly disallow. You can simply assume that useState is only called when the JobGraph is generated, and any future changes to it are invalid and illegal.
>> > > > > >
>> > > > > >
>> > > > > > > IIUC, RedistributionMode/Strategy should not be used by users, right? If so, I'm +1 to move them to inner interfaces which seems a bit confusing to users.
>> > > > > >
>> > > > > >
>> > > > > > As for this question, I think my answer to Zakelly should be helpful.
>> > > > > >
>> > > > > >
>> > > > > > Best regards,
>> > > > > >
>> > > > > > Weijie
>> > > > > >
>> > > > > >
>> > > > > > weijie guo <guoweijieres...@gmail.com> wrote on Thu, Mar 7, 2024 at 16:58:
>> > > > > >
>> > > > > > > Hi Zakelly,
>> > > > > > >
>> > > > > > > Thanks for your reply!
>> > > > > > >
>> > > > > > > > My advice would be to conceal RedistributionMode/Strategy from the standard user interface, particularly within the helper class 'State'. But I'm OK to keep it in `StateDeclaration` since its interfaces are basically used by the framework.
>> > > > > > >
>> > > > > > > I'm sorry, I didn't mention some of the details/concepts introduced by the Umbrella FLIP and FLIP-409 in this FLIP. This might make it hard to understand the motivation behind RedistributionMode; I'll add more context to the FLIP.
>> > > > > > >
>> > > > > > >
>> > > > > > > Briefly, in V2, we explicitly define the concept of partition.
>> > > > > > > In the case of KeyedPartitionStream, one key corresponds to one partition. For NonKeyedPartitionStream, one parallelism/subtask corresponds to one partition. All states are considered to be confined within the partition. On this basis, an obvious question is whether and how the state should be redistributed when the partition changes. So we divide the state into three categories:
>> > > > > > >
>> > > > > > >    - No need to redistribute states when the partition changes.
>> > > > > > >    - Need to decide how to redistribute states when the partition changes.
>> > > > > > >    - Always the same state across different partitions.
>> > > > > > >
>> > > > > > > After introducing the concept of partition, the redistribution pattern/mode of state is the more essential difference between states. For this reason, we don't want to emphasize keyed/operator state in the V2 API any more. Keep in mind, partitions are first-class citizens. And, even in V1, we have to let the user know that split/union are two different strategies for list state.
>> > > > > > >
>> > > > > > >
>> > > > > > > As for whether or not to expose RedistributionMode to users, I have an open mind. But as I said just now, we still can't avoid this problem in the splitRedistributionListState and unionRedistributionListState. IMO, it's better to explain it at the API level instead of avoiding it. WDYT?
>> > > > > > >
>> > > > > > > Best regards,
>> > > > > > >
>> > > > > > > Weijie
>> > > > > > >
>> > > > > > >
>> > > > > > > weijie guo <guoweijieres...@gmail.com> wrote on Thu, Mar 7, 2024 at 16:39:
>> > > > > > >
>> > > > > > >> Hi Gyula,
>> > > > > > >>
>> > > > > > >> Thanks for your reply!
>> > > > > > >>
>> > > > > > >> Let me answer these questions:
>> > > > > > >>
>> > > > > > >> > What is the semantics of the usesStates method? When is it called? Can the used state change dynamically at runtime? Can the logic depend on something computed in open(..) for example?
>> > > > > > >>
>> > > > > > >> useStates is used to predefine all the states that the process function needs to access. In other words, we want to avoid declaring the state dynamically at runtime, and this allows the SQL planner and JM to optimize the job better. As a result, this logic must be fully available at compile time (when the JobGraph is generated), so it can't rely on computations that are executed after deployment to the TM.
>> > > > > > >>
>> > > > > > >> > Currently state access is pretty dynamic in Flink and I would assume many jobs create states on the fly based on some required logic. Are we planning to address these use-cases?
>> > > > > > >>
>> > > > > > >> It depends on what type of context we need. If the type and number of states depend on runtime context, that's something we want to avoid. If it only depends on information available at compile time, I think we could support it.
>> > > > > > >>
>> > > > > > >> > Are we planning to support deleting/dropping states that are not required anymore?
>> > > > > > >>
>> > > > > > >> We really don't want the user to be able to dynamically declare/delete a state at runtime, but if you just want to clear/clean the value of state, the new API works the same as the old API.
>> > > > > > >>
>> > > > > > >> > I think if a state is not declared or otherwise cannot be accessed, an exception must be thrown. We cannot confuse empty value with something inaccessible.
>> > > > > > >>
>> > > > > > >> After thinking about it a bit more, I think you have a point!
>> > > > > > >> It's important to make a clear distinction between an empty state and illegal access, especially since Flink currently discourages setting a non-null default value for the state.
>> > > > > > >> I will modify the proposal as you suggested then :)
>> > > > > > >>
>> > > > > > >> > The RedistributionMode enum sounds a bit strange to me, as it doesn't actually specify a mode of redistribution. It feels more like a flag. Can we simply have an Optional<RedistributionStrategy> instead?
>> > > > > > >>
>> > > > > > >> We actually define three types of RedistributionMode instead of two because we don't want to think of IDENTICAL as a redistribution strategy; it's just an invariant: the state of that type is always the same across partitions. If it only had NONE and REDISTRIBUTABLE, I think your proposal would be feasible.
>> > > > > > >> But we don't want to confuse these three semantics/modes.
>> > > > > > >>
>> > > > > > >> > BroadcastStates are currently very limited by only Map-like states, and the new interface also enforces that. Can we remove this limitation? If not, should broadcastState declaration extend mapstate declaration?
>> > > > > > >>
>> > > > > > >> Personally, I don't want to make this restriction. This is also why the method in StateManager to get BroadcastState has the parameter of BroadcastStateDeclaration instead of MapStateDeclaration. In the future, if the state backend supports other types of broadcast state, we can add a corresponding method to the States utility class to get the BroadcastStateDeclaration.
>> > > > > > >>
>> > > > > > >>
>> > > > > > >> Best regards,
>> > > > > > >>
>> > > > > > >> Weijie
>> > > > > > >>
>> > > > > > >>
>> > > > > > >> Hangxiang Yu <master...@gmail.com> wrote on Thu, Mar 7, 2024 at 11:55:
>> > > > > > >>
>> > > > > > >>> Hi, Weijie.
>> > > > > > >>> Thanks for your proposal.
>> > > > > > >>> I'd like to start the discussion with some questions:
>> > > > > > >>> 1. We have also discussed in FLIP-359/FLINK-32658 about limiting the user operation to avoid creating state when processElement. Could current interfaces also help this?
>> > > > > > >>>
>> > > > > > >>> 2. Could you provide more examples about how useStates() works? Since some operations may change their used states at runtime, the value this method returns will be modified at runtime, right?
>> > > > > > >>> If so, I'm thinking if we could get some deterministic State Declaration Set before running which could help a lot for some state operations e.g. pre-check schema compatibility, queryable schema.
>> > > > > > >>>
>> > > > > > >>> 3. IIUC, RedistributionMode/Strategy should not be used by users, right? If so, I'm +1 to move them to inner interfaces which seems a bit confusing to users.
>> > > > > > >>>
>> > > > > > >>> On Thu, Mar 7, 2024 at 11:39 AM Zakelly Lan <zakelly....@gmail.com> wrote:
>> > > > > > >>>
>> > > > > > >>> > Hi Weijie,
>> > > > > > >>> >
>> > > > > > >>> > Thanks for proposing this!
>> > > > > > >>> >
>> > > > > > >>> > Unifying and optimizing state definitions is a very good thing. I like the idea of 'definition goes before using', so overall +1 for this proposal.
>> > > > > > >>> >
>> > > > > > >>> > However, I think the current definition is somewhat unclear. From a user's point of view, I believe that state can be characterized along two relatively independent axes: the scenario (keyed, non-keyed, or broadcast) and the data structure (single value, list, map). I recommend that we fully decouple these aspects, rather than linking the nature of the definition to specific assumptions, such as equating broadcast states with maps, or considering list states could be non-keyed.
>> > > > > > >>> > Furthermore, the concept of 'Redistribution' may impose a cognitive burden on general users. My advice would be to conceal RedistributionMode/Strategy from the standard user interface, particularly within the helper class 'State'. But I'm OK to keep it in `StateDeclaration` since its interfaces are basically used by the framework. My preferred syntax would be:
>> > > > > > >>> > ```
>> > > > > > >>> > StateDeclaration a = State.declare(name).keyed().listState(type);
>> > > > > > >>> > StateDeclaration b = State.declare(name).broadcast().mapState(typeK, typeV);
>> > > > > > >>> > StateDeclaration c = State.declare(name).keyed().aggregatingState(type, function);
>> > > > > > >>> > ```
>> > > > > > >>> > WDYT?
>> > > > > > >>> >
>> > > > > > >>> >
>> > > > > > >>> > Best,
>> > > > > > >>> > Zakelly
>> > > > > > >>> >
>> > > > > > >>> > On Wed, Mar 6, 2024 at 11:04 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>> > > > > > >>> >
>> > > > > > >>> > > Hi Weijie!
>> > > > > > >>> > >
>> > > > > > >>> > > Thank you for the proposal.
>> > > > > > >>> > >
>> > > > > > >>> > > I have some initial questions to start the discussion:
>> > > > > > >>> > >
>> > > > > > >>> > > 1. What is the semantics of the usesStates method? When is it called? Can the used state change dynamically at runtime? Can the logic depend on something computed in open(..) for example?
>> > > > > > >>> > >
>> > > > > > >>> > > Currently state access is pretty dynamic in Flink and I would assume many jobs create states on the fly based on some required logic.
>> > > > > > >>> > > Are we planning to address these use-cases?
>> > > > > > >>> > >
>> > > > > > >>> > > Are we planning to support deleting/dropping states that are not required anymore?
>> > > > > > >>> > >
>> > > > > > >>> > > 2. Get state now returns an optional, but you mention that:
>> > > > > > >>> > > "If you want to get a state that is not declared or has no access, Option#empty is returned."
>> > > > > > >>> > >
>> > > > > > >>> > > I think if a state is not declared or otherwise cannot be accessed, an exception must be thrown. We cannot confuse empty value with something inaccessible.
>> > > > > > >>> > >
>> > > > > > >>> > > 3. The RedistributionMode enum sounds a bit strange to me, as it doesn't actually specify a mode of redistribution. It feels more like a flag. Can we simply have an Optional<RedistributionStrategy> instead?
>> > > > > > >>> > >
>> > > > > > >>> > > 4. BroadcastStates are currently very limited by only Map-like states, and the new interface also enforces that.
>> > > > > > >>> > > Can we remove this limitation? If not, should broadcastState declaration extend mapstate declaration?
>> > > > > > >>> > >
>> > > > > > >>> > > Cheers,
>> > > > > > >>> > > Gyula
>> > > > > > >>> > >
>> > > > > > >>> > > On Wed, Mar 6, 2024 at 11:18 AM weijie guo <guoweijieres...@gmail.com> wrote:
>> > > > > > >>> > >
>> > > > > > >>> > > > Hi devs,
>> > > > > > >>> > > >
>> > > > > > >>> > > > I'd like to start a discussion about FLIP-433: State Access on DataStream API V2 [1]. This is the third sub-FLIP of DataStream API V2.
>> > > > > > >>> > > >
>> > > > > > >>> > > >
>> > > > > > >>> > > > After FLIP-410 [2], we can already write a simple stateless job using the DataStream V2 API. But as we all know, stateful computing is Flink's trump card. In this FLIP, we will discuss how to declare and access state on DataStream API V2, and we manage to avoid some of the shortcomings of V1 in this regard.
>> > > > > > >>> > > >
>> > > > > > >>> > > > You can find more details in this FLIP. Its relationship with other sub-FLIPs can be found in the umbrella FLIP [3]. Looking forward to hearing from you, thanks!
>> > > > > > >>> > > >
>> > > > > > >>> > > >
>> > > > > > >>> > > > Best regards,
>> > > > > > >>> > > >
>> > > > > > >>> > > > Weijie
>> > > > > > >>> > > >
>> > > > > > >>> > > >
>> > > > > > >>> > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-433%3A+State+Access+on+DataStream+API+V2
>> > > > > > >>> > > >
>> > > > > > >>> > > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-410%3A++Config%2C+Context+and+Processing+Timer+Service+of+DataStream+API+V2
>> > > > > > >>> > > >
>> > > > > > >>> > > > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2
>> > > > > > >>> > > >
>> > > > > > >>>
>> > > > > > >>>
>> > > > > > >>> --
>> > > > > > >>> Best,
>> > > > > > >>> Hangxiang.
>> > > > > > >>>
>>
>>
>> --
>> Best,
>> Hangxiang.
>>
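
For readers skimming the thread, the two points everyone converged on (states are declared up front, and accessing an undeclared state throws instead of returning an empty Optional) can be sketched roughly as below. This is only an illustration under loud assumptions: `EagerStateSketch`, its nested `StateDeclaration` and `StateManager` classes, and the `get`/`update` methods are hypothetical stand-ins written for this note, not the actual interfaces proposed in FLIP-433. The enum simply mirrors the three redistribution categories described in the thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Illustrative only: these types are hypothetical stand-ins, not the real
// Flink DataStream V2 interfaces proposed in FLIP-433.
final class EagerStateSketch {

    /** Mirrors the three categories from the thread: stays with the partition, redistributable, identical everywhere. */
    enum RedistributionMode { NONE, REDISTRIBUTABLE, IDENTICAL }

    /** An immutable declaration that exists before the job runs. */
    static final class StateDeclaration {
        final String name;
        final RedistributionMode mode;

        StateDeclaration(String name, RedistributionMode mode) {
            this.name = name;
            this.mode = mode;
        }
    }

    /** Serves only states that were declared up front. */
    static final class StateManager {
        private final Map<String, StateDeclaration> declared = new HashMap<>();
        private final Map<String, Object> values = new HashMap<>();

        StateManager(Set<StateDeclaration> declarations) {
            for (StateDeclaration d : declarations) {
                declared.put(d.name, d);
            }
        }

        /** An empty Optional means "declared, but no value yet"; undeclared access throws. */
        Optional<Object> get(StateDeclaration declaration) {
            requireDeclared(declaration);
            return Optional.ofNullable(values.get(declaration.name));
        }

        /** Stores a value for a declared state; undeclared access throws. */
        void update(StateDeclaration declaration, Object value) {
            requireDeclared(declaration);
            values.put(declaration.name, value);
        }

        private void requireDeclared(StateDeclaration declaration) {
            if (!declared.containsKey(declaration.name)) {
                throw new IllegalStateException(
                        "State '" + declaration.name + "' was not declared up front");
            }
        }
    }

    public static void main(String[] args) {
        StateDeclaration count = new StateDeclaration("count", RedistributionMode.NONE);
        StateDeclaration rogue = new StateDeclaration("rogue", RedistributionMode.NONE);
        // Only "count" is declared, analogous to what a usesStates()-style method would return.
        StateManager manager = new StateManager(Set.of(count));

        System.out.println("count present before update: " + manager.get(count).isPresent());
        manager.update(count, 1L);
        System.out.println("count after update: " + manager.get(count).get());
        try {
            manager.get(rogue);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The point of the split in `get` is the one Gyula raised: an empty result and an illegal access are different situations, so only the former is represented as an empty Optional.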