On Thursday, 28 February 2019, 5:32:00 pm GMT+8,
thinktothi...@yahoo.com.INVALID <thinktothi...@yahoo.com.INVALID> wrote:
我明白了,感谢大家).RecordWriter.emit(),这个时候,数据是已经flatMap,map之后的值,
函数中会一条一条数据发送(a,1),(a,1),(a,1)).WindowOperator.processElement()函数中,收到数据后,调用
windowState.add(element.value), 其实调的是
HeapReducingState.add()函数,这个state值在WindowOperator.windowState.stateTable.primaryTable.state
这个里边存着(key,value)
HeapReducingState.add()中调用transform,最终调用ReduceTransformation.apply,该函数会调用reduce函数,在同一次window中,每来一个相同key,就更新一次,实现累加
public V apply(V previousState, V value) throws Exception { return
previousState != null ? reduceFunction.reduce(previousState, value) : value; }
On Thursday, 28 February 2019, 4:42:00 pm GMT+8, mayangyang02
<mayangyan...@imdada.cn> wrote:
@thinktothings
这个sum其实是维护在WindowOperator的state里的。
你看下WindowedStream#sum()会发现最终会调用到WindowedStream#reduce()。
而对reduce来说,这个state的实现类是HeapReducingState,对这个state来说,调用其add方法是就会实现聚合(对sum来说就是相加)。在window
emit时,就会将这个维护的state发送出去。
原始邮件
发件人:Yaoting gongfall.for.you....@gmail.com
收件人:user-zhuser...@flink.apache.org
发送时间:2019年2月28日(周四) 15:29
主题:Re: flink wordcount中 sum是在什么时候,哪个地方调用的?
@thinktothings 不知道是否我理解正确,我觉得你可以对flink的一些计算流程不是很清楚。 SumAggregator内的reduce
方法就可以计算出需要的“sum”结果。
你一直问什么时候调用sum,是指代码中的“sum("count")”吗?这个在构建steamgraph的时候已经调用了,目的就是获取内部返回的SumAggregator对象。
有个文章可以了解下。看下第一次即可:https://www.jianshu.com/p/13070729289c 如果我理解的不对,请忽略 On Thu,
Feb 28, 2019 at 2:57 PM thinktothi...@yahoo.com.invalid wrote:
-------------------------------------- ).本地环境: scala WordCount ,程序在附件中
SocketWindowWordCountLocal.scala ).输入数据: a b a ).设置的
timeWindow(Time.seconds(20)) ).[问题]想调试Flink源码中具体在哪一步进行sum操作
------------------------------------------------- 调试:
).RecordWriter.emit(),这个时候,数据是已经flatMap,map之后的值, 函数中会一条一条数据发送(a,1),(b,1),(a,1)
).调用 StreamSink.processElement 函数打印输出结果
).没明白地方,是在调用StreamSink.processElement之前,在哪个地方调用了sum,对相同key进行了聚合操作 On
Thursday, 28 February 2019, 2:47:39 pm GMT+8, Yaoting Gong
fall.for.you....@gmail.com wrote: 你好。 我不知道你的是什么项目的代码。我从flink 官方的样例代码
SocketWindowWordCount.scala找到。 从sum跟进去,最终能找到一个SumAggregator。 On Thu, Feb 28,
2019 at 2:34 PM thinktothi...@yahoo.com.invalid wrote: Flink wordCount
本地程序,代码如下,想调下代码,没找到Window,的时间Trigger结束时,在哪个地方进行的sum,统计结果,我想关注这个点的问题,请问能详细的说明下吗?-------------------------------------package
com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc
import org.apache.flink.configuration.Configuration import
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import
org.apache.flink.streaming.api.windowing.time.Time /** * nc -lk 1234 输入数据
*/ object SocketWindowWordCountLocal { def main(args: Array[String]): Unit
= { val port = 1234 // get the execution environment // val env:
StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment val configuration :
Configuration = new Configuration() val timeout = "100000 s" val
timeoutHeartbeatPause = "1000000 s"
configuration.setString("akka.ask.timeout",timeout)
configuration.setString("akka.lookup.timeout",timeout)
configuration.setString("akka.tcp.timeout",timeout)
configuration.setString("akka.transport.heartbeat.interval",timeout)
configuration.setString("akka.transport.heartbeat.pause",timeoutHeartbeatPause)
configuration.setString("akka.watch.heartbeat.pause",timeout)
configuration.setInteger("heartbeat.interval",10000000)
configuration.setInteger("heartbeat.timeout",50000000) val
env:StreamExecutionEnvironment =
StreamExecutionEnvironment.createLocalEnvironment(1,configuration)
// get input data by connecting to the socket val dataStream =
env.socketTextStream("localhost", port, '\n') import
org.apache.flink.streaming.api.scala._ val textResult = dataStream.flatMap( w
= w.split("\\s") ).map( w = WordWithCount(w,1)) .keyBy("word") /** *
每20秒刷新一次,相当于重新开始计数, * 好处,不需要一直拿所有的数据统计 * 只需要在指定时间间隔内的增量数据,减少了数据规模 */
.timeWindow(Time.seconds(20)) //.countWindow(3) //.countWindow(3,1)
//.countWindowAll(3) .sum("count" )
textResult.print().setParallelism(1) if(args == null || args.size ==0){
env.execute("默认作业") //执行计划 //println(env.getExecutionPlan) //StreamGraph
//println(env.getStreamGraph.getStreamingPlanAsJSON)
//JsonPlanGenerator.generatePlan(jobGraph) }else{ env.execute(args(0)) }
println("结束") } // Data type for words with count case class
WordWithCount(word: String, count: Long) } On Thursday, 28 February
2019, 2:08:00 pm GMT+8, Yaoting Gong fall.for.you....@gmail.com wrote:
@Yuan Yifan *不能贴图的。* On Thu, Feb 28, 2019 at 2:03 PM Yuan Yifan
tsingjyuj...@163.com wrote: 你说的应该是这里的代码:
http://flink-cn.shinonomelab.com/quickstart/setup_quickstart.html#read-the-code
其实SUM应该会在每一条数据来的时候调用的,但是输出结果只有在最后FireAndPurge的时候。
本质上,sum是执行了一个Sum类型的Aggregate: 其AggregateFunction是:
org.apache.flink.streaming.api.functions.aggregation.SumAggregator#SumAggregator(int,
org.apache.flink.api.common.typeinfo.TypeInformationT,
org.apache.flink.api.common.ExecutionConfig) 其中实现了reduce方法:
所以你可以不必关心究竟是在何时计算的,有可能在多个地方计算以后再合并,但是如论如何,Reduce计算的性质保证,结果一定是对的。 在
2019-02-28 13:04:59," " thinktothi...@yahoo.com.INVALID 写道: 请问: flink
wordcount中 sum是在什么时候,哪个地方调用的?