If you only want to know where the SUM is performed, just look at the reduce function of this class: org.apache.flink.streaming.api.functions.aggregation.SumAggregator.
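To illustrate the idea (this is a simplified sketch, not Flink's actual SumAggregator code): a reduce-based sum folds each incoming record into a running aggregate. The `reduce` helper below is hypothetical, but it mirrors the shape of SumAggregator#reduce for the WordWithCount type used later in this thread.

```scala
// Sketch only -- not Flink internals. Shows how a reduce-style sum
// combines two partial records, analogous to SumAggregator#reduce.
case class WordWithCount(word: String, count: Long)

// Hypothetical reduce function: the window operator conceptually calls
// something like this for each incoming element, folding it into the
// running aggregate for the key.
def reduce(acc: WordWithCount, in: WordWithCount): WordWithCount =
  WordWithCount(acc.word, acc.count + in.count)

val merged = Seq(
  WordWithCount("flink", 1),
  WordWithCount("flink", 1),
  WordWithCount("flink", 1)
).reduce(reduce)

println(merged)  // WordWithCount(flink,3)
```

Because reduce is associative, partial sums can be computed in any order or place and merged later without changing the result, which is why the exact call site does not matter for correctness.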
But if you want to know when the SUM result is emitted, that is much more complicated: a Trigger may be attached downstream, and it can "manually" call FireAndPurge when certain conditions are met, so emission does not necessarily happen when the window time ends. See the code here for details: org.apache.flink.streaming.api.datastream.WindowedStream#reduce(org.apache.flink.api.common.functions.ReduceFunction<T>, org.apache.flink.streaming.api.functions.windowing.WindowFunction<T,R,K,W>, org.apache.flink.api.common.typeinfo.TypeInformation<R>)

On 2019-02-28 14:34:40, " " <thinktothi...@yahoo.com.INVALID> wrote:
> This is a local Flink wordCount program; the code is below. I wanted to step
> through it, but I could not find where the sum (the counting) is done when the
> window's time Trigger ends. That is the point I want to understand -- could
> you explain it in detail?
> -------------------------------------
> package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc
>
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.windowing.time.Time
>
> /**
>   * nc -lk 1234  to feed input data
>   */
> object SocketWindowWordCountLocal {
>
>   def main(args: Array[String]): Unit = {
>
>     val port = 1234
>     // get the execution environment
>     // val env: StreamExecutionEnvironment =
>     //   StreamExecutionEnvironment.getExecutionEnvironment
>
>     val configuration: Configuration = new Configuration()
>     val timeout = "100000 s"
>     val timeoutHeartbeatPause = "1000000 s"
>     configuration.setString("akka.ask.timeout", timeout)
>     configuration.setString("akka.lookup.timeout", timeout)
>     configuration.setString("akka.tcp.timeout", timeout)
>     configuration.setString("akka.transport.heartbeat.interval", timeout)
>     configuration.setString("akka.transport.heartbeat.pause", timeoutHeartbeatPause)
>     configuration.setString("akka.watch.heartbeat.pause", timeout)
>     configuration.setInteger("heartbeat.interval", 10000000)
>     configuration.setInteger("heartbeat.timeout", 50000000)
>     val env: StreamExecutionEnvironment =
>       StreamExecutionEnvironment.createLocalEnvironment(1, configuration)
>
>     // get input data by connecting to the socket
>     val dataStream = env.socketTextStream("localhost", port, '\n')
>
>     import org.apache.flink.streaming.api.scala._
>     val textResult = dataStream
>       .flatMap(w => w.split("\\s"))
>       .map(w => WordWithCount(w, 1))
>       .keyBy("word")
>       /**
>         * Refresh every 20 seconds, i.e. restart counting each window.
>         * Benefit: no need to keep aggregating over all the data,
>         * only the incremental data within the interval, which
>         * reduces the data volume.
>         */
>       .timeWindow(Time.seconds(20))
>       //.countWindow(3)
>       //.countWindow(3, 1)
>       //.countWindowAll(3)
>       .sum("count")
>
>     textResult.print().setParallelism(1)
>
>     if (args == null || args.size == 0) {
>       env.execute("default job")
>
>       // execution plan
>       // println(env.getExecutionPlan)
>       // StreamGraph
>       // println(env.getStreamGraph.getStreamingPlanAsJSON)
>       // JsonPlanGenerator.generatePlan(jobGraph)
>     } else {
>       env.execute(args(0))
>     }
>
>     println("done")
>   }
>
>   // Data type for words with count
>   case class WordWithCount(word: String, count: Long)
> }
>
> On Thursday, 28 February 2019, 2:08:00 pm GMT+8, Yaoting Gong
> <fall.for.you....@gmail.com> wrote:
>
> @Yuan Yifan
>
> *Images cannot be attached here.*
>
> On Thu, Feb 28, 2019 at 2:03 PM Yuan Yifan <tsingjyuj...@163.com> wrote:
>
>> The code you are referring to should be this:
>>
>> http://flink-cn.shinonomelab.com/quickstart/setup_quickstart.html#read-the-code
>>
>> In fact, SUM should be invoked as each record arrives, but the result is
>> only output at the final FireAndPurge.
>>
>> Essentially, sum executes a Sum-type Aggregate; its AggregateFunction is:
>>
>> org.apache.flink.streaming.api.functions.aggregation.SumAggregator#SumAggregator(int,
>> org.apache.flink.api.common.typeinfo.TypeInformation<T>,
>> org.apache.flink.api.common.ExecutionConfig)
>>
>> which implements the reduce method.
>>
>> So you do not need to care exactly when it is computed; it may be computed
>> in several places and merged afterwards, but in any case the properties of
>> a reduce computation guarantee that the result is correct.
>>
>> On 2019-02-28 13:04:59, " " <thinktothi...@yahoo.com.INVALID> wrote:
>>> Question: in the flink wordcount, when and where is sum invoked?
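The point made above -- that the sum runs per record while the result only appears at fire/purge time -- can be sketched with a toy model. This is an assumption-laden illustration, not Flink's window operator: `onElement` and `onFireAndPurge` are hypothetical names standing in for the per-record reduce step and the trigger's FireAndPurge action.

```scala
// Toy model (not Flink internals): the running sum is updated on every
// element, but a result is only emitted when the trigger "fires".
var runningSum = 0L
val emitted = scala.collection.mutable.Buffer[Long]()

// Per-record step: fold the new value into the running aggregate,
// analogous to the reduce call made for each arriving element.
def onElement(v: Long): Unit = runningSum += v

// Fire-and-purge step: emit the aggregate and clear the window state.
def onFireAndPurge(): Unit = { emitted += runningSum; runningSum = 0L }

Seq(1L, 1L, 1L).foreach(onElement)  // three records arrive in window 1
onFireAndPurge()                    // trigger fires: emit 3, reset state
Seq(1L, 1L).foreach(onElement)      // two records arrive in window 2
onFireAndPurge()                    // trigger fires: emit 2

println(emitted)  // ArrayBuffer(3, 2)
```

This separation is why attaching a custom Trigger changes *when* results appear without changing *what* is computed.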