Hi! You can define your sink with the following schema:

CREATE TABLE kafka_sink (
  employee ROW<id STRING, name STRING>
) WITH (
  'connector' = 'kafka',
  'format' = 'json'
  -- other properties...
);

You can then insert into this sink with the following SQL:

INSERT INTO kafka_sink
SELECT ROW(id, name) FROM kafka_source;

1095193...@qq.com <1095193...@qq.com> wrote on Fri, Jul 9, 2021 at 7:08 PM:

> Hi community,
> I'll receive json message from Kafka, convert flat json to nested json and
> send it back to Kafka.
> receive message from Kafka: {"id":"001","name":"wang"}
> send message back to Kafka: {"employee":{"id":"001","name":"wang"}}
> How to do it in Flink sql?
>
> ------------------------------
> 1095193...@qq.com
>