Hi dear engineer,

Thanks so much for your precious time reading my email!
Resently I'm working on the Flink sql (with version 1.13) in my project and 
encountered one problem about json format data, hope you can take a look, 
thanks! Below is the description of my issue.


I use kafka as source and sink, I define kafka source table like this:


     CREATE TABLE TableSource (
          schema STRING,
          payload ROW(
              `id` STRING,
              `content` STRING
         )
     )
     WITH (
         'connector' = 'kafka',
         'topic' = 'topic_source',
         'properties.bootstrap.servers' = 'localhost:9092',
         'properties.group.id' = 'all_gp',
         'scan.startup.mode' = 'group-offsets',
         'format' = 'json',
         'json.fail-on-missing-field' = 'false',
         'json.ignore-parse-errors' = 'true'
     );


Define the kafka sink table like this:


     CREATE TABLE TableSink (
          `id` STRING NOT NULL,
          `content` STRING NOT NULL
     )
     WITH (
         'connector' = 'kafka',
         'topic' = 'topic_sink',
         'properties.bootstrap.servers' = 'localhost:9092',
         'format' = 'json',
         'json.fail-on-missing-field' = 'false',
         'json.ignore-parse-errors' = 'true'
    );




Then insert into TableSink with data from TableSource:
    INSERT INTO TableSink SELECT id, content FROM TableSource;


Then I use "kafka-console-producer.sh" to produce data below into topic 
"topic_source" (TableSource):
    {"schema": "schema_infos", "payload": {"id": "10000", "content": 
"{\"name\":\"Jone\",\"age\":20}"}}




Then I listen on topic_sink (TableSink) by kafka-console-consumer.sh, the 
output is:
    {"id":"10000","content":"{\"name\":\"Jone\",\"age\":20}"}


But what I want here is {"id":"10000","content": {"name":"Jone","age":20}}
I want the the value of "content" is json object, not json string.


And what's more, the format of "content" in TableSource is not fixed, it can be 
any json formated(or json array format) string, such as:
{"schema": "schema_infos", "payload": {"id": "10000", "content": 
"{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30"}}




So my question is, how can I transform json format string(like 
"{\"name\":\"Jone\",\"age\":20}")  from TableSource to json object 
(like{"name":"Jone","age":20} ).




Thanks && Regards,
Hunk

回复