Hi Chen,

You need to tell Flink which table to write to, using “INSERT INTO XXX SELECT XXX”.
For a single non-INSERT query, Flink collects the output to the console automatically, so it works without an INSERT. But you must name the target table explicitly when you need to write data to external storage. For example:

String relateQuery = "insert into xxx select correlator_id, name, relationship from Correlation";

Best,
Yu Chen
________________________________
From: Zhanghao Chen <zhanghao.c...@outlook.com>
Sent: Wednesday, December 6, 2023 7:21:50 PM
To: elakiya udhayanan <laks....@gmail.com>; user@flink.apache.org <user@flink.apache.org>
Subject: Re: Query on using two sinks for a Flink job (Flink SQL)

Hi Elakiya,

You can try executing TableEnvironmentImpl#executeInternal for non-insert statements, then using StatementSet.addInsertSql to add multiple insertion statements, and finally calling StatementSet#execute.

Best,
Zhanghao Chen
________________________________
From: elakiya udhayanan <laks....@gmail.com>
Sent: Wednesday, December 6, 2023 17:49
To: user@flink.apache.org <user@flink.apache.org>
Subject: Query on using two sinks for a Flink job (Flink SQL)

Hi Team,

I would like to know whether it is possible to have two sinks in a single Flink job. I am using a Flink SQL based job that consumes from two different Kafka topics using CREATE TABLE DDL (as below), joins them to correlate the records, and at present writes the result to an external database (PostgreSQL, as a sink). I would like to know if I can add another sink so that the result is also written to a Kafka topic (as the second sink).

I tried using two SQL scripts (two CREATE and two INSERT statements) but got the exception "Cannot have more than one execute() or executeAsync() call in a single environment. at ". I also tried the StatementSet functionality, which again gave me an exception: "org.apache.flink.table.api.TableException: Only insert statement is supported now. at ".
I am looking for some help with this. TIA.

Note: I am using the Flink UI to submit my job.

Sample DDL statement used:

String statement = "CREATE TABLE Person (\r\n"
    + " person ROW(id STRING, name STRING\r\n"
    + " ),\r\n"
    + " PRIMARY KEY (id) NOT ENFORCED\r\n"
    + ") WITH (\r\n"
    + " 'connector' = 'upsert-kafka',\r\n"
    + " 'topic' = 'employee',\r\n"
    + " 'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n"
    + " 'key.format' = 'raw',\r\n"
    + " 'value.format' = 'avro-confluent',\r\n"
    + " 'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'\r\n"
    + ")";

Thanks,
Elakiya
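
The two replies above come down to one split: run DDL and other non-INSERT statements one at a time, and collect every INSERT INTO in a single StatementSet so that both sinks are submitted as one job. Below is a minimal, Flink-free sketch of that split. The class StatementRouter, its route method, and the table names pg_sink and kafka_sink are hypothetical and not from the thread, and the prefix check is a simplification that assumes each statement starts with its keyword (no leading comments).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Hypothetical helper: splits SQL statements by how they would be submitted to Flink.
final class StatementRouter {

    /** Result of the split. */
    static final class Routed {
        // Non-INSERT statements (CREATE TABLE, ...): run one by one,
        // e.g. via tableEnv.executeSql(sql).
        final List<String> immediate = new ArrayList<>();
        // INSERT statements: add all of them to ONE StatementSet via
        // statementSet.addInsertSql(sql), then call statementSet.execute() once.
        final List<String> inserts = new ArrayList<>();
    }

    static Routed route(List<String> statements) {
        Routed routed = new Routed();
        for (String raw : statements) {
            String sql = raw.trim();
            // Drop trailing semicolons and surrounding whitespace.
            while (sql.endsWith(";")) {
                sql = sql.substring(0, sql.length() - 1).trim();
            }
            if (sql.isEmpty()) {
                continue; // nothing left to submit
            }
            // Assumption: an INSERT statement begins with the keyword INSERT.
            if (sql.toUpperCase(Locale.ROOT).startsWith("INSERT")) {
                routed.inserts.add(sql);
            } else {
                routed.immediate.add(sql);
            }
        }
        return routed;
    }

    public static void main(String[] args) {
        Routed routed = route(Arrays.asList(
                "CREATE TABLE pg_sink (id STRING)",
                "CREATE TABLE kafka_sink (id STRING)",
                "INSERT INTO pg_sink SELECT id FROM Correlation;",
                "INSERT INTO kafka_sink SELECT id FROM Correlation;"));
        System.out.println("run individually: " + routed.immediate);
        System.out.println("add to one StatementSet: " + routed.inserts);
    }
}
```

With a real TableEnvironment, each entry of immediate would go to tableEnv.executeSql(...), each entry of inserts to addInsertSql(...) on a set obtained from tableEnv.createStatementSet(), followed by a single execute() call on that set. Passing a CREATE TABLE statement to addInsertSql is what produces the "Only insert statement is supported now" exception quoted in the original question.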