[ https://issues.apache.org/jira/browse/FLINK-23379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17381103#comment-17381103 ]
Jingsong Lee commented on FLINK-23379:
--------------------------------------

You can't rely on the order of an append stream. In a distributed system, results can be emitted out of order.

> interval left join null value result out of order
> --------------------------------------------------
>
>                 Key: FLINK-23379
>                 URL: https://issues.apache.org/jira/browse/FLINK-23379
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.12.2
>            Reporter: waywtdcc
>            Priority: Major
>
> * Scenario:
> The person (main) table is LEFT interval-joined with the message information table. A person record that finds no match in the message table is emitted later than a subsequent person record that does find a match.
> When the same primary key produces both a matched row and a null-padded row, they come out of order: the null-padded row is emitted after the matched row, which leads to incorrect results.
> Input:
> {"id": 1, "name":"chencc2", "message": "good boy2", "ts":"2021-03-26 18:56:43"}
> {"id": 1, "name":"chencc2", "age": "28", "ts":"2021-03-26 19:02:47"}
> {"id": 1, "name":"chencc2", "message": "good boy3", "ts":"2021-03-26 19:06:43"}
> {"id": 1, "name":"chencc2", "age": "27", "ts":"2021-03-26 19:06:47"}
> Output:
> +I(chencc2,27,2021-03-26T19:06:47,good boy3,2021-03-26 19:06:43.000)
> +I(chencc2,28,2021-03-26T19:02:47,null,null)
> The second output row (ts 19:02:47) is earlier in event time than the first (ts 19:06:47), yet it is emitted later, which causes incorrect updates downstream.
>
> * Code:
> {code:java}
> tableEnv.executeSql("drop table if exists persons_table_kafka2");
> String kafka_source_sql = "CREATE TABLE persons_table_kafka2 (\n" +
>         " `id` BIGINT,\n" +
>         " `name` STRING,\n" +
>         " `age` INT,\n" +
>         " proctime as PROCTIME(),\n" +
>         " `ts` TIMESTAMP(3),\n" +
>         " WATERMARK FOR ts AS ts\n" +
>         ") WITH (\n" +
>         " 'connector' = 'kafka',\n" +
>         " 'topic' = 'persons_test2',\n" +
>         " 'properties.bootstrap.servers' = 'node2:6667',\n" +
>         " 'properties.group.id' = 'testGroa115',\n" +
>         " 'scan.startup.mode' = 'group-offsets',\n" +
>         " 'format' = 'json'\n" +
>         ")";
> tableEnv.executeSql(kafka_source_sql);
> tableEnv.executeSql("drop table if exists persons_message_table_kafka2");
> String kafka_source_sql2 = "CREATE TABLE persons_message_table_kafka2 (\n" +
>         " `id` BIGINT,\n" +
>         " `name` STRING,\n" +
>         " `message` STRING,\n" +
>         " `ts` TIMESTAMP(3) ," +
> //      " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" +
>         " WATERMARK FOR ts AS ts\n" +
>         ") WITH (\n" +
>         " 'connector' = 'kafka',\n" +
>         " 'topic' = 'persons_extra_message2',\n" +
>         " 'properties.bootstrap.servers' = 'node2:6667',\n" +
>         " 'properties.group.id' = 'testGroud2e313',\n" +
>         " 'scan.startup.mode' = 'group-offsets',\n" +
>         " 'format' = 'json'\n" +
>         ")";
> tableEnv.executeSql(kafka_source_sql2);
> tableEnv.executeSql("" +
>         "CREATE TEMPORARY VIEW result_data_view " +
>         " as " +
>         " select " +
>         " t1.name, t1.age,t1.ts as ts1,t2.message, cast (t2.ts as string) as ts2 " +
>         " from persons_table_kafka2 t1 " +
>         " left join persons_message_table_kafka2 t2 on t1.name = t2.name and t1.ts between " +
>         " t2.ts and t2.ts + INTERVAL '3' MINUTE"
> );
> Table resultTable = tableEnv.from("result_data_view");
> DataStream<RowData> rowDataDataStream = tableEnv.toAppendStream(resultTable, RowData.class);
> rowDataDataStream.print();
> {code}
>
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
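Why the null-padded row comes out last can be illustrated with a toy, single-threaded model of the join. The class below (`IntervalLeftJoinSketch` is a hypothetical name) is not Flink's operator. It assumes that records arrive one at a time in the order listed in the issue, that each input's watermark is the largest `ts` seen on it (`WATERMARK FOR ts AS ts`) and the join's watermark is the smaller of the two, that a matched pair is emitted as soon as its second row arrives, and that an unmatched person row is emitted null-padded only after the join's watermark has moved strictly past its `ts`. Flink's actual state-cleanup timing differs and may be later, so read this as a sketch of the mechanism rather than a reproduction of the runtime.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy model of a row-time LEFT interval join (NOT Flink's operator):
//   t1.ts BETWEEN t2.ts AND t2.ts + INTERVAL '3' MINUTE
// i.e. a person row p matches a message row m with the same name
// when m.ts lies in [p.ts - 3 min, p.ts].
public class IntervalLeftJoinSketch {
    static final Duration BOUND = Duration.ofMinutes(3);

    // person == true: a persons_table_kafka2 row (payload = age);
    // person == false: a persons_message_table_kafka2 row (payload = message).
    record Event(boolean person, String name, String payload, LocalDateTime ts) {}

    // A buffered person row plus whether it has joined with any message yet.
    static final class Pending {
        final Event row;
        boolean matched;
        Pending(Event row) { this.row = row; }
    }

    static boolean joins(Event p, Event m) {
        return p.name().equals(m.name())
                && !p.ts().isBefore(m.ts())               // p.ts >= m.ts
                && !p.ts().isAfter(m.ts().plus(BOUND));   // p.ts <= m.ts + 3 min
    }

    static String fmt(Event p, Event m) {
        return "+I(" + p.name() + "," + p.payload() + "," + p.ts() + ","
                + (m == null ? "null,null" : m.payload() + "," + m.ts()) + ")";
    }

    static List<String> run(List<Event> input) {
        List<String> out = new ArrayList<>();
        List<Pending> persons = new ArrayList<>();
        List<Event> messages = new ArrayList<>();
        // Per-input watermark = max ts seen so far (WATERMARK FOR ts AS ts).
        LocalDateTime personWm = LocalDateTime.MIN;
        LocalDateTime messageWm = LocalDateTime.MIN;

        for (Event e : input) {
            if (e.person()) {
                Pending p = new Pending(e);
                for (Event m : messages) {
                    if (joins(e, m)) { out.add(fmt(e, m)); p.matched = true; }
                }
                persons.add(p);
                if (e.ts().isAfter(personWm)) personWm = e.ts();
            } else {
                for (Pending p : persons) {
                    if (joins(p.row, e)) { out.add(fmt(p.row, e)); p.matched = true; }
                }
                messages.add(e);
                if (e.ts().isAfter(messageWm)) messageWm = e.ts();
            }
            // The join's watermark is the minimum of both inputs and advances
            // only after the record itself has been processed.
            LocalDateTime wm = personWm.isBefore(messageWm) ? personWm : messageWm;
            for (Iterator<Pending> it = persons.iterator(); it.hasNext(); ) {
                Pending p = it.next();
                if (wm.isAfter(p.row.ts())) {                       // no on-time message can match p now
                    if (!p.matched) out.add(fmt(p.row, null));      // emit the null-padded row
                    it.remove();
                }
            }
        }
        return out;
    }

    static List<Event> reportedInput() {
        // Arrival order as listed in the issue.
        return List.of(
            new Event(false, "chencc2", "good boy2", LocalDateTime.parse("2021-03-26T18:56:43")),
            new Event(true,  "chencc2", "28",        LocalDateTime.parse("2021-03-26T19:02:47")),
            new Event(false, "chencc2", "good boy3", LocalDateTime.parse("2021-03-26T19:06:43")),
            new Event(true,  "chencc2", "27",        LocalDateTime.parse("2021-03-26T19:06:47")));
    }

    public static void main(String[] args) {
        run(reportedInput()).forEach(System.out::println);
    }
}
```

Under these assumptions the 19:06:47 person row is joined with "good boy3" the moment it arrives, while the unmatched 19:02:47 row waits until the join's watermark moves past 19:02:47. That first happens when the 19:06:47 row lifts the joint watermark to 19:06:43, after the match has already been emitted, so the null-padded row comes out second, as in the report. Matched rows are triggered by data and null-padded rows by watermark progress, so their order in the append stream need not follow event time.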
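If the append stream's order is not guaranteed, a consumer that upserts these rows by key cannot simply let the last-arriving row win. One possible approach, which is an assumption and not something proposed in this thread, is to resolve conflicts by event time: for each `name`, keep the row with the greatest `ts1`. `LatestByEventTime` and its `Result` record are hypothetical stand-ins for whatever sink the job actually writes to.

```java
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sink-side state: one row per person name. A row replaces the
// stored one only if its event time (ts1) is newer, so arrival order does not
// decide the final value.
public class LatestByEventTime {
    // Hypothetical stand-in for one row of result_data_view (ts2 omitted).
    record Result(String name, int age, LocalDateTime ts1, String message) {}

    private final Map<String, Result> latest = new HashMap<>();

    // Returns true if r was stored, false if it was not newer than the current row.
    boolean upsert(Result r) {
        Result current = latest.get(r.name());
        if (current != null && !r.ts1().isAfter(current.ts1())) {
            return false; // late-arriving older row: keep what we have
        }
        latest.put(r.name(), r);
        return true;
    }

    Result get(String name) {
        return latest.get(name);
    }

    public static void main(String[] args) {
        LatestByEventTime sink = new LatestByEventTime();
        // Rows in the order the job emitted them in the report.
        sink.upsert(new Result("chencc2", 27, LocalDateTime.parse("2021-03-26T19:06:47"), "good boy3"));
        sink.upsert(new Result("chencc2", 28, LocalDateTime.parse("2021-03-26T19:02:47"), null));
        System.out.println(sink.get("chencc2")); // the age-27 row is kept
    }
}
```

Fed the two rows in the order the job emitted them, the sketch keeps the age-27 row: the late null-padded row from 19:02:47 is ignored because its `ts1` is older. Rows with equal `ts1` keep whichever arrived first, which is an arbitrary choice in this sketch.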