Hi Fabian,

Thank you for your response! I think it's not necessary to do that because i have a call to anyway:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

which do exactly what you say. It set the watermark interval to 200ms .
I think i found the problem and it is the default event-time trigger attached 
to the assigner?.
According to the docs here https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html : "*all the event-time window assigners have an EventTimeTrigger as default trigger. This trigger simply fires once the watermark passes the end of a window.*" . All i have to do in order to trigger the computation is to send an event which will fall in "next" window.
So the question now is how can i set trigger to fire in regular intervals (e.g. 
every 5 seconds) using table API?


On 14.12.2017 17:57, Fabian Hueske wrote:
Hi,

you are using a BoundedOutOfOrdernessTimestampExtractor to generate watermarks. The BoundedOutOfOrdernessTimestampExtractor is a periodic watermark assigner and only generates watermarks if a watermark interval is configured. Without watermarks, the query cannot "make progress" and only computes its result when the program is closed (sources emit a MAX_LONG watermark when being canceled).

Long story short: you need to configure the watermark interval: env.getConfig.setAutoWatermarkInterval(100L);

Best, Fabian

2017-12-14 16:30 GMT+01:00 Plamen Paskov <plamen.pas...@next-stream.com <mailto:plamen.pas...@next-stream.com>>:

    Hi,

    I'm trying to run the following streaming program in my local
    flink 1.3.2 environment. The program compile and run without any
    errors but the print() call doesn't display anything. Once i stop
    the program i receive all aggregated data. Any ideas how to make
    it output regularly or when new data come/old data updated?

    package flink;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.Slide;
    import org.apache.flink.table.api.java.StreamTableEnvironment;

    import java.sql.Timestamp;


    public class StreamingJob {
         public static void main(String[] args)throws Exception {
             StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
             env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
             StreamTableEnvironment tEnv = 
StreamTableEnvironment.getTableEnvironment(env);


             SingleOutputStreamOperator<WC> input = env
                     .socketTextStream("localhost",9000,"\n")
                     .map(new MapFunction<String, WC>() {
                         @Override public WC map(String value)throws Exception {
                             String[] row = value.split(",");
                             Timestamp timestamp = Timestamp.valueOf(row[2]);
                             return new WC(row[0], Long.valueOf(row[1]), 
timestamp);
                         }
                     })
                     .assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor<WC>(Time.seconds(10)) {
                         @Override public long extractTimestamp(WC element) {
                             return element.dt.getTime();
                         }
                     });


             tEnv.registerDataStream("WordCount", input,"word, frequency, 
dt.rowtime");

             Table table = tEnv.scan("WordCount")
                     
.window(Slide.over("10.seconds").every("5.seconds").on("dt").as("w"))
                     .groupBy("w, word")
                     .select("word, frequency.sum as frequency, w.start as 
dt");DataStream<Tuple2<Boolean, WC>> result = tEnv.toRetractStream(table, WC.class);
             result.print();

             env.execute();
         }

         public static class WC {
             public Stringword;
             public long frequency;
             public Timestampdt;

             public WC() {
             }

             public WC(String word,long frequency, Timestamp dt) {
                 this.word = word;
                 this.frequency = frequency;
                 this.dt = dt;
             }

             @Override public String toString() {
                 return "WC " +word +" " +frequency +" " +dt.getTime();
             }
         }
    }


    Sample input:

    hello,1,2017-12-14 13:10:01
    ciao,1,2017-12-14 13:10:02
    hello,1,2017-12-14 13:10:03
    hello,1,2017-12-14 13:10:04


    Thanks



Reply via email to