At least I hope it has been fixed. Which version and planner are you using?


On 11.12.19 11:47, Arujit Pradhan wrote:
Hi Timo,

Thanks for the bug reference.

You mentioned that this bug has been fixed. Is the fix available for flink 1.9+ and default query planner.

Thanks and regards,
/Arujit/

On Wed, Dec 11, 2019 at 3:56 PM Timo Walther <twal...@apache.org <mailto:twal...@apache.org>> wrote:

    I remember that we fixed some bug around this topic recently. The
    legacy
    planner should not be affected.

    There is another user reporting this:
    https://issues.apache.org/jira/browse/FLINK-15040

    Regards,
    Timo

    On 11.12.19 10:34, Dawid Wysakowicz wrote:
     > Hi Arujit,
     >
     > Could you also share the query where you use this UDF? It would also
     > help if you said which version of Flink you are using and which
    planner.
     >
     > Best,
     >
     > Dawid
     >
     > On 11/12/2019 10:21, Arujit Pradhan wrote:
     >> Hi all,
     >>
     >> So we are creating some User Defined Functions of type
     >> AggregateFunction. And we want to send some static metrics from the
     >> *open()* method of the UDFs as we can get *MetricGroup *by
     >> *FunctionContext *which is only exposed in the open method. Our
    code
     >> looks something like this(Which is an implementation of count
    distinct
     >> in SQL) :
     >>
     >> public class DistinctCount extends AggregateFunction<Integer,
     >> DistinctCountAccumulator> { @Override public
    DistinctCountAccumulator
     >> createAccumulator() { return new DistinctCountAccumulator(); }
     >> @Override public void open(FunctionContext context) throws
    Exception { super.open(context); MetricGroup metricGroup =
    context.getMetricGroup(); // add some metric to the group here
     >> System.out.println("in the open of UDF"); } @Override public void
     >> close() throws Exception { super.close(); } @Override public
    Integer
     >> getValue(DistinctCountAccumulator distinctCountAccumulator) {
    System.out.println("in the udf"); return
    distinctCountAccumulator.count(); } public void
    accumulate(DistinctCountAccumulator distinctCountAccumulator, String
    item) { if (item== null) { return; }
    distinctCountAccumulator.add(item); } }
     >>
     >> But when we use this UDF in FlinkSQL, it seems like the open
    method is
     >> not being called at all.
     >>
     >> From the filnk UDF documentation we find :
     >>
     >> *The |open()| method is called once before the evaluation
    method. The
     >> |close()| method after the last call to the evaluation method.*
     >>
     >> *The |open()| method provides a |FunctionContext| that contains
     >> information about the context in which user-defined functions are
     >> executed, such as the metric group, the distributed cache files, or
     >> the global job parameters.*
     >>
     >> Then is there any reason that open is not working in
     >> AggragateFunctions. Btw it works fine in case of
    ScalarFunctions. Is
     >> there any alternative scope where we can register some static
    metrics
     >> in a UDF.
     >>
     >>
     >> Thanks and regards,
     >> /Arujit/
     >>


Reply via email to