我们也经常有固定窗口定期触发的需求,但是添加以下参数并没有得到预期的效果(10秒的窗口,期待每秒都输出结果),是我的使用方法不对还是其他问题呢?多谢各位,下面是伪代码:
public class EarlyEmitter { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlink StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); tEnv.getConfig().getConfiguration().setBoolean("table.exec.emit.early-fire.enabled", true); tEnv.getConfig().getConfiguration().setString("table.exec.emit.early-fire.delay", "1000 ms"); Table table = tEnv.fromDataStream( env.addSource(new SourceData()), "generate_time, name, city, id, event_time.proctime"); tEnv.createTemporaryView("person", table); String emit = "SELECT name, COUNT(DISTINCT id)" + "FROM person " + "GROUP BY TUMBLE(event_time, interval '10' second), name"; Table result = tEnv.sqlQuery(emit); tEnv.toRetractStream(result, Row.class).print(); env.execute("IncrementalGrouping"); } private static final class SourceData implements SourceFunction<Tuple4<Long, String, String, Long>> { @Override public void run(SourceContext<Tuple4<Long, String, String, Long>> ctx) throws Exception { while (true) { long time = System.currentTimeMillis(); ctx.collect(Tuple4.of(time, "flink", "bj", 1L)); } } @Override public void cancel() { } } } > 2020年3月27日 下午3:23,Benchao Li <libenc...@gmail.com> 写道: > > Hi, > > 对于第二个场景,可以尝试一下fast emit: > table.exec.emit.early-fire.enabled = true > table.exec.emit.early-fire.delay = 5min > > PS: > 1. 这个feature并没有在官方文档里面写出来,目前应该是一个实验性质的feature > 2. window加了emit之后,会由原来输出append结果变成输出retract结果 > > Jingsong Li <jingsongl...@gmail.com> 于2020年3月27日周五 下午2:51写道: > >> Hi, >> >> For #1: >> 创建级联的两级window: >> - 1分钟窗口 >> - 5分钟窗口,计算只是保存数据,发送明细数据结果 >> >> Best, >> Jingsong Lee >> > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: libenc...@gmail.com; libenc...@pku.edu.cn