Re: [Question] In Flink parallel computation, how does each Window receive the data of its own partition, i.e. how does a Window determine which partition it belongs to?

2019-03-04, posted by 刘 文

Thanks everyone for the answers. I understand it better now, and I have written the question up in the documents below.

Flink 1.7.2 Source/Window data exchange source code analysis:
https://github.com/opensourceteams/fink-maven-scala-2/blob/master/md/miniCluster/flink-source-window-data-exchange.md

Flink 1.7.2 parallel computation source code analysis:
https://github.com/opensourceteams/fink-maven-scala-2/blob/master/md/miniCluster/Flink-Parallelism-Calculation.md

Flink 1.7.2 Parallel Computation Source Code Analysis

 
Source code

Source code: https://github.com/opensourceteams/fink-maven-scala-2

Flink 1.7.2 Source/Window data exchange source code analysis:
https://github.com/opensourceteams/fink-maven-scala-2/blob/master/md/miniCluster/flink-source-window-data-exchange.md

Overview

How a Flink Window performs parallel computation
How the Flink source partitions records by key hash and emits them to the downstream Window of the corresponding partition
 
WordCount program

package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.parallelism

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * Feed input data with: nc -lk 1234
  */
object SocketWindowWordCountLocal {

  def main(args: Array[String]): Unit = {

    val port = 1234
    // get the execution environment
    // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val configuration: Configuration = getConfiguration(true)

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1, configuration)

    // get input data by connecting to the socket
    val dataStream = env.socketTextStream("localhost", port, '\n')

    import org.apache.flink.streaming.api.scala._
    val textResult = dataStream
      .flatMap(w => w.split("\\s"))
      .map(w => WordWithCount(w, 1))
      .keyBy("word")
      /**
        * Re-evaluated every 5 seconds, i.e. the count restarts with each window.
        * Benefit: no need to keep aggregating over all data ever seen,
        * only the incremental data within the time interval, which reduces the data volume.
        */
      .timeWindow(Time.seconds(5))
      //.countWindow(3)
      //.countWindow(3, 1)
      //.countWindowAll(3)
      .sum("count")

    textResult
      .setParallelism(3)
      .print()

    if (args == null || args.size == 0) {

      println("== Execution plan below ==")
      println("Paste it at (works better in Firefox): https://flink.apache.org/visualizer")
      // execution plan
      //println(env.getExecutionPlan)
      //println("== Execution plan JSON above ==\n")
      // StreamGraph
      println(env.getStreamGraph.getStreamingPlanAsJSON)

      //JsonPlanGenerator.generatePlan(jobGraph)

      env.execute("default job")

    } else {
      env.execute(args(0))
    }

    println("done")
  }

  // Data type for words with count
  case class WordWithCount(word: String, count: Long) {
    //override def toString: String = Thread.currentThread().getName + word + " : " + count
  }

  def getConfiguration(isDebug: Boolean = false): Configuration = {

    val configuration: Configuration = new Configuration()

    if (isDebug) {
      val timeout = "10 s"
      val timeoutHeartbeatPause = "100 s"
      configuration.setString("akka.ask.timeout", timeout)
      configuration.setString("akka.lookup.timeout", timeout)
      configuration.setString("akka.tcp.timeout", timeout)
      configuration.setString("akka.transport.heartbeat.interval", timeout)
      configuration.setString("akka.transport.heartbeat.pause", timeoutHeartbeatPause)
      configuration.setString("akka.watch.heartbeat.pause", timeout)
      configuration.setInteger("heartbeat.interval", 1000)
      configuration.setInteger("heartbeat.timeout", 5000)
    }

    configuration
  }
}


 
Input data

1 2 3 4 5 6 7 8 9 10
 
Source code analysis

Execution

Re: [Question] In Flink parallel computation, how does each Window receive the data of its own partition, i.e. how does a Window determine which partition it belongs to?

2019-03-04, posted by 343122...@qq.com

The following is my personal understanding and may not be 100% accurate.
The routing is decided by keyBy, i.e. the .keyBy("word") in your code:
the hash of the key value, taken modulo the parallelism, gives the remainder that determines which partition a record is sent to.
Your code does not specify a time characteristic, so the default is processing time,
which means your windows are split by processing time.
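
For reference, Flink 1.7.x does not take a plain hash modulo the parallelism: the key's hashCode is first murmur-hashed into one of maxParallelism key groups (128 by default for small job parallelism), and that key group is then mapped to a parallel operator index. Below is a minimal sketch that reproduces the routing with Flink's own KeyGroupRangeAssignment; it keys on the raw words for illustration, whereas .keyBy("word") actually wraps the field in a Tuple, so the real hash values differ.

import org.apache.flink.runtime.state.KeyGroupRangeAssignment

// Prints which parallel subtask each word would be routed to,
// assuming Flink 1.7.x key-group assignment and maxParallelism = 128.
object KeyPartitionDemo {
  def main(args: Array[String]): Unit = {
    val parallelism = 2
    val maxParallelism = 128

    for (key <- "1 2 3 4 5 6 7 8 9 10".split(" ")) {
      // keyGroup      = murmurHash(key.hashCode) % maxParallelism
      // operatorIndex = keyGroup * parallelism / maxParallelism
      val subtask = KeyGroupRangeAssignment
        .assignKeyToParallelOperator(key, maxParallelism, parallelism)
      println(s"key:$key  partition:$subtask")
    }
  }
}

With parallelism 2 this pins every word to subtask 0 or 1, the same shape as the key/partition listing in the question below.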




343122...@qq.com
 

Re: [Question] In Flink parallel computation, how does each Window receive the data of its own partition, i.e. how does a Window determine which partition it belongs to?

2019-03-03, posted by 刘 文
--
I'm sorry, I still don't understand. May I ask for help again?

For example:
). When the parallelism is set to 2 via setParallelism(2), two Window threads are created
). Streaming WordCount, local, Flink 1.7.2
). How do these two Window threads read the data of their own partitions, and how is each Window's partition determined?
). Input data:
   1 2 3 4 5 6 7 8 9 10
). source -> operator ->
--
channel 0 [partition 0]
key:1    partition:0
key:2    partition:0
key:3    partition:0
key:4    partition:0
key:6    partition:0
key:10   partition:0
--
channel 1 [partition 1]
key:5    partition:1
key:7    partition:1
key:8    partition:1
key:9    partition:1
--
). window 0 (1/2)
   How is this window's partition determined?
   How does this window read the data of its partition?
). window 1 (2/2)
   How is this window's partition determined?
   How does this window read the data of its partition?
--




Re: [Question] In Flink parallel computation, how does each Window receive the data of its own partition, i.e. how does a Window determine which partition it belongs to?

2019-03-03, posted by 刘 文

WordCount.scala
package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.parallelism

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * Feed input data with: nc -lk 1234
  */
object SocketWindowWordCountLocal {

  def main(args: Array[String]): Unit = {

    val port = 1234
    // get the execution environment
    // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val configuration: Configuration = getConfiguration(true)

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1, configuration)

    // get input data by connecting to the socket
    val dataStream = env.socketTextStream("localhost", port, '\n')

    import org.apache.flink.streaming.api.scala._
    val textResult = dataStream
      .flatMap(w => w.split("\\s"))
      .map(w => WordWithCount(w, 1))
      .keyBy("word")
      /**
        * Re-evaluated every 5 seconds, i.e. the count restarts with each window.
        * Benefit: no need to keep aggregating over all data ever seen,
        * only the incremental data within the time interval, which reduces the data volume.
        */
      .timeWindow(Time.seconds(5))
      //.countWindow(3)
      //.countWindow(3, 1)
      //.countWindowAll(3)
      .sum("count")

    textResult
      .setParallelism(100)
      .print()

    if (args == null || args.size == 0) {

      println("== Execution plan below ==")
      println("Paste it at (works better in Firefox): https://flink.apache.org/visualizer")
      // execution plan
      println(env.getExecutionPlan)
      println("== Execution plan JSON above ==\n")
      // StreamGraph
      //println(env.getStreamGraph.getStreamingPlanAsJSON)

      //JsonPlanGenerator.generatePlan(jobGraph)

      env.execute("default job")

    } else {
      env.execute(args(0))
    }

    println("done")
  }

  // Data type for words with count
  case class WordWithCount(word: String, count: Long)

  def getConfiguration(isDebug: Boolean = false): Configuration = {

    val configuration: Configuration = new Configuration()

    if (isDebug) {
      val timeout = "10 s"
      val timeoutHeartbeatPause = "100 s"
      configuration.setString("akka.ask.timeout", timeout)
      configuration.setString("akka.lookup.timeout", timeout)
      configuration.setString("akka.tcp.timeout", timeout)
      configuration.setString("akka.transport.heartbeat.interval", timeout)
      configuration.setString("akka.transport.heartbeat.pause", timeoutHeartbeatPause)
      configuration.setString("akka.watch.heartbeat.pause", timeout)
      configuration.setInteger("heartbeat.interval", 1000)
      configuration.setInteger("heartbeat.timeout", 5000)
    }

    configuration
  }
}






[Question] In Flink parallel computation, how does each Window receive the data of its own partition, i.e. how does a Window determine which partition it belongs to?

2019-03-03, posted by 刘 文

). Environment: Flink 1.7.2 WordCount, local, stream processing
). In the source, RecordWriter.emit() assigns every record to a partition by its key, so each record's partition is already decided at that point; the number of partitions is determined by DataStream.setParallelism(2)

public void emit(T record) throws IOException, InterruptedException {
    emit(record, channelSelector.selectChannels(record, numChannels));
}

copyFromSerializerToTargetChannel(int targetChannel) then writes the data to the different channels, i.e. sends each record to the window of the corresponding partition (records are sent one at a time)
). However many parallel instances DataStream.setParallelism(2) configures, that many Windows are started
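
For a keyed stream, the channelSelector above is Flink's KeyGroupStreamPartitioner, which picks exactly one target channel per record from its key, reusing the same key-group assignment as keyed state. A simplified stand-in is sketched below; the class and method names are illustrative, not the verbatim Flink source, and it assumes flink-runtime 1.7.x on the classpath.

import org.apache.flink.runtime.state.KeyGroupRangeAssignment

// Simplified stand-in for KeyGroupStreamPartitioner: picks the single
// output channel (downstream window subtask) a record is routed to.
class KeyedChannelSelector[T, K](getKey: T => K, maxParallelism: Int = 128) {
  def selectChannel(record: T, numChannels: Int): Int =
    KeyGroupRangeAssignment.assignKeyToParallelOperator(
      getKey(record), maxParallelism, numChannels)
}

object EmitDemo extends App {
  case class WordWithCount(word: String, count: Long)

  val selector = new KeyedChannelSelector[WordWithCount, String](_.word)

  // With numChannels = 2 (parallelism 2), every word maps to a fixed channel,
  // so the same key always reaches the same window subtask.
  for (w <- "1 2 3 4 5 6 7 8 9 10".split(" "))
    println(s"key:$w  channel:${selector.selectChannel(WordWithCount(w, 1), 2)}")
}

On the receiving side there is nothing left to determine: each parallel window subtask simply consumes the records arriving on its own input channels, and since every key is pinned to exactly one channel, window 0 (1/2) only ever sees the keys of partition 0.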