Hi Gabriel,

You could consider overriding the value.serializer
<https://kafka.apache.org/documentation/#producerconfigs_value.serializer>
and value.deserializer
<https://kafka.apache.org/documentation/#consumerconfigs_value.deserializer>
(and likewise key.serializer and key.deserializer) in the consumer and
producer configuration that Flink sets, using the `properties.*` option of
the Kafka connector
<https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#properties>.
Your serializer and deserializer will have access to the headers, so they
can perform your integrity checks and otherwise pass the byte[] through
unchanged, leaving the actual SerDe to the formats' logic.
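Here is a minimal sketch of the check such a value deserializer could run. It rests on my own assumptions, not on anything Flink or Kafka prescribes: the producer writes a SHA-256 digest of the value bytes into a header named "checksum", and a plain Map stands in for Kafka's Headers so the snippet runs on the JDK alone. In a real org.apache.kafka.common.serialization.Deserializer<byte[]>, the same logic would go in the deserialize(String topic, Headers headers, byte[] data) override, reading headers.lastHeader("checksum"). The class and header names are hypothetical.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;

// Hypothetical helper: the integrity check a custom value.deserializer could run.
// Assumption: the producer stores a SHA-256 digest of the value bytes in a header
// named "checksum". A Map stands in for Kafka's Headers so this runs on the JDK alone.
final class ChecksumPassThrough {
    static final String CHECKSUM_HEADER = "checksum"; // hypothetical header name

    static byte[] sha256(byte[] data) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(data);
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to support SHA-256.
            throw new IllegalStateException(e);
        }
    }

    // Checks the value against its checksum header and returns the same bytes,
    // so the configured value.format still does the actual decoding.
    static byte[] verify(Map<String, byte[]> headers, byte[] value) {
        if (value == null) {
            return null; // tombstone record: nothing to verify
        }
        // Kafka equivalent: headers.lastHeader(CHECKSUM_HEADER), then .value() if non-null
        byte[] expected = headers.get(CHECKSUM_HEADER);
        if (expected == null) {
            // In a real deserializer, Kafka's SerializationException fits better.
            throw new IllegalArgumentException("missing '" + CHECKSUM_HEADER + "' header");
        }
        if (!MessageDigest.isEqual(expected, sha256(value))) {
            throw new IllegalArgumentException("checksum mismatch");
        }
        return value; // pass-through: no deserialization happens here
    }

    private ChecksumPassThrough() {}
}
```

The design choice is to hand the bytes back untouched, so value.format (json, avro, ...) keeps doing the decoding and only the integrity check moves into the deserializer.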

Flink uses the ByteArray(De|S)erializers by default in its source
<https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L457-L470>
and sink
<https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L82-L83>.
It's currently not possible to override the source deserializer, but that
is a work in progress via FLINK-35808
<https://issues.apache.org/jira/browse/FLINK-35808>. Hoping to have it
merged soon.
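On the sink side, the counterpart would be a serializer that stamps the integrity header and returns the format's bytes unchanged. A minimal sketch, under assumptions of my own: the header is a SHA-256 digest of the value bytes stored under a hypothetical "checksum" key, and a plain Map stands in for Kafka's Headers so it needs only the JDK. In a real Serializer<byte[]>, this would sit in the serialize(String topic, Headers headers, byte[] data) override, calling headers.add("checksum", digest), and, assuming the sink honors the override, the class would be registered with something like 'properties.value.serializer' = '<your class>'. The class name is hypothetical.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;

// Hypothetical producer-side helper: attach a SHA-256 digest of the value as a header
// and return the value untouched. A Map stands in for Kafka's Headers.
final class ChecksumStamp {
    static final String CHECKSUM_HEADER = "checksum"; // hypothetical header name

    static byte[] sha256(byte[] data) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(data);
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to support SHA-256.
            throw new IllegalStateException(e);
        }
    }

    // The format has already produced these bytes; only the header is added.
    static byte[] stamp(Map<String, byte[]> headers, byte[] value) {
        if (value != null) {
            // Kafka equivalent: headers.add(CHECKSUM_HEADER, digest)
            headers.put(CHECKSUM_HEADER, sha256(value));
        }
        return value; // pass-through: serialization is left to value.format
    }

    private ChecksumStamp() {}
}
```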

Alternatively, you can wait for first-class header support in Flink table
formats. There's some ongoing discussion and work on this in FLIP-454
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-454%3A+New+Apicurio+Avro+format>
and in this mailing list discussion
<https://lists.apache.org/thread/f1y8dzplmsjdzcwsw60cml1rnfl57nol>.

On Thu, Jul 11, 2024 at 2:13 PM Gabriel Giussi <gabrielgiu...@gmail.com>
wrote:

> Reading from a Kafka topic with custom serialization/deserialization can
> be done using a KafkaSource configured with an implementation
> of KafkaRecordDeserializationSchema, which even has access to the Kafka
> headers, which in my case are used for checking message integrity.
> How can we do the same using the Table API, where you can only
> configure value.format with a string naming one of a predefined set of formats?
>
> Thanks.
>
