Hi, Elakiya.
Are you following the example here[1]? Could you attach a minimal, reproducible
SQL example?


[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/insert/
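
As a rough sketch of what that page describes (not tested against your
setup): in recent Flink versions, several INSERT statements can be grouped
into one statement set so that they are submitted as a single job. The table
and column names below (person_src, department_src, postgres_sink,
kafka_sink, dept_name, person_id) are hypothetical placeholders, and the
sketch assumes all four tables were already created with their own CREATE
TABLE statements outside the statement set.

```sql
-- Assumption: the two source tables and the two sink tables already exist;
-- their CREATE TABLE statements are run separately, before this block.
-- All table and column names here are hypothetical.
EXECUTE STATEMENT SET
BEGIN
  -- Sink 1: the existing PostgreSQL (JDBC) table.
  INSERT INTO postgres_sink
  SELECT p.id, p.name, d.dept_name
  FROM person_src AS p
  JOIN department_src AS d ON p.id = d.person_id;

  -- Sink 2: a Kafka table fed by the same join.
  INSERT INTO kafka_sink
  SELECT p.id, p.name, d.dept_name
  FROM person_src AS p
  JOIN department_src AS d ON p.id = d.person_id;
END;
```

In the Java Table API the corresponding calls are
TableEnvironment.createStatementSet(), StatementSet.addInsertSql() for each
INSERT, and a single StatementSet.execute(). The "Only insert statement is
supported now" error may mean that a CREATE TABLE statement was passed to
addInsertSql(); DDL would instead go through TableEnvironment.executeSql().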







--

    Best!
    Xuyang




At 2023-12-06 17:49:17, "elakiya udhayanan" <laks....@gmail.com> wrote:

Hi Team,
I would like to know whether a single Flink job can have two sinks. I am
running a Flink SQL job that consumes from two different Kafka topics, each
declared with a CREATE TABLE DDL (as below), joins them, and currently writes
the result to an external database (PostgreSQL, as a sink). I would like to
add a second sink that also writes the result to a Kafka topic.
I tried using two SQL scripts (two CREATE and two INSERT statements), but got
the exception "Cannot have more than one execute() or executeAsync() call
in a single environment. at "
I also tried the StatementSet functionality, which gave me the exception
"org.apache.flink.table.api.TableException: Only insert statement is
supported now. at ".
I am looking for help with this. TIA


Note: I am using the Flink UI to submit my job.
Sample DDL statement used:
String statement = "CREATE TABLE Person (\r\n" +
"  person ROW(id STRING, name STRING\r\n" +
"  ),\r\n" +
"  PRIMARY KEY (id) NOT ENFORCED\r\n" +
") WITH (\r\n" +
"  'connector' = 'upsert-kafka',\r\n" +
"  'topic' = 'employee',\r\n" +
"  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
"  'key.format' = 'raw',\r\n" +
"  'value.format' = 'avro-confluent',\r\n" +
"  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'\r\n" +
")";

Thanks,
Elakiya
