Hi
I am trying to create a Flink SQL program that uses a Python UDF with metrics. This is my sample Python file, custom_udf_2.py:

```
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes


class MyUDF(ScalarFunction):

    def __init__(self):
        self.counter = None

    def open(self, function_context):
        self.counter = function_context.get_metric_group().counter("my_counter")

    def eval(self, x, y):
        self.counter.inc()
        return x + y
```

This is my SQL script:

```
CREATE FUNCTION add AS 'custom_udf_2.MyUDF' LANGUAGE PYTHON;

CREATE TABLE datagen (
    a BIGINT,
    b BIGINT
) WITH (
    'connector' = 'datagen',
    'fields.a.kind'='sequence',
    'fields.a.start'='1',
    'fields.a.end'='8',
    'fields.b.kind'='sequence',
    'fields.b.start'='4',
    'fields.b.end'='11'
);

CREATE TABLE print_sink (
    `sum` BIGINT
) WITH (
    'connector' = 'print'
);

INSERT into print_sink (
    select add(a,b) FROM datagen
);
```

When I try to execute this program, I get the following:

```
/bin/sql-client.sh -f ~/python_udf_lab.sql --pyFiles ~/custom_udf_2.py

Flink SQL> [INFO] Execute statement succeed.

Flink SQL>
> CREATE TABLE datagen (
>     a BIGINT,
>     b BIGINT
> ) WITH (
>     'connector' = 'datagen',
>     'fields.a.kind'='sequence',
>     'fields.a.start'='1',
>     'fields.a.end'='8',
>     'fields.b.kind'='sequence',
>     'fields.b.start'='4',
>     'fields.b.end'='11'
> )[INFO] Execute statement succeed.

Flink SQL>
> CREATE TABLE print_sink (
>     `sum` BIGINT
> ) WITH (
>     'connector' = 'print'
> )[INFO] Execute statement succeed.

Flink SQL>
>
> INSERT into print_sink (
>     select add(a,b) FROM datagen
> )[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Instantiating python function 'custom_udf_2.MyUDF' failed.
```

I've tried multiple variations of the function declaration, including:

    CREATE FUNCTION add AS 'custom_udf_2.MyUDF' LANGUAGE PYTHON;
    CREATE FUNCTION add AS 'MyUDF' LANGUAGE PYTHON;

FYI, this is on Flink 1.16.1 and Python 3.9.13.

Admittedly, I haven't found anything in the official documentation that covers this usage.
Is this use case currently supported? I know that it works with SQL if I change the add function to:

    @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
    def add(i, j):
        return i + j

but then it doesn't create any metrics.

Does anyone have any idea how I can get this to work, specifically Flink SQL with Python UDF metrics?

Thanks,
Tom