Sun Sheng created FLINK-25662:
---------------------------------

             Summary: Access KeyedStateStore in WindowAssigner
                 Key: FLINK-25662
                 URL: https://issues.apache.org/jira/browse/FLINK-25662
             Project: Flink
          Issue Type: Improvement
          Components: API / DataStream
    Affects Versions: 1.14.2
            Reporter: Sun Sheng
             Fix For: 1.14.3


Currently, when implementing a custom WindowAssigner, the 
WindowAssignerContext exposes only one method, getCurrentProcessingTime(). 
There is no method to access the KeyedStateStore, which I think is necessary 
in some cases.

 

In stock trading, events are governed by the trading status, such as 
non-trading or continuous-trading, and the status arrives in separate events. 
We want to merge the windows from non-trading through continuous-trading into 
one window. We can do this with a MergingWindowAssigner, by assigning a long 
window during non-trading and then merging it with the windows assigned during 
continuous-trading.
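
The merging step described above can be sketched in plain Java. This is only 
an illustration of one possible reading of the scheme, not Flink code: the 
Window class, the merge() helper and the sample timestamps are hypothetical 
stand-ins, and the rule "merge windows that overlap or touch" is an assumption 
modelled loosely on how merging window assigners combine intervals.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class WindowMergeSketch {

    /** Hypothetical stand-in for a time window [start, end); not Flink's TimeWindow. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        /** True if the two windows overlap or touch. */
        boolean intersects(Window other) {
            return start <= other.end && other.start <= end;
        }

        /** Smallest window that covers both this window and the other one. */
        Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Sorts windows by start and folds every intersecting run into one window. */
    static List<Window> merge(List<Window> windows) {
        List<Window> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong((Window w) -> w.start));
        List<Window> merged = new ArrayList<>();
        for (Window w : sorted) {
            int last = merged.size() - 1;
            if (last >= 0 && merged.get(last).intersects(w)) {
                merged.set(last, merged.get(last).cover(w));
            } else {
                merged.add(w);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Window> windows = new ArrayList<>();
        windows.add(new Window(0, 3600));    // long window assigned during non-trading
        windows.add(new Window(3000, 3060)); // short windows during continuous-trading
        windows.add(new Window(3050, 3110));
        windows.add(new Window(5000, 5060)); // unrelated later window
        System.out.println(merge(windows));  // prints [[0, 3600), [5000, 5060)]
    }
}
```

The long non-trading window absorbs the short windows that fall inside it, 
while the later, disjoint window stays separate.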

But for recovery, we have to save the status in the KeyedStateStore, because 
the status arrives in separate events.

At present, we have to add a KeyedProcessFunction before windowing to save the 
status and forward it to the window assigner. It works, but the code becomes 
more bloated:

stream.keyBy(...).process(MyProcessFunction).keyBy(...).window(MyWindowAssigner)...

 

I have tried modifying the Flink source code locally, by adding a method to 
WindowAssignerContext and implementing it in WindowOperator:

// WindowAssigner.java

public abstract static class WindowAssignerContext {
    /** Returns the current processing time. */
    public abstract long getCurrentProcessingTime();

    /** Proposed addition: state accessor for per-key global state. */
    public abstract KeyedStateStore globalState();
}

// WindowOperator.java

windowAssignerContext =
        new WindowAssigner.WindowAssignerContext() {
            @Override
            public long getCurrentProcessingTime() {
                return internalTimerService.currentProcessingTime();
            }

            @Override
            public KeyedStateStore globalState() {
                return WindowOperator.this.getKeyedStateStore();
            }
        };

The implementation is the same as in WindowContext, so it is very simple.
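
To show how an assigner might use such an accessor, here is a minimal sketch 
in plain Java. It does not use Flink's classes: StateStore and AssignerContext 
are hypothetical stand-ins for KeyedStateStore and WindowAssignerContext, and 
the status names, the window sizes and the choice to treat a missing status as 
non-trading are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class StatefulAssignerSketch {

    /** Hypothetical stand-in for a per-key state store; NOT Flink's KeyedStateStore. */
    static final class StateStore {
        private final Map<String, String> values = new HashMap<>();

        String get(String name) {
            return values.get(name);
        }

        void put(String name, String value) {
            values.put(name, value);
        }
    }

    /** Hypothetical stand-in for WindowAssignerContext plus the proposed accessor. */
    interface AssignerContext {
        long getCurrentProcessingTime();

        StateStore globalState();
    }

    static final long LONG_WINDOW_MS = 3_600_000L; // assumed size while non-trading
    static final long SHORT_WINDOW_MS = 60_000L;   // assumed size while continuous-trading

    /**
     * Returns the end of the window assigned to an element with the given timestamp.
     * statusUpdate is non-null only for status events; ordinary events pass null and
     * rely on the status remembered in state.
     */
    static long assignWindowEnd(String statusUpdate, long timestamp, AssignerContext ctx) {
        StateStore state = ctx.globalState();
        if (statusUpdate != null) {
            state.put("status", statusUpdate); // remember the latest status for this key
        }
        // Assumption: anything other than CONTINUOUS_TRADING (including no status yet)
        // is treated as non-trading.
        boolean continuous = "CONTINUOUS_TRADING".equals(state.get("status"));
        return timestamp + (continuous ? SHORT_WINDOW_MS : LONG_WINDOW_MS);
    }

    /** Builds a context backed by one in-memory store, i.e. a single key. */
    static AssignerContext newContext() {
        StateStore store = new StateStore();
        return new AssignerContext() {
            @Override
            public long getCurrentProcessingTime() {
                return System.currentTimeMillis();
            }

            @Override
            public StateStore globalState() {
                return store;
            }
        };
    }

    public static void main(String[] args) {
        AssignerContext ctx = newContext();
        System.out.println(assignWindowEnd("NON_TRADING", 0L, ctx));           // 3600000
        System.out.println(assignWindowEnd(null, 1_000L, ctx));                // 3601000
        System.out.println(assignWindowEnd("CONTINUOUS_TRADING", 2_000L, ctx)); // 62000
        System.out.println(assignWindowEnd(null, 3_000L, ctx));                // 63000
    }
}
```

Because the status lives in the store rather than in a field of the assigner, 
it would be restored together with the rest of the keyed state, which is the 
recovery property asked for here.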

 

In local testing everything works fine, and my code becomes cleaner.

If it's useful I'm glad to work on it. :)

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
