Hi,

When the JDBC source is used as the right side of a regular join, it only takes a snapshot of the table and sends all rows in that snapshot downstream; it cannot produce a changelog stream. So after you update the field 'target' from '56.32.15.55:8080' to '56.32.15.54:8080', the JDBC source does not send the new row downstream.

You could try using Upsert Kafka [1] as the right side of the regular join and set `source` as the primary key.

BTW, if you use a Processing Time Temporal Join [2] in your case, you would always join against the latest version of the dimension table, but updates to the dimension table would not trigger the join, because the dimension table is only looked up by key when a record arrives on the left side.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#temporal-joins

Best regards,
JING ZHANG

Yun Gao <yungao...@aliyun.com> wrote on Fri, Jun 4, 2021 at 5:07 PM:

> Hi,
>
> I'm not an expert on Table/SQL, but it seems to me that for regular
> joins, Flink would not re-read the dimension table after it has read it
> fully for the first time. If you want to always join the records with the
> latest version of the dimension table, you may need to use temporal
> joins [1].
>
> Best,
> Yun
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#temporal-joins
>
> ------------------------------------------------------------------
> From: 1095193...@qq.com <1095193...@qq.com>
> Send Time: 2021 Jun. 4 (Fri.) 16:45
> To: user <user@flink.apache.org>
> Subject: Flink sql regular join not working as expect.
>
> Hi
> I am working on joining a Kafka stream with a Postgres dimension
> table. According to
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/
> *"Regular joins are the most generic type of join in which any new
> record, or changes to either side of the join, are visible and affect the
> entirety of the join result."*
> However, in my test, changing a record in the dimension table does not
> affect the result of the join. My test steps:
> 1. Create the Kafka table:
>    CREATE TABLE test1 ( source String ) WITH ( 'connector' = 'kafka', 'topic' = 'test' ...)
> 2. Create the dimension table:
>    CREATE TABLE test2 (source String, target String) WITH ( 'connector' = 'jdbc'... )
>    Prepared 1 record in the dimension table:
>    source | target
>    172.16.1.109:8080 | 56.32.15.55:8080
> 3. Regular join SQL:
>    select test1.source, test2.target from test1 join test2 on test1.source = test2.source
> 4. Feed data into Kafka:
>    {"source":"172.16.1.109:8080"}
>    Flink outputs the result as expected: +I[172.16.1.109:8080, 56.32.15.55:8080]
> 5. Change field 'target' from '56.32.15.55:8080' to '56.32.15.54:8080'
>    in the dimension table:
>    source | target
>    172.16.1.109:8080 | 56.32.15.54:8080
> 6. Feed data into Kafka:
>    {"source":"172.16.1.109:8080"}
>    Flink still outputs a result that is not affected by the change to the
>    dimension table: +I[172.16.1.109:8080, 56.32.15.55:8080]
>    Expected result: +I[172.16.1.109:8080, 56.32.15.54:8080]
> Could you give me some suggestions as to why the regular join result is
> not affected by changes to the dimension table in my test? Much appreciated.
>
> ------------------------------
> 1095193...@qq.com
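
For reference, the two options from the reply (an Upsert Kafka right side for the regular join, or a processing-time temporal join against the JDBC table) might look roughly like the sketch below. This is only an illustration, not a tested setup: the table names `test2_changelog` and `test1_pt`, the topic name `test2-changelog`, the broker address, the JSON formats, and the `proc_time` column are all assumptions that do not appear in the original question. It also assumes something else keeps the changelog topic in sync with the Postgres table, and that your Flink version accepts adding a computed column through `LIKE`; if not, add `proc_time AS PROCTIME()` to the original `test1` DDL instead.

```sql
-- Option 1: regular join against a changelog source.
-- Hypothetical table; topic, broker address and formats are placeholders.
CREATE TABLE test2_changelog (
  source STRING,
  target STRING,
  PRIMARY KEY (source) NOT ENFORCED  -- upsert-kafka requires a primary key
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'test2-changelog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- Same regular join as in the question, with the changelog table on the right.
-- An upsert for an existing key updates the join results already emitted for it.
SELECT test1.source, test2_changelog.target
FROM test1
JOIN test2_changelog ON test1.source = test2_changelog.source;

-- Option 2: processing-time temporal (lookup) join against the JDBC table.
-- The left side needs a processing-time attribute; here it is added on a
-- derived table (hypothetical name) so the original connector options are reused.
CREATE TABLE test1_pt (
  proc_time AS PROCTIME()
) LIKE test1;

-- Each incoming Kafka record looks up the current row in test2 by key.
-- Later changes to test2 do not update results that were already emitted.
SELECT t1.source, d.target
FROM test1_pt AS t1
JOIN test2 FOR SYSTEM_TIME AS OF t1.proc_time AS d
  ON t1.source = d.source;
```

With option 2, step 6 of the test should see the new 'target' value, provided the JDBC lookup cache is not enabled or has expired.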