回复: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态

2019-11-14 文章 Yuan,Youjun
SQL没有表达这种“最早一分钟”的逻辑。
如果在你的消息的开头,插入一个temperature=0的消息,那么你得到的第一个输出diff_temperature=0,不知道这种方式是否可以接受。

发件人: Chennet Steven 
发送时间: Thursday, November 14, 2019 5:32 PM
收件人: user-zh@flink.apache.org; Yuan,Youjun 
主题: 回复: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态

Yuan, 非常感谢大佬的回复和方案,我代码尝试了,这个方案的确可行,但在计算最早一分钟 
diff_temperature时候,由于没有更早的分钟数据,这个diff_temperature会被计算成第一分钟的t,是否能有方法将他设置为null?

运行得到如下结果:
{"deviceid":"dev1","diff_temperature":1.3,"ts":5}  这分钟的1.3 
是否能有方法设置为null?
{"deviceid":"dev1","diff_temperature":0.3,"ts":11}
{"deviceid":"dev1","diff_temperature":0.1,"ts":17}


From stevenchen
 webchat 38798579


发件人: Yuan,Youjun mailto:yuanyou...@baidu.com>>
发送时间: Wednesday, November 13, 2019 11:34:53 PM
收件人: user-zh@flink.apache.org<mailto:user-zh@flink.apache.org> 
mailto:user-zh@flink.apache.org>>
主题: 回复: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态

这个场景应可以通过标准的SQL完成计算。大致思路如下:
1,内层查询统计每个设备一分钟的最大温度,max(temp) as max_temperature + tumble窗口
2,外层通过row over窗口,拿到当前分钟的max_temperature,和前后2分钟最大温度的和,即SUM(max_temperature) AS 
sum_temperature
3,最外层,就直接select 2 * max_temperature - sum_temperature就是你需要的前后2个分钟最大温度的差了。

假设输入消息有三个字段:
Ts: 时间戳
Deviceid:设备编号
Temp: 设备温度

完整的SQL如下:
INSERT INTO mysink
SELECT ts, deviceid,  2 * max_temperature - sum_temperature AS diff_temperature
FROM (
SELECT  deviceid, ts, max_temperature, SUM(max_temperature) OVER 
(PARTITION BY deviceid ORDER BY ts ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS 
sum_temperature
FROM (
SELECT TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND) as ts, 
deviceid, max(temp) AS max_temperature  from mysrc group by TUMBLE(rowtime, 
INTERVAL '60' SECOND), deviceid
)
)

我用如下测试数据:
"2,dev1,1.2",
"5,dev1,1.3",
"6,dev1,1.4",
"10,dev1,1.5",
"11,dev1,1.6",
"12,dev1,1.7"
运行得到如下结果:
{"deviceid":"dev1","diff_temperature":1.3,"ts":5}
{"deviceid":"dev1","diff_temperature":0.3,"ts":11}
{"deviceid":"dev1","diff_temperature":0.1,"ts":17}

如果你向完整的验证我的方法,你可以:
1,登陆 http://creek.baidubce.com/
2,在作业订阅输入框,输入邮件末尾的作业定义(json)
3,点击生成可执行文件,在弹出的对话框中,选择你的电脑的OS和CPU ARCH,并且点击确定
耐心等待几秒中,系统会生成完整的可执行文件,你直接执行它,便可从控制台看到计算结果。如果你需要验证更多的数据,请修改source的Type=STDIN,这样你可以从命令行下输入你的数据了。

