Re: Re: Question about join conditions of Flink SQL dimension-table joins in the lookup execution plan
Hi, we are using Flink 1.13.2 and 1.15.4. Looking at the Flink UI, the dimension-table join conditions in the lookup execution plan for the SQL fragment posted earlier in this thread are the same in both versions.

On 2024-03-08 11:08:51, "Yu Chen" wrote:
>Hi iasiuide,
>Could you share which Flink version and which JDBC connector version you are using? [...]
Re: Question about join conditions of Flink SQL dimension-table joins in the lookup execution plan
Hi iasiuide,

Could you share which Flink version and which JDBC connector version you are using? As far as I know, FLINK-33365 [1] fixed an issue in the JDBC connector where lookup join conditions were lost.

[1] https://issues.apache.org/jira/browse/FLINK-33365

Best,

> On 2024-03-08 11:02, iasiuide wrote:
>
> The image may not load, so here is the SQL fragment from the image:
>
> ..
>   END AS trans_type,
>   a.div_fee_amt,
>   a.ts
> FROM
>   ods_ymfz_prod_sys_divide_order a
>   LEFT JOIN dim_ymfz_prod_sys_trans_log FOR SYSTEM_TIME AS OF a.proc_time AS b
>     ON a.bg_rel_trans_id = b.bg_rel_trans_id
>     AND b.trans_date = DATE_FORMAT(CURRENT_TIMESTAMP, 'MMdd')
>   LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time AS c
>     ON b.member_id = c.pk_id
>     AND c.data_source = 'merch'
>   LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time AS d
>     ON c.agent_id = d.pk_id
>     AND (
>       d.data_source = 'ex_agent'
>       OR d.data_source = 'agent'
>     )
>   LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time AS d1
>     ON d.fagent_id = d1.pk_id
>     AND d1.data_source = 'agent'
> WHERE
>   a.order_state = '2'
>   AND a.divide_fee_amt > 0
> ) dat
> WHERE
>   trans_date = DATE_FORMAT(CURRENT_TIMESTAMP, '-MM-dd')
>   AND CHAR_LENGTH(member_id) > 1;
>
> On 2024-03-08 10:54:19, "iasiuide" wrote:
> [...]
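Why the lookup keys decide which MySQL index matters can be sketched with a simplified model. Assumption: the JDBC lookup source sends one parameterized point query per incoming row, with one `column = ?` predicate per lookup key, and conditions that stay in the plan's where=[...] are not part of that query but are applied to the returned rows (connector versions may differ here; see the issue linked above). The helpers `quote_mysql` and `lookup_select` are hypothetical, the select lists are shortened for illustration, and this is not the connector's source code.

```python
from typing import List


def quote_mysql(identifier: str) -> str:
    """Quote a MySQL identifier with backticks."""
    return f"`{identifier}`"


def lookup_select(table: str, select_fields: List[str], key_fields: List[str]) -> str:
    """Hypothetical model of the per-row point query issued for a lookup join.

    Emits one `col = ?` predicate per lookup key, AND-ed together.
    Residual where=[...] conditions from the plan are deliberately
    not included in the generated SQL.
    """
    columns = ", ".join(quote_mysql(f) for f in select_fields)
    sql = f"SELECT {columns} FROM {quote_mysql(table)}"
    if key_fields:
        predicates = " AND ".join(f"{quote_mysql(k)} = ?" for k in key_fields)
        sql += f" WHERE {predicates}"
    return sql


if __name__ == "__main__":
    # Join b: the plan has lookup=[bg_rel_trans_id=bg_rel_trans_id] only,
    # so in this model MySQL filters on bg_rel_trans_id and not trans_date.
    print(lookup_select("dim_ymfz_prod_sys_trans_log",
                        ["bg_rel_trans_id", "trans_date", "member_id"],
                        ["bg_rel_trans_id"]))
    # Join c: the plan uses both data_source and pk_id as lookup keys.
    print(lookup_select("dim_ptfz_ymfz_merchant_info",
                        ["pk_id", "data_source", "agent_id"],
                        ["data_source", "pk_id"]))
```

In this model, an index covering the lookup-key columns (bg_rel_trans_id for the trans_log table; pk_id, plus data_source where it is a key, for the merchant table) is what serves the point query, while trans_date only filters rows that have already been fetched.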
Question about join conditions of Flink SQL dimension-table joins in the lookup execution plan
In the SQL fragment below, ods_ymfz_prod_sys_divide_order is a Kafka source table, and dim_ymfz_prod_sys_trans_log and dim_ptfz_ymfz_merchant_info are MySQL tables.

The execution plan fragment shown in the Flink web UI is as follows:

[1]:TableSourceScan(table=[[default_catalog, default_database, ods_ymfz_prod_sys_divide_order, watermark=[-(CASE(IS NULL(create_time), 1970-01-01 00:00:00:TIMESTAMP(3), CAST(create_time AS TIMESTAMP(3))), 5000:INTERVAL SECOND)]]], fields=[row_kind, id, sys_date, bg_rel_trans_id, order_state, create_time, update_time, divide_fee_amt, divide_fee_flag])
+- [2]:Calc(select=[sys_date, bg_rel_trans_id, create_time, IF(SEARCH(row_kind, Sarg[_UTF-16LE'-D', _UTF-16LE'-U']), (-1 * divide_fee_amt), divide_fee_amt) AS div_fee_amt, Reinterpret(CASE(create_time IS NULL, 1970-01-01 00:00:00, CAST(create_time AS TIMESTAMP(3)))) AS ts], where=[((order_state = '2') AND (divide_fee_amt > 0) AND (sys_date = DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS TIMESTAMP(9)), '-MM-dd')))])
   +- [3]:LookupJoin(table=[default_catalog.default_database.dim_ymfz_prod_sys_trans_log], joinType=[LeftOuterJoin], async=[false], lookup=[bg_rel_trans_id=bg_rel_trans_id], where=[(trans_date = DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS TIMESTAMP(9)), 'MMdd'))], select=[sys_date, bg_rel_trans_id, create_time, div_fee_amt, ts, bg_rel_trans_id, pay_type, member_id, mer_name])
      +- [4]:Calc(select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name], where=[(CHAR_LENGTH(member_id) > 1)])
         +- [5]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info], joinType=[LeftOuterJoin], async=[false], lookup=[data_source=_UTF-16LE'merch', pk_id=member_id], where=[(data_source = 'merch')], select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name, pk_id, agent_id, bagent_id])
            +- [6]:Calc(select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name, agent_id, bagent_id])
               +- [7]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info], joinType=[LeftOuterJoin], async=[false], lookup=[pk_id=agent_id], where=[SEARCH(data_source, Sarg[_UTF-16LE'agent', _UTF-16LE'ex_agent'])], select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name, agent_id, bagent_id, pk_id, bagent_id, fagent_id])
                  +- [8]:Calc(select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name, bagent_id, bagent_id0, fagent_id AS fagent_id0])
                     +- [9]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info], joinType=[LeftOuterJoin], async=[false], lookup=[data_source=_UTF-16LE'agent', pk_id=fagent_id0], where=[(data_source = 'agent')], select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name, bagent_id, bagent_id0, fagent_id0, pk_id, agent_name, bagent_name])

Why is the restriction on the first dimension table, dim_ymfz_prod_sys_trans_log, AND b.trans_date = DATE_FORMAT(CURRENT_TIMESTAMP, 'MMdd'), not used as a lookup condition in the execution plan? ==> lookup=[bg_rel_trans_id=bg_rel_trans_id]

For the second dimension table, dim_ptfz_ymfz_merchant_info, both restrictions, ON b.member_id = c.pk_id AND c.data_source = 'merch', are used as lookup conditions in the plan ==> lookup=[data_source=_UTF-16LE'merch', pk_id=member_id]

For the third dimension table, dim_ptfz_ymfz_merchant_info again, the data_source part of ON c.agent_id = d.pk_id AND (d.data_source = 'ex_agent' OR d.data_source = 'agent') is not a lookup condition in the plan ==> lookup=[pk_id=agent_id]

So some restrictions on a dimension table become join (lookup) conditions and others do not? Is there a rule for this? It matters because it determines which columns we should index in the dimension tables.
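The pattern in plan nodes [3], [5], [7] and [9] can be reproduced with a small Python model. Assumption (inferred from these plans, not taken from Flink's planner source): an AND-ed equality between a dimension-table column and either a column of the streaming input or a literal becomes a lookup key; every other conjunct stays in where=[...] and is checked after the row is fetched. DATE_FORMAT(CURRENT_TIMESTAMP, 'MMdd') does not qualify because in streaming mode CURRENT_TIMESTAMP is evaluated per record, so it is not a planning-time constant; the OR on data_source is rewritten to SEARCH(...), which is not an equality. All class and function names below are hypothetical.

```python
from dataclasses import dataclass
from typing import List, Tuple, Union


@dataclass(frozen=True)
class Field:
    """A column of the streaming (left) input, e.g. member_id."""
    name: str


@dataclass(frozen=True)
class Literal:
    """A constant known at planning time, e.g. 'merch'."""
    value: str


@dataclass(frozen=True)
class Expr:
    """Anything else: a function evaluated at runtime, an OR, etc."""
    text: str


@dataclass(frozen=True)
class Eq:
    """dim_column = rhs, one AND-ed conjunct of the ON clause."""
    dim_column: str
    rhs: Union[Field, Literal, Expr]


Conjunct = Union[Eq, Expr]


def split_condition(conjuncts: List[Conjunct]) -> Tuple[List[str], List[str]]:
    """Model only: split ON-clause conjuncts into lookup keys and residual filters."""
    keys: List[str] = []
    residual: List[str] = []
    for c in conjuncts:
        if isinstance(c, Eq) and isinstance(c.rhs, Field):
            keys.append(f"{c.dim_column}={c.rhs.name}")
        elif isinstance(c, Eq) and isinstance(c.rhs, Literal):
            # Constant key; plan nodes [5] and [9] also keep it in where=[...]
            keys.append(f"{c.dim_column}='{c.rhs.value}'")
            residual.append(f"{c.dim_column} = '{c.rhs.value}'")
        elif isinstance(c, Eq):
            # Right-hand side is not a planning-time constant
            residual.append(f"{c.dim_column} = {c.rhs.text}")
        else:
            residual.append(c.text)
    return keys, residual


JOINS = {
    "b [3]": [Eq("bg_rel_trans_id", Field("bg_rel_trans_id")),
              Eq("trans_date", Expr("DATE_FORMAT(CURRENT_TIMESTAMP, 'MMdd')"))],
    "c [5]": [Eq("pk_id", Field("member_id")), Eq("data_source", Literal("merch"))],
    "d [7]": [Eq("pk_id", Field("agent_id")),
              Expr("data_source = 'ex_agent' OR data_source = 'agent'")],
    "d1 [9]": [Eq("pk_id", Field("fagent_id0")), Eq("data_source", Literal("agent"))],
}

if __name__ == "__main__":
    for name, conjuncts in JOINS.items():
        keys, residual = split_condition(conjuncts)
        print(f"{name}: lookup={keys} where={residual}")
```

Running it prints one line per join, matching which conditions land in lookup=[...] versus where=[...] in the plan above; under this model, only the lookup-key columns are candidates for the dimension-table index.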