Hey all,

So, Confluent provides Kafka serializers for Protobuf, Avro and JSON that 
produce messages consisting of a magic byte, followed by a 4-byte schema ID, 
followed by the actual payload (refer to the docs). When I try to read such 
messages with the regular Protobuf, Avro and JSON formats in my Table API 
program, it of course does not work. For Avro, Flink also has a Confluent Avro 
format (avro-confluent) that can deal with this. However, for Protobuf and 
JSON there is nothing like this yet. I saw a ticket in JIRA, but I cannot wait 
for it to be resolved. Hence I wonder how much effort it would be to implement 
this myself: not in a production-ready way, just in a way that keeps my 
program from breaking. I would be happy with a solution that simply skips the 
first 5 bytes and passes the rest on to the existing Protobuf and JSON format 
handlers.
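For reference, a minimal stdlib-only Java sketch of the header handling described above, assuming the framing is exactly [magic byte 0x00][4-byte big-endian schema ID][payload]. ConfluentFrame and parse() are hypothetical names, not part of Flink or the Confluent client. Checking the magic byte instead of blindly dropping 5 bytes means a wrongly framed topic fails loudly instead of producing garbage rows.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper: splits a Confluent-framed record into schema ID and payload.
// Assumed layout: [0x00 magic byte][4-byte big-endian schema ID][payload bytes].
final class ConfluentFrame {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5; // 1 magic byte + 4 schema-ID bytes

    final int schemaId;
    final byte[] payload;

    private ConfluentFrame(int schemaId, byte[] payload) {
        this.schemaId = schemaId;
        this.payload = payload;
    }

    static ConfluentFrame parse(byte[] message) {
        if (message == null || message.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Message too short for a Confluent header");
        }
        if (message[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unexpected magic byte: " + message[0]);
        }
        // ByteBuffer is big-endian by default; wrap(...) starts reading at offset 1.
        int schemaId = ByteBuffer.wrap(message, 1, 4).getInt();
        // Everything after the header is the payload the existing format would decode.
        byte[] payload = Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
        return new ConfluentFrame(schemaId, payload);
    }
}
```

For JSON, ConfluentFrame.parse(message).payload would then be what gets handed to the existing JSON deserialization logic.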

Now let's take the existing Protobuf format as an example: I assume I have to 
implement a DeserializationFormatFactory, create decoding and encoding formats 
similar to PbDecodingFormat, and then write a new DeserializationSchema with 
a deserialize method like this one:

public RowData deserialize(byte[] message) throws IOException {
    try {
        return protoToRowConverter.convertProtoBinaryToRow(message);
    } catch (Throwable t) {
        // Skip the record if parse errors are configured to be ignored.
        if (formatConfig.isIgnoreParseErrors()) {
            return null;
        }
        throw new IOException("Failed to deserialize PB object.", t);
    }
}

But instead of converting the message immediately, I would first slice off 
the first 5 bytes (the magic byte and the schema ID) and go from there. Is 
this pretty much it, or is there more to it?
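One thing that may be "more to it" for Protobuf: as far as I understand Confluent's wire-format documentation, the Protobuf serializer writes an array of message indexes between the schema ID and the payload (a zigzag varint count followed by that many zigzag varint indexes; the common "first message in the .proto file" case is written as a single 0x00 byte), while the JSON Schema serializer does not. If that holds, dropping exactly 5 bytes would leave at least one extra byte in front of the Protobuf payload. Below is a sketch of the modified deserialize() under that assumption; ConfluentProtobufStripper and its Function delegate are hypothetical stand-ins for the schema class and protoToRowConverter.convertProtoBinaryToRow. It skips the index values, so it assumes the message type configured in the Flink format is the one the producer wrote.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.function.Function;

// Hypothetical wrapper: strips the assumed Confluent Protobuf framing, then delegates.
// The delegate stands in for protoToRowConverter.convertProtoBinaryToRow(...).
final class ConfluentProtobufStripper<T> {
    private static final int HEADER_LENGTH = 5; // magic byte + 4-byte schema ID

    private final Function<byte[], T> delegate;
    private final boolean ignoreParseErrors;

    ConfluentProtobufStripper(Function<byte[], T> delegate, boolean ignoreParseErrors) {
        this.delegate = delegate;
        this.ignoreParseErrors = ignoreParseErrors;
    }

    // Same shape as the original deserialize(): return null on failure when
    // ignoreParseErrors is set, otherwise wrap the cause in an IOException.
    public T deserialize(byte[] message) throws IOException {
        try {
            return delegate.apply(stripFraming(message));
        } catch (Throwable t) {
            if (ignoreParseErrors) {
                return null;
            }
            throw new IOException("Failed to deserialize Confluent-framed PB object.", t);
        }
    }

    // Returns the bytes after the 5-byte header and the message-index array.
    static byte[] stripFraming(byte[] message) {
        if (message == null || message.length <= HEADER_LENGTH || message[0] != 0) {
            throw new IllegalArgumentException("Not a Confluent Protobuf-framed message");
        }
        int[] pos = {HEADER_LENGTH}; // read cursor, advanced by readZigZagVarint
        int count = readZigZagVarint(message, pos);
        if (count < 0) {
            throw new IllegalArgumentException("Negative message-index count: " + count);
        }
        for (int i = 0; i < count; i++) {
            readZigZagVarint(message, pos); // index value is read and discarded
        }
        return Arrays.copyOfRange(message, pos[0], message.length);
    }

    // Reads one zigzag-encoded varint (at most 5 bytes for an int) and advances pos[0].
    private static int readZigZagVarint(byte[] buf, int[] pos) {
        int raw = 0;
        int shift = 0;
        while (true) {
            if (pos[0] >= buf.length) {
                throw new IllegalArgumentException("Truncated varint");
            }
            byte b = buf[pos[0]++];
            raw |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return (raw >>> 1) ^ -(raw & 1); // undo zigzag encoding
            }
            shift += 7;
            if (shift > 28) {
                throw new IllegalArgumentException("Varint too long for an int");
            }
        }
    }
}
```

For JSON, the plain 5-byte slice should be enough on its own, so only the Protobuf side would need the extra index-skipping step.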

