Re:会话窗口关闭时间的问题

2019-04-29 文章 邵志鹏
您好,下面是个人理解:


首先,这个问题不是Session窗口的问题,滚动和滑动是一样的。


时间窗口的计算输出是由时间特性确定的,目前
1. 只有processing-time即处理时间(没有水位线处理乱序)能够满足及时输出窗口的结果。
2. 
把eventtime的水位线时间戳设置为System.currentTimeMillis()也是可以及时输出窗口的结果,但是只会消费Flink程序启动后接收到的新消息,之前的消息是处理不到的,即便是新的消费者组group.id和earliest也无效【意思就是容错和重播失效,当然还可以再反复验证】。


目前EventTime-事件时间做到实时正确性的前提:数据的事件时间间隔小,或者小于窗口时间间隔就可以了,保证数据流不中断,这样就把不及时输出窗口的时间点无限推到无穷大的未来,即程序最终崩溃或者下线那一刻。


水位线是用来处理事件乱序的,水位线的增长依赖数据的输入,这个是很明显的咯,assignTimestampsAndWatermarks的时候根据事件时间推算的嘛,而且还会减掉一点时间,就是多掳一点数据,所以数据中断了,就是水位线停止增长了。


然后再来看,事件时间窗口默认使用的窗口触发源码:
onElement和onEventTime时才有机会TriggerResult.FIRE;
onElement时会判断水位线。


onEventTime时会根据水位线设置的时间戳定时器进行时间比较。
onEventTime往上找会找到InternalTimerServiceImpl#advanceWatermark 
再往上找会到AbstractStreamOperator#processWatermark,
也就是和新的数据进来有关。


结论就是,如果当前事件时间窗口的end时间还没到,然而水位线是小于这个end时间的,如果处理乱序的间隔比较大,甚至会有多个窗口的end时间都大于最近的水位线时间戳,那不就是把窗口往后退了嘛...只有更后面的数据到来,新的水位线增长上去,前面滞留的窗口数据才有机会输出。


所以我的想法是,在每一个时间窗口上面加上一个判断,只要当前窗口未关闭未触发,窗口的end时间大于或等于自然时间点就触发【保证只触发一次就好】,不需要等到下一次水位线增长。


另外,目前的事件时间是符合自然的实时流数据语义的,可是,业务数据有时候间隔还是蛮大的,毕竟有一些阶段数据比较密集,有一些阶段数据比较稀疏。


以上为个人理解,也遇到同样的问题,甚至认为事件时间在Flink这里毫无意义,如有哪里不对的地方,做梦都想肯定是哪里不对,欢迎讨论,如果真的不对,希望能给出正确的demo,这样就可以完美的用于生产了。


还有就是我默认为,窗口是根据事件已经确定好了的:
时间窗口的生成:


模板方法-处理水位线:AbstractStreamOperator#processWatermark


InternalTimerServiceImpl#advanceWatermark


默认的事件时间触发器:





在 2019-04-29 18:06:30,by1507...@buaa.edu.cn 写道:


各位大神,你们好:

   
最近有一个问题一直困扰着我:我设置的会话窗口,会在非活动状态10s后结束窗口,发现它会在下次窗口生成时才发送本窗口处理完的数据,而我想在本次窗口结束时发送这个数据,应该如何处理?万分感激

 

// 这里配置了kafka的信息,并进行数据流的输入

 

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

 

  FlinkKafkaConsumer010 kafkaSource = new 
FlinkKafkaConsumer010<>("rfid-input-topic",

new RfidRawDataSchema(), props);

  kafkaSource.assignTimestampsAndWatermarks(new CarTimestamp());

 

  DataStream dataStream = env.addSource(kafkaSource);

 

  // 会话窗口:如果用户处于非活动状态长达10s,则认为会话结束。Reduce中写的是窗口融合的方法

  DataStream outputStream = dataStream.keyBy("uniqueId")

  .window(EventTimeSessionWindows.withGap(Time.seconds(10))).reduce(new

  RfidReduceFunction());

  

  //通过kafka数据流的输出

outputStream.addSink(new FlinkKafkaProducer010<>("rfid-output-topic", new 
RfidRawDataSchema(), props));

 

  try {

 env.execute("Flink add data source");

  } catch (Exception e) {

 // TODO Auto-generated catch block

 e.printStackTrace();

  }

会话窗口关闭时间的问题

2019-04-29 文章 by1507118
各位大神,你们好:

   最近有一个问题一直困扰着我:我设置的会话窗口,会在非活动状态10s后结束
窗口,发现它会在下次窗口生成时才发送本窗口处理完的数据,而我想在本次窗口结束
时发送这个数据,应该如何处理?万分感激

 

// 这里配置了kafka的信息,并进行数据流的输入

 

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

 

  FlinkKafkaConsumer010 kafkaSource = new
FlinkKafkaConsumer010<>("rfid-input-topic",

new RfidRawDataSchema(), props);

  kafkaSource.assignTimestampsAndWatermarks(new CarTimestamp());

 

  DataStream dataStream = env.addSource(kafkaSource);

 

  // 会话窗口:如果用户处于非活动状态长达10s,则认为会话结束。Reduce中写
的是窗口融合的方法

  DataStream outputStream = dataStream.keyBy("uniqueId")

  .window(EventTimeSessionWindows.withGap(Time.seconds(10))).reduce(new

  RfidReduceFunction());

   

  //通过kafka数据流的输出

outputStream.addSink(new FlinkKafkaProducer010<>("rfid-output-topic", new
RfidRawDataSchema(), props));

 

  try {

 env.execute("Flink add data source");

  } catch (Exception e) {

 // TODO Auto-generated catch block

 e.printStackTrace();

  }