I guess deserialization errors could be metered with Flink metrics,
but that is not available out of the box right now.
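
A minimal sketch of that idea, assuming a hand-written wrapper rather than
anything that ships with flink-protobuf: wrap the real parser, count failures
and skip the bad message. The names `CountingDeserializer` and `Parser` are
hypothetical, and `LongAdder` only stands in for a Flink `Counter`. In a real
job the counter would presumably be registered in
`DeserializationSchema.open(...)` via
`context.getMetricGroup().counter(...)`. For a Flink SQL job this would
additionally need a custom format, which is not shown here.

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical wrapper: counts parse failures instead of silently dropping them. */
final class CountingDeserializer<T> {

    /** Hypothetical stand-in for the real parser, e.g. a protobuf parse call. */
    @FunctionalInterface
    interface Parser<T> {
        T parse(byte[] bytes) throws Exception;
    }

    private final Parser<T> inner;
    // Stand-ins for Flink counters; a real job would register metrics instead.
    private final LongAdder parseErrors = new LongAdder();
    private final LongAdder parsedOk = new LongAdder();

    CountingDeserializer(Parser<T> inner) {
        this.inner = inner;
    }

    /** Returns the parsed value, or null when the message could not be parsed. */
    T deserialize(byte[] bytes) {
        try {
            T value = inner.parse(bytes);
            parsedOk.increment();
            return value;
        } catch (Exception e) {
            // Skip the message, but leave a trace that an alert can be built on.
            parseErrors.increment();
            return null;
        }
    }

    long errorCount() {
        return parseErrors.sum();
    }

    long successCount() {
        return parsedOk.sum();
    }
}
```

An alert can then fire when the error counter grows, while the job keeps
running as it does with 'protobuf.ignore-parse-errors' = 'true'.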

> - I wondered if you might also want to count how many were successfully 
> parsed in a non-protobuf layer (Dynamic table sort of level)
No, this is not a requirement. I have a simple Flink SQL job that
streams proto messages from Kafka to Hive, with a fairly simple mapping.

> - do you know why some messages do not parse? It would be interesting to know 
> the types of non-parsable messages, I assume with the idea to understand and 
> potentially fix the source / parser limitations.
Because of human error: developers can introduce non-backward-compatible
changes in the producer, and the emitted proto messages then cannot be
deserialized on the Flink side. Failing the Flink job is actually an
option worth considering (because right now it is the only source of
proto messages in the topic, and it's OK to halt and repair).


Wed, 19 Jun 2024 at 11:39, David Radley <david_rad...@uk.ibm.com>:
>
> Hi Ilya,
>
> I have no experience of doing this, but wonder if we could use 
> Flink Metrics. I wonder:
>
> - There could be a hook point at that part of the code to discover some custom 
> code that implements the metrics.
>
> - A better way might be to throw a new unable-to-parse Exception that could 
> be caught in non-protobuf code and then a count metric be incremented. I 
> guess the metrics would be scoped to a Job – like the watermark metrics.
>
> - I wondered if you might also want to count how many were successfully 
> parsed in a non-protobuf layer (Dynamic table sort of level)
>
> - do you know why some messages do not parse? It would be interesting to know 
> the types of non-parsable messages, I assume with the idea to understand and 
> potentially fix the source / parser limitations.
>
>
>
> Kind regards, David.
>
>
>
> From: Ilya Karpov <idkf...@gmail.com>
> Date: Monday, 17 June 2024 at 12:39
> To: user <user@flink.apache.org>
> Subject: [EXTERNAL] A way to meter number of deserialization errors
>
> Hi all,
>
> we are planning to use flink as a connector between kafka and external 
> systems. We use protobuf as a message format in kafka. If non-backward 
> compatible changes occur we want to skip those messages 
> ('protobuf.ignore-parse-errors' = 'true') but record an error and raise an 
> alert. I didn't find any way to record deserialization errors 
> (https://github.com/apache/flink/blob/3a15d1ce69ac21d619f60033ec45cae303489c8f/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbRowDataDeserializationSchema.java#L73),
>  does it exist?
>
