Hi, >>> 那这个时间戳是kafka接收到数据自动生成的时间吗?还是说消息发送给kafka的时候需要怎么设置把业务时间附上去? 这个时间戳来自Kafka record里的时间戳,可以参考代码 <https://github.com/apache/flink-connector-kafka/blob/00c9c8c74121136a0c1710ac77f307dc53adae99/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaRecordEmitter.java#L52> 。它的生成逻辑由Kafka配置决定,如果用户没有指定的话,默认是消息创建时间,可以参考Kafka的文档 <https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html>
>>> 感觉应该是发送数据到kafka的时候需要把业务时间给附上去,那在sink端怎么把时间附上去呢? flink的Kafka connector里KafkaSink的实现是默认用input record里的时间戳,可以参考这里的实现 <https://github.com/apache/flink-connector-kafka/blob/00c9c8c74121136a0c1710ac77f307dc53adae99/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L190> 。 Best, Biao Geng ha.fen...@aisino.com <ha.fen...@aisino.com> 于2024年5月8日周三 10:59写道: > > DataStream<String> stream = env.fromSource( > kafkaSource, > WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), > "mySource"); > 这样做使用的是Kafka记录自身的时间戳来定义watermark。 > > 那这个时间戳是kafka接收到数据自动生成的时间吗?还是说消息发送给kafka的时候需要怎么设置把业务时间附上去? > 感觉应该是发送数据到kafka的时候需要把业务时间给附上去,那在sink端怎么把时间附上去呢? >