代码: val env = StreamExecutionEnvironment.getExecutionEnvironment env.setRuntimeMode(RuntimeExecutionMode.BATCH) // 在DataStream API上以批处理方式执行
// 本地测试文件 val inputStream = env.readTextFile(getClass.getResource("/hello.txt").getPath) // 分词统计,问题:批处理模式的时候,sum 为 1 的单词不会被打印 val resultStream = inputStream .flatMap(_.split(",")) .filter(_.nonEmpty) .map((_, 1)) .keyBy(_._1) .sum(1) resultStream.print() env.execute("word count") 测试文件的数据内容: hello,flink hello,flink hello,hive hello,hive hello,hbase hello,hbase hello,scala hello,kafka hello,kafka 测试结果:hello/flink/hive/hbase/kafka的和大于1,会打印出来;但是 scala的个数为1,不会被打印出来 -- Sent from: http://apache-flink.147419.n8.nabble.com/