Re: How can a Flink DataStream job obtain its job lineage?
After obtaining the Source or DorisSink information, how can we tell which flink job it came from? It seems the flinkJobId cannot be obtained.

| 阿华田 | a15733178...@163.com |

On 2024-02-26 20:04, Feng Jin wrote:
From the JobGraph you can obtain the transformation information, and from that the concrete Source or Doris Sink; you can then use reflection to extract the properties inside them. You can refer to the implementation in OpenLineage[1].

1. https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/shared/src/main/java/io/openlineage/flink/visitor/wrapper/FlinkKafkaConsumerWrapper.java

Best,
Feng

On Mon, Feb 26, 2024 at 6:20 PM casel.chen wrote:
A Flink DataStream job consumes from mysql cdc and writes the processed result to apache doris. Is there a way (from the JobGraph/StreamGraph) to obtain the source/sink connector information, including the connection string, database name, table name, and so on?
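Feng Jin's reflection suggestion can be sketched with plain Java. Below, `FakeKafkaSource` is a hypothetical stand-in (not a Flink class) for a connector object that keeps its configuration in a private `properties` field, mirroring how OpenLineage's `FlinkKafkaConsumerWrapper` reads the internals of `FlinkKafkaConsumer`:

```java
import java.lang.reflect.Field;
import java.util.Properties;

public class LineageReflectionSketch {

    // Hypothetical stand-in for a connector instance that keeps its
    // configuration in a private field (as FlinkKafkaConsumer does
    // with its "properties" field).
    static class FakeKafkaSource {
        private final Properties properties;
        FakeKafkaSource(Properties properties) { this.properties = properties; }
    }

    // Read a private field by name, the way OpenLineage's wrapper classes do.
    static <T> T getField(Object target, String fieldName, Class<T> type) {
        try {
            Field field = target.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            return type.cast(field.get(target));
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");
        FakeKafkaSource source = new FakeKafkaSource(props);
        Properties extracted = getField(source, "properties", Properties.class);
        System.out.println(extracted.getProperty("bootstrap.servers")); // broker:9092
    }
}
```

As for associating the extracted connector information with a job: one possibility (not covered by the sketch above) is to submit with `executeAsync()` and read the id from the returned `JobClient#getJobID()`.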
Re: Re: Join conditions in the lookup execution plan when flink sql joins dimension tables
OK, I have pasted the SQL fragment.

At 2024-03-08 11:02:34, "Xuyang" wrote:
>Hi, your image did not come through; please use an image host or paste the SQL directly.
>
>--
>Best!
>Xuyang
>
>At 2024-03-08 10:54:19, "iasiuide" wrote:
>>[quoted original question and execution plan elided; they appear in full in the original post in this thread]
Re: Re: Join conditions in the lookup execution plan when flink sql joins dimension tables
Hello, we are using versions 1.13.2 and 1.15.4. Checking the flink ui, both versions produce the same dimension-table join conditions in the lookup execution plan for the SQL fragment in this thread.

At 2024-03-08 11:08:51, "Yu Chen" wrote:
>Hi iasiuide,
>Could you share the flink version and jdbc connector version you are using? As far as I know, the jdbc connector fixed the loss of lookup join conditions in FLINK-33365[1].
>
>[1] https://issues.apache.org/jira/browse/FLINK-33365
>
>Best~
>
>> On 2024-03-08 11:02, iasiuide wrote:
>> [quoted SQL fragment and execution plan elided; they appear in full elsewhere in this thread]
Re: Join conditions in the lookup execution plan when flink sql joins dimension tables
Hi iasiuide,
Could you share the flink version and jdbc connector version you are using? As far as I know, the jdbc connector fixed the loss of lookup join conditions in FLINK-33365[1].

[1] https://issues.apache.org/jira/browse/FLINK-33365

Best~

> On 2024-03-08 11:02, iasiuide wrote:
> [quoted SQL fragment and execution plan elided; they appear in full elsewhere in this thread]
Re: Join conditions in the lookup execution plan when flink sql joins dimension tables
Hi, your image did not come through; please use an image host or paste the SQL directly.

--
Best!
Xuyang

At 2024-03-08 10:54:19, "iasiuide" wrote:
>[quoted original question and execution plan elided; they appear in full in the original post in this thread]
Re: Join conditions in the lookup execution plan when flink sql joins dimension tables
The image may not load, so here is the SQL fragment from it:

..
END AS trans_type,
a.div_fee_amt,
a.ts
FROM
    ods_ymfz_prod_sys_divide_order a
    LEFT JOIN dim_ymfz_prod_sys_trans_log FOR SYSTEM_TIME AS OF a.proc_time AS b
        ON a.bg_rel_trans_id = b.bg_rel_trans_id
        AND b.trans_date = DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyyMMdd')
    LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time AS c
        ON b.member_id = c.pk_id
        AND c.data_source = 'merch'
    LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time AS d
        ON c.agent_id = d.pk_id
        AND (d.data_source = 'ex_agent' OR d.data_source = 'agent')
    LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time AS d1
        ON d.fagent_id = d1.pk_id
        AND d1.data_source = 'agent'
WHERE
    a.order_state = '2'
    AND a.divide_fee_amt > 0
) dat
WHERE
    trans_date = DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd')
    AND CHAR_LENGTH(member_id) > 1;

At 2024-03-08 10:54:19, "iasiuide" wrote:
>[quoted original question and execution plan elided; they appear in full in the original post in this thread]
Join conditions in the lookup execution plan when flink sql joins dimension tables
In the SQL fragment below:
ods_ymfz_prod_sys_divide_order is a kafka source table
dim_ymfz_prod_sys_trans_log is a mysql dimension table
dim_ptfz_ymfz_merchant_info is a mysql dimension table

The execution-plan fragment shown in the flink web ui is:

[1]:TableSourceScan(table=[[default_catalog, default_database, ods_ymfz_prod_sys_divide_order, watermark=[-(CASE(IS NULL(create_time), 1970-01-01 00:00:00:TIMESTAMP(3), CAST(create_time AS TIMESTAMP(3))), 5000:INTERVAL SECOND)]]], fields=[row_kind, id, sys_date, bg_rel_trans_id, order_state, create_time, update_time, divide_fee_amt, divide_fee_flag])
+- [2]:Calc(select=[sys_date, bg_rel_trans_id, create_time, IF(SEARCH(row_kind, Sarg[_UTF-16LE'-D', _UTF-16LE'-U']), (-1 * divide_fee_amt), divide_fee_amt) AS div_fee_amt, Reinterpret(CASE(create_time IS NULL, 1970-01-01 00:00:00, CAST(create_time AS TIMESTAMP(3)))) AS ts], where=[((order_state = '2') AND (divide_fee_amt > 0) AND (sys_date = DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS TIMESTAMP(9)), 'yyyy-MM-dd')))])
   +- [3]:LookupJoin(table=[default_catalog.default_database.dim_ymfz_prod_sys_trans_log], joinType=[LeftOuterJoin], async=[false], lookup=[bg_rel_trans_id=bg_rel_trans_id], where=[(trans_date = DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS TIMESTAMP(9)), 'yyyyMMdd'))], select=[sys_date, bg_rel_trans_id, create_time, div_fee_amt, ts, bg_rel_trans_id, pay_type, member_id, mer_name])
      +- [4]:Calc(select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name], where=[(CHAR_LENGTH(member_id) > 1)])
         +- [5]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info], joinType=[LeftOuterJoin], async=[false], lookup=[data_source=_UTF-16LE'merch', pk_id=member_id], where=[(data_source = 'merch')], select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name, pk_id, agent_id, bagent_id])
            +- [6]:Calc(select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name, agent_id, bagent_id])
               +- [7]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info], joinType=[LeftOuterJoin], async=[false], lookup=[pk_id=agent_id], where=[SEARCH(data_source, Sarg[_UTF-16LE'agent', _UTF-16LE'ex_agent'])], select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name, agent_id, bagent_id, pk_id, bagent_id, fagent_id])
                  +- [8]:Calc(select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name, bagent_id, bagent_id0, fagent_id AS fagent_id0])
                     +- [9]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info], joinType=[LeftOuterJoin], async=[false], lookup=[data_source=_UTF-16LE'agent', pk_id=fagent_id0], where=[(data_source = 'agent')], select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name, bagent_id, bagent_id0, fagent_id0, pk_id, agent_name, bagent_name])

Why is the condition on the first dimension table dim_ymfz_prod_sys_trans_log, AND b.trans_date = DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyyMMdd'), not used as a lookup condition in the plan ==> lookup=[bg_rel_trans_id=bg_rel_trans_id],
while the conditions on the second dimension table dim_ptfz_ymfz_merchant_info, ON b.member_id = c.pk_id AND c.data_source = 'merch', are both used as lookup conditions ==> lookup=[data_source=_UTF-16LE'merch', pk_id=member_id],
and for the third dimension table dim_ptfz_ymfz_merchant_info, in the condition ON c.agent_id = d.pk_id AND (d.data_source = 'ex_agent' OR d.data_source = 'agent'), the data_source part is not a lookup condition ==> lookup=[pk_id=agent_id]?
So some dimension-table conditions become lookup conditions and some do not? Is there a rule for this? It matters for choosing the index columns on the dimension tables.
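A hedged reading of the plan above (inferred from the plan itself, not stated in Flink documentation): an ON conjunct is promoted into lookup=[...] only when it is an equality between a dimension-table column and either an input column or a literal constant; conjuncts whose right-hand side is non-deterministic (such as DATE_FORMAT(CURRENT_TIMESTAMP, ...)) or that are OR-combined stay in where=[...] as filters applied after the lookup. If the data_source check on the third dimension table must drive an index, one hypothetical rewrite is to split the OR into two literal-equality joins:

```sql
-- Hypothetical rewrite sketch: each branch pins data_source to a literal,
-- so it can become a lookup key; merge the branches downstream, e.g. with
-- COALESCE(d_agent.bagent_id, d_ex.bagent_id).
LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time AS d_agent
    ON c.agent_id = d_agent.pk_id AND d_agent.data_source = 'agent'
LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time AS d_ex
    ON c.agent_id = d_ex.pk_id AND d_ex.data_source = 'ex_agent'
```

This assumes pk_id is unique in the dimension table, so at most one branch can match a given agent_id.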
Re: Re: RE: RE: flink cdc dynamic table addition not taking effect
Hi, casel chan, the community has already implemented dynamic table addition for the incremental snapshot framework (https://github.com/apache/flink-cdc/pull/3024); it is expected to be exposed for mongodb and postgres in 3.1, but it is not yet exposed for Oracle and Sqlserver. You can refer to those two connectors in the community code, enable the parameter, then test and adapt it.

Best,
Hongshun
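For reference, the "enable the parameter" step can be seen in a connector that already exposes it: the MySQL CDC connector surfaces dynamic table addition through the 'scan.newly-added-table.enabled' option. A sketch with hypothetical connection values (an Oracle/Sqlserver port would expose an analogous switch):

```sql
CREATE TABLE orders_cdc (
    id BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flink',
    'password' = 'secret',
    'database-name' = 'app_db',
    -- a regex pattern, so tables created later can also match
    'table-name' = 'orders_.*',
    -- pick up newly added matching tables when restarting from a savepoint
    'scan.newly-added-table.enabled' = 'true'
);
```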
Re:Window properties can only be used on windowed tables
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    List<Person> list = new ArrayList<>();
    list.add(new Person("Fred", 35));
    list.add(new Person("Wilma", 35));
    list.add(new Person("Pebbles", 2));
    DataStream<Person> flintstones = env.fromCollection(list);
    // assign event-time timestamps and watermarks to the stream
    DataStream<Person> flintstonesWithTime = flintstones.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Person>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(Person element) {
                // this assumes each element carries a timestamp field; adjust to your data
                return System.currentTimeMillis(); // or use a time field from your records
            }
        });
    // declare the event-time attribute while converting the DataStream to a Table
    Table table = tEnv.fromDataStream(flintstonesWithTime, $("name"), $("age"), $("eventTime").rowtime());
    Table select = table.window(Tumble.over(lit(10).seconds()).on($("eventTime")).as("w"))
        .groupBy($("name"), $("w"))
        .select($("name"), $("age").sum());
    tEnv.toAppendStream(select, Row.class).print();
    env.execute("Flink Window Example");
}

public static class Person {
    public String name;
    public Integer age;
    public Person() {}
    public Person(String name, Integer age) {
        this.name = name;
        this.age = age;
    }
    public String toString() {
        return this.name + ":age " + this.age;
    }
}

At 2024-03-08 09:28:10, "ha.fen...@aisino.com" wrote:
>public static void main(String[] args) {
>    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>    List<Persion> list = new ArrayList<>();
>    list.add(new Persion("Fred", 35));
>    list.add(new Persion("wilma", 35));
>    list.add(new Persion("Pebbles", 2));
>    DataStream<Persion> flintstones = env.fromCollection(list);
>    Table table = tEnv.fromDataStream(flintstones);
>    Table select = table.select($("name"), $("age"), $("addtime").proctime());
>    Table select1 = select.window(
>            Tumble.over(lit(10).second())
>                .on($("addtime"))
>                .as("w"))
>        .groupBy($("name"), $("w"))
>        .select($("name"), $("age").sum());
>    select1.execute().print();
>}
>
>public static class Persion {
>    public String name;
>    public Integer age;
>    public Persion() {}
>    public Persion(String name, Integer age) {
>        this.name = name;
>        this.age = age;
>    }
>    public String toString() {
>        return this.name + ":age " + this.age;
>    }
>}
>
>It throws: Exception in thread "main" org.apache.flink.table.api.ValidationException: Window properties can only be used on windowed tables
>Where is the mistake?
Re:Window properties can only be used on windowed tables
Window properties are being used on a non-windowed table.

At 2024-03-08 09:28:10, "ha.fen...@aisino.com" wrote:
>[original code elided; quoted in full elsewhere in this thread]
Re:Window properties can only be used on windowed tables
Hi, fengqi. It looks like a proctime or event-time attribute produced by a plain select cannot be used directly for windowing. I am not sure whether this is the intended behavior; if convenient, you could file a bug in the community jira[1] to check.

As a quick workaround, try the following code:

DataStream<Persion> flintstones = env.fromCollection(list);

// Table select = table.select($("name"), $("age"), $("addtime").proctime());
Table table = tEnv.fromDataStream(
        flintstones,
        Schema.newBuilder()
                .column("name", "string")
                .column("age", "int")
                .columnByExpression("addtime", "proctime()")
                .build());

Table select1 = // select.xxx
        table.window(Tumble.over(lit(10).second()).on($("addtime")).as("w"))
                .groupBy($("name"), $("w"))
                .select($("name"), $("age").sum());
select1.execute().print();

[1] https://issues.apache.org/jira/projects/FLINK/issues

--
Best!
Xuyang

At 2024-03-08 09:28:10, "ha.fen...@aisino.com" wrote:
>[original code elided; quoted in full elsewhere in this thread]
Failed to register an avro schema with the confluent schema registry
When registering the schema of a kafka topic with the confluent schema registry, I get the error below. What is the cause, and how can I fix it?

io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema for subject "rtdp_test-test_schema-value", details: [{errorType:'READER_FIELD_MISSING_DEFAULT_VALUE', description:'The field 'salary' at path '/fields/10/type/fields/5' in the new schema has no default value and is missing in the old schema', additionalInfo:'salary'}, {errorType:'READER_FIELD_MISSING_DEFAULT_VALUE', description:'The field 'salary' at path '/fields/11/type/fields/5' in the new schema has no default value and is missing in the old schema', additionalInfo:'salary'}, {oldSchemaVersion: 4}, {oldSchema: '{"type":"record","name":"Envelope","namespace":"rtdp_test-test_schema","fields":[{"name":"database","type":"string"},{"name":"es","type":"long"},{"name":"id","type":"int"},{"name":"isDdl","type":"boolean"},{"name":"sql","type":"string"},{"name":"table","type":"string"},{"name":"ts","type":"long"},{"name":"type","type":"string"},{"name":"pkNames","type":{"type":"array","items":"string"}},{"name":"data","type":[{"type":"array","items":{"type":"record","name":"Value","fields":[{"name":"id","type":["long","null"],"default":0},{"name":"create_time","type":{"type":"long","logicalType":"timestamp-millis"},"default":0},{"name":"update_time","type":{"type":"long","logicalType":"timestamp-millis"},"default":0},{"name":"name","type":["string","null"],"default":""},{"name":"gender","type":["string","null"],"default":""}]}},"null"]},{"name":"mysqlType","type":{"type":"record","name":"mysqlType","fields":[{"name":"id","type":"string"},{"name":"create_time","type":"string"},{"name":"update_time","type":"string"},{"name":"name","type":"string"},{"name":"gender","type":"string"}]}},{"name":"sqlType","type":{"type":"record","name":"sqlType","fields":[{"name":"id","type":"int"},{"name":"create_time","type":"int"},{"name":"update_time","type":"int"},{"name":"name","type":"int"},{"name":"gender","type":"int"}]}},{"name":"old","type":[{"type":"array","items":"Value"},"null"]}]}'}, {validateFields: 'false', compatibility: 'BACKWARD'}]; error code: 409
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:314)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:384)

maven dependency:
io.confluent : kafka-schema-registry-client : 7.3.1

java code:
Schema decimalSchema = LogicalTypes.decimal(precision, scale).addToSchema(SchemaBuilder.builder().bytesType());
data = data.name(columnName).type().unionOf().nullType().and().type(decimalSchema).endUnion().nullDefault();

The salary field is a decimal. The error says an earlier schema version had no salary field, and that the salary field definition in the new schema is missing a default value. But I clearly set nullDefault, which the generated avro schema json string also confirms:

{
  "type": "record",
  "name": "Envelope",
  "namespace": "rtdp_test-test_schema",
  "fields": [
    { "name": "database", "type": "string" },
    { "name": "es", "type": "long" },
    { "name": "id", "type": "int" },
    { "name": "isDdl", "type": "boolean" },
    { "name": "sql", "type": "string" },
    { "name": "table", "type": "string" },
    { "name": "ts", "type": "long" },
    { "name": "type", "type": "string" },
    { "name": "pkNames", "type": { "type": "array", "items": "string" } },
    { "name": "data", "type": [
      { "type": "array", "items": { "type": "record", "name": "Value", "fields": [
        { "name": "id", "type": [ "null", "long" ], "default": null },
        { "name": "create_time", "type": [ "null", { "type": "long", "logicalType": "timestamp-millis" } ], "default": null },
        {
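For reference (not from the thread): in Avro, a union field's "default" must match the first branch of the union, so a nullable field only passes the READER_FIELD_MISSING_DEFAULT_VALUE check when "null" is listed first in the union and an explicit "default": null key is present in the JSON actually sent to the registry. A sketch of the shape the registered salary field would need, with hypothetical precision and scale; if the serialized JSON drops the "default" key, that alone triggers this error:

```json
{
  "name": "salary",
  "type": [ "null", { "type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2 } ],
  "default": null
}
```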