自定义PatternProcessFunction编译出错,type erasure相关。

2020-08-11 文章 lgs
Hi, 我定义了一个PatternProcessFunction, public static class MyPatternProcessFunction extends PatternProcessFunction implements TimedOutPartialMatchHandler { @Override public void processMatch(Map> pattern, PatternProcessFunction.Context ctx, Collector out) { System.out.println ("pattern

Re: flink table api 中数据库字段大小写问题

2020-08-06 文章 lgs
schema是public 问题在这里:ERROR: column "recordid" of relation "alarm_history_data" does not exist 数据库表里面是“recordId”,这里的提示变成了“recordid” -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 【PyFlink】对于数据以Csv()格式写入kafka报错,以及使用python udf时无法启动udf

2020-08-06 文章 lgs
Hi Jincheng, 我现在碰到同样的问题,udf运行的时候会打印这样的log: 2020-08-07 03:06:45,920 INFO org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory [] - Still waiting for startup of environment '/usr/local/lib64/python3.6/site-packages/pyflink/bin/pyflink-udf-runner.sh' for worker id 1-1 然后过一阵就pyfl

flink table api 中数据库字段大小写问题

2020-08-03 文章 lgs
Hi, postgres字段包含大小写。 postgres_sink = """ CREATE TABLE alarm_history_data ( `recordId` STRING, `rowtime` TIMESTAMP(3), `action` STRING, `originalState`STRING, `newState` STRING, `originalCause`STRIN

Re: flink1.11 pyflink stream job 退出

2020-07-21 文章 lgs
谢谢。加上后就可以了。 改成原来的sql_update然后st_env.execute("job")好像也可以。 -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink1.11 pyflink stream job 退出

2020-07-21 文章 lgs
python flink_cep_example.py 过几秒就退出了,应该一直运行不退出的啊。 代码如下,使用了MATCH_RECOGNIZE: s_env = StreamExecutionEnvironment.get_execution_environment() b_s_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build() st_env = StreamTableEnvironment.create(s_env, envir

Re: pyflink udf中发送rest api会导致udf被调用两次

2020-07-10 文章 lgs
代码顺序是指我先写第一个sink的代码,再写第二个sink的代码。 我设置了'connector.write.flush.max-rows' = '1' 第一个sink没有窗口,所以直接写了 第二个sink有窗口,所以是会在一个小时的最后触发。 可能这样就能保证第二个sink能够读到最新的数据。 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 单流的一条数据,需要先sink 至mysql,再sink至kafka,并保证两sink的原子性以及sink顺序,是否可以做到?

2020-07-10 文章 lgs
我也有类似的需求。 期望第一个sink能先执行,然后第二个sink再执行。因为第二个sink要去读第一个sink保存的数据。 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: pyflink udf中发送rest api会导致udf被调用两次

2020-07-10 文章 lgs
这次可以了。谢谢 另外还有一个问题请教一下: 我实际上是有另一个sink,source是同一个。 第一个sink是直接保存kafka数据到DB。 第二个sink是读取kafka,tumble window,然后在udf里面去读取DB。 要怎么样保证第一个sink写完了DB,然后第二个sink的udf能读取到最新的数据? 代码的顺序就能保证吗? -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: pyflink udf中发送rest api会导致udf被调用两次

2020-07-09 文章 lgs
谢谢建议。 我照着代码试了一下,发现还是一样的结果。 udf还是会被调用两次 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: pyflink udf中发送rest api会导致udf被调用两次

2020-07-09 文章 lgs
谢谢提示。 我打印出来explain,发现确实调用了两次udf,条件是那个eventtime.isNotNull: st_env.scan("source") \ .where("action === 'Insert'") \ .window(Tumble.over("1.hour").on("actionTime").alias("hourlywindow")) \ .group_by("hourlywindow") \ .select("action.max as action1, conv_strin

pyflink udf中发送rest api会导致udf被调用两次

2020-07-09 文章 lgs
Hi, 我观察到一个现象:我定义了一个tumble window,调用一个python udf,在这个udf里面使用requests发送rest api。 log显示这个udf会被调用两次。相隔不到一秒。这个是什么原因?requests库跟beam冲突了? 2020-07-09 17:44:17,501 INFO flink_test_stream_time_kafka.py:22 [] - start to ad 2020-07-09 17:44:17,530 INFO flink_test_stream_time_kafka.

Re: 一个source多个sink的同步问题

2020-07-07 文章 lgs
是1个小时才到来。10:00- 11:00的数据,11:01分到来。 但是现在的问题是这个数据来了,我的第一个sink马上就保存到数据库了, 11:02进数据库。但是第二个sink,因为有tumble window,所以10:00- 11:00的数据,需要到12:01,才会触发这个窗口。 -- Sent from: http://apache-flink.147419.n8.nabble.com/

一个source多个sink的同步问题

2020-07-07 文章 lgs
source是kafka,有一个rowtime定义: .field("rowtime", DataTypes.TIMESTAMP(0)) .rowtime(Rowtime() .timestamps_from_field("actionTime") .watermarks_periodic_bounded(6) ) 有两个sink,第一个sink是直接把kafa的数据保存到postgres。 第二个sink是定义一个1小时的tumble wind