Hey,
I have a question about using metrics based on filtered data.
Basically, I have handlers for many types of events I get from my data source 
(in my case, Kafka), and every handler has its own filter function.
Each handler also has a Counter, which is incremented every time the handler 
filters out an event (as part of the FilterFunction).

The problem arises when I use that FilterFunction on the DataStreamSource: the 
handler's open() function hasn't been called, and thus the metrics have never 
been initialized.
Is there a way to make this work? Or is there any other way of counting events 
that have been filtered out from the DataStreamSource?

Handler:

public abstract class Handler extends RichMapFunction<Event, String> {
    private transient Counter filteredCounter;
    private boolean isInit = false;

    @Override
    public void open(Configuration parameters) throws Exception {
        if (!isInit) {
            MetricGroup metricGroup = getRuntimeContext()
                    .getMetricGroup()
                    .addGroup(getClass().getSimpleName());
            filteredCounter =
                    metricGroup.counter(CustomMetricsManager.getFilteredSuffix());
            isInit = true;
        }
    }

    public final FilterFunction<Event> getFilter() {
        return event -> {
            boolean res = filter(event);
            if (!res) {
                filteredCounter.inc();
            }
            return res;
        };
    }

    protected abstract boolean filter(Event event);
}

And this is how I apply it when I set up the DataStreamSource:

Handler handler =
        (Handler) Class.forName(handlerName).getConstructor().newInstance();
dataStream = dataStreamSource.filter(handler.getFilter()).map(handler);
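
To illustrate what I think is going on, here is a minimal plain-Java sketch 
with no Flink dependency. It assumes (I have not verified this against the 
Flink source) that the filter operator and the map operator each receive their 
own deserialized copy of the handler, and that open() is only invoked on the 
copy that runs as the map function. DemoHandler, copy() and the AtomicLong 
counter are hypothetical stand-ins for my Handler, for the way functions get 
shipped to tasks, and for the Counter metric.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;

public class LifecycleDemo {

    // Hypothetical stand-in for Handler: the counter is transient and only set in open().
    static class DemoHandler implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient AtomicLong filteredCounter;

        void open() {
            filteredCounter = new AtomicLong();
        }

        boolean counterInitialized() {
            return filteredCounter != null;
        }
    }

    // Round-trips an object through Java serialization, producing an independent copy.
    // Assumption: this roughly mirrors how each operator gets its own function instance.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T copy(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DemoHandler handler = new DemoHandler();

        DemoHandler mapCopy = copy(handler);    // copy used by map(handler)
        DemoHandler filterCopy = copy(handler); // copy captured by the filter lambda

        mapCopy.open(); // open() only runs on the map function's copy

        System.out.println("map copy initialized: " + mapCopy.counterInitialized());       // true
        System.out.println("filter copy initialized: " + filterCopy.counterInitialized()); // false
    }
}
```

If that assumption holds, the lambda returned by getFilter() keeps pointing at 
a copy whose transient counter is still null, which would match what I'm 
seeing.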

Any help would be much appreciated!

Thanks 🙂


