Re: Re: Flink sql regular join not working as expect.

2021-06-05 Thread JING ZHANG
Hi,
After some investigation, I found a simpler way to satisfy the requirement:
use a CDC connector [1], which supports reading a database snapshot and then
continuously reading binlogs, without separately deploying Kafka and Debezium.
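
For reference, a rough sketch of what the dimension table definition could
look like with the Postgres CDC connector from [1] (all connection settings
below are placeholders; the exact options depend on the connector version):

  CREATE TABLE test2 (
    source STRING,
    target STRING,
    PRIMARY KEY (source) NOT ENFORCED
  ) WITH (
    'connector' = 'postgres-cdc',    -- from flink-cdc-connectors [1]
    'hostname' = 'localhost',        -- placeholder connection settings
    'port' = '5432',
    'username' = 'postgres',
    'password' = '...',
    'database-name' = 'mydb',
    'schema-name' = 'public',
    'table-name' = 'test2',
    'slot.name' = 'flink'            -- replication slot for logical decoding
  );

Such a source emits the snapshot first and then the change events, so an
update to 'target' retracts the old join result and emits the new one.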

Best regards,
JING ZHANG

[1] https://github.com/ververica/flink-cdc-connectors


Re: Re: Flink sql regular join not working as expect.

2021-06-05 Thread JING ZHANG
Hi,
Although the JDBC connector cannot read a changelog from the database, there
are already connectors that satisfy your requirement. You could use the
Maxwell <https://maxwells-daemon.io/> [1], Canal
<https://github.com/alibaba/canal/wiki> [2], or Debezium
<https://debezium.io/> [3] CDC tools to capture changes in databases; please
give them a try.
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/maxwell/
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/canal/
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/debezium/
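
As a rough sketch, consuming Debezium's output from Kafka as a changelog
could look like the following (topic name and Kafka address are placeholders):

  CREATE TABLE test2 (
    source STRING,
    target STRING
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'pg.public.test2',     -- placeholder: topic Debezium writes to
    'properties.bootstrap.servers' = 'localhost:9092',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'debezium-json'       -- decodes Debezium envelopes into INSERT/UPDATE/DELETE rows
  );

With this table as the right side of the join, an update in Postgres retracts
the old join result and emits the new one.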

Best regards,
JING ZHANG



Re: Re: Flink sql regular join not working as expect.

2021-06-04 Thread 1095193...@qq.com
Thanks @JING ZHANG @Yun Gao. I will use the processing-time temporal join.
BTW, will reading a changelog from the JDBC source be supported in the future
when it works as the right stream of a regular join?



1095193...@qq.com
 



Re: Flink sql regular join not working as expect.

2021-06-04 Thread JING ZHANG
Hi,
The JDBC source only takes a snapshot and sends all the data in that snapshot
downstream when it works as the right stream of a regular join; it cannot
produce a changelog stream.
After you update the field 'target' from '56.32.15.55:8080' to
'56.32.15.54:8080', the JDBC source will not send the new data downstream.

You could try to use Upsert Kafka [1] as the right side of the regular join
and set `source` as the primary key.
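
A minimal sketch, assuming the dimension data is written to a Kafka topic
keyed by `source` (topic name and address are placeholders):

  CREATE TABLE test2 (
    source STRING,
    target STRING,
    PRIMARY KEY (source) NOT ENFORCED  -- upsert semantics per key
  ) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'dim_test2',             -- placeholder topic
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'json',
    'value.format' = 'json'
  );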

BTW, if you use a Processing Time Temporal Join [2] in your case, you will
always join against the latest version of the dimension table, but updates on
the dimension table will not trigger new join results, because the join only
looks the table up by key when a record arrives on the main stream.
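
A sketch of that lookup variant, assuming `test1` declares a processing-time
attribute in its DDL (e.g. `proc_time AS PROCTIME()`):

  SELECT t1.source, t2.target
  FROM test1 AS t1
  JOIN test2 FOR SYSTEM_TIME AS OF t1.proc_time AS t2
    ON t1.source = t2.source;

Each incoming Kafka record then looks up the current row in the JDBC table,
so step 6 of your test would return the updated 'target' (subject to the JDBC
connector's lookup cache settings).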

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#temporal-joins

Best regards,
JING ZHANG




Re: Flink sql regular join not working as expect.

2021-06-04 Thread Yun Gao
Hi,

I'm not an expert on Table/SQL, but it seems to me that for regular joins,
Flink does not re-read the dimension table after it has read it fully for the
first time. If you want to always join records against the latest version of
the dimension table, you may need to use a temporal join [1].

Best,
Yun


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#temporal-joins



--
From:1095193...@qq.com <1095193...@qq.com>
Send Time:2021 Jun. 4 (Fri.) 16:45
To:user 
Subject:Flink sql regular join not working as expect.

Hi
   I am working on joining a Kafka stream with a Postgres Dimension table.  
Accoring to: 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/
 "Regular joins are the most generic type of join in which any new record, or 
changes to either side of the join, are visible and affect the entirety of the 
join result."
   However, in my test, change record in dimenstion table will not affect the 
result of the join.  My test steps:
  1. create Kafka table sql 
  CREATE TABLE test1 (  source String )  WITH (  'connector' = 'kafka',   
'topic' = 'test' ...)
   2.create dimension table sql 
 CREATE TABLE test2 (source String, target String)  WITH  ( 'connector' 
= 'jdbc'... )
 Prepared 1 record in dimenion table:
 source  |   target
 172.16.1.109:8080   | 56.32.15.55:8080 
  3. regular join sql
 select test1.source, test2.target from test1 join test2 on test1.source = 
test2.source
   4. feed data into Kafka
  {"source":"172.16.1.109:8080"}
  Flink could output result as expect:  +I[172.16.1.109:8080, 
56.32.15.55:8080]
   5. change field 'target'  from '56.32.15.55:8080' to '56.32.15.54:8080' in 
dimension table:  
  source  |   target
172.16.1.109:8080 56.32.15.54:8080
   6. feed data into Kafka
  {"source":"172.16.1.109:8080"}
  Flink still output result as not affected by changes to dimension table:  
+I[172.16.1.109:8080, 56.32.15.55:8080]
  Expect result:  +I[172.16.1.109:8080, 56.32.15.54:8080]
Could you give me some suggestions why regualar join result not be affected 
by changes to dimension table in mytest? Appreciation.



1095193...@qq.com



Flink sql regular join not working as expect.

2021-06-04 Thread 1095193...@qq.com
Hi
   I am working on joining a Kafka stream with a Postgres dimension table.
According to
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/ :
"Regular joins are the most generic type of join in which any new record, or
changes to either side of the join, are visible and affect the entirety of
the join result."
   However, in my test, changing a record in the dimension table does not
affect the result of the join. My test steps:
   1. Create the Kafka table:
      CREATE TABLE test1 ( source STRING ) WITH ( 'connector' = 'kafka',
'topic' = 'test' ... )
   2. Create the dimension table:
      CREATE TABLE test2 ( source STRING, target STRING ) WITH (
'connector' = 'jdbc' ... )
      Prepared 1 record in the dimension table:
      source            | target
      172.16.1.109:8080 | 56.32.15.55:8080
   3. Regular join SQL:
      SELECT test1.source, test2.target FROM test1 JOIN test2 ON
test1.source = test2.source
   4. Feed data into Kafka:
      {"source":"172.16.1.109:8080"}
      Flink outputs the expected result: +I[172.16.1.109:8080, 56.32.15.55:8080]
   5. Change the field 'target' from '56.32.15.55:8080' to
'56.32.15.54:8080' in the dimension table:
      source            | target
      172.16.1.109:8080 | 56.32.15.54:8080
   6. Feed data into Kafka again:
      {"source":"172.16.1.109:8080"}
      Flink still outputs a result unaffected by the change to the dimension
table: +I[172.16.1.109:8080, 56.32.15.55:8080]
      Expected result: +I[172.16.1.109:8080, 56.32.15.54:8080]
Could you give me some suggestions on why the regular join result is not
affected by changes to the dimension table in my test? Thanks.



1095193...@qq.com