Hi Fabian,
Thank you for your response! I don't think that is necessary,
because I already call:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
which does exactly what you describe: it sets the watermark interval to 200 ms.
I think I found the problem: it is the default event-time trigger attached
to the window assigner.
According to the docs (https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html):
"all the event-time window assigners have an EventTimeTrigger as default
trigger. This trigger simply fires once the watermark passes the end of
a window."
So all I have to do to trigger the computation is send an event that falls into the "next" window.
So the question now is: how can I set a trigger that fires at regular intervals
(e.g. every 5 seconds) using the Table API?
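To make the firing rule quoted above concrete, here is a minimal plain-Java sketch (no Flink dependency). It assumes the watermark is the largest timestamp seen so far minus the out-of-orderness bound (10 s in the program quoted further down), and that a window [start, end) fires once the watermark reaches end - 1 ms. The class and method names are made up for illustration, and timestamps are milliseconds relative to 13:10:00.

```java
// Illustrative sketch only; names are hypothetical and nothing here is Flink API.
final class WatermarkFiringSketch {

    // Assumed model: watermark = highest timestamp seen - allowed out-of-orderness.
    static long watermark(long maxTimestampSeenMs, long maxOutOfOrdernessMs) {
        return maxTimestampSeenMs - maxOutOfOrdernessMs;
    }

    // Assumed model: a window [start, end) fires once the watermark reaches end - 1.
    static boolean fires(long watermarkMs, long windowEndMs) {
        return watermarkMs >= windowEndMs - 1;
    }

    // Smallest event timestamp that pushes the watermark far enough to fire the window.
    static long minTimestampToFire(long windowEndMs, long maxOutOfOrdernessMs) {
        return windowEndMs - 1 + maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        long outOfOrdernessMs = 10_000L;
        long windowEndMs = 5_000L; // end of the window [13:09:55, 13:10:05)
        long[] eventsMs = {1_000L, 2_000L, 3_000L, 4_000L, 6_000L, 14_999L};

        long maxSeenMs = Long.MIN_VALUE;
        for (long ts : eventsMs) {
            maxSeenMs = Math.max(maxSeenMs, ts);
            long wm = watermark(maxSeenMs, outOfOrdernessMs);
            System.out.println("event=" + ts + " watermark=" + wm
                    + " fires=" + fires(wm, windowEndMs));
        }
        System.out.println("min timestamp to fire: "
                + minTimestampToFire(windowEndMs, outOfOrdernessMs));
    }
}
```

Under these assumptions the four sample events leave the watermark at 13:09:54, so nothing fires. How far ahead the "next" event has to be depends on the out-of-orderness bound: with 10 s, the first window closes only after an event with a timestamp of at least 13:10:14.999 arrives.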
On 14.12.2017 17:57, Fabian Hueske wrote:
Hi,
you are using a BoundedOutOfOrdernessTimestampExtractor to generate
watermarks.
The BoundedOutOfOrdernessTimestampExtractor is a periodic watermark
assigner and only generates watermarks if a watermark interval is
configured.
Without watermarks, the query cannot "make progress" and only computes
its result when the program is closed (sources emit a MAX_LONG
watermark when being canceled).
Long story short: you need to configure the watermark interval:
env.getConfig().setAutoWatermarkInterval(100L);
Best, Fabian
2017-12-14 16:30 GMT+01:00 Plamen Paskov <plamen.pas...@next-stream.com>:
Hi,
I'm trying to run the following streaming program in my local
Flink 1.3.2 environment. The program compiles and runs without any
errors, but the print() call doesn't display anything. Once I stop
the program, I receive all the aggregated data. Any ideas how to make
it output regularly, or whenever new data arrives or old data is updated?
package flink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.Slide;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import java.sql.Timestamp;
public class StreamingJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);

        // Parse "word,frequency,timestamp" lines read from the socket.
        SingleOutputStreamOperator<WC> input = env
            .socketTextStream("localhost", 9000, "\n")
            .map(new MapFunction<String, WC>() {
                @Override
                public WC map(String value) throws Exception {
                    String[] row = value.split(",");
                    Timestamp timestamp = Timestamp.valueOf(row[2]);
                    return new WC(row[0], Long.valueOf(row[1]), timestamp);
                }
            })
            // Allow events to arrive up to 10 seconds out of order.
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<WC>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(WC element) {
                        return element.dt.getTime();
                    }
                });

        tEnv.registerDataStream("WordCount", input, "word, frequency, dt.rowtime");

        // 10-second windows sliding every 5 seconds, grouped by word.
        Table table = tEnv.scan("WordCount")
            .window(Slide.over("10.seconds").every("5.seconds").on("dt").as("w"))
            .groupBy("w, word")
            .select("word, frequency.sum as frequency, w.start as dt");

        DataStream<Tuple2<Boolean, WC>> result = tEnv.toRetractStream(table, WC.class);
        result.print();
        env.execute();
    }
    public static class WC {
        public String word;
        public long frequency;
        public Timestamp dt;

        public WC() {
        }

        public WC(String word, long frequency, Timestamp dt) {
            this.word = word;
            this.frequency = frequency;
            this.dt = dt;
        }

        @Override
        public String toString() {
            return "WC " + word + " " + frequency + " " + dt.getTime();
        }
    }
}
Sample input:
hello,1,2017-12-14 13:10:01
ciao,1,2017-12-14 13:10:02
hello,1,2017-12-14 13:10:03
hello,1,2017-12-14 13:10:04
Thanks
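For reference, here is a rough plain-Java sketch of which sliding windows (10 s size, 5 s slide) each sample event would land in. It assumes window starts are aligned to multiples of the slide, which is how I understand the sliding assigner to behave; the class and method names are hypothetical, and timestamps are milliseconds relative to 13:10:00.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; names are hypothetical and nothing here is Flink API.
final class SlidingWindowSketch {

    // Start times of all windows [start, start + size) that contain the timestamp,
    // assuming window starts are aligned to multiples of the slide.
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest aligned start that is not after the timestamp.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long sizeMs = 10_000L;
        long slideMs = 5_000L;
        long[] sampleEventsMs = {1_000L, 2_000L, 3_000L, 4_000L};
        for (long ts : sampleEventsMs) {
            System.out.println("event=" + ts + " window starts="
                    + windowStarts(ts, sizeMs, slideMs));
        }
    }
}
```

With this alignment, all four sample events fall into the same two windows: the one starting at 13:09:55 and the one starting at 13:10:00.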