Re: pyflink udf中发送rest api会导致udf被调用两次

2020-07-10 文章 Dian Fu
大部分情况下,可以work,但是有一些边界的情况,可能会有问题。比如第一个sink的作业,由于某种原因,处理得比较慢,延迟比较大? 也就是说,通常情况下可能没有问题,但是由于这2个作业之间没有任何依赖关系,这个先后顺序是得不到保证的。 我觉得你可以测一下,如果能接受那些极端情况,就可以。 > 在 2020年7月10日,下午5:08,lgs <9925...@qq.com> 写道: > > 代码顺序是指我先写第一个sink的代码,再写第二个sink的代码。 > > 我设置了'connector.write.flush.max-rows' = '1' >

Re: pyflink udf中发送rest api会导致udf被调用两次

2020-07-10 文章 lgs
代码顺序是指我先写第一个sink的代码,再写第二个sink的代码。 我设置了'connector.write.flush.max-rows' = '1' 第一个sink没有窗口,所以直接写了 第二个sink有窗口,所以是会在一个小时的最后触发。 可能这样就能保证第二个sink能够读到最新的数据。 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: pyflink udf中发送rest api会导致udf被调用两次

2020-07-10 文章 Dian Fu
我不太明白你说的“代码顺序”指的什么? 据我所知,应该没有什么太好的办法。从执行图上来看,这2个之间没有依赖关系,所以也就无法保证先后顺序。 如果必须这样干的话,你得从业务的角度想一下,改造一下业务逻辑。 > 在 2020年7月10日,下午4:10,lgs <9925...@qq.com> 写道: > > 这次可以了。谢谢 > > 另外还有一个问题请教一下: > 我实际上是有另一个sink,source是同一个。 > 第一个sink是直接保存kafka数据到DB。 > 第二个sink是读取kafka,tumble window,然后在udf里面去读取DB。 > >

Re: pyflink udf中发送rest api会导致udf被调用两次

2020-07-10 文章 lgs
这次可以了。谢谢 另外还有一个问题请教一下: 我实际上是有另一个sink,source是同一个。 第一个sink是直接保存kafka数据到DB。 第二个sink是读取kafka,tumble window,然后在udf里面去读取DB。 要怎么样保证第一个sink写完了DB,然后第二个sink的udf能读取到最新的数据? 代码的顺序就能保证吗? -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: pyflink udf中发送rest api会导致udf被调用两次

2020-07-10 文章 Dian Fu
这样再试试? tmp_table = st_env.scan("source") \ .where("action === 'Insert'") \ .window(Tumble.over("1.hour").on("actionTime").alias("hourlywindow")) \ .group_by("hourlywindow") \ .select("action.max as action1, conv_string(eventTime.collect) as etlist,

Re: pyflink udf中发送rest api会导致udf被调用两次

2020-07-10 文章 lgs
谢谢建议。 我照着代码试了一下,发现还是一样的结果。 udf还是会被调用两次 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: pyflink udf中发送rest api会导致udf被调用两次

2020-07-09 文章 Dian Fu
好的,针对你这个case,这个是个已知问题:https://issues.apache.org/jira/browse/FLINK-15973 ,暂时还没有修复。 你可以这样改写一下,应该可以绕过去这个问题: table = st_env.scan("source") \ .where("action === 'Insert'") \

Re: pyflink udf中发送rest api会导致udf被调用两次

2020-07-09 文章 lgs
谢谢提示。 我打印出来explain,发现确实调用了两次udf,条件是那个eventtime.isNotNull: st_env.scan("source") \ .where("action === 'Insert'") \ .window(Tumble.over("1.hour").on("actionTime").alias("hourlywindow")) \ .group_by("hourlywindow") \ .select("action.max as action1,

Re: pyflink udf中发送rest api会导致udf被调用两次

2020-07-09 文章 Dian Fu
Table API的作业在执行之前会经过一系列的rule优化,最终的执行计划,存在一个UDF调用多次的可能,你可以把执行计划打印出来看看(TableEnvironment#explain)。 具体原因,需要看一下作业逻辑。可以发一下你的作业吗?可重现代码即可。 > 在 2020年7月9日,下午5:50,lgs <9925...@qq.com> 写道: > > Hi, > > 我观察到一个现象:我定义了一个tumble window,调用一个python udf,在这个udf里面使用requests发送rest api。 >

pyflink udf中发送rest api会导致udf被调用两次

2020-07-09 文章 lgs
Hi, 我观察到一个现象:我定义了一个tumble window,调用一个python udf,在这个udf里面使用requests发送rest api。 log显示这个udf会被调用两次。相隔不到一秒。这个是什么原因?requests库跟beam冲突了? 2020-07-09 17:44:17,501 INFO flink_test_stream_time_kafka.py:22 [] - start to ad 2020-07-09 17:44:17,530 INFO