已经解决了,去掉循环,把每个kafka topic单独处理,再union
在 2021-06-01 08:54:42,"13631283359" <13631283...@163.com> 写道:
大家好,
我最近用datastream union各个topic的数据后,数据有丢失,比单独消费每个topic的数据少很多
代码如下:
/**
* 将多个流合并成一个进行处理 * * @param topics 通过配置文件传入的 Topic 和 消费者GroupID * @param env
Flink运行环境 * @return 将多个Topic数据流整合成一个流 */ def getUnionConsumer(topics:
List[String], env: StreamExecutionEnvironment): DataStream[String] = { var
total: DataStream[String] = null for (str <- topics) { val topicName =
str.split(":")(0) val groupId = str.split(":")(1) val source_data =
getSourceData(topicName, groupId, env) if (total != null) { total =
total.union(source_data) } else { total = source_data } } total }