Hi,

>>> 那这个时间戳是kafka接收到数据自动生成的时间吗?还是说消息发送给kafka的时候需要怎么设置把业务时间附上去?
这个时间戳来自Kafka record里的时间戳,可以参考代码
<https://github.com/apache/flink-connector-kafka/blob/00c9c8c74121136a0c1710ac77f307dc53adae99/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaRecordEmitter.java#L52>
。它的生成逻辑由Kafka配置决定,如果用户没有指定的话,默认是消息创建时间,可以参考Kafka的文档
<https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html>

>>> 感觉应该是发送数据到kafka的时候需要把业务时间给附上去,那在sink端怎么把时间附上去呢?
flink的Kafka connector里KafkaSink的实现是默认用input record里的时间戳,可以参考这里的实现
<https://github.com/apache/flink-connector-kafka/blob/00c9c8c74121136a0c1710ac77f307dc53adae99/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L190>
。


Best,
Biao Geng


ha.fen...@aisino.com <ha.fen...@aisino.com> 于2024年5月8日周三 10:59写道:

>
> DataStream<String> stream = env.fromSource(
>     kafkaSource,
> WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)),
> "mySource");
> 这样做使用的是Kafka记录自身的时间戳来定义watermark。
>
> 那这个时间戳是kafka接收到数据自动生成的时间吗?还是说消息发送给kafka的时候需要怎么设置把业务时间附上去?
> 感觉应该是发送数据到kafka的时候需要把业务时间给附上去,那在sink端怎么把时间附上去呢?
>

回复