DataStream EventTime last data cannot be output ?
In the verification of EventTime plus watermark processing, I found that the data sent to the socket cannot be output in time or output. ). The verification found that only the timestamp of the current send data of getCurrentWatermark() > TimeWindow + maxOutOfOrderness will trigger the end of the last window ). But the latest record can not be processed in time, or can not be processed ). How can I deal with this problem? The following is the Flink program ,Flink 1.7.2 --------------------------------------------------------------------------- package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.eventtime import java.util.{Date, Properties} import com.alibaba.fastjson.JSON import com.opensourceteams.module.bigdata.flink.common.ConfigurationUtil import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.flink.util.Collector object SockWordCountRun { def main(args: Array[String]): Unit = { // get the execution environment // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val configuration : Configuration = ConfigurationUtil.getConfiguration(true) val env:StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1,configuration) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) import org.apache.flink.streaming.api.scala._ val dataStream = env.socketTextStream("localhost", 1234, '\n') // .setParallelism(3) dataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[String] { val maxOutOfOrderness = 2 * 1000L // 3.5 seconds var currentMaxTimestamp: Long = _ var currentTimestamp: Long = _ override def getCurrentWatermark: Watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness) override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = { val jsonObject = JSON.parseObject(element) val timestamp = jsonObject.getLongValue("extract_data_time") currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp) currentTimestamp = timestamp /* println("===========watermark begin===========") println() println(new Date(currentMaxTimestamp - 20 * 1000)) println(jsonObject) println("===========watermark end===========") println()*/ timestamp } }) .timeWindowAll(Time.seconds(3)) .process(new ProcessAllWindowFunction[String,String,TimeWindow]() { override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = { println() println("开始提交window") println(new Date()) for(e <- elements) out.collect(e) println("结束提交window") println(new Date()) println() } }) .print() //.setParallelism(3) println("==================================以下为执行计划==================================") println("执行地址(firefox效果更好):https://flink.apache.org/visualizer") //执行计划 println(env.getStreamGraph.getStreamingPlanAsJSON) println("==================================以上为执行计划 JSON串==================================\n") env.execute("Socket 水印作业") println("结束") } // Data type for words with count case class WordWithCount(word: String, count: Long){ //override def toString: String = Thread.currentThread().getName + word + " : " + count } def getConfiguration(isDebug:Boolean = false):Configuration = { val configuration : Configuration = new Configuration() if(isDebug){ val timeout = "100000 s" val timeoutHeartbeatPause = "1000000 s" configuration.setString("akka.ask.timeout",timeout) configuration.setString("akka.lookup.timeout",timeout) configuration.setString("akka.tcp.timeout",timeout) configuration.setString("akka.transport.heartbeat.interval",timeout) configuration.setString("akka.transport.heartbeat.pause",timeoutHeartbeatPause) configuration.setString("akka.watch.heartbeat.pause",timeout) configuration.setInteger("heartbeat.interval",10000000) configuration.setInteger("heartbeat.timeout",50000000) } configuration } } best thinktothings