Re: Re: Kafka 数据源无法实现基于事件时间的窗口聚合
实际使用你肯定不会是console producer吧。或者你换java代码写kafka,方便控制些。 wei_yuze 于2023年2月8日周三 13:30写道: > > 非常感谢各位的回答! > > > > Weihua和飞雨正确定位出了问题。问题出在Flink 并发数大于Kafka分区数,导致部分Flink task slot > 接收不到数据,进而导致watermark(取所有task slot的最小值)无法推进。 > > > 我尝试了Weihua提供的两个解决方案后都可以推进watermark求得窗口聚合结果。 > > > 后来我想,理想的解决方式应该是使Flink的并发数接近于或等于Kafka的分区数。我的Kafka分区数为3,于是Flink setParallelism > 为3。后来发现又无法推进watermark。检查Kafka后发现,kafka Console Producer把所有的数据都推送到了第0号分区。 > > > > 请问哪位能指点一下,让Kafka topic的每个分区都能收到数据? > > > > > > Best, > > Lucas > > > > Original Email > > > > Sender:"Weihua Hu"< huweihua....@gmail.com ; > > Sent Time:2023/2/7 18:48 > > To:"user-zh"< user-zh@flink.apache.org ; > > Subject:Re: Kafka 数据源无法实现基于事件时间的窗口聚合 > > > Hi, > > 问题应该是 kafka source 配置了多并发运行,但数据量比较少(或者 topic 的 partition 数量小于 task > 的并发数量),不是所有的 source task 都消费到了数据并产生 watermark,导致下游聚合算子无法对齐 watermark 触发计算。 > 可以尝试通过以下办法解决: > 1. 将 source 并发控制为 1 > 2. 为 watermark 策略开始 idleness 处理,参考 [#1] > > fromElement 数据源会强制指定并发为 1 > > [#1] > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources > > > Best, > Weihua > > > On Tue, Feb 7, 2023 at 1:31 PM wei_yuze wrote: > > 您好! > > > > > > 我在进行基于事件时间的窗口聚合操作时,使用fromElement数据源可以实现,但替换为Kafka数据源就不行了,但程序并不报错。以下贴出代码。代码中给了两个数据源,分别命名为:streamSource > 和 kafkaSource > 。当使用streamSource生成watermarkedStream的时候,可以完成聚合计算并输出结果。但使用kafkaSource却不行。 > > > > > public class WindowReduceTest2 { public static void > main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > > > // 使用fromElement数据源 > DataStreamSource env.fromElements( > new > Event2("Alice", "./home", "2023-02-04 17:10:11"), > new Event2("Bob", > "./cart", "2023-02-04 17:10:12"), > new > Event2("Alice", "./home", "2023-02-04 17:10:13"), > new > Event2("Alice", "./home", "2023-02-04 17:10:15"), > new > Event2("Cary", > "./home", "2023-02-04 17:10:16"), > new > Event2("Cary", > "./home", "2023-02-04 17:10:16") > ); > > > // 使用Kafka数据源 > JsonDeserializationSchema jsonFormat = new JsonDeserializationSchema<(Event2.class); > KafkaSource > KafkaSource. > .setBootstrapServers(Config.KAFKA_BROKERS) > > .setTopics(Config.KAFKA_TOPIC) > > .setGroupId("my-group") > > .setStartingOffsets(OffsetsInitializer.earliest()) > > .setValueOnlyDeserializer(jsonFormat) > .build(); > DataStreamSource env.fromSource(source, > WatermarkStrategy.noWatermarks(), "Kafka Source"); > kafkaSource.print(); > > > // 生成watermark,从数据中提取时间作为事件时间 > SingleOutputStreamOperator watermarkedStream = > > kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy. > .withTimestampAssigner(new SerializableTimestampAssigner > > @Override > > public long extractTimestamp(Event2 element, long recordTimestamp) { > >SimpleDateFormat simpleDateFormat = new > SimpleDateFormat("-MM-dd HH:mm:ss"); > >Date date = null; > >try { > > date = > simpleDateFormat.parse(element.getTime()); > >} catch (ParseException e) { > > throw new RuntimeException(e); > >} > >long time = date.getTime(); > >System.out.println(time); > >return time; >} > })); > > > // 窗口聚合 > watermarkedStream.map(new MapFunction Tuple2 > > @Override > > public Tuple2 > >// 将数据转换成二元组,方便计算 > >return Tuple2.of(value.getUser(), 1L); >} > }) > .keyBy(r - > r.f0) > // 设置滚动事件时间窗口 > > .window(TumblingEventTimeWindows.of(Time.seconds(5))) > .reduce(new > ReduceFunction > @Override > > public Tuple2 Tuple2 > >// 定义累加规则,窗口闭合时,向下游发送累加结果 > >return Tuple2.of(value1.f0, value1.f1 + value2.f1); >} > }) > > .print("Aggregated > stream"); > > > env.execute(); >} > } > > > > > > > 值得注意的是,若将代码中的 TumblingEventTimeWindows 替换为 TumblingProcessingTimeWindows > ,即使使用 Kafka 数据源也是可以完成聚合计算并输出结果的。 > > > > 感谢您花时间查看这个问题! > Lucas
回复: Kafka 数据源无法实现基于事件时间的窗口聚合
Hi ,应该是Kafka 可能存在空闲分区,如果只是partition 数量少于并发数的话,并不会影响水位推进,只是会浪费资源。默认程序不指定并行度,使用电脑cpu 核数。 如果是table api 的话,可以添加如下参数解决,table.exec.source.idle-timeout | | 飞雨 | | bigdata drewfrank...@126.com | 回复的原邮件 | 发件人 | Weihua Hu | | 发送日期 | 2023年02月7日 18:48 | | 收件人 | | | 主题 | Re: Kafka 数据源无法实现基于事件时间的窗口聚合 | Hi, 问题应该是 kafka source 配置了多并发运行,但数据量比较少(或者 topic 的 partition 数量小于 task 的并发数量),不是所有的 source task 都消费到了数据并产生 watermark,导致下游聚合算子无法对齐 watermark 触发计算。 可以尝试通过以下办法解决: 1. 将 source 并发控制为 1 2. 为 watermark 策略开始 idleness 处理,参考 [#1] fromElement 数据源会强制指定并发为 1 [#1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources Best, Weihua On Tue, Feb 7, 2023 at 1:31 PM wei_yuze wrote: 您好! 我在进行基于事件时间的窗口聚合操作时,使用fromElement数据源可以实现,但替换为Kafka数据源就不行了,但程序并不报错。以下贴出代码。代码中给了两个数据源,分别命名为:streamSource 和 kafkaSource 。当使用streamSource生成watermarkedStream的时候,可以完成聚合计算并输出结果。但使用kafkaSource却不行。 public class WindowReduceTest2 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 使用fromElement数据源 DataStreamSource
Re: Kafka 数据源无法实现基于事件时间的窗口聚合
Hi, 问题应该是 kafka source 配置了多并发运行,但数据量比较少(或者 topic 的 partition 数量小于 task 的并发数量),不是所有的 source task 都消费到了数据并产生 watermark,导致下游聚合算子无法对齐 watermark 触发计算。 可以尝试通过以下办法解决: 1. 将 source 并发控制为 1 2. 为 watermark 策略开始 idleness 处理,参考 [#1] fromElement 数据源会强制指定并发为 1 [#1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources Best, Weihua On Tue, Feb 7, 2023 at 1:31 PM wei_yuze wrote: > 您好! > > > > > 我在进行基于事件时间的窗口聚合操作时,使用fromElement数据源可以实现,但替换为Kafka数据源就不行了,但程序并不报错。以下贴出代码。代码中给了两个数据源,分别命名为:streamSource > 和 kafkaSource > 。当使用streamSource生成watermarkedStream的时候,可以完成聚合计算并输出结果。但使用kafkaSource却不行。 > > > > > public class WindowReduceTest2 { public static void > main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > > > // 使用fromElement数据源 > DataStreamSource env.fromElements( > new > Event2("Alice", "./home", "2023-02-04 17:10:11"), > new Event2("Bob", > "./cart", "2023-02-04 17:10:12"), > new > Event2("Alice", "./home", "2023-02-04 17:10:13"), > new > Event2("Alice", "./home", "2023-02-04 17:10:15"), > new Event2("Cary", > "./home", "2023-02-04 17:10:16"), > new Event2("Cary", > "./home", "2023-02-04 17:10:16") > ); > > > // 使用Kafka数据源 > JsonDeserializationSchema jsonFormat = new JsonDeserializationSchema<(Event2.class); > KafkaSource KafkaSource. > .setBootstrapServers(Config.KAFKA_BROKERS) > > .setTopics(Config.KAFKA_TOPIC) > > .setGroupId("my-group") > > .setStartingOffsets(OffsetsInitializer.earliest()) > > .setValueOnlyDeserializer(jsonFormat) > .build(); > DataStreamSource env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source"); > kafkaSource.print(); > > > // 生成watermark,从数据中提取时间作为事件时间 > SingleOutputStreamOperator watermarkedStream = > kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy. > .withTimestampAssigner(new SerializableTimestampAssigner > @Override > > public long extractTimestamp(Event2 element, long recordTimestamp) { > > SimpleDateFormat simpleDateFormat = new > SimpleDateFormat("-MM-dd HH:mm:ss"); > > Date date = null; > > try { > > date = > simpleDateFormat.parse(element.getTime()); > > } catch (ParseException e) { > > throw new RuntimeException(e); > > } > > long time = date.getTime(); > > System.out.println(time); > > return time; > } > })); > > > // 窗口聚合 > watermarkedStream.map(new MapFunction Tuple2 > @Override > > public Tuple2 > // 将数据转换成二元组,方便计算 > > return Tuple2.of(value.getUser(), 1L); > } > }) > .keyBy(r - > r.f0) > // 设置滚动事件时间窗口 > > .window(TumblingEventTimeWindows.of(Time.seconds(5))) > .reduce(new > ReduceFunction > @Override > > public Tuple2 Tuple2 > // 定义累加规则,窗口闭合时,向下游发送累加结果 > > return Tuple2.of(value1.f0, value1.f1 + value2.f1); > } > }) > .print("Aggregated > stream"); > > > env.execute(); > } > } > > > > > > > 值得注意的是,若将代码中的 TumblingEventTimeWindows 替换为 TumblingProcessingTimeWindows > ,即使使用 Kafka 数据源也是可以完成聚合计算并输出结果的。 > > > > 感谢您花时间查看这个问题! > Lucas