Hi ,应该是Kafka 可能存在空闲分区,如果只是partition 数量少于并发数的话,并不会影响水位推进,只是会浪费资源。默认程序不指定并行度,使用电脑cpu 核数。
如果是table api 的话,可以添加如下参数解决,table.exec.source.idle-timeout | | 飞雨 | | bigdata drewfrank...@126.com | ---- 回复的原邮件 ---- | 发件人 | Weihua Hu<huweihua....@gmail.com> | | 发送日期 | 2023年02月7日 18:48 | | 收件人 | <user-zh@flink.apache.org> | | 主题 | Re: Kafka 数据源无法实现基于事件时间的窗口聚合 | Hi, 问题应该是 kafka source 配置了多并发运行,但数据量比较少(或者 topic 的 partition 数量小于 task 的并发数量),不是所有的 source task 都消费到了数据并产生 watermark,导致下游聚合算子无法对齐 watermark 触发计算。 可以尝试通过以下办法解决: 1. 将 source 并发控制为 1 2. 为 watermark 策略开始 idleness 处理,参考 [#1] fromElement 数据源会强制指定并发为 1 [#1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources Best, Weihua On Tue, Feb 7, 2023 at 1:31 PM wei_yuze <wei_y...@qq.com.invalid> wrote: 您好! 我在进行基于事件时间的窗口聚合操作时,使用fromElement数据源可以实现,但替换为Kafka数据源就不行了,但程序并不报错。以下贴出代码。代码中给了两个数据源,分别命名为:streamSource 和 kafkaSource 。当使用streamSource生成watermarkedStream的时候,可以完成聚合计算并输出结果。但使用kafkaSource却不行。 public class WindowReduceTest2 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 使用fromElement数据源 DataStreamSource<Event2> streamSource = env.fromElements( new Event2("Alice", "./home", "2023-02-04 17:10:11"), new Event2("Bob", "./cart", "2023-02-04 17:10:12"), new Event2("Alice", "./home", "2023-02-04 17:10:13"), new Event2("Alice", "./home", "2023-02-04 17:10:15"), new Event2("Cary", "./home", "2023-02-04 17:10:16"), new Event2("Cary", "./home", "2023-02-04 17:10:16") ); // 使用Kafka数据源 JsonDeserializationSchema<Event2> jsonFormat = new JsonDeserializationSchema<>(Event2.class); KafkaSource<Event2> source = KafkaSource.<Event2>builder() .setBootstrapServers(Config.KAFKA_BROKERS) .setTopics(Config.KAFKA_TOPIC) .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(jsonFormat) .build(); DataStreamSource<Event2> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source"); kafkaSource.print(); // 生成watermark,从数据中提取时间作为事件时间 SingleOutputStreamOperator<Event2> watermarkedStream = kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event2>forBoundedOutOfOrderness(Duration.ZERO) .withTimestampAssigner(new SerializableTimestampAssigner<Event2>() { @Override public long extractTimestamp(Event2 element, long recordTimestamp) { SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date date = null; try { date = simpleDateFormat.parse(element.getTime()); } catch (ParseException e) { throw new RuntimeException(e); } long time = date.getTime(); System.out.println(time); return time; } })); // 窗口聚合 watermarkedStream.map(new MapFunction<Event2, Tuple2<String, Long>>() { @Override public Tuple2<String, Long> map(Event2 value) throws Exception { // 将数据转换成二元组,方便计算 return Tuple2.of(value.getUser(), 1L); } }) .keyBy(r -> r.f0) // 设置滚动事件时间窗口 .window(TumblingEventTimeWindows.of(Time.seconds(5))) .reduce(new ReduceFunction<Tuple2<String, Long>>() { @Override public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception { // 定义累加规则,窗口闭合时,向下游发送累加结果 return Tuple2.of(value1.f0, value1.f1 + value2.f1); } }) .print("Aggregated stream"); env.execute(); } } 值得注意的是,若将代码中的 TumblingEventTimeWindows 替换为 TumblingProcessingTimeWindows ,即使使用 Kafka 数据源也是可以完成聚合计算并输出结果的。 感谢您花时间查看这个问题! Lucas