I guess deserialization errors could be metered with Flink metrics, but that is currently not available out of the box.
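A minimal sketch of the "catch the parse failure and increment a counter" idea, written in plain Java so it runs without Flink on the classpath. `ErrorCounter`, `Deserializer`, `CountingDeserializer` and `SimpleCounter` are hypothetical names introduced only for illustration; `ErrorCounter` mimics the `inc()`/`getCount()` methods of Flink's `org.apache.flink.metrics.Counter`, and `Deserializer` stands in for a `DeserializationSchema`. This is not how flink-protobuf works today, just one possible shape:

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for Flink's Counter (only the two methods used here). */
interface ErrorCounter {
    void inc();
    long getCount();
}

/** Hypothetical stand-in for a byte[] -> T deserializer such as a DeserializationSchema. */
interface Deserializer<T> {
    T deserialize(byte[] message) throws IOException;
}

/** Wraps another deserializer: counts every failure and skips the message by returning null. */
final class CountingDeserializer<T> implements Deserializer<T> {
    private final Deserializer<T> inner;
    private final ErrorCounter errors;

    CountingDeserializer(Deserializer<T> inner, ErrorCounter errors) {
        this.inner = inner;
        this.errors = errors;
    }

    @Override
    public T deserialize(byte[] message) {
        try {
            return inner.deserialize(message);
        } catch (IOException | RuntimeException e) {
            // The inner deserializer must throw for this branch to be reached.
            errors.inc();
            return null; // null means "skip this record"
        }
    }
}

/** Simple thread-safe counter, used here in place of a real Flink metric. */
final class SimpleCounter implements ErrorCounter {
    private final AtomicLong count = new AtomicLong();

    @Override
    public void inc() {
        count.incrementAndGet();
    }

    @Override
    public long getCount() {
        return count.get();
    }
}
```

In a real job the counter would presumably come from `DeserializationSchema.open(InitializationContext)` via `context.getMetricGroup().counter(...)` with a metric name of your choosing, and the wrapped protobuf schema would need to throw on bad input (that is, 'protobuf.ignore-parse-errors' = 'false'), since as far as I can tell the current implementation returns null without reporting anything when the option is 'true'. Plugging such a wrapper into a Flink SQL job would likely also require a custom format factory, which is not shown here.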
> - I wondered if you might also want to count how many were successfully
> parsed in a non-protobuf layer (Dynamic table sort of level)

No, this is not a requirement. I have a simple Flink SQL job that streams proto-messages from Kafka to Hive, with a quite simple mapping.

> - do you know why some messages do not parse? It would be interesting to know
> the types of non-parsable messages, I assume with the idea to understand and
> potentially fix the source / parser limitations.

Because of human error: developers can introduce non-backward-compatible changes in the producer, and the emitted proto-messages then cannot be deserialized on the Flink application side. Failing the Flink job is actually an option worth considering (because right now it is the only source of proto-messages in the topic, and it is OK to halt and repair).

Wed, 19 Jun 2024 at 11:39, David Radley <david_rad...@uk.ibm.com>:
>
> Hi Ilya,
>
> I have not got any experience of doing this, but wonder if we could use the
> Flink Metrics. I wonder:
>
> - There could be a hook point at that part of the code to discover some custom
> code that implements the metrics.
>
> - A better way might be to throw a new unable-to-parse Exception that could
> be caught in non-protobuf code and then a count metric be incremented. I
> guess the metrics would be scoped to a Job, like the watermark metrics.
>
> - I wondered if you might also want to count how many were successfully
> parsed in a non-protobuf layer (Dynamic table sort of level)
>
> - do you know why some messages do not parse? It would be interesting to know
> the types of non-parsable messages, I assume with the idea to understand and
> potentially fix the source / parser limitations.
>
> Kind regards, David.
>
> From: Ilya Karpov <idkf...@gmail.com>
> Date: Monday, 17 June 2024 at 12:39
> To: user <user@flink.apache.org>
> Subject: [EXTERNAL] A way to meter number of deserialization errors
>
> Hi all,
>
> we are planning to use flink as a connector between kafka and external
> systems. We use protobuf as a message format in kafka. If non-backward
> compatible changes occur we want to skip those messages
> ('protobuf.ignore-parse-errors' = 'true') but record an error and raise an
> alert. I didn't find any way to record deserialization errors
> (https://github.com/apache/flink/blob/3a15d1ce69ac21d619f60033ec45cae303489c8f/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbRowDataDeserializationSchema.java#L73),
> does it exist?