Unsubscribe

2022-04-03 Thread 刘明迪
Unsubscribe

--

刘明迪  13126692977
April 4, 2022

Re: Re: Re: Could you please give me a hand with json objects in Flink SQL

2022-04-03 Thread lixiongfeng
Maybe you can get some inspiration from JsonDeserializationSchema and
JSONKeyValueDeserializationSchema.
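
For what it's worth, below is a rough sketch of that pointer on the DataStream
API, assuming the Flink 1.13 Kafka connector used elsewhere in this thread; the
class name is ours, and the topic/server settings are copied from wang's DDL.
JSONKeyValueDeserializationSchema deserializes each record into a Jackson
ObjectNode, so the escaped "content" string can be re-parsed into a real JSON
tree before the record is written back out:

    import java.util.Properties;

    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

    public class JsonObjectPassthrough {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("group.id", "all_gp");

            // 'false' = do not attach Kafka metadata (partition/offset) to the node.
            env.addSource(new FlinkKafkaConsumer<>(
                        "topic_source", new JSONKeyValueDeserializationSchema(false), props))
               .map(record -> {
                   ObjectNode payload = (ObjectNode) record.get("value").get("payload");
                   // "content" arrives as an escaped JSON *string*; re-parse it so
                   // the output carries a nested object instead of a quoted string.
                   JsonNode content = new ObjectMapper().readTree(payload.get("content").asText());
                   payload.set("content", content);
                   return payload.toString();  // {"id":"1","content":{"name":"Jone","age":20}}
               })
               .print();  // a real job would write this back to Kafka instead

            env.execute("json-object-passthrough-sketch");
        }
    }

The trade-off is moving this one job from SQL to the DataStream API, but
nothing about the payload schema has to be declared up front, which sidesteps
the variable-schema problem discussed below.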

At 2022-04-02 14:47:08, "wang" <24248...@163.com> wrote:

Hi,

Thanks so much for your support!

But sorry to say I'm still confused. No matter what the UDF looks like, the
first thing I need to confirm is the type of 'content' in TableSink. What
should its type be? Should I use ROW, like this?

 CREATE TABLE TableSink (
  `id` STRING NOT NULL,
  `content` ROW<`name` STRING, `age` INT>
 )
 WITH (
 ...
);

This type is only suitable for source input {"schema": "schema_infos", 
"payload": {"id": "1", "content": "{\"name\":\"Jone\",\"age\":20}"}}

But the JSON key names and the format of 'content' in the source are variable.
If the source input is

{"schema": "schema_infos", "payload": {"id": "1", "content": 
"{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30}"}}

then I should define `content` in TableSink as ROW<`color` STRING, `BackgroundColor` STRING, `Height` INT>, like this:

 CREATE TABLE TableSink (
  `id` STRING NOT NULL,
  `content` ROW<`color` STRING, `BackgroundColor` STRING, `Height` INT>
 )
 WITH (
 ...
);

And the input JSON might also contain a JSON array, like:
{"schema": "schema_infos", "payload": {"id": "1", "content": 
"{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30,\"detail\":[{\"value_type\":1,\"value_name\":\"AAA\"},{\"value_type\":2,\"value_name\":\"BBB\"}]}"}}

So is there some common type I can use that can handle all input JSON formats?

Thanks so much!!

Thanks && Regards,

Hunk

At 2022-04-01 17:25:59, "Qingsheng Ren"  wrote:
>Hi, 
>
>I’m afraid you have to use a UDTF to parse the content and construct the final
>json string manually. The key problem is that the field “content” is actually
>a STRING, although it looks like a json object. Currently the json format
>provided by Flink cannot handle this kind of field defined as STRING. Also,
>considering the schema of this “content” field is not fixed across records,
>Flink SQL can't use one DDL to consume / produce records with a changing schema.
>
>Cheers,
>
>Qingsheng
>
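
A minimal sketch of the manual route Qingsheng describes, with one
simplification: when only a single field needs splicing, a plain scalar UDF is
enough. The class name is hypothetical, and it assumes the sink is re-declared
with a single STRING column using 'format' = 'raw':

    // Hypothetical helper, not code from this thread: splice an already-
    // serialized JSON value into the output record without re-escaping it.
    import org.apache.flink.table.functions.ScalarFunction;

    public class EmbedJsonContent extends ScalarFunction {
        public String eval(String id, String content) {
            // 'content' is assumed to already be valid JSON (object or array),
            // so it is embedded verbatim rather than quoted as a string.
            return "{\"id\":\"" + id + "\",\"content\":" + content + "}";
        }
    }

Registered with CREATE TEMPORARY FUNCTION and invoked as embed_json(id,
content) in the INSERT INTO ... SELECT, this would emit
{"id":"1","content":{"name":"Jone","age":20}} no matter what keys 'content'
happens to contain.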
>> On Mar 31, 2022, at 21:42, wang <24248...@163.com> wrote:
>> 
>> Hi dear engineer,
>> 
>> Thanks so much for your precious time reading my email!
>> Recently I'm working on Flink SQL (version 1.13) in my project and 
>> encountered a problem with json format data. Hope you can take a look, 
>> thanks! Below is a description of my issue.
>> 
>> I use kafka as source and sink. I define the kafka source table like this:
>> 
>>  CREATE TABLE TableSource (
>>   schema STRING,
>>   payload ROW(
>>   `id` STRING,
>>   `content` STRING
>>  )
>>  )
>>  WITH (
>>  'connector' = 'kafka',
>>  'topic' = 'topic_source',
>>  'properties.bootstrap.servers' = 'localhost:9092',
>>  'properties.group.id' = 'all_gp',
>>  'scan.startup.mode' = 'group-offsets',
>>  'format' = 'json',
>>  'json.fail-on-missing-field' = 'false',
>>  'json.ignore-parse-errors' = 'true'
>>  );
>> 
>> Define the kafka sink table like this:
>> 
>>  CREATE TABLE TableSink (
>>   `id` STRING NOT NULL,
>>   `content` STRING NOT NULL
>>  )
>>  WITH (
>>  'connector' = 'kafka',
>>  'topic' = 'topic_sink',
>>  'properties.bootstrap.servers' = 'localhost:9092',
>>  'format' = 'json',
>>  'json.fail-on-missing-field' = 'false',
>>  'json.ignore-parse-errors' = 'true'
>> );
>> 
>> 
>> Then insert into TableSink with data from TableSource:
>> INSERT INTO TableSink SELECT id, content FROM TableSource;
>> 
>> Then I use "kafka-console-producer.sh" to produce data below into topic 
>> "topic_source" (TableSource):
>> {"schema": "schema_infos", "payload": {"id": "1", "content": 
>> "{\"name\":\"Jone\",\"age\":20}"}}
>> 
>> 
>> Then I listen on topic_sink (TableSink) with kafka-console-consumer.sh; the 
>> output is:
>> {"id":"1","content":"{\"name\":\"Jone\",\"age\":20}"}
>> 
>> But what I want here is {"id":"1","content": {"name":"Jone","age":20}}
>> I want the value of "content" to be a json object, not a json string.
>> 
>> And what's more, the format of "content" in TableSource is not fixed; it can 
>> be any json-formatted (or json-array-formatted) string, such as:
>> {"schema": "schema_infos", "payload": {"id": "1", "content": 
>> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30}"}}
>> 
>> 
>> So my question is: how can I transform a json-format string (like 
>> "{\"name\":\"Jone\",\"age\":20}") from TableSource into a json object 
>> (like {"name":"Jone","age":20})?
>> 
>> 
>> Thanks && Regards,
>> Hunk
>> 
>> 
>> 
>> 
>>  

Source with an S3 bucket with millions (billions) of objects (small files)

2022-04-03 Thread Vishal Santoshi
Folks,
I am doing a simple batch job that uses readFile() on "s3a://[bucket_name]"
with setNestedFileEnumeration(true). I am a little curious about a few things.

In batch mode, which I think is turned on by FileProcessingMode.PROCESS_ONCE,
does the source list all the S3 objects in the bucket to create input splits
*before* it calls downstream operators?
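
A minimal sketch of the setup described above (bucket name made up), in case it
helps anchor the question:

    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

    public class S3BatchRead {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            TextInputFormat format = new TextInputFormat(new Path("s3a://my-bucket"));
            format.setNestedFileEnumeration(true);   // recurse into "subdirectories"

            // PROCESS_ONCE scans the path once and then finishes; the interval
            // argument is only consulted by PROCESS_CONTINUOUSLY.
            env.readFile(format, "s3a://my-bucket", FileProcessingMode.PROCESS_ONCE, -1)
               .print();

            env.execute("s3-batch-read-sketch");
        }
    }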

Thanks.


Re: RocksDB reads at 100% CPU: how to tune

2022-04-03 Thread Guo Thompson
With multiple parallel subtasks on YARN they actually land on different machines. Of course, with state this large RocksDB is bound to spill to disk, so is hardware the only optimization left, i.e. SSDs and spreading reads/writes across multiple disks?

Yun Tang  wrote on Saturday, April 2, 2022 at 16:35:

> Hi,
>
> For single-instance state as large as 200 GB, there is no great optimization
> path left: at that point the data basically has to go to disk, and what you
> are competing on is the operating system's page cache and the disk's IO
> capability.
>
> Best,
> 唐云
> 
> From: Guo Thompson 
> Sent: Tuesday, March 29, 2022 20:27
> To: user-zh 
> Subject: Re: RocksDB reads at 100% CPU: how to tune
>
> What if the RocksDB state is very large, say 200 GB? In such cases flame graphs often show the bottleneck in RocksDB's get() as well. Any ideas for optimizing that?
>
> Yun Tang  wrote on Monday, March 21, 2022 at 14:42:
>
> > Hi,
> >
> > When the RocksDB CPU stack is stuck at 100%, it is very likely caused by
> > heavy decompression of index/filter blocks; you can enable partitioned
> > index/filters [1] and see whether that solves the problem.
> > For related material you can also refer to a talk I gave offline earlier [2].
> >
> >
> > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-backend-rocksdb-memory-partitioned-index-filters
> >
> > [2] https://developer.aliyun.com/article/784995 (Flink 1.13: State Backend
> > optimizations and production practice, in Chinese)
> >
> > Best,
> > 唐云
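
For reference, the switch that [1] points at is a one-line entry in
flink-conf.yaml; the key below is taken from the linked page, so please
double-check it against your Flink version:

    state.backend.rocksdb.memory.partitioned-index-filters: true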
> >
> > 
> > From: Peihui He 
> > Sent: Friday, March 18, 2022 20:16
> > To: user-zh@flink.apache.org 
> > Subject: Re: RocksDB reads at 100% CPU: how to tune
> >
> > OK, I'll add a metric on my side and observe for a while first.
> >
> > yue ma  wrote on Friday, March 18, 2022 at 12:23:
> >
> > > hi
> > > I think two things are worth checking here.
> > > First, look at the task's throughput at that moment. If the QPS is very
> > > high, for example because the job is consuming from the earliest offset,
> > > then 100% CPU is as expected.
> > > Second, you can add some in-memory caching logic in your code, similar to
> > > mini-batch, to reduce how often you interact with state; that may relieve
> > > part of the problem. (A sketch of this caching idea appears at the end of
> > > this thread.)
> > >
> > > deng xuezhao  wrote on Friday, March 18, 2022 at 11:19:
> > >
> > > > Unsubscribe
> > > >
> > > >
> > > >
> > > > On March 18, 2022 at 11:18 AM, Peihui He  wrote:
> > > >
> > > > Hi, all
> > > >
> > > > As the subject says, a Flink job uses RocksDB as the state backend. The
> > > > job logic is roughly: for each incoming record, first check whether the
> > > > record's key is already in a MapState, then write that key into the
> > > > MapState.
> > > >
> > > > The problem is that after the job has run for a while, the thread doing
> > > > the existence check stays at 100% CPU. The stack is:
> > > >
> > > > "process (6/18)#0" Id=80 RUNNABLE (in native)
> > > > at org.rocksdb.RocksDB.get(Native Method)
> > > > at org.rocksdb.RocksDB.get(RocksDB.java:2084)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.contrib.streaming.state.RocksDBMapState.contains(RocksDBMapState.java:173)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.runtime.state.UserFacingMapState.contains(UserFacingMapState.java:72)
> > > > at
> > > >
> > > >
> > >
> >
> com.huanju.security.soc.internal.hs.bigdata.FileScanToTiDB$$anon$12.processElement(FileScanToTiDB.scala:156)
> > > > at
> > > >
> > > >
> > >
> >
> com.huanju.security.soc.internal.hs.bigdata.FileScanToTiDB$$anon$12.processElement(FileScanToTiDB.scala:145)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
> > > > at
> > > > org.apache.flink.streaming.runtime.io
> > > >
> > >
> >
> .AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
> > > > at
> > > > org.apache.flink.streaming.runtime.io
> > > >
> > >
> >
> .AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
> > > > at
> > > > org.apache.flink.streaming.runtime.io
> > > >
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$624/715942770.runDefaultAction(Unknown
> > > > Source)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
> > > > at
> > > >
> > >
> >
> org.apache.flink.runtime.taskmanager.Task$$Lambda$773/520411616.run(Unknown
> > > > Source)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> > > > at
> > > >
> > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
> > > > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> > > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> > > > at java.lang.Thread.run(Thread.java:748)
> > > >
> > > > But the checkpoint data is only about 100 MB.
> > > >
> > > > So what performance bottleneck is RocksDB running into here, and how
> > > > should I tune it?
> > > >
> > >
> >
>
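
As referenced above, here is a sketch of the caching idea yue ma describes. It
is our own illustration, not code from this thread: it assumes the stream is
keyed by the deduplication key itself, puts a small heap LRU in front of the
RocksDB-backed MapState, and accepts that the cache starts empty after a
restore because it is not checkpointed:

    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class DedupWithHeapCache extends KeyedProcessFunction<String, String, String> {
        private transient MapState<String, Boolean> seen;  // RocksDB-backed
        private transient Map<String, Boolean> cache;      // small per-subtask LRU

        @Override
        public void open(Configuration parameters) {
            seen = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("seen", String.class, Boolean.class));
            cache = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                    return size() > 10_000;  // cap heap usage
                }
            };
        }

        @Override
        public void processElement(String value, Context ctx, Collector<String> out)
                throws Exception {
            String key = ctx.getCurrentKey();
            if (cache.containsKey(key)) {
                return;                     // hot key: no RocksDB get() at all
            }
            if (!seen.contains(key)) {      // cold key: one RocksDB read
                seen.put(key, true);
                out.collect(value);         // first sighting of this key
            }
            cache.put(key, Boolean.TRUE);
        }
    }

The point is simply that a hot key is answered from the heap, so RocksDB's
get(), the top of the stack trace above, only runs once per cold key per
subtask.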