Re:Re: Flink SQL: join conditions for dimension tables in the lookup execution plan

2024-03-07, post by iasiuide
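Hi, we are using Flink 1.13.2 and 1.15.4. Looking at the Flink UI, the dimension-table join conditions in the lookup part of the execution plan for the SQL snippet from my earlier message are the same in both versions.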
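To compare the lookup keys of the two versions without submitting a job, the same plan can also be printed from the Flink SQL client with an EXPLAIN statement; the LookupJoin node in the printed optimized plan carries the same lookup=[...] and where=[...] entries that the web UI shows. A minimal sketch, assuming the tables from this thread are already registered and cutting the query down to the first dimension-table join:

```sql
-- Sketch: print the plan instead of running the job (Flink SQL client).
-- Assumes the tables below are already registered in the current catalog;
-- only the first dimension-table join is kept to make the output short.
EXPLAIN PLAN FOR
SELECT
  a.bg_rel_trans_id,
  b.member_id
FROM ods_ymfz_prod_sys_divide_order a
LEFT JOIN dim_ymfz_prod_sys_trans_log FOR SYSTEM_TIME AS OF a.proc_time AS b
  ON a.bg_rel_trans_id = b.bg_rel_trans_id
  AND b.trans_date = DATE_FORMAT(CURRENT_TIMESTAMP, 'MMdd');
```

Running this on 1.13.2 and on 1.15.4 and comparing the LookupJoin lines side by side shows directly whether trans_date lands in lookup=[...] or in where=[...].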


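On 2024-03-08 11:08:51, "Yu Chen" wrote:
>Hi iasiuide,
>Could you share which Flink version and which JDBC connector version you are using? As far as I know, the JDBC connector fixed a problem with lookup join conditions being lost in FLINK-33365[1].
>
>[1] https://issues.apache.org/jira/browse/FLINK-33365
>
>Best regards~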

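Re: Flink SQL: join conditions for dimension tables in the lookup execution plan

2024-03-07, post by Yu Chen
Hi iasiuide,
Could you share which Flink version and which JDBC connector version you are using? As far as I know, the JDBC connector fixed a problem with lookup join conditions being lost in FLINK-33365[1].

[1] https://issues.apache.org/jira/browse/FLINK-33365

Best regards~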

> On Mar 8, 2024, at 11:02, iasiuide wrote:
>
> The image may not load, so here is the SQL snippet from the image:
> ..
>     END AS trans_type,
>     a.div_fee_amt,
>     a.ts
>   FROM
>     ods_ymfz_prod_sys_divide_order a
>     LEFT JOIN dim_ymfz_prod_sys_trans_log FOR SYSTEM_TIME AS OF a.proc_time AS b
>       ON a.bg_rel_trans_id = b.bg_rel_trans_id
>       AND b.trans_date = DATE_FORMAT(CURRENT_TIMESTAMP, 'MMdd')
>     LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time AS c
>       ON b.member_id = c.pk_id
>       AND c.data_source = 'merch'
>     LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time AS d
>       ON c.agent_id = d.pk_id
>       AND (d.data_source = 'ex_agent' OR d.data_source = 'agent')
>     LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time AS d1
>       ON d.fagent_id = d1.pk_id
>       AND d1.data_source = 'agent'
>   WHERE
>     a.order_state = '2'
>     AND a.divide_fee_amt > 0
> ) dat
> WHERE
>   trans_date = DATE_FORMAT(CURRENT_TIMESTAMP, '-MM-dd')
>   AND CHAR_LENGTH(member_id) > 1;
>
> On 2024-03-08 10:54:19, "iasiuide" wrote:
> 

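Flink SQL: join conditions for dimension tables in the lookup execution plan

2024-03-07, post by iasiuide

In the SQL snippet below:
ods_ymfz_prod_sys_divide_order is a Kafka source table
dim_ymfz_prod_sys_trans_log is a MySQL dimension table
dim_ptfz_ymfz_merchant_info is a MySQL dimension table

The execution plan fragment shown in the Flink web UI: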

[1]:TableSourceScan(table=[[default_catalog, default_database, ods_ymfz_prod_sys_divide_order, watermark=[-(CASE(IS NULL(create_time), 1970-01-01 00:00:00:TIMESTAMP(3), CAST(create_time AS TIMESTAMP(3))), 5000:INTERVAL SECOND)]]], fields=[row_kind, id, sys_date, bg_rel_trans_id, order_state, create_time, update_time, divide_fee_amt, divide_fee_flag])
+- [2]:Calc(select=[sys_date, bg_rel_trans_id, create_time, IF(SEARCH(row_kind, Sarg[_UTF-16LE'-D', _UTF-16LE'-U']), (-1 * divide_fee_amt), divide_fee_amt) AS div_fee_amt, Reinterpret(CASE(create_time IS NULL, 1970-01-01 00:00:00, CAST(create_time AS TIMESTAMP(3)))) AS ts], where=[((order_state = '2') AND (divide_fee_amt > 0) AND (sys_date = DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS TIMESTAMP(9)), '-MM-dd')))])
   +- [3]:LookupJoin(table=[default_catalog.default_database.dim_ymfz_prod_sys_trans_log], joinType=[LeftOuterJoin], async=[false], lookup=[bg_rel_trans_id=bg_rel_trans_id], where=[(trans_date = DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS TIMESTAMP(9)), 'MMdd'))], select=[sys_date, bg_rel_trans_id, create_time, div_fee_amt, ts, bg_rel_trans_id, pay_type, member_id, mer_name])
      +- [4]:Calc(select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name], where=[(CHAR_LENGTH(member_id) > 1)])
         +- [5]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info], joinType=[LeftOuterJoin], async=[false], lookup=[data_source=_UTF-16LE'merch', pk_id=member_id], where=[(data_source = 'merch')], select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name, pk_id, agent_id, bagent_id])
            +- [6]:Calc(select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name, agent_id, bagent_id])
               +- [7]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info], joinType=[LeftOuterJoin], async=[false], lookup=[pk_id=agent_id], where=[SEARCH(data_source, Sarg[_UTF-16LE'agent', _UTF-16LE'ex_agent'])], select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name, agent_id, bagent_id, pk_id, bagent_id, fagent_id])
                  +- [8]:Calc(select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name, bagent_id, bagent_id0, fagent_id AS fagent_id0])
                     +- [9]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info], joinType=[LeftOuterJoin], async=[false], lookup=[data_source=_UTF-16LE'agent', pk_id=fagent_id0], where=[(data_source = 'agent')], select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name, bagent_id, bagent_id0, fagent_id0, pk_id, agent_name, bagent_name])


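Why is the condition on the first dimension table dim_ymfz_prod_sys_trans_log, AND b.trans_date = DATE_FORMAT(CURRENT_TIMESTAMP, 'MMdd'), not used as a lookup condition in the execution plan ==> lookup=[bg_rel_trans_id=bg_rel_trans_id]?
For the second dimension table dim_ptfz_ymfz_merchant_info, both conditions in ON b.member_id = c.pk_id AND c.data_source = 'merch' are used as lookup conditions ==> lookup=[data_source=_UTF-16LE'merch', pk_id=member_id].
For the third dimension table dim_ptfz_ymfz_merchant_info, the data_source part of ON c.agent_id = d.pk_id AND (d.data_source = 'ex_agent' OR d.data_source = 'agent') is not a lookup condition ==> lookup=[pk_id=agent_id].
So some restriction conditions on a dimension table become lookup (join) conditions and others do not? Is there a rule behind this? It matters to us because it determines which columns of the dimension tables we should index.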
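Reading only the plan above, the conditions that ended up in lookup=[...] are equalities between a dimension-table column and either a probe-side column (pk_id=member_id) or a plain literal (data_source='merch'). The equality against DATE_FORMAT(CURRENT_TIMESTAMP, 'MMdd') and the OR on data_source (shown as SEARCH/Sarg) stayed in where=[...], i.e. they are applied to the rows after the lookup; presumably the first is not treated as a constant because CURRENT_TIMESTAMP is evaluated per record in streaming mode. If that reading is right, one possible workaround for the first table is to compute the 'MMdd' string on the Kafka side so the condition has the same column = column shape as pk_id = member_id. This is a sketch under assumptions: cur_trans_date is a hypothetical alias, proc_time is assumed to be a processing-time attribute of the source table (as the original query implies), and the result has to be confirmed in the plan.

```sql
-- Sketch only: cur_trans_date is a hypothetical alias, not part of the original job.
-- The 'MMdd' string is computed on the probe (Kafka) side, so the join condition
-- becomes column = column instead of column = function call.
SELECT
  a.bg_rel_trans_id,
  b.member_id,
  b.pay_type
FROM (
  SELECT
    o.*,
    DATE_FORMAT(CURRENT_TIMESTAMP, 'MMdd') AS cur_trans_date
  FROM ods_ymfz_prod_sys_divide_order o
) a
LEFT JOIN dim_ymfz_prod_sys_trans_log FOR SYSTEM_TIME AS OF a.proc_time AS b
  ON a.bg_rel_trans_id = b.bg_rel_trans_id
  -- If this works as intended, trans_date should also appear in lookup=[...]
  -- of the LookupJoin node instead of in where=[...].
  AND b.trans_date = a.cur_trans_date;
```

If trans_date does move into lookup=[...], an index covering (bg_rel_trans_id, trans_date) on the MySQL side would match the lookup; if it stays in where=[...], only bg_rel_trans_id is used as the lookup key, and an index on that column is the one that matters.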