[jira] [Commented] (FLINK-22826) flink sql1.13.1 causes data loss based on change log stream data join

2022-09-21 Thread guixin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607693#comment-17607693
 ] 

guixin commented on FLINK-22826:


I have the same problem. Has there been any progress?

> flink sql1.13.1 causes data loss based on change log stream data join
> ---------------------------------------------------------------------
>
>                 Key: FLINK-22826
>                 URL: https://issues.apache.org/jira/browse/FLINK-22826
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.12.0, 1.13.1
>            Reporter: 徐州州
>            Priority: Minor
>              Labels: auto-deprioritized-major, stale-blocker
>             Fix For: 1.16.0
>
>
> {code:sql}
> insert into dwd_order_detail
> select
>   ord.Id,
>   ord.Code,
>   Status,
>   concat(cast(ord.Id as STRING), if(oed.Id is null, 'oed_null', cast(oed.Id as STRING)), DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')) as uuids,
>   TO_DATE(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')) as As_Of_Date
> from orders ord
> left join order_extend oed
>   on ord.Id = oed.OrderId
>   and oed.IsDeleted = 0
>   and oed.CreateTime > CAST(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') AS TIMESTAMP)
> where (
>     ord.OrderTime > CAST(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') AS TIMESTAMP)
>     or ord.ReviewTime > CAST(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') AS TIMESTAMP)
>     or ord.RejectTime > CAST(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') AS TIMESTAMP)
>   )
>   and ord.IsDeleted = 0;
> {code}
> The PRIMARY KEY of my upsert-kafka table is uuids.
> The job joins Kafka-based canal-json streams and writes the result to
> upsert-kafka tables. I can confirm that version 1.12 also has this problem;
> I have just upgraded from 1.12 to 1.13.
> I looked up one user's order, number XJ0120210531004794, in the original
> canal-json table. It appears there as +U records, which is normal.
> {code:java}
> | +U | XJ0120210531004794 |  50 |
> | +U | XJ0120210531004672 |  50 |
> {code}
> But after being written to upsert-kafka via the join, the data consumed from upsert-kafka is:
> {code:java}
> | +I | XJ0120210531004794 |  50 |
> | -U | XJ0120210531004794 |  50 |
> {code}
> The order has two records. Its rows in the orders and order_extend tables
> have not changed since they were created, yet the -U record caused my data
> to be lost: the order was not computed and the final result was wrong.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-22826) flink sql1.13.1 causes data loss based on change log stream data join

2021-11-10 Thread yaoboxu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17442047#comment-17442047
 ] 

yaoboxu commented on FLINK-22826:
-

This problem still occurs in Flink 1.13.3.

> flink sql1.13.1 causes data loss based on change log stream data join
> ---------------------------------------------------------------------
>
>                 Key: FLINK-22826
>                 URL: https://issues.apache.org/jira/browse/FLINK-22826
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.12.0, 1.13.1
>            Reporter: 徐州州
>            Priority: Minor
>              Labels: auto-deprioritized-major, stale-blocker
>             Fix For: 1.15.0
>
>





[jira] [Commented] (FLINK-22826) flink sql1.13.1 causes data loss based on change log stream data join

2021-11-10 Thread yaoboxu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17442045#comment-17442045
 ] 

yaoboxu commented on FLINK-22826:
-

After I upgraded Flink to 1.13.3, data is still being deleted when some
tables ingested through flink-cdc are joined. The problem is still not solved.

> flink sql1.13.1 causes data loss based on change log stream data join
> ---------------------------------------------------------------------
>
>                 Key: FLINK-22826
>                 URL: https://issues.apache.org/jira/browse/FLINK-22826
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.12.0, 1.13.1
>            Reporter: 徐州州
>            Priority: Minor
>              Labels: auto-deprioritized-major, stale-blocker
>             Fix For: 1.15.0
>
>





[jira] [Commented] (FLINK-22826) flink sql1.13.1 causes data loss based on change log stream data join

2021-08-26 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17405164#comment-17405164
 ] 

Jingsong Lee commented on FLINK-22826:
--

[~nkruber] You are right. At the very least, we should have some mechanism to
tell users...

> flink sql1.13.1 causes data loss based on change log stream data join
> ---------------------------------------------------------------------
>
>                 Key: FLINK-22826
>                 URL: https://issues.apache.org/jira/browse/FLINK-22826
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.12.0, 1.13.1
>            Reporter: 徐州州
>            Priority: Minor
>              Labels: auto-deprioritized-major, stale-blocker
>





[jira] [Commented] (FLINK-22826) flink sql1.13.1 causes data loss based on change log stream data join

2021-08-26 Thread Nico Kruber (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17405135#comment-17405135
 ] 

Nico Kruber commented on FLINK-22826:
-

While I see what [~lzljs3620320] mentioned as very important for upsert stream
processing, I don't see a reference to filtering in the text above, only to
the order of +U and -U. But maybe I'm wrong or missing some details here.

If I am wrong, then the mentioned issue of filtering UPDATE_BEFORE and
UPDATE_AFTER differently is still a very serious issue, which we should
either fix or prevent users from running into (by not allowing such
queries?). I suppose that would warrant a priority higher than "minor"...

> flink sql1.13.1 causes data loss based on change log stream data join
> ---------------------------------------------------------------------
>
>                 Key: FLINK-22826
>                 URL: https://issues.apache.org/jira/browse/FLINK-22826
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.12.0, 1.13.1
>            Reporter: 徐州州
>            Priority: Minor
>              Labels: auto-deprioritized-major, stale-blocker
>





[jira] [Commented] (FLINK-22826) flink sql1.13.1 causes data loss based on change log stream data join

2021-08-23 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17402992#comment-17402992
 ] 

Jingsong Lee commented on FLINK-22826:
--

[~nkruber] This is related to FLINK-20374, but the case in this JIRA has
another problem.
At present, we have no good way to ensure the correctness of random
(non-deterministic) filtering. For the same record, +I may pass the filter
while -U is filtered out, which may lead to wrong results.
This is a known issue.
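
The effect described above can be reproduced with a small simulation. This is only a sketch under stated assumptions, not Flink code: the helper names (passes_filter, apply_changelog), the timestamps, and the multiset-style materialization are hypothetical and chosen for illustration. It shows a predicate that depends on the wall clock keeping the +I of a row on the first day and dropping the matching -U on the next day.

```python
from datetime import datetime


def passes_filter(order_time, now):
    """Time-dependent predicate: keep rows whose order_time is after midnight of `now`."""
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return order_time > midnight


def apply_changelog(events):
    """Materialize a retract-style changelog into per-key row counts (hypothetical model)."""
    state = {}
    for kind, key in events:
        if kind in ("+I", "+U"):
            state[key] = state.get(key, 0) + 1
        else:  # "-U" or "-D" retracts one row
            state[key] = state.get(key, 0) - 1
    return {k: v for k, v in state.items() if v != 0}


KEY = "XJ0120210531004794"
order_time = datetime(2021, 5, 31, 10, 0)  # assumed value, not from the report

# (kind, key, order_time, wall-clock time at which the filter sees the message)
stream = [
    ("+I", KEY, order_time, datetime(2021, 5, 31, 10, 1)),
    ("-U", KEY, order_time, datetime(2021, 6, 1, 9, 0)),
]

# The same row is evaluated twice, against two different "today" values.
filtered = [(kind, key) for kind, key, t, now in stream if passes_filter(t, now)]
unfiltered = [(kind, key) for kind, key, _, _ in stream]

with_filter = apply_changelog(filtered)       # the -U was dropped, the row stays
without_filter = apply_changelog(unfiltered)  # the +I is retracted as intended
```

In this model the filtered changelog leaves a row behind that should have been retracted. The opposite case (the +I dropped and the -U kept) would produce a retraction for a row that was never inserted.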

> flink sql1.13.1 causes data loss based on change log stream data join
> ---------------------------------------------------------------------
>
>                 Key: FLINK-22826
>                 URL: https://issues.apache.org/jira/browse/FLINK-22826
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.12.0, 1.13.1
>            Reporter: 徐州州
>            Priority: Minor
>              Labels: auto-deprioritized-major, stale-blocker
>





[jira] [Commented] (FLINK-22826) flink sql1.13.1 causes data loss based on change log stream data join

2021-08-20 Thread Nico Kruber (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17402117#comment-17402117
 ] 

Nico Kruber commented on FLINK-22826:
-

[~lzljs3620320] Why did you re-open this ticket? Isn't it what FLINK-20374 is 
about?

> flink sql1.13.1 causes data loss based on change log stream data join
> ---------------------------------------------------------------------
>
>                 Key: FLINK-22826
>                 URL: https://issues.apache.org/jira/browse/FLINK-22826
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.12.0, 1.13.1
>            Reporter: 徐州州
>            Priority: Minor
>              Labels: auto-deprioritized-major, stale-blocker
>





[jira] [Commented] (FLINK-22826) flink sql1.13.1 causes data loss based on change log stream data join

2021-06-02 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-22826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17356153#comment-17356153
 ] 

徐州州 commented on FLINK-22826:
-

I set the parallelism to 1, and data is still lost.

My job uses neither windows nor a state expiration time, and it has been
running non-stop for two consecutive days.

The first day is normal, because the job only takes that day's data. On the
next day, however, the status of rows in the orders table changes, which
produces +U operations.

I write to the upsert-kafka table from a join over canal-json sources. Even
with the parallelism set to 1, when an order from the first day is modified on
the second day, the source-layer orders table shows three +U records for the
second day. In the join stage, -U should be executed first and then +U, but
the program executes +U first and then -U.

I lost my data.
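
Why the order of -U and +U matters can be sketched as follows. This is a simplified model, not the upsert-kafka connector: the materialize function is hypothetical, it assumes that a consumer treats every retraction (-U or -D) as removing the key, and the status values 10 and 50 are illustrative.

```python
def materialize(changelog):
    """Apply changelog messages to a key -> value table.

    Simplifying assumption: any retraction (-U or -D) removes the key,
    whatever value is currently stored for it.
    """
    table = {}
    for kind, key, value in changelog:
        if kind in ("+I", "+U"):
            table[key] = value
        elif kind in ("-U", "-D"):
            table.pop(key, None)
    return table


KEY = "XJ0120210531004794"

# Order expected by the reporter: retract the old row, then emit the new one.
expected_order = [("+I", KEY, 10), ("-U", KEY, 10), ("+U", KEY, 50)]
# Order described in the comment: the new row first, then the retraction.
observed_order = [("+I", KEY, 10), ("+U", KEY, 50), ("-U", KEY, 10)]

table_expected = materialize(expected_order)
table_observed = materialize(observed_order)
```

Under this assumption the expected order ends with the updated row in the table, while the observed order ends with the key removed, which matches the reported symptom of a row disappearing after an update.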

> flink sql1.13.1 causes data loss based on change log stream data join
> ---------------------------------------------------------------------
>
>                 Key: FLINK-22826
>                 URL: https://issues.apache.org/jira/browse/FLINK-22826
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.12.0, 1.13.1
>            Reporter: 徐州州
>            Priority: Blocker
>





[jira] [Commented] (FLINK-22826) flink sql1.13.1 causes data loss based on change log stream data join

2021-06-02 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17356151#comment-17356151
 ] 

Jingsong Lee commented on FLINK-22826:
--

What is the orders table?

And what is the error?

> flink sql1.13.1 causes data loss based on change log stream data join
> ---------------------------------------------------------------------
>
>                 Key: FLINK-22826
>                 URL: https://issues.apache.org/jira/browse/FLINK-22826
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.12.0, 1.13.1
>            Reporter: 徐州州
>            Priority: Blocker
>





[jira] [Commented] (FLINK-22826) flink sql1.13.1 causes data loss based on change log stream data join

2021-06-01 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-22826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17355419#comment-17355419
 ] 

徐州州 commented on FLINK-22826:
-

Okay, I'll try it.

> flink sql1.13.1 causes data loss based on change log stream data join
> ---------------------------------------------------------------------
>
>                 Key: FLINK-22826
>                 URL: https://issues.apache.org/jira/browse/FLINK-22826
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.12.0, 1.13.1
>            Reporter: 徐州州
>            Priority: Blocker
>





[jira] [Commented] (FLINK-22826) flink sql1.13.1 causes data loss based on change log stream data join

2021-06-01 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-22826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17355418#comment-17355418
 ] 

徐州州 commented on FLINK-22826:
-

Okay, I'll give it a try.

> flink sql1.13.1 causes data loss based on change log stream data join
> ---------------------------------------------------------------------
>
>                 Key: FLINK-22826
>                 URL: https://issues.apache.org/jira/browse/FLINK-22826
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.12.0, 1.13.1
>            Reporter: 徐州州
>            Priority: Blocker
>





[jira] [Commented] (FLINK-22826) flink sql1.13.1 causes data loss based on change log stream data join

2021-06-01 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354953#comment-17354953
 ] 

Leonard Xu commented on FLINK-22826:


[~sansejin] Thanks for the report. I think the root cause is the same as in
FLINK-20374. I have linked this issue to it; we can discuss under FLINK-20374.

> flink sql1.13.1 causes data loss based on change log stream data join
> ---------------------------------------------------------------------
>
>                 Key: FLINK-22826
>                 URL: https://issues.apache.org/jira/browse/FLINK-22826
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.12.0, 1.13.1
>            Reporter: 徐州州
>            Priority: Blocker
>


