Re: How can a Flink DataStream job obtain its job lineage?

2024-03-07, by 阿华田
After obtaining the Source or DorisSink information, how can we tell which flink job it came from? It seems the flinkJobId cannot be obtained.
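
One way that may work, assuming the lineage is collected in the same process that builds and submits the job, is to register a JobListener and capture the JobID at submission time; a minimal sketch, not tied to any particular connector:

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LineageJobIdCapture {
    public static void install(StreamExecutionEnvironment env) {
        env.registerJobListener(new JobListener() {
            @Override
            public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
                if (jobClient != null) {
                    // The JobID becomes available right after submission; attach it
                    // to the lineage record extracted from the JobGraph.
                    System.out.println("submitted jobId = " + jobClient.getJobID());
                }
            }

            @Override
            public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
                // no-op for lineage purposes
            }
        });
    }
}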




On 2024-02-26 20:04, Feng Jin wrote:
From the JobGraph you can obtain the transformation information and locate the concrete Source or Doris Sink; after that, use reflection to extract the properties inside them.

You can refer to OpenLineage's implementation [1].


1.
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/shared/src/main/java/io/openlineage/flink/visitor/wrapper/FlinkKafkaConsumerWrapper.java


Best,
Feng
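
A minimal sketch of the approach described above, assuming a job built on StreamExecutionEnvironment and a legacy SourceFunction-based connector; the reflected field name "properties" is a hypothetical example (it matches FlinkKafkaConsumer, but differs per connector and version), and sinks can be handled the same way via their transformations:

import java.lang.reflect.Field;
import java.util.List;
import java.util.Properties;

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;

public class LineageProbe {

    @SuppressWarnings("unchecked")
    public static void dumpSourceProperties(StreamExecutionEnvironment env) throws Exception {
        // The transformation list is not public API; read it reflectively,
        // the way OpenLineage's wrappers do.
        Field tf = StreamExecutionEnvironment.class.getDeclaredField("transformations");
        tf.setAccessible(true);
        List<Transformation<?>> transformations = (List<Transformation<?>>) tf.get(env);

        for (Transformation<?> t : transformations) {
            if (t instanceof LegacySourceTransformation) {
                Object fn = ((LegacySourceTransformation<?>) t).getOperator().getUserFunction();
                // Hypothetical example: look for a config field named "properties";
                // the exact field name depends on the connector.
                Field pf = findField(fn.getClass(), "properties");
                if (pf != null) {
                    pf.setAccessible(true);
                    System.out.println(t.getName() + " -> " + (Properties) pf.get(fn));
                }
            }
        }
    }

    private static Field findField(Class<?> clazz, String name) {
        for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
            try {
                return c.getDeclaredField(name);
            } catch (NoSuchFieldException ignored) {
            }
        }
        return null;
    }
}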


On Mon, Feb 26, 2024 at 6:20 PM casel.chen  wrote:

A Flink DataStream job consumes from mysql cdc, processes the data, and writes it to apache doris. Is there a way (from the JobGraph/StreamGraph) to obtain the source/sink connector information, including the connection string, database name, table name, and so on?


Re:Re: Question about join conditions in the lookup execution plan when flink sql joins dimension tables

2024-03-07, by iasiuide
OK, I have pasted the SQL snippet.

On 2024-03-08 11:02:34, "Xuyang" wrote:
>Hi, your image did not come through; please use an image host or paste the SQL directly.
>
>
>
>
>--
>
>Best!
>Xuyang
>
>
>
>


Re:Re: Question about join conditions in the lookup execution plan when flink sql joins dimension tables

2024-03-07, by iasiuide
Hello, we are on 1.13.2 and 1.15.4. I checked the flink ui; on both versions the dimension-table lookup conditions in the execution plan for the SQL snippet below are the same.


On 2024-03-08 11:08:51, "Yu Chen" wrote:
>Hi iasiuide,
>Could you share the flink version and the jdbc connector version you are using? As far as I know, the jdbc connector fixed the loss of lookup join conditions in FLINK-33365[1].
>
>[1] https://issues.apache.org/jira/browse/FLINK-33365
>
>Best~

Re: Question about join conditions in the lookup execution plan when flink sql joins dimension tables

2024-03-07, by Yu Chen
Hi iasiuide,
Could you share the flink version and the jdbc connector version you are using? As far as I know, the jdbc connector fixed the loss of lookup join conditions in FLINK-33365[1].

[1] https://issues.apache.org/jira/browse/FLINK-33365

Best~


Re: Question about join conditions in the lookup execution plan when flink sql joins dimension tables

2024-03-07, by Xuyang
Hi, your image did not come through; please use an image host or paste the SQL directly.




--

Best!
Xuyang




Re: Question about join conditions in the lookup execution plan when flink sql joins dimension tables

2024-03-07, by iasiuide



The image may not load; below is the SQL snippet from the image:
 ..
 END AS trans_type,
  a.div_fee_amt,
  a.ts
FROM
  ods_ymfz_prod_sys_divide_order a
  LEFT JOIN dim_ymfz_prod_sys_trans_log FOR SYSTEM_TIME AS OF a.proc_time AS b ON a.bg_rel_trans_id = b.bg_rel_trans_id
  AND b.trans_date = DATE_FORMAT (CURRENT_TIMESTAMP, 'yyyyMMdd')
  LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time AS c ON b.member_id = c.pk_id
  AND c.data_source = 'merch'
  LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time AS d ON c.agent_id = d.pk_id
  AND (
    d.data_source = 'ex_agent'
    OR d.data_source = 'agent'
  )
  LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time AS d1 ON d.fagent_id = d1.pk_id
  AND d1.data_source = 'agent'
WHERE
  a.order_state = '2'
  AND a.divide_fee_amt > 0
  ) dat
WHERE
  trans_date = DATE_FORMAT (CURRENT_TIMESTAMP, 'yyyy-MM-dd')
  AND CHAR_LENGTH(member_id) > 1;













Question about join conditions in the lookup execution plan when flink sql joins dimension tables

2024-03-07, by iasiuide




In the SQL snippet below:
ods_ymfz_prod_sys_divide_order  is the kafka source table
dim_ymfz_prod_sys_trans_log   is a mysql dimension table
dim_ptfz_ymfz_merchant_info   is a mysql dimension table



The execution-plan fragment shown in the flink web ui is as follows:

 [1]:TableSourceScan(table=[[default_catalog, default_database, ods_ymfz_prod_sys_divide_order, watermark=[-(CASE(IS NULL(create_time), 1970-01-01 00:00:00:TIMESTAMP(3), CAST(create_time AS TIMESTAMP(3))), 5000:INTERVAL SECOND)]]], fields=[row_kind, id, sys_date, bg_rel_trans_id, order_state, create_time, update_time, divide_fee_amt, divide_fee_flag])
+- [2]:Calc(select=[sys_date, bg_rel_trans_id, create_time, IF(SEARCH(row_kind, Sarg[_UTF-16LE'-D', _UTF-16LE'-U']), (-1 * divide_fee_amt), divide_fee_amt) AS div_fee_amt, Reinterpret(CASE(create_time IS NULL, 1970-01-01 00:00:00, CAST(create_time AS TIMESTAMP(3)))) AS ts], where=[((order_state = '2') AND (divide_fee_amt > 0) AND (sys_date = DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS TIMESTAMP(9)), 'yyyy-MM-dd')))])
   +- [3]:LookupJoin(table=[default_catalog.default_database.dim_ymfz_prod_sys_trans_log], joinType=[LeftOuterJoin], async=[false], lookup=[bg_rel_trans_id=bg_rel_trans_id], where=[(trans_date = DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS TIMESTAMP(9)), 'yyyyMMdd'))], select=[sys_date, bg_rel_trans_id, create_time, div_fee_amt, ts, bg_rel_trans_id, pay_type, member_id, mer_name])
      +- [4]:Calc(select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name], where=[(CHAR_LENGTH(member_id) > 1)])
         +- [5]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info], joinType=[LeftOuterJoin], async=[false], lookup=[data_source=_UTF-16LE'merch', pk_id=member_id], where=[(data_source = 'merch')], select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name, pk_id, agent_id, bagent_id])
            +- [6]:Calc(select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name, agent_id, bagent_id])
               +- [7]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info], joinType=[LeftOuterJoin], async=[false], lookup=[pk_id=agent_id], where=[SEARCH(data_source, Sarg[_UTF-16LE'agent', _UTF-16LE'ex_agent'])], select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name, agent_id, bagent_id, pk_id, bagent_id, fagent_id])
                  +- [8]:Calc(select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name, bagent_id, bagent_id0, fagent_id AS fagent_id0])
                     +- [9]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info], joinType=[LeftOuterJoin], async=[false], lookup=[data_source=_UTF-16LE'agent', pk_id=fagent_id0], where=[(data_source = 'agent')], select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name, bagent_id, bagent_id0, fagent_id0, pk_id, agent_name, bagent_name])


Why is the condition on the first dimension table dim_ymfz_prod_sys_trans_log, AND b.trans_date = DATE_FORMAT (CURRENT_TIMESTAMP, 'yyyyMMdd'), not part of the lookup condition in the plan ==> lookup=[bg_rel_trans_id=bg_rel_trans_id],
while the conditions on the second dimension table dim_ptfz_ymfz_merchant_info, ON b.member_id = c.pk_id AND c.data_source = 'merch', both become lookup conditions ==> lookup=[data_source=_UTF-16LE'merch', pk_id=member_id],
and for the third dimension table dim_ptfz_ymfz_merchant_info, in ON c.agent_id = d.pk_id AND (d.data_source = 'ex_agent' OR d.data_source = 'agent'), the data_source part is not a lookup condition ==> lookup=[pk_id=agent_id]?
So some restrictions on dimension tables are used as lookup conditions and others are not? Is there a rule behind this? It matters for how we set up the indexes on the dimension tables.







Re: Re:RE: RE: flink cdc dynamically adding tables does not take effect

2024-03-07, by Hongshun Wang
Hi, casel chan,
The community has already implemented dynamic table addition in the incremental snapshot framework (https://github.com/apache/flink-cdc/pull/3024); it is expected to be exposed for mongodb and postgres in 3.1. For Oracle and Sqlserver it is not exposed yet; you can refer to those two implementations in the community, turn the switch on, then test and adapt it.
Best,
Hongshun
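
For reference, the MySQL incremental source already exposes this switch; a minimal sketch assuming the pre-3.1 com.ververica coordinates of flink-cdc (hostname, credentials, and table names are placeholders):

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class NewlyAddedTableSource {
    public static MySqlSource<String> build() {
        return MySqlSource.<String>builder()
                .hostname("localhost")                        // placeholder
                .port(3306)
                .databaseList("mydb")                         // placeholder
                .tableList("mydb.orders", "mydb.orders_new")  // restart with a superset to pick up new tables
                .username("flink")                            // placeholder
                .password("flink-pw")                         // placeholder
                .scanNewlyAddedTableEnabled(true)             // the dynamic table addition switch
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
    }
}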


Re:Window properties can only be used on windowed tables

2024-03-07, by 周尹
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    List<Person> list = new ArrayList<>();
    list.add(new Person("Fred", 35));
    list.add(new Person("Wilma", 35));
    list.add(new Person("Pebbles", 2));
    DataStream<Person> flintstones = env.fromCollection(list);
    // Define an event-time attribute for the stream
    DataStream<Person> flintstonesWithTime = flintstones.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<Person>(Time.seconds(0)) {
                @Override
                public long extractTimestamp(Person element) {
                    // Assumes every element in your stream carries a timestamp field;
                    // adjust to your actual data
                    return System.currentTimeMillis(); // or use a time field from your data
                }
            });
    // Define the window when converting the DataStream into a Table
    Table table = tEnv.fromDataStream(
            flintstonesWithTime, $("name"), $("age"), $("eventTime").rowtime());
    Table select = table
            .window(Tumble.over(lit(10).seconds()).on($("eventTime")).as("w"))
            .groupBy($("name"), $("w"))
            .select($("name"), $("age").sum());
    tEnv.toAppendStream(select, Row.class).print();
    env.execute("Flink Window Example");
}

public static class Person {
    public String name;
    public Integer age;

    public Person() {}

    public Person(String name, Integer age) {
        this.name = name;
        this.age = age;
    }

    public String toString() {
        return this.name.toString() + ":age " + this.age.toString();
    }
}
At 2024-03-08 09:28:10, "ha.fen...@aisino.com"  wrote:
>public static void main(String[] args) {
>StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>List<Persion> list = new ArrayList<>();
>list.add(new Persion("Fred",35));
>list.add(new Persion("wilma",35));
>list.add(new Persion("Pebbles",2));
>DataStream<Persion> flintstones = env.fromCollection(list);
>Table table = tEnv.fromDataStream(flintstones);
>Table select = table.select($("name"), $("age"), 
> $("addtime").proctime());
>Table select1 = select.window(
>Tumble.over(lit(10).second())
>.on($("addtime"))
>.as("w"))
>.groupBy($("name"), $("w"))
>.select($("name"), $("age").sum());
>select1.execute().print();
>
>}
>
>public static class Persion{
>public String name;
>public Integer age;
>public Persion(){}
>public Persion(String name,Integer age){
>this.name = name;
>this.age = age;
>}
>public String toString(){
>return this.name.toString()+":age "+this.age.toString();
>}
>}
>
>It throws: Exception in thread "main" org.apache.flink.table.api.ValidationException:
>Window properties can only be used on windowed tables
>Where is the mistake?


Re:Window properties can only be used on windowed tables

2024-03-07, by 周尹
You are using window properties on a table that is not windowed.


Re:Window properties can only be used on windowed tables

2024-03-07, by Xuyang
Hi, fengqi.
It looks like a proctime or event-time attribute that does not come from a window aggregation cannot be used directly in a select. I am not sure whether this is the intended behavior; if convenient, you can file a bug on the community jira[1].
As a quick workaround, you can try the code below:



DataStream<Persion> flintstones = env.fromCollection(list);
// Table select = table.select($("name"), $("age"), $("addtime").proctime());
Table table = tEnv.fromDataStream(
        flintstones,
        Schema.newBuilder()
                .column("name", "string")
                .column("age", "int")
                .columnByExpression("addtime", "proctime()")
                .build());
Table select1 = // select.xxx
        table.window(Tumble.over(lit(10).second()).on($("addtime")).as("w"))
                .groupBy($("name"), $("w"))
                .select($("name"), $("age").sum());
select1.execute().print();


[1] https://issues.apache.org/jira/projects/FLINK/issues

--

Best!
Xuyang







Registering an avro schema with the confluent schema registry fails

2024-03-07, by casel.chen
When I register the schema of a kafka topic with the confluent schema registry, I get the error below. What is the cause and how can I fix it?




io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: 
Schema being registered is incompatible with an earlier schema for subject 
"rtdp_test-test_schema-value", details: 
[{errorType:'READER_FIELD_MISSING_DEFAULT_VALUE', description:'The field 
'salary' at path '/fields/10/type/fields/5' in the new schema has no default 
value and is missing in the old schema', additionalInfo:'salary'}, 
{errorType:'READER_FIELD_MISSING_DEFAULT_VALUE', description:'The field 
'salary' at path '/fields/11/type/fields/5' in the new schema has no default 
value and is missing in the old schema', additionalInfo:'salary'}, 
{oldSchemaVersion: 4}, {oldSchema: 
'{"type":"record","name":"Envelope","namespace":"rtdp_test-test_schema","fields":[{"name":"database","type":"string"},{"name":"es","type":"long"},{"name":"id","type":"int"},{"name":"isDdl","type":"boolean"},{"name":"sql","type":"string"},{"name":"table","type":"string"},{"name":"ts","type":"long"},{"name":"type","type":"string"},{"name":"pkNames","type":{"type":"array","items":"string"}},{"name":"data","type":[{"type":"array","items":{"type":"record","name":"Value","fields":[{"name":"id","type":["long","null"],"default":0},{"name":"create_time","type":{"type":"long","logicalType":"timestamp-millis"},"default":0},{"name":"update_time","type":{"type":"long","logicalType":"timestamp-millis"},"default":0},{"name":"name","type":["string","null"],"default":""},{"name":"gender","type":["string","null"],"default":""}]}},"null"]},{"name":"mysqlType","type":{"type":"record","name":"mysqlType","fields":[{"name":"id","type":"string"},{"name":"create_time","type":"string"},{"name":"update_time","type":"string"},{"name":"name","type":"string"},{"name":"gender","type":"string"}]}},{"name":"sqlType","type":{"type":"record","name":"sqlType","fields":[{"name":"id","type":"int"},{"name":"create_time","type":"int"},{"name":"update_time","type":"int"},{"name":"name","type":"int"},{"name":"gender","type":"int"}]}},{"name":"old","type":[{"type":"array","items":"Value"},"null"]}]}'},
 {validateFields: 'false', compatibility: 'BACKWARD'}]; error code: 409

at 
io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:314)

at 
io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:384)




Maven dependency:

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-schema-registry-client</artifactId>
    <version>7.3.1</version>
</dependency>




Java code:

Schema decimalSchema = LogicalTypes.decimal(precision, scale)
        .addToSchema(SchemaBuilder.builder().bytesType());

data = data.name(columnName).type().unionOf().nullType().and()
        .type(decimalSchema).endUnion().nullDefault();
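
One way to narrow this down, a sketch assuming the 7.3.x client API: ask the registry whether the candidate schema passes the subject's compatibility rule (BACKWARD, per the error above) before trying to register it.

import java.io.IOException;

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;

public class CompatibilityCheck {
    // Returns true if the candidate schema is compatible with the latest
    // registered version(s) under the given subject.
    public static boolean isCompatible(String registryUrl, String subject, String avroSchemaJson)
            throws IOException, RestClientException {
        SchemaRegistryClient client = new CachedSchemaRegistryClient(registryUrl, 100);
        return client.testCompatibility(subject, new AvroSchema(avroSchemaJson));
    }
}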




The salary field is a decimal. The error says that an earlier schema version had no salary field, and that the salary field definition in the new schema has no default value. But I clearly set nullDefault, which the generated avro schema json string also confirms:




{
  "type": "record",
  "name": "Envelope",
  "namespace": "rtdp_test-test_schema",
  "fields": [
    { "name": "database", "type": "string" },
    { "name": "es", "type": "long" },
    { "name": "id", "type": "int" },
    { "name": "isDdl", "type": "boolean" },
    { "name": "sql", "type": "string" },
    { "name": "table", "type": "string" },
    { "name": "ts", "type": "long" },
    { "name": "type", "type": "string" },
    { "name": "pkNames", "type": { "type": "array", "items": "string" } },
    {
      "name": "data",
      "type": [
        {
          "type": "array",
          "items": {
            "type": "record",
            "name": "Value",
            "fields": [
              { "name": "id", "type": ["null", "long"], "default": null },
              {
                "name": "create_time",
                "type": ["null", { "type": "long", "logicalType": "timestamp-millis" }],
                "default": null
              },
              {