Thanks for the hint. I printed the explain output, and the UDF is indeed invoked twice; the condition responsible is that eventtime.isNotNull:
st_env.scan("source") \
      .where("action === 'Insert'") \
      .window(Tumble.over("1.hour").on("actionTime").alias("hourlywindow")) \
      .group_by("hourlywindow") \
      .select("action.max as action1, conv_string(eventTime.collect) as etlist, hourlywindow.start as time1") \
      .select("action1 as action, hbf_thres(etlist) as eventtime, time1 as actiontime") \
*     .filter("eventtime.isNotNull") \
*     .insert_into("alarm_ad")

LegacySink(name=[`default_catalog`.`default_database`.`alarm_ad`], fields=[action, eventtime, actiontime])
+- Calc(select=[EXPR$0 AS action, f0 AS eventtime, EXPR$2 AS actiontime])
*  +- PythonCalc(select=[EXPR$0, EXPR$2, simple_udf(f0) AS f0])
      +- Calc(select=[EXPR$0, EXPR$2, UDFLength(EXPR$1) AS f0], where=[IS NOT NULL(f0)])
*        +- PythonCalc(select=[EXPR$0, EXPR$1, EXPR$2, simple_udf(f0) AS f0])
            +- Calc(select=[EXPR$0, EXPR$1, EXPR$2, UDFLength(EXPR$1) AS f0])
               +- GroupWindowAggregate(window=[TumblingGroupWindow('hourlywindow, actionTime, 3600000)], properties=[EXPR$2], select=[MAX(action) AS EXPR$0, COLLECT(eventTime) AS EXPR$1, start('hourlywindow) AS EXPR$2])
                  +- Exchange(distribution=[single])
                     +- Calc(select=[recordId, action, originalState, newState, originalCause, newCause, ser_name, enb, eventTime, ceasedTime, duration, acked, pmdId, pmdTime, actionTime], where=[=(action, _UTF-16LE'Insert')])
                        +- Reused(reference_id=[1])

What I want here is to filter on the Python UDF's return value: if it returns null, the row should not be sunk. Is my SQL wrong?

-- Sent from: http://apache-flink.147419.n8.nabble.com/
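For reference, the semantics I am after from `.filter("eventtime.isNotNull")` — evaluate the UDF once per row and drop the row when it returns null — sketched in plain Python. The `hbf_thres` body here is a hypothetical stand-in, since its real logic is not shown in the thread:

```python
# Hypothetical stand-in for the hbf_thres Python UDF: returns None when
# the collected event-time list is too short, otherwise a summary string.
def hbf_thres(etlist):
    events = [e for e in etlist.split(",") if e]
    return ",".join(events) if len(events) >= 2 else None

# Rows as they would look after the hourly window aggregation.
rows = [
    {"action": "Insert", "etlist": "t1,t2,t3", "time1": "2020-01-01 00:00"},
    {"action": "Insert", "etlist": "t1",       "time1": "2020-01-01 01:00"},
]

# Intended behavior: the UDF is evaluated exactly once per row, and rows
# where it returned None are never sent to the sink.
sink = []
for r in rows:
    eventtime = hbf_thres(r["etlist"])  # single UDF invocation
    if eventtime is not None:
        sink.append({"action": r["action"],
                     "eventtime": eventtime,
                     "actiontime": r["time1"]})
```

In the plan above, by contrast, the planner has produced two PythonCalc nodes for the same call: one feeding the IS NOT NULL check and one recomputing the sink value.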