作业定义(json):
{
"注释":{
"说明": "内层query计算每个设备每分钟的最大温度,max+tumble窗口;外层query计算同一设备前后2分钟最大温度差, Row 
over窗口;计算温度差的计算方法为:当前窗口最大温度 x 2 - 前后2个窗口最大温度的和。本例采用预先配置的输入数据,即source 
type=COLLECTION,如果需要尝试更多的输入,可以将type改为STDIN,即从标准输入读入数据。",
"输入示例": "1000,dev1,2.3",
"输出示例": {"deviceid":"dev1","diff_temperature":1.3,"ts":5}
},
"sources": [{
"schema": {
"format": "CSV",
"fields": [{
"name": "ts",
"type": "SQL_TIMESTAMP"
},
{
"name": "deviceid",
"type": "STRING"
},
{
"name": "temp",
"type": "DOUBLE"
}]
},
"watermark": 0,
"name": "mysrc",
"eventTime": "ts",
"type": "COLLECTION",
"attr": {
"input": [
"1,dev1,1.1",
"2,dev1,1.2",
"5,dev1,1.3",
"6,dev1,1.4",
"10,dev1,1.5",
"11,dev1,1.6",
"12,dev1,1.7"
]
}
}],
"sink": {
"schema": {
"format": "JSON"
        },
        "name": "mysink",
"type": "STDOUT"
},
"name": "demojob",
"timeType": "EVENTTIME",
"sql": "INSERT INTO mysink SELECT ts, deviceid,  2 * max_temperature - 
sum_temperature AS diff_temperature FROM ( SELECT  deviceid, ts, 
max_temperature, SUM(max_temperature) OVER (PARTITION BY deviceid ORDER BY ts 
ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS sum_temperature FROM (SELECT 
TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND) as ts, deviceid, max(temp) AS 
max_temperature  from mysrc group by TUMBLE(rowtime, INTERVAL '60' SECOND), 
deviceid)) "
}



-邮件原件-
发件人: Chenn

回复: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态

2019-11-14 文章 Chennet Steven
Yuan, 非常感谢大佬的回复和方案,我代码尝试了,这个方案的确可行,但在计算最早一分钟 
diff_temperature时候,由于没有更早的分钟数据,这个diff_temperature会被计算成第一分钟的t,是否能有方法将他设置为null?

运行得到如下结果:
{"deviceid":"dev1","diff_temperature":1.3,"ts":5}  这分钟的1.3 
是否能有方法设置为null?
{"deviceid":"dev1","diff_temperature":0.3,"ts":11}
{"deviceid":"dev1","diff_temperature":0.1,"ts":17}


From stevenchen
 webchat 38798579

________
发件人: Yuan,Youjun 
发送时间: Wednesday, November 13, 2019 11:34:53 PM
收件人: user-zh@flink.apache.org 
主题: 回复: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态

这个场景应可以通过标准的SQL完成计算。大致思路如下:
1,内层查询统计每个设备一分钟的最大温度,max(temp) as max_temperature + tumble窗口
2,外层通过row over窗口,拿到当前分钟的max_temperature,和前后2分钟最大温度的和,即SUM(max_temperature) AS 
sum_temperature
3,最外层,就直接select 2 * max_temperature - sum_temperature就是你需要的前后2个分钟最大温度的差了。

假设输入消息有三个字段:
Ts: 时间戳
Deviceid:设备编号
Temp: 设备温度

完整的SQL如下:
INSERT INTO mysink
SELECT ts, deviceid,  2 * max_temperature - sum_temperature AS diff_temperature
FROM (
SELECT  deviceid, ts, max_temperature, SUM(max_temperature) OVER 
(PARTITION BY deviceid ORDER BY ts ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS 
sum_temperature
FROM (
SELECT TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND) as ts, 
deviceid, max(temp) AS max_temperature  from mysrc group by TUMBLE(rowtime, 
INTERVAL '60' SECOND), deviceid
)
)

我用如下测试数据:
"2,dev1,1.2",
"5,dev1,1.3",
"6,dev1,1.4",
"10,dev1,1.5",
"11,dev1,1.6",
"12,dev1,1.7"
运行得到如下结果:
{"deviceid":"dev1","diff_temperature":1.3,"ts":5}
{"deviceid":"dev1","diff_temperature":0.3,"ts":11}
{"deviceid":"dev1","diff_temperature":0.1,"ts":17}

