???????? ????????
------------------ ???????? ------------------ ??????: "user-zh" <wangwangdaxian...@gmail.com>; ????????: 2020??8??4??(??????) ????1:13 ??????: "user-zh@flink.apache.org"<user-zh@flink.apache.org>; ????: Re: flink-1.11 ???????? source??????????????In??????,????Out?????? ????source??map??????operator chain??????task????disable????operator chain,??map??????????????task????????map????????????In??Out???? ????????????????????isBackpressured??????????????????operator???????????????????????????????? kcz <573693...@qq.com> ??2020??8??4?????? ????12:41?????? > ???? yeah??ui??????????????????????????souce????????????????map sleep???????????????????????? ??????????????100w???????? > > > > > > ------------------ ???????? ------------------ > ??????: shizk233 <wangwangdaxian...@gmail.com&gt; > ????????: 2020??8??3?? 23:03 > ??????: user-zh@flink.apache.org <user-zh@flink.apache.org&gt; > ????: ??????flink-1.11 ???????? > > > > source??????????????????????????????web ui??task??numOfRecordsIn?? > > kcz <573693...@qq.com&gt; ??2020??8??3?????? ????7:29?????? > > &gt; ????????????????????????????kafka??????100w??????????source?????????????????????????????????????????? > &gt; public static void main(String[] args) throws Exception{ > &gt; > &gt;&nbsp;&nbsp;&nbsp;&nbsp; StreamExecutionEnvironment env = > &gt; StreamExecutionEnvironment.getExecutionEnvironment(); > &gt;&nbsp;&nbsp;&nbsp;&nbsp; env.enableCheckpointing(2000L, > CheckpointingMode.EXACTLY_ONCE); > &gt;&nbsp;&nbsp;&nbsp;&nbsp; env.setStateBackend(new MemoryStateBackend()); > &gt;&nbsp;&nbsp;&nbsp;&nbsp; env.setParallelism(4); > &gt;&nbsp;&nbsp;&nbsp;&nbsp; Properties properties = getLocal(); > &gt;&nbsp;&nbsp;&nbsp;&nbsp; properties.setProperty("group.id","test"); > &gt;&nbsp;&nbsp;&nbsp;&nbsp; FlinkKafkaConsumer<String&amp;gt; consumer = > new > &gt; FlinkKafkaConsumer<&amp;gt;("testOrderTopic", new > SimpleStringSchema(), > &gt; properties); > &gt;&nbsp;&nbsp;&nbsp;&nbsp; DataStream<String&amp;gt; stream = env > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > .addSource(consumer); > &gt;&nbsp;&nbsp;&nbsp;&nbsp; stream.map(new MapFunction<String, > Tuple2<Integer,Integer&amp;gt;&amp;gt;() { > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; @Override > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; public > Tuple2<Integer,Integer&amp;gt; map(String s) throws Exception { > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > Thread.sleep(1000*60*60*60); > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > return new Tuple2(1,1); > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; } > &gt;&nbsp;&nbsp;&nbsp;&nbsp; }).keyBy(0).sum(0); > &gt;&nbsp;&nbsp;&nbsp;&nbsp; stream.print(); > &gt;&nbsp;&nbsp;&nbsp;&nbsp; //stream.map(); > &gt;&nbsp;&nbsp;&nbsp;&nbsp; env.execute(); > &gt; > &gt; }