[jira] [Commented] (FLINK-22826) flink sql1.13.1 causes data loss based on change log stream data join
[ https://issues.apache.org/jira/browse/FLINK-22826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607693#comment-17607693 ] guixin commented on FLINK-22826: I have same problem. Did progress now? > flink sql1.13.1 causes data loss based on change log stream data join > - > > Key: FLINK-22826 > URL: https://issues.apache.org/jira/browse/FLINK-22826 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.12.0, 1.13.1 >Reporter: 徐州州 >Priority: Minor > Labels: auto-deprioritized-major, stale-blocker > Fix For: 1.16.0 > > > {code:java} > insert into dwd_order_detail > select >ord.Id, >ord.Code, >Status > concat(cast(ord.Id as String),if(oed.Id is null,'oed_null',cast(oed.Id > as STRING)),DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd')) as uuids, > TO_DATE(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd')) as As_Of_Date > from > orders ord > left join order_extend oed on ord.Id=oed.OrderId and oed.IsDeleted=0 and > oed.CreateTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > where ( ord.OrderTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS > TIMESTAMP) > or ord.ReviewTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > or ord.RejectTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > ) and ord.IsDeleted=0; > {code} > My upsert-kafka table for PRIMARY KEY for uuids. > This is the logic of my kafka based canal-json stream data join and write to > Upsert-kafka tables I confirm that version 1.12 also has this problem I just > upgraded from 1.12 to 1.13. > I look up a user s order data and order number XJ0120210531004794 in > canal-json original table as U which is normal. > {code:java} > | +U | XJ0120210531004794 | 50 | > | +U | XJ0120210531004672 | 50 | > {code} > But written to upsert-kakfa via join, the data consumed from upsert kafka is, > {code:java} > | +I | XJ0120210531004794 | 50 | > | -U | XJ0120210531004794 | 50 | > {code} > The order is two records this sheet in orders and order_extend tables has not > changed since created -U status caused my data loss not computed and the > final result was wrong. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-22826) flink sql1.13.1 causes data loss based on change log stream data join
[ https://issues.apache.org/jira/browse/FLINK-22826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17442047#comment-17442047 ] yaoboxu commented on FLINK-22826: - flink 1.13.3 still appear this question. > flink sql1.13.1 causes data loss based on change log stream data join > - > > Key: FLINK-22826 > URL: https://issues.apache.org/jira/browse/FLINK-22826 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.12.0, 1.13.1 >Reporter: 徐州州 >Priority: Minor > Labels: auto-deprioritized-major, stale-blocker > Fix For: 1.15.0 > > > {code:java} > insert into dwd_order_detail > select >ord.Id, >ord.Code, >Status > concat(cast(ord.Id as String),if(oed.Id is null,'oed_null',cast(oed.Id > as STRING)),DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd')) as uuids, > TO_DATE(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd')) as As_Of_Date > from > orders ord > left join order_extend oed on ord.Id=oed.OrderId and oed.IsDeleted=0 and > oed.CreateTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > where ( ord.OrderTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS > TIMESTAMP) > or ord.ReviewTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > or ord.RejectTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > ) and ord.IsDeleted=0; > {code} > My upsert-kafka table for PRIMARY KEY for uuids. > This is the logic of my kafka based canal-json stream data join and write to > Upsert-kafka tables I confirm that version 1.12 also has this problem I just > upgraded from 1.12 to 1.13. > I look up a user s order data and order number XJ0120210531004794 in > canal-json original table as U which is normal. > {code:java} > | +U | XJ0120210531004794 | 50 | > | +U | XJ0120210531004672 | 50 | > {code} > But written to upsert-kakfa via join, the data consumed from upsert kafka is, > {code:java} > | +I | XJ0120210531004794 | 50 | > | -U | XJ0120210531004794 | 50 | > {code} > The order is two records this sheet in orders and order_extend tables has not > changed since created -U status caused my data loss not computed and the > final result was wrong. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-22826) flink sql1.13.1 causes data loss based on change log stream data join
[ https://issues.apache.org/jira/browse/FLINK-22826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17442045#comment-17442045 ] yaoboxu commented on FLINK-22826: - when i upgrade my flink to 1.13.3, data also be deleted . because of some tables join by flink-cdc. The question still not solved. > flink sql1.13.1 causes data loss based on change log stream data join > - > > Key: FLINK-22826 > URL: https://issues.apache.org/jira/browse/FLINK-22826 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.12.0, 1.13.1 >Reporter: 徐州州 >Priority: Minor > Labels: auto-deprioritized-major, stale-blocker > Fix For: 1.15.0 > > > {code:java} > insert into dwd_order_detail > select >ord.Id, >ord.Code, >Status > concat(cast(ord.Id as String),if(oed.Id is null,'oed_null',cast(oed.Id > as STRING)),DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd')) as uuids, > TO_DATE(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd')) as As_Of_Date > from > orders ord > left join order_extend oed on ord.Id=oed.OrderId and oed.IsDeleted=0 and > oed.CreateTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > where ( ord.OrderTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS > TIMESTAMP) > or ord.ReviewTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > or ord.RejectTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > ) and ord.IsDeleted=0; > {code} > My upsert-kafka table for PRIMARY KEY for uuids. > This is the logic of my kafka based canal-json stream data join and write to > Upsert-kafka tables I confirm that version 1.12 also has this problem I just > upgraded from 1.12 to 1.13. > I look up a user s order data and order number XJ0120210531004794 in > canal-json original table as U which is normal. > {code:java} > | +U | XJ0120210531004794 | 50 | > | +U | XJ0120210531004672 | 50 | > {code} > But written to upsert-kakfa via join, the data consumed from upsert kafka is, > {code:java} > | +I | XJ0120210531004794 | 50 | > | -U | XJ0120210531004794 | 50 | > {code} > The order is two records this sheet in orders and order_extend tables has not > changed since created -U status caused my data loss not computed and the > final result was wrong. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-22826) flink sql1.13.1 causes data loss based on change log stream data join
[ https://issues.apache.org/jira/browse/FLINK-22826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17405164#comment-17405164 ] Jingsong Lee commented on FLINK-22826: -- [~nkruber] You are right, at least, we should have some mechanism to tell users... > flink sql1.13.1 causes data loss based on change log stream data join > - > > Key: FLINK-22826 > URL: https://issues.apache.org/jira/browse/FLINK-22826 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.12.0, 1.13.1 >Reporter: 徐州州 >Priority: Minor > Labels: auto-deprioritized-major, stale-blocker > > {code:java} > insert into dwd_order_detail > select >ord.Id, >ord.Code, >Status > concat(cast(ord.Id as String),if(oed.Id is null,'oed_null',cast(oed.Id > as STRING)),DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd')) as uuids, > TO_DATE(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd')) as As_Of_Date > from > orders ord > left join order_extend oed on ord.Id=oed.OrderId and oed.IsDeleted=0 and > oed.CreateTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > where ( ord.OrderTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS > TIMESTAMP) > or ord.ReviewTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > or ord.RejectTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > ) and ord.IsDeleted=0; > {code} > My upsert-kafka table for PRIMARY KEY for uuids. > This is the logic of my kafka based canal-json stream data join and write to > Upsert-kafka tables I confirm that version 1.12 also has this problem I just > upgraded from 1.12 to 1.13. > I look up a user s order data and order number XJ0120210531004794 in > canal-json original table as U which is normal. > {code:java} > | +U | XJ0120210531004794 | 50 | > | +U | XJ0120210531004672 | 50 | > {code} > But written to upsert-kakfa via join, the data consumed from upsert kafka is, > {code:java} > | +I | XJ0120210531004794 | 50 | > | -U | XJ0120210531004794 | 50 | > {code} > The order is two records this sheet in orders and order_extend tables has not > changed since created -U status caused my data loss not computed and the > final result was wrong. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22826) flink sql1.13.1 causes data loss based on change log stream data join
[ https://issues.apache.org/jira/browse/FLINK-22826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17405135#comment-17405135 ] Nico Kruber commented on FLINK-22826: - While I see what [~lzljs3620320] mentioned as very important for upsert stream processing, I don't see a reference to filtering in the text above; only about the order of +u and -u; but maybe I'm wrong or I'm missing some details here. If I am wrong, then the mentioned issue of filtering UPDATE_BEFORE and UPDATE_AFTER differently is still is a very serious issue which we should either fix or prevent users from running into that (by not allowing such queries?). I suppose that would be a priority higher than "minor"... > flink sql1.13.1 causes data loss based on change log stream data join > - > > Key: FLINK-22826 > URL: https://issues.apache.org/jira/browse/FLINK-22826 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.12.0, 1.13.1 >Reporter: 徐州州 >Priority: Minor > Labels: auto-deprioritized-major, stale-blocker > > {code:java} > insert into dwd_order_detail > select >ord.Id, >ord.Code, >Status > concat(cast(ord.Id as String),if(oed.Id is null,'oed_null',cast(oed.Id > as STRING)),DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd')) as uuids, > TO_DATE(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd')) as As_Of_Date > from > orders ord > left join order_extend oed on ord.Id=oed.OrderId and oed.IsDeleted=0 and > oed.CreateTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > where ( ord.OrderTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS > TIMESTAMP) > or ord.ReviewTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > or ord.RejectTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > ) and ord.IsDeleted=0; > {code} > My upsert-kafka table for PRIMARY KEY for uuids. > This is the logic of my kafka based canal-json stream data join and write to > Upsert-kafka tables I confirm that version 1.12 also has this problem I just > upgraded from 1.12 to 1.13. > I look up a user s order data and order number XJ0120210531004794 in > canal-json original table as U which is normal. > {code:java} > | +U | XJ0120210531004794 | 50 | > | +U | XJ0120210531004672 | 50 | > {code} > But written to upsert-kakfa via join, the data consumed from upsert kafka is, > {code:java} > | +I | XJ0120210531004794 | 50 | > | -U | XJ0120210531004794 | 50 | > {code} > The order is two records this sheet in orders and order_extend tables has not > changed since created -U status caused my data loss not computed and the > final result was wrong. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22826) flink sql1.13.1 causes data loss based on change log stream data join
[ https://issues.apache.org/jira/browse/FLINK-22826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17402992#comment-17402992 ] Jingsong Lee commented on FLINK-22826: -- [~nkruber] This is related to FLINK-20374. But the case in this JIRA has other problem. At present, we have no good way to ensure the correctness of random filtering. For the same record, +I may not be filtered and -U may be filtered, which may lead to wrong results. This is a known issue. > flink sql1.13.1 causes data loss based on change log stream data join > - > > Key: FLINK-22826 > URL: https://issues.apache.org/jira/browse/FLINK-22826 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.12.0, 1.13.1 >Reporter: 徐州州 >Priority: Minor > Labels: auto-deprioritized-major, stale-blocker > > {code:java} > insert into dwd_order_detail > select >ord.Id, >ord.Code, >Status > concat(cast(ord.Id as String),if(oed.Id is null,'oed_null',cast(oed.Id > as STRING)),DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd')) as uuids, > TO_DATE(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd')) as As_Of_Date > from > orders ord > left join order_extend oed on ord.Id=oed.OrderId and oed.IsDeleted=0 and > oed.CreateTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > where ( ord.OrderTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS > TIMESTAMP) > or ord.ReviewTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > or ord.RejectTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > ) and ord.IsDeleted=0; > {code} > My upsert-kafka table for PRIMARY KEY for uuids. > This is the logic of my kafka based canal-json stream data join and write to > Upsert-kafka tables I confirm that version 1.12 also has this problem I just > upgraded from 1.12 to 1.13. > I look up a user s order data and order number XJ0120210531004794 in > canal-json original table as U which is normal. > {code:java} > | +U | XJ0120210531004794 | 50 | > | +U | XJ0120210531004672 | 50 | > {code} > But written to upsert-kakfa via join, the data consumed from upsert kafka is, > {code:java} > | +I | XJ0120210531004794 | 50 | > | -U | XJ0120210531004794 | 50 | > {code} > The order is two records this sheet in orders and order_extend tables has not > changed since created -U status caused my data loss not computed and the > final result was wrong. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22826) flink sql1.13.1 causes data loss based on change log stream data join
[ https://issues.apache.org/jira/browse/FLINK-22826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17402117#comment-17402117 ] Nico Kruber commented on FLINK-22826: - [~lzljs3620320] Why did you re-open this ticket? Isn't it what FLINK-20374 is about? > flink sql1.13.1 causes data loss based on change log stream data join > - > > Key: FLINK-22826 > URL: https://issues.apache.org/jira/browse/FLINK-22826 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.12.0, 1.13.1 >Reporter: 徐州州 >Priority: Minor > Labels: auto-deprioritized-major, stale-blocker > > {code:java} > insert into dwd_order_detail > select >ord.Id, >ord.Code, >Status > concat(cast(ord.Id as String),if(oed.Id is null,'oed_null',cast(oed.Id > as STRING)),DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd')) as uuids, > TO_DATE(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd')) as As_Of_Date > from > orders ord > left join order_extend oed on ord.Id=oed.OrderId and oed.IsDeleted=0 and > oed.CreateTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > where ( ord.OrderTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS > TIMESTAMP) > or ord.ReviewTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > or ord.RejectTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > ) and ord.IsDeleted=0; > {code} > My upsert-kafka table for PRIMARY KEY for uuids. > This is the logic of my kafka based canal-json stream data join and write to > Upsert-kafka tables I confirm that version 1.12 also has this problem I just > upgraded from 1.12 to 1.13. > I look up a user s order data and order number XJ0120210531004794 in > canal-json original table as U which is normal. > {code:java} > | +U | XJ0120210531004794 | 50 | > | +U | XJ0120210531004672 | 50 | > {code} > But written to upsert-kakfa via join, the data consumed from upsert kafka is, > {code:java} > | +I | XJ0120210531004794 | 50 | > | -U | XJ0120210531004794 | 50 | > {code} > The order is two records this sheet in orders and order_extend tables has not > changed since created -U status caused my data loss not computed and the > final result was wrong. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22826) flink sql1.13.1 causes data loss based on change log stream data join
[ https://issues.apache.org/jira/browse/FLINK-22826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17356153#comment-17356153 ] 徐州州 commented on FLINK-22826: - I set the parallelism to 1, Data is still lost, My business didn't use no window and state expiration time, Two consecutive days of non-stop operation. The first day is normal because the first day only takes the data of the day, but the status of the order table changes the next day, resulting in the operation of + u. I write the Kafka upert table based on the canal JSON Association, but I find that even if the parallelism is set to 1, because the first day is modified the next day, the order table data of the original layer of the second day is Three pieces of + U data, but when it comes to the join stage, you should execute - U first and then + u, But the program executes +u first, and then -u. I lost my data > flink sql1.13.1 causes data loss based on change log stream data join > - > > Key: FLINK-22826 > URL: https://issues.apache.org/jira/browse/FLINK-22826 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.12.0, 1.13.1 >Reporter: 徐州州 >Priority: Blocker > > {code:java} > insert into dwd_order_detail > select >ord.Id, >ord.Code, >Status > concat(cast(ord.Id as String),if(oed.Id is null,'oed_null',cast(oed.Id > as STRING)),DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd')) as uuids, > TO_DATE(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd')) as As_Of_Date > from > orders ord > left join order_extend oed on ord.Id=oed.OrderId and oed.IsDeleted=0 and > oed.CreateTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > where ( ord.OrderTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS > TIMESTAMP) > or ord.ReviewTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > or ord.RejectTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > ) and ord.IsDeleted=0; > {code} > My upsert-kafka table for PRIMARY KEY for uuids. > This is the logic of my kafka based canal-json stream data join and write to > Upsert-kafka tables I confirm that version 1.12 also has this problem I just > upgraded from 1.12 to 1.13. > I look up a user s order data and order number XJ0120210531004794 in > canal-json original table as U which is normal. > {code:java} > | +U | XJ0120210531004794 | 50 | > | +U | XJ0120210531004672 | 50 | > {code} > But written to upsert-kakfa via join, the data consumed from upsert kafka is, > {code:java} > | +I | XJ0120210531004794 | 50 | > | -U | XJ0120210531004794 | 50 | > {code} > The order is two records this sheet in orders and order_extend tables has not > changed since created -U status caused my data loss not computed and the > final result was wrong. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22826) flink sql1.13.1 causes data loss based on change log stream data join
[ https://issues.apache.org/jira/browse/FLINK-22826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17356151#comment-17356151 ] Jingsong Lee commented on FLINK-22826: -- What is orders table? And what is the error? > flink sql1.13.1 causes data loss based on change log stream data join > - > > Key: FLINK-22826 > URL: https://issues.apache.org/jira/browse/FLINK-22826 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.12.0, 1.13.1 >Reporter: 徐州州 >Priority: Blocker > > {code:java} > insert into dwd_order_detail > select >ord.Id, >ord.Code, >Status > concat(cast(ord.Id as String),if(oed.Id is null,'oed_null',cast(oed.Id > as STRING)),DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd')) as uuids, > TO_DATE(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd')) as As_Of_Date > from > orders ord > left join order_extend oed on ord.Id=oed.OrderId and oed.IsDeleted=0 and > oed.CreateTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > where ( ord.OrderTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS > TIMESTAMP) > or ord.ReviewTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > or ord.RejectTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > ) and ord.IsDeleted=0; > {code} > My upsert-kafka table for PRIMARY KEY for uuids. > This is the logic of my kafka based canal-json stream data join and write to > Upsert-kafka tables I confirm that version 1.12 also has this problem I just > upgraded from 1.12 to 1.13. > I look up a user s order data and order number XJ0120210531004794 in > canal-json original table as U which is normal. > {code:java} > | +U | XJ0120210531004794 | 50 | > | +U | XJ0120210531004672 | 50 | > {code} > But written to upsert-kakfa via join, the data consumed from upsert kafka is, > {code:java} > | +I | XJ0120210531004794 | 50 | > | -U | XJ0120210531004794 | 50 | > {code} > The order is two records this sheet in orders and order_extend tables has not > changed since created -U status caused my data loss not computed and the > final result was wrong. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22826) flink sql1.13.1 causes data loss based on change log stream data join
[ https://issues.apache.org/jira/browse/FLINK-22826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17355419#comment-17355419 ] 徐州州 commented on FLINK-22826: - Okay I try it > flink sql1.13.1 causes data loss based on change log stream data join > - > > Key: FLINK-22826 > URL: https://issues.apache.org/jira/browse/FLINK-22826 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.12.0, 1.13.1 >Reporter: 徐州州 >Priority: Blocker > > {code:java} > insert into dwd_order_detail > select >ord.Id, >ord.Code, >Status > concat(cast(ord.Id as String),if(oed.Id is null,'oed_null',cast(oed.Id > as STRING)),DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd')) as uuids, > TO_DATE(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd')) as As_Of_Date > from > orders ord > left join order_extend oed on ord.Id=oed.OrderId and oed.IsDeleted=0 and > oed.CreateTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > where ( ord.OrderTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS > TIMESTAMP) > or ord.ReviewTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > or ord.RejectTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > ) and ord.IsDeleted=0; > {code} > My upsert-kafka table for PRIMARY KEY for uuids. > This is the logic of my kafka based canal-json stream data join and write to > Upsert-kafka tables I confirm that version 1.12 also has this problem I just > upgraded from 1.12 to 1.13. > I look up a user s order data and order number XJ0120210531004794 in > canal-json original table as U which is normal. > {code:java} > | +U | XJ0120210531004794 | 50 | > | +U | XJ0120210531004672 | 50 | > {code} > But written to upsert-kakfa via join, the data consumed from upsert kafka is, > {code:java} > | +I | XJ0120210531004794 | 50 | > | -U | XJ0120210531004794 | 50 | > {code} > The order is two records this sheet in orders and order_extend tables has not > changed since created -U status caused my data loss not computed and the > final result was wrong. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22826) flink sql1.13.1 causes data loss based on change log stream data join
[ https://issues.apache.org/jira/browse/FLINK-22826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17355418#comment-17355418 ] 徐州州 commented on FLINK-22826: - 好的 我试下 > flink sql1.13.1 causes data loss based on change log stream data join > - > > Key: FLINK-22826 > URL: https://issues.apache.org/jira/browse/FLINK-22826 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.12.0, 1.13.1 >Reporter: 徐州州 >Priority: Blocker > > {code:java} > insert into dwd_order_detail > select >ord.Id, >ord.Code, >Status > concat(cast(ord.Id as String),if(oed.Id is null,'oed_null',cast(oed.Id > as STRING)),DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd')) as uuids, > TO_DATE(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd')) as As_Of_Date > from > orders ord > left join order_extend oed on ord.Id=oed.OrderId and oed.IsDeleted=0 and > oed.CreateTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > where ( ord.OrderTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS > TIMESTAMP) > or ord.ReviewTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > or ord.RejectTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > ) and ord.IsDeleted=0; > {code} > My upsert-kafka table for PRIMARY KEY for uuids. > This is the logic of my kafka based canal-json stream data join and write to > Upsert-kafka tables I confirm that version 1.12 also has this problem I just > upgraded from 1.12 to 1.13. > I look up a user s order data and order number XJ0120210531004794 in > canal-json original table as U which is normal. > {code:java} > | +U | XJ0120210531004794 | 50 | > | +U | XJ0120210531004672 | 50 | > {code} > But written to upsert-kakfa via join, the data consumed from upsert kafka is, > {code:java} > | +I | XJ0120210531004794 | 50 | > | -U | XJ0120210531004794 | 50 | > {code} > The order is two records this sheet in orders and order_extend tables has not > changed since created -U status caused my data loss not computed and the > final result was wrong. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22826) flink sql1.13.1 causes data loss based on change log stream data join
[ https://issues.apache.org/jira/browse/FLINK-22826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354953#comment-17354953 ] Leonard Xu commented on FLINK-22826: [~sansejin] Thanks for the report, I think the root cause is same with FLINK-20374, I link this one to is, we can discuss under FLINK-20374. > flink sql1.13.1 causes data loss based on change log stream data join > - > > Key: FLINK-22826 > URL: https://issues.apache.org/jira/browse/FLINK-22826 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.12.0, 1.13.1 >Reporter: 徐州州 >Priority: Blocker > > {code:java} > insert into dwd_order_detail > select >ord.Id, >ord.Code, >Status > concat(cast(ord.Id as String),if(oed.Id is null,'oed_null',cast(oed.Id > as STRING)),DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd')) as uuids, > TO_DATE(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd')) as As_Of_Date > from > orders ord > left join order_extend oed on ord.Id=oed.OrderId and oed.IsDeleted=0 and > oed.CreateTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > where ( ord.OrderTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS > TIMESTAMP) > or ord.ReviewTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > or ord.RejectTime>CAST(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd') AS TIMESTAMP) > ) and ord.IsDeleted=0; > {code} > My upsert-kafka table for PRIMARY KEY for uuids. > This is the logic of my kafka based canal-json stream data join and write to > Upsert-kafka tables I confirm that version 1.12 also has this problem I just > upgraded from 1.12 to 1.13. > I look up a user s order data and order number XJ0120210531004794 in > canal-json original table as U which is normal. > {code:java} > | +U | XJ0120210531004794 | 50 | > | +U | XJ0120210531004672 | 50 | > {code} > But written to upsert-kakfa via join, the data consumed from upsert kafka is, > {code:java} > | +I | XJ0120210531004794 | 50 | > | -U | XJ0120210531004794 | 50 | > {code} > The order is two records this sheet in orders and order_extend tables has not > changed since created -U status caused my data loss not computed and the > final result was wrong. -- This message was sent by Atlassian Jira (v8.3.4#803005)