Timo Walther created FLINK-5735: ----------------------------------- Summary: Non-overlapping sliding window is not deterministic Key: FLINK-5735 URL: https://issues.apache.org/jira/browse/FLINK-5735 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Timo Walther
I don't know whether this is a problem in the Table API or in the underlying API; we have to investigate that as part of this issue. The following code produces different results from run to run: sometimes the count of "Hello" is 1, sometimes it is 2. {code} val data = List( (1L, 1, "Hi"), (2L, 2, "Hallo"), (3L, 2, "Hello"), (6L, 3, "Hello"), (4L, 5, "Hello"), (16L, 4, "Hello world"), (8L, 3, "Hello world")) @Test def testEventTimeSlidingWindowNonOverlapping(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) StreamITCase.testResults = mutable.MutableList() val stream = env .fromCollection(data) .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) val table = stream.toTable(tEnv, 'long, 'int, 'string) val windowedTable = table .window(Slide over 5.milli every 10.milli on 'rowtime as 'w) .groupBy('w, 'string) .select('string, 'int.count, 'w.start, 'w.end) val results = windowedTable.toDataStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() val expected = Seq( "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005", "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005", "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } class TimestampWithEqualWatermark extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] { override def checkAndGetNextWatermark( lastElement: (Long, Int, String), extractedTimestamp: Long) : Watermark = { new Watermark(extractedTimestamp) } override def extractTimestamp( element: (Long, Int, String), previousElementTimestamp: Long): Long = { element._1 } } {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)