[ 
https://issues.apache.org/jira/browse/FLINK-25662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Weise updated FLINK-25662:
---------------------------------
    Fix Version/s:     (was: 1.14.3)

> Access KeyedStateStore in WindowAssigner
> ----------------------------------------
>
>                 Key: FLINK-25662
>                 URL: https://issues.apache.org/jira/browse/FLINK-25662
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 1.14.2
>            Reporter: Sun Sheng
>            Priority: Major
>
> Currently, when implementing a custom WindowAssigner, the 
> WindowAssignerContext exposes only one method, 
> getCurrentProcessingTime(). There is no way to access the 
> KeyedStateStore, which I think is necessary in some cases.
>  
> In stock trading, events are governed by the trading status, such as 
> non-trading or continuous-trading, and the status arrives in separate events. 
> We want to merge the windows from non-trading through continuous-trading into 
> one window. We can do this with a MergingWindowAssigner, by assigning a long 
> window for non-trading and then merging the windows for continuous-trading.
> But for recovery, we have to save the status in the KeyedStateStore, because 
> the status arrives in separate events.
> At present, we have to add a KeyedProcessFunction before windowing to save 
> the status and forward it to the window assigner. It works, but the code 
> becomes more bloated:
> {code:java}
> stream.keyBy(...)
>     .process(MyProcessFunction)
>     .keyBy(...)
>     .window(MyWindowAssigner)
>     ... {code}
>  
> I have tried modifying the Flink source code locally, by adding a method to 
> WindowAssignerContext and implementing it in WindowOperator:
> WindowAssigner.java
> {code:java}
> public abstract static class WindowAssignerContext {
>     /** Returns the current processing time. */
>     public abstract long getCurrentProcessingTime();
>
>     /** Returns the keyed state store of the operator (proposed addition). */
>     public abstract KeyedStateStore globalState();
> }{code}
> WindowOperator.java
> {code:java}
> ...
> windowAssignerContext = new WindowAssigner.WindowAssignerContext() {
>     @Override
>     public long getCurrentProcessingTime() {
>         return internalTimerService.currentProcessingTime();
>     }
>
>     @Override
>     public KeyedStateStore globalState() {
>         return WindowOperator.this.getKeyedStateStore();
>     }
> }; 
> ...{code}
> The implementation is the same as in WindowContext, and very simple.
>  
> In local testing everything works fine, and my code becomes cleaner.
> If this is useful, I would be glad to work on it. :)
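
For illustration, the idea in the description can be sketched without Flink on the classpath. This is not Flink code: `KeyedStatusStore`, `AssignerContext`, `Window`, the two window sizes, and the `assign` signature are hypothetical stand-ins chosen for this example. Only the method names `getCurrentProcessingTime()` and `globalState()` mirror the proposal. Unlike Flink's keyed state, which is scoped to the current key implicitly, the stand-in store takes the key explicitly.

```java
import java.util.HashMap;
import java.util.Map;

/** Flink-free sketch of an assigner that consults per-key state through its context. */
final class TradingWindowSketch {

    enum Status { NON_TRADING, CONTINUOUS_TRADING }

    // Hypothetical window sizes, chosen only for this example.
    static final long LONG_SIZE_MS = 60_000L;
    static final long SHORT_SIZE_MS = 1_000L;

    /** Stand-in for a keyed state store; a key with no stored status counts as NON_TRADING. */
    static final class KeyedStatusStore {
        private final Map<String, Status> statusByKey = new HashMap<>();

        Status get(String key) {
            return statusByKey.getOrDefault(key, Status.NON_TRADING);
        }

        void put(String key, Status status) {
            statusByKey.put(key, status);
        }
    }

    /** Stand-in for WindowAssignerContext, including the proposed accessor. */
    interface AssignerContext {
        long getCurrentProcessingTime();

        KeyedStatusStore globalState();
    }

    /** Half-open time window [start, end). */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        long size() {
            return end - start;
        }
    }

    /** Builds a context backed by one in-memory store, like the anonymous class in the proposal. */
    static AssignerContext newContext() {
        final KeyedStatusStore store = new KeyedStatusStore();
        return new AssignerContext() {
            @Override
            public long getCurrentProcessingTime() {
                return System.currentTimeMillis();
            }

            @Override
            public KeyedStatusStore globalState() {
                return store;
            }
        };
    }

    /**
     * Assigns a window to one event. A non-null statusChange marks a status event and is
     * saved to the store first; other events reuse the status remembered for their key.
     */
    static Window assign(String key, long timestamp, Status statusChange, AssignerContext ctx) {
        KeyedStatusStore store = ctx.globalState();
        if (statusChange != null) {
            store.put(key, statusChange);
        }
        long size = store.get(key) == Status.NON_TRADING ? LONG_SIZE_MS : SHORT_SIZE_MS;
        return new Window(timestamp, timestamp + size);
    }

    public static void main(String[] args) {
        AssignerContext ctx = newContext();
        Window before = assign("stock-1", 0L, null, ctx);
        Window change = assign("stock-1", 10L, Status.CONTINUOUS_TRADING, ctx);
        Window after = assign("stock-1", 20L, null, ctx);
        System.out.println(before.size() + " " + change.size() + " " + after.size());
    }
}
```

The point of the sketch is that the assigner itself reads and writes the status, so no separate process step is needed in front of the window stage just to carry the status along.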



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
