Re: Table API: Custom Kafka formats for Confluent Protobuf and JSON

2022-11-29 Thread Theodor Wübker
Hey Yaroslav!

I suppose I will try it like this. The lookup would indeed be nice too; I will 
have a closer look at the corresponding source code. Thanks!

-Theo
> On 29. Nov 2022, at 17:41, Yaroslav Tkachenko wrote:
> 
> Hey Theodor,
> 
> That's pretty much it, assuming your Protobuf schema is more or less fixed. 
> But for a production workload, you'd need to add a Schema Registry lookup. I 
> guess the implementation for that would be similar to what's in the Avro 
> format.

Re: Table API: Custom Kafka formats for Confluent Protobuf and JSON

2022-11-29 Thread Yaroslav Tkachenko
Hey Theodor,

That's pretty much it, assuming your Protobuf schema is more or less fixed.
But for a production workload, you'd need to add a Schema Registry lookup.
I guess the implementation for that would be similar to what's in the Avro
format.
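
Roughly, just the lookup part could look like this (a quick sketch, not
tested; the client comes from Confluent's kafka-schema-registry-client
artifact, and the URL and cache size are placeholders):

import java.io.IOException;
import java.nio.ByteBuffer;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;

public final class RegistryLookup {

    private final SchemaRegistryClient client =
        new CachedSchemaRegistryClient("http://localhost:8081", 100);

    /** Reads the schema id from the Confluent header and fetches the writer schema. */
    public ParsedSchema writerSchemaFor(byte[] message)
            throws IOException, RestClientException {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        buffer.get();                   // skip the magic byte
        int schemaId = buffer.getInt(); // 4-byte big-endian schema id
        return client.getSchemaById(schemaId); // cached per id by the client
    }
}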



Table API: Custom Kafka formats for Confluent Protobuf and JSON

2022-11-29 Thread Theodor Wübker
Hey all,

So Confluent has Kafka serializers for Protobuf, Avro and JSON that create 
messages with a magic byte, followed by a 4-byte schema id, followed by the 
actual payload (see the Confluent wire format docs). When I try to read such 
messages with the regular Protobuf, Avro and JSON formats in my Table API 
program, it of course does not work. For Avro, Flink also has a 
Confluent-Avro format that can deal with this. However, for Protobuf and JSON 
there is nothing like this yet. I saw a ticket for it in JIRA, but I cannot 
wait for that. Hence I wonder how much effort it would be to implement this 
myself - not in a production-ready way, but just in a way that keeps my 
program from breaking. Meaning, I would be happy with a solution that just 
ignores the first 5 bytes and passes the rest on to the existing handlers of 
the Protobuf and JSON formats.
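
Concretely, the framing could be handled like this (my own sketch, not from
the Flink code base; the class and method names are mine):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;

public final class ConfluentFraming {

    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_SIZE = 5; // magic byte + 4-byte schema id

    /** Returns the payload with the 5-byte Confluent header stripped. */
    public static byte[] stripHeader(byte[] message) throws IOException {
        if (message == null || message.length < HEADER_SIZE) {
            throw new IOException("Message too short for a Confluent header.");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IOException("Unknown magic byte.");
        }
        int schemaId = buffer.getInt(); // ignored here; a registry lookup would use it
        return Arrays.copyOfRange(message, HEADER_SIZE, message.length);
    }
}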


Now let's take for example the existing Protobuf format: I assume I have to 
implement the DeserializationFormatFactory, create a few decoding and 
encoding formats, just like the PbDecodingFormat for example, and then a new 
DeserializationSchema, where I would have a method like this:

@Override
public RowData deserialize(byte[] message) throws IOException {
    try {
        // Convert the raw Protobuf bytes into a Flink RowData.
        return protoToRowConverter.convertProtoBinaryToRow(message);
    } catch (Throwable t) {
        // Optionally swallow parse errors instead of failing the job.
        if (formatConfig.isIgnoreParseErrors()) {
            return null;
        }
        throw new IOException("Failed to deserialize PB object.", t);
    }
}
But instead of converting the message immediately, I would slice the first few 
bytes off and go from there - roughly like the sketch below. Is this pretty 
much it, or is there more to it?
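
In other words, something like this (hypothetical and untested;
ConfluentFraming.stripHeader is my sketch from above):

@Override
public RowData deserialize(byte[] message) throws IOException {
    try {
        // Drop the 5-byte Confluent header, then delegate to the existing converter.
        byte[] payload = ConfluentFraming.stripHeader(message);
        return protoToRowConverter.convertProtoBinaryToRow(payload);
    } catch (Throwable t) {
        if (formatConfig.isIgnoreParseErrors()) {
            return null;
        }
        throw new IOException("Failed to deserialize PB object.", t);
    }
}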

-Theo


