Thank you very much, Yuxia!

Here, ssn stands for social security number (it was just an example; it
can be any field).

Best,
Amir


On Fri, Feb 3, 2023 at 5:45 AM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:

> Hi, Amir.
> It may look like the following Scala code:
>
> // executeSql only registers the tables; it returns a TableResult, not a Table.
> tableEnv.executeSql("CREATE TEMPORARY TABLE s1 (id int, ssn string) " +
>   "WITH ('connector' = 'kafka', ...)")
> tableEnv.executeSql("CREATE TEMPORARY TABLE s2 (id int, ssn string) " +
>   "WITH ('connector' = 'kafka', ...)")
> val t1 = tableEnv.from("s1")
>
> // Both sides of a Table API join must have distinct field names; otherwise
> // it throws "org.apache.flink.table.api.ValidationException: Ambiguous
> // column name: ssn". So rename the fields of s2 before joining.
> val t3 = tableEnv.sqlQuery("SELECT id AS id2, ssn AS ssn1 FROM s2")
> val result = t1.join(t3).where($"ssn" === $"ssn1")
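>
> As a rough sketch only, the same join can also be written in plain SQL,
> where table aliases avoid the ambiguous-column problem without renaming
> fields first. The table names s1 and s2 come from this thread; the object
> name, the aliases a and b, the output names id1 and id2, and the sample
> rows are made up for illustration. fromValues stands in for the Kafka
> tables so that the sketch can run without a broker, assuming the Flink
> Scala Table API dependencies are on the classpath.
>
> ```scala
> import org.apache.flink.table.api._
>
> object SsnJoinSketch {
>   def main(args: Array[String]): Unit = {
>     val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
>
>     // Hypothetical in-memory stand-ins for the Kafka-backed tables s1 and s2.
>     val s1 = tableEnv.fromValues(row(1, "111"), row(2, "222")).as("id", "ssn")
>     val s2 = tableEnv.fromValues(row(10, "222"), row(11, "333")).as("id", "ssn")
>     tableEnv.createTemporaryView("s1", s1)
>     tableEnv.createTemporaryView("s2", s2)
>
>     // Aliases a and b qualify the columns, so both tables can keep the
>     // field names id and ssn.
>     val joined = tableEnv.sqlQuery(
>       "SELECT a.id AS id1, b.id AS id2, a.ssn " +
>         "FROM s1 AS a JOIN s2 AS b ON a.ssn = b.ssn")
>
>     joined.execute().print()
>   }
> }
> ```
>
> With real Kafka tables, the two fromValues and createTemporaryView calls
> would be replaced by the CREATE TEMPORARY TABLE statements, and the query
> itself should stay the same.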
>
> You can also refer to the Table API documentation for more detail [1].
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tableapi/#joins
>
> Best regards,
> Yuxia
>
> ----- Original Message -----
> From: "Amir Hossein Sharifzadeh" <amirsharifza...@gmail.com>
> To: "dev" <dev@flink.apache.org>
> Sent: Friday, February 3, 2023, 4:45:08 AM
> Subject: Need help how to use Table API to join two Kafka streams
>
> Hello,
>
> I have a Kafka producer and a Kafka consumer that produce and consume
> multiple data sets, respectively. You can think of two data sets here. Both
> data sets have a similar structure but carry different data.
>
> I want to use the Table API to join the two Kafka streams as I consume
> them, for example on data1.ssn == data2.ssn.
>
> Constraints:
> I don't want to change my producer or use FlinkKafkaProducer.
>
> Thank you very much.
>
> Best,
> Amir
>