如果你向完整的验证我的方法,你可以:
1,登陆 http://creek.baidubce.com/
2,在作业订阅输入框,输入邮件末尾的作业定义(json)
3,点击生成可执行文件,在弹出的对话框中,选择你的电脑的OS和CPU ARCH,并且点击确定
耐心等待几秒中,系统会生成完整的可执行文件,你直接执行它,便可从控制台看到计算结果。如果你需要验证更多的数据,请修改source的Type=STDIN,这样你可以从命令行下输入你的数据了。

作业定义(json):
{
"注释":{
"说明": "内层query计算每个设备每分钟的最大温度,max+tumble窗口;外层query计算同一设备前后2分钟最大温度差, Row 
over窗口;计算温度差的计算方法为:当前窗口最大温度 x 2 - 前后2个窗口最大温度的和。本例采用预先配置的输入数据,即source 
type=COLLECTION,如果需要尝试更多的输入,可以将type改为STDIN,即从标准输入读入数据。",
"输入示例": "1000,dev1,2.3",
"输出示例": {"deviceid":"dev1","diff_temperature":1.3,"ts":5}
},
"sources": [{
"schema": {
"format": "CSV",
"fields": [{
"name": "ts",
"type": "SQL_TIMESTAMP"
},
{
"name": "deviceid",
"type": "STRING"
},
{
"name": "temp",
"type": "DOUBLE"
}]
},
"watermark": 0,
"name": "mysrc",
"eventTime": "ts",
"type": "COLLECTION",
"attr": {
"input": [
"1,dev1,1.1",
"2,dev1,1.2",
"5,dev1,1.3",
"6,dev1,1.4",
"10,dev1,1.5",
"11,dev1,1.6",
"12,dev1,1.7"
]
}
}],
"sink": {
"schema": {
        "format": "JSON"
},
"name": "mysink",
"type": "STDOUT"
},
"name": "demojob",
"timeType": "EVENTTIME",
"sql": "INSERT INTO mysink SELECT ts, deviceid,  2 * max_temperature - 
sum_temperature AS diff_temperature FROM ( SELECT  deviceid, ts, 
max_temperature, SUM(max_temperature) OVER (PARTITION BY deviceid ORDER BY ts 
ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS sum_temperature FROM (SELECT 
TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND) as ts, deviceid, max(temp) AS 
max_temperature  from mysrc group by TUMBLE(rowtime, INTERVAL '60' SECOND), 
deviceid)) "
}



-邮件原件-
发件人: Chennet Steven 
发送时间: Wednesday, November 13, 2019 3:36 PM
收件人: user-zh@flink.apache.org
主题: 回复: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态

场景是一分钟为窗口计算每分钟传感器的最高温度,同时计算当前分钟与上一分钟最高温度的差值,使用flink-sql。想写一个Table的自定义UDAF,在UDAF中使用State存储上一分钟的最高温度,但是发现UDAF的open函数的FunctionContext中对于RuntimeContext是一个private,无法使用。同时DataView是
 UDAF中ACC的属性,而ACC每个窗口new一个新的,无法将上一个窗口的结果通过ACC/DataView保留到下一个窗口,大佬,

回复: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态

2019-11-13 文章 Yuan,Youjun
这个场景应可以通过标准的SQL完成计算。大致思路如下:
1,内层查询统计每个设备一分钟的最大温度,max(temp) as max_temperature + tumble窗口
2,外层通过row over窗口,拿到当前分钟的max_temperature,和前后2分钟最大温度的和,即SUM(max_temperature) AS 
sum_temperature
3,最外层,就直接select 2 * max_temperature - sum_temperature就是你需要的前后2个分钟最大温度的差了。

假设输入消息有三个字段:
Ts: 时间戳
Deviceid:设备编号
Temp: 设备温度

