Hi, Lucas.
What do you mean by "unable to do event time window aggregation with
watermarkedStream"?
What exception does it throw?

Best regards, 
Yuxia 


From: "wei_yuze" <wei_y...@qq.com>
To: "User" <user@flink.apache.org>
Sent: Tuesday, February 7, 2023, 1:43:59 PM
Subject: Unable to do event time window aggregation with Kafka source



Hello!

I was unable to do event-time window aggregation with a Kafka source, but had no
problem with a "fromElements" source. The code is attached below. It has two
data sources, named "streamSource" and "kafkaSource". The program works well
with "streamSource", but not with "watermarkedStream" (the Kafka stream after
timestamps and watermarks have been assigned).


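import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.json.JsonDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Event2 (with getUser()/getTime() accessors) and Config (Kafka settings)
// are the poster's own classes and are not shown here.
public class WindowReduceTest2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Use the fromElements source
        DataStreamSource<Event2> streamSource = env.fromElements(
                new Event2("Alice", "./home", "2023-02-04 17:10:11"),
                new Event2("Bob", "./cart", "2023-02-04 17:10:12"),
                new Event2("Alice", "./home", "2023-02-04 17:10:13"),
                new Event2("Alice", "./home", "2023-02-04 17:10:15"),
                new Event2("Cary", "./home", "2023-02-04 17:10:16"),
                new Event2("Cary", "./home", "2023-02-04 17:10:16")
        );

        // Use the Kafka source
        JsonDeserializationSchema<Event2> jsonFormat =
                new JsonDeserializationSchema<>(Event2.class);
        KafkaSource<Event2> source = KafkaSource.<Event2>builder()
                .setBootstrapServers(Config.KAFKA_BROKERS)
                .setTopics(Config.KAFKA_TOPIC)
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(jsonFormat)
                .build();
        DataStreamSource<Event2> kafkaSource =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        kafkaSource.print();

        // Generate watermarks, taking each record's time field as its event time
        SingleOutputStreamOperator<Event2> watermarkedStream = kafkaSource.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event2>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event2>() {
                            @Override
                            public long extractTimestamp(Event2 element, long recordTimestamp) {
                                SimpleDateFormat simpleDateFormat =
                                        new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                Date date = null;
                                try {
                                    date = simpleDateFormat.parse(element.getTime());
                                } catch (ParseException e) {
                                    throw new RuntimeException(e);
                                }
                                long time = date.getTime();
                                System.out.println(time);
                                return time;
                            }
                        }));

        // Window aggregation
        watermarkedStream
                .map(new MapFunction<Event2, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(Event2 value) throws Exception {
                        // Convert each record to a (user, 1) tuple to simplify counting
                        return Tuple2.of(value.getUser(), 1L);
                    }
                })
                .keyBy(r -> r.f0)
                // Tumbling 5-second event-time windows
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1,
                                                       Tuple2<String, Long> value2) throws Exception {
                        // Accumulate the counts; the result is sent downstream when the window closes
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                })
                .print("Aggregated stream");

        env.execute();
    }
}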
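
The timestamp assigner above creates a new SimpleDateFormat for every record, and that parser uses the JVM's default time zone. One possible alternative uses java.time instead. It is not part of the original code. Its DateTimeFormatter is immutable and thread-safe, and the zone is passed in explicitly. The class name EventTimeParsing and the zone parameter are hypothetical. Which zone the producer's timestamps are written in is an assumption that needs to be checked against the data.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class EventTimeParsing {
    // Immutable and thread-safe, so one instance can be shared by all records.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Converts a "yyyy-MM-dd HH:mm:ss" string to epoch milliseconds.
    // Assumption: the caller knows which zone the local time was written in.
    static long toEpochMillis(String time, ZoneId zone) {
        return LocalDateTime.parse(time, FORMAT)
                .atZone(zone)
                .toInstant()
                .toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("2023-02-04 17:10:11", ZoneId.of("UTC"))); // 1675530611000
    }
}
```

Inside extractTimestamp this would become `return EventTimeParsing.toEpochMillis(element.getTime(), ZoneId.of("Asia/Shanghai"));`, where Asia/Shanghai is only an assumed zone.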



Notably, if TumblingEventTimeWindows is replaced with
TumblingProcessingTimeWindows, the program works well even with
"watermarkedStream".

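The difference between the two window types comes down to watermarks. A processing-time window fires on the wall clock. An event-time window fires only once the watermark passes its end. The class below is a simplified, single-threaded model of that rule, written for illustration and not taken from Flink's source. The names EventTimeWindowModel, windowStart, watermark, fires and firedWindows are hypothetical. The model assumes Flink's behaviour that a bounded-out-of-orderness strategy emits a watermark of maxTimestamp - outOfOrderness - 1. It also assumes that a window [start, end) fires once the watermark reaches end - 1. The timestamps are the sample events' offsets from 17:10:00, in milliseconds.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class EventTimeWindowModel {
    // Matches Time.seconds(5) and Duration.ZERO in the code above.
    static final long WINDOW_SIZE_MS = 5_000L;
    static final long OUT_OF_ORDERNESS_MS = 0L;

    // Start of the tumbling window (offset 0) that contains `timestamp`.
    static long windowStart(long timestamp) {
        return timestamp - Math.floorMod(timestamp, WINDOW_SIZE_MS);
    }

    // Watermark a bounded-out-of-orderness strategy emits after seeing maxTimestamp.
    static long watermark(long maxTimestamp) {
        return maxTimestamp - OUT_OF_ORDERNESS_MS - 1;
    }

    // A window [start, start + size) fires once the watermark reaches its last millisecond.
    static boolean fires(long windowStart, long currentWatermark) {
        return currentWatermark >= windowStart + WINDOW_SIZE_MS - 1;
    }

    // Starts of the windows that have fired after all `timestamps` (in arrival order)
    // were processed. A finished source is modelled by a final Long.MAX_VALUE watermark.
    static List<Long> firedWindows(long[] timestamps, boolean sourceFinished) {
        List<Long> fired = new ArrayList<>();
        if (timestamps.length == 0) {
            return fired;
        }
        long maxTimestamp = Long.MIN_VALUE;
        Set<Long> windowStarts = new LinkedHashSet<>();
        for (long ts : timestamps) {
            maxTimestamp = Math.max(maxTimestamp, ts);
            windowStarts.add(windowStart(ts));
        }
        long currentWatermark = sourceFinished ? Long.MAX_VALUE : watermark(maxTimestamp);
        for (long start : windowStarts) {
            if (fires(start, currentWatermark)) {
                fired.add(start);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // The six sample events, as milliseconds after 17:10:00.
        long[] ts = {11_000L, 12_000L, 13_000L, 15_000L, 16_000L, 16_000L};
        System.out.println(firedWindows(ts, false)); // [10000]
        System.out.println(firedWindows(ts, true));  // [10000, 15000]
    }
}
```

With the sample data, the model fires only the [17:10:10, 17:10:15) window while the source is still running, and it fires both windows once the source has finished. A bounded source such as fromElements finishes and emits a final Long.MAX_VALUE watermark. The Kafka source is unbounded, so its last window stays open until a later event advances the watermark. The model also ignores parallelism. A downstream operator uses the minimum watermark across its inputs, so an idle Kafka partition or source subtask can stop the watermark from advancing at all. WatermarkStrategy.withIdleness exists for that case.
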
Thanks for your time! 
Lucas 
