Unsubscribe

Unsubscribe -- 刘明迪 13126692977, April 4, 2022
Re: Re: Re: Could you please give me a hand about json object in flink sql
Maybe you can get some inspiration from JsonDeserializationSchema and JSONKeyValueDeserializationSchema.

At 2022-04-02 14:47:08, "wang" <24248...@163.com> wrote:

Hi,

Thanks so much for your support! But sorry to say I'm still confused about it. Whatever the UDF looks like, the first thing I need to confirm is the type of `content` in TableSink. What should its type be? Should I use type ROW, like this?

CREATE TABLE TableSink (
  `id` STRING NOT NULL,
  `content` ROW
) WITH (
  ...
);

This type is only suitable for the source input:
{"schema": "schema_infos", "payload": {"id": "1", "content": "{\"name\":\"Jone\",\"age\":20}"}}

But the JSON key names and the format of "content" in the source are variable. If the source input is
{"schema": "schema_infos", "payload": {"id": "1", "content": "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30"}}
then I would have to define `content` in TableSink with a different ROW type, like this:

CREATE TABLE TableSink (
  `id` STRING NOT NULL,
  `content` ROW
) WITH (
  ...
);

And the input JSON might also contain a JSON array, like:
{"schema": "schema_infos", "payload": {"id": "1", "content": "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30",\"detail\":[{\"value_type\":1,\"value_name\":\"AAA\"},{\"value_type\":2,\"value_name\":\"BBB\"}]}}

So is there some common type I can use which can handle all input JSON formats? Thanks so much!

Thanks && Regards,
Hunk

At 2022-04-01 17:25:59, "Qingsheng Ren" wrote:
> Hi,
>
> I'm afraid you have to use a UDTF to parse the content and construct the
> final json string manually. The key problem is that the field "content" is
> actually a STRING, although it looks like a json object. Currently the json
> format provided by Flink could not handle this kind of field defined as
> STRING. Also, considering the schema of this "content" field is not fixed
> across records, Flink SQL can't use one DDL to consume / produce records
> with changing schema.
>
> Cheers,
>
> Qingsheng
>
>> On Mar 31, 2022, at 21:42, wang <24248...@163.com> wrote:
>>
>> Hi dear engineer,
>>
>> Thanks so much for your precious time reading my email!
>> Recently I've been working on Flink SQL (version 1.13) in my project and
>> encountered a problem with JSON-format data; hope you can take a look,
>> thanks! Below is a description of my issue.
>>
>> I use Kafka as source and sink. I define the Kafka source table like this:
>>
>> CREATE TABLE TableSource (
>>   schema STRING,
>>   payload ROW(
>>     `id` STRING,
>>     `content` STRING
>>   )
>> )
>> WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'topic_source',
>>   'properties.bootstrap.servers' = 'localhost:9092',
>>   'properties.group.id' = 'all_gp',
>>   'scan.startup.mode' = 'group-offsets',
>>   'format' = 'json',
>>   'json.fail-on-missing-field' = 'false',
>>   'json.ignore-parse-errors' = 'true'
>> );
>>
>> I define the Kafka sink table like this:
>>
>> CREATE TABLE TableSink (
>>   `id` STRING NOT NULL,
>>   `content` STRING NOT NULL
>> )
>> WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'topic_sink',
>>   'properties.bootstrap.servers' = 'localhost:9092',
>>   'format' = 'json',
>>   'json.fail-on-missing-field' = 'false',
>>   'json.ignore-parse-errors' = 'true'
>> );
>>
>> Then I insert into TableSink with data from TableSource:
>> INSERT INTO TableSink SELECT id, content FROM TableSource;
>>
>> Then I use "kafka-console-producer.sh" to produce the data below into topic
>> "topic_source" (TableSource):
>> {"schema": "schema_infos", "payload": {"id": "1", "content": "{\"name\":\"Jone\",\"age\":20}"}}
>>
>> Then I listen on topic_sink (TableSink) with kafka-console-consumer.sh;
>> the output is:
>> {"id":"1","content":"{\"name\":\"Jone\",\"age\":20}"}
>>
>> But what I want here is {"id":"1","content": {"name":"Jone","age":20}}
>> I want the value of "content" to be a JSON object, not a JSON string.
>>
>> What's more, the format of "content" in TableSource is not fixed; it can
>> be any JSON-formatted (or JSON-array-formatted) string, such as:
>> {"schema": "schema_infos", "payload": {"id": "1", "content": "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30"}}
>>
>> So my question is: how can I transform a JSON-format string (like
>> "{\"name\":\"Jone\",\"age\":20}") from TableSource into a JSON object
>> (like {"name":"Jone","age":20})?
>>
>> Thanks && Regards,
>> Hunk
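[The thread never shows what the suggested UDF/UDTF would do internally. Here is a minimal sketch of the core transformation in plain Python — illustration only, not Flink code; the function name `embed_content` is made up. The idea is: parse the string-valued `content` field into a real JSON value and re-embed it in the outer record, falling back to the raw string when it is not valid JSON (which matches the "schema is not fixed" situation above).]

```python
import json

def embed_content(record_json: str) -> str:
    """Turn the string-valued "content" field into a nested JSON value.

    Falls back to leaving "content" as a plain string when it does not
    parse as JSON, so records with arbitrary (or broken) payloads still
    pass through instead of failing the job.
    """
    record = json.loads(record_json)
    try:
        # "content" arrives as an escaped JSON string; parse it into a
        # real object/array so it is emitted unquoted downstream.
        record["content"] = json.loads(record["content"])
    except (json.JSONDecodeError, TypeError, KeyError):
        pass  # not valid JSON (or missing): keep the original value
    return json.dumps(record)

print(embed_content('{"id":"1","content":"{\\"name\\":\\"Jone\\",\\"age\\":20}"}'))
# → {"id": "1", "content": {"name": "Jone", "age": 20}}
```

In a real Flink UDF the same parse-or-pass-through logic would run per record (e.g. with Jackson in Java/Scala), emitting the result as a string column whose value happens to be valid nested JSON.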
Source with S3 bucket with millions (billions) of objects (small files)
Folks, I am running a simple batch job that uses readFile() with "s3a://[bucket_name]" as the path and setNestedFileEnumeration(true). I am a little curious about one thing: in batch mode, which I believe is enabled by FileProcessingMode.PROCESS_ONCE, does the source list all the S3 objects in the bucket to create input splits *before* it calls downstream operators? Thanks.
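[A toy model of the concern in the question, in plain Python — this is not Flink's actual enumerator, and the names `list_keys` / `build_splits_eagerly` are made up. It illustrates why eager enumeration matters at this scale: if splits are built up front, the entire (paginated) object listing must finish before the first record reaches a downstream operator, and S3's ListObjectsV2 returns at most 1000 keys per page, so millions of objects mean thousands of sequential list calls.]

```python
from typing import Iterable, Iterator, List

def list_keys(pages: Iterable[List[str]]) -> Iterator[str]:
    """Simulates a paginated object listing (S3 ListObjectsV2 style:
    one network round-trip per page of keys)."""
    for page in pages:
        yield from page

def build_splits_eagerly(pages: Iterable[List[str]], split_size: int) -> List[List[str]]:
    """Eager enumeration: the full listing is materialized into splits
    before any split could be handed to a downstream reader."""
    keys = list(list_keys(pages))  # blocks until the whole listing is done
    return [keys[i:i + split_size] for i in range(0, len(keys), split_size)]

# Two listing pages of three objects each -> 6 keys, split into groups of 4.
pages = [[f"s3a://bucket/file-{p}-{i}" for i in range(3)] for p in range(2)]
splits = build_splits_eagerly(pages, split_size=4)
print(len(splits))  # 2 splits: one of 4 keys, one of 2
```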
Re: How to tune when RocksDB reads peg the CPU at 100%
With multiple parallel subtasks on YARN, the subtasks actually land on different machines. Of course, with state this large RocksDB is bound to spill to disk. Is the only remaining option SSDs and multi-disk reads/writes, i.e. optimizing through hardware?

Yun Tang wrote on Saturday, April 2, 2022 at 16:35:
> Hi,
>
> For single-machine state as large as 200 GB there is really no good
> optimization path left, because at that scale the data basically has to hit
> disk, and what you are competing on is the operating system's page cache and
> the disk's IO capability.
>
> Best,
> Yun Tang
>
> From: Guo Thompson
> Sent: Tuesday, March 29, 2022 20:27
> To: user-zh
> Subject: Re: How to tune when RocksDB reads peg the CPU at 100%
>
> What if the RocksDB state is very large, e.g. 200 GB? In that case flame
> graphs often show the bottleneck is again in RocksDB's get(). Any ideas for
> optimizing that?
>
> Yun Tang wrote on Monday, March 21, 2022 at 14:42:
> >
> > Hi,
> >
> > If the CPU is stuck at 100% in RocksDB stacks, it is very likely caused by
> > heavy decompression of index/filter blocks. You can enable partitioned
> > index/filters [1] and see whether that solves the problem.
> > You can also refer to a talk I gave offline earlier [2].
> >
> > [1]
> > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-backend-rocksdb-memory-partitioned-index-filters
> > [2] https://developer.aliyun.com/article/784995 "Flink 1.13: State Backend
> > optimization and production practice"
> >
> > Best,
> > Yun Tang
> >
> > From: Peihui He
> > Sent: Friday, March 18, 2022 20:16
> > To: user-zh@flink.apache.org
> > Subject: Re: How to tune when RocksDB reads peg the CPU at 100%
> >
> > OK, I'll add a metric on my side and observe for a while.
> >
> > yue ma wrote on Friday, March 18, 2022 at 12:23:
> > >
> > > Hi,
> > > I think two things are worth checking here.
> > > First, observe the task throughput at that moment. If the QPS is very
> > > high, for example because the job is consuming from the earliest offset,
> > > then I think 100% CPU is expected.
> > > Second, you can add some in-memory caching logic in your code, similar
> > > to mini-batch, to reduce the frequency of interaction with state; that
> > > may relieve part of the problem.
> > >
> > > deng xuezhao wrote on Friday, March 18, 2022 at 11:19:
> > > >
> > > > Unsubscribe
> > > >
> > > > On March 18, 2022 at 11:18, Peihui He wrote:
> > > >
> > > > Hi all,
> > > >
> > > > As the subject says, a Flink job uses RocksDB as the state backend.
> > > > The job logic is roughly: for each incoming record, first check
> > > > whether the record's key is already in a MapState, then write the key
> > > > into the MapState.
> > > >
> > > > The problem is that after the job has run for a while, the thread
> > > > doing the contains check is constantly at 100% CPU, with the stack
> > > > below:
> > > >
> > > > "process (6/18)#0" Id=80 RUNNABLE (in native)
> > > >   at org.rocksdb.RocksDB.get(Native Method)
> > > >   at org.rocksdb.RocksDB.get(RocksDB.java:2084)
> > > >   at org.apache.flink.contrib.streaming.state.RocksDBMapState.contains(RocksDBMapState.java:173)
> > > >   at org.apache.flink.runtime.state.UserFacingMapState.contains(UserFacingMapState.java:72)
> > > >   at com.huanju.security.soc.internal.hs.bigdata.FileScanToTiDB$$anon$12.processElement(FileScanToTiDB.scala:156)
> > > >   at com.huanju.security.soc.internal.hs.bigdata.FileScanToTiDB$$anon$12.processElement(FileScanToTiDB.scala:145)
> > > >   at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
> > > >   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
> > > >   at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
> > > >   at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
> > > >   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> > > >   at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
> > > >   at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$624/715942770.runDefaultAction(Unknown Source)
> > > >   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> > > >   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
> > > >   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
> > > >   at org.apache.flink.runtime.taskmanager.Task$$Lambda$773/520411616.run(Unknown Source)
> > > >   at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> > > >   at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
> > > >   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> > > >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> > > >   at java.lang.Thread.run(Thread.java:748)
> > > >
> > > > But the checkpoint data is only about 100 MB.
> > > >
> > > > So what performance bottleneck is RocksDB hitting here? How should it
> > > > be tuned?
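[yue ma's second suggestion in the thread above, an in-memory cache in front of state in the mini-batch spirit, can be sketched as follows. This is plain Python standing in for the Flink operator, with a set as a stand-in for the RocksDB-backed MapState; the class and method names are made up for illustration. Hot keys are answered from a bounded heap-memory cache, so only cold keys pay for the native RocksDB get() seen in the stack trace.]

```python
class CachedContainsCheck:
    """Keeps a bounded, insertion-ordered cache of recently seen keys in
    heap memory so that repeated keys skip the expensive backend
    contains() call (RocksDB get() in the original job)."""

    def __init__(self, backend_state: set, cache_size: int = 10_000):
        self.backend = backend_state   # stand-in for RocksDB MapState
        self.cache = {}                # key -> True, insertion-ordered
        self.cache_size = cache_size
        self.backend_lookups = 0       # metric: how often we hit the backend

    def seen_before(self, key) -> bool:
        if key in self.cache:
            return True                # served from memory, no backend call
        self.backend_lookups += 1
        seen = key in self.backend     # the expensive call
        self.backend.add(key)          # mirror the job logic: record the key
        self.cache[key] = True
        if len(self.cache) > self.cache_size:
            self.cache.pop(next(iter(self.cache)))  # evict oldest entry
        return seen

checker = CachedContainsCheck(backend_state=set())
hits = [checker.seen_before(k) for k in ["a", "b", "a", "a", "b"]]
print(hits, checker.backend_lookups)
# → [False, False, True, True, True] 2
```

With a skewed key distribution this cuts the backend lookup rate sharply; the trade-off is extra heap usage and, on restore after a failure, a cold cache that refills from state.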