Re:Re: Could you please give me a hand about json object in flink sql

2022-04-02 Thread wang
Hi,


Got it, seems this way is not flexible enough, but still thanks so much for 
your great support!  Good wishes!




Regards && Thanks
Hunk

At 2022-04-02 16:34:29, "Qingsheng Ren"  wrote:
>Hi,
>
>If the schema of records is not fixed I’m afraid you have to do it in a UDTF. 
>
>Best, 
>
>Qingsheng
>
>> On Apr 2, 2022, at 15:45, wang <24248...@163.com> wrote:
>> 
>> Hi,
>> 
>> Thanks for your quick response! 
>> 
>> And I tried the format "raw"; it seems it only supports a single physical column, 
>> and in our project requirement, there are more than one hundred columns in the 
>> sink table. So I need to combine those columns into one string in a single UDF?
>> 
>> Thanks && Regards,
>> Hunk
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> At 2022-04-02 15:17:14, "Qingsheng Ren"  wrote:
>> >Hi,
>> >
>> >You can construct the final json string in your UDTF, and write it to Kafka 
>> >sink table with only one field, which is the entire json string constructed 
>> >in UDTF, and use raw format [1] in the sink table:
>> >
>> >CREATE TABLE TableSink (
>> >`final_json_string` STRING
>> >) WITH (
>> >‘connector’ = ‘kafka’,
>> >‘format’ = ‘raw’,
>> >...
>> >)
>> >
>> >So the entire flow would be like:
>> >
>> >1. Kafka source table reads data
>> >2. UDTF parses the ‘content’ field and constructs the final json (id, 
>> >content without backslashes) string you need, maybe using Jackson [2] or 
>> >other json tools
>> >3. Insert the constructed json string as the only field in sink table
>> >
>> >The key problem is that the schema of field “content” is not fixed, so you 
>> >have to complete most of the logic in the UDTF. 
>> >
>> >[1] 
>> >https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/raw/
>> >[2] https://github.com/FasterXML/jackson
>> >
>> >Best regards, 
>> >
>> >Qingsheng
>> >
>> >
>> >> On Apr 2, 2022, at 14:47, wang <24248...@163.com> wrote:
>> >> 
>> >> Hi,
>> >> 
>> >> Thanks so much for your support! 
>> >> 
>> >> But sorry to say I'm still confused about it. No matter what the udf 
>> >> looks like, the first thing I need to confirm is the type of 'content' in 
>> >> TableSink. What should its type be, should I use type ROW, like 
>> >> this?
>> >> 
>> >>  CREATE TABLE TableSink (
>> >>   `id` STRING NOT NULL,
>> >>   `content` ROW<name STRING, age BIGINT>
>> >>  )
>> >>  WITH (
>> >>  ...
>> >> );
>> >> 
>> >> This type is only suitable for source input {"schema": "schema_infos", 
>> >> "payload": {"id": "1", "content": "{\"name\":\"Jone\",\"age\":20}"}}
>> >> 
>> >> But the json key names and format of 'content' in source are variable. If 
>> >> the source input is 
>> >> {"schema": "schema_infos", "payload": {"id": "1", "content": 
>> >> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30}"}}
>> >> 
>> >> I should define `content` in TableSink with type ROW<color STRING, 
>> >> BackgroundColor STRING, Height BIGINT>, like this:
>> >> 
>> >>  CREATE TABLE TableSink (
>> >>   `id` STRING NOT NULL,
>> >>   `content` ROW<color STRING, BackgroundColor STRING, Height BIGINT>
>> >>  )
>> >>  WITH (
>> >>  ...
>> >> );
>> >> 
>> >> And the input json might also contain a json array, like: 
>> >> {"schema": "schema_infos", "payload": {"id": "1", "content": 
>> >> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30,\"detail\":[{\"value_type\":1,\"value_name\":\"AAA\"},{\"value_type\":2,\"value_name\":\"BBB\"}]}"}}
>> >> 
>> >> 
>> >> So is there some common type I can use which can handle all input json 
>> >> formats?  
>> >> 
>> >> Thanks so much!!
>> >> 
>> >> 
>> >> Thanks && Regards,
>> >> Hunk
>> >> 
>> >> 
>> >> 
>> >> 
>> >> 
>> >> 
>> >> At 2022-04-01 17:25:59, "Qingsheng Ren" wrote:
>> >> >Hi, 
>> >> >
>> >> >I’m afraid you have to use a UDTF to parse the content and construct the 
>> >> >final json string manually. The key problem is that the field “content” 
>> >> >is actually a STRING, although it looks like a json object. Currently 
>> >> >the json format provided by Flink could not handle this kind of field 
>> >> >defined as STRING. Also considering the schema of this “content” field 
>> >> >is not fixed across records, Flink SQL can’t use one DDL to consume / 
>> >> >produce records with changing schema. 
>> >> >
>> >> >Cheers,
>> >> >
>> >> >Qingsheng
>> >> >
>> >> >> On Mar 31, 2022, at 21:42, wang <24248...@163.com> wrote:
>> >> >> 
>> >> >> Hi dear engineer,
>> >> >> 
>> >> >> Thanks so much for your precious time reading my email!
>> >> >> Recently I'm working on the Flink sql (with version 1.13) in my 
>> >> >> project and encountered one problem about json format data, hope you 
>> >> >> can take a look, thanks! Below is the description of my issue.
>> >> >> 
>> >> >> I use kafka as source and sink, I define kafka source table like this:
>> >> >> 
>> >> >>  CREATE TABLE TableSource (
>> >> >>   schema STRING,
>> >> >>   payload ROW(
>> >> >>   `id` STRING,
>> 

Re:Re: Could you please give me a hand about json object in flink sql

2022-04-02 Thread wang
Hi,


Thanks for your quick response! 


And I tried the format "raw"; it seems it only supports a single physical column, and 
in our project requirement, there are more than one hundred columns in the sink 
table. So I need to combine those columns into one string in a single UDF?


Thanks && Regards,
Hunk


At 2022-04-02 15:17:14, "Qingsheng Ren"  wrote:
>Hi,
>
>You can construct the final json string in your UDTF, and write it to Kafka 
>sink table with only one field, which is the entire json string constructed in 
>UDTF, and use raw format [1] in the sink table:
>
>CREATE TABLE TableSink (
>`final_json_string` STRING
>) WITH (
>‘connector’ = ‘kafka’,
>‘format’ = ‘raw’,
>...
>)
>
>So the entire flow would be like:
>
>1. Kafka source table reads data
>2. UDTF parses the ‘content’ field and constructs the final json (id, content 
>without backslashes) string you need, maybe using Jackson [2] or other json tools
>3. Insert the constructed json string as the only field in sink table
>
>The key problem is that the schema of field “content” is not fixed, so you 
>have to complete most of the logic in the UDTF. 
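The three numbered steps above can be sketched outside Flink with plain Python and the stdlib `json` module. This is only an illustration of the logic a UDTF would implement, not Flink API; the function and variable names are assumptions:

```python
import json

def build_final_json(id_, content_str):
    """Step 2 of the flow: parse the escaped 'content' string and nest it
    as a real json object next to 'id', yielding the single string that
    the raw-format sink would write unchanged."""
    return json.dumps({"id": id_, "content": json.loads(content_str)})

# Step 1: a record as read from the Kafka source table
record = {"schema": "schema_infos",
          "payload": {"id": "1",
                      "content": "{\"name\":\"Jone\",\"age\":20}"}}

# Step 3: the constructed string becomes the only field of the sink table
payload = record["payload"]
print(build_final_json(payload["id"], payload["content"]))
# → {"id": "1", "content": {"name": "Jone", "age": 20}}
```

Because the raw format serializes the single STRING column byte-for-byte, the backslash escaping never reappears on the sink side.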
>
>[1] 
>https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/raw/
>[2] https://github.com/FasterXML/jackson
>
>Best regards, 
>
>Qingsheng
>
>
>> On Apr 2, 2022, at 14:47, wang <24248...@163.com> wrote:
>> 
>> Hi,
>> 
>> Thanks so much for your support! 
>> 
>> But sorry to say I'm still confused about it. No matter what the udf looks 
>> like, the first thing I need to confirm is the type of 'content' in TableSink. 
>> What should its type be, should I use type ROW, like this?
>> 
>>  CREATE TABLE TableSink (
>>   `id` STRING NOT NULL,
>>   `content` ROW<name STRING, age BIGINT>
>>  )
>>  WITH (
>>  ...
>> );
>> 
>> This type is only suitable for source input {"schema": "schema_infos", 
>> "payload": {"id": "1", "content": "{\"name\":\"Jone\",\"age\":20}"}}
>> 
>> But the json key names and format of 'content' in source are variable. If the 
>> source input is 
>> {"schema": "schema_infos", "payload": {"id": "1", "content": 
>> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30}"}}
>> 
>> I should define `content` in TableSink with type ROW<color STRING, 
>> BackgroundColor STRING, Height BIGINT>, like this:
>> 
>>  CREATE TABLE TableSink (
>>   `id` STRING NOT NULL,
>>   `content` ROW<color STRING, BackgroundColor STRING, Height BIGINT>
>>  )
>>  WITH (
>>  ...
>> );
>> 
>> And the input json might also contain a json array, like: 
>> {"schema": "schema_infos", "payload": {"id": "1", "content": 
>> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30,\"detail\":[{\"value_type\":1,\"value_name\":\"AAA\"},{\"value_type\":2,\"value_name\":\"BBB\"}]}"}}
>> 
>> 
>> So is there some common type I can use which can handle all input json 
>> formats?  
>> 
>> Thanks so much!!
>> 
>> 
>> Thanks && Regards,
>> Hunk
>> 
>> 
>> 
>> 
>> 
>> 
>> At 2022-04-01 17:25:59, "Qingsheng Ren" wrote:
>> >Hi, 
>> >
>> >I’m afraid you have to use a UDTF to parse the content and construct the 
>> >final json string manually. The key problem is that the field “content” is 
>> >actually a STRING, although it looks like a json object. Currently the json 
>> >format provided by Flink could not handle this kind of field defined as 
>> >STRING. Also considering the schema of this “content” field is not fixed 
>> >across records, Flink SQL can’t use one DDL to consume / produce records 
>> >with changing schema. 
>> >
>> >Cheers,
>> >
>> >Qingsheng
>> >
>> >> On Mar 31, 2022, at 21:42, wang <24248...@163.com> wrote:
>> >> 
>> >> Hi dear engineer,
>> >> 
>> >> Thanks so much for your precious time reading my email!
>> >> Recently I'm working on the Flink sql (with version 1.13) in my project 
>> >> and encountered one problem about json format data, hope you can take a 
>> >> look, thanks! Below is the description of my issue.
>> >> 
>> >> I use kafka as source and sink, I define kafka source table like this:
>> >> 
>> >>  CREATE TABLE TableSource (
>> >>   schema STRING,
>> >>   payload ROW(
>> >>   `id` STRING,
>> >>   `content` STRING
>> >>  )
>> >>  )
>> >>  WITH (
>> >>  'connector' = 'kafka',
>> >>  'topic' = 'topic_source',
>> >>  'properties.bootstrap.servers' = 'localhost:9092',
>> >>  'properties.group.id' = 'all_gp',
>> >>  'scan.startup.mode' = 'group-offsets',
>> >>  'format' = 'json',
>> >>  'json.fail-on-missing-field' = 'false',
>> >>  'json.ignore-parse-errors' = 'true'
>> >>  );
>> >> 
>> >> Define the kafka sink table like this:
>> >> 
>> >>  CREATE TABLE TableSink (
>> >>   `id` STRING NOT NULL,
>> >>   `content` STRING NOT NULL
>> >>  )
>> >>  WITH (
>> >>  'connector' = 'kafka',
>> >>  'topic' = 'topic_sink',
>> >>  

Re:Re: Could you please give me a hand about json object in flink sql

2022-04-02 Thread wang
Hi,


Thanks so much for your support! 


But sorry to say I'm still confused about it. No matter what the udf looks 
like, the first thing I need to confirm is the type of 'content' in TableSink. 
What should its type be, should I use type ROW, like this?

 CREATE TABLE TableSink (
  `id` STRING NOT NULL,
  `content` ROW<name STRING, age BIGINT>
 )
 WITH (
 ...
);

This type is only suitable for source input {"schema": "schema_infos", 
"payload": {"id": "1", "content": "{\"name\":\"Jone\",\"age\":20}"}}

But the json key names and format of 'content' in source are variable. If the 
source input is 
{"schema": "schema_infos", "payload": {"id": "1", "content": 
"{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30}"}}

I should define `content` in TableSink with type ROW<color STRING, 
BackgroundColor STRING, Height BIGINT>, like this:

 CREATE TABLE TableSink (
  `id` STRING NOT NULL,
  `content` ROW<color STRING, BackgroundColor STRING, Height BIGINT>
 )
 WITH (
 ...
);

And the input json might also contain a json array, like: 
{"schema": "schema_infos", "payload": {"id": "1", "content": 
"{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30,\"detail\":[{\"value_type\":1,\"value_name\":\"AAA\"},{\"value_type\":2,\"value_name\":\"BBB\"}]}"}}

So is there some common type I can use which can handle all input json 
formats?

Thanks so much!!
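On the variable-schema question above: if the nested string stays declared as STRING and is parsed dynamically (the UDTF route), any shape works without declaring a ROW type per schema. A small stdlib-only Python illustration (sample values taken from this thread; not Flink API):

```python
import json

# Each sample mimics a possible 'content' payload: different key sets,
# different value types, even nested arrays -- no fixed schema required.
samples = [
    '{"name":"Jone","age":20}',
    '{"color":"Red","BackgroundColor":"Yellow","Height":30}',
    '{"color":"Red","detail":[{"value_type":1,"value_name":"AAA"},'
    '{"value_type":2,"value_name":"BBB"}]}',
]

for s in samples:
    obj = json.loads(s)          # dynamic parsing handles every shape
    print(sorted(obj.keys()))
```

This is exactly why a single typed ROW column cannot cover all inputs: each sample would need its own DDL, whereas the parse-at-runtime approach needs none.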

Thanks && Regards,

Hunk



At 2022-04-01 17:25:59, "Qingsheng Ren"  wrote:
>Hi, 
>
>I’m afraid you have to use a UDTF to parse the content and construct the final 
>json string manually. The key problem is that the field “content” is actually 
>a STRING, although it looks like a json object. Currently the json format 
>provided by Flink could not handle this kind of field defined as STRING. Also 
>considering the schema of this “content” field is not fixed across records, 
>Flink SQL can’t use one DDL to consume / produce records with changing schema. 
>
>Cheers,
>
>Qingsheng
>
>> On Mar 31, 2022, at 21:42, wang <24248...@163.com> wrote:
>> 
>> Hi dear engineer,
>> 
>> Thanks so much for your precious time reading my email!
>> Recently I'm working on the Flink sql (with version 1.13) in my project and 
>> encountered one problem about json format data, hope you can take a look, 
>> thanks! Below is the description of my issue.
>> 
>> I use kafka as source and sink, I define kafka source table like this:
>> 
>>  CREATE TABLE TableSource (
>>   schema STRING,
>>   payload ROW(
>>   `id` STRING,
>>   `content` STRING
>>  )
>>  )
>>  WITH (
>>  'connector' = 'kafka',
>>  'topic' = 'topic_source',
>>  'properties.bootstrap.servers' = 'localhost:9092',
>>  'properties.group.id' = 'all_gp',
>>  'scan.startup.mode' = 'group-offsets',
>>  'format' = 'json',
>>  'json.fail-on-missing-field' = 'false',
>>  'json.ignore-parse-errors' = 'true'
>>  );
>> 
>> Define the kafka sink table like this:
>> 
>>  CREATE TABLE TableSink (
>>   `id` STRING NOT NULL,
>>   `content` STRING NOT NULL
>>  )
>>  WITH (
>>  'connector' = 'kafka',
>>  'topic' = 'topic_sink',
>>  'properties.bootstrap.servers' = 'localhost:9092',
>>  'format' = 'json',
>>  'json.fail-on-missing-field' = 'false',
>>  'json.ignore-parse-errors' = 'true'
>> );
>> 
>> 
>> Then insert into TableSink with data from TableSource:
>> INSERT INTO TableSink SELECT id, content FROM TableSource;
>> 
>> Then I use "kafka-console-producer.sh" to produce data below into topic 
>> "topic_source" (TableSource):
>> {"schema": "schema_infos", "payload": {"id": "1", "content": 
>> "{\"name\":\"Jone\",\"age\":20}"}}
>> 
>> 
>> Then I listen on topic_sink (TableSink) by kafka-console-consumer.sh, the 
>> output is:
>> {"id":"1","content":"{\"name\":\"Jone\",\"age\":20}"}
>> 
>> But what I want here is {"id":"1","content": {"name":"Jone","age":20}}
>> I want the value of "content" to be a json object, not a json string.
>> 
>> And what's more, the format of "content" in TableSource is not fixed; it can 
>> be any json formatted (or json array format) string, such as:
>> {"schema": "schema_infos", "payload": {"id": "1", "content": 
>> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30}"}}
>> 
>> 
>> So my question is, how can I transform a json format string (like 
>> "{\"name\":\"Jone\",\"age\":20}") from TableSource to a json object 
>> (like {"name":"Jone","age":20})?
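The escaping described in the question can be reproduced outside Flink: serializing a json *string* value re-escapes its quotes, while serializing a parsed *object* does not. A stdlib-only Python sketch of the difference (variable names are illustrative):

```python
import json

content = '{"name":"Jone","age":20}'

# Keeping 'content' as a STRING: the sink's json serializer escapes it
as_string = json.dumps({"id": "1", "content": content})
print(as_string)   # {"id": "1", "content": "{\"name\":\"Jone\",\"age\":20}"}

# Parsing it first: the value is emitted as a nested json object
as_object = json.dumps({"id": "1", "content": json.loads(content)})
print(as_object)   # {"id": "1", "content": {"name": "Jone", "age": 20}}
```

The second form is the desired output; the parse step is what the UDTF suggested later in this thread performs before handing the result to the sink.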
>> 
>> 
>> Thanks && Regards,
>> Hunk
>> 