Hi!

You can define your sink with the following schema:

CREATE TABLE kafka_sink (
  employee ROW<id STRING, name STRING>
) WITH (
  'connector' = 'kafka',
  'format' = 'json'
  -- other properties...
);

You can then insert into this sink with the following SQL:

INSERT INTO kafka_sink SELECT ROW(id, name) FROM kafka_source;
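
For completeness, here is a minimal sketch of what the matching source table might look like for the flat message in your question. The topic name, broker address, and group id below are placeholders I made up, not values from your setup, so adjust them as needed:

```sql
-- Sketch of a source table for flat messages like {"id":"001","name":"wang"}.
-- 'employee_flat', 'localhost:9092' and 'demo-group' are assumed placeholders.
CREATE TABLE kafka_source (
  id STRING,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'employee_flat',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'demo-group',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

The fields of ROW(id, name) are matched to the sink column by position, so the nested field names in the output JSON come from the ROW<id STRING, name STRING> type declared on the sink.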

1095193...@qq.com <1095193...@qq.com> wrote on Fri, Jul 9, 2021 at 7:08 PM:

> Hi community,
> I receive JSON messages from Kafka, convert the flat JSON to nested JSON, and
> send it back to Kafka.
> Message received from Kafka: {"id":"001","name":"wang"}
> Message sent back to Kafka:  {"employee":{"id":"001","name":"wang"}}
> How can I do this in Flink SQL?
>
> ------------------------------
> 1095193...@qq.com
>
