Hey all,

Confluent provides Kafka serializers for Protobuf, Avro and JSON that produce messages consisting of a magic byte, followed by a 4-byte schema ID, followed by the actual payload (see the docs <https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format>). When I try to read such messages with the regular Protobuf, Avro and JSON formats in my Table API program, it of course does not work. For Avro, Flink also has a Confluent-Avro format that can deal with this. For Protobuf and JSON, however, there is nothing like this yet. I saw a ticket in JIRA <https://issues.apache.org/jira/browse/FLINK-29731?filter=-4&jql=project%20=%20FLINK%20AND%20issuetype%20=%20%22New%20Feature%22%20AND%20text%20~%20%22protobuf%22%20order%20by%20created%20DESC>, but I cannot wait for it. Hence I wonder how much effort it would be to implement this myself: not in a production-ready way, but just in a way that keeps my program from breaking. That is, I would be happy with a solution that just ignores the first 5 bytes and passes the rest on to the existing handlers of the Protobuf and JSON formats.
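To make the "ignore the first 5 bytes" idea concrete, here is a minimal sketch of the byte handling only, with no Flink wiring. The class and method names are hypothetical and not part of Flink or the Confluent libraries. One caveat, as far as I read the wire-format docs linked above: for Protobuf there is also a list of message indexes between the schema ID and the payload, so a fixed 5 bytes would only be right for JSON (and Avro). The second method assumes those indexes are zigzag varints prefixed by their count, with a single 0 byte standing for the first message type; please treat that as my reading of the docs rather than a verified fact.

```java
import java.util.Arrays;

/** Hypothetical helper; not part of Flink or the Confluent libraries. */
final class ConfluentWireFormat {
    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema ID

    private ConfluentWireFormat() {}

    /** Drops the magic byte and schema ID; enough for JSON and Avro payloads. */
    static byte[] stripHeader(byte[] message) {
        if (message == null || message.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Message shorter than the 5-byte header");
        }
        if (message[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unexpected magic byte: " + message[0]);
        }
        return Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
    }

    /**
     * Drops the header and then skips the Protobuf message-index list.
     * Assumption: the list is a zigzag varint count followed by that many
     * zigzag varints; a count of 0 means "first message type".
     */
    static byte[] stripProtobufHeader(byte[] message) {
        byte[] rest = stripHeader(message);
        int[] pos = {0}; // mutable read position shared with the varint reader
        int count = readZigZagVarint(rest, pos);
        for (int i = 0; i < count; i++) {
            readZigZagVarint(rest, pos); // index values are skipped, not used
        }
        return Arrays.copyOfRange(rest, pos[0], rest.length);
    }

    /** Reads one zigzag-encoded 32-bit varint and advances pos[0] past it. */
    private static int readZigZagVarint(byte[] buf, int[] pos) {
        int raw = 0;
        int shift = 0;
        while (true) {
            if (pos[0] >= buf.length) {
                throw new IllegalArgumentException("Truncated varint");
            }
            byte b = buf[pos[0]++];
            raw |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                break;
            }
            shift += 7;
            if (shift > 28) {
                throw new IllegalArgumentException("Varint too long");
            }
        }
        return (raw >>> 1) ^ -(raw & 1); // undo zigzag encoding
    }
}
```

The idea would be to call stripHeader (or stripProtobufHeader) on the raw Kafka value and hand the result to the existing format's deserializer unchanged. The schema ID is ignored entirely, so this only works as long as the schema configured in the table matches what the producer wrote.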
Now let's take the existing Protobuf format as an example: I assume I have to implement a DeserializationFormatFactory, create a few decoding and encoding formats (like PbDecodingFormat, for example), and then a new DeserializationSchema, which would have a method like this one <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/formats/protobuf/deserialize/PbRowDataDeserializationSchema.html>:

    @Override
    public RowData deserialize(byte[] message) throws IOException {
        try {
            return protoToRowConverter.convertProtoBinaryToRow(message);
        } catch (Throwable t) {
            if (formatConfig.isIgnoreParseErrors()) {
                return null;
            }
            throw new IOException("Failed to deserialize PB object.", t);
        }
    }

But instead of converting the message immediately, I would slice the first few bytes off and go from there. Is this pretty much it, or is there more to it?

-Theo