Hi,
我定义了一个PatternProcessFunction,
public static class MyPatternProcessFunction extends
PatternProcessFunction implements
TimedOutPartialMatchHandler {
@Override
public void processMatch(Map> pattern,
PatternProcessFunction.Context ctx, Collector out) {
System.out.println ("pattern
schema是public
问题在这里:ERROR: column "recordid" of relation "alarm_history_data" does not
exist
数据库表里面是“recordId”,这里的提示变成了“recordid”
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Hi Jincheng,
我现在碰到同样的问题,udf运行的时候会打印这样的log:
2020-08-07 03:06:45,920 INFO
org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory []
- Still waiting for startup of environment
'/usr/local/lib64/python3.6/site-packages/pyflink/bin/pyflink-udf-runner.sh'
for worker id 1-1
然后过一阵就pyfl
Hi,
postgres字段包含大小写。
postgres_sink = """
CREATE TABLE alarm_history_data (
`recordId` STRING,
`rowtime` TIMESTAMP(3),
`action` STRING,
`originalState`STRING,
`newState` STRING,
`originalCause`STRIN
谢谢。加上后就可以了。
改成原来的sql_update然后st_env.execute("job")好像也可以。
--
Sent from: http://apache-flink.147419.n8.nabble.com/
python flink_cep_example.py 过几秒就退出了,应该一直运行不退出的啊。
代码如下,使用了MATCH_RECOGNIZE:
s_env = StreamExecutionEnvironment.get_execution_environment()
b_s_settings =
EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
st_env = StreamTableEnvironment.create(s_env,
envir
代码顺序是指我先写第一个sink的代码,再写第二个sink的代码。
我设置了'connector.write.flush.max-rows' = '1'
第一个sink没有窗口,所以直接写了
第二个sink有窗口,所以是会在一个小时的最后触发。
可能这样就能保证第二个sink能够读到最新的数据。
--
Sent from: http://apache-flink.147419.n8.nabble.com/
我也有类似的需求。
期望第一个sink能先执行,然后第二个sink再执行。因为第二个sink要去读第一个sink保存的数据。
--
Sent from: http://apache-flink.147419.n8.nabble.com/
这次可以了。谢谢
另外还有一个问题请教一下:
我实际上是有另一个sink,source是同一个。
第一个sink是直接保存kafka数据到DB。
第二个sink是读取kafka,tumble window,然后在udf里面去读取DB。
要怎么样保证第一个sink写完了DB,然后第二个sink的udf能读取到最新的数据?
代码的顺序就能保证吗?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
谢谢建议。
我照着代码试了一下,发现还是一样的结果。
udf还是会被调用两次
--
Sent from: http://apache-flink.147419.n8.nabble.com/
谢谢提示。
我打印出来explain,发现确实调用了两次udf,条件是那个eventtime.isNotNull:
st_env.scan("source") \
.where("action === 'Insert'") \
.window(Tumble.over("1.hour").on("actionTime").alias("hourlywindow")) \
.group_by("hourlywindow") \
.select("action.max as action1, conv_strin
Hi,
我观察到一个现象:我定义了一个tumble window,调用一个python udf,在这个udf里面使用requests发送rest api。
log显示这个udf会被调用两次。相隔不到一秒。这个是什么原因?requests库跟beam冲突了?
2020-07-09 17:44:17,501 INFO flink_test_stream_time_kafka.py:22
[] - start to ad
2020-07-09 17:44:17,530 INFO flink_test_stream_time_kafka.
是1个小时才到来。10:00- 11:00的数据,11:01分到来。
但是现在的问题是这个数据来了,我的第一个sink马上就保存到数据库了, 11:02进数据库。但是第二个sink,因为有tumble
window,所以10:00- 11:00的数据,需要到12:01,才会触发这个窗口。
--
Sent from: http://apache-flink.147419.n8.nabble.com/
source是kafka,有一个rowtime定义:
.field("rowtime", DataTypes.TIMESTAMP(0))
.rowtime(Rowtime()
.timestamps_from_field("actionTime")
.watermarks_periodic_bounded(6)
)
有两个sink,第一个sink是直接把kafa的数据保存到postgres。
第二个sink是定义一个1小时的tumble wind
14 matches
Mail list logo