[jira] [Updated] (FLINK-25662) Access KeyedStateStore in WindowAssigner
     [ https://issues.apache.org/jira/browse/FLINK-25662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Thomas Weise updated FLINK-25662:
---------------------------------
    Fix Version/s:     (was: 1.14.3)

> Access KeyedStateStore in WindowAssigner
> ----------------------------------------
>
>                 Key: FLINK-25662
>                 URL: https://issues.apache.org/jira/browse/FLINK-25662
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 1.14.2
>            Reporter: Sun Sheng
>            Priority: Major
>
> Currently, when implementing our own WindowAssigner, the only function we
> can call through the WindowAssignerContext is getCurrentProcessingTime().
> There is no function to access the KeyedStateStore, which I think is
> necessary in some cases.
>
> In stock trading, the events are controlled by the trading status, such as
> non-trading or continuous-trading, and the status arrives in separate events.
> We want to merge the windows from non-trading to continuous-trading into
> one window. We can do that with a MergingWindowAssigner, by assigning a long
> window for non-trading and then merging the windows for continuous-trading.
> But for recovery, we have to save the status to the KeyedStateStore, because
> the status arrives in separate events.
> At present, we have to add a KeyedProcessFunction before windowing to save
> the status and send it to the window assigner. It works, but the code
> becomes more bloated:
> {code:java}
> stream.keyBy(...)
>       .process(MyProcessFunction)
>       .keyBy(...)
>       .window(MyWindowAssigner)
>       ... {code}
>
> I have tried modifying the Flink source code locally, by adding a function to
> WindowAssignerContext and implementing it in WindowOperator:
> WindowAssigner.java
> {code:java}
> public abstract static class WindowAssignerContext {
>     /** Returns the current processing time. */
>     public abstract long getCurrentProcessingTime();
>     public abstract KeyedStateStore globalState();
> }{code}
> WindowOperator.java
> {code:java}
> ...
> windowAssignerContext = new WindowAssigner.WindowAssignerContext() {
>     @Override
>     public long getCurrentProcessingTime() {
>         return internalTimerService.currentProcessingTime();
>     }
>
>     @Override
>     public KeyedStateStore globalState() {
>         return WindowOperator.this.getKeyedStateStore();
>     }
> };
> ...{code}
> The implementation is the same as in WindowContext and is very simple.
>
> In local testing everything works fine, and my code becomes cleaner.
> If it's useful, I'm glad to work on it. :)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
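To make the idea in the issue more concrete, here is a minimal plain-Java sketch of a window assigner that reads a per-key trading status from a state store and picks the window length from it. It does not use Flink: `ToyKeyedState`, `ToyWindow`, `StatusAwareAssigner`, the status strings, the two window lengths, and the default of non-trading for unseen keys are all hypothetical names and assumptions made for illustration, not part of the Flink API or of the reporter's patch.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-key status store; a stand-in for keyed state, not Flink's KeyedStateStore. */
final class ToyKeyedState {
    private final Map<String, String> statusByKey = new HashMap<>();

    /** Returns the stored status, or NON_TRADING if none is stored (an assumption of this sketch). */
    String status(String key) {
        return statusByKey.getOrDefault(key, "NON_TRADING");
    }

    void update(String key, String status) {
        statusByKey.put(key, status);
    }
}

/** Half-open time window [start, end) in milliseconds. */
final class ToyWindow {
    final long start;
    final long end;

    ToyWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }

    boolean intersects(ToyWindow other) {
        return start < other.end && other.start < end;
    }

    /** Smallest window covering both inputs, which is what merging two windows yields. */
    ToyWindow cover(ToyWindow other) {
        return new ToyWindow(Math.min(start, other.start), Math.max(end, other.end));
    }
}

/** Assigner that consults the state store directly instead of relying on an upstream operator. */
final class StatusAwareAssigner {
    static final long NON_TRADING_SIZE_MS = 3_600_000L; // hypothetical long window
    static final long TRADING_SIZE_MS = 60_000L;        // hypothetical short window

    private final ToyKeyedState state;

    StatusAwareAssigner(ToyKeyedState state) {
        this.state = state;
    }

    /** Stores a status change if the event carries one, then sizes the window by the stored status. */
    ToyWindow assign(String key, String newStatusOrNull, long timestamp) {
        if (newStatusOrNull != null) {
            state.update(key, newStatusOrNull);
        }
        long size = "NON_TRADING".equals(state.status(key)) ? NON_TRADING_SIZE_MS : TRADING_SIZE_MS;
        return new ToyWindow(timestamp, timestamp + size);
    }
}

final class StatusAwareAssignerDemo {
    public static void main(String[] args) {
        StatusAwareAssigner assigner = new StatusAwareAssigner(new ToyKeyedState());
        ToyWindow first = assigner.assign("stock-A", null, 0L);
        ToyWindow second = assigner.assign("stock-A", "CONTINUOUS_TRADING", 1_000L);
        if (first.intersects(second)) {
            ToyWindow merged = first.cover(second);
            System.out.println(merged.start + " .. " + merged.end);
        }
    }
}
```

Because the status lives in the store rather than in the assigner's own fields, a later event with no status of its own still gets the right window length. That is the property the issue wants to keep across recovery.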
[jira] [Updated] (FLINK-25662) Access KeyedStateStore in WindowAssigner
     [ https://issues.apache.org/jira/browse/FLINK-25662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sun Sheng updated FLINK-25662:
------------------------------
    Description: 
Currently, when implementing our own WindowAssigner, the only function we can call through the WindowAssignerContext is getCurrentProcessingTime(). There is no function to access the KeyedStateStore, which I think is necessary in some cases.

In stock trading, the events are controlled by the trading status, such as non-trading or continuous-trading, and the status arrives in separate events. We want to merge the windows from non-trading to continuous-trading into one window. We can do that with a MergingWindowAssigner, by assigning a long window for non-trading and then merging the windows for continuous-trading.
But for recovery, we have to save the status to the KeyedStateStore, because the status arrives in separate events.
At present, we have to add a KeyedProcessFunction before windowing to save the status and send it to the window assigner. It works, but the code becomes more bloated:
{code:java}
stream.keyBy(...)
      .process(MyProcessFunction)
      .keyBy(...)
      .window(MyWindowAssigner)
      ... {code}

I have tried modifying the Flink source code locally, by adding a function to WindowAssignerContext and implementing it in WindowOperator:
WindowAssigner.java
{code:java}
public abstract static class WindowAssignerContext {
    /** Returns the current processing time. */
    public abstract long getCurrentProcessingTime();
    public abstract KeyedStateStore globalState();
}{code}
WindowOperator.java
{code:java}
...
windowAssignerContext = new WindowAssigner.WindowAssignerContext() {
    @Override
    public long getCurrentProcessingTime() {
        return internalTimerService.currentProcessingTime();
    }

    @Override
    public KeyedStateStore globalState() {
        return WindowOperator.this.getKeyedStateStore();
    }
};
...{code}
The implementation is the same as in WindowContext and is very simple.

In local testing everything works fine, and my code becomes cleaner.
If it's useful I'm glad to work on it. :)
[jira] [Updated] (FLINK-25662) Access KeyedStateStore in WindowAssigner
     [ https://issues.apache.org/jira/browse/FLINK-25662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sun Sheng updated FLINK-25662:
------------------------------
    Description: 
Currently, when implementing our own WindowAssigner, the only function we can call through the WindowAssignerContext is getCurrentProcessingTime(). There is no function to access the KeyedStateStore, which I think is necessary in some cases.

In stock trading, the events are controlled by the trading status, such as non-trading or continuous-trading, and the status arrives in separate events. We want to merge the windows from non-trading to continuous-trading into one window. We can do that with a MergingWindowAssigner, by assigning a long window for non-trading and then merging the windows for continuous-trading.
But for recovery, we have to save the status to the KeyedStateStore, because the status arrives in separate events.
At present, we have to add a KeyedProcessFunction before windowing to save the status and send it to the window assigner. It works, but the code becomes more bloated:
{code:java}
stream.keyBy(...).process(MyProcessFunction).keyBy(...).window(MyWindowAssigner)... {code}

I have tried modifying the Flink source code locally, by adding a function to WindowAssignerContext and implementing it in WindowOperator:
WindowAssigner.java
{code:java}
public abstract static class WindowAssignerContext {
    /** Returns the current processing time. */
    public abstract long getCurrentProcessingTime();
    public abstract KeyedStateStore globalState();
}{code}
WindowOperator.java
{code:java}
windowAssignerContext = new WindowAssigner.WindowAssignerContext() {
    @Override
    public long getCurrentProcessingTime() {
        return internalTimerService.currentProcessingTime();
    }

    @Override
    public KeyedStateStore globalState() {
        return WindowOperator.this.getKeyedStateStore();
    }
};
{code}
The implementation is the same as in WindowContext and is very simple.

In local testing everything works fine, and my code becomes cleaner.
If it's useful I'm glad to work on it. :)
[jira] [Updated] (FLINK-25662) Access KeyedStateStore in WindowAssigner
     [ https://issues.apache.org/jira/browse/FLINK-25662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sun Sheng updated FLINK-25662:
------------------------------
    Description: 
Currently, when implementing our own WindowAssigner, the only function we can call through the WindowAssignerContext is getCurrentProcessingTime(). There is no function to access the KeyedStateStore, which I think is necessary in some cases.

In stock trading, the events are controlled by the trading status, such as non-trading or continuous-trading, and the status arrives in separate events. We want to merge the windows from non-trading to continuous-trading into one window. We can do that with a MergingWindowAssigner, by assigning a long window for non-trading and then merging the windows for continuous-trading.
But for recovery, we have to save the status to the KeyedStateStore, because the status arrives in separate events.
At present, we have to add a KeyedProcessFunction before windowing to save the status and send it to the window assigner. It works, but the code becomes more bloated:

    stream.keyBy(...).process(MyProcessFunction).keyBy(...).window(MyWindowAssigner)...

I have tried modifying the Flink source code locally, by adding a function to WindowAssignerContext and implementing it in WindowOperator:

    // WindowAssigner.java
    public abstract static class WindowAssignerContext {
        /** Returns the current processing time. */
        public abstract long getCurrentProcessingTime();
        public abstract KeyedStateStore globalState();
    }

    // WindowOperator.java
    windowAssignerContext = new WindowAssigner.WindowAssignerContext() {
        @Override
        public long getCurrentProcessingTime() {
            return internalTimerService.currentProcessingTime();
        }

        @Override
        public KeyedStateStore globalState() {
            return WindowOperator.this.getKeyedStateStore();
        }
    };

The implementation is the same as in WindowContext and is very simple.

In local testing everything works fine, and my code becomes cleaner.
If it's useful, I'm glad to work on it.
:)