Hi Gabriel,

You could consider overriding the value.serializer <https://kafka.apache.org/documentation/#producerconfigs_value.serializer> and value.deserializer <https://kafka.apache.org/documentation/#consumerconfigs_value.deserializer> (and the equivalents for the key) in the producer and consumer configuration that Flink sets, using the `properties.*` option of the Kafka connector <https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#properties>. Your serializer and deserializer will have access to the headers and can perform your integrity checks, while passing the byte[] through unchanged so that the format's logic continues to handle the actual SerDes.
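
As a rough illustration of the check itself, here is a minimal sketch that uses only the JDK. It assumes, purely for the example, that the producer stores the CRC32 of the value in a record header as an 8-byte big-endian long; the class name `HeaderIntegrity` and that header layout are made up and not something Flink or Kafka define.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

/**
 * Hypothetical integrity check. Assumption: the producer puts the CRC32 of
 * the record value into a header as an 8-byte big-endian long.
 */
final class HeaderIntegrity {

    private HeaderIntegrity() {}

    /** Encodes the CRC32 of the payload the way the assumed header carries it. */
    static byte[] checksumHeader(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload);
        return ByteBuffer.allocate(Long.BYTES).putLong(crc.getValue()).array();
    }

    /**
     * Returns the payload unchanged when the header matches, so the table
     * format still receives plain bytes. Throws when the check fails.
     */
    static byte[] verify(byte[] headerValue, byte[] payload) {
        if (payload == null) {
            return null; // tombstone record: nothing to verify
        }
        if (headerValue == null || headerValue.length != Long.BYTES) {
            throw new IllegalStateException("missing or malformed checksum header");
        }
        long expected = ByteBuffer.wrap(headerValue).getLong();
        CRC32 crc = new CRC32();
        crc.update(payload);
        if (crc.getValue() != expected) {
            throw new IllegalStateException("checksum mismatch");
        }
        return payload;
    }
}
```

A custom Kafka `Deserializer<byte[]>` could call `HeaderIntegrity.verify(...)` from its `deserialize(String topic, Headers headers, byte[] data)` overload, passing in the value of the relevant header (for example via `headers.lastHeader("x-crc32")`, after a null check; the header name is again hypothetical), and return the result so the bytes reach the format untouched.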

Flink uses the ByteArray(De|S)erializers by default in its source <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L457-L470> and sink <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L82-L83>. It's currently not possible to override the deserializer on the source side, but that is a work in progress via FLINK-35808 <https://issues.apache.org/jira/browse/FLINK-35808>. Hoping to have it merged soon.

Alternatively, you can wait for first-class header support in Flink table formats. There's some ongoing discussion and work via FLIP-454 <https://cwiki.apache.org/confluence/display/FLINK/FLIP-454%3A+New+Apicurio+Avro+format> and this mailing list discussion <https://lists.apache.org/thread/f1y8dzplmsjdzcwsw60cml1rnfl57nol>.

On Thu, Jul 11, 2024 at 2:13 PM Gabriel Giussi <gabrielgiu...@gmail.com> wrote:

> Reading from a kafka topic with custom serialization/deserialization can
> be done using a KafkaSource configured with an implementation
> of KafkaRecordDeserializationSchema, which has access even to kafka headers
> which are used in my case for checking message integrity.
> How can we do the same but using the table API where you can just
> configure the value.format with a string to a predefined set of formats?
>
> Thanks.