Hi,
Although the JDBC connector cannot read a changelog from the database, there are already connectors that satisfy your needs. You could use the Maxwell <https://maxwells-daemon.io/> [1], Canal <https://github.com/alibaba/canal/wiki> [2], or Debezium <https://debezium.io/> [3] CDC tools to capture changes in databases. Please have a try.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/maxwell/
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/canal/
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/debezium/
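For example, a changelog captured by Debezium could be consumed as a minimal sketch like this (the topic name and bootstrap server are assumptions; the columns follow the `test2` table discussed in the thread below):

```sql
-- Sketch: reads Debezium change events (INSERT/UPDATE/DELETE) for the
-- Postgres dimension table from Kafka. Topic and server are hypothetical.
CREATE TABLE test2_changelog (
  source STRING,
  target STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'pg.public.test2',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'debezium-json'
);
```

A regular join against such a table sees updates: when 'target' changes, the join emits a retraction for the old row and an insert for the new one.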
Best regards,
JING ZHANG

1095193...@qq.com <1095193...@qq.com> wrote on Sat, Jun 5, 2021 at 12:56 PM:

> Thanks @JING ZHANG @Yun Gao. I will use a processing-time temporal join.
> BTW, will the JDBC source support reading a changelog when it works as the
> right stream of a regular join in the future?
>
> ------------------------------
> 1095193...@qq.com
>
>
> *From:* JING ZHANG <beyond1...@gmail.com>
> *Date:* 2021-06-04 18:32
> *To:* Yun Gao <yungao...@aliyun.com>
> *CC:* 1095193...@qq.com; user <user@flink.apache.org>
> *Subject:* Re: Flink sql regular join not working as expect.
> Hi,
> The JDBC source only takes a snapshot and sends all data in the snapshot
> downstream when it works as the right stream of a regular join; it cannot
> produce a changelog stream.
> After you update the field 'target' from '56.32.15.55:8080' to
> '56.32.15.54:8080', the JDBC source will not send new data downstream.
>
> You could try to use Upsert Kafka [1] as the right side of the regular join
> and set `source` as the primary key.
>
> BTW, if you use a Processing-Time Temporal Join [2] in your case, you would
> always join against the latest version of the dimension table, but updates
> on the dimension table would not trigger the join, because it only waits
> for lookups by key.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#temporal-joins
>
> Best regards,
> JING ZHANG
>
>
> Yun Gao <yungao...@aliyun.com> wrote on Fri, Jun 4, 2021 at 5:07 PM:
>
>> Hi,
>>
>> I'm not an expert on Table/SQL, but it seems to me that for regular
>> joins, Flink does not re-read the dimension table after it has read it
>> fully for the first time. If you want to always join records with the
>> latest version of the dimension table, you may need to use temporal
>> joins [1].
>>
>> Best,
>> Yun
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#temporal-joins
>>
>>
>> ------------------------------------------------------------------
>> From: 1095193...@qq.com <1095193...@qq.com>
>> Send Time: 2021 Jun. 4 (Fri.) 16:45
>> To: user <user@flink.apache.org>
>> Subject: Flink sql regular join not working as expect.
>>
>> Hi,
>> I am working on joining a Kafka stream with a Postgres dimension
>> table. According to
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/:
>> *"Regular joins are the most generic type of join in which any new
>> record, or changes to either side of the join, are visible and affect the
>> entirety of the join result."*
>> However, in my test, changing a record in the dimension table does not
>> affect the result of the join. My test steps:
>> 1. Create the Kafka table:
>> CREATE TABLE test1 (source STRING) WITH ('connector' = 'kafka', 'topic' = 'test' ...)
>> 2. Create the dimension table:
>> CREATE TABLE test2 (source STRING, target STRING) WITH ('connector' = 'jdbc' ...)
>> Prepared 1 record in the dimension table:
>> source | target
>> 172.16.1.109:8080 | 56.32.15.55:8080
>> 3. Run the regular join SQL:
>> SELECT test1.source, test2.target FROM test1 JOIN test2 ON test1.source = test2.source
>> 4. Feed data into Kafka:
>> {"source":"172.16.1.109:8080"}
>> Flink outputs the result as expected: +I[172.16.1.109:8080, 56.32.15.55:8080]
>> 5. Change the field 'target' from '56.32.15.55:8080' to '56.32.15.54:8080'
>> in the dimension table:
>> source | target
>> 172.16.1.109:8080 | 56.32.15.54:8080
>> 6. Feed data into Kafka again:
>> {"source":"172.16.1.109:8080"}
>> Flink still outputs the result unaffected by the change to the dimension
>> table: +I[172.16.1.109:8080, 56.32.15.55:8080]
>> Expected result: +I[172.16.1.109:8080, 56.32.15.54:8080]
>> Could you give me some suggestions why the regular join result is not
>> affected by changes to the dimension table in my test? Appreciated.
>>
>> ------------------------------
>> 1095193...@qq.com
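To make the Upsert Kafka suggestion from the thread above concrete, the dimension table could be declared roughly as follows (a minimal sketch; the topic name and bootstrap server are assumptions, not from the original setup):

```sql
-- Sketch of the Upsert Kafka variant of the dimension table; records with
-- the same key ('source') are interpreted as upserts.
CREATE TABLE test2 (
  source STRING,
  target STRING,
  PRIMARY KEY (source) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'dim_test2',                            -- hypothetical topic
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);
```

With this setup the regular join query itself stays unchanged; publishing a new value for an existing key (e.g. '56.32.15.54:8080' for '172.16.1.109:8080') updates the join state, so the query retracts the old result and emits the new one. The processing-time temporal join alternative mentioned in the thread could be sketched like this, assuming `test1` declares a computed column `proc_time AS PROCTIME()`:

```sql
-- Sketch: each incoming test1 row looks up the current contents of the
-- JDBC dimension table at its processing time.
SELECT t1.source, t2.target
FROM test1 AS t1
JOIN test2 FOR SYSTEM_TIME AS OF t1.proc_time AS t2
  ON t1.source = t2.source;
```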