Re: [Question] In Flink parallel processing, how does each Window receive the data of its own partition, i.e. how does a Window determine which partition it belongs to?
-- Sorry, I still don't understand; may I ask for help again? For example:

). With the parallelism set to 2 (setParallelism(2)), two window threads are created
). Streaming WordCount, local, Flink 1.7.2
). How do these two window threads read the data in their own partitions, and how is each window's partition determined?
). Input data: 1 2 3 4 5 6 7 8 9 10
). source -> operator ->
   [partition 0] key:1 partition:0  key:2 partition:0  key:3 partition:0  key:4 partition:0  key:6 partition:0  key:10 partition:0
   [partition 1] key:5 partition:1  key:7 partition:1  key:8 partition:1  key:9 partition:1
). window 0 (1/2): how is this window's partition determined, and how does it read the data in that partition?
). window 1 (2/2): how is this window's partition determined, and how does it read the data in that partition?

> On March 3, 2019, at 9:26 PM, 刘 文 wrote:
>
> WordCount.scala
> [snip: the same program is quoted in full in the message below]
>
>> On March 3, 2019, at 9:05 PM, 刘 文 wrote:
>>
>>> [Question] In Flink parallel processing, how does each Window receive the data of its own partition, i.e. how does a Window determine which partition it belongs to?
>>
>> ). Environment: Flink 1.7.2, WordCount, local, stream processing
>> ). In the source, RecordWriter.emit() assigns each element to a partition by its key, which fixes the partition of every element; the number of partitions is set by DataStream.setParallelism(2)
>>
>> public void emit(T record) throws IOException, InterruptedException {
>>     emit(record, channelSelector.selectChannels(record, numChannels));
>> }
>>
>> Data is then written to the different channels via copyFromSerializerToTargetChannel(int targetChannel), i.e. sent, record by record, to the window that corresponds to each partition
>> ). As many windows are opened as there are parallel instances (DataStream.setParallelism(2))
>
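In short: a window subtask never determines its own partition. The routing happens on the sender side. After keyBy(), the ChannelSelector is a KeyGroupStreamPartitioner: each record's key is mapped to a key group (murmurHash(key.hashCode()) % maxParallelism), and the key group is mapped to a subtask index (keyGroup * parallelism / maxParallelism). Window subtask i then simply consumes whatever arrives on its own input channels, which are wired up at deployment time. The following minimal sketch reproduces the routing with Flink's own utility class; note that keyBy("word") actually keys on a Tuple wrapping the field, so hashing the raw strings here only approximates the exact assignment your job printed:

import org.apache.flink.runtime.state.KeyGroupRangeAssignment

object KeyToSubtaskDemo {

  def main(args: Array[String]): Unit = {
    val parallelism = 2
    // jobs that never set a max parallelism get a default value (128 at this scale)
    val maxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism)

    for (word <- (1 to 10).map(_.toString)) {
      // key group = murmurHash(key.hashCode) % maxParallelism
      val keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(word, maxParallelism)
      // subtask index = keyGroup * parallelism / maxParallelism
      val subtask = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
        maxParallelism, parallelism, keyGroup)
      println(s"key:$word keyGroup:$keyGroup -> window subtask $subtask")
    }
  }
}

So "window 0 (1/2)" and "window 1 (2/2)" do not look up a shared store by partition number; each one owns a fixed slice of the key groups and receives exactly those records over the network channels addressed to it.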
Re: Does sql-client support remote connections to a Flink cluster?
Hi! As for sql-client: you can now write SQL in Zeppelin, but only DML is supported; DDL (such as CREATE TABLE) is not supported yet.

Best,
Han Fei

------------------------------------------------------------------
From: yuess_coder <642969...@qq.com>
Sent: Monday, March 4, 2019, 10:29
To: user-zh@flink.apache.org
Subject: Does sql-client support remote connections to a Flink cluster?

As the subject says.
Does sql-client support remote connections to a Flink cluster?
Re: [Question] In Flink parallel processing, how does each Window receive the data of its own partition, i.e. how does a Window determine which partition it belongs to?
WordCount.scala

package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.parallelism

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * nc -lk 1234  to feed input data
  */
object SocketWindowWordCountLocal {

  def main(args: Array[String]): Unit = {

    val port = 1234
    // get the execution environment
    // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val configuration: Configuration = getConfiguration(true)

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1, configuration)

    // get input data by connecting to the socket
    val dataStream = env.socketTextStream("localhost", port, '\n')

    import org.apache.flink.streaming.api.scala._
    val textResult = dataStream.flatMap( w => w.split("\\s") ).map( w => WordWithCount(w, 1))
      .keyBy("word")
      /**
        * Start counting over every 5 seconds.
        * The benefit: no need to keep aggregating over all data ever seen,
        * only the increment inside the window interval, which keeps the data volume small.
        */
      .timeWindow(Time.seconds(5))
      //.countWindow(3)
      //.countWindow(3,1)
      //.countWindowAll(3)
      .sum("count")

    textResult
      .setParallelism(100)
      .print()

    if (args == null || args.size == 0) {

      println("== execution plan below ==")
      println("Paste it at (works better in Firefox): https://flink.apache.org/visualizer")
      // execution plan
      println(env.getExecutionPlan)
      println("== execution plan JSON above ==\n")
      // StreamGraph
      //println(env.getStreamGraph.getStreamingPlanAsJSON)

      //JsonPlanGenerator.generatePlan(jobGraph)

      env.execute("default job")

    } else {
      env.execute(args(0))
    }

    println("done")
  }

  // Data type for words with count
  case class WordWithCount(word: String, count: Long)

  def getConfiguration(isDebug: Boolean = false): Configuration = {

    val configuration: Configuration = new Configuration()

    if (isDebug) {
      val timeout = "10 s"
      val timeoutHeartbeatPause = "100 s"
      configuration.setString("akka.ask.timeout", timeout)
      configuration.setString("akka.lookup.timeout", timeout)
      configuration.setString("akka.tcp.timeout", timeout)
      configuration.setString("akka.transport.heartbeat.interval", timeout)
      configuration.setString("akka.transport.heartbeat.pause", timeoutHeartbeatPause)
      configuration.setString("akka.watch.heartbeat.pause", timeout)
      configuration.setInteger("heartbeat.interval", 1000)
      configuration.setInteger("heartbeat.timeout", 5000)
    }

    configuration
  }
}

> On March 3, 2019, at 9:05 PM, 刘 文 wrote:
>
> [Question] In Flink parallel processing, how does each Window receive the data of its own partition, i.e. how does a Window determine which partition it belongs to?
>
> ). Environment: Flink 1.7.2, WordCount, local, stream processing
> ). In the source, RecordWriter.emit() assigns each element to a partition by its key, which fixes the partition of every element; the number of partitions is set by DataStream.setParallelism(2)
>
> public void emit(T record) throws IOException, InterruptedException {
>     emit(record, channelSelector.selectChannels(record, numChannels));
> }
>
> Data is then written to the different channels via copyFromSerializerToTargetChannel(int targetChannel), i.e. sent, record by record, to the window that corresponds to each partition
> ). As many windows are opened as there are parallel instances (DataStream.setParallelism(2))
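When the job runs with parallelism greater than 1, print() already prefixes each output line with the index of the subtask that produced it ("1> ", "2> "), which is the easiest way to see which window instance handled which word. To carry the index in the record itself, a rich function can read it from the runtime context; this is only a sketch (TagWithSubtask is a made-up name, and WordWithCount is the case class from the program above):

import org.apache.flink.api.common.functions.RichMapFunction

// Sketch: tag each windowed result with the subtask (partition) that produced it
class TagWithSubtask extends RichMapFunction[WordWithCount, String] {
  override def map(v: WordWithCount): String =
    s"subtask ${getRuntimeContext.getIndexOfThisSubtask}: ${v.word} -> ${v.count}"
}

// usage, in place of the plain print():
// textResult.map(new TagWithSubtask).print()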
[Question] In Flink parallel processing, how does each Window receive the data of its own partition, i.e. how does a Window determine which partition it belongs to?
). Environment: Flink 1.7.2, WordCount, local, stream processing
). In the source, RecordWriter.emit() assigns each element to a partition by its key, which fixes the partition of every element; the number of partitions is set by DataStream.setParallelism(2)

public void emit(T record) throws IOException, InterruptedException {
    emit(record, channelSelector.selectChannels(record, numChannels));
}

Data is then written to the different channels via copyFromSerializerToTargetChannel(int targetChannel), i.e. sent, record by record, to the window that corresponds to each partition
). As many windows are opened as there are parallel instances (DataStream.setParallelism(2))
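For a keyed stream, the channelSelector in the emit() above is a KeyGroupStreamPartitioner. Condensed to its core, its per-record decision looks like this (a sketch of the logic, not a copy of the Flink source; murmurHash is Flink's org.apache.flink.util.MathUtils):

import org.apache.flink.util.MathUtils

// key -> key group -> operator (channel) index, as decided per record by
// KeyGroupStreamPartitioner.selectChannels
def selectChannel(key: AnyRef, maxParallelism: Int, numChannels: Int): Int = {
  val keyGroup = MathUtils.murmurHash(key.hashCode) % maxParallelism
  keyGroup * numChannels / maxParallelism
}

With numChannels = 2, every key group below maxParallelism / 2 goes to window subtask 0 and the rest to subtask 1, which is exactly the split the two windows observe.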