[jira] [Created] (FLINK-11198) Access to MetricGroup in an AggregateFunction(Non Rich)

2018-12-18 Thread Chirag Dewan (JIRA)
Chirag Dewan created FLINK-11198:


 Summary: Access to MetricGroup in an AggregateFunction(Non Rich)
 Key: FLINK-11198
 URL: https://issues.apache.org/jira/browse/FLINK-11198
 Project: Flink
  Issue Type: New Feature
  Components: DataStream API
Affects Versions: 1.6.2
Reporter: Chirag Dewan
 Fix For: 1.6.3


The only way to add custom metrics from a UDF is through RuntimeContext, and 
RuntimeContext is available in every RichFunction implementation.

However, for aggregate() on a WindowedStream, we cannot use the Rich version of 
AggregateFunction. As far as I understand, this is done to avoid exposing 
state in the aggregate UDF. 

But can we have some minimal context that does not expose state but provides 
metrics, the classloader, etc. in the UDF? 
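
A minimal sketch of what such a restricted context could look like, in plain 
Java. All names here (FullContext, MetricsOnlyContext, InMemoryContext, 
SumAggregate) are hypothetical and are not Flink APIs; the point is only that 
a narrower view can forward metric and classloader access while leaving out 
the state accessors.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a full runtime context: exposes state AND metrics.
interface FullContext {
    Map<String, Object> state();      // state that should stay hidden from the UDF
    AtomicLong counter(String name);  // metric registration
    ClassLoader userCodeClassLoader();
}

// Hypothetical restricted view: metrics and classloader only, no state accessor.
interface MetricsOnlyContext {
    AtomicLong counter(String name);
    ClassLoader userCodeClassLoader();

    // Wraps a full context so that callers cannot reach state().
    static MetricsOnlyContext of(FullContext full) {
        return new MetricsOnlyContext() {
            @Override
            public AtomicLong counter(String name) {
                return full.counter(name);
            }

            @Override
            public ClassLoader userCodeClassLoader() {
                return full.userCodeClassLoader();
            }
        };
    }
}

// Simple in-memory FullContext, used only for this demonstration.
class InMemoryContext implements FullContext {
    private final Map<String, Object> state = new HashMap<>();
    private final Map<String, AtomicLong> counters = new HashMap<>();

    @Override
    public Map<String, Object> state() {
        return state;
    }

    @Override
    public AtomicLong counter(String name) {
        return counters.computeIfAbsent(name, k -> new AtomicLong());
    }

    @Override
    public ClassLoader userCodeClassLoader() {
        return getClass().getClassLoader();
    }
}

// An aggregate-style UDF that only ever sees the restricted context.
class SumAggregate {
    private final AtomicLong addCalls;

    SumAggregate(MetricsOnlyContext ctx) {
        this.addCalls = ctx.counter("addCalls");
    }

    long add(long value, long accumulator) {
        addCalls.incrementAndGet(); // custom metric, no state access required
        return accumulator + value;
    }
}
```

In this sketch the UDF can register and update a counter, but it has no way to 
call state(), which is the separation the request above asks for.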



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-33290) Custom counters to capture encoding/decoding failure in flink

2023-10-17 Thread Chirag Dewan (Jira)
Chirag Dewan created FLINK-33290:


 Summary: Custom counters to capture encoding/decoding failure in flink
 Key: FLINK-33290
 URL: https://issues.apache.org/jira/browse/FLINK-33290
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Metrics
Reporter: Chirag Dewan


I need to get the difference between the records collected by the source and 
the records it emits (to capture deserialization failures).

Similarly, I need the difference between the records received by the sink and 
the records sent out of the sink (to capture serialization failures).


For example, if deserialization fails while reading records from Kafka, I want 
to expose, as a metric, the difference between the records collected from the 
Kafka broker and the records emitted from the Kafka operator after 
deserialization.

But I think Flink does not provide any such metric.

In the Kafka source I have a workaround to get this metric:

I can override the open method of KafkaRecordDeserializationSchema, where a 
metric can be added to show decoding failures:

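    // Assumes a field on the schema such as: private volatile int decodingFailures;
    @Override
    public void open(DeserializationSchema.InitializationContext context)
            throws Exception
    {
        // Register a gauge that reports the current number of decoding failures.
        context.getMetricGroup().gauge("decodingFailures", new Gauge<Integer>()
        {
            @Override
            public Integer getValue()
            {
                return decodingFailures;
            }
        });
    }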

and at the time of deserialization I can increment that counter as below:

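    // T is the output type of the schema.
    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> out)
    {
        try
        {
            //deserialize
        }
        catch (IOException | MMException e)
        {
            logger.error(String.format("Error received while decoding, in "
                + "partition [%d] for topic [%s] at offset [%d]: %s",
                record.partition(), record.topic(), record.offset(), e.toString()));

            // Count the failure so that the gauge registered in open() reports it.
            decodingFailures++;
        }
    }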

*But there is no such way to implement this in FileSource, as 
SimpleStreamFormat/Reader does not provide access to Context in any way.*

*Similarly, I did not find any way to expose serialization related metrics in 
any of the sinks as well.*

Would it be possible to provide a way to implement custom counters to count 
serialization/deserialization failures in all Flink connectors (sinks & 
sources)?
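
To make the requested metric concrete, here is a minimal, connector-independent 
sketch in plain Java. Decoder and CountingDecoder are hypothetical names, not 
Flink classes; the wrapper simply counts records received and records emitted, 
so the failure count is their difference, as described above. Treating a null 
result as a failure is an assumption of this sketch.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical decoder interface; stands in for a connector-specific deserializer.
interface Decoder<T> {
    T decode(byte[] raw) throws Exception;
}

// Wraps any Decoder and tracks how many records were received and emitted.
class CountingDecoder<T> {
    private final Decoder<T> delegate;
    private final AtomicLong received = new AtomicLong();
    private final AtomicLong emitted = new AtomicLong();

    CountingDecoder(Decoder<T> delegate) {
        this.delegate = delegate;
    }

    // Returns the decoded value, or empty if decoding failed.
    Optional<T> tryDecode(byte[] raw) {
        received.incrementAndGet();
        try {
            T value = delegate.decode(raw);
            if (value == null) {
                return Optional.empty(); // assumption: null counts as a failed decode
            }
            emitted.incrementAndGet();
            return Optional.of(value);
        } catch (Exception e) {
            return Optional.empty(); // counted implicitly: received but not emitted
        }
    }

    long received() {
        return received.get();
    }

    long emitted() {
        return emitted.get();
    }

    // Records received minus records emitted, i.e. decoding failures.
    long failures() {
        return received.get() - emitted.get();
    }
}
```

The same wrapper shape would apply on the sink side, with an encoder in place 
of the decoder. What is missing in the connectors is a hook to report these 
counts through a MetricGroup.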





[jira] [Created] (FLINK-33367) Invalid Check in DefaultFileFilter

2023-10-25 Thread Chirag Dewan (Jira)
Chirag Dewan created FLINK-33367:


 Summary: Invalid Check in DefaultFileFilter
 Key: FLINK-33367
 URL: https://issues.apache.org/jira/browse/FLINK-33367
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Affects Versions: 1.16.2
Reporter: Chirag Dewan


There is a null check in DefaultFileFilter:

 

if (fileName == null || fileName.length() == 0) {
  return true;
}

 

So two questions here:

1) Can a file name ever be null?

2) What is the behavior when this returns true? Should it return false instead?
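
To illustrate the second question, here is a small self-contained model of the 
check in plain Java. It works on String names rather than Flink's Path, and it 
rests on two assumptions that are not confirmed in this report: that a true 
result means the file is accepted for reading, and that the rest of the filter 
rejects names starting with '.' or '_'. HiddenFileFilter and FilterDemo are 
hypothetical names.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Simplified model of the filter; operates on plain file names.
class HiddenFileFilter implements Predicate<String> {
    @Override
    public boolean test(String fileName) {
        // Same branch as in the snippet above: null or empty names return true.
        if (fileName == null || fileName.length() == 0) {
            return true;
        }
        // Assumption: hidden and marker files ('.' or '_' prefix) are rejected.
        char first = fileName.charAt(0);
        return first != '.' && first != '_';
    }
}

class FilterDemo {
    // Keeps only the names for which the filter returns true.
    static List<String> accepted(List<String> names, Predicate<String> filter) {
        return names.stream().filter(filter).collect(Collectors.toList());
    }
}
```

Under these assumptions, a null or empty name passes the filter together with 
regular file names, while ".hidden" and "_SUCCESS" are dropped. Returning false 
from that branch would drop null and empty names as well.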


