Re: flink jdbc source oom

2022-03-31 Thread r pp
Hi, your question isn't very clear to me. When you say the data volume is very large, is that per day or per second? And how exactly does the source fall short?


Could you please give me a hand with JSON objects in Flink SQL

2022-03-31 Thread wang
Hi dear engineer,


Thanks so much for your precious time reading my email!
Recently I have been working with Flink SQL (version 1.13) in my project and ran into
a problem with JSON-formatted data. I hope you can take a look, thanks! Below is a
description of my issue.


I use Kafka as both source and sink. I define the Kafka source table like this:


 CREATE TABLE TableSource (
   schema STRING,
   payload ROW(
     `id` STRING,
     `content` STRING
   )
 ) WITH (
   'connector' = 'kafka',
   'topic' = 'topic_source',
   'properties.bootstrap.servers' = 'localhost:9092',
   'properties.group.id' = 'all_gp',
   'scan.startup.mode' = 'group-offsets',
   'format' = 'json',
   'json.fail-on-missing-field' = 'false',
   'json.ignore-parse-errors' = 'true'
 );


And I define the Kafka sink table like this:


 CREATE TABLE TableSink (
   `id` STRING NOT NULL,
   `content` STRING NOT NULL
 ) WITH (
   'connector' = 'kafka',
   'topic' = 'topic_sink',
   'properties.bootstrap.servers' = 'localhost:9092',
   'format' = 'json',
   'json.fail-on-missing-field' = 'false',
   'json.ignore-parse-errors' = 'true'
 );




Then I insert into TableSink with data from TableSource (the fields live inside the payload row, so they are referenced as payload.id and payload.content):
INSERT INTO TableSink SELECT payload.id, payload.content FROM TableSource;


Then I use "kafka-console-producer.sh" to produce data below into topic 
"topic_source" (TableSource):
{"schema": "schema_infos", "payload": {"id": "1", "content": 
"{\"name\":\"Jone\",\"age\":20}"}}




Then I listen on topic_sink (TableSink) with kafka-console-consumer.sh; the output is:
{"id":"1","content":"{\"name\":\"Jone\",\"age\":20}"}


But what I want here is {"id":"1","content":{"name":"Jone","age":20}}.
I want the value of "content" to be a JSON object, not a JSON string.


What's more, the format of "content" in TableSource is not fixed; it can be any
JSON-formatted string (or a JSON array), such as:
{"schema": "schema_infos", "payload": {"id": "1", "content": "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30}"}}




So my question is: how can I transform a JSON-formatted string (like
"{\"name\":\"Jone\",\"age\":20}") from TableSource into a JSON object
(like {"name":"Jone","age":20}) in the sink?
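Since the json format always quotes STRING columns on the sink side, one direction that could work, if dropping down to the DataStream API for the sink is acceptable, is to assemble the output bytes manually so that "content" is embedded verbatim. A minimal sketch under that assumption (the class name and field order are illustrative, not a confirmed solution from this thread):

    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.common.serialization.SerializationSchema;
    import org.apache.flink.types.Row;

    // Sketch: embed the already-serialized "content" JSON verbatim instead of
    // letting the json format re-quote it as a string. Assumes field 0 is id
    // and field 1 is content, which must already be a valid JSON document.
    public class EmbedJsonSerializationSchema implements SerializationSchema<Row> {
        @Override
        public byte[] serialize(Row row) {
            String id = (String) row.getField(0);
            String content = (String) row.getField(1);
            String json = "{\"id\":\"" + id + "\",\"content\":" + content + "}";
            return json.getBytes(StandardCharsets.UTF_8);
        }
    }

The payload built this way is opaque to Flink, so this trades the pure-SQL sink for full control of the output: the table would be converted with toDataStream(...) and written with the Kafka producer using a schema like this.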




Thanks && Regards,
Hunk



Re: flink jdbc source oom

2022-03-31 Thread Guo Thompson
Doesn't the predicate get pushed down already?

Peihui He wrote on Thu, Mar 31, 2022 at 10:33:

> Hi, all
>
> Could anyone advise: when using flink jdbc to read data from tidb, is it possible to push query conditions down so the filtering happens at the database level?
> Once the volume gets large, say tens of millions to hundreds of millions of rows, the flink jdbc source struggles badly.
>
>
> Best Regards!
>


Re: Flink SQL with the Kafka connector (custom CDC format): consumed delete-semantics records are not included in the computation

2022-03-31 Thread Guo Thompson
The images don't come through.

赵旭晨 wrote on Tue, Mar 15, 2022 at 12:25:

> Flink version: 1.14.3. The scenario is as follows:
> sql:
> set table.exec.state.ttl=1 day;
> describe t_k_chargeorder;
> describe t_k_appointment;
> SELECT
> ReportTime,
> sum( InsertAppointmentCount ) + sum( InsertChargeOrderCount )
> kpitotalcount,
> sum( InsertActualPriceCount ) InsertActualPriceCount,
> sum( InsertAppointmentCount ) InsertAppointmentCount,
> sum( InsertChargeOrderCount ) InsertChargeOrderCount,
> now() LastUpdatedDT
> from
> (
> SELECT
> DATE_FORMAT( recordcreatedtime, 'yyyy-MM-dd' ) ReportTime,
> sum( actualprice ) InsertActualPriceCount,
> 0 InsertShortMessageCount,
> 0 InsertAppointmentCount,
> 0 InsertImageCount,
> 0 InsertChargeOrderCount,
> 0 InsertPerioExamCount,
> 0 InsertMedicalCount,
> 0 InsertPatientCount,
> 0 InsertGeneralExamCount,
> 0 InsertFollowupCount
> FROM
> --effective_chargeorder t
> (SELECT
> o.recordcreatedtime,
> o.recordcreateduser,
> o.status,
> o._is_delete,
> o.appointmentid,
> o.id,
> o.tenantid,
> o.actualprice,
> o.proc_time,
> t.Name,
> t.IsInactive
> FROM
> t_k_chargeorder AS o
> INNER JOIN t_dental_tenant FOR SYSTEM_TIME AS OF o.proc_time AS t
> ON o.tenantid = t.Id
> WHERE
> t.IsInactive = '0'
> AND o.recordcreateduser > 0
> AND o.status NOT IN ( '已作废', '未收费', '作废并撤回', '等待支付宝付费', '等待微信付费' )
> AND o._is_delete = '0'
> AND o.appointmentid > 0) t
> WHERE
> recordcreatedtime BETWEEN concat( DATE_FORMAT( now() , 'yyyy-MM-dd' ),
> ' 00:00:00' )
> AND now()
> GROUP BY
> DATE_FORMAT( recordcreatedtime, 'yyyy-MM-dd' )
> ) a
> group by ReportTime;
>
> The DAG is as follows (image attachment, not visible in the archive):
> Inserts and updates in the business database are both captured and produce correct results.
> But for delete semantics, the Kafka CDC format does consume the delete records,
>
> yet the SQL result is not decremented accordingly, as follows:
> after the delete the total should drop from 150 to 100, but nothing happens. It feels like an internal operator filtered this -D record out.
> Any insight would be greatly appreciated!
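For context on the delete path: a custom CDC deserialization format only triggers retractions downstream if it marks delete events with RowKind.DELETE; a row left at the default RowKind.INSERT is added to the aggregate instead of subtracted. A minimal sketch of that contract (helper name and row layout are illustrative only):

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.types.RowKind;
    import org.apache.flink.util.Collector;

    // Inside a custom DeserializationSchema<RowData>, a delete event has to be
    // emitted with RowKind.DELETE so that downstream SUM()/COUNT() retract it.
    final class CdcDeleteEmitter {
        static void emitDelete(Collector<RowData> out, String id) {
            GenericRowData row = GenericRowData.of(StringData.fromString(id));
            row.setRowKind(RowKind.DELETE);
            out.collect(row);
        }
    }

If the -D records really do arrive carrying RowKind.DELETE and the result still never decreases, the filtering must be happening in some operator between the source and the aggregation.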


Re: flink jdbc source oom

2022-03-31 Thread Lincoln Lee
@Peihui The community's jdbc table source currently implements these interfaces:
ScanTableSource,
LookupTableSource,
SupportsProjectionPushDown,
SupportsLimitPushDown

The lookup table source serves kv lookup queries against dimension tables, while the scan
table source supports projection and limit pushdown. If you need other pushdowns, you can
try extending the connector yourself, for example with filter/aggregate pushdown, to filter before the scan.
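As an illustration of that extension point, a sketch of opting a custom source into filter pushdown (the class name is hypothetical; a real implementation would also implement ScanTableSource and render the accepted expressions into the generated JDBC WHERE clause):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
    import org.apache.flink.table.expressions.ResolvedExpression;

    // Sketch: this source optimistically claims every filter it receives.
    // A real connector would accept only the expressions it can translate and
    // return the rest as "remaining", which the planner then re-evaluates
    // after the scan.
    public class FilteringJdbcTableSource implements SupportsFilterPushDown {
        private final List<ResolvedExpression> pushedFilters = new ArrayList<>();

        @Override
        public Result applyFilters(List<ResolvedExpression> filters) {
            pushedFilters.addAll(filters);
            return Result.of(new ArrayList<>(filters), new ArrayList<>());
        }
    }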


Best,
Lincoln Lee


r pp wrote on Thu, Mar 31, 2022 at 18:40:

> Hi, your question isn't very clear to me. When you say the data volume is very large, is that per day or per second? And how exactly does the source fall short?
>


Re:Re: flink jdbc source oom

2022-03-31 Thread Michael Ran
A custom SQL data set was proposed for this back then, but the community rejected the approach. Functionally, though, we did implement custom SQL result sets ourselves for join-style
operations; it has real advantages for large data sets and for scenarios like sorting and deduplication.
On 2022-04-01 10:12:55, "Lincoln Lee" wrote:
>@Peihui The community's jdbc table source currently implements these interfaces:
>ScanTableSource,
>LookupTableSource,
>SupportsProjectionPushDown,
>SupportsLimitPushDown
>
>The lookup table source serves kv lookup queries against dimension tables, while the scan
>table source supports projection and limit pushdown. If you need other pushdowns, you can
>try extending the connector yourself, for example with filter/aggregate pushdown, to filter before the scan.
>
>
>Best,
>Lincoln Lee
>
>
>r pp wrote on Thu, Mar 31, 2022 at 18:40:
>
>> Hi, your question isn't very clear to me. When you say the data volume is very large, is that per day or per second? And how exactly does the source fall short?
>>