Hi,
Although JDBC connector could not read changlog from Database, however
there are already connectors which could satisfy your demands. You could
use Maxwell <https://maxwells-daemon.io/>[1], Canal
<https://github.com/alibaba/canal/wiki> [2],Debezium <https://debezium.io/>[3]
CDC tools to capture changes in databases, please have a try.
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/maxwell/
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/canal/
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/debezium/

Best regards,
JING ZHANG

1095193...@qq.com <1095193...@qq.com> 于2021年6月5日周六 下午12:56写道:

> Thanks @JING ZHANG @Yun Gao. I will use processing time temporal join.
> BTW, would we support read changelog for JDBC source when it works as right
> stream of a regular join in future?
>
> ------------------------------
> 1095193...@qq.com
>
>
> *From:* JING ZHANG <beyond1...@gmail.com>
> *Date:* 2021-06-04 18:32
> *To:* Yun Gao <yungao...@aliyun.com>
> *CC:* 1095193...@qq.com; user <user@flink.apache.org>
> *Subject:* Re: Flink sql regular join not working as expect.
> Hi,
> JDBC source only does a snapshot and sends all datas in the snapshot to
> downstream when it works as a right stream of a regular join, it could not
> produce a changlog stream.
> After you update the field 'target'  from '56.32.15.55:8080
> <http://56.3215.55:8080/>' to '56.32.15.54:8080', JDBC source would not
> send new data to downstream.
>
> You could try to use Upsert kafka [1] as right side of the regular join
> and set `source` as primary key.
>
> BTW, if use Processing TIme Temporal Join[2] in your case, you could
> always join the latest version of dimension table, but updates on dimension
> table would not trigger join because it only waits for look up by keys.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#temporal-joins
>
> Best regards,
> JING ZHANG
>
>
> Yun Gao <yungao...@aliyun.com> 于2021年6月4日周五 下午5:07写道:
>
>> Hi,
>>
>> I'm not the expert for the table/sql, but it seems to me that for regular
>> joins, Flink would not re-read the dimension
>> table after it has read it fully for the first time. If you want to
>> always join the records with the latest version of
>> dimension table, you may need to use the temporal joins [1].
>>
>> Best,
>> Yun
>>
>>
>> [1]
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#temporal-joins>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#temporal-joins
>>
>>
>> ------------------------------------------------------------------
>> From:1095193...@qq.com <1095193...@qq.com>
>> Send Time:2021 Jun. 4 (Fri.) 16:45
>> To:user <user@flink.apache.org>
>> Subject:Flink sql regular join not working as expect.
>>
>> Hi
>>    I am working on joining a Kafka stream with a Postgres Dimension
>> table.  Accoring to:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/
>>  *"**Regular joins are the most generic type of join in which any new
>> record, or changes to either side of the join, are visible and affect the
>> entirety of the join result."*
>>    However, in my test, change record in dimenstion table will not affect
>> the result of the join.  My test steps:
>>    1. create Kafka table sql
>>       CREATE TABLE test1 (  source String )  WITH (  'connector' =
>> 'kafka',   'topic' = 'test' ...)
>>    2.create dimension table sql
>>      CREATE TABLE test2 (source String, target String)  WITH  (
>> 'connector' = 'jdbc'... )
>>      Prepared 1 record in dimenion table:
>>      source                      |   target
>>   172.16.1.109:8080       | 56.32.15.55:8080
>>    3. regular join sql
>>        select test1.source, test2.target from test1 join test2 on
>> test1.source = test2.source
>>    4. feed data into Kafka
>>       {"source":"172.16.1.109:8080"}
>>       Flink could output result as expect:  +I[172.16.1.109:8080,
>> 56.32.15.55:8080]
>>    5. change field 'target'  from '56.32.15.55:8080' to '56.32.15.54:8080'
>> in dimension table:
>>       source                      |   target
>>     172.16.1.109:8080 56.32.15.54:8080
>>    6. feed data into Kafka
>>       {"source":"172.16.1.109:8080"}
>>       Flink still output result as not affected by changes to dimension
>> table:  +I[172.16.1.109:8080, 56.32.15.55:8080]
>>       Expect result:                  +I[172.16.1.109:8080,
>> 56.32.15.54:8080]
>>     Could you give me some suggestions why regualar join result not be
>> affected by changes to dimension table in mytest? Appreciation.
>>
>> ------------------------------
>> 1095193...@qq.com
>>
>>
>>

Reply via email to