你可以了解下触发器,默认的触发器是按照你发现的做,如果你要实时输出,可以吧触发器更改为ContinuonsEventTimeTrigger ,然后设置你的时间间隔。
发件人: 刘 文 发送时间: 2019年3月6日 22:55 收件人: user-zh@flink.apache.org 抄送: qcx978132...@gmail.com 主题: Re: Flink 在什么情况下产生乱序问题? ).在验证EventTime 加watermark 处理中,我发现往socket发送的数据,不能及时输出或没有输出 ).验证发现,只有当前发送的数据的 getCurrentWatermark()的时间戳 > TimeWindow + maxOutOfOrderness 时,才会触发结束上一次window ).可是最新的记录是不能及时被处理,或者是不能被处理 ).请问这个问题怎么处理? ------------------------------------------------------------------------------- > 在 2019年3月6日,下午10:29,刘 文 <thinktothi...@yahoo.com.INVALID> 写道: > > 该问题,明白一点了,整理成文档供大家参考 > ——————————————————————————————————————————————————————— > > Flink 1.7.2 业务时间戳分析流式数据源码分析: > https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md> > > > ——————————————————————————————————————————————————————— > > <Flink 1.7.2 业务时间戳分析流式数据源码分析.001.jpeg> > > Flink 1.7.2 业务时间戳分析流式数据源码分析 > > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#%E6%BA%90%E7%A0%81>源码 > > https://github.com/opensourceteams/flink-maven-scala > <https://github.com/opensourceteams/flink-maven-scala> > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#%E6%A6%82%E8%BF%B0>概述 > > 由于Flink默认的ProcessTime是按Window收到Source发射过来的数据的时间,来算了,也就是按Flink程序接收的时间来进行计算,但实际业务,处理周期性的数据时,每5分钟内的数据,每1个小时内的数据进行分析,实际是业务源发生的时间来做为实际时间,所以用Flink的EventTime和Watermark来处理这个问题 > 指定Env为EventTime > 调置数据流assignTimestampsAndWatermarks函数,由AssignerWithPeriodicWatermarks中的extractTimestamp()函数提取实际业务时间,getCurrentWatermark得到最新的时间,这个会对每个元素算一次,拿最大的当做计算时间,如果当前时间,大于上一次的时间间隔 > + 这里设置的延时时间,就会结束上一个Window,也就是对这一段时间的Window进行操作 > 本程序以指定业务时间,来做为统计时间 > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#%E7%A8%8B%E5%BA%8F>程序 > > package > com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.eventtime > > import java.util.{Date, Properties} > > import com.alibaba.fastjson.JSON > import com.opensourceteams.module.bigdata.flink.common.ConfigurationUtil > import org.apache.flink.api.common.serialization.SimpleStringSchema > import org.apache.flink.configuration.Configuration > import org.apache.flink.streaming.api.TimeCharacteristic > import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks > import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment > import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction > import org.apache.flink.streaming.api.watermark.Watermark > import org.apache.flink.streaming.api.windowing.time.Time > import org.apache.flink.streaming.api.windowing.windows.TimeWindow > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer > import org.apache.flink.util.Collector > > > object SockWordCountRun { > > > > def main(args: Array[String]): Unit = { > > > // get the execution environment > // val env: StreamExecutionEnvironment = > StreamExecutionEnvironment.getExecutionEnvironment > > > val configuration : Configuration = > ConfigurationUtil.getConfiguration(true) > > val env:StreamExecutionEnvironment = > StreamExecutionEnvironment.createLocalEnvironment(1,configuration) > > > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > > > > import org.apache.flink.streaming.api.scala._ > val dataStream = env.socketTextStream("localhost", 1234, '\n') > > // .setParallelism(3) > > > dataStream.assignTimestampsAndWatermarks(new > AssignerWithPeriodicWatermarks[String] { > > val maxOutOfOrderness = 2 * 1000L // 3.5 seconds > var currentMaxTimestamp: Long = _ > var currentTimestamp: Long = _ > > override def getCurrentWatermark: Watermark = new > Watermark(currentMaxTimestamp - maxOutOfOrderness) > > override def extractTimestamp(element: String, > previousElementTimestamp: Long): Long = { > val jsonObject = JSON.parseObject(element) > > val timestamp = jsonObject.getLongValue("extract_data_time") > currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp) > currentTimestamp = timestamp > > /* println("===========watermark begin===========") > println() > println(new Date(currentMaxTimestamp - 20 * 1000)) > println(jsonObject) > println("===========watermark end===========") > println()*/ > timestamp > } > > }) > .timeWindowAll(Time.seconds(3)) > > .process(new ProcessAllWindowFunction[String,String,TimeWindow]() { > override def process(context: Context, elements: Iterable[String], out: > Collector[String]): Unit = { > > > println() > println("开始提交window") > println(new Date()) > for(e <- elements) out.collect(e) > println("结束提交window") > println(new Date()) > println() > } > }) > > .print() > //.setParallelism(3) > > > > > > > println("==================================以下为执行计划==================================") > println("执行地址(firefox效果更好):https://flink.apache.org/visualizer > <https://flink.apache.org/visualizer>") > //执行计划 > println(env.getStreamGraph.getStreamingPlanAsJSON) > println("==================================以上为执行计划 > JSON串==================================\n") > > > env.execute("Socket 水印作业") > > > > > > > println("结束") > > } > > > // Data type for words with count > case class WordWithCount(word: String, count: Long){ > //override def toString: String = Thread.currentThread().getName + word + > " : " + count > } > > > def getConfiguration(isDebug:Boolean = false):Configuration = { > > val configuration : Configuration = new Configuration() > > if(isDebug){ > val timeout = "100000 s" > val timeoutHeartbeatPause = "1000000 s" > configuration.setString("akka.ask.timeout",timeout) > configuration.setString("akka.lookup.timeout",timeout) > configuration.setString("akka.tcp.timeout",timeout) > configuration.setString("akka.transport.heartbeat.interval",timeout) > > configuration.setString("akka.transport.heartbeat.pause",timeoutHeartbeatPause) > configuration.setString("akka.watch.heartbeat.pause",timeout) > configuration.setInteger("heartbeat.interval",10000000) > configuration.setInteger("heartbeat.timeout",50000000) > } > > > configuration > } > > > } > > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#%E8%BE%93%E5%85%A5%E6%95%B0%E6%8D%AE>输入数据 > > 下一次输入数据,只要比上一次输入数据,大5秒,就可以输出上一window数据,时间以data_time > 字段为准,转成字符串格式为extract_data_time > nc -lk 123 来输入数据 > {"no":0,"extract_data_time":"2019-03-06 > 11:40:38","data_time":1551843638000,"message":"0 2019-03-06 > 11:40:38","send_data_string":"2019-03-06 21:03:43"} > {"no":1,"extract_data_time":"2019-03-06 > 11:40:39","data_time":1551843639000,"message":"1 2019-03-06 > 11:40:39","send_data_string":"2019-03-06 21:03:43"} > {"no":2,"extract_data_time":"2019-03-06 > 11:40:40","data_time":1551843640000,"message":"2 2019-03-06 > 11:40:40","send_data_string":"2019-03-06 21:03:43"} > {"no":3,"extract_data_time":"2019-03-06 > 11:40:41","data_time":1551843641000,"message":"3 2019-03-06 > 11:40:41","send_data_string":"2019-03-06 21:03:43"} > {"no":4,"extract_data_time":"2019-03-06 > 11:40:42","data_time":1551843642000,"message":"4 2019-03-06 > 11:40:42","send_data_string":"2019-03-06 21:03:43"} > {"no":5,"extract_data_time":"2019-03-06 > 11:40:43","data_time":1551843643000,"message":"5 2019-03-06 > 11:40:43","send_data_string":"2019-03-06 21:03:43"} > {"no":6,"extract_data_time":"2019-03-06 > 11:40:44","data_time":1551843644000,"message":"6 2019-03-06 > 11:40:44","send_data_string":"2019-03-06 21:03:43"} > {"no":7,"extract_data_time":"2019-03-06 > 11:40:45","data_time":1551843645000,"message":"7 2019-03-06 > 11:40:45","send_data_string":"2019-03-06 21:03:43"} > {"no":8,"extract_data_time":"2019-03-06 > 11:40:46","data_time":1551843646000,"message":"8 2019-03-06 > 11:40:46","send_data_string":"2019-03-06 21:03:43"} > {"no":9,"extract_data_time":"2019-03-06 > 11:40:47","data_time":1551843647000,"message":"9 2019-03-06 > 11:40:47","send_data_string":"2019-03-06 21:03:43"} > {"no":10,"extract_data_time":"2019-03-06 > 11:40:48","data_time":1551843648000,"message":"10 2019-03-06 > 11:40:48","send_data_string":"2019-03-06 21:03:43"} > {"no":11,"extract_data_time":"2019-03-06 > 11:40:49","data_time":1551843649000,"message":"11 2019-03-06 > 11:40:49","send_data_string":"2019-03-06 21:03:43"} > {"no":12,"extract_data_time":"2019-03-06 > 11:40:50","data_time":1551843650000,"message":"12 2019-03-06 > 11:40:50","send_data_string":"2019-03-06 21:03:43"} > {"no":13,"extract_data_time":"2019-03-06 > 11:40:51","data_time":1551843651000,"message":"13 2019-03-06 > 11:40:51","send_data_string":"2019-03-06 21:03:43"} > {"no":14,"extract_data_time":"2019-03-06 > 11:40:52","data_time":1551843652000,"message":"14 2019-03-06 > 11:40:52","send_data_string":"2019-03-06 21:03:43"} > {"no":15,"extract_data_time":"2019-03-06 > 11:40:53","data_time":1551843653000,"message":"15 2019-03-06 > 11:40:53","send_data_string":"2019-03-06 21:03:43"} > {"no":16,"extract_data_time":"2019-03-06 > 11:40:54","data_time":1551843654000,"message":"16 2019-03-06 > 11:40:54","send_data_string":"2019-03-06 21:03:43"} > {"no":17,"extract_data_time":"2019-03-06 > 11:40:55","data_time":1551843655000,"message":"17 2019-03-06 > 11:40:55","send_data_string":"2019-03-06 21:03:43"} > {"no":18,"extract_data_time":"2019-03-06 > 11:40:56","data_time":1551843656000,"message":"18 2019-03-06 > 11:40:56","send_data_string":"2019-03-06 21:03:43"} > {"no":19,"extract_data_time":"2019-03-06 > 11:40:57","data_time":1551843657000,"message":"19 2019-03-06 > 11:40:57","send_data_string":"2019-03-06 21:03:43"} > {"no":20,"extract_data_time":"2019-03-06 > 11:40:58","data_time":1551843658000,"message":"20 2019-03-06 > 11:40:58","send_data_string":"2019-03-06 21:03:43"} > {"no":21,"extract_data_time":"2019-03-06 > 11:40:59","data_time":1551843659000,"message":"21 2019-03-06 > 11:40:59","send_data_string":"2019-03-06 21:03:43"} > {"no":22,"extract_data_time":"2019-03-06 > 11:41:00","data_time":1551843660000,"message":"22 2019-03-06 > 11:41:00","send_data_string":"2019-03-06 21:03:43"} > {"no":23,"extract_data_time":"2019-03-06 > 11:41:01","data_time":1551843661000,"message":"23 2019-03-06 > 11:41:01","send_data_string":"2019-03-06 21:03:43"} > {"no":24,"extract_data_time":"2019-03-06 > 11:41:02","data_time":1551843662000,"message":"24 2019-03-06 > 11:41:02","send_data_string":"2019-03-06 21:03:43"} > {"no":25,"extract_data_time":"2019-03-06 > 11:41:03","data_time":1551843663000,"message":"25 2019-03-06 > 11:41:03","send_data_string":"2019-03-06 21:03:43"} > {"no":26,"extract_data_time":"2019-03-06 > 11:41:04","data_time":1551843664000,"message":"26 2019-03-06 > 11:41:04","send_data_string":"2019-03-06 21:03:43"} > {"no":27,"extract_data_time":"2019-03-06 > 11:41:05","data_time":1551843665000,"message":"27 2019-03-06 > 11:41:05","send_data_string":"2019-03-06 21:03:43"} > {"no":28,"extract_data_time":"2019-03-06 > 11:41:06","data_time":1551843666000,"message":"28 2019-03-06 > 11:41:06","send_data_string":"2019-03-06 21:03:43"} > {"no":29,"extract_data_time":"2019-03-06 > 11:41:07","data_time":1551843667000,"message":"29 2019-03-06 > 11:41:07","send_data_string":"2019-03-06 21:03:43"} > {"no":30,"extract_data_time":"2019-03-06 > 11:41:08","data_time":1551843668000,"message":"30 2019-03-06 > 11:41:08","send_data_string":"2019-03-06 21:03:43"} > {"no":31,"extract_data_time":"2019-03-06 > 11:41:09","data_time":1551843669000,"message":"31 2019-03-06 > 11:41:09","send_data_string":"2019-03-06 21:03:43"} > {"no":32,"extract_data_time":"2019-03-06 > 11:41:10","data_time":1551843670000,"message":"32 2019-03-06 > 11:41:10","send_data_string":"2019-03-06 21:03:43"} > {"no":33,"extract_data_time":"2019-03-06 > 11:41:11","data_time":1551843671000,"message":"33 2019-03-06 > 11:41:11","send_data_string":"2019-03-06 21:03:43"} > {"no":34,"extract_data_time":"2019-03-06 > 11:41:12","data_time":1551843672000,"message":"34 2019-03-06 > 11:41:12","send_data_string":"2019-03-06 21:03:43"} > {"no":35,"extract_data_time":"2019-03-06 > 11:41:13","data_time":1551843673000,"message":"35 2019-03-06 > 11:41:13","send_data_string":"2019-03-06 21:03:43"} > {"no":36,"extract_data_time":"2019-03-06 > 11:41:14","data_time":1551843674000,"message":"36 2019-03-06 > 11:41:14","send_data_string":"2019-03-06 21:03:43"} > {"no":37,"extract_data_time":"2019-03-06 > 11:41:15","data_time":1551843675000,"message":"37 2019-03-06 > 11:41:15","send_data_string":"2019-03-06 21:03:43"} > {"no":38,"extract_data_time":"2019-03-06 > 11:41:16","data_time":1551843676000,"message":"38 2019-03-06 > 11:41:16","send_data_string":"2019-03-06 21:03:43"} > {"no":39,"extract_data_time":"2019-03-06 > 11:41:17","data_time":1551843677000,"message":"39 2019-03-06 > 11:41:17","send_data_string":"2019-03-06 21:03:43"} > {"no":40,"extract_data_time":"2019-03-06 > 11:41:18","data_time":1551843678000,"message":"40 2019-03-06 > 11:41:18","send_data_string":"2019-03-06 21:03:43"} > {"no":41,"extract_data_time":"2019-03-06 > 11:41:19","data_time":1551843679000,"message":"41 2019-03-06 > 11:41:19","send_data_string":"2019-03-06 21:03:43"} > {"no":42,"extract_data_time":"2019-03-06 > 11:41:20","data_time":1551843680000,"message":"42 2019-03-06 > 11:41:20","send_data_string":"2019-03-06 21:03:43"} > {"no":43,"extract_data_time":"2019-03-06 > 11:41:21","data_time":1551843681000,"message":"43 2019-03-06 > 11:41:21","send_data_string":"2019-03-06 21:03:43"} > {"no":44,"extract_data_time":"2019-03-06 > 11:41:22","data_time":1551843682000,"message":"44 2019-03-06 > 11:41:22","send_data_string":"2019-03-06 21:03:43"} > {"no":45,"extract_data_time":"2019-03-06 > 11:41:23","data_time":1551843683000,"message":"45 2019-03-06 > 11:41:23","send_data_string":"2019-03-06 21:03:43"} > {"no":46,"extract_data_time":"2019-03-06 > 11:41:24","data_time":1551843684000,"message":"46 2019-03-06 > 11:41:24","send_data_string":"2019-03-06 21:03:43"} > {"no":47,"extract_data_time":"2019-03-06 > 11:41:25","data_time":1551843685000,"message":"47 2019-03-06 > 11:41:25","send_data_string":"2019-03-06 21:03:43"} > {"no":48,"extract_data_time":"2019-03-06 > 11:41:26","data_time":1551843686000,"message":"48 2019-03-06 > 11:41:26","send_data_string":"2019-03-06 21:03:43"} > {"no":49,"extract_data_time":"2019-03-06 > 11:41:27","data_time":1551843687000,"message":"49 2019-03-06 > 11:41:27","send_data_string":"2019-03-06 21:03:43"} > {"no":50,"extract_data_time":"2019-03-06 > 11:41:28","data_time":1551843688000,"message":"50 2019-03-06 > 11:41:28","send_data_string":"2019-03-06 21:03:43"} > {"no":51,"extract_data_time":"2019-03-06 > 11:41:29","data_time":1551843689000,"message":"51 2019-03-06 > 11:41:29","send_data_string":"2019-03-06 21:03:43"} > {"no":52,"extract_data_time":"2019-03-06 > 11:41:30","data_time":1551843690000,"message":"52 2019-03-06 > 11:41:30","send_data_string":"2019-03-06 21:03:43"} > {"no":53,"extract_data_time":"2019-03-06 > 11:41:31","data_time":1551843691000,"message":"53 2019-03-06 > 11:41:31","send_data_string":"2019-03-06 21:03:43"} > {"no":54,"extract_data_time":"2019-03-06 > 11:41:32","data_time":1551843692000,"message":"54 2019-03-06 > 11:41:32","send_data_string":"2019-03-06 21:03:43"} > {"no":55,"extract_data_time":"2019-03-06 > 11:41:33","data_time":1551843693000,"message":"55 2019-03-06 > 11:41:33","send_data_string":"2019-03-06 21:03:43"} > {"no":56,"extract_data_time":"2019-03-06 > 11:41:34","data_time":1551843694000,"message":"56 2019-03-06 > 11:41:34","send_data_string":"2019-03-06 21:03:43"} > {"no":57,"extract_data_time":"2019-03-06 > 11:41:35","data_time":1551843695000,"message":"57 2019-03-06 > 11:41:35","send_data_string":"2019-03-06 21:03:43"} > {"no":58,"extract_data_time":"2019-03-06 > 11:41:36","data_time":1551843696000,"message":"58 2019-03-06 > 11:41:36","send_data_string":"2019-03-06 21:03:43"} > {"no":59,"extract_data_time":"2019-03-06 > 11:41:37","data_time":1551843697000,"message":"59 2019-03-06 > 11:41:37","send_data_string":"2019-03-06 21:03:43"} > {"no":60,"extract_data_time":"2019-03-06 > 11:41:38","data_time":1551843698000,"message":"60 2019-03-06 > 11:41:38","send_data_string":"2019-03-06 21:03:43"} > {"no":61,"extract_data_time":"2019-03-06 > 11:41:39","data_time":1551843699000,"message":"61 2019-03-06 > 11:41:39","send_data_string":"2019-03-06 21:03:43"} > {"no":62,"extract_data_time":"2019-03-06 > 11:41:40","data_time":1551843700000,"message":"62 2019-03-06 > 11:41:40","send_data_string":"2019-03-06 21:03:43"} > {"no":63,"extract_data_time":"2019-03-06 > 11:41:41","data_time":1551843701000,"message":"63 2019-03-06 > 11:41:41","send_data_string":"2019-03-06 21:03:43"} > {"no":64,"extract_data_time":"2019-03-06 > 11:41:42","data_time":1551843702000,"message":"64 2019-03-06 > 11:41:42","send_data_string":"2019-03-06 21:03:43"} > {"no":65,"extract_data_time":"2019-03-06 > 11:41:43","data_time":1551843703000,"message":"65 2019-03-06 > 11:41:43","send_data_string":"2019-03-06 21:03:43"} > {"no":66,"extract_data_time":"2019-03-06 > 11:41:44","data_time":1551843704000,"message":"66 2019-03-06 > 11:41:44","send_data_string":"2019-03-06 21:03:43"} > {"no":67,"extract_data_time":"2019-03-06 > 11:41:45","data_time":1551843705000,"message":"67 2019-03-06 > 11:41:45","send_data_string":"2019-03-06 21:03:43"} > {"no":68,"extract_data_time":"2019-03-06 > 11:41:46","data_time":1551843706000,"message":"68 2019-03-06 > 11:41:46","send_data_string":"2019-03-06 21:03:43"} > {"no":69,"extract_data_time":"2019-03-06 > 11:41:47","data_time":1551843707000,"message":"69 2019-03-06 > 11:41:47","send_data_string":"2019-03-06 21:03:43"} > {"no":70,"extract_data_time":"2019-03-06 > 11:41:48","data_time":1551843708000,"message":"70 2019-03-06 > 11:41:48","send_data_string":"2019-03-06 21:03:43"} > {"no":71,"extract_data_time":"2019-03-06 > 11:41:49","data_time":1551843709000,"message":"71 2019-03-06 > 11:41:49","send_data_string":"2019-03-06 21:03:43"} > {"no":72,"extract_data_time":"2019-03-06 > 11:41:50","data_time":1551843710000,"message":"72 2019-03-06 > 11:41:50","send_data_string":"2019-03-06 21:03:43"} > {"no":73,"extract_data_time":"2019-03-06 > 11:41:51","data_time":1551843711000,"message":"73 2019-03-06 > 11:41:51","send_data_string":"2019-03-06 21:03:43"} > {"no":74,"extract_data_time":"2019-03-06 > 11:41:52","data_time":1551843712000,"message":"74 2019-03-06 > 11:41:52","send_data_string":"2019-03-06 21:03:43"} > {"no":75,"extract_data_time":"2019-03-06 > 11:41:53","data_time":1551843713000,"message":"75 2019-03-06 > 11:41:53","send_data_string":"2019-03-06 21:03:43"} > {"no":76,"extract_data_time":"2019-03-06 > 11:41:54","data_time":1551843714000,"message":"76 2019-03-06 > 11:41:54","send_data_string":"2019-03-06 21:03:43"} > {"no":77,"extract_data_time":"2019-03-06 > 11:41:55","data_time":1551843715000,"message":"77 2019-03-06 > 11:41:55","send_data_string":"2019-03-06 21:03:43"} > {"no":78,"extract_data_time":"2019-03-06 > 11:41:56","data_time":1551843716000,"message":"78 2019-03-06 > 11:41:56","send_data_string":"2019-03-06 21:03:43"} > {"no":79,"extract_data_time":"2019-03-06 > 11:41:57","data_time":1551843717000,"message":"79 2019-03-06 > 11:41:57","send_data_string":"2019-03-06 21:03:43"} > {"no":80,"extract_data_time":"2019-03-06 > 11:41:58","data_time":1551843718000,"message":"80 2019-03-06 > 11:41:58","send_data_string":"2019-03-06 21:03:43"} > {"no":81,"extract_data_time":"2019-03-06 > 11:41:59","data_time":1551843719000,"message":"81 2019-03-06 > 11:41:59","send_data_string":"2019-03-06 21:03:43"} > {"no":82,"extract_data_time":"2019-03-06 > 11:42:00","data_time":1551843720000,"message":"82 2019-03-06 > 11:42:00","send_data_string":"2019-03-06 21:03:43"} > {"no":83,"extract_data_time":"2019-03-06 > 11:42:01","data_time":1551843721000,"message":"83 2019-03-06 > 11:42:01","send_data_string":"2019-03-06 21:03:43"} > {"no":84,"extract_data_time":"2019-03-06 > 11:42:02","data_time":1551843722000,"message":"84 2019-03-06 > 11:42:02","send_data_string":"2019-03-06 21:03:43"} > {"no":85,"extract_data_time":"2019-03-06 > 11:42:03","data_time":1551843723000,"message":"85 2019-03-06 > 11:42:03","send_data_string":"2019-03-06 21:03:43"} > {"no":86,"extract_data_time":"2019-03-06 > 11:42:04","data_time":1551843724000,"message":"86 2019-03-06 > 11:42:04","send_data_string":"2019-03-06 21:03:43"} > {"no":87,"extract_data_time":"2019-03-06 > 11:42:05","data_time":1551843725000,"message":"87 2019-03-06 > 11:42:05","send_data_string":"2019-03-06 21:03:43"} > {"no":88,"extract_data_time":"2019-03-06 > 11:42:06","data_time":1551843726000,"message":"88 2019-03-06 > 11:42:06","send_data_string":"2019-03-06 21:03:43"} > {"no":89,"extract_data_time":"2019-03-06 > 11:42:07","data_time":1551843727000,"message":"89 2019-03-06 > 11:42:07","send_data_string":"2019-03-06 21:03:43"} > {"no":90,"extract_data_time":"2019-03-06 > 11:42:08","data_time":1551843728000,"message":"90 2019-03-06 > 11:42:08","send_data_string":"2019-03-06 21:03:43"} > {"no":91,"extract_data_time":"2019-03-06 > 11:42:09","data_time":1551843729000,"message":"91 2019-03-06 > 11:42:09","send_data_string":"2019-03-06 21:03:43"} > {"no":92,"extract_data_time":"2019-03-06 > 11:42:10","data_time":1551843730000,"message":"92 2019-03-06 > 11:42:10","send_data_string":"2019-03-06 21:03:43"} > {"no":93,"extract_data_time":"2019-03-06 > 11:42:11","data_time":1551843731000,"message":"93 2019-03-06 > 11:42:11","send_data_string":"2019-03-06 21:03:43"} > {"no":94,"extract_data_time":"2019-03-06 > 11:42:12","data_time":1551843732000,"message":"94 2019-03-06 > 11:42:12","send_data_string":"2019-03-06 21:03:43"} > {"no":95,"extract_data_time":"2019-03-06 > 11:42:13","data_time":1551843733000,"message":"95 2019-03-06 > 11:42:13","send_data_string":"2019-03-06 21:03:43"} > {"no":96,"extract_data_time":"2019-03-06 > 11:42:14","data_time":1551843734000,"message":"96 2019-03-06 > 11:42:14","send_data_string":"2019-03-06 21:03:43"} > {"no":97,"extract_data_time":"2019-03-06 > 11:42:15","data_time":1551843735000,"message":"97 2019-03-06 > 11:42:15","send_data_string":"2019-03-06 21:03:43"} > {"no":98,"extract_data_time":"2019-03-06 > 11:42:16","data_time":1551843736000,"message":"98 2019-03-06 > 11:42:16","send_data_string":"2019-03-06 21:03:43"} > {"no":99,"extract_data_time":"2019-03-06 > 11:42:17","data_time":1551843737000,"message":"99 2019-03-06 > 11:42:17","send_data_string":"2019-03-06 21:03:43"} > {"no":100,"extract_data_time":"2019-03-06 > 11:42:18","data_time":1551843738000,"message":"100 2019-03-06 > 11:42:18","send_data_string":"2019-03-06 21:03:43"} > > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#executiongraphscheduleeager>ExecutionGraph.scheduleEager > > 第一部分: 主是要source,分为两小步,第一步,通过socket读取数据,第二步,通过时间戳/水印处理source > (Source: Socket Stream -> Timestamps/Watermarks (1/1)) > 第二部分,分为两部分,第一步部TriggerWindow,调用ProcessAllWindowFunction函数,处理当前window元素,再到第二部Sink > (TriggerWindow(TumblingEventTimeWindows(100000000000), > ListStateDescriptor{name=window-contents, defaultValue=null, > serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@1e4a7dd4}, > EventTimeTrigger(), AllWindowedStream.process(AllWindowedStream.scala:593)) > -> Sink: Print to Std. Out (1/1)) > 详情 > 0 = {Execution@5366} "Attempt #0 (Source: Socket Stream -> > Timestamps/Watermarks (1/1)) @ > org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@5987f5d2 - > [SCHEDULED]" > 1 = {Execution@5371} "Attempt #0 > (TriggerWindow(TumblingEventTimeWindows(100000000000), > ListStateDescriptor{name=window-contents, defaultValue=null, > serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@1e4a7dd4}, > EventTimeTrigger(), AllWindowedStream.process(AllWindowedStream.scala:593)) > -> Sink: Print to Std. Out (1/1)) @ > org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@4cb70b36 - > [SCHEDULED]" > > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#source-task>Source > (Task) > > operatorChain.allOperators > Source > 任务,分两个operator,第一个是StreamSource,读取Socket中的数据,第二个是TimestampsAndPeriodicWatermarksOperator,处理Source中的时间戳问题 > 0 = {TimestampsAndPeriodicWatermarksOperator@7820} > 1 = {StreamSource@7785} > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#sourceoperator-streamsource>Source(operator > StreamSource) > > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#sourcestreamtaskrun>SourceStreamTask.run > > 调用StreamSource.run()函数 > protected void run() throws Exception { > headOperator.run(getCheckpointLock(), > getStreamStatusMaintainer()); > } > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#%E8%B0%83%E7%94%A8streamsourcerun>调用StreamSource.run() > > public void run(final Object lockingObject, final > StreamStatusMaintainer streamStatusMaintainer) throws Exception { > run(lockingObject, streamStatusMaintainer, output); > } > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#%E8%B0%83%E7%94%A8streamsourcerun-1>调用StreamSource.run() > > 调用userFunction.run()函数,调用为SocketTextStreamFunction.run()函数 > public void run(final Object lockingObject, > final StreamStatusMaintainer streamStatusMaintainer, > final Output<StreamRecord<OUT>> collector) throws > Exception { > > final TimeCharacteristic timeCharacteristic = > getOperatorConfig().getTimeCharacteristic(); > > final Configuration configuration = > this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration(); > final long latencyTrackingInterval = > getExecutionConfig().isLatencyTrackingConfigured() > ? getExecutionConfig().getLatencyTrackingInterval() > : configuration.getLong(MetricOptions.LATENCY_INTERVAL); > > LatencyMarksEmitter<OUT> latencyEmitter = null; > if (latencyTrackingInterval > 0) { > latencyEmitter = new LatencyMarksEmitter<>( > getProcessingTimeService(), > collector, > latencyTrackingInterval, > this.getOperatorID(), > getRuntimeContext().getIndexOfThisSubtask()); > } > > final long watermarkInterval = > getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(); > > this.ctx = StreamSourceContexts.getSourceContext( > timeCharacteristic, > getProcessingTimeService(), > lockingObject, > streamStatusMaintainer, > collector, > watermarkInterval, > -1); > > try { > userFunction.run(ctx); > > // if we get here, then the user function either exited > after being done (finite source) > // or the function was canceled or stopped. For the > finite source case, we should emit > // a final watermark that indicates that we reached the > end of event-time > if (!isCanceledOrStopped()) { > ctx.emitWatermark(Watermark.MAX_WATERMARK); > } > } finally { > // make sure that the context is closed in any case > ctx.close(); > if (latencyEmitter != null) { > latencyEmitter.close(); > } > } > } > > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#sockettextstreamfunctionrun>SocketTextStreamFunction.run > > 一次读取8kb数据,读取Socket中的数据放到缓存中,再按行处理缓存中的数据 > ctx.collect(record),把从sorcket得以的一行数据作为参数,调用StreamSourceContexts.WatermarkContext.collect > public void run(SourceContext<String> ctx) throws Exception { > final StringBuilder buffer = new StringBuilder(); > long attempt = 0; > > while (isRunning) { > > try (Socket socket = new Socket()) { > currentSocket = socket; > > LOG.info <http://log.info/>("Connecting to > server socket " + hostname + ':' + port); > socket.connect(new InetSocketAddress(hostname, > port), CONNECTION_TIMEOUT_TIME); > try (BufferedReader reader = new > BufferedReader(new InputStreamReader(socket.getInputStream()))) { > > char[] cbuf = new char[8192]; > int bytesRead; > while (isRunning && (bytesRead = > reader.read(cbuf)) != -1) { > buffer.append(cbuf, 0, > bytesRead); > int delimPos; > while (buffer.length() >= > delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) { > String record = > buffer.substring(0, delimPos); > // truncate trailing > carriage return > if > (delimiter.equals("\n") && record.endsWith("\r")) { > record = > record.substring(0, record.length() - 1); > } > ctx.collect(record); > buffer.delete(0, > delimPos + delimiter.length()); > } > } > } > } > > // if we dropped out of this loop due to an EOF, sleep > and retry > if (isRunning) { > attempt++; > if (maxNumRetries == -1 || attempt < > maxNumRetries) { > LOG.warn("Lost connection to server > socket. Retrying in " + delayBetweenRetries + " msecs..."); > Thread.sleep(delayBetweenRetries); > } > else { > // this should probably be here, but > some examples expect simple exists of the stream source > // throw new EOFException("Reached end > of stream and reconnects are not enabled."); > break; > } > } > } > > // collect trailing data > if (buffer.length() > 0) { > ctx.collect(buffer.toString()); > } > } > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#streamsourcecontextswatermarkcontext>StreamSourceContexts.WatermarkContext > > 调用StreamSourceContexts.ManualWatermarkContext.processAndCollect()函数 > public void collect(T element) { > synchronized (checkpointLock) { > > streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE); > > if (nextCheck != null) { > this.failOnNextCheck = false; > } else { > scheduleNextIdleDetectionTask(); > } > > processAndCollect(element); > } > } > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#streamsourcecontextsmanualwatermarkcontextprocessandcollect>StreamSourceContexts.ManualWatermarkContext.processAndCollect() > > 调用AbstractStreamOperator.CountingOutput.collect > protected void processAndCollect(T element) { > output.collect(reuse.replace(element)); > } > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#abstractstreamoperatorcountingoutputcollect>AbstractStreamOperator.CountingOutput.collect > > 调用OperatorChain.CopyingChainingOutput.collect > public void collect(StreamRecord<OUT> record) { > numRecordsOut.inc(); > output.collect(record); > } > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#operatorchaincopyingchainingoutputcollect>OperatorChain.CopyingChainingOutput.collect > > 调用OperatorChain.CopyingChainingOutput.pushToOperator() > public void collect(StreamRecord<T> record) { > if (this.outputTag != null) { > // we are only responsible for emitting to the > main input > return; > } > > pushToOperator(record); > } > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#sourceoperator-timestampsandperiodicwatermarksoperator>Source(operator > TimestampsAndPeriodicWatermarksOperator) > > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#operatorchaincopyingchainingoutputpushtooperator>OperatorChain.CopyingChainingOutput.pushToOperator() > > 这里调用source中的第一个二个operator > TimestampsAndPeriodicWatermarksOperator.processElement() 处理元素 > protected <X> void pushToOperator(StreamRecord<X> record) { > try { > // we know that the given outputTag matches our > OutputTag so the record > // must be of the type that our operator (and > Serializer) expects. > @SuppressWarnings("unchecked") > StreamRecord<T> castRecord = (StreamRecord<T>) > record; > > numRecordsIn.inc(); > StreamRecord<T> copy = > castRecord.copy(serializer.copy(castRecord.getValue())); > operator.setKeyContextElement1(copy); > operator.processElement(copy); > } catch (ClassCastException e) { > if (outputTag != null) { > // Enrich error message > ClassCastException replace = new > ClassCastException( > String.format( > "%s. Failed to push > OutputTag with id '%s' to operator. " + > "This can occur > when multiple OutputTags with different types " + > "but identical > names are being used.", > e.getMessage(), > outputTag.getId())); > > throw new > ExceptionInChainedOperatorException(replace); > } else { > throw new > ExceptionInChainedOperatorException(e); > } > } catch (Exception e) { > throw new > ExceptionInChainedOperatorException(e); > } > > } > } > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#timestampsandperiodicwatermarksoperatorprocesselement>TimestampsAndPeriodicWatermarksOperator.processElement() > > userFunction指的是 > dataStream.assignTimestampsAndWatermarks(指定的函数),调的为AssignerWithPeriodicWatermarks(),注意这里边两个函数 > extractTimestamp()提取时间戳,getCurrentWatermark()得到当前的水印 > element.replace(element.getValue(), > newTimestamp)给当前元素调置时间戳,时间戳为AssignerWithPeriodicWatermarks.extractTimestamp()得到的值 > 调用AbstractStreamOperator.CountingOutput.collect() > public void processElement(StreamRecord<T> element) throws Exception { > final long newTimestamp = > userFunction.extractTimestamp(element.getValue(), > element.hasTimestamp() ? element.getTimestamp() > : Long.MIN_VALUE); > > output.collect(element.replace(element.getValue(), > newTimestamp)); > } > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#abstractstreamoperatorcountingoutputcollect-1>AbstractStreamOperator.CountingOutput.collect() > > 调用RecordWriterOutput.collect > 调用RecordWriter.emit()进行按key的hash进行分区(如果没有进行map等操作,就是按行号进行hash分区),并且发送给下游 > public void collect(StreamRecord<OUT> record) { > numRecordsOut.inc(); > output.collect(record); > } > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#windowtask>Window(Task) > > operatorChain.allOperators > 0 = {StreamSink@6060} > 1 = {WindowOperator@6041} > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#windowwindowoperator>Window(WindowOperator) > > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#oneinputstreamtaskrun>OneInputStreamTask.run() > > 处理Window元素,加时间戳/水印,分配window,加Trigger > 调用StreamInputProcessor.processInput()处理 > protected void run() throws Exception { > // cache processor reference on the stack, to make the code > more JIT friendly > final StreamInputProcessor<IN> inputProcessor = > this.inputProcessor; > > while (running && inputProcessor.processInput()) { > // all the work happens in the "processInput" method > } > } > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#streaminputprocessorprocessinput>StreamInputProcessor.processInput() > > 调用barrierHandler.getNextNonBlocked(),调用BarrierTracker.getNextNonBlocked()处理Source发过来的数据 > StreamElement recordOrMark = deserializationDelegate.getInstance(); > //得到source 发射过来的一条数据,进行反序列化 > 设置key streamOperator.setKeyContextElement1(record); > 调用WindowOperator.processElement(record) 得到这条数据 > streamOperator.processElement(record); > public boolean processInput() throws Exception { > if (isFinished) { > return false; > } > if (numRecordsIn == null) { > try { > numRecordsIn = ((OperatorMetricGroup) > streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); > } catch (Exception e) { > LOG.warn("An exception occurred during the > metrics setup.", e); > numRecordsIn = new SimpleCounter(); > } > } > > while (true) { > if (currentRecordDeserializer != null) { > DeserializationResult result = > currentRecordDeserializer.getNextRecord(deserializationDelegate); > > if (result.isBufferConsumed()) { > > currentRecordDeserializer.getCurrentBuffer().recycleBuffer(); > currentRecordDeserializer = null; > } > > if (result.isFullRecord()) { > StreamElement recordOrMark = > deserializationDelegate.getInstance(); > > if (recordOrMark.isWatermark()) { > // handle watermark > > statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), > currentChannel); > continue; > } else if > (recordOrMark.isStreamStatus()) { > // handle stream status > > statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), > currentChannel); > continue; > } else if > (recordOrMark.isLatencyMarker()) { > // handle latency marker > synchronized (lock) { > > streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker()); > } > continue; > } else { > // now we can do the actual > processing > StreamRecord<IN> record = > recordOrMark.asRecord(); > synchronized (lock) { > numRecordsIn.inc(); > > streamOperator.setKeyContextElement1(record); > > streamOperator.processElement(record); > } > return true; > } > } > } > > final BufferOrEvent bufferOrEvent = > barrierHandler.getNextNonBlocked(); > if (bufferOrEvent != null) { > if (bufferOrEvent.isBuffer()) { > currentChannel = > bufferOrEvent.getChannelIndex(); > currentRecordDeserializer = > recordDeserializers[currentChannel]; > > currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer()); > } > else { > // Event received > final AbstractEvent event = > bufferOrEvent.getEvent(); > if (event.getClass() != > EndOfPartitionEvent.class) { > throw new > IOException("Unexpected event: " + event); > } > } > } > else { > isFinished = true; > if (!barrierHandler.isEmpty()) { > throw new > IllegalStateException("Trailing data in checkpoint barrier handler."); > } > return false; > } > } > } > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#windowoperatorprocesselementrecord>WindowOperator.processElement(record) > > 分配Window,也就是计算Window的开启时间和结束时间,该计算方式,算法,恰好把所有元素都分配到整size的时间段 如size = > 5秒,5000毫秒,也就是时间戳在这些范围内的,就会被分配的最近的一个区间 > > [2019-03-06 20:00:00 -> 2019-03-06 20:00:05) > [2019-03-06 20:00:05 -> 2019-03-06 20:00:10) > [2019-03-06 20:00:15 -> 2019-03-06 20:00:20) > [2019-03-06 20:00:20 -> 2019-03-06 20:00:25) > [2019-03-06 20:00:25 -> 2019-03-06 20:00:30) > ...... > 开始时间(单位:毫秒),此处offset = 0,timestamp = > AssignerWithPeriodicWatermarks.extractTimestamp() 提取的时间戳 > long start = timestamp - (timestamp - offset + windowSize) % windowSize; > 结束时间(单位:毫秒),size 为.timeWindowAll(Time.seconds(5)),5秒即为5000 毫秒 > long end = start + size > final Collection<W> elementWindows = windowAssigner.assignWindows( > element.getValue(), element.getTimestamp(), > windowAssignerContext); > 如果当前元素的时间戳,计算后,结束时间大于该Window的结束时间,就说明划到后面的window去了,也就是这一条数据延时了 > > // drop if the window is already late > if (isWindowLate(window)) { > continue; > } > windowState为HeapListState来存储元素 > windowState.add(element.getValue()); > 调用trigger函数,就是给每个window绑定触发器,当window到了结束时间,就会被触发,不过他不是每个元素触发一次,而是按key,进行了Set去重,每个key会调一次WindowOperator.onProcessingTime()函数 > TriggerResult triggerResult = triggerContext.onElement(element); > > public void processElement(StreamRecord<IN> element) throws Exception { > final Collection<W> elementWindows = > windowAssigner.assignWindows( > element.getValue(), element.getTimestamp(), > windowAssignerContext); > > //if element is handled by none of assigned elementWindows > boolean isSkippedElement = true; > > final K key = this.<K>getKeyedStateBackend().getCurrentKey(); > > if (windowAssigner instanceof MergingWindowAssigner) { > MergingWindowSet<W> mergingWindows = > getMergingWindowSet(); > > for (W window: elementWindows) { > > // adding the new window might result in a > merge, in that case the actualWindow > // is the merged window and we work with that. > If we don't merge then > // actualWindow == window > W actualWindow = > mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() { > @Override > public void merge(W mergeResult, > Collection<W> > mergedWindows, W stateWindowResult, > Collection<W> > mergedStateWindows) throws Exception { > > if > ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + > allowedLateness <= internalTimerService.currentWatermark())) { > throw new > UnsupportedOperationException("The end timestamp of an " + > > "event-time window cannot become earlier than the current watermark " + > "by > merging. Current watermark: " + internalTimerService.currentWatermark() + > " > window: " + mergeResult); > } else if > (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= > internalTimerService.currentProcessingTime()) { > throw new > UnsupportedOperationException("The end timestamp of a " + > > "processing-time window cannot become earlier than the current processing > time " + > "by > merging. Current processing time: " + > internalTimerService.currentProcessingTime() + > " > window: " + mergeResult); > } > > triggerContext.key = key; > triggerContext.window = > mergeResult; > > > triggerContext.onMerge(mergedWindows); > > for (W m: mergedWindows) { > triggerContext.window = > m; > triggerContext.clear(); > deleteCleanupTimer(m); > } > > // merge the merged state > windows into the newly resulting state window > > windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows); > } > }); > > // drop if the window is already late > if (isWindowLate(actualWindow)) { > > mergingWindows.retireWindow(actualWindow); > continue; > } > isSkippedElement = false; > > W stateWindow = > mergingWindows.getStateWindow(actualWindow); > if (stateWindow == null) { > throw new IllegalStateException("Window > " + window + " is not in in-flight window set."); > } > > windowState.setCurrentNamespace(stateWindow); > windowState.add(element.getValue()); > > triggerContext.key = key; > triggerContext.window = actualWindow; > > TriggerResult triggerResult = > triggerContext.onElement(element); > > if (triggerResult.isFire()) { > ACC contents = windowState.get(); > if (contents == null) { > continue; > } > emitWindowContents(actualWindow, > contents); > } > > if (triggerResult.isPurge()) { > windowState.clear(); > } > registerCleanupTimer(actualWindow); > } > > // need to make sure to update the merging state in > state > mergingWindows.persist(); > } else { > for (W window: elementWindows) { > > // drop if the window is already late > if (isWindowLate(window)) { > continue; > } > isSkippedElement = false; > > windowState.setCurrentNamespace(window); > windowState.add(element.getValue()); > > triggerContext.key = key; > triggerContext.window = window; > > TriggerResult triggerResult = > triggerContext.onElement(element); > > if (triggerResult.isFire()) { > ACC contents = windowState.get(); > if (contents == null) { > continue; > } > emitWindowContents(window, contents); > } > > if (triggerResult.isPurge()) { > windowState.clear(); > } > registerCleanupTimer(window); > } > } > > // side output input event if > // element not handled by any window > // late arriving tag has been set > // windowAssigner is event time and current timestamp + allowed > lateness no less than element timestamp > if (isSkippedElement && isElementLate(element)) { > if (lateDataOutputTag != null){ > sideOutput(element); > } else { > this.numLateRecordsDropped.inc(); > } > } > } > > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#windowoperatoroneventtime>WindowOperator.onEventTime() > > 当自定义程序datastream.assignTimestampsAndWatermarks.AssignerWithPeriodicWatermarks.getCurrentWatermark得到的值,大于当前window的end,说明当前window可以结束了,就会触发调用 > WindowOperator.onEventTime() > triggerResult.isFire()//就会满足条件 > windowState.get();//取出当前window中开始时间到结束时间的,所有元素 > 调用emitWindowContents()函数进行处理 > public void onEventTime(InternalTimer<K, W> timer) throws Exception { > triggerContext.key = timer.getKey(); > triggerContext.window = timer.getNamespace(); > > MergingWindowSet<W> mergingWindows; > > if (windowAssigner instanceof MergingWindowAssigner) { > mergingWindows = getMergingWindowSet(); > W stateWindow = > mergingWindows.getStateWindow(triggerContext.window); > if (stateWindow == null) { > // Timer firing for non-existent window, this > can only happen if a > // trigger did not clean up timers. We have > already cleared the merging > // window and therefore the Trigger state, > however, so nothing to do. > return; > } else { > windowState.setCurrentNamespace(stateWindow); > } > } else { > windowState.setCurrentNamespace(triggerContext.window); > mergingWindows = null; > } > > TriggerResult triggerResult = > triggerContext.onEventTime(timer.getTimestamp()); > > if (triggerResult.isFire()) { > ACC contents = windowState.get(); > if (contents != null) { > emitWindowContents(triggerContext.window, > contents); > } > } > > if (triggerResult.isPurge()) { > windowState.clear(); > } > > if (windowAssigner.isEventTime() && > isCleanupTime(triggerContext.window, timer.getTimestamp())) { > clearAllState(triggerContext.window, windowState, > mergingWindows); > } > > if (mergingWindows != null) { > // need to make sure to update the merging state in > state > mergingWindows.persist(); > } > } > > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#windowoperatoremitwindowcontents>WindowOperator.emitWindowContents > > userFunction 调用InternalIterableProcessAllWindowFunction.process() > private void emitWindowContents(W window, ACC contents) throws > Exception { > > timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp()); > processContext.window = window; > userFunction.process(triggerContext.key, window, > processContext, contents, timestampedCollector); > } > > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#internaliterableprocessallwindowfunctionprocess>InternalIterableProcessAllWindowFunction.process > > wrappedFunction.process为用户定义的处理函数 > AllWindowedStream.process()函数,即自己定义的处理window的函数 > input 为当前widow的所有元素 > 调用TimestampedCollector.collect()一条一条元素发送 > public void process(Byte key, final W window, final > InternalWindowContext context, Iterable<IN> input, Collector<OUT> out) throws > Exception { > this.ctx.window = window; > this.ctx.internalContext = context; > ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = > this.wrappedFunction; > wrappedFunction.process(ctx, input, out); > } > > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#abstractstreamoperatorcountingoutput>AbstractStreamOperator.CountingOutput > > 调用CopyingChainingOutput.collect() > public void collect(StreamRecord<OUT> record) { > numRecordsOut.inc(); > output.collect(record); > } > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#windowstreamsink>Window(StreamSink) > > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#operatorchaincopyingchainingoutput>OperatorChain.CopyingChainingOutput > > 调用StreamSink.processElement() > protected <X> void pushToOperator(StreamRecord<X> record) { > try { > // we know that the given outputTag matches our > OutputTag so the record > // must be of the type that our operator (and > Serializer) expects. > @SuppressWarnings("unchecked") > StreamRecord<T> castRecord = (StreamRecord<T>) > record; > > numRecordsIn.inc(); > StreamRecord<T> copy = > castRecord.copy(serializer.copy(castRecord.getValue())); > operator.setKeyContextElement1(copy); > operator.processElement(copy); > } catch (ClassCastException e) { > if (outputTag != null) { > // Enrich error message > ClassCastException replace = new > ClassCastException( > String.format( > "%s. Failed to push > OutputTag with id '%s' to operator. " + > "This can occur > when multiple OutputTags with different types " + > "but identical > names are being used.", > e.getMessage(), > outputTag.getId())); > > throw new > ExceptionInChainedOperatorException(replace); > } else { > throw new > ExceptionInChainedOperatorException(e); > } > } catch (Exception e) { > throw new > ExceptionInChainedOperatorException(e); > } > > } > } > > > <https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md#streamsinkprocesselement>StreamSink.processElement > > 调用PrintSinkFuntion.invoke 输出当前元素 > public void processElement(StreamRecord<IN> element) throws Exception { > sinkContext.element = element; > userFunction.invoke(element.getValue(), sinkContext); > } > > > > > > ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- >> 在 2019年3月6日,下午5:57,Congxian Qiu <qcx978132...@gmail.com >> <mailto:qcx978132...@gmail.com>> 写道: >> >> 于 kafka 来说,单 partition 内的消息可以保证顺序,但是 partition A 和 partition B 之间的消息顺序是没法保证的 >