[ https://issues.apache.org/jira/browse/FLINK-21553?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17294981#comment-17294981 ]
Andy edited comment on FLINK-21553 at 3/4/21, 4:26 AM:
-------------------------------------------------------

[~jark] [~dwysakowicz] [~guoweima] Currently, when flushing the buffer to state, `CombineRecordsFunction` copies only the window key, because the window key is reused. However, it does not copy the record, which is also reused. I think this is the root cause of the failure.

In the failing case, the SQL contains `count(distinct distinctKey)`, and distinctKey is the user key (UK) of the MapState. If the object is put into state directly when the buffer is flushed, it may be modified after it has been stored in HeapStateBackend, because it is a reused object in `AbstractBytesMultiMap`.

{code:scala}
// code placeholder
@Test
def testHopWindow_Cube(): Unit = {
  // Enable Janino source debugging for the generated code (local debugging only).
  System.setProperty("org.codehaus.janino.source_debugging.enable", "true")
  System.setProperty("org.codehaus.janino.source_debugging.dir",
    "/Users/zhangjing/IdeaProjects/flink/flink-table/flink-table-planner-blink/src/main/java")

  val inputData: Seq[Row] = List(
    row("2020-10-10 00:00:01", "Hi", "a"),
    row("2020-10-10 00:00:03", "Comment#1", "a"),
    row("2020-10-10 00:00:04", null, "a"),
    row("2020-10-10 00:00:07", "Hello", "b"),
    row("2020-10-10 00:00:06", "Hi", "b"), // out of order
    row("2020-10-10 00:00:08", "Comment#2", "a")
  )
  val dataId = TestValuesTableFactory.registerData(inputData)

  tEnv.executeSql(
    s"""
       |CREATE TABLE T2 (
       | `ts` STRING,
       | `string` STRING,
       | `name` STRING,
       | `rowtime` AS TO_TIMESTAMP(`ts`),
       | WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND
       |) WITH (
       | 'connector' = 'values',
       | 'data-id' = '$dataId',
       | 'failing-source' = 'true'
       |)
       |""".stripMargin)

  val sql =
    """
      |SELECT
      | GROUPING_ID(`name`),
      | `name`,
      | window_start,
      | window_end,
      | COUNT(DISTINCT `string`)
      |FROM TABLE(
      | HOP(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND))
      |GROUP BY CUBE(`name`), window_start, window_end
    """.stripMargin

  val sink = new TestingAppendSink
  tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
  env.execute()

  val data = Seq(
    "0,a,2020-10-09T23:59:55,2020-10-10T00:00:05,2",
    "0,a,2020-10-10T00:00,2020-10-10T00:00:10,3",
    "0,a,2020-10-10T00:00:05,2020-10-10T00:00:15,1",
    "0,b,2020-10-10T00:00,2020-10-10T00:00:10,2",
    "0,b,2020-10-10T00:00:05,2020-10-10T00:00:15,2",
    "1,null,2020-10-09T23:59:55,2020-10-10T00:00:05,2",
    "1,null,2020-10-10T00:00,2020-10-10T00:00:10,4",
    "1,null,2020-10-10T00:00:05,2020-10-10T00:00:15,3"
  )
  assertEquals(
    data.sorted.mkString("\n"),
    sink.getAppendResults.sorted.mkString("\n"))
}
{code}

I simplified the failing case to the test above, which fails frequently on my local machine (about once every 3~4 runs with *HeapStateBackend + SplitDistinct: false*).

The state in HeapStateBackend looks like pic1 when the test passes, and like pic2 or pic3 when the test fails.

!image-2021-03-04-12-05-59-802.png|width=1308,height=292!

!image-2021-03-04-12-07-53-566.png|width=1420,height=361!

!image-2021-03-04-12-08-07-097.png|width=1297,height=211!

After copying the record before it is put into state, the test always passes.
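To illustrate the aliasing problem described in this comment, here is a minimal, self-contained Java sketch. It does not use any Flink classes: `ReusedRecordDemo`, `MutableRecord`, and `storedKeys` are hypothetical names introduced only for illustration, and a plain `HashMap` stands in for the MapState held on the heap. It only shows the general effect of storing a reused mutable object as a map key versus storing a copy. It is not the actual `CombineRecordsFunction` code or the actual fix.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ReusedRecordDemo {

    /** Hypothetical mutable record; stands in for a row object that a buffer reuses. */
    static final class MutableRecord {
        String value;

        MutableRecord(String value) {
            this.value = value;
        }

        MutableRecord copy() {
            return new MutableRecord(value);
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof MutableRecord && ((MutableRecord) o).value.equals(value);
        }

        @Override
        public int hashCode() {
            return value.hashCode();
        }
    }

    /**
     * Writes every input into a heap map (assumed stand-in for heap state) through a
     * single reused record instance, then returns the sorted key contents the map holds.
     */
    static List<String> storedKeys(List<String> inputs, boolean copyBeforeStore) {
        Map<MutableRecord, Long> state = new HashMap<>();
        MutableRecord reused = new MutableRecord("");
        for (String input : inputs) {
            // The buffer overwrites the same instance for each record.
            reused.value = input;
            // Without a copy, the map keeps a reference to the reused instance.
            MutableRecord key = copyBeforeStore ? reused.copy() : reused;
            state.merge(key, 1L, Long::sum);
        }
        List<String> keys = new ArrayList<>();
        for (MutableRecord key : state.keySet()) {
            keys.add(key.value);
        }
        Collections.sort(keys);
        return keys;
    }

    public static void main(String[] args) {
        List<String> inputs = Arrays.asList("Hi", "Comment#1", "Hello");
        // All stored keys alias the reused instance, so they all show the last value.
        System.out.println(storedKeys(inputs, false)); // [Hello, Hello, Hello]
        // With a copy per entry, each stored key keeps its own value.
        System.out.println(storedKeys(inputs, true));  // [Comment#1, Hello, Hi]
    }
}
```

In this sketch the uncopied keys are silently rewritten after insertion, which is the kind of corrupted distinct-key state that would make a `COUNT(DISTINCT ...)` result wrong.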
> WindowDistinctAggregateITCase#testHopWindow_Cube is unstable
> ------------------------------------------------------------
>
>                 Key: FLINK-21553
>                 URL: https://issues.apache.org/jira/browse/FLINK-21553
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Jark Wu
>            Assignee: Andy
>            Priority: Blocker
>              Labels: pull-request-available, test-stability
>             Fix For: 1.13.0
>
>         Attachments: image-2021-03-04-12-05-59-802.png, image-2021-03-04-12-07-53-566.png, image-2021-03-04-12-08-07-097.png, screenshot-1.png
>
>
> See https://dev.azure.com/imjark/Flink/_build/results?buildId=422&view=logs&j=d1352042-8a7d-50b6-3946-a85d176b7981&t=b2322052-d503-5552-81e2-b3a532a1d7e8
> !screenshot-1.png!

--
This message was sent by Atlassian Jira
(v8.3.4#803005)