Cody created FLINK-4215: --------------------------- Summary: timestamp of StreamRecord is lost in WindowOperator Key: FLINK-4215 URL: https://issues.apache.org/jira/browse/FLINK-4215 Project: Flink Issue Type: Bug Components: DataStream API Affects Versions: 1.0.3 Reporter: Cody
In a WindowedStream, if the subsequent operator is a WindowOperator(by applying a fold function), the timestamp of StreamRecord will be lost. Here's my test code: --------------------------------------------- def getSmall4TupleDataStreamWithTime(env: StreamExecutionEnvironment): DataStream[(Int, Long, String, String)] = { val data = new mutable.MutableList[(Int, Long, String, String)] data.+=((1, 1L, "Hi", "2016-07-06 14:00:00")) data.+=((2, 2L, "Hello", "2016-07-06 14:01:00")) data.+=((3, 2L, "Hello world", "2016-07-06 14:02:00")) env.fromCollection(data) } @Test def testTimestampInWindowOperator(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = StreamTestData.getSmall4TupleDataStreamWithTime(env).assignTimestampsAndWatermarks( new AssignerWithPeriodicWatermarks[(Int, Long, String, String)] { override def getCurrentWatermark: Watermark = null override def extractTimestamp(element: (Int, Long, String, String), previousElementTimestamp: Long): Long = { DateFormat.getDateTimeInstance.parse(element._4).getTime } }).keyBy(3).timeWindow(Time.milliseconds(1000)) .fold((0, 0L, "", ""), new FoldFunction[(Int, Long, String, String), (Int, Long, String, String)] { override def fold(v1: (Int, Long, String, String), v2: (Int, Long, String, String)) : (Int, Long, String, String) = { (v1._1 + v2._1, v1._2 + v2._2, v1._3 + v2._3, v1._4 + v2._4) } }).addSink(new PrintSinkFunction[(Int, Long, String, String)]()) env.execute() } -- This message was sent by Atlassian JIRA (v6.3.4#6332)