完整的SQL如下:
INSERT INTO mysink 
SELECT ts, deviceid,  2 * max_temperature - sum_temperature AS diff_temperature 
FROM ( 
SELECT  deviceid, ts, max_temperature, SUM(max_temperature) OVER 
(PARTITION BY deviceid ORDER BY ts ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS 
sum_temperature 
FROM (
SELECT TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND) as ts, 
deviceid, max(temp) AS max_temperature  from mysrc group by TUMBLE(rowtime, 
INTERVAL '60' SECOND), deviceid
)
)

我用如下测试数据:
"2,dev1,1.2",
"5,dev1,1.3",
"6,dev1,1.4",
"10,dev1,1.5",
"11,dev1,1.6",
"12,dev1,1.7"
运行得到如下结果:
{"deviceid":"dev1","diff_temperature":1.3,"ts":5}
{"deviceid":"dev1","diff_temperature":0.3,"ts":11}
{"deviceid":"dev1","diff_temperature":0.1,"ts":17}

如果你向完整的验证我的方法,你可以:
1,登陆 http://creek.baidubce.com/
2,在作业订阅输入框,输入邮件末尾的作业定义(json)
3,点击生成可执行文件,在弹出的对话框中,选择你的电脑的OS和CPU ARCH,并且点击确定
耐心等待几秒中,系统会生成完整的可执行文件,你直接执行它,便可从控制台看到计算结果。如果你需要验证更多的数据,请修改source的Type=STDIN,这样你可以从命令行下输入你的数据了。

作业定义(json):
{
"注释":{
"说明": "内层query计算每个设备每分钟的最大温度,max+tumble窗口;外层query计算同一设备前后2分钟最大温度差, Row 
over窗口;计算温度差的计算方法为:当前窗口最大温度 x 2 - 前后2个窗口最大温度的和。本例采用预先配置的输入数据,即source 
type=COLLECTION,如果需要尝试更多的输入,可以将type改为STDIN,即从标准输入读入数据。",
"输入示例": "1000,dev1,2.3",
"输出示例": {"deviceid":"dev1","diff_temperature":1.3,"ts":5}
},
"sources": [{
"schema": {
"format": "CSV",
"fields": [{
"name": "ts",
"type": "SQL_TIMESTAMP"
},
{
"name": "deviceid",
"type": "STRING"
},
{
"name": "temp",
"type": "DOUBLE"
}]
},
"watermark": 0,
"name": "mysrc",
"eventTime": "ts",
"type": "COLLECTION",
"attr": {
"input": [
"1,dev1,1.1",
"2,dev1,1.2",
"5,dev1,1.3",
"6,dev1,1.4",
"10,dev1,1.5",
"11,dev1,1.6",
"12,dev1,1.7"
]
}
}],
"sink": {
"schema": {
"format": "JSON"
},
"name": "mysink",
"type": "STDOUT"
},
"name": "demojob",
"timeType": "EVENTTIME",
"sql": "INSERT INTO mysink SELECT ts, deviceid,  2 * max_temperature - 
sum_temperature AS diff_temperature FROM ( SELECT  deviceid, ts, 
max_temperature, SUM(max_temperature) OVER (PARTITION BY deviceid ORDER BY ts 
ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS sum_temperature FROM (SELECT 
TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND) as ts, deviceid, max(temp) AS 
max_temperature  from mysrc group by TUMBLE(rowtime, INTERVAL '60' SECOND), 
deviceid)) "
}



-邮件原件-
发件人: Chennet Steven  
发送时间: Wednesday, November 13, 2019 3:36 PM
收件人: user-zh@flink.apache.org
主题: 回复: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态

场景是一分钟为窗口计算每分钟传感器的最高温度,同时计算当前分钟与上一分钟最高温度的差值,使用flink-sql。想写一个Table的自定义UDAF,在UDAF中使用State存储上一分钟的最高温度,但是发现UDAF的open函数的FunctionContext中对于RuntimeContext是一个private,无法使用。同时DataView是
 UDAF中ACC的属性,而ACC每个窗口new一个新的,无法将上一个窗口的结果通过ACC/DataView保留到下一个窗口,大佬,我所理解的对么?
