已经解决了,去掉循环,把每个kafka topic单独处理,再union













在 2021-06-01 08:54:42,"13631283359" <13631283...@163.com> 写道:

大家好,
            我最近用datastream union各个topic的数据后,数据有丢失,比单独消费每个topic的数据少很多
            代码如下:
            /**
* 将多个流合并成一个进行处理 * * @param topics 通过配置文件传入的 Topic 和 消费者GroupID * @param env 
Flink运行环境 * @return 将多个Topic数据流整合成一个流 */ def getUnionConsumer(topics: 
List[String], env: StreamExecutionEnvironment): DataStream[String] = { var 
total: DataStream[String] = null for (str <- topics) { val topicName = 
str.split(":")(0) val groupId = str.split(":")(1) val source_data = 
getSourceData(topicName, groupId, env) if (total != null) { total = 
total.union(source_data) } else { total = source_data } } total }





 

回复