Hi Amir,
In Scala, it might look like this:
import org.apache.flink.table.api._ // provides the $"..." column syntax
// executeSql returns a TableResult, not a Table, so register the tables first.
tableEnv.executeSql("CREATE TEMPORARY TABLE s1 (id int, ssn string) " +
  "WITH ('connector' = 'kafka', ...)")
tableEnv.executeSql("CREATE TEMPORARY TABLE s2 (id int, ssn string) " +
  "WITH ('connector' = 'kafka', ...)")
val t1 = tableEnv.from("s1")
// You need to rename the fields of one side before joining; otherwise you'll get
// "org.apache.flink.table.api.ValidationException: Ambiguous column name: ssn".
val t3 = tableEnv.sqlQuery("SELECT id AS id1, ssn AS ssn1 FROM s2")
val result = t1.join(t3).where($"ssn" === $"ssn1")
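The same join can also be written entirely in SQL, where table aliases qualify the columns so no separate renaming query is needed. The following is only a minimal sketch: the object name SsnJoinSketch, the sample rows, and the in-memory views standing in for the Kafka-backed s1 and s2 tables are assumptions for illustration, and it presumes the Flink Table API and planner dependencies are on the classpath.

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// Hypothetical, self-contained sketch; not tied to any real Kafka topic.
object SsnJoinSketch {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tableEnv = TableEnvironment.create(settings)

    // Assumption: in-memory views with made-up rows stand in for the
    // Kafka-backed tables s1 and s2 so the sketch runs without a broker.
    tableEnv.executeSql(
      """CREATE TEMPORARY VIEW s1 AS
        |SELECT * FROM (VALUES (1, 'ssn-a'), (2, 'ssn-b')) AS t(id, ssn)""".stripMargin)
    tableEnv.executeSql(
      """CREATE TEMPORARY VIEW s2 AS
        |SELECT * FROM (VALUES (10, 'ssn-b'), (20, 'ssn-c')) AS t(id, ssn)""".stripMargin)

    // Aliases a and b disambiguate the identically named columns.
    val joined = tableEnv.sqlQuery(
      """SELECT a.id AS id1, b.id AS id2, a.ssn
        |FROM s1 AS a
        |JOIN s2 AS b ON a.ssn = b.ssn""".stripMargin)

    // Runs the job and prints the joined rows to stdout.
    joined.execute().print()
  }
}
```

With real Kafka tables, only the two CREATE statements would change; the join query itself stays the same.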
For more details, see the Table API documentation on joins [1].
[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tableapi/#joins
Best regards,
Yuxia
----- Original Message -----
From: "Amir Hossein Sharifzadeh" <[email protected]>
To: "dev" <[email protected]>
Sent: Friday, February 3, 2023, 4:45:08 AM
Subject: Need help how to use Table API to join two Kafka streams
Hello,
I have a Kafka producer and a Kafka consumer that produce and consume
multiple data sets, respectively. Think of two data sets here: both
have a similar structure but carry different data.
I want to use the Table API to join the two Kafka streams while I
consume them, for example on data1.ssn == data2.ssn.
Constraints:
I don't want to change my producer or use FlinkKafkaProducer.
Thank you very much.
Best,
Amir