Re: python udf with flinksql

2023-05-23 Thread tom yang
Thanks Dian, that resolved my issues.



Re: python udf with flinksql

2023-05-21 Thread Dian Fu
Hi Tom,

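The following statement is incorrect, because 'custom_udf_2.MyUDF' refers to the ScalarFunction class itself rather than to a UDF object created with udf():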
```
CREATE FUNCTION add AS 'custom_udf_2.MyUDF'
LANGUAGE PYTHON;
```

You should define it as follows:
custom_udf_2.py
```
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes

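class MyUDF(ScalarFunction):

    def __init__(self):
        # The counter is created in open(), once the metric group is available.
        self.counter = None

    def open(self, function_context):
        # Register a counter named "my_counter" in this function's metric group.
        self.counter = function_context.get_metric_group().counter("my_counter")

    def eval(self, x, y):
        # Count every invocation, then return the sum.
        self.counter.inc()
        return x + y

# Wrap an instance of the class and declare its result type; SQL refers to this object.
my_udf = udf(MyUDF(), result_type=DataTypes.BIGINT())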
```

Then use it in SQL as follows:
```
CREATE FUNCTION add AS 'custom_udf_2.my_udf'
LANGUAGE PYTHON;
```
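A rough way to see why the two names behave differently: the name given to CREATE FUNCTION ... LANGUAGE PYTHON is a module path followed by an attribute name, and what matters is the object that attribute points to. The pure-Python sketch below does not use PyFlink. It resolves both names against an in-memory stand-in for custom_udf_2.py; `resolve` and `HypotheticalUdfWrapper` are hypothetical helpers written for this illustration, and Flink's actual loading code may work differently. It only shows that `MyUDF` names the bare class (which carries no result type), while `my_udf` names an object that already wraps an instance together with its result type, which is the shape the fix above provides.

```python
import importlib
import sys
import types


def resolve(qualified_name):
    """Resolve 'module.attribute' to the Python object it names (illustrative only)."""
    module_name, _, attr_name = qualified_name.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


# Hypothetical in-memory stand-in for custom_udf_2.py, so the sketch runs without PyFlink.
class MyUDF:
    def eval(self, x, y):
        return x + y


class HypotheticalUdfWrapper:
    """Stands in for the object udf(...) returns; this is NOT the real PyFlink class."""

    def __init__(self, func, result_type):
        self.func = func
        self.result_type = result_type


fake_module = types.ModuleType("custom_udf_2")
fake_module.MyUDF = MyUDF
fake_module.my_udf = HypotheticalUdfWrapper(MyUDF(), result_type="BIGINT")
sys.modules["custom_udf_2"] = fake_module  # import_module() finds it here

for name in ("custom_udf_2.MyUDF", "custom_udf_2.my_udf"):
    obj = resolve(name)
    kind = "a class" if isinstance(obj, type) else "an object"
    # Prints "custom_udf_2.MyUDF -> a class (type)", then
    # "custom_udf_2.my_udf -> an object (HypotheticalUdfWrapper)".
    print(f"{name} -> {kind} ({type(obj).__name__})")
```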

Regards,
Dian



python udf with flinksql

2023-05-18 Thread tom yang
Hi


I am trying to create a Flink SQL program that uses a Python UDF with
metrics. This is my sample Python file:

custom_udf_2.py

```
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes

class MyUDF(ScalarFunction):

    def __init__(self):
        self.counter = None

    def open(self, function_context):
        self.counter = function_context.get_metric_group().counter("my_counter")

    def eval(self, x, y):
        self.counter.inc()
        return x + y

```
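To make the intended lifecycle concrete, here is a pure-Python sketch of how a runtime is expected to drive this class: open() once per parallel instance, then eval() once per row, with the counter incremented on every call. `FakeCounter`, `FakeMetricGroup`, and `FakeFunctionContext` are hypothetical stand-ins written for this illustration, not PyFlink classes, and the sketch's MyUDF does not subclass ScalarFunction so that it runs without PyFlink installed. The input pairs mirror the two datagen sequences below, assuming they are consumed in lockstep.

```python
class FakeCounter:
    """Hypothetical stand-in for a metric counter; only inc() and a running count are modeled."""

    def __init__(self):
        self.count = 0

    def inc(self, n=1):
        self.count += n


class FakeMetricGroup:
    """Hypothetical stand-in for the object returned by get_metric_group()."""

    def __init__(self):
        self.counters = {}

    def counter(self, name):
        # Return the same counter object when the same name is requested again.
        return self.counters.setdefault(name, FakeCounter())


class FakeFunctionContext:
    """Hypothetical stand-in for the function_context passed to open()."""

    def __init__(self):
        self._group = FakeMetricGroup()

    def get_metric_group(self):
        return self._group


class MyUDF:  # in custom_udf_2.py this subclasses pyflink.table.udf.ScalarFunction
    def __init__(self):
        self.counter = None

    def open(self, function_context):
        self.counter = function_context.get_metric_group().counter("my_counter")

    def eval(self, x, y):
        self.counter.inc()
        return x + y


# Call open() once, then eval() per row (a = 1..8, b = 4..11, paired in order).
ctx = FakeFunctionContext()
f = MyUDF()
f.open(ctx)
results = [f.eval(a, b) for a, b in zip(range(1, 9), range(4, 12))]
print(results)  # [5, 7, 9, 11, 13, 15, 17, 19]
print(ctx.get_metric_group().counters["my_counter"].count)  # 8
```

Under these assumptions the counter ends at one increment per row. On a real cluster with parallelism greater than 1, each subtask would hold its own counter, and the row order may differ.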

This is my SQL script:

```
CREATE FUNCTION add AS 'custom_udf_2.MyUDF'
LANGUAGE PYTHON;

CREATE TABLE datagen (
a BIGINT,
b BIGINT
) WITH (
'connector' = 'datagen',
'fields.a.kind'='sequence',
'fields.a.start'='1',
'fields.a.end'='8',
'fields.b.kind'='sequence',
'fields.b.start'='4',
'fields.b.end'='11'
);

CREATE TABLE print_sink (
`sum` BIGINT
) WITH (
'connector' = 'print'
);


INSERT into print_sink (
select add(a,b) FROM datagen
);

```

When I try to execute this program, I get the following output:


```
/bin/sql-client.sh -f ~/python_udf_lab.sql --pyFiles ~/custom_udf_2.py

Flink SQL> [INFO] Execute statement succeed.

Flink SQL>
> CREATE TABLE datagen (
> a BIGINT,
> b BIGINT
> ) WITH (
> 'connector' = 'datagen',
> 'fields.a.kind'='sequence',
> 'fields.a.start'='1',
> 'fields.a.end'='8',
> 'fields.b.kind'='sequence',
> 'fields.b.start'='4',
> 'fields.b.end'='11'
> )[INFO] Execute statement succeed.

Flink SQL>
> CREATE TABLE print_sink (
> `sum` BIGINT
> ) WITH (
> 'connector' = 'print'
> )[INFO] Execute statement succeed.

Flink SQL>
>
> INSERT into print_sink (
> select add(a,b) FROM datagen
> )[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Instantiating python function
'custom_udf_2.MyUDF' failed.
```


I’ve tried multiple variations, such as:
CREATE FUNCTION add AS 'custom_udf_2.MyUDF'
LANGUAGE PYTHON;

CREATE FUNCTION add AS 'MyUDF'
LANGUAGE PYTHON;


FYI, this is on Flink 1.16.1 and Python 3.9.13.

Admittedly, I haven’t found any examples of this usage in the official
documentation. Is this use case currently supported?
I know that it works with SQL if I change the add function to:

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
result_type=DataTypes.BIGINT())
def add(i, j):
  return i + j

but then it doesn’t create any metrics.

Does anyone have any idea how I can get this to work, specifically with
Flink SQL and Python UDF metrics?

Thanks,
Tom