Thank you for your response! I think it's not necessary to do that because i have a call to anyway:


which do exactly what you say. It set the watermark interval to 200ms .
I think i found the problem and it is the default event-time trigger attached 
to the assigner?.
According to the docs here https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html : "*all the event-time window assigners have an EventTimeTrigger as default trigger. This trigger simply fires once the watermark passes the end of a window.*" . All i have to do in order to trigger the computation is to send an event which will fall in "next" window.
So the question now is how can i set trigger to fire in regular intervals (e.g. 
every 5 seconds) using table API?

On 14.12.2017 17:57, Fabian Hueske wrote:

you are using a BoundedOutOfOrdernessTimestampExtractor to generate watermarks. The BoundedOutOfOrdernessTimestampExtractor is a periodic watermark assigner and only generates watermarks if a watermark interval is configured. Without watermarks, the query cannot "make progress" and only computes its result when the program is closed (sources emit a MAX_LONG watermark when being canceled).

Long story short: you need to configure the watermark interval: env.getConfig.setAutoWatermarkInterval(100L);

Best, Fabian

2017-12-14 16:30 GMT+01:00 Plamen Paskov


    I'm trying to run the following streaming program in my local
    flink 1.3.2 environment. The program compile and run without any
    errors but the print() call doesn't display anything. Once i stop
    the program i receive all aggregated data. Any ideas how to make
    it output regularly or when new data come/old data updated?

    package flink;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.Slide;
    import org.apache.flink.table.api.java.StreamTableEnvironment;

    import java.sql.Timestamp;

    public class StreamingJob {
         public static void main(String[] args)throws Exception {
             StreamExecutionEnvironment env = 
             StreamTableEnvironment tEnv = 

             SingleOutputStreamOperator<WC> input = env
                     .map(new MapFunction<String, WC>() {
                         @Override public WC map(String value)throws Exception {
                             String[] row = value.split(",");
                             Timestamp timestamp = Timestamp.valueOf(row[2]);
                             return new WC(row[0], Long.valueOf(row[1]), 
BoundedOutOfOrdernessTimestampExtractor<WC>(Time.seconds(10)) {
                         @Override public long extractTimestamp(WC element) {
                             return element.dt.getTime();

             tEnv.registerDataStream("WordCount", input,"word, frequency, 

             Table table = tEnv.scan("WordCount")
                     .groupBy("w, word")
                     .select("word, frequency.sum as frequency, w.start as 
dt");DataStream<Tuple2<Boolean, WC>> result = tEnv.toRetractStream(table, WC.class);


         public static class WC {
             public Stringword;
             public long frequency;
             public Timestampdt;

             public WC() {

             public WC(String word,long frequency, Timestamp dt) {
                 this.word = word;
                 this.frequency = frequency;
                 this.dt = dt;

             @Override public String toString() {
                 return "WC " +word +" " +frequency +" " +dt.getTime();

    Sample input:

    hello,1,2017-12-14 13:10:01
    ciao,1,2017-12-14 13:10:02
    hello,1,2017-12-14 13:10:03
    hello,1,2017-12-14 13:10:04


