大家好:
我最近做的项目是用flink消费kafka数据写到doris,但过程中发生doris写不进去,kafka的数据已经被消费,造成数据丢失。
请问怎么去做容错处理,希望知道的能回答下,谢谢了
1.不知道这边有没有flink数据链路监控
2.flink反压可能有以下几种原因:
数据倾斜
TaskManager配置内存太小,导致full gc
checkpoint太慢
状态太大
在 2021-06-08 10:36:49,"LakeShen" 写道:
>你可以先结合你的任务逻辑,以及 Flink Web UI 反压监控,看看到底是什么地方引起反压。
>一般 source 反压,是下游的算子一直反压到 source。可以看看那个算子引起
>
>Best,
>LakeShen
>
>
>yidan zhao 于2021年6月8日周二 上午10:28
已经解决了,去掉循环,把每个kafka topic单独处理,再union
在 2021-06-01 08:54:42,"13631283359" <13631283...@163.com> 写道:
大家好,
我最近用datastream union各个topic的数据后,数据有丢失,比单独消费每个topic的数据少很多
代码如下:
/**
* 将多个流合并成一个进行处理 * * @param topics 通过配置文件传入的 Topic 和 消费者
大家好,
我最近用datastream union各个topic的数据后,数据有丢失,比单独消费每个topic的数据少很多
代码如下:
/**
* 将多个流合并成一个进行处理 * * @param topics 通过配置文件传入的 Topic 和 消费者GroupID * @param env
Flink运行环境 * @return 将多个Topic数据流整合成一个流 */ def getUnionConsumer(topics:
List[String], env: StreamExecutionEnvironme