I found that flink SQL use the specific default trigger, which will not
triggered until the window closes. But sometimes, we need to trigger before
window closes. As the class *WindowAssigner *provides method
*getDefaultTrigger *with parameter *StreamExecutionEnvironment*, how about
passing a custom trigger to *WindowAssigner *by *StreamExecutionEnvironment
*?
    If we can do this, SQL can support custom triggers. It is easy to
implement. All we need to do is just add new variables named like
*defaultTimeWindowTrigger *and *defaultGlobalWindowTrigger
*to*StreamExecutionEnvironment*, which can be set by public setter method.
Then *WindowAssigner *could get the *defaultWindowTrigger *or
*defaultGlobalWindowTrigger *from *StreamExecutionEnvironment *by
*getDefaultTrigger *method. 

Codes :
*StreamExecutionEnvironment*:
/** The default trigger used for creating a time window */
    private Trigger<Object, TimeWindow> defaultTimeWindowTrigger;

    /** The default trigger used for creating a global window */
    private Trigger<Object, GlobalWindow> defaultGlobalWindowTrigger;
/**
     * Get default trigger of time window
     * @return
     */
    public Trigger<Object, TimeWindow> getDefaultTimeWindowTrigger() {
return defaultTimeWindowTrigger; }

    /**
     * Set default trigger of time window
     * @param defaultTimeWindowTrigger
     */
    public void setDefaultTimeWindowTrigger(Trigger<Object, TimeWindow>
defaultTimeWindowTrigger) {
        this.defaultTimeWindowTrigger = defaultTimeWindowTrigger;
    }

    /**
     * Get default trigger of global window
     * @return
     */
    public Trigger<Object, GlobalWindow> getDefaultGlobalWindowTrigger() {
return defaultGlobalWindowTrigger; }

    /**
     * Set default trigger of global window
     * @param defaultGlobalWindowTrigger
     */
    public void setDefaultGlobalWindowTrigger(Trigger<Object, GlobalWindow>
defaultGlobalWindowTrigger) {
        this.defaultGlobalWindowTrigger = defaultGlobalWindowTrigger;
    }

*TumblingEventTimeWindows/ TumblingProcessingTimeWindows/…*
       @Override
    public Trigger<Object, TimeWindow>
getDefaultTrigger(StreamExecutionEnvironment env) {
        // Get default trigger from StreamExecutionEnvironment
        Trigger<Object, TimeWindow> defaultTrigger =
env.getDefaultTimeWindowTrigger();
        if (defaultTrigger != null) {
            return defaultTrigger;
        }
        return EventTimeTrigger.create();
}

*GlobalWindows*:
@Override
    public Trigger<Object, GlobalWindow>
getDefaultTrigger(StreamExecutionEnvironment env) {
        // Get default trigger from StreamExecutionEnvironment
        Trigger<Object, GlobalWindow> defaultTrigger =
env.getDefaultGlobalWindowTrigger();
        if (defaultTrigger != null) {
            return defaultTrigger;
        }

        return new NeverTrigger();
    }


*Look forward to your comments. I would really appreciate taking the time to
help me think about this.*





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply via email to