Re: How can a Flink DataStream job obtain its job lineage?

2024-03-07 Thread 阿华田
 After obtaining the Source or DorisSink information, how can I tell which Flink job it came from? It seems the flinkJobId cannot be obtained.


阿华田
a15733178...@163.com


On 2024-02-26 20:04, Feng Jin wrote:
You can obtain the transformation information from the JobGraph, locate the concrete Source or Doris
Sink, and then use reflection to extract the properties inside it.
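
A rough sketch of that approach is below. It relies on reflection over internal APIs, so the field names ("transformations" on the environment, "props" on the source) are assumptions that differ per Flink version and per connector; OpenLineage keeps a dedicated wrapper per connector for exactly that reason.

```java
import java.lang.reflect.Field;
import java.util.List;
import java.util.Properties;

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.SourceTransformation;

public final class LineageSketch {

    @SuppressWarnings("unchecked")
    public static void inspect(StreamExecutionEnvironment env) throws Exception {
        // The environment keeps the not-yet-submitted job as a list of Transformations (internal API).
        Field tf = StreamExecutionEnvironment.class.getDeclaredField("transformations");
        tf.setAccessible(true);
        List<Transformation<?>> transformations = (List<Transformation<?>>) tf.get(env);

        for (Transformation<?> t : transformations) {
            if (t instanceof SourceTransformation) {
                Object source = ((SourceTransformation<?, ?, ?>) t).getSource();
                // "props" is a hypothetical field name; the real field depends on the
                // concrete source/sink class, which is why a connector-specific wrapper is needed.
                Field f = source.getClass().getDeclaredField("props");
                f.setAccessible(true);
                System.out.println(t.getName() + " -> " + (Properties) f.get(source));
            }
        }
    }
}
```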

You can refer to the OpenLineage[1] implementation.


1.
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/shared/src/main/java/io/openlineage/flink/visitor/wrapper/FlinkKafkaConsumerWrapper.java


Best,
Feng


On Mon, Feb 26, 2024 at 6:20 PM casel.chen  wrote:

A Flink DataStream job consumes from MySQL CDC, processes the data, and writes it to Apache
Doris. Is there a way (from the JobGraph/StreamGraph) to obtain the source/sink
connector information, including the connection string, database name, table name, and so on?


Re: Flink Checkpoint & Offset Commit

2024-03-07 Thread Yanfei Lei
Hi Jacob,

> I have multiple upstream sources to connect to depending on the business 
> model which are not Kafka. Based on criticality of the system and publisher 
> dependencies, we cannot switch to Kafka for these.

Sounds like you want to implement some custom connectors; [1][2] may
be helpful for implementing a custom Flink Table API connector.

Specifically regarding “Flink Checkpoint & Offset Commit”, the
custom source needs to implement the `SourceReader` interface, and you
can override `snapshotState()` and `notifyCheckpointComplete()` in
your implementation.
[3] is the related code of the Kafka connector for the DataStream API, and [4]
is the related code of the Kafka connector for the Table API & SQL.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/
[2] 
https://flink.apache.org/2021/09/07/implementing-a-custom-source-connector-for-table-api-and-sql-part-one/
[3] 
https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L98-L177
[4] 
https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java#L354
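
For illustration, a minimal skeleton of such a reader is sketched below; the `MySplit` type and the commented-out external commit call are placeholders, and a real reader still has to implement `pollNext()` and split handling properly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.core.io.InputStatus;

/** Hypothetical split: one partition of the external system plus the current read offset. */
class MySplit implements SourceSplit {
    final String partition;
    long offset;
    MySplit(String partition, long offset) { this.partition = partition; this.offset = offset; }
    @Override public String splitId() { return partition; }
}

/** Sketch of a reader that commits offsets only after the checkpoint completes, Kafka-style. */
class MySourceReader implements SourceReader<String, MySplit> {

    private final List<MySplit> assignedSplits = new ArrayList<>();
    // Offsets captured per checkpoint id; committed in notifyCheckpointComplete().
    private final Map<Long, List<MySplit>> offsetsByCheckpoint = new TreeMap<>();

    @Override public void start() {}

    @Override
    public InputStatus pollNext(ReaderOutput<String> output) {
        // Poll the external system, emit records, advance the offsets of assignedSplits ...
        return InputStatus.NOTHING_AVAILABLE;
    }

    @Override
    public List<MySplit> snapshotState(long checkpointId) {
        // The current offsets become part of Flink's checkpoint state
        // (a real implementation should deep-copy the splits here).
        List<MySplit> snapshot = new ArrayList<>(assignedSplits);
        offsetsByCheckpoint.put(checkpointId, snapshot);
        return snapshot;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Only now is it safe to acknowledge/commit the offsets back to the external system.
        List<MySplit> toCommit = offsetsByCheckpoint.remove(checkpointId);
        if (toCommit != null) {
            // externalClient.commit(toCommit);   // hypothetical client call
        }
    }

    @Override public CompletableFuture<Void> isAvailable() { return CompletableFuture.completedFuture(null); }
    @Override public void addSplits(List<MySplit> splits) { assignedSplits.addAll(splits); }
    @Override public void notifyNoMoreSplits() {}
    @Override public void close() {}
}
```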

xia rui wrote on Fri, Mar 8, 2024 at 10:12:
>
> Hi Jacob.
>
> Flink uses "notification" to let an operator callback the completion of a 
> checkpoint. After gathering all checkpoint done messages from TMs, JM sends a 
> "notify checkpoint completed" RPC to all TMs. Operators will handle this 
> notification, where checkpoint success callbacks are invoked. For example, 
> Kafka sources commit the current consuming offset. I think this doc 
> (https://flink.apache.org/2018/02/28/an-overview-of-end-to-end-exactly-once-processing-in-apache-flink-with-apache-kafka-too/)
>  may be helpful.
>
> You can override `notifyCheckpointComplete()` to customize the behavior of 
> handling checkpoint completion.
>
> Best regards Rui Xia
>
> On Fri, Mar 8, 2024 at 3:03 AM Jacob Rollings  
> wrote:
>>
>>
>> Hello,
>>
>> I am implementing proof of concepts based Flink realtime streaming solutions.
>>
>> I came across below lines in out-of-the-box Flink Kafka connector documents.
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/
>> Consumer Offset Committing
>>
>> Kafka source commits the current consuming offset when checkpoints are 
>> completed, for ensuring the consistency between Flink’s checkpoint state and 
>> committed offsets on Kafka brokers.
>>
>>
>> How is Flink able to control the callbacks from checkpointing? Is there a 
>> way to override this into my implementations. I have multiple upstream 
>> sources to connect to depending on the business model which are not Kafka. 
>> Based on criticality of the system and publisher dependencies, we cannot 
>> switch to Kafka for these. So I was hoping to do the same which kafka 
>> connector is doing.
>>
>>
>> Cheers,
>>
>> JR



-- 
Best,
Yanfei


Re:Re: Question about the lookup join conditions for dimension tables in the Flink SQL execution plan

2024-03-07 Thread iasiuide
OK, I have pasted the SQL snippet.

On 2024-03-08 11:02:34, "Xuyang" wrote:
>Hi, your image didn't come through; you can use an image hosting service or just paste the SQL directly.
>
>
>
>
>--
>
>Best!
>Xuyang
>
>
>
>
>On 2024-03-08 10:54:19, "iasiuide" wrote:
>
>
>
>
>
>In the SQL snippet below:
>ods_ymfz_prod_sys_divide_order is a Kafka source table
>dim_ymfz_prod_sys_trans_log is a MySQL dimension table
>dim_ptfz_ymfz_merchant_info is a MySQL dimension table
>
>
>
>The execution plan fragment from the Flink web UI is as follows:
>
> [1]:TableSourceScan(table=[[default_catalog, default_database, 
> ods_ymfz_prod_sys_divide_order, watermark=[-(CASE(IS NULL(create_time), 
> 1970-01-01 00:00:00:TIMESTAMP(3), CAST(create_time AS TIMESTAMP(3))), 
> 5000:INTERVAL SECOND)]]], fields=[row_kind, id, sys_date, bg_rel_trans_id, 
> order_state, create_time, update_time, divide_fee_amt, divide_fee_flag])
>+- [2]:Calc(select=[sys_date, bg_rel_trans_id, create_time, 
>IF(SEARCH(row_kind, Sarg[_UTF-16LE'-D', _UTF-16LE'-U']), (-1 * 
>divide_fee_amt), divide_fee_amt) AS div_fee_amt, Reinterpret(CASE(create_time 
>IS NULL, 1970-01-01 00:00:00, CAST(create_time AS TIMESTAMP(3)))) AS ts], 
>where=[((order_state = '2') AND (divide_fee_amt > 0) AND (sys_date = 
>DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS TIMESTAMP(9)), 'yyyy-MM-dd')))])
>   +- 
> [3]:LookupJoin(table=[default_catalog.default_database.dim_ymfz_prod_sys_trans_log],
>  joinType=[LeftOuterJoin], async=[false], 
> lookup=[bg_rel_trans_id=bg_rel_trans_id], where=[(trans_date = 
> DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS TIMESTAMP(9)), 'yyyyMMdd'))], 
> select=[sys_date, bg_rel_trans_id, create_time, div_fee_amt, ts, 
> bg_rel_trans_id, pay_type, member_id, mer_name])
>  +- [4]:Calc(select=[sys_date, create_time, div_fee_amt, ts, pay_type, 
> member_id, mer_name], where=[(CHAR_LENGTH(member_id) > 1)])
> +- 
> [5]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
>  joinType=[LeftOuterJoin], async=[false], 
> lookup=[data_source=_UTF-16LE'merch', pk_id=member_id], where=[(data_source = 
> 'merch')], select=[sys_date, create_time, div_fee_amt, ts, pay_type, 
> member_id, mer_name, pk_id, agent_id, bagent_id])
>+- [6]:Calc(select=[sys_date, create_time, div_fee_amt, ts, 
> pay_type, member_id, mer_name, agent_id, bagent_id])
>   +- 
> [7]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
>  joinType=[LeftOuterJoin], async=[false], lookup=[pk_id=agent_id], 
> where=[SEARCH(data_source, Sarg[_UTF-16LE'agent', _UTF-16LE'ex_agent'])], 
> select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, 
> mer_name, agent_id, bagent_id, pk_id, bagent_id, fagent_id])
>  +- [8]:Calc(select=[sys_date, create_time, div_fee_amt, ts, 
> pay_type, member_id, mer_name, bagent_id, bagent_id0, fagent_id AS 
> fagent_id0])
> +- 
> [9]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
>  joinType=[LeftOuterJoin], async=[false], 
> lookup=[data_source=_UTF-16LE'agent', pk_id=fagent_id0], where=[(data_source 
> = 'agent')], select=[sys_date, create_time, div_fee_amt, ts, pay_type, 
> member_id, mer_name, bagent_id, bagent_id0, fagent_id0, pk_id, agent_name, 
> bagent_name])
>  
>
>
>Why is the condition AND b.trans_date = DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyyMMdd') on the
>first dimension table dim_ymfz_prod_sys_trans_log not used as a lookup condition in the
>execution plan ==> lookup=[bg_rel_trans_id=bg_rel_trans_id],
>while the conditions ON b.member_id = c.pk_id AND c.data_source = 'merch' on the second
>dimension table dim_ptfz_ymfz_merchant_info are both used as lookup conditions ==>
>lookup=[data_source=_UTF-16LE'merch', pk_id=member_id],
>and in the conditions ON c.agent_id = d.pk_id AND (d.data_source = 'ex_agent' OR
>d.data_source = 'agent') on the third dimension table dim_ptfz_ymfz_merchant_info the
>data_source part is not a lookup condition ==> lookup=[pk_id=agent_id]?
>So some conditions on a dimension table are used as lookup join conditions and some are not? Is there a rule for this? It matters because it determines which index fields should be created on the dimension tables.
>
>
>
>
>


Re:Re: Question about the lookup join conditions for dimension tables in the Flink SQL execution plan

2024-03-07 Thread iasiuide
Hi, we are using versions 1.13.2 and 1.15.4. I checked the Flink UI, and for the SQL snippet below both versions produce the same dimension-table join conditions in the lookup execution plan.


On 2024-03-08 11:08:51, "Yu Chen" wrote:
>Hi iasiuide,
>Could you share the Flink version and the JDBC connector version you are using? As far as I know, the JDBC
>connector fixed the problem of lost lookup join conditions in FLINK-33365[1].
>
>[1] https://issues.apache.org/jira/browse/FLINK-33365
>
>Best regards~
>
>> On Mar 8, 2024, at 11:02, iasiuide wrote:
>> 
>> 
>> 
>> 
>> The image may not load, so below is the SQL snippet from the image:
>> ..
>> END AS trans_type,
>> 
>>  a.div_fee_amt,
>> 
>>  a.ts
>> 
>>FROM
>> 
>>  ods_ymfz_prod_sys_divide_order a
>> 
>>  LEFT JOIN dim_ymfz_prod_sys_trans_log FOR SYSTEM_TIME AS OF a.proc_time 
>> AS b ON a.bg_rel_trans_id = b.bg_rel_trans_id
>> 
>>  AND b.trans_date = DATE_FORMAT (CURRENT_TIMESTAMP, 'yyyyMMdd')
>> 
>>  LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time 
>> AS c ON b.member_id = c.pk_id
>> 
>>  AND c.data_source = 'merch'
>> 
>>  LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time 
>> AS d ON c.agent_id = d.pk_id
>> 
>>  AND (
>> 
>>d.data_source = 'ex_agent'
>> 
>>OR d.data_source = 'agent'
>> 
>>  ) 
>> 
>>  LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time 
>> AS d1 ON d.fagent_id = d1.pk_id
>> 
>>  AND d1.data_source = 'agent'
>> 
>>WHERE 
>> 
>>  a.order_state = '2' 
>> 
>>  AND a.divide_fee_amt > 0
>> 
>>  ) dat
>> 
>> WHERE
>> 
>>  trans_date = DATE_FORMAT (CURRENT_TIMESTAMP, 'yyyy-MM-dd')
>> 
>>  AND CHAR_LENGTH(member_id) > 1;
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> On 2024-03-08 10:54:19, "iasiuide" wrote:
>> 
>> 
>> 
>> 
>> 
>> In the SQL snippet below:
>> ods_ymfz_prod_sys_divide_order is a Kafka source table
>> dim_ymfz_prod_sys_trans_log is a MySQL dimension table
>> dim_ptfz_ymfz_merchant_info is a MySQL dimension table
>> 
>> 
>> 
>> The execution plan fragment from the Flink web UI is as follows:
>> 
>> [1]:TableSourceScan(table=[[default_catalog, default_database, 
>> ods_ymfz_prod_sys_divide_order, watermark=[-(CASE(IS NULL(create_time), 
>> 1970-01-01 00:00:00:TIMESTAMP(3), CAST(create_time AS TIMESTAMP(3))), 
>> 5000:INTERVAL SECOND)]]], fields=[row_kind, id, sys_date, bg_rel_trans_id, 
>> order_state, create_time, update_time, divide_fee_amt, divide_fee_flag])
>> +- [2]:Calc(select=[sys_date, bg_rel_trans_id, create_time, 
>> IF(SEARCH(row_kind, Sarg[_UTF-16LE'-D', _UTF-16LE'-U']), (-1 * 
>> divide_fee_amt), divide_fee_amt) AS div_fee_amt, 
>> Reinterpret(CASE(create_time IS NULL, 1970-01-01 00:00:00, CAST(create_time 
>> AS TIMESTAMP(3)))) AS ts], where=[((order_state = '2') AND (divide_fee_amt > 0) 
>> AND (sys_date = DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS 
>> TIMESTAMP(9)), 'yyyy-MM-dd')))])
>>   +- 
>> [3]:LookupJoin(table=[default_catalog.default_database.dim_ymfz_prod_sys_trans_log],
>>  joinType=[LeftOuterJoin], async=[false], 
>> lookup=[bg_rel_trans_id=bg_rel_trans_id], where=[(trans_date = 
>> DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS TIMESTAMP(9)), 'yyyyMMdd'))], 
>> select=[sys_date, bg_rel_trans_id, create_time, div_fee_amt, ts, 
>> bg_rel_trans_id, pay_type, member_id, mer_name])
>>  +- [4]:Calc(select=[sys_date, create_time, div_fee_amt, ts, pay_type, 
>> member_id, mer_name], where=[(CHAR_LENGTH(member_id) > 1)])
>> +- 
>> [5]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
>>  joinType=[LeftOuterJoin], async=[false], 
>> lookup=[data_source=_UTF-16LE'merch', pk_id=member_id], where=[(data_source 
>> = 'merch')], select=[sys_date, create_time, div_fee_amt, ts, pay_type, 
>> member_id, mer_name, pk_id, agent_id, bagent_id])
>>+- [6]:Calc(select=[sys_date, create_time, div_fee_amt, ts, 
>> pay_type, member_id, mer_name, agent_id, bagent_id])
>>   +- 
>> [7]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
>>  joinType=[LeftOuterJoin], async=[false], lookup=[pk_id=agent_id], 
>> where=[SEARCH(data_source, Sarg[_UTF-16LE'agent', _UTF-16LE'ex_agent'])], 
>> select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, 
>> mer_name, agent_id, bagent_id, pk_id, bagent_id, fagent_id])
>>  +- [8]:Calc(select=[sys_date, create_time, div_fee_amt, ts, 
>> pay_type, member_id, mer_name, bagent_id, bagent_id0, fagent_id AS 
>> fagent_id0])
>> +- 
>> [9]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
>>  joinType=[LeftOuterJoin], async=[false], 
>> lookup=[data_source=_UTF-16LE'agent', pk_id=fagent_id0], where=[(data_source 
>> = 'agent')], select=[sys_date, create_time, div_fee_amt, ts, pay_type, 
>> member_id, mer_name, bagent_id, bagent_id0, fagent_id0, pk_id, agent_name, 
>> bagent_name])
>>  
>> 
>> 
>> Why is the condition AND b.trans_date = DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyyMMdd') on the
>> first dimension table dim_ymfz_prod_sys_trans_log not used as a lookup condition in the
>> execution plan ==> lookup=[bg_rel_trans_id=bg_rel_trans_id],
>> while the conditions ON b.member_id = c.pk_id AND
>> c.data_source = 'merch' on the second dimension table dim_ptfz_ymfz_merchant_info are both used as lookup conditions ==> 
>> 

Re: Question about the lookup join conditions for dimension tables in the Flink SQL execution plan

2024-03-07 Thread Yu Chen
Hi iasiuide,
Could you share the Flink version and the JDBC connector version you are using? As far as I know, the JDBC
connector fixed the problem of lost lookup join conditions in FLINK-33365[1].

[1] https://issues.apache.org/jira/browse/FLINK-33365

Best regards~

> On Mar 8, 2024, at 11:02, iasiuide wrote:
> 
> 
> 
> 
> The image may not load, so below is the SQL snippet from the image:
> ..
> END AS trans_type,
> 
>  a.div_fee_amt,
> 
>  a.ts
> 
>FROM
> 
>  ods_ymfz_prod_sys_divide_order a
> 
>  LEFT JOIN dim_ymfz_prod_sys_trans_log FOR SYSTEM_TIME AS OF a.proc_time 
> AS b ON a.bg_rel_trans_id = b.bg_rel_trans_id
> 
>  AND b.trans_date = DATE_FORMAT (CURRENT_TIMESTAMP, 'yyyyMMdd')
> 
>  LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time 
> AS c ON b.member_id = c.pk_id
> 
>  AND c.data_source = 'merch'
> 
>  LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time 
> AS d ON c.agent_id = d.pk_id
> 
>  AND (
> 
>d.data_source = 'ex_agent'
> 
>OR d.data_source = 'agent'
> 
>  ) 
> 
>  LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time 
> AS d1 ON d.fagent_id = d1.pk_id
> 
>  AND d1.data_source = 'agent'
> 
>WHERE 
> 
>  a.order_state = '2' 
> 
>  AND a.divide_fee_amt > 0
> 
>  ) dat
> 
> WHERE
> 
>  trans_date = DATE_FORMAT (CURRENT_TIMESTAMP, 'yyyy-MM-dd')
> 
>  AND CHAR_LENGTH(member_id) > 1;
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> On 2024-03-08 10:54:19, "iasiuide" wrote:
> 
> 
> 
> 
> 
> In the SQL snippet below:
> ods_ymfz_prod_sys_divide_order is a Kafka source table
> dim_ymfz_prod_sys_trans_log is a MySQL dimension table
> dim_ptfz_ymfz_merchant_info is a MySQL dimension table
> 
> 
> 
> The execution plan fragment from the Flink web UI is as follows:
> 
> [1]:TableSourceScan(table=[[default_catalog, default_database, 
> ods_ymfz_prod_sys_divide_order, watermark=[-(CASE(IS NULL(create_time), 
> 1970-01-01 00:00:00:TIMESTAMP(3), CAST(create_time AS TIMESTAMP(3))), 
> 5000:INTERVAL SECOND)]]], fields=[row_kind, id, sys_date, bg_rel_trans_id, 
> order_state, create_time, update_time, divide_fee_amt, divide_fee_flag])
> +- [2]:Calc(select=[sys_date, bg_rel_trans_id, create_time, 
> IF(SEARCH(row_kind, Sarg[_UTF-16LE'-D', _UTF-16LE'-U']), (-1 * 
> divide_fee_amt), divide_fee_amt) AS div_fee_amt, Reinterpret(CASE(create_time 
> IS NULL, 1970-01-01 00:00:00, CAST(create_time AS TIMESTAMP(3)))) AS ts], 
> where=[((order_state = '2') AND (divide_fee_amt > 0) AND (sys_date = 
> DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS TIMESTAMP(9)), 'yyyy-MM-dd')))])
>   +- 
> [3]:LookupJoin(table=[default_catalog.default_database.dim_ymfz_prod_sys_trans_log],
>  joinType=[LeftOuterJoin], async=[false], 
> lookup=[bg_rel_trans_id=bg_rel_trans_id], where=[(trans_date = 
> DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS TIMESTAMP(9)), 'yyyyMMdd'))], 
> select=[sys_date, bg_rel_trans_id, create_time, div_fee_amt, ts, 
> bg_rel_trans_id, pay_type, member_id, mer_name])
>  +- [4]:Calc(select=[sys_date, create_time, div_fee_amt, ts, pay_type, 
> member_id, mer_name], where=[(CHAR_LENGTH(member_id) > 1)])
> +- 
> [5]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
>  joinType=[LeftOuterJoin], async=[false], 
> lookup=[data_source=_UTF-16LE'merch', pk_id=member_id], where=[(data_source = 
> 'merch')], select=[sys_date, create_time, div_fee_amt, ts, pay_type, 
> member_id, mer_name, pk_id, agent_id, bagent_id])
>+- [6]:Calc(select=[sys_date, create_time, div_fee_amt, ts, 
> pay_type, member_id, mer_name, agent_id, bagent_id])
>   +- 
> [7]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
>  joinType=[LeftOuterJoin], async=[false], lookup=[pk_id=agent_id], 
> where=[SEARCH(data_source, Sarg[_UTF-16LE'agent', _UTF-16LE'ex_agent'])], 
> select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, 
> mer_name, agent_id, bagent_id, pk_id, bagent_id, fagent_id])
>  +- [8]:Calc(select=[sys_date, create_time, div_fee_amt, ts, 
> pay_type, member_id, mer_name, bagent_id, bagent_id0, fagent_id AS 
> fagent_id0])
> +- 
> [9]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
>  joinType=[LeftOuterJoin], async=[false], 
> lookup=[data_source=_UTF-16LE'agent', pk_id=fagent_id0], where=[(data_source 
> = 'agent')], select=[sys_date, create_time, div_fee_amt, ts, pay_type, 
> member_id, mer_name, bagent_id, bagent_id0, fagent_id0, pk_id, agent_name, 
> bagent_name])
>  
> 
> 
> Why is the condition AND b.trans_date = DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyyMMdd') on the
> first dimension table dim_ymfz_prod_sys_trans_log not used as a lookup condition in the
> execution plan ==> lookup=[bg_rel_trans_id=bg_rel_trans_id],
> while the conditions ON b.member_id = c.pk_id AND c.data_source = 'merch' on the second
> dimension table dim_ptfz_ymfz_merchant_info are both used as lookup conditions ==>
> lookup=[data_source=_UTF-16LE'merch', pk_id=member_id],
> and in the conditions ON c.agent_id = d.pk_id AND (d.data_source = 'ex_agent' OR
> d.data_source = 'agent') on the third dimension table dim_ptfz_ymfz_merchant_info
> the data_source part is not a lookup condition ==> lookup=[pk_id=agent_id],
> 

Re: Question about the lookup join conditions for dimension tables in the Flink SQL execution plan

2024-03-07 Thread Xuyang
Hi, your image didn't come through; you can use an image hosting service or just paste the SQL directly.




--

Best!
Xuyang




On 2024-03-08 10:54:19, "iasiuide" wrote:





In the SQL snippet below:
ods_ymfz_prod_sys_divide_order is a Kafka source table
dim_ymfz_prod_sys_trans_log is a MySQL dimension table
dim_ptfz_ymfz_merchant_info is a MySQL dimension table



The execution plan fragment from the Flink web UI is as follows:

 [1]:TableSourceScan(table=[[default_catalog, default_database, 
ods_ymfz_prod_sys_divide_order, watermark=[-(CASE(IS NULL(create_time), 
1970-01-01 00:00:00:TIMESTAMP(3), CAST(create_time AS TIMESTAMP(3))), 
5000:INTERVAL SECOND)]]], fields=[row_kind, id, sys_date, bg_rel_trans_id, 
order_state, create_time, update_time, divide_fee_amt, divide_fee_flag])
+- [2]:Calc(select=[sys_date, bg_rel_trans_id, create_time, IF(SEARCH(row_kind, 
Sarg[_UTF-16LE'-D', _UTF-16LE'-U']), (-1 * divide_fee_amt), divide_fee_amt) AS 
div_fee_amt, Reinterpret(CASE(create_time IS NULL, 1970-01-01 00:00:00, 
CAST(create_time AS TIMESTAMP(3)))) AS ts], where=[((order_state = '2') AND 
(divide_fee_amt > 0) AND (sys_date = DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS 
TIMESTAMP(9)), 'yyyy-MM-dd')))])
   +- 
[3]:LookupJoin(table=[default_catalog.default_database.dim_ymfz_prod_sys_trans_log],
 joinType=[LeftOuterJoin], async=[false], 
lookup=[bg_rel_trans_id=bg_rel_trans_id], where=[(trans_date = 
DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS TIMESTAMP(9)), 'yyyyMMdd'))], 
select=[sys_date, bg_rel_trans_id, create_time, div_fee_amt, ts, 
bg_rel_trans_id, pay_type, member_id, mer_name])
  +- [4]:Calc(select=[sys_date, create_time, div_fee_amt, ts, pay_type, 
member_id, mer_name], where=[(CHAR_LENGTH(member_id) > 1)])
 +- 
[5]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
 joinType=[LeftOuterJoin], async=[false], lookup=[data_source=_UTF-16LE'merch', 
pk_id=member_id], where=[(data_source = 'merch')], select=[sys_date, 
create_time, div_fee_amt, ts, pay_type, member_id, mer_name, pk_id, agent_id, 
bagent_id])
+- [6]:Calc(select=[sys_date, create_time, div_fee_amt, ts, 
pay_type, member_id, mer_name, agent_id, bagent_id])
   +- 
[7]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
 joinType=[LeftOuterJoin], async=[false], lookup=[pk_id=agent_id], 
where=[SEARCH(data_source, Sarg[_UTF-16LE'agent', _UTF-16LE'ex_agent'])], 
select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name, 
agent_id, bagent_id, pk_id, bagent_id, fagent_id])
  +- [8]:Calc(select=[sys_date, create_time, div_fee_amt, ts, 
pay_type, member_id, mer_name, bagent_id, bagent_id0, fagent_id AS fagent_id0])
 +- 
[9]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
 joinType=[LeftOuterJoin], async=[false], lookup=[data_source=_UTF-16LE'agent', 
pk_id=fagent_id0], where=[(data_source = 'agent')], select=[sys_date, 
create_time, div_fee_amt, ts, pay_type, member_id, mer_name, bagent_id, 
bagent_id0, fagent_id0, pk_id, agent_name, bagent_name])
  


Why is the condition AND b.trans_date = DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyyMMdd') on the
first dimension table dim_ymfz_prod_sys_trans_log not used as a lookup condition in the
execution plan ==> lookup=[bg_rel_trans_id=bg_rel_trans_id],
while the conditions ON b.member_id = c.pk_id AND c.data_source = 'merch' on the second
dimension table dim_ptfz_ymfz_merchant_info are both used as lookup conditions ==>
lookup=[data_source=_UTF-16LE'merch', pk_id=member_id],
and in the conditions ON c.agent_id = d.pk_id AND (d.data_source = 'ex_agent' OR
d.data_source = 'agent') on the third dimension table dim_ptfz_ymfz_merchant_info the
data_source part is not a lookup condition ==> lookup=[pk_id=agent_id]?
So some conditions on a dimension table are used as lookup join conditions and some are not? Is there a rule for this? It matters because it determines which index fields should be created on the dimension tables.







Re: Question about the lookup join conditions for dimension tables in the Flink SQL execution plan

2024-03-07 Thread iasiuide



The image may not load, so below is the SQL snippet from the image:
 ..
 END AS trans_type,

  a.div_fee_amt,

  a.ts

FROM

  ods_ymfz_prod_sys_divide_order a

  LEFT JOIN dim_ymfz_prod_sys_trans_log FOR SYSTEM_TIME AS OF a.proc_time 
AS b ON a.bg_rel_trans_id = b.bg_rel_trans_id

  AND b.trans_date = DATE_FORMAT (CURRENT_TIMESTAMP, 'yyyyMMdd')

  LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time 
AS c ON b.member_id = c.pk_id

  AND c.data_source = 'merch'

  LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time 
AS d ON c.agent_id = d.pk_id

  AND (

d.data_source = 'ex_agent'

OR d.data_source = 'agent'

  ) 

  LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time 
AS d1 ON d.fagent_id = d1.pk_id

  AND d1.data_source = 'agent'

WHERE 

  a.order_state = '2' 

  AND a.divide_fee_amt > 0

  ) dat

WHERE

  trans_date = DATE_FORMAT (CURRENT_TIMESTAMP, 'yyyy-MM-dd')

  AND CHAR_LENGTH(member_id) > 1;













On 2024-03-08 10:54:19, "iasiuide" wrote:





In the SQL snippet below:
ods_ymfz_prod_sys_divide_order is a Kafka source table
dim_ymfz_prod_sys_trans_log is a MySQL dimension table
dim_ptfz_ymfz_merchant_info is a MySQL dimension table



The execution plan fragment from the Flink web UI is as follows:

 [1]:TableSourceScan(table=[[default_catalog, default_database, 
ods_ymfz_prod_sys_divide_order, watermark=[-(CASE(IS NULL(create_time), 
1970-01-01 00:00:00:TIMESTAMP(3), CAST(create_time AS TIMESTAMP(3))), 
5000:INTERVAL SECOND)]]], fields=[row_kind, id, sys_date, bg_rel_trans_id, 
order_state, create_time, update_time, divide_fee_amt, divide_fee_flag])
+- [2]:Calc(select=[sys_date, bg_rel_trans_id, create_time, IF(SEARCH(row_kind, 
Sarg[_UTF-16LE'-D', _UTF-16LE'-U']), (-1 * divide_fee_amt), divide_fee_amt) AS 
div_fee_amt, Reinterpret(CASE(create_time IS NULL, 1970-01-01 00:00:00, 
CAST(create_time AS TIMESTAMP(3)))) AS ts], where=[((order_state = '2') AND 
(divide_fee_amt > 0) AND (sys_date = DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS 
TIMESTAMP(9)), 'yyyy-MM-dd')))])
   +- 
[3]:LookupJoin(table=[default_catalog.default_database.dim_ymfz_prod_sys_trans_log],
 joinType=[LeftOuterJoin], async=[false], 
lookup=[bg_rel_trans_id=bg_rel_trans_id], where=[(trans_date = 
DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS TIMESTAMP(9)), 'yyyyMMdd'))], 
select=[sys_date, bg_rel_trans_id, create_time, div_fee_amt, ts, 
bg_rel_trans_id, pay_type, member_id, mer_name])
  +- [4]:Calc(select=[sys_date, create_time, div_fee_amt, ts, pay_type, 
member_id, mer_name], where=[(CHAR_LENGTH(member_id) > 1)])
 +- 
[5]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
 joinType=[LeftOuterJoin], async=[false], lookup=[data_source=_UTF-16LE'merch', 
pk_id=member_id], where=[(data_source = 'merch')], select=[sys_date, 
create_time, div_fee_amt, ts, pay_type, member_id, mer_name, pk_id, agent_id, 
bagent_id])
+- [6]:Calc(select=[sys_date, create_time, div_fee_amt, ts, 
pay_type, member_id, mer_name, agent_id, bagent_id])
   +- 
[7]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
 joinType=[LeftOuterJoin], async=[false], lookup=[pk_id=agent_id], 
where=[SEARCH(data_source, Sarg[_UTF-16LE'agent', _UTF-16LE'ex_agent'])], 
select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name, 
agent_id, bagent_id, pk_id, bagent_id, fagent_id])
  +- [8]:Calc(select=[sys_date, create_time, div_fee_amt, ts, 
pay_type, member_id, mer_name, bagent_id, bagent_id0, fagent_id AS fagent_id0])
 +- 
[9]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
 joinType=[LeftOuterJoin], async=[false], lookup=[data_source=_UTF-16LE'agent', 
pk_id=fagent_id0], where=[(data_source = 'agent')], select=[sys_date, 
create_time, div_fee_amt, ts, pay_type, member_id, mer_name, bagent_id, 
bagent_id0, fagent_id0, pk_id, agent_name, bagent_name])
  


Why is the condition AND b.trans_date = DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyyMMdd') on the
first dimension table dim_ymfz_prod_sys_trans_log not used as a lookup condition in the
execution plan ==> lookup=[bg_rel_trans_id=bg_rel_trans_id],
while the conditions ON b.member_id = c.pk_id AND c.data_source = 'merch' on the second
dimension table dim_ptfz_ymfz_merchant_info are both used as lookup conditions ==>
lookup=[data_source=_UTF-16LE'merch', pk_id=member_id],
and in the conditions ON c.agent_id = d.pk_id AND (d.data_source = 'ex_agent' OR
d.data_source = 'agent') on the third dimension table dim_ptfz_ymfz_merchant_info the
data_source part is not a lookup condition ==> lookup=[pk_id=agent_id]?
So some conditions on a dimension table are used as lookup join conditions and some are not? Is there a rule for this? It matters because it determines which index fields should be created on the dimension tables.







Question about the lookup join conditions for dimension tables in the Flink SQL execution plan

2024-03-07 Thread iasiuide




In the SQL snippet below:
ods_ymfz_prod_sys_divide_order is a Kafka source table
dim_ymfz_prod_sys_trans_log is a MySQL dimension table
dim_ptfz_ymfz_merchant_info is a MySQL dimension table



The execution plan fragment from the Flink web UI is as follows:

 [1]:TableSourceScan(table=[[default_catalog, default_database, 
ods_ymfz_prod_sys_divide_order, watermark=[-(CASE(IS NULL(create_time), 
1970-01-01 00:00:00:TIMESTAMP(3), CAST(create_time AS TIMESTAMP(3))), 
5000:INTERVAL SECOND)]]], fields=[row_kind, id, sys_date, bg_rel_trans_id, 
order_state, create_time, update_time, divide_fee_amt, divide_fee_flag])
+- [2]:Calc(select=[sys_date, bg_rel_trans_id, create_time, IF(SEARCH(row_kind, 
Sarg[_UTF-16LE'-D', _UTF-16LE'-U']), (-1 * divide_fee_amt), divide_fee_amt) AS 
div_fee_amt, Reinterpret(CASE(create_time IS NULL, 1970-01-01 00:00:00, 
CAST(create_time AS TIMESTAMP(3)))) AS ts], where=[((order_state = '2') AND 
(divide_fee_amt > 0) AND (sys_date = DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS 
TIMESTAMP(9)), 'yyyy-MM-dd')))])
   +- 
[3]:LookupJoin(table=[default_catalog.default_database.dim_ymfz_prod_sys_trans_log],
 joinType=[LeftOuterJoin], async=[false], 
lookup=[bg_rel_trans_id=bg_rel_trans_id], where=[(trans_date = 
DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS TIMESTAMP(9)), 'yyyyMMdd'))], 
select=[sys_date, bg_rel_trans_id, create_time, div_fee_amt, ts, 
bg_rel_trans_id, pay_type, member_id, mer_name])
  +- [4]:Calc(select=[sys_date, create_time, div_fee_amt, ts, pay_type, 
member_id, mer_name], where=[(CHAR_LENGTH(member_id) > 1)])
 +- 
[5]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
 joinType=[LeftOuterJoin], async=[false], lookup=[data_source=_UTF-16LE'merch', 
pk_id=member_id], where=[(data_source = 'merch')], select=[sys_date, 
create_time, div_fee_amt, ts, pay_type, member_id, mer_name, pk_id, agent_id, 
bagent_id])
+- [6]:Calc(select=[sys_date, create_time, div_fee_amt, ts, 
pay_type, member_id, mer_name, agent_id, bagent_id])
   +- 
[7]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
 joinType=[LeftOuterJoin], async=[false], lookup=[pk_id=agent_id], 
where=[SEARCH(data_source, Sarg[_UTF-16LE'agent', _UTF-16LE'ex_agent'])], 
select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name, 
agent_id, bagent_id, pk_id, bagent_id, fagent_id])
  +- [8]:Calc(select=[sys_date, create_time, div_fee_amt, ts, 
pay_type, member_id, mer_name, bagent_id, bagent_id0, fagent_id AS fagent_id0])
 +- 
[9]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
 joinType=[LeftOuterJoin], async=[false], lookup=[data_source=_UTF-16LE'agent', 
pk_id=fagent_id0], where=[(data_source = 'agent')], select=[sys_date, 
create_time, div_fee_amt, ts, pay_type, member_id, mer_name, bagent_id, 
bagent_id0, fagent_id0, pk_id, agent_name, bagent_name])
  


Why is the condition AND b.trans_date = DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyyMMdd') on the
first dimension table dim_ymfz_prod_sys_trans_log not used as a lookup condition in the
execution plan ==> lookup=[bg_rel_trans_id=bg_rel_trans_id],
while the conditions ON b.member_id = c.pk_id AND c.data_source = 'merch' on the second
dimension table dim_ptfz_ymfz_merchant_info are both used as lookup conditions ==>
lookup=[data_source=_UTF-16LE'merch', pk_id=member_id],
and in the conditions ON c.agent_id = d.pk_id AND (d.data_source = 'ex_agent' OR
d.data_source = 'agent') on the third dimension table dim_ptfz_ymfz_merchant_info the
data_source part is not a lookup condition ==> lookup=[pk_id=agent_id]?
So some conditions on a dimension table are used as lookup join conditions and some are not? Is there a rule for this? It matters because it determines which index fields should be created on the dimension tables.







Re:Re: Using User-defined Table Aggregates Functions in SQL

2024-03-07 Thread Xuyang
Hi, Jad.
IIUC, TableAggregateFunction is not yet supported in SQL. The original 
FLIP[1] only implements it in the Table API. You can send an email to the dev mailing list 
for more details and create an improvement JIRA[2] for it.


[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739
[2] https://issues.apache.org/jira/projects/FLINK/issues
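
As a workaround until then, a TableAggregateFunction can be applied through the Table API. A rough sketch (assuming the Top2 class from the earlier mail is on the classpath; the names in `as(...)` must match the function's output type, and `source`/`id`/`val` are taken from that mail):

```java
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class Top2WithTableApi {
    public static void run(TableEnvironment tEnv) {
        tEnv.createTemporaryFunction("TOP2", Top2.class);   // the user's Top2 TableAggregateFunction

        Table result = tEnv.from("source")
                .groupBy($("id"))
                // flatAggregate is the Table API entry point for table aggregate functions
                .flatAggregate(call("TOP2", $("val")).as("top_val", "rank"))
                .select($("id"), $("top_val"), $("rank"));

        result.execute().print();
    }
}
```

A group window can be combined with it via `table.window(Tumble...).groupBy($("w"), $("id")).flatAggregate(...)`, as described in the Table API docs.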



--

Best!
Xuyang




On 2024-03-08 03:12:19, "Jad Naous" wrote:

Hi Junrui,
Thank you for the pointer. I had read that page, and I can use the function 
with the Java Table API ok, but I'm trying to use the Top2 accumulator with a 
SQL function. I can't use a left lateral join on it since the planner fails 
with "not a table function". I don't think a join is the right thing anyway, 
since it's an aggregation table function.


tEnv.createTemporaryFunction("TOP2", Top2.class);

var calculated2 = tEnv.sqlQuery(
"SELECT " +
"  TUMBLE_START(ts, INTERVAL '1' SECOND) as w_start, " +
"  TUMBLE_END(ts, INTERVAL '1' SECOND) as w_end, " +
"  TUMBLE_ROWTIME(ts, INTERVAL '1' SECOND) as w_rowtime, " +
"  id, " +
"  top1, " +
"  top2 " +
"FROM " +
"  source " +
"  LEFT JOIN LATERAL TABLE(TOP2(val)) ON TRUE " +
"GROUP BY " +
"  TUMBLE(ts, INTERVAL '1' SECOND), " +
"  id"
).printExplain();



Gives the following:


   org.apache.flink.table.api.ValidationException: SQL validation failed. 
Function 'default_catalog.default_database.TOP2' cannot be used as a table 
function.
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:200)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117)
at 
org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:261)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
at 
app//org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:708)
at 
app//io.grepr.query.MetricsTableApiTest.test(MetricsTableApiTest.java:129)
Caused by:
org.apache.flink.table.api.ValidationException: Function 
'default_catalog.default_database.TOP2' cannot be used as a table function.
at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.verifyFunctionKind(FunctionCatalogOperatorTable.java:200)
at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:133)
at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:126)
at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
at java.base@11.0.22/java.util.Optional.flatMap(Optional.java:294)
at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:100)
at 
org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:69)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1310)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1296)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1296)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1296)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1296)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:993)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:749)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:196)
 


Jad Naous

Grepr, CEO/Founder






On Thu, Mar 7, 2024 at 9:43 AM Junrui Lee  wrote:

Hi Jad,

You can refer to the CREATE FUNCTION section 
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#create-function)
 and the Table Aggregate Functions section 
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#table-aggregate-functions)
 for details on creating and using these functions.

Best regards,

Junrui


Jad Naous wrote on Thu, Mar 7, 2024 at 22:19:

Hi,
The docs don't mention the correct syntax for using UDTAGGs in SQL. Is it 
possible 

Re: Flink Checkpoint & Offset Commit

2024-03-07 Thread xia rui
Hi Jacob.

Flink uses "notification" to let an operator callback the completion of a
checkpoint. After gathering all checkpoint done messages from TMs, JM sends
a "notify checkpoint completed" RPC to all TMs. Operators will handle this
notification, where checkpoint success callbacks are invoked. For example,
Kafka sources commit the current consuming offset. I think this doc (
https://flink.apache.org/2018/02/28/an-overview-of-end-to-end-exactly-once-processing-in-apache-flink-with-apache-kafka-too/)
may be helpful.

You can override `notifyCheckpointComplete()` to customize the behavior
of handling checkpoint completion.
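
A sketch of that hook outside of any particular connector (the class and the commented-out `externalSystem.commit(...)` call are placeholders; a production version would also track pending offsets per checkpoint id to cope with concurrent checkpoints):

```java
import java.util.Collections;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

/** Snapshots an "offset" into checkpoint state and acknowledges it only once the checkpoint completes. */
class OffsetTrackingMap extends RichMapFunction<String, String>
        implements CheckpointedFunction, CheckpointListener {

    private transient ListState<Long> offsetState;
    private long currentOffset;
    private long pendingCommitOffset;

    @Override
    public String map(String value) {
        currentOffset++;                       // pretend every record advances an offset
        return value;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        offsetState.update(Collections.singletonList(currentOffset));
        pendingCommitOffset = currentOffset;   // what this checkpoint covers
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        offsetState = ctx.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("offsets", Long.class));
        for (Long restored : offsetState.get()) {
            currentOffset = restored;
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Invoked after the JM has confirmed the checkpoint: now it is safe to commit externally.
        // externalSystem.commit(pendingCommitOffset);   // hypothetical call
    }
}
```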

Best regards Rui Xia

On Fri, Mar 8, 2024 at 3:03 AM Jacob Rollings 
wrote:

>
> Hello,
>
> I am implementing proof of concepts based Flink realtime streaming
> solutions.
>
> I came across below lines in out-of-the-box Flink Kafka connector
> documents.
>
>
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/
>
> Consumer Offset Committing
>
> Kafka source commits the current consuming offset when checkpoints
> are completed, for ensuring the consistency between Flink’s checkpoint
> state and committed offsets on Kafka brokers.
>
>
> How is Flink able to control the callbacks from checkpointing? Is there a
> way to override this into my implementations. I have multiple upstream
> sources to connect to depending on the business model which are not Kafka.
> Based on criticality of the system and publisher dependencies, we cannot
> switch to Kafka for these. So I was hoping to do the same which kafka
> connector is doing.
>
>
> Cheers,
>
> JR
>


Re:Re: Handling late events with Table API / SQL

2024-03-07 Thread Xuyang
Hi, Sunny.
Each watermark arrives from one of the subtasks of the window operator's input(s), and the 
operator keeps the latest watermark it has seen from every input subtask.
The `currentWatermark` of the window operator is the minimum of these per-subtask watermarks. 
For example, if one input subtask has advanced to watermark 10 while another is still at 7, the operator's current watermark is 7; it only moves forward when the slowest input advances.

--

Best!
Xuyang




At 2024-03-07 23:03:39, "Sunny S"  wrote:

Thanks for the response! It's a pity that the side output for late data is not 
supported in the Table API and SQL. I will start the discussions regarding this.


In the meanwhile, I am trying to use the built-in function 
CURRENT_WATERMARK(rowtime) to be able to collect late data. The scenario I have 
is : I am creating a table with Kafka connector and defining the watermark in 
that table. Reference to this table definition can be found in the mail above. 
Next, I apply a tumbling window SQL query on this table. I want to collect the 
late data for this window operation. I am not clear on how the CURRENT_WATERMARK 
function would help me get the late data for the window operator.


Also, I am a bit confused regarding the way we determine if an event is late 
for a window operator. From the WindowOperator code : 


> protected boolean isElementLate(StreamRecord<IN> element) {
return (windowAssigner.isEventTime())
&& (element.getTimestamp() + allowedLateness
<= internalTimerService.currentWatermark());
}


it seems the operator maintains a currentWatermark. I am trying to understand 
how does this currentWatermark change during the course of the operator 
receiving the first event that belongs to this window until the time this 
window fires.  


Please help understanding these.


Thanks 


















From: Feng Jin 
Sent: 06 March 2024 07:08
To: Sunny S 
Cc: user@flink.apache.org 
Subject: Re: Handling late events with Table API / SQL
 


You can use the  CURRENT_WATERMARK(rowtime)  function for some filtering, 
please refer to [1] for details.




https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/


Best,
Feng



On Wed, Mar 6, 2024 at 1:56 AM Sunny S  wrote:

Hi,


I am using Flink SQL to create a table something like this :


CREATE TABLE some-table ( 
  ...,
  ...,
  ...,
  ...,
  event_timestamp as TO_TIMESTAMP_LTZ(event_time*1000, 3),
  WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '30' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'some-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'value.format' = 'csv'
)


I want to understand how can I deal with late events / out of order events when 
using Flink SQL / Table API? How can I collect the late / out of order events 
to a side output with Table API / SQL?


Thanks 

Re: Re:RE: RE: flink cdc dynamic table addition does not take effect

2024-03-07 Thread Hongshun Wang
Hi, casel chan,
The community has already implemented dynamic table addition in the incremental snapshot framework (https://github.com/apache/flink-cdc/pull/3024);
it is expected to be exposed for MongoDB and Postgres in 3.1, but it is not exposed for Oracle and SqlServer yet. You can refer to those two connectors in the community, enable the option, and test and adapt it yourself.
Best,
Hongshun


Re:Window properties can only be used on windowed tables

2024-03-07 Thread 周尹
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    List<Person> list = new ArrayList<>();
    list.add(new Person("Fred", 35));
    list.add(new Person("Wilma", 35));
    list.add(new Person("Pebbles", 2));
    DataStream<Person> flintstones = env.fromCollection(list);

    // Define an event-time attribute for the data stream
    DataStream<Person> flintstonesWithTime =
            flintstones.assignTimestampsAndWatermarks(
                    new BoundedOutOfOrdernessTimestampExtractor<Person>(Time.seconds(0)) {
                        @Override
                        public long extractTimestamp(Person element) {
                            // This assumes every element in your stream carries a timestamp
                            // field; adjust to your actual data.
                            return System.currentTimeMillis(); // or use a time field from your data
                        }
                    });

    // Define the window when converting the DataStream into a Table
    Table table = tEnv.fromDataStream(flintstonesWithTime, $("name"), $("age"), $("eventTime").rowtime());
    Table select = table.window(Tumble.over(lit(10).seconds()).on($("eventTime")).as("w"))
            .groupBy($("name"), $("w"))
            .select($("name"), $("age").sum());
    tEnv.toAppendStream(select, Row.class).print();
    env.execute("Flink Window Example");
}

public static class Person {
    public String name;
    public Integer age;

    public Person() {}

    public Person(String name, Integer age) {
        this.name = name;
        this.age = age;
    }

    public String toString() {
        return this.name.toString() + ":age " + this.age.toString();
    }
}
At 2024-03-08 09:28:10, "ha.fen...@aisino.com"  wrote:
>public static void main(String[] args) {
>StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>List<Persion> list = new ArrayList<>();
>list.add(new Persion("Fred",35));
>list.add(new Persion("wilma",35));
>list.add(new Persion("Pebbles",2));
>DataStream<Persion> flintstones = env.fromCollection(list);
>Table table = tEnv.fromDataStream(flintstones);
>Table select = table.select($("name"), $("age"), 
> $("addtime").proctime());
>Table select1 = select.window(
>Tumble.over(lit(10).second())
>.on($("addtime"))
>.as("w"))
>.groupBy($("name"), $("w"))
>.select($("name"), $("age").sum());
>select1.execute().print();
>
>}
>
>public static class Persion{
>public String name;
>public Integer age;
>public Persion(){}
>public Persion(String name,Integer age){
>this.name = name;
>this.age = age;
>}
>public String toString(){
>return this.name.toString()+":age "+this.age.toString();
>}
>}
>
>It throws: Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>Window properties can only be used on windowed tables
>Where is the mistake?


Re:Window properties can only be used on windowed tables

2024-03-07 Thread 周尹
You are using window properties on a non-windowed table.
At 2024-03-08 09:28:10, "ha.fen...@aisino.com"  wrote:
>public static void main(String[] args) {
>StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>List<Persion> list = new ArrayList<>();
>list.add(new Persion("Fred",35));
>list.add(new Persion("wilma",35));
>list.add(new Persion("Pebbles",2));
>DataStream<Persion> flintstones = env.fromCollection(list);
>Table table = tEnv.fromDataStream(flintstones);
>Table select = table.select($("name"), $("age"), 
> $("addtime").proctime());
>Table select1 = select.window(
>Tumble.over(lit(10).second())
>.on($("addtime"))
>.as("w"))
>.groupBy($("name"), $("w"))
>.select($("name"), $("age").sum());
>select1.execute().print();
>
>}
>
>public static class Persion{
>public String name;
>public Integer age;
>public Persion(){}
>public Persion(String name,Integer age){
>this.name = name;
>this.age = age;
>}
>public String toString(){
>return this.name.toString()+":age "+this.age.toString();
>}
>}
>
>It throws: Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>Window properties can only be used on windowed tables
>Where is the mistake?


Re:Window properties can only be used on windowed tables

2024-03-07 Thread Xuyang
Hi, fengqi.
It looks like, in a select statement, you cannot directly use a proctime or event-time attribute that does not come from a window aggregation. I'm not sure whether this is expected behavior; if convenient, you could file a bug in the community JIRA[1] to check.
As a quick workaround, you can try the following code:



DataStream<Persion> flintstones = env.fromCollection(list);
// Table select = table.select($("name"), $("age"), $("addtime").proctime());
Table table =
        tEnv.fromDataStream(
                flintstones,
                Schema.newBuilder()
                        .column("name", "string")
                        .column("age", "int")
                        .columnByExpression("addtime", "proctime()")
                        .build());
Table select1 = // select.xxx
        table.window(Tumble.over(lit(10).second()).on($("addtime")).as("w"))
                .groupBy($("name"), $("w"))
                .select($("name"), $("age").sum());
select1.execute().print();


[1] https://issues.apache.org/jira/projects/FLINK/issues

--

Best!
Xuyang





At 2024-03-08 09:28:10, "ha.fen...@aisino.com"  wrote:
>public static void main(String[] args) {
>StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>List<Persion> list = new ArrayList<>();
>list.add(new Persion("Fred",35));
>list.add(new Persion("wilma",35));
>list.add(new Persion("Pebbles",2));
>DataStream<Persion> flintstones = env.fromCollection(list);
>Table table = tEnv.fromDataStream(flintstones);
>Table select = table.select($("name"), $("age"), 
> $("addtime").proctime());
>Table select1 = select.window(
>Tumble.over(lit(10).second())
>.on($("addtime"))
>.as("w"))
>.groupBy($("name"), $("w"))
>.select($("name"), $("age").sum());
>select1.execute().print();
>
>}
>
>public static class Persion{
>public String name;
>public Integer age;
>public Persion(){}
>public Persion(String name,Integer age){
>this.name = name;
>this.age = age;
>}
>public String toString(){
>return this.name.toString()+":age "+this.age.toString();
>}
>}
>
>It throws: Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>Window properties can only be used on windowed tables
>Where is the mistake?


Re: Re: Running Flink SQL in production

2024-03-07 Thread Feng Jin
Hi,

If you need to use Flink SQL in a production environment, I think it would
be better to use the Table API [1] and package it into a jar.
Then submit the jar to the cluster environment.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/common/#sql
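
A minimal shape of such a jar could be the sketch below (connector choices and table names are placeholders; in practice the statements would be read from the SQL file kept under source control):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ProductionSqlJob {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // In a real job these statements would come from your versioned .sql file.
        tEnv.executeSql("CREATE TABLE src (id STRING, v INT) WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE snk (id STRING, v INT) WITH ('connector' = 'blackhole')");

        // The INSERT INTO submits the actual streaming job when the jar runs on the cluster.
        tEnv.executeSql("INSERT INTO snk SELECT id, v FROM src");
    }
}
```

The jar can then be submitted with the usual `flink run` (or the REST job submission), so submission failures surface like those of any other packaged Flink job.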

Best,
Feng

On Thu, Mar 7, 2024 at 9:56 PM Xuyang  wrote:

> Hi.
> Hmm, if I'm mistaken, please correct me. Using a SQL client might not be
> very convenient for those who need to verify the
> results of submissions, such as checking for exceptions related to
> submission failures, and so on.
>
>
> --
> Best!
> Xuyang
>
>
> On 2024-03-07 17:32:07, "Robin Moffatt" wrote:
>
> Thanks for the reply.
> In terms of production, my thinking is you'll have your SQL in a file
> under code control. Whether that SQL ends up getting submitted via an
> invocation of SQL Client with -f or via REST API seems moot. WDYT?
>
>
>
> On Thu, 7 Mar 2024 at 01:53, Xuyang  wrote:
>
>> Hi, IMO, both the SQL Client and the Restful API can provide connections
>> to the SQL Gateway service for submitting jobs. A slight difference is that
>> the SQL Client also offers a command-line visual interface for users to
>> view results.
>> In your production scenes, placing the SQL to be submitted into a file
>> and then using the '-f' command in SQL Client to submit the file sounds a
>> bit roundabout. You can just use the Restful API to submit them directly?
>>
>>
>> --
>> Best!
>> Xuyang
>>
>>
>> At 2024-03-07 04:11:01, "Robin Moffatt via user" 
>> wrote:
>>
>> I'm reading the deployment guide[1] and wanted to check my understanding.
>> For deploying a SQL job into production, would the pattern be to write the
>> SQL in a file that's under source control, and pass that file as an
>> argument to SQL Client with -f argument (as in this docs example[2])?
>> Or script a call to the SQL Gateway's REST API?
>>
>> Are there pros and cons to each approach?
>>
>> thanks, Robin
>>
>> [1]:
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/overview/
>> [2]:
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sqlclient/#execute-sql-files
>>
>>


Re: Using User-defined Table Aggregates Functions in SQL

2024-03-07 Thread Jad Naous
Hi Junrui,
Thank you for the pointer. I had read that page, and I can use the function
with the Java Table API ok, but I'm trying to use the Top2 accumulator with
a SQL function. I can't use a left lateral join on it since the planner
fails with "not a table function". I don't think a join is the right thing
anyway, since it's an aggregation table function.

tEnv.createTemporaryFunction("TOP2", Top2.class);
>
> var calculated2 = tEnv.sqlQuery(
> "SELECT " +
> "  TUMBLE_START(ts, INTERVAL '1' SECOND) as w_start, " +
> "  TUMBLE_END(ts, INTERVAL '1' SECOND) as w_end, " +
> "  TUMBLE_ROWTIME(ts, INTERVAL '1' SECOND) as w_rowtime, " +
> "  id, " +
> "  top1, " +
> "  top2 " +
> "FROM " +
> "  source " +
> "  LEFT JOIN LATERAL TABLE(TOP2(val)) ON TRUE " +
> "GROUP BY " +
> "  TUMBLE(ts, INTERVAL '1' SECOND), " +
> "  id"
> ).printExplain();
>

Gives the following:

   org.apache.flink.table.api.ValidationException: SQL validation failed.
> Function 'default_catalog.default_database.TOP2' cannot be used as a table
> function.
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:200)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117)
> at
> org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:261)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
> at
> app//org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:708)
> at
> app//io.grepr.query.MetricsTableApiTest.test(MetricsTableApiTest.java:129)
> Caused by:
> org.apache.flink.table.api.ValidationException: Function
> 'default_catalog.default_database.TOP2' cannot be used as a table function.
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.verifyFunctionKind(FunctionCatalogOperatorTable.java:200)
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:133)
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:126)
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
> at java.base@11.0.22
> /java.util.Optional.flatMap(Optional.java:294)
> at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:100)
> at
> org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:69)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1310)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1296)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1296)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1296)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1296)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:993)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:749)

at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:196)


Jad Naous

Grepr, CEO/Founder



On Thu, Mar 7, 2024 at 9:43 AM Junrui Lee  wrote:

> Hi Jad,
>
> You can refer to the CREATE FUNCTION section (
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#create-function)
> and the Table Aggregate Functions section (
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#table-aggregate-functions)
> for details on creating and using these functions.
>
> Best regards,
> Junrui
>
> Jad Naous wrote on Thu, Mar 7, 2024 at 22:19:
>
>> Hi,
>> The docs don't mention the correct syntax for using UDTAGGs in SQL. Is it
>> possible to use them with SQL?
>> Thanks,
>> Jad Naous
>> 
>> Grepr, CEO/Founder
>>
>>
>


Fwd: Flink Checkpoint & Offset Commit

2024-03-07 Thread Jacob Rollings
Hello,

I am implementing proof-of-concept Flink real-time streaming
solutions.

I came across the lines below in the out-of-the-box Flink Kafka connector documentation.


https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/

Consumer Offset Committing

Kafka source commits the current consuming offset when checkpoints
are completed, for ensuring the consistency between Flink’s checkpoint
state and committed offsets on Kafka brokers.


How is Flink able to control the callbacks from checkpointing? Is there a
way to override this in my implementation? I have multiple upstream
sources to connect to, depending on the business model, which are not Kafka.
Based on the criticality of the system and publisher dependencies, we cannot
switch to Kafka for these. So I was hoping to do the same thing the Kafka
connector is doing.


Cheers,

JR


Re: Using User-defined Table Aggregates Functions in SQL

2024-03-07 Thread Junrui Lee
Hi Jad,

You can refer to the CREATE FUNCTION section (
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#create-function)
and the Table Aggregate Functions section (
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#table-aggregate-functions)
for details on creating and using these functions.

Best regards,
Junrui

Jad Naous wrote on Thu, Mar 7, 2024 at 22:19:

> Hi,
> The docs don't mention the correct syntax for using UDTAGGs in SQL. Is it
> possible to use them with SQL?
> Thanks,
> Jad Naous
> 
> Grepr, CEO/Founder
>
>


Re: Handling late events with Table API / SQL

2024-03-07 Thread Sunny S
Thanks for the response! It's a pity that the side output for late data is not 
supported in the Table API and SQL. I will start the discussions regarding this.

In the meanwhile, I am trying to use the built-in function 
CURRENT_WATERMARK(rowtime) to be able to collect late data. The scenario I have 
is : I am creating a table with Kafka connector and defining the watermark in 
that table. Reference to this table definition can be found in the mail above. 
Next, I apply a tumbling window SQL query on this table. I want to collect the 
late data for this window operation. I am not clear on how the CURRENT_WATERMARK 
function would help me get the late data for the window operator.

Also, I am a bit confused regarding the way we determine if an event is late 
for a window operator. From the WindowOperator code :

protected boolean isElementLate(StreamRecord<IN> element) {
return (windowAssigner.isEventTime())
&& (element.getTimestamp() + allowedLateness
<= internalTimerService.currentWatermark());
}

it seems the operator maintains a currentWatermark. I am trying to understand 
how does this currentWatermark change during the course of the operator 
receiving the first event that belongs to this window until the time this 
window fires.

Please help understanding these.

Thanks










From: Feng Jin 
Sent: 06 March 2024 07:08
To: Sunny S 
Cc: user@flink.apache.org 
Subject: Re: Handling late events with Table API / SQL


You can use the  CURRENT_WATERMARK(rowtime)  function for some filtering, 
please refer to [1] for details.


https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/
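
To make that concrete, a rough sketch of the filtering idea (table and column names follow the CREATE TABLE below; `late_events_sink` is a hypothetical sink table with the same schema, and this only approximates the window operator's notion of lateness since it ignores any allowed lateness):

```sql
-- Route rows that are already behind the watermark into a separate table,
-- since Table API / SQL has no side outputs.
INSERT INTO late_events_sink
SELECT *
FROM some_table
WHERE CURRENT_WATERMARK(event_timestamp) IS NOT NULL
  AND event_timestamp <= CURRENT_WATERMARK(event_timestamp);

-- The windowed aggregation can then keep only the complementary, non-late rows:
--   WHERE CURRENT_WATERMARK(event_timestamp) IS NULL
--      OR event_timestamp > CURRENT_WATERMARK(event_timestamp)
```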

Best,
Feng

On Wed, Mar 6, 2024 at 1:56 AM Sunny S <sunny8...@outlook.in> wrote:
Hi,

I am using Flink SQL to create a table something like this :

CREATE TABLE some-table (
  ...,
  ...,
  ...,
  ...,
  event_timestamp as TO_TIMESTAMP_LTZ(event_time*1000, 3),
  WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '30' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'some-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'value.format' = 'csv'
)

I want to understand how can I deal with late events / out of order events when 
using Flink SQL / Table API? How can I collect the late / out of order events 
to a side output with Table API / SQL?

Thanks


Using User-defined Table Aggregates Functions in SQL

2024-03-07 Thread Jad Naous
Hi,
The docs don't mention the correct syntax for using UDTAGGs in SQL. Is it
possible to use them with SQL?
Thanks,
Jad Naous

Grepr, CEO/Founder



Re: TTL in pyflink does not seem to work

2024-03-07 Thread Ivan Petrarka
Note that in the Java code it prints `State: Null`, `State: Null`, as I was 
expecting, unlike the pyflink code.
On Mar 7, 2024 at 15:59 +0400, Ivan Petrarka , wrote:
> Hi! I’ve created a basic pyflink pipeline with ttl and it does not seem to 
> work. I have reproduced the exact same code in Java and it works!
>
> Is this a pyflink bug? If so - how can I report it? If not - what can I try 
> to do?
>
> Flink: 1.18.0
> image: flink:1.18.0-scala_2.12-java11
>
> Code to reproduce. I expect this code to print `None` for the state all 
> the time. But it prints `None` only once and then the stored state value.
>
> ```python
> import time
>
> from datetime import datetime
>
> from pyflink.common import Time, Types
> from pyflink.datastream import KeyedProcessFunction, RuntimeContext, 
> StreamExecutionEnvironment, TimeCharacteristic
> from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor
>
>
> class Processor(KeyedProcessFunction):
>     def open(self, runtime_context: RuntimeContext):
>         state_descriptor = ValueStateDescriptor(
>             name="my_state",
>             value_type_info=Types.STRING(),
>         )
>
>         state_descriptor.enable_time_to_live(
>             ttl_config=StateTtlConfig.new_builder(Time.seconds(1))
>             .cleanup_incrementally(cleanup_size=10, 
> run_cleanup_for_every_record=True)
>             .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
>             
> .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>             .build()
>         )
>
>         self.state = runtime_context.get_state(state_descriptor)
>
>     def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
>         current_state = self.state.value()
>
>         print(datetime.now(), current_state)
>
>         if current_state is None:
>             self.state.update(str(datetime.now()))
>
>         time.sleep(1.5)
>
>
> if __name__ == "__main__":
>     # - Init environment
>
>     environment = 
> StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)
>
>     # - Setup pipeline
>
>     (
>         environment.set_parallelism(1)
>         .from_collection(
>             collection=list(range(10)),
>         )
>         .key_by(lambda value: 0)
>         .process(Processor())
>
>
>
>     )
>
>     # - Execute pipeline
>
>     environment.execute("ttl_test")
>
>
>
> ```
>
> ```java
> import org.apache.flink.api.common.state.StateTtlConfig;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.time.Time;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.metrics.Histogram;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
>
> import java.io.IOException;
> import java.time.LocalDateTime;
>
> public class GameHistoryProcessor extends KeyedProcessFunction<String, String, String> {
>
>
>     private transient ValueState<String> state;
>
>
>     @Override
>     public void open(Configuration parameters) {
>         var stateTtlConfig = StateTtlConfig
>                 .newBuilder(Time.seconds(1))
> //                .cleanupFullSnapshot()
>                 .cleanupIncrementally(10, true)
>                 .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>                 
> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>                 .build();
>
>         var stateDescriptor = new ValueStateDescriptor<>("state", 
> String.class);
>         stateDescriptor.enableTimeToLive(stateTtlConfig);
>
>         state = getRuntimeContext().getState(stateDescriptor);
>
>     }
>
>     @Override
>     public void processElement(String event, Context context,
> Collector<String> collector) throws IOException, InterruptedException {
>         var currentState = state.value();
>         System.out.println("State: " + currentState);
>
>         if (currentState == null) {
>             state.update(LocalDateTime.now().toString());
>         }
>
>         Thread.sleep(1500);
>     }
> }```


TTL in pyflink does not seem to work

2024-03-07 Thread Ivan Petrarka
Hi! I’ve created a basic pyflink pipeline with ttl and it does not seem to 
work. I have reproduced the exact same code in Java and it works!

Is this a pyflink bug? If so - how can I report it? If not - what can I try to 
do?

Flink: 1.18.0
image: flink:1.18.0-scala_2.12-java11

Code to reproduce. I expect this code to print `None` (expired state) every
time. But it prints `None` once and then the stored state value.

```python
import time

from datetime import datetime

from pyflink.common import Time, Types
from pyflink.datastream import KeyedProcessFunction, RuntimeContext, 
StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor


class Processor(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        state_descriptor = ValueStateDescriptor(
            name="my_state",
            value_type_info=Types.STRING(),
        )

        state_descriptor.enable_time_to_live(
            ttl_config=StateTtlConfig.new_builder(Time.seconds(1))
            .cleanup_incrementally(cleanup_size=10, 
run_cleanup_for_every_record=True)
            .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
            
.set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build()
        )

        self.state = runtime_context.get_state(state_descriptor)

    def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
        current_state = self.state.value()

        print(datetime.now(), current_state)

        if current_state is None:
            self.state.update(str(datetime.now()))

        time.sleep(1.5)


if __name__ == "__main__":
    # - Init environment

    environment = 
StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)

    # - Setup pipeline

    (
        environment.set_parallelism(1)
        .from_collection(
            collection=list(range(10)),
        )
        .key_by(lambda value: 0)
        .process(Processor())



    )

    # - Execute pipeline

    environment.execute("ttl_test")



```

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.time.LocalDateTime;

public class GameHistoryProcessor extends KeyedProcessFunction<String, String, String> {


    private transient ValueState<String> state;


    @Override
    public void open(Configuration parameters) {
        var stateTtlConfig = StateTtlConfig
                .newBuilder(Time.seconds(1))
//                .cleanupFullSnapshot()
                .cleanupIncrementally(10, true)
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        var stateDescriptor = new ValueStateDescriptor<>("state", String.class);
        stateDescriptor.enableTimeToLive(stateTtlConfig);

        state = getRuntimeContext().getState(stateDescriptor);

    }

    @Override
    public void processElement(String event, Context context, Collector<String>
collector) throws IOException, InterruptedException {
        var currentState = state.value();
        System.out.println("State: " + currentState);

        if (currentState == null) {
            state.update(LocalDateTime.now().toString());
        }

        Thread.sleep(1500);
    }
}```


Failed to register an avro schema with confluent schema registry

2024-03-07 Thread casel.chen
I get an error when registering the schema of a kafka topic with the confluent
schema registry. What is the cause of the problem, and how can I fix it?




io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: 
Schema being registered is incompatible with an earlier schema for subject 
"rtdp_test-test_schema-value", details: 
[{errorType:'READER_FIELD_MISSING_DEFAULT_VALUE', description:'The field 
'salary' at path '/fields/10/type/fields/5' in the new schema has no default 
value and is missing in the old schema', additionalInfo:'salary'}, 
{errorType:'READER_FIELD_MISSING_DEFAULT_VALUE', description:'The field 
'salary' at path '/fields/11/type/fields/5' in the new schema has no default 
value and is missing in the old schema', additionalInfo:'salary'}, 
{oldSchemaVersion: 4}, {oldSchema: 
'{"type":"record","name":"Envelope","namespace":"rtdp_test-test_schema","fields":[{"name":"database","type":"string"},{"name":"es","type":"long"},{"name":"id","type":"int"},{"name":"isDdl","type":"boolean"},{"name":"sql","type":"string"},{"name":"table","type":"string"},{"name":"ts","type":"long"},{"name":"type","type":"string"},{"name":"pkNames","type":{"type":"array","items":"string"}},{"name":"data","type":[{"type":"array","items":{"type":"record","name":"Value","fields":[{"name":"id","type":["long","null"],"default":0},{"name":"create_time","type":{"type":"long","logicalType":"timestamp-millis"},"default":0},{"name":"update_time","type":{"type":"long","logicalType":"timestamp-millis"},"default":0},{"name":"name","type":["string","null"],"default":""},{"name":"gender","type":["string","null"],"default":""}]}},"null"]},{"name":"mysqlType","type":{"type":"record","name":"mysqlType","fields":[{"name":"id","type":"string"},{"name":"create_time","type":"string"},{"name":"update_time","type":"string"},{"name":"name","type":"string"},{"name":"gender","type":"string"}]}},{"name":"sqlType","type":{"type":"record","name":"sqlType","fields":[{"name":"id","type":"int"},{"name":"create_time","type":"int"},{"name":"update_time","type":"int"},{"name":"name","type":"int"},{"name":"gender","type":"int"}]}},{"name":"old","type":[{"type":"array","items":"Value"},"null"]}]}'},
 {validateFields: 'false', compatibility: 'BACKWARD'}]; error code: 409

at 
io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:314)

at 
io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:384)




Maven dependency:



<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-schema-registry-client</artifactId>
    <version>7.3.1</version>
</dependency>






Java code:

Schema decimalSchema = LogicalTypes.decimal(precision, 
scale).addToSchema(SchemaBuilder.builder().bytesType());

data = 
data.name(columnName).type().unionOf().nullType().and().type(decimalSchema).endUnion().nullDefault();




The salary field is of decimal type. The error says there is an earlier schema
version without the salary field, and that the salary field definition in the
new schema has no default value. But I did set nullDefault, which can also be
verified from the generated avro schema json string:




{
    "type": "record",
    "name": "Envelope",
    "namespace": "rtdp_test-test_schema",
    "fields": [
        { "name": "database", "type": "string" },
        { "name": "es", "type": "long" },
        { "name": "id", "type": "int" },
        { "name": "isDdl", "type": "boolean" },
        { "name": "sql", "type": "string" },
        { "name": "table", "type": "string" },
        { "name": "ts", "type": "long" },
        { "name": "type", "type": "string" },
        { "name": "pkNames", "type": { "type": "array", "items": "string" } },
        {
            "name": "data",
            "type": [
                {
                    "type": "array",
                    "items": {
                        "type": "record",
                        "name": "Value",
                        "fields": [
                            { "name": "id", "type": ["null", "long"], "default": null },
                            { "name": "create_time", "type": ["null", { "type": "long", "logicalType": "timestamp-millis" }], "default": null },
                            {


Re:Re: Running Flink SQL in production

2024-03-07 Thread Xuyang
Hi. 
Hmm, if I'm mistaken, please correct me. Using a SQL client might not be very 
convenient for those who need to verify the 
results of submissions, such as checking for exceptions related to submission 
failures, and so on.




--

Best!
Xuyang




At 2024-03-07 17:32:07, "Robin Moffatt" wrote:

Thanks for the reply. 
In terms of production, my thinking is you'll have your SQL in a file under 
code control. Whether that SQL ends up getting submitted via an invocation of 
SQL Client with -f or via REST API seems moot. WDYT? 
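
For illustration, a minimal sketch of such a version-controlled file (the
table names and settings below are hypothetical), which could then be
submitted with `sql-client.sh -f job.sql` or sent to the gateway over REST:

-- job.sql (kept under source control)
SET 'pipeline.name' = 'orders-filter';
SET 'execution.checkpointing.interval' = '30 s';

INSERT INTO orders_sink
SELECT order_id, amount, order_time
FROM orders
WHERE amount > 0;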







On Thu, 7 Mar 2024 at 01:53, Xuyang  wrote:

Hi, IMO both the SQL Client and the REST API can provide connections to the
SQL Gateway service for submitting jobs. A slight difference is that the SQL
Client also offers a command-line interface for viewing results interactively.
In your production scenario, placing the SQL to be submitted into a file and
then using the '-f' option of the SQL Client to submit that file sounds a bit
roundabout; you could just use the REST API to submit the statements directly.




--

Best!
Xuyang




At 2024-03-07 04:11:01, "Robin Moffatt via user"  wrote:

I'm reading the deployment guide[1] and wanted to check my understanding. For 
deploying a SQL job into production, would the pattern be to write the SQL in a 
file that's under source control, and pass that file as an argument to SQL 
Client with -f argument (as in this docs example[2])?
Or script a call to the SQL Gateway's REST API? 


Are there pros and cons to each approach?


thanks, Robin


[1]: 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/overview/
[2]: 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sqlclient/#execute-sql-files