Re: Could you please give me a hand about json object in flink sql

2022-04-02 Thread Qingsheng Ren
Hi,

If the schema of the records is not fixed, I’m afraid you have to do it in a UDTF. 

Best, 

Qingsheng

> On Apr 2, 2022, at 15:45, wang <24248...@163.com> wrote:
> 
> Hi,
> 
> Thanks for your quick response! 
> 
> And I tried the format "raw"; it seems it only supports a single physical column, 
> and in our project requirement there are more than one hundred columns in the 
> sink table. So I need to combine those columns into one string in a single UDF?
> 
> Thanks && Regards,
> Hunk
> 
> 
> 
> 
> 
> 
> 
> At 2022-04-02 15:17:14, "Qingsheng Ren"  wrote:
> >Hi,
> >
> >You can construct the final json string in your UDTF, and write it to Kafka 
> >sink table with only one field, which is the entire json string constructed 
> >in UDTF, and use raw format [1] in the sink table:
> >
> >CREATE TABLE TableSink (
> >`final_json_string` STRING
> >) WITH (
> >'connector' = 'kafka',
> >'format' = 'raw',
> >...
> >)
> >
> >So the entire flow would be like:
> >
> >1. Kafka source table reads data
> >2. UDTF parses the ‘content’ field and constructs the final json (id, 
> >content without backslashes) string you need, maybe using Jackson [2] or other 
> >json tools
> >3. Insert the constructed json string as the only field in sink table
> >
> >The key problem is that the schema of field “content” is not fixed, so you 
> >have to implement most of the logic in the UDTF. 
> >
> >[1] 
> >https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/raw/
> >[2] https://github.com/FasterXML/jackson
> >
> >Best regards, 
> >
> >Qingsheng
> >
> >
> >> On Apr 2, 2022, at 14:47, wang <24248...@163.com> wrote:
> >> 
> >> Hi,
> >> 
> >> Thanks so much for your support! 
> >> 
> >> But sorry to say I'm still confused about it. No matter what the UDF looks 
> >> like, the first thing I need to confirm is the type of 'content' in 
> >> TableSink. What should its type be? Should I use type ROW, like this?
> >> 
> >>  CREATE TABLE TableSink (
> >>   `id` STRING NOT NULL,
> >>   `content` ROW<name STRING, age BIGINT>
> >>  )
> >>  WITH (
> >>  ...
> >> );
> >> 
> >> This type is only suitable for source input {"schema": "schema_infos", 
> >> "payload": {"id": "1", "content": "{\"name\":\"Jone\",\"age\":20}"}}
> >> 
> >> But the json key names and format of 'content' in the source are variable. 
> >> For example, if the source input is 
> >> {"schema": "schema_infos", "payload": {"id": "1", "content": 
> >> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30"}}
> >> 
> >> I should define `content` in TableSink with type `content` ROW<color 
> >> STRING, BackgroundColor STRING, Height BIGINT>, like this:
> >> 
> >>  CREATE TABLE TableSink (
> >>   `id` STRING NOT NULL,
> >>   `content` ROW<color STRING, BackgroundColor STRING, Height BIGINT>
> >>  )
> >>  WITH (
> >>  ...
> >> );
> >> 
> >> And the input json might also contain a json array, like: 
> >> {"schema": "schema_infos", "payload": {"id": "1", "content": 
> >> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30",\"detail\":[{\"value_type\":1,\"value_name\":\"AAA\"},{\"value_type\":2,\"value_name\":\"BBB\"}]}}
> >> 
> >> 
> >> So is there some common type I can use which can handle all input json 
> >> formats?  
> >> 
> >> Thanks so much!!
> >> 
> >> 
> >> Thanks && Regards,
> >> Hunk
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> At 2022-04-01 17:25:59, "Qingsheng Ren" wrote:
> >> >Hi, 
> >> >
> >> >I’m afraid you have to use a UDTF to parse the content and construct the 
> >> >final json string manually. The key problem is that the field “content” 
> >> >is actually a STRING, although it looks like a json object. Currently the 
> >> >json format provided by Flink cannot handle this kind of field defined 
> >> >as STRING. Also considering the schema of this “content” field is not 
> >> >fixed across records, Flink SQL can’t use one DDL to consume / produce 
> >> >records with changing schema. 
> >> >
> >> >Cheers,
> >> >
> >> >Qingsheng
> >> >
> >> >> On Mar 31, 2022, at 21:42, wang <24248...@163.com> wrote:
> >> >> 
> >> >> Hi dear engineer,
> >> >> 
> >> >> Thanks so much for your precious time reading my email!
> >> >> Recently I'm working on Flink SQL (version 1.13) in my project 
> >> >> and encountered one problem about json format data; hope you can take a 
> >> >> look, thanks! Below is the description of my issue.
> >> >> 
> >> >> I use kafka as source and sink, I define kafka source table like this:
> >> >> 
> >> >>  CREATE TABLE TableSource (
> >> >>   schema STRING,
> >> >>   payload ROW(
> >> >>   `id` STRING,
> >> >>   `content` STRING
> >> >>  )
> >> >>  )
> >> >>  WITH (
> >> >>  'connector' = 'kafka',
> >> >>  'topic' = 'topic_source',
> >> >>  'properties.bootstrap.servers' = 'localhost:9092',
> >> >>  '
> >> properties.group.id
> >> ' = 'all_gp',
> >> >>  'scan.startup.mode' = 

Re: Could you please give me a hand about json object in flink sql

2022-04-02 Thread Qingsheng Ren
Hi,

You can construct the final json string in your UDTF, and write it to Kafka 
sink table with only one field, which is the entire json string constructed in 
UDTF, and use raw format [1] in the sink table:

CREATE TABLE TableSink (
`final_json_string` STRING
) WITH (
'connector' = 'kafka',
'format' = 'raw',
...
)

So the entire flow would be like:

1. Kafka source table reads data
2. UDTF parses the ‘content’ field and constructs the final json (id, content 
without backslashes) string you need, maybe using Jackson [2] or other json tools
3. Insert the constructed json string as the only field in sink table

The key problem is that the schema of field “content” is not fixed, so you have 
to implement most of the logic in the UDTF. 

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/raw/
[2] https://github.com/FasterXML/jackson
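
Just to make step 2 concrete, here is a minimal sketch of such a function. The 
class name JsonExpandFunction, the registered name BUILD_JSON and the error 
handling below are only assumptions for illustration, not code from this thread; 
a scalar function is enough here because every input row produces exactly one 
output string, and a TableFunction would look similar:

import java.io.IOException;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

// Builds {"id": "...", "content": {...}} where "content" ends up as a nested
// json object instead of an escaped string.
public class JsonExpandFunction extends ScalarFunction {

    private transient ObjectMapper mapper;

    @Override
    public void open(FunctionContext context) {
        mapper = new ObjectMapper();
    }

    public String eval(String id, String content) {
        try {
            ObjectNode root = mapper.createObjectNode();
            root.put("id", id);
            // readTree parses the string into a json tree, so it is written
            // back as a real object / array, whatever shape it has.
            root.set("content", mapper.readTree(content));
            return mapper.writeValueAsString(root);
        } catch (IOException e) {
            // "content" was not valid json; decide how you want to handle this
            return null;
        }
    }
}

// Register it and write the result into the single-column raw sink above:
//   tEnv.createTemporarySystemFunction("BUILD_JSON", JsonExpandFunction.class);
//   tEnv.executeSql(
//       "INSERT INTO TableSink "
//           + "SELECT BUILD_JSON(payload.id, payload.content) FROM TableSource");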

Best regards, 

Qingsheng


> On Apr 2, 2022, at 14:47, wang <24248...@163.com> wrote:
> 
> Hi,
> 
> Thanks so much for your support! 
> 
> But sorry to say I'm still confused about it. No matter what the UDF looks 
> like, the first thing I need to confirm is the type of 'content' in TableSink. 
> What should its type be? Should I use type ROW, like this?
> 
>  CREATE TABLE TableSink (
>   `id` STRING NOT NULL,
>   `content` ROW<name STRING, age BIGINT>
>  )
>  WITH (
>  ...
> );
> 
> This type is only suitable for source input {"schema": "schema_infos", 
> "payload": {"id": "1", "content": "{\"name\":\"Jone\",\"age\":20}"}}
> 
> But the json key names and format of 'content' in the source are variable. 
> For example, if the source input is 
> {"schema": "schema_infos", "payload": {"id": "1", "content": 
> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30"}}
> 
> I should define `content` in TableSink with type `content` ROW<color STRING, 
> BackgroundColor STRING, Height BIGINT>, like this:
> 
>  CREATE TABLE TableSink (
>   `id` STRING NOT NULL,
>   `content` ROW<color STRING, BackgroundColor STRING, Height BIGINT>
>  )
>  WITH (
>  ...
> );
> 
> And the input json might also contain a json array, like: 
> {"schema": "schema_infos", "payload": {"id": "1", "content": 
> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30",\"detail\":[{\"value_type\":1,\"value_name\":\"AAA\"},{\"value_type\":2,\"value_name\":\"BBB\"}]}}
> 
> 
> So is there some common type I can use which can handle all input json 
> formats?  
> 
> Thanks so much!!
> 
> 
> Thanks && Regards,
> Hunk
> 
> 
> 
> 
> 
> 
> At 2022-04-01 17:25:59, "Qingsheng Ren" wrote:
> >Hi, 
> >
> >I’m afraid you have to use a UDTF to parse the content and construct the 
> >final json string manually. The key problem is that the field “content” is 
> >actually a STRING, although it looks like a json object. Currently the json 
> >format provided by Flink cannot handle this kind of field defined as 
> >STRING. Also considering the schema of this “content” field is not fixed 
> >across records, Flink SQL can’t use one DDL to consume / produce records 
> >with changing schema. 
> >
> >Cheers,
> >
> >Qingsheng
> >
> >> On Mar 31, 2022, at 21:42, wang <24248...@163.com> wrote:
> >> 
> >> Hi dear engineer,
> >> 
> >> Thanks so much for your precious time reading my email!
> >> Recently I'm working on Flink SQL (version 1.13) in my project 
> >> and encountered one problem about json format data; hope you can take a 
> >> look, thanks! Below is the description of my issue.
> >> 
> >> I use kafka as source and sink, I define kafka source table like this:
> >> 
> >>  CREATE TABLE TableSource (
> >>   schema STRING,
> >>   payload ROW(
> >>   `id` STRING,
> >>   `content` STRING
> >>  )
> >>  )
> >>  WITH (
> >>  'connector' = 'kafka',
> >>  'topic' = 'topic_source',
> >>  'properties.bootstrap.servers' = 'localhost:9092',
> >>  'properties.group.id' = 'all_gp',
> >>  'scan.startup.mode' = 'group-offsets',
> >>  'format' = 'json',
> >>  'json.fail-on-missing-field' = 'false',
> >>  'json.ignore-parse-errors' = 'true'
> >>  );
> >> 
> >> Define the kafka sink table like this:
> >> 
> >>  CREATE TABLE TableSink (
> >>   `id` STRING NOT NULL,
> >>   `content` STRING NOT NULL
> >>  )
> >>  WITH (
> >>  'connector' = 'kafka',
> >>  'topic' = 'topic_sink',
> >>  'properties.bootstrap.servers' = 'localhost:9092',
> >>  'format' = 'json',
> >>  'json.fail-on-missing-field' = 'false',
> >>  'json.ignore-parse-errors' = 'true'
> >> );
> >> 
> >> 
> >> Then insert into TableSink with data from TableSource:
> >> INSERT INTO TableSink SELECT id, content FROM TableSource;
> >> 
> >> Then I use "kafka-console-producer.sh" to produce data below into topic 
> >> "topic_source" (TableSource):
> >> {"schema": "schema_infos", "payload": {"id": "1", "content": 

Re: Could you please give me a hand about json object in flink sql

2022-04-01 Thread Qingsheng Ren
Hi, 

I’m afraid you have to use a UDTF to parse the content and construct the final 
json string manually. The key problem is that the field “content” is actually a 
STRING, although it looks like a json object. Currently the json format 
provided by Flink cannot handle this kind of field defined as STRING. Also 
considering the schema of this “content” field is not fixed across records, 
Flink SQL can’t use one DDL to consume / produce records with changing schema. 
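
The reason a UDTF (or any user code) can cope with that changing schema is that 
a json library parses the string into a generic tree at runtime, so none of the 
fields of "content" have to be declared in the DDL. A rough sketch with Jackson, 
using made-up names purely for illustration:

import java.io.IOException;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ContentInspector {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Works whether "content" carries an object, an array or a scalar,
    // because readTree does not need to know the schema in advance.
    public static void inspect(String content) throws IOException {
        JsonNode node = MAPPER.readTree(content);
        if (node.isObject()) {
            node.fieldNames().forEachRemaining(name ->
                    System.out.println(name + " -> " + node.get(name)));
        } else if (node.isArray()) {
            node.forEach(element -> System.out.println(element.toString()));
        }
    }
}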

Cheers,

Qingsheng

> On Mar 31, 2022, at 21:42, wang <24248...@163.com> wrote:
> 
> Hi dear engineer,
> 
> Thanks so much for your precious time reading my email!
> Recently I'm working on Flink SQL (version 1.13) in my project and 
> encountered one problem about json format data; hope you can take a look, 
> thanks! Below is the description of my issue.
> 
> I use kafka as source and sink, I define kafka source table like this:
> 
>  CREATE TABLE TableSource (
>   schema STRING,
>   payload ROW(
>   `id` STRING,
>   `content` STRING
>  )
>  )
>  WITH (
>  'connector' = 'kafka',
>  'topic' = 'topic_source',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'properties.group.id' = 'all_gp',
>  'scan.startup.mode' = 'group-offsets',
>  'format' = 'json',
>  'json.fail-on-missing-field' = 'false',
>  'json.ignore-parse-errors' = 'true'
>  );
> 
> Define the kafka sink table like this:
> 
>  CREATE TABLE TableSink (
>   `id` STRING NOT NULL,
>   `content` STRING NOT NULL
>  )
>  WITH (
>  'connector' = 'kafka',
>  'topic' = 'topic_sink',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'format' = 'json',
>  'json.fail-on-missing-field' = 'false',
>  'json.ignore-parse-errors' = 'true'
> );
> 
> 
> Then insert into TableSink with data from TableSource:
> INSERT INTO TableSink SELECT id, content FROM TableSource;
> 
> Then I use "kafka-console-producer.sh" to produce data below into topic 
> "topic_source" (TableSource):
> {"schema": "schema_infos", "payload": {"id": "1", "content": 
> "{\"name\":\"Jone\",\"age\":20}"}}
> 
> 
> Then I listen on topic_sink (TableSink) by kafka-console-consumer.sh, the 
> output is:
> {"id":"1","content":"{\"name\":\"Jone\",\"age\":20}"}
> 
> But what I want here is {"id":"1","content": {"name":"Jone","age":20}}
> I want the value of "content" to be a json object, not a json string.
> 
> And what's more, the format of "content" in TableSource is not fixed; it can 
> be any json-formatted (or json-array-formatted) string, such as:
> {"schema": "schema_infos", "payload": {"id": "1", "content": 
> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30"}}
> 
> 
> So my question is: how can I transform a json format string (like 
> "{\"name\":\"Jone\",\"age\":20}") from TableSource into a json object 
> (like {"name":"Jone","age":20})?
> 
> 
> Thanks && Regards,
> Hunk
> 
> 
> 
> 
>