官网有说,merge是在使用session窗口的时候用到,因为需要合并窗口



---原始邮件---
发件人: "Zhefu PENG"<pengzf0...@gmail.com&gt;
发送时间: 2020年5月6日(周三) 晚上8:34
收件人: "user-zh"<user-zh@flink.apache.org&gt;;
主题: flink窗口函数AggregateFunction中,merge的作用和应用场景


Hi all,

在使用增量更新窗口函数AggregateFunction的时候,需要重新定义merge函数。在查阅了一些资料后,有些资料(博客)说,merge函数的作用是对于两个跨节点之间的ACC的合并;还有位朋友说是对于状态的恢复,一般不会用上;查了一下别的资料并没有相关的说明,在此想问并确定一下具体作用以及可能的使用场景。

之所以需要了解和确定该函数功能,是现在我有这样一个需求场景:我想流式的对于数据的中的某个字段值的进行方差计算并进行增量更新,这样可以不用储存原始数据,只存储中间结果,从而节省运行内存。
但是如果merge是对于跨节点之间的ACC进行合并,那会造成结果的不准确,比如:一个节点上有1,4,3(去重后)的数据和计算后的方差,另一个节点上有2,3,4(去重后)的数据和计算后的方差,这种方式能保证节点内的结果是准确的,但是合并后的结果就不够准确。

代码(简单伪代码)如下:
简单说明:为了简化,输入的数据是个Double, ACC的数据类型是个Tuple2, 第一项是数据的Set(保证去重),
第二项是计算的方差结果,输出的结果是
class MergeAggregateExample extends AggregateFunction[Double, (Set,
Double), Double] {
&nbsp;&nbsp;&nbsp; override def createAccumulator(): (Set, Double) = {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //Set用来对数据进行去重, 后一项是计算方差后的结果
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; (Set(), 0.0)
&nbsp;&nbsp;&nbsp; }

&nbsp;&nbsp;&nbsp; override def add(item: Double, accumulator: (Set, Double)): 
(Set,
Double) = {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //判断新来的数据是否之前出现过,如果之前没出现过: 更新set, 
并且增量更新方差;如果没出现过,直接跳过
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; new Tuple2(Set, Double)
&nbsp;&nbsp;&nbsp; }


&nbsp;&nbsp;&nbsp; override def getResult(accumulator: (Set, Double)): Double = 
{
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; // 返回计算的方差结果
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; accumulator._2
&nbsp;&nbsp;&nbsp; }

&nbsp;&nbsp;&nbsp; override def merge(a: (Set, Double), b: (Set, Double)): 
(Set, Double) =
{
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; // 未知,如果是直接做类似add内的增量更新操作,是否会导致结果的不准确?
&nbsp; }


目前我遭遇到的使用场景说明如上,希望得到一些回复和解答说明,最好能针对我提到的这种需求场景,非常感谢。

Looking forward to your reply and help.

Best,
Zhefu

回复