我们也经常有固定窗口定期触发的需求,但是添加以下参数并没有得到预期的效果(10秒的窗口,期待每秒都输出结果),是我的使用方法不对还是其他问题呢?多谢各位,下面是伪代码:

public class EarlyEmitter {
   public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
      env.setParallelism(1);

      EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlink
      StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
settings);

      
tEnv.getConfig().getConfiguration().setBoolean("table.exec.emit.early-fire.enabled",
 true);
      
tEnv.getConfig().getConfiguration().setString("table.exec.emit.early-fire.delay",
 "1000 ms");

      Table table = tEnv.fromDataStream(
            env.addSource(new SourceData()), "generate_time, name, city, id, 
event_time.proctime");
      tEnv.createTemporaryView("person", table);

      String emit =
            "SELECT name, COUNT(DISTINCT id)" +
                  "FROM person " +
                  "GROUP BY TUMBLE(event_time, interval '10' second), name";

      Table result = tEnv.sqlQuery(emit);
      tEnv.toRetractStream(result, Row.class).print();

      env.execute("IncrementalGrouping");
   }

   private static final class SourceData implements SourceFunction<Tuple4<Long, 
String, String, Long>> {
      @Override
      public void run(SourceContext<Tuple4<Long, String, String, Long>> ctx) 
throws Exception {
         while (true) {
            long time = System.currentTimeMillis();
            ctx.collect(Tuple4.of(time, "flink", "bj", 1L));
         }
      }

      @Override
      public void cancel() {

      }
   }
}




> 2020年3月27日 下午3:23,Benchao Li <libenc...@gmail.com> 写道:
> 
> Hi,
> 
> 对于第二个场景,可以尝试一下fast emit:
> table.exec.emit.early-fire.enabled = true
> table.exec.emit.early-fire.delay = 5min
> 
> PS:
> 1. 这个feature并没有在官方文档里面写出来,目前应该是一个实验性质的feature
> 2. window加了emit之后,会由原来输出append结果变成输出retract结果
> 
> Jingsong Li <jingsongl...@gmail.com> 于2020年3月27日周五 下午2:51写道:
> 
>> Hi,
>> 
>> For #1:
>> 创建级联的两级window:
>> - 1分钟窗口
>> - 5分钟窗口,计算只是保存数据,发送明细数据结果
>> 
>> Best,
>> Jingsong Lee
>> 
> 
> 
> -- 
> 
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn

回复