Chirag Dewan created FLINK-33290:
------------------------------------
Summary: Custom counters to capture encoding/decoding failure in
flink
Key: FLINK-33290
URL: https://issues.apache.org/jira/browse/FLINK-33290
Project: Flink
Issue Type: Improvement
Components: Runtime / Metrics
Reporter: Chirag Dewan
I need to measure the difference between the number of records collected by a
source and the number of records it emits, in order to capture deserialization
failures. Similarly, I need the difference between the number of records
received by a sink and the number of records it sends out, in order to capture
serialization failures.
For example, if deserialization fails while reading records from Kafka, I want
to expose, as a metric, the difference between the records collected from the
Kafka broker and the records emitted by the Kafka source operator after
deserialization.
As far as I can tell, Flink does not provide any such metric.
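To make the requested quantity concrete, here is a minimal, framework-independent
sketch in plain Java. The class RecordAccounting and its method names are
hypothetical and are not part of any Flink API; the sketch only illustrates that
the failure count is the number of records read minus the number of records
emitted.

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical helper (not a Flink class): tracks records read vs. records emitted. */
final class RecordAccounting {
    private final LongAdder recordsRead = new LongAdder();
    private final LongAdder recordsEmitted = new LongAdder();

    /** Call once for every raw record handed over by the external system. */
    void onRecordRead() {
        recordsRead.increment();
    }

    /** Call once for every record that was decoded successfully and emitted. */
    void onRecordEmitted() {
        recordsEmitted.increment();
    }

    /** Records that were read but never emitted, i.e. the decoding failures. */
    long failures() {
        return recordsRead.sum() - recordsEmitted.sum();
    }
}
```

The same accounting would apply to a sink, with "received" and "sent" in place
of "read" and "emitted".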
For the Kafka source there is a workaround to get this metric. I can override
the open method of KafkaRecordDeserializationSchema and register a metric there
that reports decoding failures:
    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        context.getMetricGroup().gauge("decodingFailures", new Gauge<Integer>() {
            @Override
            public Integer getValue() {
                return decodingFailures;
            }
        });
    }
At deserialization time, I can then increment that counter as follows:
    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> out) {
        try {
            // deserialize the record and emit it through out
        } catch (IOException | MMException e) {
            logger.error(String.format(
                    "Error received while decoding, in partition [%d] for topic [%s] at offset [%d]: %s",
                    record.partition(), record.topic(), record.offset(), e.toString()));
            decodingFailures++;
        }
    }
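The catch-and-count pattern in the workaround above does not itself depend on
Kafka. As a rough, framework-independent sketch, the same idea can wrap any
decoding function. The names CountingDecoder and Decoder are hypothetical and
are not Flink APIs, and how the count would be registered with a Flink metric
group is deliberately left open here.

```java
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical wrapper (not a Flink class): counts records that fail to decode. */
final class CountingDecoder<T> {

    /** Hypothetical decoding function that may fail with an IOException. */
    interface Decoder<T> {
        T decode(byte[] bytes) throws IOException;
    }

    private final Decoder<T> delegate;
    // AtomicLong so that a separate reporting thread can safely read the value.
    private final AtomicLong decodingFailures = new AtomicLong();

    CountingDecoder(Decoder<T> delegate) {
        this.delegate = delegate;
    }

    /**
     * Returns the decoded value, or empty if decoding threw an IOException
     * (the failure is counted). A null result from the delegate also yields
     * empty, but is not counted as a failure.
     */
    Optional<T> tryDecode(byte[] bytes) {
        try {
            return Optional.ofNullable(delegate.decode(bytes));
        } catch (IOException e) {
            decodingFailures.incrementAndGet();
            return Optional.empty();
        }
    }

    long getDecodingFailures() {
        return decodingFailures.get();
    }
}
```

A value like getDecodingFailures() is what a gauge such as the one registered
in open could report, provided the connector exposes a metric group to
register it with.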
*However, this workaround cannot be applied to FileSource, because
SimpleStreamFormat and its Reader do not provide access to a context in any way.*
*Similarly, I did not find any way to expose serialization-related metrics in
any of the sinks.*
Would it be possible to provide a way to implement custom counters to count
serialization/deserialization failures in all Flink connectors (sinks &
sources)?