[ https://issues.apache.org/jira/browse/FLINK-5735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546846#comment-16546846 ]
Aljoscha Krettek commented on FLINK-5735:
-----------------------------------------

That's what I thought. So there is no bug; it's just a problem with the test setup, namely how the watermark extractor interacts with the timestamps and the ordering of elements.

> Non-overlapping sliding window is not deterministic
> ---------------------------------------------------
>
>                 Key: FLINK-5735
>                 URL: https://issues.apache.org/jira/browse/FLINK-5735
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Valerii Florov
>            Priority: Major
>
> I don't know if this is a problem of the Table API or the underlying API. We
> have to investigate this as part of the issue.
> The following code leads to different results from time to time. Sometimes
> the count of "Hello" is 1, sometimes 2.
> {code}
>   val data = List(
>     (1L, 1, "Hi"),
>     (2L, 2, "Hallo"),
>     (3L, 2, "Hello"),
>     (6L, 3, "Hello"),
>     (4L, 5, "Hello"),
>     (16L, 4, "Hello world"),
>     (8L, 3, "Hello world"))
>   @Test
>   def testEventTimeSlidingWindowNonOverlapping(): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     StreamITCase.testResults = mutable.MutableList()
>     val stream = env
>       .fromCollection(data)
>       .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
>     val table = stream.toTable(tEnv, 'long, 'int, 'string)
>     val windowedTable = table
>       .window(Slide over 5.milli every 10.milli on 'rowtime as 'w)
>       .groupBy('w, 'string)
>       .select('string, 'int.count, 'w.start, 'w.end)
>     val results = windowedTable.toDataStream[Row]
>     results.addSink(new StreamITCase.StringSink)
>     env.execute()
>     val expected = Seq(
>       "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
>       "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
>       "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
>     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
>   }
>   class TimestampWithEqualWatermark extends
>       AssignerWithPunctuatedWatermarks[(Long, Int, String)] {
>     override def checkAndGetNextWatermark(
>         lastElement: (Long, Int, String),
>         extractedTimestamp: Long)
>       : Watermark = {
>       new Watermark(extractedTimestamp)
>     }
>     override def extractTimestamp(
>         element: (Long, Int, String),
>         previousElementTimestamp: Long): Long = {
>       element._1
>     }
>   }
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
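
To illustrate the point about the watermark extractor and element ordering, here is a minimal sketch in plain Scala with no Flink dependency. It is a simplified model and not Flink's actual window operator. It assumes the watermark advances to each element's timestamp, as TimestampWithEqualWatermark does, and that an element is dropped as late once the watermark has reached the last timestamp of its window. The names WatermarkOrderingSketch, windowStart and countPerWindow are hypothetical and chosen only for this example.

```scala
object WatermarkOrderingSketch {
  // (timestamp, key) pairs taken from the test data in the issue, in list order.
  val inOrderOfArrival: List[(Long, String)] = List(
    (1L, "Hi"), (2L, "Hallo"), (3L, "Hello"), (6L, "Hello"),
    (4L, "Hello"), (16L, "Hello world"), (8L, "Hello world"))

  val windowSize = 5L   // Slide over 5.milli
  val windowSlide = 10L // every 10.milli

  // Start of the only window that can contain ts, or None if ts falls
  // into the gap between two non-overlapping windows.
  def windowStart(ts: Long): Option[Long] = {
    val start = ts - (ts % windowSlide)
    if (ts < start + windowSize) Some(start) else None
  }

  // Counts per (windowStart, key). An element is dropped when the watermark
  // has already reached the last timestamp of its window (assumed rule).
  def countPerWindow(elements: List[(Long, String)]): Map[(Long, String), Int] = {
    var watermark = Long.MinValue
    var counts = Map.empty[(Long, String), Int]
    for ((ts, key) <- elements) {
      windowStart(ts).foreach { start =>
        val maxTimestamp = start + windowSize - 1
        if (maxTimestamp > watermark) {
          counts = counts.updated((start, key), counts.getOrElse((start, key), 0) + 1)
        }
      }
      // Watermark equal to the element's timestamp; it never moves backwards.
      watermark = math.max(watermark, ts)
    }
    counts
  }

  def main(args: Array[String]): Unit = {
    // List order: timestamp 6 moves the watermark past window [0, 5) before 4 arrives.
    println(countPerWindow(inOrderOfArrival)((0L, "Hello")))              // prints 1
    // Timestamp order: 4 arrives while the watermark is still 3.
    println(countPerWindow(inOrderOfArrival.sortBy(_._1))((0L, "Hello"))) // prints 2
  }
}
```

Under these assumptions, the count of "Hello" in the window [0, 5) is 1 or 2 depending only on whether the element with timestamp 4 is seen before or after the one with timestamp 6, which matches the two results reported in the issue.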