Re: Re: Kafka 数据源无法实现基于事件时间的窗口聚合

2023-02-08 文章 yidan zhao
实际使用你肯定不会是console producer吧。或者你换java代码写kafka,方便控制些。

wei_yuze  于2023年2月8日周三 13:30写道:
>
> 非常感谢各位的回答!
>
>
>
> Weihua和飞雨正确定位出了问题。问题出在Flink 并发数大于Kafka分区数,导致部分Flink task slot 
> 接收不到数据,进而导致watermark(取所有task slot的最小值)无法推进。
>
>
> 我尝试了Weihua提供的两个解决方案后都可以推进watermark求得窗口聚合结果。
>
>
> 后来我想,理想的解决方式应该是使Flink的并发数接近于或等于Kafka的分区数。我的Kafka分区数为3,于是Flink setParallelism 
> 为3。后来发现又无法推进watermark。检查Kafka后发现,kafka Console Producer把所有的数据都推送到了第0号分区。
>
>
>
> 请问哪位能指点一下,让Kafka topic的每个分区都能收到数据?
>
>
>
>
>
> Best,
>
> Lucas
>
>
>
> Original Email
>
>
>
> Sender:"Weihua Hu"< huweihua@gmail.com ;
>
> Sent Time:2023/2/7 18:48
>
> To:"user-zh"< user-zh@flink.apache.org ;
>
> Subject:Re: Kafka 数据源无法实现基于事件时间的窗口聚合
>
>
> Hi,
>
> 问题应该是 kafka source 配置了多并发运行,但数据量比较少(或者 topic 的 partition 数量小于 task
> 的并发数量),不是所有的 source task 都消费到了数据并产生 watermark,导致下游聚合算子无法对齐 watermark 触发计算。
> 可以尝试通过以下办法解决:
> 1. 将 source 并发控制为 1
> 2. 为 watermark 策略开始 idleness 处理,参考 [#1]
>
> fromElement 数据源会强制指定并发为 1
>
> [#1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
>
>
> Best,
> Weihua
>
>
> On Tue, Feb 7, 2023 at 1:31 PM wei_yuze  wrote:
>
>  您好!
> 
> 
> 
> 
>  
> 我在进行基于事件时间的窗口聚合操作时,使用fromElement数据源可以实现,但替换为Kafka数据源就不行了,但程序并不报错。以下贴出代码。代码中给了两个数据源,分别命名为:streamSource
>  和 kafkaSource
>  。当使用streamSource生成watermarkedStream的时候,可以完成聚合计算并输出结果。但使用kafkaSource却不行。
> 
> 
> 
> 
>  public class WindowReduceTest2 {  public static void
>  main(String[] args) throws Exception {
>  StreamExecutionEnvironment env =
>  StreamExecutionEnvironment.getExecutionEnvironment();
> 
> 
>  // 使用fromElement数据源
>  DataStreamSource env.fromElements(
>  new
>  Event2("Alice", "./home", "2023-02-04 17:10:11"),
>  new Event2("Bob",
>  "./cart", "2023-02-04 17:10:12"),
>  new
>  Event2("Alice", "./home", "2023-02-04 17:10:13"),
>  new
>  Event2("Alice", "./home", "2023-02-04 17:10:15"),
>  new 
> Event2("Cary",
>  "./home", "2023-02-04 17:10:16"),
>  new 
> Event2("Cary",
>  "./home", "2023-02-04 17:10:16")
>  );
> 
> 
>  // 使用Kafka数据源
>  JsonDeserializationSchema  jsonFormat = new JsonDeserializationSchema<(Event2.class);
>  KafkaSource 
> KafkaSource. 
>  .setBootstrapServers(Config.KAFKA_BROKERS)
> 
>  .setTopics(Config.KAFKA_TOPIC)
> 
>  .setGroupId("my-group")
> 
>  .setStartingOffsets(OffsetsInitializer.earliest())
> 
>  .setValueOnlyDeserializer(jsonFormat)
>  .build();
>  DataStreamSource env.fromSource(source, 
> WatermarkStrategy.noWatermarks(), "Kafka Source");
>  kafkaSource.print();
> 
> 
>  // 生成watermark,从数据中提取时间作为事件时间
>  SingleOutputStreamOperator  watermarkedStream =
>  
> kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy. 
>  .withTimestampAssigner(new SerializableTimestampAssigner  
> 
>  @Override
>   
>  public long extractTimestamp(Event2 element, long recordTimestamp) {
>   
>SimpleDateFormat simpleDateFormat = new
>  SimpleDateFormat("-MM-dd HH:mm:ss");
>   
>Date date = null;
>   
>try {
>   
>  date =
>  simpleDateFormat.parse(element.getTime());
>   
>} catch (ParseException e) {
>   
>  throw new RuntimeException(e);
>   
>}
>   
>long time = date.getTime();
>   
>System.out.println(time);
>   
>return time;
>}
>  }));
> 
> 
>  // 窗口聚合
>  watermarkedStream.map(new MapFunction  Tuple2 
>  
>  @Override
>   
>  public Tuple2
>   
>// 将数据转换成二元组,方便计算
>   
>return Tuple2.of(value.getUser(), 1L);
>}
>  })
>  .keyBy(r -
>  r.f0)
>  // 设置滚动事件时间窗口
> 
>  .window(TumblingEventTimeWindows.of(Time.seconds(5)))
>  .reduce(new
>  ReduceFunction
>  @Override
>   
>  public Tuple2 Tuple2   
>
>// 定义累加规则,窗口闭合时,向下游发送累加结果
>   
>return Tuple2.of(value1.f0, value1.f1 + value2.f1);
>}
>  })
>  
> .print("Aggregated
>  stream");
> 
> 
>  env.execute();
>}
>  }
> 
> 
> 
> 
> 
> 
>  值得注意的是,若将代码中的 TumblingEventTimeWindows 替换为 TumblingProcessingTimeWindows
>  ,即使使用 Kafka 数据源也是可以完成聚合计算并输出结果的。
> 
> 
> 
>  感谢您花时间查看这个问题!
>  Lucas


