Hi,

Is there a way to retrieve, within Flink, the timestamps that Kafka
associates with each key-value pair? I would like to use these as values
within my application flow. Assigning timestamps before sending to Kafka or
after receiving from Kafka is not acceptable for the use case, because of the
latency involved in each of those steps.

It seems that Flink supports Kafka event time (link
<https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010>),
but after a brief trace it appears that FlinkKafkaConsumer010 still relies on
the Kafka09Fetcher
<https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L137>
to iterate through each Kafka record and deserialize it. The
KeyedDeserializationSchema API does not seem to support passing the
timestamp as additional metadata (only the offset, topic, and partition), so
something such as JSONKeyValueDeserializationSchema will not return the
Kafka-specified timestamp.
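
To make the gap concrete, here is a minimal, self-contained sketch of what I
am hoping for. It does not use any Flink or Kafka classes: KeyedSchema only
mirrors the shape of the existing deserialize call as I understand it, and
TimestampedKeyedSchema, Stamped, and TimestampSketch are hypothetical names I
made up for illustration, not part of any existing API.

```java
import java.nio.charset.StandardCharsets;

public class TimestampSketch {

    // Hypothetical stand-in for the current schema shape: the callback sees
    // key, value, topic, partition and offset, but no timestamp.
    interface KeyedSchema<T> {
        T deserialize(byte[] key, byte[] message, String topic, int partition, long offset);
    }

    // Hypothetical variant that would additionally receive the timestamp
    // Kafka attached to the record.
    interface TimestampedKeyedSchema<T> {
        T deserialize(byte[] key, byte[] message, String topic, int partition,
                      long offset, long timestamp);
    }

    // Simple holder pairing the decoded value with the Kafka timestamp.
    static final class Stamped {
        final String value;
        final long timestamp;

        Stamped(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // With the current shape, only the payload and position metadata are available.
    static final KeyedSchema<String> PLAIN =
        (key, message, topic, partition, offset) ->
            new String(message, StandardCharsets.UTF_8);

    // With the hypothetical shape, the timestamp can travel with the value.
    static final TimestampedKeyedSchema<Stamped> SCHEMA =
        (key, message, topic, partition, offset, timestamp) ->
            new Stamped(new String(message, StandardCharsets.UTF_8), timestamp);

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        // The topic, offset and timestamp values here are made-up sample inputs.
        Stamped s = SCHEMA.deserialize(null, payload, "my-topic", 0, 42L, 1490000000000L);
        System.out.println(s.value + " @ " + s.timestamp);
    }
}
```

In other words, I would like the deserialized element itself to carry the
Kafka timestamp, so that downstream operators can use it as an ordinary
field.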

For reference, I am using Kafka 0.10.2 and the Flink-Streaming API + Kafka
Connector (1.2.1).

Thanks,
Jia Teoh
