[ https://issues.apache.org/jira/browse/FLINK-25662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Thomas Weise updated FLINK-25662:
---------------------------------
    Fix Version/s:     (was: 1.14.3)

> Access KeyedStateStore in WindowAssigner
> ----------------------------------------
>
>                 Key: FLINK-25662
>                 URL: https://issues.apache.org/jira/browse/FLINK-25662
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 1.14.2
>            Reporter: Sun Sheng
>            Priority: Major
>
> Currently, when implementing our own WindowAssigner, the
> WindowAssignerContext exposes only one method, getCurrentProcessingTime().
> There is no method to access the KeyedStateStore, which I think is
> necessary in some cases.
>
> In stock trading, events are governed by the trading status (non-trading,
> continuous-trading, etc.), and the status arrives in separate events.
> We want to merge the windows from non-trading to continuous-trading into
> one window. We can do this with a MergingWindowAssigner, by assigning a
> long window for non-trading and then merging the windows for
> continuous-trading.
> But for recovery, we have to save the status in the KeyedStateStore,
> because the status arrives in separate events.
> At present, we have to add a KeyedProcessFunction before windowing to save
> the status and send it to the window assigner. It works, but the code
> becomes more bloated:
> {code:java}
> stream.keyBy(...)
>       .process(new MyProcessFunction())  // saves the status and forwards it
>       .keyBy(...)
>       .window(new MyWindowAssigner())
>       ... {code}
>
> I have tried modifying the Flink source code locally, by adding a method to
> WindowAssignerContext and implementing it in WindowOperator:
> WindowAssigner.java
> {code:java}
> public abstract static class WindowAssignerContext {
>     /** Returns the current processing time. */
>     public abstract long getCurrentProcessingTime();
>
>     /** Proposed addition: gives the assigner access to keyed state. */
>     public abstract KeyedStateStore globalState();
> }{code}
> WindowOperator.java
> {code:java}
> ...
> windowAssignerContext = new WindowAssigner.WindowAssignerContext() {
>     @Override
>     public long getCurrentProcessingTime() {
>         return internalTimerService.currentProcessingTime();
>     }
>
>     @Override
>     public KeyedStateStore globalState() {
>         return WindowOperator.this.getKeyedStateStore();
>     }
> };
> ...{code}
> The implementation is the same as in WindowContext, and very simple.
>
> In local testing everything works fine, and my code becomes cleaner.
> If this is useful, I'm glad to work on it. :)

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
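For illustration only, here is a minimal, self-contained sketch of how an assigner might use the proposed globalState() accessor described in the issue above. It does not use Flink classes: StatusStore and Context are simplified stand-ins for KeyedStateStore and WindowAssignerContext, the window lengths and status strings are hypothetical, and the key is passed explicitly, whereas Flink scopes keyed state to the current key on its own. Defaulting an unseen key to NON_TRADING is also an assumption.

```java
import java.util.HashMap;
import java.util.Map;

final class AssignerStateSketch {

    /** Stand-in for per-key state; NOT Flink's KeyedStateStore. */
    static final class StatusStore {
        private final Map<String, String> statusByKey = new HashMap<>();

        /** Returns the last saved status; unseen keys default to NON_TRADING (assumption). */
        String get(String key) {
            return statusByKey.getOrDefault(key, "NON_TRADING");
        }

        void put(String key, String status) {
            statusByKey.put(key, status);
        }
    }

    /** Stand-in for WindowAssignerContext with the proposed accessor added. */
    interface Context {
        long getCurrentProcessingTime();

        StatusStore globalState();
    }

    static final long LONG_WINDOW_MS = 60_000L;  // hypothetical length for non-trading
    static final long SHORT_WINDOW_MS = 1_000L;  // hypothetical length otherwise

    /**
     * Returns the end timestamp of the window assigned to one element.
     * statusUpdate is null for ordinary events and non-null for status events.
     */
    static long assignWindowEnd(String key, String statusUpdate, long timestamp, Context ctx) {
        StatusStore store = ctx.globalState();
        if (statusUpdate != null) {
            // Status events update the saved status, so it survives across events.
            store.put(key, statusUpdate);
        }
        boolean nonTrading = "NON_TRADING".equals(store.get(key));
        return timestamp + (nonTrading ? LONG_WINDOW_MS : SHORT_WINDOW_MS);
    }

    /** Builds a context backed by the given store, mirroring the anonymous class in the issue. */
    static Context contextOver(StatusStore store) {
        return new Context() {
            @Override
            public long getCurrentProcessingTime() {
                return System.currentTimeMillis();
            }

            @Override
            public StatusStore globalState() {
                return store;
            }
        };
    }
}
```

With the status kept in state reachable from the assigner, the extra keyBy/process stage from the workaround would no longer be needed to carry the status into windowing. Whether Flink would restore such state on recovery in exactly this way depends on the real WindowOperator, which this sketch does not model.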