On Thursday, 28 February 2019, 5:32:00 pm GMT+8, 
thinktothi...@yahoo.com.INVALID <thinktothi...@yahoo.com.INVALID> wrote:  
 
   我明白了,感谢大家).RecordWriter.emit(),这个时候,数据是已经flatMap,map之后的值, 
函数中会一条一条数据发送(a,1),(a,1),(a,1)).WindowOperator.processElement()函数中,收到数据后,调用 
windowState.add(element.value), 其实调的是 
HeapReducingState.add()函数,这个state值在WindowOperator.windowState.stateTable.primaryTable.state
 这个里边存着(key,value)  
HeapReducingState.add()中调用transform,最终调用ReduceTransformation.apply,该函数会调用reduce函数,在同一次window中,每来一个相同key,就更新一次,实现累加
       public V apply(V previousState, V value) throws Exception { return 
previousState != null ? reduceFunction.reduce(previousState, value) : value; }

    On Thursday, 28 February 2019, 4:42:00 pm GMT+8, mayangyang02 
<mayangyan...@imdada.cn> wrote:  
 
 @thinktothings
这个sum其实是维护在WindowOperator的state里的。
你看下WindowedStream#sum()会发现最终会调用到WindowedStream#reduce()。
而对reduce来说,这个state的实现类是HeapReducingState,对这个state来说,调用其add方法是就会实现聚合(对sum来说就是相加)。在window
 emit时,就会将这个维护的state发送出去。




原始邮件
发件人:Yaoting gongfall.for.you....@gmail.com
收件人:user-zhuser...@flink.apache.org
发送时间:2019年2月28日(周四) 15:29
主题:Re: flink wordcount中 sum是在什么时候,哪个地方调用的?


@thinktothings 不知道是否我理解正确,我觉得你可以对flink的一些计算流程不是很清楚。 SumAggregator内的reduce 
方法就可以计算出需要的“sum”结果。 
你一直问什么时候调用sum,是指代码中的“sum("count")”吗?这个在构建steamgraph的时候已经调用了,目的就是获取内部返回的SumAggregator对象。
 有个文章可以了解下。看下第一次即可:https://www.jianshu.com/p/13070729289c 如果我理解的不对,请忽略 On Thu, 
Feb 28, 2019 at 2:57 PM thinktothi...@yahoo.com.invalid wrote:  
--------------------------------------  ).本地环境: scala WordCount ,程序在附件中 
SocketWindowWordCountLocal.scala  ).输入数据:  a b a  ).设置的 
timeWindow(Time.seconds(20))  ).[问题]想调试Flink源码中具体在哪一步进行sum操作  
-------------------------------------------------  调试:  
).RecordWriter.emit(),这个时候,数据是已经flatMap,map之后的值,  函数中会一条一条数据发送(a,1),(b,1),(a,1) 
 ).调用 StreamSink.processElement 函数打印输出结果  
).没明白地方,是在调用StreamSink.processElement之前,在哪个地方调用了sum,对相同key进行了聚合操作    On 
Thursday, 28 February 2019, 2:47:39 pm GMT+8, Yaoting Gong  
fall.for.you....@gmail.com wrote:    你好。  我不知道你的是什么项目的代码。我从flink 官方的样例代码 
SocketWindowWordCount.scala找到。  从sum跟进去,最终能找到一个SumAggregator。  On Thu, Feb 28, 
2019 at 2:34 PM thinktothi...@yahoo.com.invalid wrote:    Flink wordCount    
本地程序,代码如下,想调下代码,没找到Window,的时间Trigger结束时,在哪个地方进行的sum,统计结果,我想关注这个点的问题,请问能详细的说明下吗?-------------------------------------package
  com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc    
import org.apache.flink.configuration.Configuration  import 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment  import 
org.apache.flink.streaming.api.windowing.time.Time    /**  * nc -lk 1234 输入数据  
*/  object SocketWindowWordCountLocal {    def main(args: Array[String]): Unit 
= {      val port = 1234  // get the execution environment  // val env: 
StreamExecutionEnvironment =  
StreamExecutionEnvironment.getExecutionEnvironment      val configuration : 
Configuration = new Configuration()  val timeout = "100000 s"  val 
timeoutHeartbeatPause = "1000000 s"  
configuration.setString("akka.ask.timeout",timeout)  
configuration.setString("akka.lookup.timeout",timeout)  
configuration.setString("akka.tcp.timeout",timeout)  
configuration.setString("akka.transport.heartbeat.interval",timeout)      
configuration.setString("akka.transport.heartbeat.pause",timeoutHeartbeatPause) 
 configuration.setString("akka.watch.heartbeat.pause",timeout)  
configuration.setInteger("heartbeat.interval",10000000)  
configuration.setInteger("heartbeat.timeout",50000000)  val 
env:StreamExecutionEnvironment =  
StreamExecutionEnvironment.createLocalEnvironment(1,configuration)            
// get input data by connecting to the socket  val dataStream = 
env.socketTextStream("localhost", port, '\n')        import 
org.apache.flink.streaming.api.scala._  val textResult = dataStream.flatMap( w 
= w.split("\\s") ).map( w =  WordWithCount(w,1))  .keyBy("word")  /**  * 
每20秒刷新一次,相当于重新开始计数,  * 好处,不需要一直拿所有的数据统计  * 只需要在指定时间间隔内的增量数据,减少了数据规模  */  
.timeWindow(Time.seconds(20))  //.countWindow(3)  //.countWindow(3,1)  
//.countWindowAll(3)      .sum("count" )    
textResult.print().setParallelism(1)        if(args == null || args.size ==0){  
env.execute("默认作业")    //执行计划  //println(env.getExecutionPlan)  //StreamGraph  
//println(env.getStreamGraph.getStreamingPlanAsJSON)        
//JsonPlanGenerator.generatePlan(jobGraph)    }else{  env.execute(args(0))  }   
 println("结束")    }      // Data type for words with count  case class 
WordWithCount(word: String, count: Long)    }      On Thursday, 28 February 
2019, 2:08:00 pm GMT+8, Yaoting Gong    fall.for.you....@gmail.com wrote:    
@Yuan Yifan    *不能贴图的。*    On Thu, Feb 28, 2019 at 2:03 PM Yuan Yifan 
tsingjyuj...@163.com wrote:        你说的应该是这里的代码:            
http://flink-cn.shinonomelab.com/quickstart/setup_quickstart.html#read-the-code 
      其实SUM应该会在每一条数据来的时候调用的,但是输出结果只有在最后FireAndPurge的时候。      
本质上,sum是执行了一个Sum类型的Aggregate:    其AggregateFunction是:          
org.apache.flink.streaming.api.functions.aggregation.SumAggregator#SumAggregator(int,
    org.apache.flink.api.common.typeinfo.TypeInformationT,    
org.apache.flink.api.common.ExecutionConfig)          其中实现了reduce方法:      
所以你可以不必关心究竟是在何时计算的,有可能在多个地方计算以后再合并,但是如论如何,Reduce计算的性质保证,结果一定是对的。            在 
2019-02-28 13:04:59," " thinktothi...@yahoo.com.INVALID 写道:    请问: flink 
wordcount中 sum是在什么时候,哪个地方调用的?    

回复