Re: Kafka 数据源无法实现基于事件时间的窗口聚合

2023-02-07 文章 Weihua Hu
Hi,

问题应该是 kafka source 配置了多并发运行,但数据量比较少(或者 topic 的 partition 数量小于 task
的并发数量),不是所有的 source task 都消费到了数据并产生 watermark,导致下游聚合算子无法对齐 watermark 触发计算。
可以尝试通过以下办法解决:
1. 将 source 并发控制为 1
2. 为 watermark 策略开始 idleness 处理,参考 [#1]

fromElement 数据源会强制指定并发为 1

[#1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources


Best,
Weihua


On Tue, Feb 7, 2023 at 1:31 PM wei_yuze  wrote:

> 您好!
>
>
>
>
> 我在进行基于事件时间的窗口聚合操作时,使用fromElement数据源可以实现,但替换为Kafka数据源就不行了,但程序并不报错。以下贴出代码。代码中给了两个数据源,分别命名为:streamSource
> 和 kafkaSource
> 。当使用streamSource生成watermarkedStream的时候,可以完成聚合计算并输出结果。但使用kafkaSource却不行。
>
>
>
>
> public class WindowReduceTest2 {  public static void
> main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
>
> // 使用fromElement数据源
> DataStreamSource env.fromElements(
> new
> Event2("Alice", "./home", "2023-02-04 17:10:11"),
> new Event2("Bob",
> "./cart", "2023-02-04 17:10:12"),
> new
> Event2("Alice", "./home", "2023-02-04 17:10:13"),
> new
> Event2("Alice", "./home", "2023-02-04 17:10:15"),
> new Event2("Cary",
> "./home", "2023-02-04 17:10:16"),
> new Event2("Cary",
> "./home", "2023-02-04 17:10:16")
> );
>
>
> // 使用Kafka数据源
> JsonDeserializationSchema jsonFormat = new JsonDeserializationSchema<(Event2.class);
> KafkaSource KafkaSource.
> .setBootstrapServers(Config.KAFKA_BROKERS)
>
> .setTopics(Config.KAFKA_TOPIC)
>
> .setGroupId("my-group")
>
> .setStartingOffsets(OffsetsInitializer.earliest())
>
> .setValueOnlyDeserializer(jsonFormat)
> .build();
> DataStreamSource env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
> kafkaSource.print();
>
>
> // 生成watermark,从数据中提取时间作为事件时间
> SingleOutputStreamOperator watermarkedStream =
> kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.
> .withTimestampAssigner(new SerializableTimestampAssigner  
> @Override
>  
> public long extractTimestamp(Event2 element, long recordTimestamp) {
>  
>   SimpleDateFormat simpleDateFormat = new
> SimpleDateFormat("-MM-dd HH:mm:ss");
>  
>   Date date = null;
>  
>   try {
>  
> date =
> simpleDateFormat.parse(element.getTime());
>  
>   } catch (ParseException e) {
>  
> throw new RuntimeException(e);
>  
>   }
>  
>   long time = date.getTime();
>  
>   System.out.println(time);
>  
>   return time;
>   }
> }));
>
>
> // 窗口聚合
> watermarkedStream.map(new MapFunction Tuple2  
> @Override
>  
> public Tuple2  
>   // 将数据转换成二元组,方便计算
>  
>   return Tuple2.of(value.getUser(), 1L);
>   }
> })
> .keyBy(r -
> r.f0)
> // 设置滚动事件时间窗口
>
> .window(TumblingEventTimeWindows.of(Time.seconds(5)))
> .reduce(new
> ReduceFunction  
> @Override
>  
> public Tuple2 Tuple2  
>   // 定义累加规则,窗口闭合时,向下游发送累加结果
>  
>   return Tuple2.of(value1.f0, value1.f1 + value2.f1);
>   }
> })
> .print("Aggregated
> stream");
>
>
> env.execute();
>   }
> }
>
>
>
>
>
>
> 值得注意的是,若将代码中的 TumblingEventTimeWindows 替换为 TumblingProcessingTimeWindows
> ,即使使用 Kafka 数据源也是可以完成聚合计算并输出结果的。
>
>
>
> 感谢您花时间查看这个问题!
> Lucas