xiechenling created FLINK-32725:
-----------------------------------
Summary: Add option to control writing of timestamp to Kafka topic
in KafkaRecordSerializationSchema.builder
Key: FLINK-32725
URL: https://issues.apache.org/jira/browse/FLINK-32725
Project: Flink
Issue Type: Improvement
Components: Connectors / Kafka
Affects Versions: 1.14.0
Environment: flink 1.16.2
Reporter: xiechenling
In the older versions of Kafka sink for Flink, it was possible to configure
whether the message timestamp should be written to Kafka. This was achievable
using the method `FlinkKafkaProducer.setWriteTimestampToKafka(true)`.
However, in the newer versions of Kafka sink, when using
`KafkaRecordSerializationSchema.builder()`, the message timestamp is
automatically written to the Kafka topic using the context's timestamp.
{code:scala}
KafkaSink
...
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
...
.build()
{code}
If a user wishes to exclude the timestamp from being written to Kafka, they
currently need to provide a custom `KafkaRecordSerializationSchema`
implementation whose `serialize` method builds the `ProducerRecord` without the
timestamp.
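{code:scala}
import java.lang
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
import org.apache.kafka.clients.producer.ProducerRecord

KafkaSink.builder[(String, String)]()
  .setBootstrapServers(kafkaAddress)
  .setRecordSerializer((element: (String, String),
      context: KafkaRecordSerializationSchema.KafkaSinkContext, timestamp: lang.Long) => {
    // Ignore `timestamp`: this constructor leaves the record timestamp unset.
    new ProducerRecord(sinkTopic, element._1.getBytes, element._2.getBytes)
  })
  .build()
{code}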
I propose adding a new method, similar to `setWriteTimestampToKafka`, to
`KafkaRecordSerializationSchema.builder()`, which allows users to control
whether the timestamp should be included in the output to the Kafka topic. This
would provide a more straightforward and consistent approach for users who do
not want the timestamp to be written to Kafka.
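To illustrate the behaviour such an option could have, here is a minimal sketch of a wrapper that delegates to any existing schema and drops the timestamp on request. The class name `TimestampOptionalSchema` and the `writeTimestamp` flag are hypothetical and not part of Flink; the sketch assumes the `open` and `serialize` signatures of `KafkaRecordSerializationSchema` as found in the 1.16 connector.
{code:scala}
import java.lang.{Long => JLong}
import org.apache.flink.api.common.serialization.SerializationSchema
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema.KafkaSinkContext
import org.apache.kafka.clients.producer.ProducerRecord

// Hypothetical wrapper (not a Flink class): delegates serialization and, when
// writeTimestamp is false, rebuilds the record without a timestamp.
class TimestampOptionalSchema[T](
    delegate: KafkaRecordSerializationSchema[T],
    writeTimestamp: Boolean)
  extends KafkaRecordSerializationSchema[T] {

  // Forward initialization so the wrapped schema is set up as usual.
  override def open(
      context: SerializationSchema.InitializationContext,
      sinkContext: KafkaSinkContext): Unit =
    delegate.open(context, sinkContext)

  override def serialize(
      element: T,
      context: KafkaSinkContext,
      timestamp: JLong): ProducerRecord[Array[Byte], Array[Byte]] = {
    val record = delegate.serialize(element, context, timestamp)
    if (writeTimestamp || record == null) {
      record
    } else {
      // A null timestamp leaves it to the Kafka producer to stamp the record.
      val noTimestamp: JLong = null
      new ProducerRecord[Array[Byte], Array[Byte]](
        record.topic, record.partition, noTimestamp,
        record.key, record.value, record.headers)
    }
  }
}
{code}
With this in place, the schema returned by `KafkaRecordSerializationSchema.builder()` could be passed to `setRecordSerializer` as `new TimestampOptionalSchema(schema, writeTimestamp = false)`. A builder option would make the same choice available without the extra class.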
Thank you for considering this enhancement.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)