请教大佬计算两个窗口之间的聚合值得差值这种场景在FlinkSql中实现的方案是啥?

From stevenchen
 webchat 38798579

发件人: Dian Fu<mailto:dian0511...@gmail.com>
发送时间: Thursday, November 7, 2019 19:41
收件人: user-zh@flink.apache.org<mailto:user-zh@flink.apache.org>
主题: Re: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态

可以参考一下Flink代码里已有的例子:https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunction.java
 
<https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/main/jav

回复: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态

2019-11-12 文章 Chennet Steven
场景是一分钟为窗口计算每分钟传感器的最高温度,同时计算当前分钟与上一分钟最高温度的差值,使用flink-sql。想写一个Table的自定义UDAF,在UDAF中使用State存储上一分钟的最高温度,但是发现UDAF的open函数的FunctionContext中对于RuntimeContext是一个private,无法使用。同时DataView是
 UDAF中ACC的属性,而ACC每个窗口new一个新的,无法将上一个窗口的结果通过ACC/DataView保留到下一个窗口,大佬,我所理解的对么?
请教大佬计算两个窗口之间的聚合值得差值这种场景在FlinkSql中实现的方案是啥?

From stevenchen
 webchat 38798579

发件人: Dian Fu
发送时间: Thursday, November 7, 2019 19:41
收件人: user-zh@flink.apache.org
主题: Re: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态

可以参考一下Flink代码里已有的例子:https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunction.java
 

> 在 2019年11月7日,下午7:06,Chennet Steven  写道:
>
> 在flink1.9的flink-table-common中找到DataView这个接口和子类ListView,MapView,但是没有看懂如何在自定义函数中使用,
> 能否给个example或者是test代码的链接啊?
>
> From stevenchen
> webchat 38798579
>
> 
> 发件人: wenlong.lwl 
> 发送时间: Thursday, November 7, 2019 2:13:43 PM
> 收件人: user-zh@flink.apache.org 
> 主题: Re: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
>
> 可以试试1.9,引入了DataView的机制,可以在Acc中使用state了。
>
> On Thu, 7 Nov 2019 at 09:22, Chennet Steven  wrote:
>
>> 尝试在Flink的自定义聚合函数中使用State,发现open函数中通过FunctionContext无法获取到RuntimeContext
>> 如何在聚合函数中使用State?
>>
>>
>> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo,
>> TypeInformation}
>> import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
>> import org.apache.flink.api.java.typeutils.TupleTypeInfo
>> import org.apache.flink.table.functions.{AggregateFunction,
>> FunctionContext}
>> import java.lang.{Iterable => JIterable}
>>
>>
>> class IntDiffSumAccumulator extends JTuple2[Int, Boolean]
>>
>> class IntDiffSumFunction extends AggregateFunction[Int,
>> IntDiffSumAccumulator] {
>>
>>  override def open(context: FunctionContext): Unit = {
>>// Flink1.7.2 这里没法获取到 RuntimeContext,没有办法初始化State
>>//getRuntimeContext.getState(desc)
>>val a = this.hashCode()
>>print(s"hashCode:$a")
>>super.open(context)
>>  }
>>
>>  override def createAccumulator(): IntDiffSumAccumulator = {
>>val acc = new IntDiffSumAccumulator()
>>acc.f0 = 0
>>acc.f1 = false
>>acc
>>  }
>>
>>  def accumulate(accumulator: IntDiffSumAccumulator, value: Int): Unit = {
>>accumulator.f0 += value
>>accumulator.f1 = true
>>  }
>>
>>  override def getValue(accumulator: IntDiffSumAccumulator): Int = {
>>if (accumulator.f1) {
>>
>>  accumulator.f0
>>} else {
>>  Int.MinValue
>>}
>>  }
>>
>>  def merge(acc: IntDiffSumAccumulator, its:
>> JIterable[IntDiffSumAccumulator]) = {
>>val iter = its.iterator()
>>while (true) {
>>  val a = iter.next()
>>  if (a.f1) {
>>acc.f0 += a.f0
>>acc.f1 = true
>>  }
>>}
>>  }
>>
>>  def resetAccumulator(acc: IntDiffSumAccumulator) = {
>>acc.f0 = 0
>>acc.f1 = false
>>  }
>>
>>  override def getAccumulatorType: TypeInformation[IntDiffSumAccumulator] =
>>new TupleTypeInfo[IntDiffSumAccumulator](BasicTypeInfo.INT_TYPE_INFO,
>> BasicTypeInfo.BOOLEAN_TYPE_INFO)
>> }
>>
>>
>> From stevenchen
>> webchat 38798579
>>
>>
>>



