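SQL has no way to express this "earliest minute" logic. If you insert a message with temperature=0 at the very start of your input, the first output you get will be diff_temperature=0. I am not sure whether that approach is acceptable to you.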
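The sentinel idea can be traced with a small plain-Python sketch (not Flink SQL). It assumes 60-second tumbling windows keyed by deviceid and reuses the six test rows from the quoted message below. The sentinel timestamp of -60000 is hypothetical, chosen only so that the temperature=0 message lands in a window before the real data. The function names are illustrative only.

```python
WINDOW_MS = 60_000

def per_window_max(rows):
    """Inner query: max temp per (deviceid, 60 s tumbling window)."""
    maxima = {}
    for ts, deviceid, temp in rows:
        start = (ts // WINDOW_MS) * WINDOW_MS
        key = (deviceid, start)
        maxima[key] = max(temp, maxima.get(key, temp))
    return maxima

def diffs(rows):
    """Outer queries: 2 * max - SUM(max) over the previous and current row."""
    out = []
    prev = {}  # deviceid -> max of the previous window
    for (deviceid, start), cur in sorted(per_window_max(rows).items()):
        # For the first row of a device there is no previous row,
        # so the sum is just the current max and the "diff" equals it.
        total = cur + prev.get(deviceid, 0.0)
        window_end = start + WINDOW_MS - 1  # same convention as TUMBLE_ROWTIME
        out.append((deviceid, window_end, round(2 * cur - total, 6)))
        prev[deviceid] = cur
    return out

# Test rows from the thread: (ts, deviceid, temp)
ROWS = [
    (20000, "dev1", 1.2), (50000, "dev1", 1.3), (60000, "dev1", 1.4),
    (100000, "dev1", 1.5), (110000, "dev1", 1.6), (120000, "dev1", 1.7),
]

# Hypothetical sentinel placed one window before the real data
SENTINEL = (-60000, "dev1", 0.0)

if __name__ == "__main__":
    for row in diffs(ROWS):
        print("without sentinel:", row)
    for row in diffs([SENTINEL] + ROWS):
        print("with sentinel:   ", row)
```

Under these assumptions the sentinel only adds a leading row whose diff is 0.0. The first real minute still reports 1.3 (its maximum minus the sentinel's 0), so a consumer would have to discard or ignore the sentinel row.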
From: Chennet Steven <stevenchen...@live.com>
Sent: Thursday, November 14, 2019 5:32 PM
To: user-zh@flink.apache.org; Yuan,Youjun <yuanyou...@baidu.com>
Subject: Re: Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function

Yuan,

Thank you very much for the reply and the solution. I tried it in my code and it does work. However, when diff_temperature is computed for the earliest minute, there is no earlier minute of data, so diff_temperature comes out as the first minute's own maximum temperature. Is there a way to set it to null?

Running it produces the following result:

{"deviceid":"dev1","diff_temperature":1.3,"ts":59999}   ---- can the 1.3 for this minute be set to null?
{"deviceid":"dev1","diff_temperature":0.3,"ts":119999}
{"deviceid":"dev1","diff_temperature":0.1,"ts":179999}

From stevenchen
webchat 38798579

________________________________
From: Yuan,Youjun <yuanyou...@baidu.com>
Sent: Wednesday, November 13, 2019 11:34:53 PM
To: user-zh@flink.apache.org <user-zh@flink.apache.org>
Subject: Re: Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function

This scenario should be computable with standard SQL. The rough idea is:
1. The inner query computes each device's maximum temperature per minute: max(temp) AS max_temperature with a tumble window.
2. The outer query uses a row over window to obtain the current minute's max_temperature together with the sum of the maximum temperatures of the two consecutive minutes (previous and current), i.e. SUM(max_temperature) AS sum_temperature.
3. The outermost query simply selects 2 * max_temperature - sum_temperature. Because sum_temperature = previous max + current max, this equals current max - previous max, which is the difference you need.

Assume the input messages have three fields:
Ts: timestamp
Deviceid: device ID
Temp: device temperature

The complete SQL is:

INSERT INTO mysink
SELECT ts, deviceid, 2 * max_temperature - sum_temperature AS diff_temperature
FROM (
    SELECT deviceid, ts, max_temperature,
           SUM(max_temperature) OVER (
               PARTITION BY deviceid
               ORDER BY ts
               ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
           ) AS sum_temperature
    FROM (
        SELECT TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND) AS ts,
               deviceid,
               max(temp) AS max_temperature
        FROM mysrc
        GROUP BY TUMBLE(rowtime, INTERVAL '60' SECOND), deviceid
    )
)

I used the following test data:

"20000,dev1,1.2",
"50000,dev1,1.3",
"60000,dev1,1.4",
"100000,dev1,1.5",
"110000,dev1,1.6",
"120000,dev1,1.7"

Running it produces the following result:

{"deviceid":"dev1","diff_temperature":1.3,"ts":59999}
{"deviceid":"dev1","diff_temperature":0.3,"ts":119999}
{"deviceid":"dev1","diff_temperature":0.1,"ts":179999}

If you want to verify my approach end to end, you can:
1. Log in to http://creek.baidubce.com/
2. Paste the job definition (JSON) from the end of this email into the job subscription input box.
3. Click the button that generates an executable, choose your computer's OS and CPU ARCH in the dialog that pops up, and click OK.

Wait a few seconds and the system will generate a complete executable. Run it directly and the results appear on the console. To verify with more data, change the source to Type=STDIN so that you can enter your data on the command line.

Job definition (JSON):

{
  "注释": {
    "说明": "The inner query computes each device's maximum temperature per minute (max + tumble window); the outer query computes the difference between the maximum temperatures of two consecutive minutes for the same device (row over window); the difference is computed as: current window's maximum temperature x 2 - the sum of the maximum temperatures of the two consecutive windows. This example uses preconfigured input data, i.e. source type=COLLECTION; to try more input, change type to STDIN to read data from standard input.",
    "输入示例": "1000,dev1,2.3",
    "输出示例": {"deviceid":"dev1","diff_temperature":1.3,"ts":59999}
  },
  "sources": [{
    "schema": {
      "format": "CSV",
      "fields": [
        { "name": "ts", "type": "SQL_TIMESTAMP" },
        { "name": "deviceid", "type": "STRING" },
        { "name": "temp", "type": "DOUBLE" }
      ]
    },
    "watermark": 0,
    "name": "mysrc",
    "eventTime": "ts",
    "type": "COLLECTION",
    "attr": {
      "input": [
        "10000,dev1,1.1",
        "20000,dev1,1.2",
        "50000,dev1,1.3",
        "60000,dev1,1.4",
        "100000,dev1,1.5",
        "110000,dev1,1.6",
        "120000,dev1,1.7"
      ]
    }
  }],
  "sink": {
    "schema": { "format": "JSON" },
    "name": "mysink",
    "type": "STDOUT"
  },
  "name": "demojob",
  "timeType": "EVENTTIME",
  "sql": "INSERT INTO mysink SELECT ts, deviceid, 2 * max_temperature - sum_temperature AS diff_temperature FROM ( SELECT deviceid, ts, max_temperature, SUM(max_temperature) OVER (PARTITION BY deviceid ORDER BY ts ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS sum_temperature FROM (SELECT TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND) as ts, deviceid, max(temp) AS max_temperature from mysrc group by TUMBLE(rowtime, INTERVAL '60' SECOND), deviceid)) "
}

-----Original Message-----
From: Chennet Steven <stevenchen...@live.com>
Sent: Wednesday, November 13, 2019 3:36 PM
To: user-zh@flink.apache.org
Subject: Re: Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function

The scenario: with one-minute windows, compute each sensor's maximum temperature per minute, and also compute the difference between the current minute's and the previous minute's maximum temperature, using flink-sql. I wanted to write a custom Table UDAF that uses State to store the previous minute's maximum temperature, but found that the RuntimeContext inside the FunctionContext passed to the UDAF's open function is private and cannot be used. In addition, DataView is a property of the UDAF's ACC, and a new ACC is created for every window, so the previous window's result cannot be carried into the next window through ACC/DataView. Is my understanding correct?

What is the recommended way in FlinkSql to compute the difference between the aggregate values of two windows?
From stevenchen
webchat 38798579

From: Dian Fu <dian0511...@gmail.com>
Sent: Thursday, November 7, 2019 19:41
To: user-zh@flink.apache.org
Subject: Re: Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function

You can refer to an existing example in the Flink code base:
https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunction.java

> On November 7, 2019, at 7:06 PM, Chennet Steven <stevenchen...@live.com> wrote:
>
> In flink-table-common of flink1.9 I found the DataView interface and its subclasses ListView and MapView, but I could not work out how to use them in a user-defined function.
> Could you share a link to an example or to test code?
>
> From stevenchen
> webchat 38798579
>
> ________________________________
> From: wenlong.lwl <wenlong88....@gmail.com>
> Sent: Thursday, November 7, 2019 2:13:43 PM
> To: user-zh@flink.apache.org <user-zh@flink.apache.org>
> Subject: Re: Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function
>
> You could try 1.9, which introduced the DataView mechanism so that state can be used in the Acc.
>
> On Thu, 7 Nov 2019 at 09:22, Chennet Steven <stevenchen...@live.com> wrote:
>
>> I tried to use State in a Flink user-defined aggregate function and found that the open function cannot obtain the RuntimeContext through FunctionContext.
>> How can State be used in an aggregate function?
>>
>>
>> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
>> import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
>> import org.apache.flink.api.java.typeutils.TupleTypeInfo
>> import org.apache.flink.table.functions.{AggregateFunction, FunctionContext}
>> import java.lang.{Iterable => JIterable}
>>
>>
>> // Accumulator: f0 holds the running sum, f1 records whether any value was seen
>> class IntDiffSumAccumulator extends JTuple2[Int, Boolean]
>>
>> class IntDiffSumFunction extends AggregateFunction[Int, IntDiffSumAccumulator] {
>>
>>   override def open(context: FunctionContext): Unit = {
>>     // In Flink 1.7.2 the RuntimeContext cannot be obtained here, so State cannot be initialized
>>     //getRuntimeContext.getState(desc)
>>     val a = this.hashCode()
>>     print(s"hashCode:$a")
>>     super.open(context)
>>   }
>>
>>   override def createAccumulator(): IntDiffSumAccumulator = {
>>     val acc = new IntDiffSumAccumulator()
>>     acc.f0 = 0
>>     acc.f1 = false
>>     acc
>>   }
>>
>>   def accumulate(accumulator: IntDiffSumAccumulator, value: Int): Unit = {
>>     accumulator.f0 += value
>>     accumulator.f1 = true
>>   }
>>
>>   override def getValue(accumulator: IntDiffSumAccumulator): Int = {
>>     if (accumulator.f1) {
>>       accumulator.f0
>>     } else {
>>       Int.MinValue
>>     }
>>   }
>>
>>   def merge(acc: IntDiffSumAccumulator, its: JIterable[IntDiffSumAccumulator]): Unit = {
>>     val iter = its.iterator()
>>     // Stop when the iterator is exhausted; `while (true)` would never
>>     // terminate normally and next() would throw NoSuchElementException
>>     while (iter.hasNext) {
>>       val a = iter.next()
>>       if (a.f1) {
>>         acc.f0 += a.f0
>>         acc.f1 = true
>>       }
>>     }
>>   }
>>
>>   def resetAccumulator(acc: IntDiffSumAccumulator): Unit = {
>>     acc.f0 = 0
>>     acc.f1 = false
>>   }
>>
>>   override def getAccumulatorType: TypeInformation[IntDiffSumAccumulator] =
>>     new TupleTypeInfo[IntDiffSumAccumulator](BasicTypeInfo.INT_TYPE_INFO,
>>       BasicTypeInfo.BOOLEAN_TYPE_INFO)
>> }
>>
>>
>> From stevenchen
>> webchat 38798579
>>
>>
>>
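To trace the accumulator contract of the function quoted above without a Flink cluster, here is a minimal plain-Python sketch. It is not Flink code: the Acc dataclass and the function names are hypothetical stand-ins that mirror createAccumulator, accumulate, merge and getValue, and INT_MIN stands in for Scala's Int.MinValue. The run_window helper encodes the assumption stated earlier in the thread that every window starts from a fresh accumulator.

```python
from dataclasses import dataclass

INT_MIN = -2**31  # stand-in for Scala's Int.MinValue


@dataclass
class Acc:
    total: int = 0      # mirrors f0, the running sum
    seen: bool = False  # mirrors f1, whether any value was accumulated


def accumulate(acc, value):
    acc.total += value
    acc.seen = True


def merge(acc, others):
    # Bounded iteration: stops when the other accumulators are exhausted
    for other in others:
        if other.seen:
            acc.total += other.total
            acc.seen = True


def get_value(acc):
    # An accumulator that never saw a value reports the sentinel INT_MIN
    return acc.total if acc.seen else INT_MIN


def run_window(values):
    """One window = one fresh accumulator (assumption taken from the thread)."""
    acc = Acc()
    for v in values:
        accumulate(acc, v)
    return get_value(acc)


if __name__ == "__main__":
    print(run_window([1, 2, 3]))
    print(run_window([]))
```

Because run_window builds a new Acc on every call, nothing computed for one window is visible to the next, which is why the thread moves the cross-window comparison into a row over window instead of the aggregate function itself.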