Re: python udf with flinksql
Thanks Dian, that resolved my issues.

On Sun, May 21, 2023 at 6:55 PM Dian Fu wrote:
> And then use it in SQL as following:
> ```
> CREATE FUNCTION add AS 'custom_udf_2.my_udf'
> LANGUAGE PYTHON;
> ```
Re: python udf with flinksql
Hi Tom,

The following statement is incorrect:
```
CREATE FUNCTION add AS 'custom_udf_2.MyUDF'
LANGUAGE PYTHON;
```

You should define it as follows:

custom_udf_2.py
```
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes

class MyUDF(ScalarFunction):

    def __init__(self):
        self.counter = None

    def open(self, function_context):
        # Register the counter once, when the function is opened.
        self.counter = function_context.get_metric_group().counter("my_counter")

    def eval(self, x, y):
        self.counter.inc()
        return x + y

# Module-level UDF object that the SQL statement refers to.
my_udf = udf(MyUDF(), result_type=DataTypes.BIGINT())
```

And then use it in SQL as follows:
```
CREATE FUNCTION add AS 'custom_udf_2.my_udf'
LANGUAGE PYTHON;
```

Regards,
Dian

On Fri, May 19, 2023 at 6:23 AM tom yang wrote:
> I am trying to create a flinksql program using python udf & using
> metrics.
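The difference between the two `CREATE FUNCTION` statements above is whether the fully qualified name points at the `MyUDF` class or at the module-level `my_udf` object. A minimal sketch of a pre-flight check for that distinction follows. It is not part of Flink: `describe_udf_target` is a hypothetical helper, and `custom_udf_2_demo` is a stand-in module built in memory so the snippet runs without PyFlink installed.

```python
import importlib
import inspect
import sys
import types


def describe_udf_target(qualified_name):
    """Resolve 'module.attr' and report whether it names a class or an object.

    Hypothetical helper for illustration; it only mirrors the idea that the
    name in CREATE FUNCTION should point at a module-level UDF object.
    """
    module_name, _, attr_name = qualified_name.rpartition(".")
    module = importlib.import_module(module_name)
    target = getattr(module, attr_name)
    return "class" if inspect.isclass(target) else "object"


class MyUDF:
    """Stand-in for the ScalarFunction subclass from custom_udf_2.py."""

    def eval(self, x, y):
        return x + y


# Build an in-memory stand-in for custom_udf_2.py (assumption: the real
# module would use udf(MyUDF(), result_type=DataTypes.BIGINT()) here).
demo_module = types.ModuleType("custom_udf_2_demo")
demo_module.MyUDF = MyUDF
demo_module.my_udf = MyUDF()
sys.modules["custom_udf_2_demo"] = demo_module

print(describe_udf_target("custom_udf_2_demo.MyUDF"))   # class
print(describe_udf_target("custom_udf_2_demo.my_udf"))  # object
```

With the real file on the Python path, the same check could be run against `custom_udf_2.my_udf` before submitting the SQL script. A result of `class` would suggest the name still points at the unwrapped class.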
python udf with flinksql
Hi,

I am trying to create a Flink SQL program that uses a Python UDF with metrics. This is my sample Python file:

custom_udf_2.py

```
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes

class MyUDF(ScalarFunction):

    def __init__(self):
        self.counter = None

    def open(self, function_context):
        self.counter = function_context.get_metric_group().counter("my_counter")

    def eval(self, x, y):
        self.counter.inc()
        return x + y
```

This is my SQL script:

```
CREATE FUNCTION add AS 'custom_udf_2.MyUDF'
LANGUAGE PYTHON;

CREATE TABLE datagen (
  a BIGINT,
  b BIGINT
) WITH (
  'connector' = 'datagen',
  'fields.a.kind'='sequence',
  'fields.a.start'='1',
  'fields.a.end'='8',
  'fields.b.kind'='sequence',
  'fields.b.start'='4',
  'fields.b.end'='11'
);

CREATE TABLE print_sink (
  `sum` BIGINT
) WITH (
  'connector' = 'print'
);

INSERT into print_sink (
  select add(a,b) FROM datagen
);
```

When I try to execute this program, I get the following:

```
/bin/sql-client.sh -f ~/python_udf_lab.sql --pyFiles ~/custom_udf_2.py

Flink SQL> [INFO] Execute statement succeed.

Flink SQL>
> CREATE TABLE datagen (
>   a BIGINT,
>   b BIGINT
> ) WITH (
>   'connector' = 'datagen',
>   'fields.a.kind'='sequence',
>   'fields.a.start'='1',
>   'fields.a.end'='8',
>   'fields.b.kind'='sequence',
>   'fields.b.start'='4',
>   'fields.b.end'='11'
> )[INFO] Execute statement succeed.

Flink SQL>
> CREATE TABLE print_sink (
>   `sum` BIGINT
> ) WITH (
>   'connector' = 'print'
> )[INFO] Execute statement succeed.

Flink SQL>
>
> INSERT into print_sink (
>   select add(a,b) FROM datagen
> )[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Instantiating python function 'custom_udf_2.MyUDF' failed.
```

I've tried multiple variations of:

CREATE FUNCTION add AS 'custom_udf_2.MyUDF'
LANGUAGE PYTHON;

CREATE FUNCTION add AS 'MyUDF'
LANGUAGE PYTHON;

FYI, this is on Flink 1.16.1 and Python 3.9.13.

Admittedly, I haven’t found anything in the official documentation about this usage. Is this use case currently supported?

I know that it works with SQL if I change the add function to:

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
     result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j

but then it doesn’t create any metrics.

Does anyone have any idea how I can get this to work, specifically with Flink SQL and Python UDF metrics?

Thanks,
Tom