Maybe you can get some inspiration from JsonDeserializationSchema and 
JSONKeyValueDeserializationSchema.
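
A custom deserialization schema (or a UDF) would essentially re-emit each 
record with the nested 'content' string parsed into a real object. As a 
minimal sketch of that transformation in plain Python (stdlib only, outside 
Flink; the function name and field names are just taken from the examples in 
this thread):

```python
import json

def unwrap_content(record_str: str) -> str:
    """Re-emit a sink record with its 'content' field parsed
    from an escaped JSON string into a real JSON object."""
    record = json.loads(record_str)
    # 'content' arrives as a string that happens to hold JSON;
    # parse it in place so it serializes as an object, not a string.
    record["content"] = json.loads(record["content"])
    return json.dumps(record)

sink_record = '{"id":"10000","content":"{\\"name\\":\\"Jone\\",\\"age\\":20}"}'
print(unwrap_content(sink_record))
# → {"id": "10000", "content": {"name": "Jone", "age": 20}}
```

Because json.loads is applied to whatever 'content' holds, the same logic 
covers the variable schemas discussed below (different keys, nested arrays), 
which is why handling this in code rather than in the DDL type avoids the 
fixed-schema problem.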

At 2022-04-02 14:47:08, "wang" <24248...@163.com> wrote:

Hi,

Thanks so much for your support! 

But sorry to say I'm still confused. No matter what the UDF looks like, the 
first thing I need to confirm is the type of 'content' in TableSink. What 
should its type be? Should I use type ROW, like this?

     CREATE TABLE TableSink (

          `id` STRING NOT NULL,

          `content` ROW<name STRING, age BIGINT>

     )

     WITH (

         ...

    );

This type is only suitable for the source input {"schema": "schema_infos", 
"payload": {"id": "10000", "content": "{\"name\":\"Jone\",\"age\":20}"}}

But the JSON key names and the format of 'content' in the source are 
variable. If the source input is

{"schema": "schema_infos", "payload": {"id": "10000", "content": 
"{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30}"}}

I should define `content` in TableSink as type ROW<color STRING, 
BackgroundColor STRING, Height BIGINT>, like this:

     CREATE TABLE TableSink (

          `id` STRING NOT NULL,

          `content` ROW<color STRING, BackgroundColor STRING, Height BIGINT>

     )

     WITH (

         ...

    );

And the input JSON might also contain a JSON array, like: 
{"schema": "schema_infos", "payload": {"id": "10000", "content": 
"{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30,\"detail\":[{\"value_type\":1,\"value_name\":\"AAA\"},{\"value_type\":2,\"value_name\":\"BBB\"}]}"}}

So is there some common type I can use that can handle all input JSON formats?

Thanks so much!!

Thanks && Regards,

Hunk

At 2022-04-01 17:25:59, "Qingsheng Ren" <renqs...@gmail.com> wrote:
>Hi, 
>
>I’m afraid you have to use a UDTF to parse the content and construct the final 
>json string manually. The key problem is that the field “content” is actually 
>a STRING, although it looks like a json object. Currently the json format 
>provided by Flink could not handle this kind of field defined as STRING. Also 
>considering the schema of this “content” field is not fixed across records, 
>Flink SQL can’t use one DDL to consume / produce records with changing schema. 
>
>Cheers,
>
>Qingsheng
>
>> On Mar 31, 2022, at 21:42, wang <24248...@163.com> wrote:
>> 
>> Hi dear engineer,
>> 
>> Thanks so much for your precious time reading my email!
>> Recently I've been working on Flink SQL (version 1.13) in my project and 
>> encountered a problem with JSON-format data; I hope you can take a look, 
>> thanks! Below is a description of my issue.
>> 
>> I use Kafka as source and sink. I define the Kafka source table like this:
>> 
>>      CREATE TABLE TableSource (
>>           schema STRING,
>>           payload ROW(
>>               `id` STRING,
>>               `content` STRING
>>          )
>>      )
>>      WITH (
>>          'connector' = 'kafka',
>>          'topic' = 'topic_source',
>>          'properties.bootstrap.servers' = 'localhost:9092',
>>          'properties.group.id' = 'all_gp',
>>          'scan.startup.mode' = 'group-offsets',
>>          'format' = 'json',
>>          'json.fail-on-missing-field' = 'false',
>>          'json.ignore-parse-errors' = 'true'
>>      );
>> 
>> And I define the Kafka sink table like this:
>> 
>>      CREATE TABLE TableSink (
>>           `id` STRING NOT NULL,
>>           `content` STRING NOT NULL
>>      )
>>      WITH (
>>          'connector' = 'kafka',
>>          'topic' = 'topic_sink',
>>          'properties.bootstrap.servers' = 'localhost:9092',
>>          'format' = 'json',
>>          'json.fail-on-missing-field' = 'false',
>>          'json.ignore-parse-errors' = 'true'
>>     );
>> 
>> 
>> Then insert into TableSink with data from TableSource:
>>     INSERT INTO TableSink SELECT id, content FROM TableSource;
>> 
>> Then I use "kafka-console-producer.sh" to produce the data below into topic 
>> "topic_source" (TableSource):
>>     {"schema": "schema_infos", "payload": {"id": "10000", "content": 
>> "{\"name\":\"Jone\",\"age\":20}"}}
>> 
>> 
>> Then I listen on topic_sink (TableSink) with kafka-console-consumer.sh; the 
>> output is:
>>     {"id":"10000","content":"{\"name\":\"Jone\",\"age\":20}"}
>> 
>> But what I want here is {"id":"10000","content":{"name":"Jone","age":20}}.
>> I want the value of "content" to be a JSON object, not a JSON string.
>> 
>> What's more, the format of "content" in TableSource is not fixed; it can 
>> be any JSON-formatted (or JSON-array-formatted) string, such as:
>> {"schema": "schema_infos", "payload": {"id": "10000", "content": 
>> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30}"}}
>> 
>> 
>> So my question is: how can I transform a JSON-format string (like 
>> "{\"name\":\"Jone\",\"age\":20}") from TableSource into a JSON object 
>> (like {"name":"Jone","age":20})?
>> 
>> 
>> Thanks && Regards,
>> Hunk