Re: python udf with flinksql

2023-05-23 Thread tom yang
Thanks Dian, that resolved my issues.

On Sun, May 21, 2023 at 6:55 PM Dian Fu  wrote:
>
> Hi Tom,
>
> The following statement is incorrect.
> ```
> CREATE FUNCTION add AS 'custom_udf_2.MyUDF'
> LANGUAGE PYTHON;
> ```
>
> You should define it as follows:
> custom_udf_2.py
> ```
> from pyflink.table.udf import ScalarFunction, udf
> from pyflink.table import DataTypes
>
> class MyUDF(ScalarFunction):
>
>   def __init__(self):
>     self.counter = None
>
>   def open(self, function_context):
>     # register a counter metric through the runtime-provided FunctionContext
>     self.counter = function_context.get_metric_group().counter("my_counter")
>
>   def eval(self, x, y):
>     self.counter.inc()
>     return x + y
>
> # the SQL DDL must reference this udf()-wrapped object, not the raw class
> my_udf = udf(MyUDF(), result_type=DataTypes.BIGINT())
> ```
>
> And then use it in SQL as follows:
> ```
> CREATE FUNCTION add AS 'custom_udf_2.my_udf'
> LANGUAGE PYTHON;
> ```
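>
> The same wrapped object can also be registered from the Table API
> instead of DDL. A minimal sketch, assuming a TableEnvironment named t_env:
> ```
> from custom_udf_2 import my_udf
>
> # registers the udf()-wrapped object under the SQL name "add"
> t_env.create_temporary_function("add", my_udf)
> ```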
>
> Regards,
> Dian
>
> On Fri, May 19, 2023 at 6:23 AM tom yang  wrote:
>>
>> Hi
>>
>>
>> I am trying to create a Flink SQL program using a Python UDF with
>> metrics. This is my sample Python file:
>>
>> custom_udf_2.py
>>
>> ```
>> from pyflink.table.udf import ScalarFunction, udf
>> from pyflink.table import DataTypes
>>
>> class MyUDF(ScalarFunction):
>>
>>   def __init__(self):
>>     self.counter = None
>>
>>   def open(self, function_context):
>>     self.counter = function_context.get_metric_group().counter("my_counter")
>>
>>   def eval(self, x, y):
>>     self.counter.inc()
>>     return x + y
>>
>> ```
>>
>> This is my SQL script:
>>
>> ```
>> CREATE FUNCTION add AS 'custom_udf_2.MyUDF'
>> LANGUAGE PYTHON;
>>
>> CREATE TABLE datagen (
>> a BIGINT,
>> b BIGINT
>> ) WITH (
>> 'connector' = 'datagen',
>> 'fields.a.kind'='sequence',
>> 'fields.a.start'='1',
>> 'fields.a.end'='8',
>> 'fields.b.kind'='sequence',
>> 'fields.b.start'='4',
>> 'fields.b.end'='11'
>> );
>>
>> CREATE TABLE print_sink (
>> `sum` BIGINT
>> ) WITH (
>> 'connector' = 'print'
>> );
>>
>>
>> INSERT into print_sink (
>> select add(a,b) FROM datagen
>> );
>>
>> ```
>>
>> When I try to execute this program, I get the following:
>>
>>
>> ```
>> /bin/sql-client.sh -f ~/python_udf_lab.sql
>> --pyFiles ~/custom_udf_2.py
>>
>> Flink SQL> [INFO] Execute statement succeed.
>>
>> Flink SQL>
>> > CREATE TABLE datagen (
>> > a BIGINT,
>> > b BIGINT
>> > ) WITH (
>> > 'connector' = 'datagen',
>> > 'fields.a.kind'='sequence',
>> > 'fields.a.start'='1',
>> > 'fields.a.end'='8',
>> > 'fields.b.kind'='sequence',
>> > 'fields.b.start'='4',
>> > 'fields.b.end'='11'
>> > )[INFO] Execute statement succeed.
>>
>> Flink SQL>
>> > CREATE TABLE print_sink (
>> > `sum` BIGINT
>> > ) WITH (
>> > 'connector' = 'print'
>> > )[INFO] Execute statement succeed.
>>
>> Flink SQL>
>> >
>> > INSERT into print_sink (
>> > select add(a,b) FROM datagen
>> > )[ERROR] Could not execute SQL statement. Reason:
>> java.lang.IllegalStateException: Instantiating python function
>> 'custom_udf_2.MyUDF' failed.
>> ```
>>
>>
>> I've tried multiple variations of:
>>
>> ```
>> CREATE FUNCTION add AS 'custom_udf_2.MyUDF'
>> LANGUAGE PYTHON;
>>
>> CREATE FUNCTION add AS 'MyUDF'
>> LANGUAGE PYTHON;
>> ```
>>
>>
>> FYI, this is on Flink 1.16.1 & Python 3.9.13.
>>
>> Admittedly, I haven't found anything in the official documentation
>> about this usage. Is this use case currently supported?
>> I know that it works with SQL if I change the add function to:
>>
>> ```
>> @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
>>      result_type=DataTypes.BIGINT())
>> def add(i, j):
>>   return i + j
>> ```
>>
>> but then it doesn't create any metrics (the plain function form has no
>> open() hook in which to register a counter).
>>
>> Does anyone have any idea how I can get this to work, specifically
>> Flink SQL with a Python UDF that reports metrics?
>>
>> Thanks,
>> Tom


python udf with flinksql

2023-05-18 Thread tom yang
Hi


I am trying to create a Flink SQL program using a Python UDF with
metrics. This is my sample Python file:

custom_udf_2.py

```
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes

class MyUDF(ScalarFunction):

  def __init__(self):
    self.counter = None

  def open(self, function_context):
    self.counter = function_context.get_metric_group().counter("my_counter")

  def eval(self, x, y):
    self.counter.inc()
    return x + y

```

This is my SQL script:

```
CREATE FUNCTION add AS 'custom_udf_2.MyUDF'
LANGUAGE PYTHON;

CREATE TABLE datagen (
a BIGINT,
b BIGINT
) WITH (
'connector' = 'datagen',
'fields.a.kind'='sequence',
'fields.a.start'='1',
'fields.a.end'='8',
'fields.b.kind'='sequence',
'fields.b.start'='4',
'fields.b.end'='11'
);

CREATE TABLE print_sink (
`sum` BIGINT
) WITH (
'connector' = 'print'
);


INSERT into print_sink (
select add(a,b) FROM datagen
);

```

When I try to execute this program, I get the following:


```
/bin/sql-client.sh -f ~/python_udf_lab.sql
--pyFiles ~/custom_udf_2.py

Flink SQL> [INFO] Execute statement succeed.

Flink SQL>
> CREATE TABLE datagen (
> a BIGINT,
> b BIGINT
> ) WITH (
> 'connector' = 'datagen',
> 'fields.a.kind'='sequence',
> 'fields.a.start'='1',
> 'fields.a.end'='8',
> 'fields.b.kind'='sequence',
> 'fields.b.start'='4',
> 'fields.b.end'='11'
> )[INFO] Execute statement succeed.

Flink SQL>
> CREATE TABLE print_sink (
> `sum` BIGINT
> ) WITH (
> 'connector' = 'print'
> )[INFO] Execute statement succeed.

Flink SQL>
>
> INSERT into print_sink (
> select add(a,b) FROM datagen
> )[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Instantiating python function
'custom_udf_2.MyUDF' failed.
```


I've tried multiple variations of:

```
CREATE FUNCTION add AS 'custom_udf_2.MyUDF'
LANGUAGE PYTHON;

CREATE FUNCTION add AS 'MyUDF'
LANGUAGE PYTHON;
```


FYI, this is on Flink 1.16.1 & Python 3.9.13.

Admittedly, I haven't found anything in the official documentation
about this usage. Is this use case currently supported?
I know that it works with SQL if I change the add function to:

```
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
     result_type=DataTypes.BIGINT())
def add(i, j):
  return i + j
```

but then it doesn't create any metrics (the plain function form has no
open() hook in which to register a counter).

Does anyone have any idea how I can get this to work, specifically
Flink SQL with a Python UDF that reports metrics?

Thanks,
Tom


python udf out of memory

2023-03-28 Thread tom yang
Hi,

I am running a standalone cluster setup and submit a Flink SQL job with a
Python UDF, following the examples here:


github.com/ververica/flink-sql-cookbook/blob/main/udfs/01_python_udfs/01_python_udfs.md

I notice that each time I submit the job, cancel, and resubmit, eventually
my task manager throws an out-of-memory exception. I am sure it is due to a
leaky class loader somewhere, but I am not sure how to track it down.
Has anyone experienced this issue before?


```
2023-03-24 04:55:46,380 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal error occurred while executing the TaskManager. Shutting it down...
java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error has occurred. This can mean two things: either the job requires a larger size of JVM metaspace to load classes or there is a class loading leak. In the first case 'taskmanager.memory.jvm-metaspace.size' configuration option should be increased. If the error persists (usually in cluster after several job (re-)submissions) then there is probably a class loading leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...
    at java.lang.ClassLoader.defineClass1(Native Method) ~[?:?]
    at java.lang.ClassLoader.defineClass(ClassLoader.java:1017) ~[?:?]
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174) ~[?:?]
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:555) ~[?:?]
    at java.net.URLClassLoader$1.run(URLClassLoader.java:458) ~[?:?]
    at java.net.URLClassLoader$1.run(URLClassLoader.java:452) ~[?:?]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
    at java.net.URLClassLoader.findClass(URLClassLoader.java:451) ~[?:?]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[?:?]
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67) ~[dpi-flink-sql-base-app-0.9.35.jar:?]
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51) [dpi-flink-sql-base-app-0.9.35.jar:?]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:522) [?:?]
    at org.apache.beam.sdk.options.PipelineOptionsFactory.<clinit>(PipelineOptionsFactory.java:500) [blob_p-bbc3c49fcdd79f0b3f7f6c99a18bd72516414de1-4563cd43f6f153fe0ec32993bf935209:1.16.1]
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:238) [blob_p-bbc3c49fcdd79f0b3f7f6c99a18bd72516414de1-4563cd43f6f153fe0ec32993bf935209:1.16.1]
    at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:57) [blob_p-bbc3c49fcdd79f0b3f7f6c99a18bd72516414de1-4563cd43f6f153fe0ec32993bf935209:1.16.1]
    at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:92) [blob_p-bbc3c49fcdd79f0b3f7f6c99a18bd72516414de1-4563cd43f6f153fe0ec32993bf935209:1.16.1]
    at org.apache.flink.table.runtime.operators.python.table.PythonTableFunctionOperator.open(PythonTableFunctionOperator.java:114) [blob_p-bbc3c49fcdd79f0b3f7f6c99a18bd72516414de1-4563cd43f6f153fe0ec32993bf935209:1.16.1]
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107) [flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726) [flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$927/0x000800a4ac40.call(Unknown Source) [flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) [flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702) [flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669) [flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.taskmanager.Task$$Lambda$815/0x000800904840.run(Unknown Source) [flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) [flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) [flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) [flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) [flink-dist-1.16.1.jar:1.16.1]
    at java.lang.Thread.run(Thread.java:829) [?:?]
```
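
The log itself points at the first mitigation: raising the metaspace limit in
flink-conf.yaml. A minimal sketch (the value is illustrative; pick one that
fits your setup):

```
taskmanager.memory.jvm-metaspace.size: 512m
```

If the error keeps returning after several resubmissions, one generic JDK way
to see which classes keep accumulating is to take class histograms of the
TaskManager JVM between submissions and diff them, e.g.:

```
jcmd <taskmanager-pid> GC.class_histogram | head -n 25
```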


asynchronous io question

2021-10-04 Thread tom yang
Hello, I recently ran into an issue with RichAsyncFunction and wanted to get
some guidance from the community. Please see the snippet (lines are numbered
because the questions below refer to them):

```
 1  class AsyncFetchFromHttp extends RichAsyncFunction<String, Tuple2<String, String>> {
 2
 3      private transient AsyncHttpClient client;
 4
 5      @Override
 6      public void open(Configuration parameters) throws Exception {
 7          client = new AsyncHttpClient();
 8      }
 9
10      @Override
11      public void close() throws Exception {
12          client.close();
13      }
14
15      @Override
16      public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
17
18          // issue the asynchronous request, receive a future for the result
19          CompletableFuture<HttpResponse<String>> future = client.sendAsync(request, HttpResponse.BodyHandlers.ofString());
20
21          future.whenCompleteAsync((response, throwable) -> {
22              if (throwable != null) {
23
24                  resultFuture.completeExceptionally(throwable);
25              }
26              else {
27                  if (response.statusCode() == HttpStatus.SC_OK) {
28                      resultFuture.complete(Collections.singleton(new Tuple2<>(key, response.body())));
29                  }
30                  else if (response.statusCode() == HttpStatus.SC_NOT_FOUND) {
31
32                      resultFuture.complete(Collections.emptyList());
33                  }
34                  else {
35                      resultFuture.completeExceptionally(new RuntimeException("Server processing error"));
36                  }
37              }
38
39          });
40
41
42      }
43  }
```

1. If the future completes exceptionally, i.e. resultFuture.completeExceptionally(throwable), does the input message get discarded?

2. Should the request be made on a dedicated ExecutorService, or is ForkJoinPool.commonPool() sufficient?

3. If the REST API service, for example, returns 404, should you complete with an empty collection, or can you omit line 32 entirely?

Thanks!
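
On question 2, the single-argument whenCompleteAsync runs its callback on the
common ForkJoinPool by default, and there is an overload that takes an explicit
Executor. A minimal sketch of wiring a dedicated pool (names illustrative):

```
// owned by the operator: created in open() alongside the client, shut down in close()
private transient ExecutorService callbackExecutor;

@Override
public void open(Configuration parameters) throws Exception {
    client = new AsyncHttpClient();
    callbackExecutor = Executors.newFixedThreadPool(4);
}

// then hand the executor to the callback explicitly:
future.whenCompleteAsync((response, throwable) -> { /* as above */ }, callbackExecutor);
```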