[ https://issues.apache.org/jira/browse/FLINK-23379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17381003#comment-17381003 ]

JING ZHANG commented on FLINK-23379:
------------------------------------

[~waywtdcc] You're right. However, you could solve your problem by adding a 
[deduplicate|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/deduplication]
 based on event time.
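
For example, the fix could take the shape of an event-time deduplication placed on top of the join result. The following is only a sketch, assuming the reporter's {{result_data_view}} and treating {{(name, ts1)}} as the key of a joined row; it is not a tested query:

{code:sql}
-- Hypothetical sketch: event-time deduplication after the interval join,
-- following Flink's ROW_NUMBER-based Deduplication pattern linked above.
-- The partition key (name, ts1) is an assumption about the row's identity.
SELECT name, age, ts1, message, ts2
FROM (
  SELECT *,
         ROW_NUMBER() OVER (
           PARTITION BY name, ts1  -- assumed primary key of the joined row
           ORDER BY ts1 ASC        -- event-time attribute of the left side
         ) AS row_num
  FROM result_data_view
)
WHERE row_num = 1
{code}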

The interval join holds back the watermark, so the padded result for 
unmatched records would not be treated as a late event by the downstream 
operators.

Besides, if we changed the behavior of the interval join to emit the padded 
result immediately, its result stream would become a retract stream instead 
of an append stream. That modification would restrict the use cases of 
interval join, because many existing operators cannot accept an update stream 
as input (e.g. Deduplicate/IntervalJoin/OverAgg/WindowAgg/WindowJoin/WindowRank), 
and Rank would be routed to a less performant implementation.
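
To make the append-vs-retract point concrete, here is a small illustration in plain Java (not Flink code; the changelog entries are invented for illustration): if the interval join emitted the padded row immediately, a later match would force a retraction, so downstream operators would see an update stream rather than an append-only one.

{code:java}
// Hypothetical sketch (plain Java, not Flink): the changelog a downstream
// operator would observe for one left-side row whose match arrives late.
import java.util.List;

public class ChangelogSketch {

    static List<String> intervalLeftJoinChangelog(boolean emitPaddingImmediately) {
        if (emitPaddingImmediately) {
            // Proposed behavior: padded row emitted at once, retracted later.
            return List.of(
                    "+I(chencc2,null)",      // padding sent immediately
                    "-D(chencc2,null)",      // retraction once the match arrives
                    "+I(chencc2,good boy3)");
        }
        // Current behavior: wait until the interval closes; append-only output.
        return List.of("+I(chencc2,good boy3)");
    }

    public static void main(String[] args) {
        // Append-only output contains only +I entries; the proposed one does not.
        System.out.println(intervalLeftJoinChangelog(false));
        System.out.println(intervalLeftJoinChangelog(true));
    }
}
{code}

An operator that can only consume append-only input would have to reject the second changelog, which is the restriction on downstream operators described above.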

What do you think?

 

 

>  interval left join null value result out of order
> --------------------------------------------------
>
>                 Key: FLINK-23379
>                 URL: https://issues.apache.org/jira/browse/FLINK-23379
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.12.2
>            Reporter: waywtdcc
>            Priority: Major
>
> * Scenario:
>  A person main table is interval-left-joined with an associated message 
> information table. A record that finds no match in the message table can be 
> emitted later than a subsequent record that does find a match. When a normal 
> output and a null-padded output share the same primary key, they arrive out 
> of order: the null-padded output comes after the normal output, producing 
> incorrect results.
> Input:
> {"id": 1, "name":"chencc2", "message": "good boy2", "ts":"2021-03-26 18:56:43"}
> {"id": 1, "name":"chencc2", "age": "28", "ts":"2021-03-26 19:02:47"}
> {"id": 1, "name":"chencc2", "message": "good boy3", "ts":"2021-03-26 19:06:43"}
> {"id": 1, "name":"chencc2", "age": "27", "ts":"2021-03-26 19:06:47"}
> Output:
>  +I(chencc2,27,2021-03-26T19:06:47,good boy3,2021-03-26 19:06:43.000)
>  +I(chencc2,28,2021-03-26T19:02:47,null,null)
>  The second record's event time, 19:02, is earlier than the first record's, 
> but it is emitted later, causing downstream data updates to be wrong.
>  
> * Code:
> {code:java}
> tableEnv.executeSql("drop table if exists persons_table_kafka2");
> String kafka_source_sql = "CREATE TABLE persons_table_kafka2 (\n" +
>         "  `id` BIGINT,\n" +
>         "  `name` STRING,\n" +
>         "  `age` INT,\n" +
>         "  proctime as PROCTIME(),\n" +
>         "  `ts` TIMESTAMP(3),\n" +
>         "  WATERMARK FOR ts AS ts\n" +
>         ") WITH (\n" +
>         "  'connector' = 'kafka',\n" +
>         "  'topic' = 'persons_test2',\n" +
>         "  'properties.bootstrap.servers' = 'node2:6667',\n" +
>         "  'properties.group.id' = 'testGroa115',\n" +
>         "  'scan.startup.mode' = 'group-offsets',\n" +
>         "  'format' = 'json'\n" +
>         ")";
> tableEnv.executeSql(kafka_source_sql);
>
> tableEnv.executeSql("drop table if exists persons_message_table_kafka2");
> String kafka_source_sql2 = "CREATE TABLE persons_message_table_kafka2 (\n" +
>         "  `id` BIGINT,\n" +
>         "  `name` STRING,\n" +
>         "  `message` STRING,\n" +
>         "  `ts` TIMESTAMP(3),\n" +
> //      "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" +
>         "  WATERMARK FOR ts AS ts\n" +
>         ") WITH (\n" +
>         "  'connector' = 'kafka',\n" +
>         "  'topic' = 'persons_extra_message2',\n" +
>         "  'properties.bootstrap.servers' = 'node2:6667',\n" +
>         "  'properties.group.id' = 'testGroud2e313',\n" +
>         "  'scan.startup.mode' = 'group-offsets',\n" +
>         "  'format' = 'json'\n" +
>         ")";
> tableEnv.executeSql(kafka_source_sql2);
>
> tableEnv.executeSql(
>         "CREATE TEMPORARY VIEW result_data_view as " +
>         "select t1.name, t1.age, t1.ts as ts1, t2.message, " +
>         "cast(t2.ts as string) as ts2 " +
>         "from persons_table_kafka2 t1 " +
>         "left join persons_message_table_kafka2 t2 " +
>         "on t1.name = t2.name " +
>         "and t1.ts between t2.ts and t2.ts + INTERVAL '3' MINUTE");
>
> Table resultTable = tableEnv.from("result_data_view");
> DataStream<RowData> rowDataDataStream =
>         tableEnv.toAppendStream(resultTable, RowData.class);
> rowDataDataStream.print();
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
