Hi,
The JDBC source only takes a snapshot and sends all data in that snapshot
downstream when it works as the right side of a regular join; it cannot
produce a changelog stream.
After you update the field 'target' from '56.32.15.55:8080' to
'56.32.15.54:8080', the JDBC source will not send the new data downstream.

You could try to use Upsert Kafka [1] as the right side of the regular join
and set `source` as the primary key.
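For example, a minimal DDL sketch (the topic name, bootstrap servers and
formats below are placeholders you would adapt to your setup):

    CREATE TABLE test2 (
      source STRING,
      target STRING,
      PRIMARY KEY (source) NOT ENFORCED  -- upsert-kafka requires a primary key
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'dim_topic',                              -- hypothetical topic
      'properties.bootstrap.servers' = 'localhost:9092',  -- placeholder
      'key.format' = 'json',
      'value.format' = 'json'
    );

Each update for a given `source` key then produces a changelog record, so
the join result is corrected when the dimension row changes.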

BTW, if you use a Processing Time Temporal Join [2] in your case, you could
always join against the latest version of the dimension table. However,
updates on the dimension table would not trigger a join, because the join
only performs lookups by key when a record arrives on the probe (left) side.
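A minimal sketch of such a query, assuming test1 additionally declares a
processing-time attribute in its DDL (e.g. `proctime AS PROCTIME()`):

    SELECT test1.source, test2.target
    FROM test1
    JOIN test2 FOR SYSTEM_TIME AS OF test1.proctime
    ON test1.source = test2.source;

With the JDBC connector on the right side this works as a lookup join, so
each incoming Kafka record queries the current content of the Postgres table.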

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#temporal-joins

Best regards,
JING ZHANG


Yun Gao <yungao...@aliyun.com> wrote on Fri, Jun 4, 2021 at 5:07 PM:

> Hi,
>
> I'm not the expert for the table/sql, but it seems to me that for regular
> joins, Flink would not re-read the dimension
> table after it has read it fully for the first time. If you want to always
> join the records with the latest version of the
> dimension table, you may need to use temporal joins [1].
>
> Best,
> Yun
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#temporal-joins
>
>
> ------------------------------------------------------------------
> From:1095193...@qq.com <1095193...@qq.com>
> Send Time:2021 Jun. 4 (Fri.) 16:45
> To:user <user@flink.apache.org>
> Subject: Flink SQL regular join not working as expected.
>
> Hi,
>    I am working on joining a Kafka stream with a Postgres dimension
> table. According to
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/:
> "Regular joins are the most generic type of join in which any new
> record, or changes to either side of the join, are visible and affect the
> entirety of the join result."
>    However, in my test, changing a record in the dimension table does not
> affect the result of the join. My test steps:
>    1. Create the Kafka table:
>       CREATE TABLE test1 (source STRING)
>       WITH ('connector' = 'kafka', 'topic' = 'test' ...)
>    2. Create the dimension table:
>       CREATE TABLE test2 (source STRING, target STRING)
>       WITH ('connector' = 'jdbc' ...)
>       Prepared 1 record in the dimension table:
>       source              | target
>       172.16.1.109:8080   | 56.32.15.55:8080
>    3. Regular join SQL:
>       SELECT test1.source, test2.target FROM test1 JOIN test2
>       ON test1.source = test2.source
>    4. Feed data into Kafka:
>       {"source":"172.16.1.109:8080"}
>       Flink outputs the result as expected:
>       +I[172.16.1.109:8080, 56.32.15.55:8080]
>    5. Change the field 'target' from '56.32.15.55:8080' to
>       '56.32.15.54:8080' in the dimension table:
>       source              | target
>       172.16.1.109:8080   | 56.32.15.54:8080
>    6. Feed data into Kafka:
>       {"source":"172.16.1.109:8080"}
>       Flink still outputs a result unaffected by the change to the
>       dimension table: +I[172.16.1.109:8080, 56.32.15.55:8080]
>       Expected result:  +I[172.16.1.109:8080, 56.32.15.54:8080]
>    Could you give me some suggestions on why the regular join result is
> not affected by changes to the dimension table in my test? I'd appreciate
> any help.
>
> ------------------------------
> 1095193...@qq.com
>
>
>
