Hi

I am trying to create a Flink SQL program that uses a Python UDF with
metrics. This is my sample Python file:

custom_udf_2.py

```
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes

class MyUDF(ScalarFunction):

  def __init__(self):
    self.counter = None

  def open(self, function_context):
    self.counter = function_context.get_metric_group().counter("my_counter")

  def eval(self, x, y):
    self.counter.inc()
    return x + y

```

This is my SQL script:

```
CREATE FUNCTION add AS 'custom_udf_2.MyUDF'
LANGUAGE PYTHON;

CREATE TABLE datagen (
    a BIGINT,
    b BIGINT
) WITH (
    'connector' = 'datagen',
    'fields.a.kind'='sequence',
    'fields.a.start'='1',
    'fields.a.end'='8',
    'fields.b.kind'='sequence',
    'fields.b.start'='4',
    'fields.b.end'='11'
);

CREATE TABLE print_sink (
    `sum` BIGINT
) WITH (
    'connector' = 'print'
);


INSERT into print_sink (
    select add(a,b) FROM datagen
);

```

When I try to execute this program, I get the following error:


```
/bin/sql-client.sh -f ~/python_udf_lab.sql
    --pyFiles ~/custom_udf_2.py

Flink SQL> [INFO] Execute statement succeed.

Flink SQL>
> CREATE TABLE datagen (
>     a BIGINT,
>     b BIGINT
> ) WITH (
>     'connector' = 'datagen',
>     'fields.a.kind'='sequence',
>     'fields.a.start'='1',
>     'fields.a.end'='8',
>     'fields.b.kind'='sequence',
>     'fields.b.start'='4',
>     'fields.b.end'='11'
> )[INFO] Execute statement succeed.

Flink SQL>
> CREATE TABLE print_sink (
>     `sum` BIGINT
> ) WITH (
>     'connector' = 'print'
> )[INFO] Execute statement succeed.

Flink SQL>
>
> INSERT into print_sink (
>     select add(a,b) FROM datagen
> )[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Instantiating python function
'custom_udf_2.MyUDF' failed.
```


I've tried multiple variations of the function declaration, for example:
CREATE FUNCTION add AS 'custom_udf_2.MyUDF'
LANGUAGE PYTHON;

CREATE FUNCTION add AS 'MyUDF'
LANGUAGE PYTHON;


FYI, this is on Flink 1.16.1 and Python 3.9.13.

Admittedly, I haven't found anything in the official documentation
covering this usage. Is this use case currently supported?
I know that it works with SQL if I define the add function as follows:

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
result_type=DataTypes.BIGINT())
def add(i, j):
  return i + j

but then it doesn’t create any metrics.

Does anyone have any idea how I can get this to work, specifically
Flink SQL with Python UDF metrics?

Thanks,
Tom
