Hi,

Although this solution looks straight-forward, custom triggers cannot be
added that easily.

The problem is that a window operator with a Trigger that emit early
results produces updates, i.e., results that have been emitted might be
updated later.
The default Trigger only emits the final result and hence does not produce
updates.
This is an important difference, because all following operators need to be
aware of the updates and be able to process them to prevent incorrect
results.

Therefore, the query planner needs to be aware of the semantics of the
Trigger. This would not be the case if it would be set via the
StreamExecutionEnvironment.
There is a proposal to add an EMIT clause to SQL queries to control the
rate at which results are emitted [1] that might be interesting.

Best,
Fabian

[1]
https://docs.google.com/document/d/1wrla8mF_mmq-NW9sdJHYVgMyZsgCmHumJJ5f5WUzTiM/edit?ts=59816a8b#heading=h.1zg4jlmqwzlr






2018-06-22 4:48 GMT+02:00 YennieChen88 <chenyanyi...@jd.com>:

>     I found that flink SQL use the specific default trigger, which will not
> triggered until the window closes. But sometimes, we need to trigger before
> window closes. As the class *WindowAssigner *provides method
> *getDefaultTrigger *with parameter *StreamExecutionEnvironment*, how about
> passing a custom trigger to *WindowAssigner *by *StreamExecutionEnvironment
> *?
>     If we can do this, SQL can support custom triggers. It is easy to
> implement. All we need to do is just add new variables named like
> *defaultTimeWindowTrigger *and *defaultGlobalWindowTrigger
> *to*StreamExecutionEnvironment*, which can be set by public setter method.
> Then *WindowAssigner *could get the *defaultWindowTrigger *or
> *defaultGlobalWindowTrigger *from *StreamExecutionEnvironment *by
> *getDefaultTrigger *method.
>
> Codes :
> *StreamExecutionEnvironment*:
> /** The default trigger used for creating a time window */
>     private Trigger<Object, TimeWindow> defaultTimeWindowTrigger;
>
>     /** The default trigger used for creating a global window */
>     private Trigger<Object, GlobalWindow> defaultGlobalWindowTrigger;
> /**
>      * Get default trigger of time window
>      * @return
>      */
>     public Trigger<Object, TimeWindow> getDefaultTimeWindowTrigger() {
> return defaultTimeWindowTrigger; }
>
>     /**
>      * Set default trigger of time window
>      * @param defaultTimeWindowTrigger
>      */
>     public void setDefaultTimeWindowTrigger(Trigger<Object, TimeWindow>
> defaultTimeWindowTrigger) {
>         this.defaultTimeWindowTrigger = defaultTimeWindowTrigger;
>     }
>
>     /**
>      * Get default trigger of global window
>      * @return
>      */
>     public Trigger<Object, GlobalWindow> getDefaultGlobalWindowTrigger() {
> return defaultGlobalWindowTrigger; }
>
>     /**
>      * Set default trigger of global window
>      * @param defaultGlobalWindowTrigger
>      */
>     public void setDefaultGlobalWindowTrigger(Trigger<Object,
> GlobalWindow>
> defaultGlobalWindowTrigger) {
>         this.defaultGlobalWindowTrigger = defaultGlobalWindowTrigger;
>     }
>
> *TumblingEventTimeWindows/ TumblingProcessingTimeWindows/…*
>        @Override
>     public Trigger<Object, TimeWindow>
> getDefaultTrigger(StreamExecutionEnvironment env) {
>         // Get default trigger from StreamExecutionEnvironment
>         Trigger<Object, TimeWindow> defaultTrigger =
> env.getDefaultTimeWindowTrigger();
>         if (defaultTrigger != null) {
>             return defaultTrigger;
>         }
>         return EventTimeTrigger.create();
> }
>
> *GlobalWindows*:
> @Override
>     public Trigger<Object, GlobalWindow>
> getDefaultTrigger(StreamExecutionEnvironment env) {
>         // Get default trigger from StreamExecutionEnvironment
>         Trigger<Object, GlobalWindow> defaultTrigger =
> env.getDefaultGlobalWindowTrigger();
>         if (defaultTrigger != null) {
>             return defaultTrigger;
>         }
>
>         return new NeverTrigger();
>     }
>
>
> *Look forward to your comments. I would really appreciate taking the time
> to
> help me think about this.*
>
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>

Reply via email to