回复: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态

2019-11-07 文章 Chennet Steven
在flink1.9的flink-table-common中找到DataView这个接口和子类ListView,MapView,但是没有看懂如何在自定义函数中使用,
能否给个example或者是test代码的链接啊?

From stevenchen
 webchat 38798579


发件人: wenlong.lwl 
发送时间: Thursday, November 7, 2019 2:13:43 PM
收件人: user-zh@flink.apache.org 
主题: Re: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态

可以试试1.9,引入了DataView的机制,可以在Acc中使用state了。

On Thu, 7 Nov 2019 at 09:22, Chennet Steven  wrote:

> 尝试在Flink的自定义聚合函数中使用State,发现open函数中通过FunctionContext无法获取到RuntimeContext
> 如何在聚合函数中使用State?
>
>
> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo,
> TypeInformation}
> import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
> import org.apache.flink.api.java.typeutils.TupleTypeInfo
> import org.apache.flink.table.functions.{AggregateFunction,
> FunctionContext}
> import java.lang.{Iterable => JIterable}
>
>
> class IntDiffSumAccumulator extends JTuple2[Int, Boolean]
>
> class IntDiffSumFunction extends AggregateFunction[Int,
> IntDiffSumAccumulator] {
>
>   override def open(context: FunctionContext): Unit = {
> // Flink1.7.2 这里没法获取到 RuntimeContext,没有办法初始化State
> //getRuntimeContext.getState(desc)
> val a = this.hashCode()
> print(s"hashCode:$a")
> super.open(context)
>   }
>
>   override def createAccumulator(): IntDiffSumAccumulator = {
> val acc = new IntDiffSumAccumulator()
> acc.f0 = 0
> acc.f1 = false
> acc
>   }
>
>   def accumulate(accumulator: IntDiffSumAccumulator, value: Int): Unit = {
> accumulator.f0 += value
> accumulator.f1 = true
>   }
>
>   override def getValue(accumulator: IntDiffSumAccumulator): Int = {
> if (accumulator.f1) {
>
>   accumulator.f0
> } else {
>   Int.MinValue
> }
>   }
>
>   def merge(acc: IntDiffSumAccumulator, its:
> JIterable[IntDiffSumAccumulator]) = {
> val iter = its.iterator()
> while (true) {
>   val a = iter.next()
>   if (a.f1) {
> acc.f0 += a.f0
> acc.f1 = true
>   }
> }
>   }
>
>   def resetAccumulator(acc: IntDiffSumAccumulator) = {
> acc.f0 = 0
> acc.f1 = false
>   }
>
>   override def getAccumulatorType: TypeInformation[IntDiffSumAccumulator] =
> new TupleTypeInfo[IntDiffSumAccumulator](BasicTypeInfo.INT_TYPE_INFO,
> BasicTypeInfo.BOOLEAN_TYPE_INFO)
> }
>
>
> From stevenchen
>  webchat 38798579
>
>
>