Log rollover for logs.

2021-04-27 Thread John Smith
Hi, I'm running Flink as a systemd service with...

[Service]
Type=forking
WorkingDirectory=/opt/flink
User=flink
Group=flink
ExecStart=/opt/flink/bin/taskmanager.sh start
ExecStop=/opt/flink/bin/taskmanager.sh stop
TimeoutSec=30
Restart=on-failure

My log4j.properties file is at /opt/flink/conf.

Is it as simple as just setting the log rollover in there? Because sometimes,
with certain services like ZooKeeper, there are env variable overrides, etc. So
I'm just wondering if it's that straightforward with Flink.


Re: Contradictory docs: python.files config can include not only python files

2021-04-27 Thread Yik San Chan
Hi Dian,

I created a PR to fix the docs. https://github.com/apache/flink/pull/15779

On Tue, Apr 27, 2021 at 2:08 PM Dian Fu  wrote:

> Thanks for the suggestion. It makes sense to me~.
>
> On Apr 27, 2021, at 10:28 AM, Yik San Chan wrote:
>
> Hi Dian,
>
> If that's the case, shall we reword "Attach custom python files for job."
> into "attach custom files that could be put in PYTHONPATH, e.g., .zip,
> .whl, etc."
>
> Best,
> Yik San
>
> On Tue, Apr 27, 2021 at 10:08 AM Dian Fu  wrote:
>
>> Hi Yik San,
>>
>> All the files which could be put in the PYTHONPATH are allowed here, e.g.
>> .zip, .whl, etc.
>>
>> Regards,
>> Dian
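
For readers wondering why a .zip qualifies: Python's standard zipimport mechanism lets modules be imported directly from an archive placed on sys.path, which is essentially what happens when a .zip listed via `--pyFiles` is added to the PYTHONPATH. A minimal, self-contained sketch (the file and module names are made up for illustration):

```python
import importlib
import os
import sys
import tempfile
import zipfile

# Build a tiny zip containing a module, put it on sys.path, and import from it.
tmp_dir = tempfile.mkdtemp()
zip_path = os.path.join(tmp_dir, "deps.zip")
with zipfile.ZipFile(zip_path, "w") as z:
    z.writestr("greeting.py", "def hello():\n    return 'hi'\n")

sys.path.insert(0, zip_path)          # zipimport makes the archive importable
greeting = importlib.import_module("greeting")
print(greeting.hello())               # -> hi
```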
>>
>> On Apr 27, 2021, at 8:16 AM, Yik San Chan wrote:
>>
>> Hi Dian,
>>
>> It is still not clear to me - does it only allow Python files (.py), or
>> not?
>>
>> Best,
>> Yik San
>>
>> On Mon, Apr 26, 2021 at 9:15 PM Dian Fu  wrote:
>>
>>> Hi Yik San,
>>>
>>> 1) what `--pyFiles` is used for:
>>> All the files specified via `--pyFiles` will be put in the PYTHONPATH of
>>> the Python worker during execution and then they will be available for the
>>> Python user-defined functions during execution.
>>>
>>> 2) validate for the files passed to `--pyFiles`
>>> Currently it will not validate the files passed to this argument. I also
>>> think that it’s not necessary and not able to perform such kind of check.
>>> Do you have any advice for this?
>>>
>>> Regards,
>>> Dian
>>>
>>> On Apr 26, 2021, at 8:45 PM, Yik San Chan wrote:
>>>
>>> Hi community,
>>>
>>> In
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/python_config.html,
>>> regarding python.files:
>>>
>>> > Attach custom python files for job.
>>>
>>> This makes readers think only Python files are allowed here. However, in
>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#submitting-pyflink-jobs
>>> :
>>>
>>> ./bin/flink run \
>>>   --python examples/python/table/batch/word_count.py \
>>>   --pyFiles file:///user.txt,hdfs:///$namenode_address/username.txt
>>>
>>> It obviously includes a .txt file, which is not a Python file.
>>>
>>> I believe the docs are contradictory here. Can anyone confirm?
>>>
>>> Best,
>>> Yik San
>>>
>>>
>>>
>>
>


How to load resource in a PyFlink UDF

2021-04-27 Thread Yik San Chan
Hi,

My UDF has a dependency on a resource file named crypt.csv that is
located in the resources/ directory.

```python
# udf_use_resource.py
@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def decrypt(s):
    import pandas as pd
    d = pd.read_csv('resources/crypt.csv', header=None, index_col=0,
                    squeeze=True).to_dict()
    return d.get(s, "unknown")
```

I run the job in local mode (i.e., python udf_use_resource.py) without any
problem. However, when I try to run it with
`~/softwares/flink-1.12.0/bin/flink run -d -pyexec
/usr/local/anaconda3/envs/featflow-ml-env/bin/python -pyarch resources.zip
-py udf_use_resource.py` on my local cluster, it complains:

FileNotFoundError: [Errno 2] File b'resources/crypt.csv' does not exist: b
'resources/crypt.csv'

The resources.zip is zipped from the resources directory. I wonder: where
do I go wrong?

Note: udf_use_resource.py and resources/crypt.csv can be found in
https://github.com/YikSanChan/pyflink-quickstart/tree/36bfab4ff830f57d3f23f285c7c5499a03385b71
.

Thanks!

Best,
Yik San


Re:Re: The wrong Options of Kafka Connector, will make the cluster can not run any job

2021-04-27 Thread chenxuying
I have tested the Flink job in flink_1.11.2 and flink_1.12.2. The error log I posted
before is from the flink_1.11.2 cluster.

Now I am running the job in flink_1.11.2.

1. The wrong options of the Kafka connector

The IP is right, but the port is wrong:

```
CREATE TABLE KafkaTable (
  message STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'filebeat_json_install_log',
  'properties.bootstrap.servers' = '192.168.0.77:9093',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);
```

2. Job details in the Flink web UI

The Root Exception tab shows the log below:

```
2021-04-27 15:59:11
org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired before the position for partition filebeat_json_install_log-3 could be determined
```

3. Logs in the Job Manager

The Job Manager prints logs continuously, as below:

```

org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
before the position for partition filebeat_json_install_log-3 could be 
determined

2021-04-27 08:03:16,162 INFO  
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - Calculating tasks to restart to recover the failed task 
cbc357ccb763df2852fee8c4fc7d55f2_0.

2021-04-27 08:03:16,163 INFO  
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
 [] - 1 tasks should be restarted to recover the failed task 
cbc357ccb763df2852fee8c4fc7d55f2_0. 

2021-04-27 08:03:16,163 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
v2_ods_device_action_log (876dbcddcec696d42ed887512dacdf22) switched from state 
RUNNING to RESTARTING.

2021-04-27 08:03:17,163 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
v2_ods_device_action_log (876dbcddcec696d42ed887512dacdf22) switched from state 
RESTARTING to RUNNING.

2021-04-27 08:03:17,164 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring job 
876dbcddcec696d42ed887512dacdf22 from Checkpoint 6 @ 1619510548493 for 
876dbcddcec696d42ed887512dacdf22 located at 
oss://tanwan-datahub/test/flinksql/checkpoints/876dbcddcec696d42ed887512dacdf22/chk-6.

2021-04-27 08:03:17,165 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - No master 
state to restore

2021-04-27 08:03:17,165 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source: 
time, pk_id, key_id, idfv, media_site_id]) (1/1) 
(278dd023107c2fd3f2b42383e0c01794) switched from CREATED to SCHEDULED.

2021-04-27 08:03:17,165 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source: ...) 
(1/1) (278dd023107c2fd3f2b42383e0c01794) switched from SCHEDULED to DEPLOYING.

2021-04-27 08:03:17,166 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Deploying 
Source: TableSourceScan(tac2fd3f2b42383e0c01794 to 192.168.3.64:6122-55a668 
@ 192.168.3.64 (dataPort=34077) with allocation id 
091b8c459bd00a2deaea398a41c831ab

2021-04-27 08:03:17,176 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source: 
TableSourceScan(table=[[d...3e0c01794) switched from DEPLOYING to RUNNING.

```

4. Cancel job

When I cancel the job, the Job Manager prints logs as below:

```

2021-04-27 08:11:18,190 INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 
876dbcddcec696d42ed887512dacdf22 reached globally terminal state CANCELED.

2021-04-27 08:11:18,196 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Stopping the JobMaster for job 
v2_ods_device_action_log(876dbcddcec696d42ed887512dacdf22).

2021-04-27 08:11:18,197 INFO  
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Suspending 
SlotPool.

2021-04-27 08:11:18,197 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Close ResourceManager connection 
65303b0e98faaa00ada09ad7271be558: Stopping JobMaster for job 
v2_ods_device_action_log(876dbcddcec696d42ed887512dacdf22)..

2021-04-27 08:11:18,197 INFO  
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Stopping 
SlotPool.

2021-04-27 08:11:18,197 INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
Disconnect job manager 
0...@akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_3
 for job 876dbcddcec696d42ed887512dacdf22 from the resource manager.

2021-04-27 08:11:18,216 WARN  
org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname 
could be resolved for the IP address 192.168.3.64, using IP address as host 
name. Local input split assignment (such as for HDFS files) may be impacted.

2021-04-27 08:11:18,271 INFO  
org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss  [] - 
[Server]Unable to execute HTTP request: Not Found

[ErrorCode]: NoSuchKey

[RequestId]: 6087C726766D47343487BE32

[HostId]: null

2021-04-27 08:11:18,275 INFO  
org.apach

Re: How to load resource in a PyFlink UDF

2021-04-27 Thread Dian Fu
Hi Yik San,

Could you try `pd.read_csv(‘resources.zip/resources/crypt.csv’, xxx)`?

Regards,
Dian
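
Since an archive passed via -pyarch is extracted into a directory named after the archive file, the read path needs the resources.zip/ prefix. A minimal sketch of the adjusted UDF (identical to the snippet quoted below except for the path):

```python
@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def decrypt(s):
    import pandas as pd
    # resources.zip shipped via `-pyarch resources.zip` is extracted into a
    # directory named "resources.zip" next to the Python worker's working directory.
    d = pd.read_csv('resources.zip/resources/crypt.csv', header=None,
                    index_col=0, squeeze=True).to_dict()
    return d.get(s, "unknown")
```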

> On Apr 27, 2021, at 4:39 PM, Yik San Chan wrote:
> 
> Hi,
> 
> My UDF has the dependency to a resource file named crypt.csv that is located 
> in resources/ directory.
> 
> ```python
> # udf_use_resource.py
> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
> def decrypt(s):
> import pandas as pd
> d = pd.read_csv('resources/crypt.csv', header=None, index_col=0, 
> squeeze=True).to_dict()
> return d.get(s, "unknown")
> ```
> 
> I run the job in local mode (i.e., python udf_use_resource.py) without any 
> problem. However, when I try to run it with 
> `~/softwares/flink-1.12.0/bin/flink run -d -pyexec 
> /usr/local/anaconda3/envs/featflow-ml-env/bin/python -pyarch resources.zip 
> -py udf_use_resource.py` on my local cluster, it complains:
> 
> FileNotFoundError: [Errno 2] File b'resources/crypt.csv' does not exist: 
> b'resources/crypt.csv'
> 
> The resources.zip is zipped from the resources directory. I wonder: where do 
> I go wrong?
> 
> Note: udf_use_resource.py and resources/crypt.csv can be found in 
> https://github.com/YikSanChan/pyflink-quickstart/tree/36bfab4ff830f57d3f23f285c7c5499a03385b71
>  
> .
> 
> Thanks!
> 
> Best,
> Yik San



Re: How to load resource in a PyFlink UDF

2021-04-27 Thread Yik San Chan
Hi Dian,

Thank you! That solves my question. By the way, for my use case, does
-pyarch make more sense than -pyfs?

Best,
Yik San

On Tue, Apr 27, 2021 at 4:52 PM Dian Fu  wrote:

> Hi Yik San,
>
> Could you try `pd.read_csv(‘resources.zip/resources/crypt.csv’, xxx)`?
>
> Regards,
> Dian
>
> On Apr 27, 2021, at 4:39 PM, Yik San Chan wrote:
>
> Hi,
>
> My UDF has the dependency to a resource file named crypt.csv that is
> located in resources/ directory.
>
> ```python
> # udf_use_resource.py
> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
> def decrypt(s):
> import pandas as pd
> d = pd.read_csv('resources/crypt.csv', header=None, index_col=0, squeeze=
> True).to_dict()
> return d.get(s, "unknown")
> ```
>
> I run the job in local mode (i.e., python udf_use_resource.py) without any
> problem. However, when I try to run it with
> `~/softwares/flink-1.12.0/bin/flink run -d -pyexec
> /usr/local/anaconda3/envs/featflow-ml-env/bin/python -pyarch resources.zip
> -py udf_use_resource.py` on my local cluster, it complains:
>
> FileNotFoundError: [Errno 2] File b'resources/crypt.csv' does not exist: b
> 'resources/crypt.csv'
>
> The resources.zip is zipped from the resources directory. I wonder: where
> do I go wrong?
>
> Note: udf_use_resource.py and resources/crypt.csv can be found in
> https://github.com/YikSanChan/pyflink-quickstart/tree/36bfab4ff830f57d3f23f285c7c5499a03385b71
> .
>
> Thanks!
>
> Best,
> Yik San
>
>
>


Re: Re:Re: The wrong Options of Kafka Connector, will make the cluster can not run any job

2021-04-27 Thread cxydevelop
Oh, I am wrong again: the last one is in flink_1.12.2, not flink_1.11.2.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Confusing docs on python.archives

2021-04-27 Thread Yik San Chan
Hi Dian,

As a follow-up, I fix the docs here
https://github.com/apache/flink/pull/15783

Best,
Yik San

On Tue, Apr 27, 2021 at 10:20 AM Dian Fu  wrote:

> For the command line arguments, it’s documented in
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html
>
> On Apr 27, 2021, at 10:19 AM, Dian Fu wrote:
>
> There are multiple ways to specify the target directory depending on how
> to specify the python archives.
> 1) API: add_python_archive(“file:///path/to/py_env.zip", "myenv"), see
> [1] for more details,
> 2) configuration: python.archives, e.g. file:///path/to/py_env.zip#myenv
> 3) command line arguments: -pyarch file:///path/to/py_env.zip#myenv
>
> You can specify python archives via either of the above options and it
> will extract py_env.zip into directory myenv during execution.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/dependency_management.html
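
As a rough illustration of option 1) above in PyFlink's Table API (a sketch only; the archive path and target directory name are the placeholders from the examples above):

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

# Create a Table API environment; other environment flavours expose the same method.
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = TableEnvironment.create(env_settings)

# Ship py_env.zip with the job and have it extracted into a directory named "myenv";
# a UDF can then reference files inside it via the relative path "myenv/...".
t_env.add_python_archive("file:///path/to/py_env.zip", "myenv")
```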
>
> On Apr 27, 2021, at 8:17 AM, Yik San Chan wrote:
>
> Hi Dian,
>
> I wonder where can we specify the target directory?
>
> Best,
> Yik San
>
> On Mon, Apr 26, 2021 at 9:19 PM Dian Fu  wrote:
>
>> Hi Yik San,
>>
>> It should be a typo issue. I guess it should be `If the target directory
>> name is specified, the archive file will be extracted to a directory with
>> the specified name.`
>>
>> Regards,
>> Dian
>>
>> On Apr 26, 2021, at 8:57 PM, Yik San Chan wrote:
>>
>> Hi community,
>>
>> In
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/python_config.html#python-options
>> ,
>>
>> > For each archive file, a target directory is specified. If the target
>> directory name is specified, the archive file will be extracted to a name
>> can directory with the specified name. Otherwise, the archive file will be
>> extracted to a directory with the same name of the archive file.
>>
>> I don't get what "the archive file will be extracted to a name can
>> directory with the specified name" means. Maybe there are typos?
>>
>> best,
>> Yik San
>>
>>
>>
>
>


Re: How to load resource in a PyFlink UDF

2021-04-27 Thread Dian Fu
Hi Yik San,

The command line option `-pyarch` could be used to specify archive files such as
a Python virtual environment, an ML model, a data file, etc.

So for resources.zip, -pyarch makes more sense than -pyfs.

Regards,
Dian

> On Apr 27, 2021, at 5:14 PM, Yik San Chan wrote:
> 
> Hi Dian,
> 
> Thank you! That solves my question. By the way, for my use case, does -pyarch 
> make more sense than -pyfs?
> 
> Best,
> Yik San
> 
> On Tue, Apr 27, 2021 at 4:52 PM Dian Fu wrote:
> Hi Yik San,
> 
> Could you try `pd.read_csv(‘resources.zip/resources/crypt.csv’, xxx)`?
> 
> Regards,
> Dian
> 
>> On Apr 27, 2021, at 4:39 PM, Yik San Chan wrote:
>> 
>> Hi,
>> 
>> My UDF has the dependency to a resource file named crypt.csv that is located 
>> in resources/ directory.
>> 
>> ```python
>> # udf_use_resource.py
>> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
>> def decrypt(s):
>> import pandas as pd
>> d = pd.read_csv('resources/crypt.csv', header=None, index_col=0, 
>> squeeze=True).to_dict()
>> return d.get(s, "unknown")
>> ```
>> 
>> I run the job in local mode (i.e., python udf_use_resource.py) without any 
>> problem. However, when I try to run it with 
>> `~/softwares/flink-1.12.0/bin/flink run -d -pyexec 
>> /usr/local/anaconda3/envs/featflow-ml-env/bin/python -pyarch resources.zip 
>> -py udf_use_resource.py` on my local cluster, it complains:
>> 
>> FileNotFoundError: [Errno 2] File b'resources/crypt.csv' does not exist: 
>> b'resources/crypt.csv'
>> 
>> The resources.zip is zipped from the resources directory. I wonder: where do 
>> I go wrong?
>> 
>> Note: udf_use_resource.py and resources/crypt.csv can be found in 
>> https://github.com/YikSanChan/pyflink-quickstart/tree/36bfab4ff830f57d3f23f285c7c5499a03385b71
>>  
>> .
>> 
>> Thanks!
>> 
>> Best,
>> Yik San
> 



Re: Confusing docs on python.archives

2021-04-27 Thread Dian Fu
Thank you a lot~

> On Apr 27, 2021, at 5:38 PM, Yik San Chan wrote:
> 
> Hi Dian,
> 
> As a follow-up, I fix the docs here 
> https://github.com/apache/flink/pull/15783 
> 
> 
> Best,
> Yik San
> 
> On Tue, Apr 27, 2021 at 10:20 AM Dian Fu wrote:
> For the command line arguments, it’s documented in 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html 
> 
> 
>> On Apr 27, 2021, at 10:19 AM, Dian Fu wrote:
>> 
>> There are multiple ways to specify the target directory depending on how to 
>> specify the python archives.
>> 1) API: add_python_archive("file:///path/to/py_env.zip", "myenv"), see 
>> [1] for more details, 
>> 2) configuration: python.archives, e.g. file:///path/to/py_env.zip#myenv
>> 3) command line arguments: -pyarch file:///path/to/py_env.zip#myenv
>> 
>> You can specify python archives via either of the above options and it will 
>> extract py_env.zip into directory myenv during execution.
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/dependency_management.html
>>  
>> 
>> 
>>> On Apr 27, 2021, at 8:17 AM, Yik San Chan wrote:
>>> 
>>> Hi Dian,
>>> 
>>> I wonder where can we specify the target directory?
>>> 
>>> Best,
>>> Yik San
>>> 
>>> On Mon, Apr 26, 2021 at 9:19 PM Dian Fu wrote:
>>> Hi Yik San,
>>> 
>>> It should be a typo issue. I guess it should be `If the target directory 
>>> name is specified, the archive file will be extracted to a directory with 
>>> the specified name.`
>>> 
>>> Regards,
>>> Dian
>>> 
 On Apr 26, 2021, at 8:57 PM, Yik San Chan wrote:
 
 Hi community,
 
 In 
 https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/python_config.html#python-options
  
 ,
 
 > For each archive file, a target directory is specified. If the target 
 > directory name is specified, the archive file will be extracted to a 
 > name can directory with the specified name. Otherwise, the archive file 
 > will be extracted to a directory with the same name of the archive file.
 
 I don't get what does "the archive file will be extracted to a name can 
 directory with the specified name" mean. Maybe there are typos?
 
 best,
 Yik San
>>> 
>> 
> 



Re: How to load resource in a PyFlink UDF

2021-04-27 Thread Yik San Chan
Hi Dian,

Thank you for the detailed answer!

Best,
Yik San

On Tue, Apr 27, 2021 at 5:42 PM Dian Fu  wrote:

> Hi Yik San,
>
> Command line option `-pyarch` could be used to specify archive files such
> as Python virtual environment, ML model, data file, etc.
>
> So for resources.zip, -pyarch makes more sense than -pyfs.
>
> Regards,
> Dian
>
> On Apr 27, 2021, at 5:14 PM, Yik San Chan wrote:
>
> Hi Dian,
>
> Thank you! That solves my question. By the way, for my use case, does
> -pyarch make more sense than -pyfs?
>
> Best,
> Yik San
>
> On Tue, Apr 27, 2021 at 4:52 PM Dian Fu  wrote:
>
>> Hi Yik San,
>>
>> Could you try `pd.read_csv(‘resources.zip/resources/crypt.csv’, xxx)`?
>>
>> Regards,
>> Dian
>>
>> On Apr 27, 2021, at 4:39 PM, Yik San Chan wrote:
>>
>> Hi,
>>
>> My UDF has the dependency to a resource file named crypt.csv that is
>> located in resources/ directory.
>>
>> ```python
>> # udf_use_resource.py
>> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
>> def decrypt(s):
>> import pandas as pd
>> d = pd.read_csv('resources/crypt.csv', header=None, index_col=0, squeeze=
>> True).to_dict()
>> return d.get(s, "unknown")
>> ```
>>
>> I run the job in local mode (i.e., python udf_use_resource.py) without
>> any problem. However, when I try to run it with
>> `~/softwares/flink-1.12.0/bin/flink run -d -pyexec
>> /usr/local/anaconda3/envs/featflow-ml-env/bin/python -pyarch resources.zip
>> -py udf_use_resource.py` on my local cluster, it complains:
>>
>> FileNotFoundError: [Errno 2] File b'resources/crypt.csv' does not exist:
>> b'resources/crypt.csv'
>>
>> The resources.zip is zipped from the resources directory. I wonder: where
>> do I go wrong?
>>
>> Note: udf_use_resource.py and resources/crypt.csv can be found in
>> https://github.com/YikSanChan/pyflink-quickstart/tree/36bfab4ff830f57d3f23f285c7c5499a03385b71
>> .
>>
>> Thanks!
>>
>> Best,
>> Yik San
>>
>>
>>
>


ModuleNotFound when loading udf from another py file

2021-04-27 Thread Yik San Chan
Hi,

Here's the reproducible code sample:
https://github.com/YikSanChan/pyflink-quickstart/tree/83526abca832f9ed5b8ce20be52fd506c45044d3

I implement my Python UDF by extending the ScalarFunction base class in a
separate file named decrypt_fun.py, and try to import the udf into my main
python file named udf_use_resource.py.

However, after I `flink run` the job, I find the following error in the TaskManager log:

```
Caused by: java.lang.RuntimeException: Error received from SDK harness for
instruction 1: Traceback (most recent call last):
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 376, in get
processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
IndexError: pop from empty list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 253, in _execute
response = task()
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 310, in 
lambda: self.create_worker().do_instruction(request), request)
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 480, in do_instruction
getattr(request, request_type), request.instruction_id)
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 509, in process_bundle
instruction_id, request.process_bundle_descriptor_id)
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 382, in get
self.data_channel_factory)
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 847, in __init__
self.ops = self.create_execution_tree(self.process_bundle_descriptor)
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 902, in create_execution_tree
descriptor.transforms, key=topological_height, reverse=True)
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 901, in 
(transform_id, get_operation(transform_id)) for transform_id in sorted(
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 791, in wrapper
result = cache[args] = func(*args)
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 885, in get_operation
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 885, in 
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 883, in 
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 791, in wrapper
result = cache[args] = func(*args)
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 888, in get_operation
transform_id, transform_consumers)
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 1174, in create_operation
return creator(self, transform_id, transform_proto, payload, consumers)
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py",
line 39, in create_scalar_function
operations.ScalarFunctionOperation)
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py",
line 166, in _create_user_defined_function_operation
internal_operation_cls)
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 110, in
pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 49, in
pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 114, in
pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py",
line 91, in __init__
super(ScalarFunctionOperation, self).__init__(spec)
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py",
lin

Re: ModuleNotFound when loading udf from another py file

2021-04-27 Thread Dian Fu
Hi Yik San,

From the exception message, it’s clear that it could not find module 
`decrypt_fun` during execution.

You need to specify the file `decrypt_fun.py` as a dependency when submitting the 
job, e.g. via the -pyfs command line argument. Otherwise, this file will not be 
available during execution.

Regards,
Dian
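
For example, reusing the submission command from the earlier "How to load resource in a PyFlink UDF" thread, the extra module just needs to be listed (a sketch; whether -pyarch is still needed depends on the job):

```
~/softwares/flink-1.12.0/bin/flink run -d \
  -pyexec /usr/local/anaconda3/envs/featflow-ml-env/bin/python \
  -pyarch resources.zip \
  -pyfs decrypt_fun.py \
  -py udf_use_resource.py
```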

> On Apr 27, 2021, at 8:01 PM, Yik San Chan wrote:
> 
> Hi,
> 
> Here's the reproducible code sample: 
> https://github.com/YikSanChan/pyflink-quickstart/tree/83526abca832f9ed5b8ce20be52fd506c45044d3
>  
> 
> 
> I implement my Python UDF by extending the ScalarFunction base class in a 
> separate file named decrypt_fun.py, and try to import the udf into my main 
> python file named udf_use_resource.py.
> 
> However, after I `flink run`, I find the error log in TaskManager log:
> 
> ```
> Caused by: java.lang.RuntimeException: Error received from SDK harness for 
> instruction 1: Traceback (most recent call last):
>   File 
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 376, in get
> processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
> IndexError: pop from empty list
> 
> During handling of the above exception, another exception occurred:
> 
> Traceback (most recent call last):
>   File 
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 253, in _execute
> response = task()
>   File 
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 310, in 
> lambda: self.create_worker().do_instruction(request), request)
>   File 
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 480, in do_instruction
> getattr(request, request_type), request.instruction_id)
>   File 
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 509, in process_bundle
> instruction_id, request.process_bundle_descriptor_id)
>   File 
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 382, in get
> self.data_channel_factory)
>   File 
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 847, in __init__
> self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>   File 
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 902, in create_execution_tree
> descriptor.transforms, key=topological_height, reverse=True)
>   File 
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 901, in 
> (transform_id, get_operation(transform_id)) for transform_id in sorted(
>   File 
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 791, in wrapper
> result = cache[args] = func(*args)
>   File 
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 885, in get_operation
> pcoll_id in descriptor.transforms[transform_id].outputs.items()
>   File 
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 885, in 
> pcoll_id in descriptor.transforms[transform_id].outputs.items()
>   File 
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 883, in 
> tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>   File 
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 791, in wrapper
> result = cache[args] = func(*args)
>   File 
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 888, in get_operation
> transform_id, transform_consumers)
>   File 
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 1174, in create_operation
> return creator(self, transform_id, transform_proto, payload, consumers)
>   File 
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py",
>  line 39, in create_scalar_function
> operations.ScalarFunctionOperation)
>   File 
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py",
>  line 166, in _create_user_defined_function_operation
>   

Re: ModuleNotFound when loading udf from another py file

2021-04-27 Thread Yik San Chan
Hi Dian,

Thanks! Adding -pyfs definitely helps.

However, I am curious. If I define my udf this way:

```python
@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def decrypt(s):
    import pandas as pd
    d = pd.read_csv('resources.zip/resources/crypt.csv', header=None,
                    index_col=0, squeeze=True).to_dict()
    return d.get(s, "unknown")
```

I can `flink run` it without having to specify the -pyfs option. The code can also
be found in the commit
https://github.com/YikSanChan/pyflink-quickstart/commit/cd003ca7d36583999dbb5ffd45958762e4323607.
I wonder why?

Best,
Yik San

On Tue, Apr 27, 2021 at 8:13 PM Dian Fu  wrote:

> Hi Yik San,
>
> From the exception message, it’s clear that it could not find module
> `decrypt_fun` during execution.
>
> You need to specify file `decrypt_fun.py` as a dependency during
> submitting the job, e.g. via -pyfs command line arguments. Otherwise, this
> file will not be available during execution.
>
> Regards,
> Dian
>
> On Apr 27, 2021, at 8:01 PM, Yik San Chan wrote:
>
> Hi,
>
> Here's the reproducible code sample:
> https://github.com/YikSanChan/pyflink-quickstart/tree/83526abca832f9ed5b8ce20be52fd506c45044d3
>
> I implement my Python UDF by extending the ScalarFunction base class in a
> separate file named decrypt_fun.py, and try to import the udf into my main
> python file named udf_use_resource.py.
>
> However, after I `flink run`, I find the error log in TaskManager log:
>
> ```
> Caused by: java.lang.RuntimeException: Error received from SDK harness for
> instruction 1: Traceback (most recent call last):
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 376, in get
> processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
> IndexError: pop from empty list
>
> During handling of the above exception, another exception occurred:
>
> Traceback (most recent call last):
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 253, in _execute
> response = task()
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 310, in 
> lambda: self.create_worker().do_instruction(request), request)
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 480, in do_instruction
> getattr(request, request_type), request.instruction_id)
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 509, in process_bundle
> instruction_id, request.process_bundle_descriptor_id)
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 382, in get
> self.data_channel_factory)
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 847, in __init__
> self.ops = self.create_execution_tree(self.process_bundle_descriptor)
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 902, in create_execution_tree
> descriptor.transforms, key=topological_height, reverse=True)
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 901, in 
> (transform_id, get_operation(transform_id)) for transform_id in sorted(
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 791, in wrapper
> result = cache[args] = func(*args)
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 885, in get_operation
> pcoll_id in descriptor.transforms[transform_id].outputs.items()
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 885, in 
> pcoll_id in descriptor.transforms[transform_id].outputs.items()
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 883, in 
> tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 791, in wrapper
> result = cache[args] = func(*args)
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 888, in get_operation
> transform_id, transform_consumers)
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 1174, in create_operation
> return creator(self, transform_id, transform_proto, payload, cons

Re: Log rollover for logs.

2021-04-27 Thread Nicolaus Weidner
Hi John,

On Tue, Apr 27, 2021 at 9:47 AM John Smith  wrote:

> Hi, I'm running Flink as a systemd service with...
>
> [Service]
> Type=forking
> WorkingDirectory=/opt/flink
> User=flink
> Group=flink
> ExecStart=/opt/flink/bin/taskmanager.sh start
> ExecStop=/opt/flink/bin/taskmanager.sh stop
> TimeoutSec=30
> Restart=on-failure
>
> My log4j.properties file is at /opt/flink/conf.
>
> Is it as simple as just setting the log rollover in there? Because sometimes,
> with certain services like ZooKeeper, there are env variable overrides, etc. So
> I'm just wondering if it's that straightforward with Flink.
>

yes, you can use the log4j RollingFileAppender [1] for this purpose. You
can also find more information regarding logging in Flink in the docs [2].

Best regards,
Nico

[1]
https://logging.apache.org/log4j/2.x/manual/appenders.html#RollingFileAppender
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/advanced/logging.html
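
For reference, a minimal sketch of what a size-based rollover could look like in /opt/flink/conf/log4j.properties, assuming a log4j2-style properties file (the default in recent Flink releases); the appender name, size, and count below are placeholder values to adjust:

```
# Roll the TaskManager/JobManager log file by size, keeping a bounded history.
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10

rootLogger.level = INFO
rootLogger.appenderRef.rolling.ref = RollingFileAppender
```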


Re: ModuleNotFound when loading udf from another py file

2021-04-27 Thread Dian Fu
I guess this is the magic of cloudpickle. PyFlink depends on cloudpickle to 
serialize the Python UDF. 

For the latter case, I guess the whole Python UDF implementation will be 
serialized. However, for the former case, only the path of the class is 
serialized.

Regards,
Dian
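
A rough way to see this difference outside of Flink (not part of the thread; a standalone sketch, assuming cloudpickle is installed): cloudpickle serializes functions defined in the running script by value, while functions importable from a module are serialized only by reference, so the referenced module must also be importable on the worker.

```python
import cloudpickle
from os.path import basename  # importable from a module -> pickled by reference

def defined_here(x):          # defined in the running script -> pickled by value
    return x + 1

by_value = cloudpickle.dumps(defined_here)
by_reference = cloudpickle.dumps(basename)
# The by-value payload embeds the function's code; the by-reference one is just a path.
print(len(by_value), len(by_reference))
```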

> On Apr 27, 2021, at 8:52 PM, Yik San Chan wrote:
> 
> Hi Dian,
> 
> Thanks! Adding -pyfs definitely helps.
> 
> However, I am curious. If I define my udf this way:
> 
> ```python
> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
> def decrypt(s):
> import pandas as pd
> d = pd.read_csv('resources.zip/resources/crypt.csv', header=None, 
> index_col=0, squeeze=True).to_dict()
> return d.get(s, "unknown")
> ```
> 
> I can `flink run` without having to specify -pyfs option. The code can also 
> be found in the commit 
> https://github.com/YikSanChan/pyflink-quickstart/commit/cd003ca7d36583999dbb5ffd45958762e4323607
>  
> .
>  I wonder why?
> 
> Best,
> Yik San
> 
> On Tue, Apr 27, 2021 at 8:13 PM Dian Fu wrote:
> Hi Yik San,
> 
> From the exception message, it’s clear that it could not find module 
> `decrypt_fun` during execution.
> 
> You need to specify file `decrypt_fun.py` as a dependency during submitting 
> the job, e.g. via -pyfs command line arguments. Otherwise, this file will not 
> be available during execution.
> 
> Regards,
> Dian
> 
>> On Apr 27, 2021, at 8:01 PM, Yik San Chan wrote:
>> 
>> Hi,
>> 
>> Here's the reproducible code sample: 
>> https://github.com/YikSanChan/pyflink-quickstart/tree/83526abca832f9ed5b8ce20be52fd506c45044d3
>>  
>> 
>> 
>> I implement my Python UDF by extending the ScalarFunction base class in a 
>> separate file named decrypt_fun.py, and try to import the udf into my main 
>> python file named udf_use_resource.py.
>> 
>> However, after I `flink run`, I find the error log in TaskManager log:
>> 
>> ```
>> Caused by: java.lang.RuntimeException: Error received from SDK harness for 
>> instruction 1: Traceback (most recent call last):
>>   File 
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>  line 376, in get
>> processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
>> IndexError: pop from empty list
>> 
>> During handling of the above exception, another exception occurred:
>> 
>> Traceback (most recent call last):
>>   File 
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>  line 253, in _execute
>> response = task()
>>   File 
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>  line 310, in 
>> lambda: self.create_worker().do_instruction(request), request)
>>   File 
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>  line 480, in do_instruction
>> getattr(request, request_type), request.instruction_id)
>>   File 
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>  line 509, in process_bundle
>> instruction_id, request.process_bundle_descriptor_id)
>>   File 
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>  line 382, in get
>> self.data_channel_factory)
>>   File 
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>  line 847, in __init__
>> self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>>   File 
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>  line 902, in create_execution_tree
>> descriptor.transforms, key=topological_height, reverse=True)
>>   File 
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>  line 901, in 
>> (transform_id, get_operation(transform_id)) for transform_id in sorted(
>>   File 
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>  line 791, in wrapper
>> result = cache[args] = func(*args)
>>   File 
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>  line 885, in get_operation
>> pcoll_id in descriptor.transforms[transform_id].outputs.items()
>>   File 
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>  line 885, in 
>> pcoll_id in descriptor.transforms[tr

Re: ModuleNotFound when loading udf from another py file

2021-04-27 Thread Yik San Chan
Hi Dian,

Wow, this is unexpected 😮 How about adding documentation about this to the
Python UDF docs? Again, it can be time-consuming to figure this out. Maybe:

To be able to run Python UDFs in any non-local mode, it is recommended to
include your UDF definitions using -pyfs config option, if your UDFs live
outside of the file where the main() function is defined.

What do you think?

Best,
Yik San

On Tue, Apr 27, 2021 at 9:24 PM Dian Fu  wrote:

> I guess this is the magic of cloud pickle. PyFlink depends on cloud pickle
> to serialize the Python UDF.
>
> For the latter case, I guess the whole Python UDF implementation will be
> serialized. However, for the previous case, only the path of the class is
> serialized.
>
> Regards,
> Dian
>
> On Apr 27, 2021, at 8:52 PM, Yik San Chan wrote:
>
> Hi Dian,
>
> Thanks! Adding -pyfs definitely helps.
>
> However, I am curious. If I define my udf this way:
>
> ```python
> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
> def decrypt(s):
> import pandas as pd
> d = pd.read_csv('resources.zip/resources/crypt.csv', header=None,
> index_col=0, squeeze=True).to_dict()
> return d.get(s, "unknown")
> ```
>
> I can `flink run` without having to specify -pyfs option. The code can
> also be found in the commit
> https://github.com/YikSanChan/pyflink-quickstart/commit/cd003ca7d36583999dbb5ffd45958762e4323607.
> I wonder why?
>
> Best,
> Yik San
>
> On Tue, Apr 27, 2021 at 8:13 PM Dian Fu  wrote:
>
>> Hi Yik San,
>>
>> From the exception message, it’s clear that it could not find module
>> `decrypt_fun` during execution.
>>
>> You need to specify file `decrypt_fun.py` as a dependency during
>> submitting the job, e.g. via -pyfs command line arguments. Otherwise, this
>> file will not be available during execution.
>>
>> Regards,
>> Dian
>>
>> On Apr 27, 2021, at 8:01 PM, Yik San Chan wrote:
>>
>> Hi,
>>
>> Here's the reproducible code sample:
>> https://github.com/YikSanChan/pyflink-quickstart/tree/83526abca832f9ed5b8ce20be52fd506c45044d3
>>
>> I implement my Python UDF by extending the ScalarFunction base class in a
>> separate file named decrypt_fun.py, and try to import the udf into my main
>> python file named udf_use_resource.py.
>>
>> However, after I `flink run`, I find the error log in TaskManager log:
>>
>> ```
>> Caused by: java.lang.RuntimeException: Error received from SDK harness
>> for instruction 1: Traceback (most recent call last):
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 376, in get
>> processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
>> IndexError: pop from empty list
>>
>> During handling of the above exception, another exception occurred:
>>
>> Traceback (most recent call last):
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 253, in _execute
>> response = task()
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 310, in 
>> lambda: self.create_worker().do_instruction(request), request)
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 480, in do_instruction
>> getattr(request, request_type), request.instruction_id)
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 509, in process_bundle
>> instruction_id, request.process_bundle_descriptor_id)
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 382, in get
>> self.data_channel_factory)
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 847, in __init__
>> self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 902, in create_execution_tree
>> descriptor.transforms, key=topological_height, reverse=True)
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 901, in 
>> (transform_id, get_operation(transform_id)) for transform_id in sorted(
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 791, in wrapper
>> result = cache[args] = func(*args)
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 885, in get_operation
>> pcoll_id in descriptor.transforms[transform_id].outputs.items()
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>

Re: ModuleNotFound when loading udf from another py file

2021-04-27 Thread Dian Fu
Hi Yik San,

Makes sense to me. :)

Regards,
Dian

> On Apr 27, 2021, at 9:50 PM, Yik San Chan wrote:
> 
> Hi Dian,
> 
> Wow, this is unexpected 😮 How about adding documentations to Python UDF about 
> this? Again it can be time consuming to figure this out. Maybe:
> 
> To be able to run Python UDFs in any non-local mode, it is recommended to 
> include your UDF definitions using -pyfs config option, if your UDFs live 
> outside of the file where the main() function is defined.
> 
> What do you think?
> 
> Best,
> Yik San
> 
> On Tue, Apr 27, 2021 at 9:24 PM Dian Fu wrote:
> I guess this is the magic of cloud pickle. PyFlink depends on cloud pickle to 
> serialize the Python UDF. 
> 
> For the latter case, I guess the whole Python UDF implementation will be 
> serialized. However, for the previous case, only the path of the class is 
> serialized.
> 
> Regards,
> Dian
> 
>> On Apr 27, 2021, at 8:52 PM, Yik San Chan wrote:
>> 
>> Hi Dian,
>> 
>> Thanks! Adding -pyfs definitely helps.
>> 
>> However, I am curious. If I define my udf this way:
>> 
>> ```python
>> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
>> def decrypt(s):
>> import pandas as pd
>> d = pd.read_csv('resources.zip/resources/crypt.csv', header=None, 
>> index_col=0, squeeze=True).to_dict()
>> return d.get(s, "unknown")
>> ```
>> 
>> I can `flink run` without having to specify -pyfs option. The code can also 
>> be found in the commit 
>> https://github.com/YikSanChan/pyflink-quickstart/commit/cd003ca7d36583999dbb5ffd45958762e4323607
>>  
>> .
>>  I wonder why?
>> 
>> Best,
>> Yik San
>> 
>> On Tue, Apr 27, 2021 at 8:13 PM Dian Fu wrote:
>> Hi Yik San,
>> 
>> From the exception message, it’s clear that it could not find module 
>> `decrypt_fun` during execution.
>> 
>> You need to specify file `decrypt_fun.py` as a dependency during submitting 
>> the job, e.g. via -pyfs command line arguments. Otherwise, this file will 
>> not be available during execution.
>> 
>> Regards,
>> Dian
>> 
>>> On Apr 27, 2021, at 8:01 PM, Yik San Chan wrote:
>>> 
>>> Hi,
>>> 
>>> Here's the reproducible code sample: 
>>> https://github.com/YikSanChan/pyflink-quickstart/tree/83526abca832f9ed5b8ce20be52fd506c45044d3
>>>  
>>> 
>>> 
>>> I implement my Python UDF by extending the ScalarFunction base class in a 
>>> separate file named decrypt_fun.py, and try to import the udf into my main 
>>> python file named udf_use_resource.py.
>>> 
>>> However, after I `flink run`, I find the error log in TaskManager log:
>>> 
>>> ```
>>> Caused by: java.lang.RuntimeException: Error received from SDK harness for 
>>> instruction 1: Traceback (most recent call last):
>>>   File 
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>  line 376, in get
>>> processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
>>> IndexError: pop from empty list
>>> 
>>> During handling of the above exception, another exception occurred:
>>> 
>>> Traceback (most recent call last):
>>>   File 
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>  line 253, in _execute
>>> response = task()
>>>   File 
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>  line 310, in 
>>> lambda: self.create_worker().do_instruction(request), request)
>>>   File 
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>  line 480, in do_instruction
>>> getattr(request, request_type), request.instruction_id)
>>>   File 
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>  line 509, in process_bundle
>>> instruction_id, request.process_bundle_descriptor_id)
>>>   File 
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>  line 382, in get
>>> self.data_channel_factory)
>>>   File 
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>  line 847, in __init__
>>> self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>>>   File 
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>  line 902, in create_execution_tree
>>> descriptor.transforms, key=topological_height, reverse=True)
>>>   File 
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/wor

Custom metrics in Stateful Functions

2021-04-27 Thread Cliff Resnick
We think Embedded Statefun is a nicer fit than Datastream for some problem
domains, but one thing we miss is support for custom metrics/counters. Is
there a way to access the Flink support? It looks like if we want custom
metrics we'll need to roll our own.


[Stateful Functions] Help for calling remote stateful function (written in Python)

2021-04-27 Thread Bonino Dario

Dear List,

I am trying to call a sample stateful function defined in Python, using 
the Stateful Functions Python SDK, from a Flink pipeline. I am building 
upon the examples provided for the SDK for Flink DataStream Integration, 
but I am currently stuck on a type cast issue that I am not able to 
overcome, even by looking at the flink-statefun sources. I am probably 
doing something wrong.


In the Flink pipeline (of which an excerpt is reported below), I load a 
set of users from a CSV file and create a DataStream<User>, where User is 
a protobuf v3 generated class. Given this stream, the basic idea is to 
forward the stream to a remote function (written in Python using the 
SDK) that unpacks the user object, extracts the user id and 
provides it back as a String.



val REMOTE_GREET = FunctionType("com.me.try", "echo_user_id")
val GREETINGS = EgressIdentifier("com.me.try", "out", String::class.java)

@JvmStatic
fun main(args: Array<String>) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    val usersCsv = env.readTextFile("input/users.csv")
    val users = createUsersStream(usersCsv).shuffle()

    val statefunConfig = StatefulFunctionsConfig.fromEnvironment(env)
    statefunConfig.factoryType = MessageFactoryType.WITH_PROTOBUF_PAYLOADS

    val usersIngress: DataStream<RoutableMessage> = users.map { user ->
        RoutableMessageBuilder.builder()
            .withTargetAddress(REMOTE_GREET, user.userId.toString())
            .withMessageBody(user)
            .build()
    }

    val predictEgress = StatefulFunctionDataStreamBuilder.builder("test")
        .withDataStreamAsIngress(usersIngress)
        .withRequestReplyRemoteFunction(
            RequestReplyFunctionBuilder
                .requestReplyFunctionBuilder(REMOTE_GREET,
                    URI.create("http://127.0.0.1:8000/statefun"))
                .withMaxRequestDuration(Duration.ofSeconds(15))
                .withMaxNumBatchRequests(500)
        )
        .withEgressId(GREETINGS)
        .withConfiguration(statefunConfig)
        .build(env)

    val output = predictEgress.getDataStreamForEgressId(GREETINGS)

    output.print()
    env.execute("Hello stateful!!")
}

Unfortunately, while the Python function seems to be working (tests 
built by following the Ververica workshop repository about Stateful 
Functions are up and consistently running) and it is listening at the 
provided address (http://127.0.0.1:8000/statefun), the Kotlin pipeline 
(above) fails with a type cast error, which occurs before actually 
calling the remote function, at line 90 of the 
org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction. The 
reported exception is:



Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed. at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) 
at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117) 
at 
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) 
at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) 
at 
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) 
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:238) 
at 
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) 
at 
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) 
at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) 
at 
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) 
at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046) 
at akka.dispatch.OnComplete.internal(Future.scala:264) at 
akka.dispatch.OnComplete.internal(Future.scala:261) at 
akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) at 
akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) at 
scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73) 
at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) 
at 
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) 
at 
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) 
at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) 
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573) at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22) 
at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21) 
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532) at 
scala.concurrent.impl.Promise.liftedTree1$

Re: Custom metrics in Stateful Functions

2021-04-27 Thread Igal Shilman
Hello Cliff,

You are right, indeed defining custom metrics is not supported at the
moment.
I will file a JIRA issue so we can track this, and we will try to
prioritize this feature.
Meanwhile, there are a lot of metrics that StateFun defines, like
invocation rates etc.; perhaps you will already find those useful [1].

[1]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/deployment/metrics/

Kind regards,
Igal.


On Tue, Apr 27, 2021 at 5:19 PM Cliff Resnick  wrote:

> We think Embedded Statefun is a nicer fit than Datastream for some problem
> domains, but one thing we miss is support for custom metrics/counters. Is
> there a way to access the Flink support? It looks like if we want custom
> metrics we'll need to roll our own.
>


Re: [Stateful Functions] Help for calling remote stateful function (written in Python)

2021-04-27 Thread Igal Shilman
Hello!

Your analysis is correct, indeed what is passed is whatever is being handed
to withMessageBody(..).
Starting with StateFun 3.0, if you need to send a message to a remote
function, the message needs to be a TypedValue.

You can create an instance of TypedValue manually, or you can add a
dependency on the Java SDK and use the MessageBuilder to
extract TypedValues.
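
For illustration, here is a minimal sketch of the "create a TypedValue manually" option. It assumes the protobuf-generated TypedValue class from the StateFun request-reply protos; the exact package name and the typename convention expected by the remote function may differ between StateFun versions, so treat both as assumptions rather than the SDK's documented API:

```java
import com.google.protobuf.Message;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;

public final class TypedValues {
    private TypedValues() {}

    // Wraps an arbitrary protobuf payload into the TypedValue envelope that
    // remote functions expect. The typename must match whatever type the
    // remote (Python) function declares for its input, e.g. "com.me.try/User".
    public static TypedValue fromProtobuf(Message payload, String typename) {
        return TypedValue.newBuilder()
                .setTypename(typename)
                .setHasValue(true)
                .setValue(payload.toByteString())
                .build();
    }
}
```

The resulting TypedValue is then what gets handed to withMessageBody(..) in the pipeline quoted below, instead of the raw User message.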

I think that you are right, the DataStream <-> remote functions integration
could be improved a little bit; I will file a JIRA issue
for that!

A side question: is there a particular reason that you chose to use the
DataStream SDK vs. the other deployment options?

Thanks,
Igal.





On Tue, Apr 27, 2021 at 5:31 PM Bonino Dario  wrote:

> Dear List,
>
> I am trying to call a sample stateful function defined in Python, using
> the Stateful Function Python SDK, from a Flink pipeline. I am building upon
> the examples provided for the  SDK for Flink DataStream Integration but I
> am currently stuck on a type cast issue that I am not able to overcome,
> even by looking at the flink-statefun sources. I am sure that I am probably
> doing something wrong.
>
> In the flink pipeline (of which an excerpt is reported below), I load a
> set of users from a CSV file and create a Datastream where User is a
> protobuf v3 generated class. Given this stream, the base idea is to forward
> the stream to a remote function (written in python using the sdk) that
> basically unpacks the user object, extracts the user id and provides it
> back as a String.
>
>
> val REMOTE_GREET = FunctionType("com.me.try", "echo_user_id")
> val GREETINGS = EgressIdentifier("com.me.try", "out",
> String::class.java)
>
> @JvmStatic
> fun main(args: Array) {
> val env = StreamExecutionEnvironment.getExecutionEnvironment()
> val usersCsv = env.readTextFile("input/users.csv")
> val users = createUsersStream(usersCsv).shuffle()
>
> val statefunConfig = StatefulFunctionsConfig.fromEnvironment(env)
> statefunConfig.factoryType = MessageFactoryType.WITH_PROTOBUF_PAYLOADS
>
> val usersIngress: DataStream = users.map { user ->
> RoutableMessageBuilder.builder()
> .withTargetAddress(REMOTE_GREET, user.userId.toString())
> .withMessageBody(user)
> .build()
> }
>
>
> val predictEgress = StatefulFunctionDataStreamBuilder.builder("test")
> .withDataStreamAsIngress(usersIngress)
> .withRequestReplyRemoteFunction(
> RequestReplyFunctionBuilder
> .requestReplyFunctionBuilder(REMOTE_GREET, URI.create(
> "http://127.0.0.1:8000/statefun"; ))
> .withMaxRequestDuration(Duration.ofSeconds(15))
> .withMaxNumBatchRequests(500)
> )
> .withEgressId(GREETINGS)
> .withConfiguration(statefunConfig)
> .build(env)
>
> val output = predictEgress.getDataStreamForEgressId(GREETINGS)
>
> output.print()
> env.execute("Hello stateful!!")
> }
>
> Unfortunately, while the Python function seems to be working (tests build
> by following the Ververica workshop repository about Stateful functions are
> up and consistently running) and it is listening at the provided address (
> http://127.0.0.1:8000/statefun), the Kotlin pipeline (above) fails with a
> type cast error, which occurs before actually calling the remote function,
> at line 90 of the 
> org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.
> The reported exception is:
>
>
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Job execution
> failed. at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> at
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
> at
> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:238)
> at
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
> at
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
> at akka.dispatch.OnComplete.internal(Future.scala:264) at
> akka.dispatch.OnComplete.internal(Future.scala:261) at
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) at
> akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) at
> scala.concurrent.impl.Call

Taskmanager killed often after migrating to flink 1.12

2021-04-27 Thread Sambaran
Hi there,

We have recently migrated to Flink 1.12 from 1.7. Although the jobs are
running fine, sometimes the task manager gets killed (quite frequently,
2-3 times a day).

Logs:
INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] -
RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.

While checking more logs, we see Flink is not able to discard old checkpoints:
org.apache.flink.runtime.checkpoint.CheckpointsCleaner   [] - Could not
discard completed checkpoint 173.

We are not sure of what is the reason here, has anyone faced this before?

Regards
Sambaran


Exception handling

2021-04-27 Thread Jacob Sevart
How do we get uncaught exceptions in operators to skip the problematic
messages, rather than crash the entire job? Is there an easier or less
mistake-prone way to do this than wrapping every operator method in
try/catch?

And what to do about Map? Since it has to return something, we're either
returning null and then catching it with a *.filter(Objects.nonNull)* in
the next operator, or converting it to FlatMap. FlatMap conversion is
annoying, because then we need to mock the Collector for testing.
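
For concreteness, here is a minimal sketch of the FlatMap variant described above, using a trivial String-to-Long parse as a stand-in for the real per-record logic:

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// Parses each incoming String into a Long; malformed records are dropped
// instead of crashing the job.
public class SafeParseLong implements FlatMapFunction<String, Long> {

    @Override
    public void flatMap(String raw, Collector<Long> out) {
        try {
            out.collect(Long.parseLong(raw.trim()));
        } catch (Exception e) {
            // Swallow the bad record; log it or bump a counter here if you
            // want visibility into how often this happens.
        }
    }
}
```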

Obviously it would be best to sanitize inputs so that exceptions don't
occur, but we've recently encountered some setbacks in the game of
whack-a-mole with pathological messages, and are hoping to mitigate the
losses when these do occur.

Jacob
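
On the Collector-mocking annoyance mentioned above: one option is to collect into a plain list using Flink's ListCollector utility, sketched here under the assumption that the class is available at org.apache.flink.api.common.functions.util.ListCollector in your Flink version:

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.util.ListCollector;

public class SafeParseLongTest {

    public static void main(String[] args) {
        List<Long> results = new ArrayList<>();
        ListCollector<Long> out = new ListCollector<>(results);

        SafeParseLong fn = new SafeParseLong();
        fn.flatMap("42", out);
        fn.flatMap("not-a-number", out); // dropped, no exception escapes

        System.out.println(results); // prints [42]
    }
}
```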


Re: ModuleNotFound when loading udf from another py file

2021-04-27 Thread Yik San Chan
Hi Dian,

I followed up with this PR: https://github.com/apache/flink/pull/15790

On Tue, Apr 27, 2021 at 11:03 PM Dian Fu  wrote:

> Hi Yik San,
>
> Make sense to me. :)
>
> Regards,
> Dian
>
> On Apr 27, 2021, at 9:50 PM, Yik San Chan wrote:
>
> Hi Dian,
>
> Wow, this is unexpected 😮 How about adding documentations to Python UDF
> about this? Again it can be time consuming to figure this out. Maybe:
>
> To be able to run Python UDFs in any non-local mode, it is recommended to
> include your UDF definitions using -pyfs config option, if your UDFs live
> outside of the file where the main() function is defined.
>
> What do you think?
>
> Best,
> Yik San
>
> On Tue, Apr 27, 2021 at 9:24 PM Dian Fu  wrote:
>
>> I guess this is the magic of cloud pickle. PyFlink depends on cloud
>> pickle to serialize the Python UDF.
>>
>> For the latter case, I guess the whole Python UDF implementation will be
>> serialized. However, for the previous case, only the path of the class is
>> serialized.
>>
>> Regards,
>> Dian
>>
>> On Apr 27, 2021, at 8:52 PM, Yik San Chan wrote:
>>
>> Hi Dian,
>>
>> Thanks! Adding -pyfs definitely helps.
>>
>> However, I am curious. If I define my udf this way:
>>
>> ```python
>> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
>> def decrypt(s):
>> import pandas as pd
>> d = pd.read_csv('resources.zip/resources/crypt.csv', header=None,
>> index_col=0, squeeze=True).to_dict()
>> return d.get(s, "unknown")
>> ```
>>
>> I can `flink run` without having to specify -pyfs option. The code can
>> also be found in the commit
>> https://github.com/YikSanChan/pyflink-quickstart/commit/cd003ca7d36583999dbb5ffd45958762e4323607.
>> I wonder why?
>>
>> Best,
>> Yik San
>>
>> On Tue, Apr 27, 2021 at 8:13 PM Dian Fu  wrote:
>>
>>> Hi Yik San,
>>>
>>> From the exception message, it’s clear that it could not find module
>>> `decrypt_fun` during execution.
>>>
>>> You need to specify file `decrypt_fun.py` as a dependency during
>>> submitting the job, e.g. via -pyfs command line arguments. Otherwise, this
>>> file will not be available during execution.
>>>
>>> Regards,
>>> Dian
>>>
>>> On Apr 27, 2021, at 8:01 PM, Yik San Chan wrote:
>>>
>>> Hi,
>>>
>>> Here's the reproducible code sample:
>>> https://github.com/YikSanChan/pyflink-quickstart/tree/83526abca832f9ed5b8ce20be52fd506c45044d3
>>>
>>> I implement my Python UDF by extending the ScalarFunction base class in
>>> a separate file named decrypt_fun.py, and try to import the udf into my
>>> main python file named udf_use_resource.py.
>>>
>>> However, after I `flink run`, I find the error log in TaskManager log:
>>>
>>> ```
>>> Caused by: java.lang.RuntimeException: Error received from SDK harness
>>> for instruction 1: Traceback (most recent call last):
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 376, in get
>>> processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
>>> IndexError: pop from empty list
>>>
>>> During handling of the above exception, another exception occurred:
>>>
>>> Traceback (most recent call last):
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 253, in _execute
>>> response = task()
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 310, in 
>>> lambda: self.create_worker().do_instruction(request), request)
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 480, in do_instruction
>>> getattr(request, request_type), request.instruction_id)
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 509, in process_bundle
>>> instruction_id, request.process_bundle_descriptor_id)
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 382, in get
>>> self.data_channel_factory)
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 847, in __init__
>>> self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 902, in create_execution_tree
>>> descriptor.transforms, key=topological_height, reverse=True)
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 901, in 
>>> (transform_id, get_operation(transform_id)) for transform_id in sorted(
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 791, in wrapper
>>> result = cache[args] = func(*

Using Hive UDFs

2021-04-27 Thread 김영우
Hi,

I've configured a Hive metastore to use HiveCatalog in a streaming application.
So far, most of the features are working fine in the Hive integration.

However, I have a problem using Hive UDFs. I have already done the
prerequisites to use the Hive geospatial UDFs [1].

As a sanity check, I ran the query below:

tableEnv.executeSql("SELECT ST_AsText(ST_Point(1, 2))");


Got an exception like this:


org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: SQL validation failed. From line 1, column 18 to line 1,
column 63: No match found for function signature ST_Point(,
)

at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)

at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)

at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)

at
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)

at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)

at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)

Caused by: org.apache.flink.table.api.ValidationException: SQL validation
failed. From line 1, column 18 to line 1, column 63: No match found for
function signature ST_Point(, )

at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)

at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)

at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)

at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)

at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)

at com.skt.chiron.FlinkApp.main(FlinkApp.java:67)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)

... 11 more

Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1,
column 18 to line 1, column 63: No match found for function signature
ST_Point(, )

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)

at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

at java.lang.reflect.Constructor.newInstance(Constructor.java:423)

at
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)

at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)

at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)

at
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5043)

at
org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1838)

at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:321)

at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)

at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5882)

at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5869)

at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)

at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1756)

at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1741)

at
org.apache.calcite.sql.SqlOperator.constructArgTypeList(SqlOperator.java:606)

at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:244)

at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)

at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5882)

at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5869)

at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)

at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1756)

at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1741)

at
org.apache.

Re: ModuleNotFound when loading udf from another py file

2021-04-27 Thread Dian Fu
Thanks a lot~

> On Apr 28, 2021, at 8:25 AM, Yik San Chan wrote:
> 
> Hi Dian,
> 
> I follow up with this PR https://github.com/apache/flink/pull/15790 
> 
> On Tue, Apr 27, 2021 at 11:03 PM Dian Fu  > wrote:
> Hi Yik San,
> 
> Make sense to me. :)
> 
> Regards,
> Dian
> 
>> On Apr 27, 2021, at 9:50 PM, Yik San Chan wrote:
>> 
>> Hi Dian,
>> 
>> Wow, this is unexpected 😮 How about adding documentations to Python UDF 
>> about this? Again it can be time consuming to figure this out. Maybe:
>> 
>> To be able to run Python UDFs in any non-local mode, it is recommended to 
>> include your UDF definitions using -pyfs config option, if your UDFs live 
>> outside of the file where the main() function is defined.
>> 
>> What do you think?
>> 
>> Best,
>> Yik San
>> 
>> On Tue, Apr 27, 2021 at 9:24 PM Dian Fu > > wrote:
>> I guess this is the magic of cloud pickle. PyFlink depends on cloud pickle 
>> to serialize the Python UDF. 
>> 
>> For the latter case, I guess the whole Python UDF implementation will be 
>> serialized. However, for the previous case, only the path of the class is 
>> serialized.
>> 
>> Regards,
>> Dian
>> 
>>> On Apr 27, 2021, at 8:52 PM, Yik San Chan wrote:
>>> 
>>> Hi Dian,
>>> 
>>> Thanks! Adding -pyfs definitely helps.
>>> 
>>> However, I am curious. If I define my udf this way:
>>> 
>>> ```python
>>> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
>>> def decrypt(s):
>>> import pandas as pd
>>> d = pd.read_csv('resources.zip/resources/crypt.csv', header=None, 
>>> index_col=0, squeeze=True).to_dict()
>>> return d.get(s, "unknown")
>>> ```
>>> 
>>> I can `flink run` without having to specify -pyfs option. The code can also 
>>> be found in the commit 
>>> https://github.com/YikSanChan/pyflink-quickstart/commit/cd003ca7d36583999dbb5ffd45958762e4323607
>>>  
>>> .
>>>  I wonder why?
>>> 
>>> Best,
>>> Yik San
>>> 
>>> On Tue, Apr 27, 2021 at 8:13 PM Dian Fu >> > wrote:
>>> Hi Yik San,
>>> 
>>> From the exception message, it’s clear that it could not find module 
>>> `decrypt_fun` during execution.
>>> 
>>> You need to specify file `decrypt_fun.py` as a dependency during submitting 
>>> the job, e.g. via -pyfs command line arguments. Otherwise, this file will 
>>> not be available during execution.
>>> 
>>> Regards,
>>> Dian
>>> 
On Apr 27, 2021, at 8:01 PM, Yik San Chan wrote:
 
 Hi,
 
 Here's the reproducible code sample: 
 https://github.com/YikSanChan/pyflink-quickstart/tree/83526abca832f9ed5b8ce20be52fd506c45044d3
  
 
 
 I implement my Python UDF by extending the ScalarFunction base class in a 
 separate file named decrypt_fun.py, and try to import the udf into my main 
 python file named udf_use_resource.py.
 
 However, after I `flink run`, I find the error log in TaskManager log:
 
 ```
 Caused by: java.lang.RuntimeException: Error received from SDK harness for 
 instruction 1: Traceback (most recent call last):
   File 
 "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
  line 376, in get
 processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
 IndexError: pop from empty list
 
 During handling of the above exception, another exception occurred:
 
 Traceback (most recent call last):
   File 
 "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
  line 253, in _execute
 response = task()
   File 
 "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
  line 310, in 
 lambda: self.create_worker().do_instruction(request), request)
   File 
 "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
  line 480, in do_instruction
 getattr(request, request_type), request.instruction_id)
   File 
 "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
  line 509, in process_bundle
 instruction_id, request.process_bundle_descriptor_id)
   File 
 "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
  line 382, in get
 self.data_channel_factory)
   File 
 "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
  line 847, in

Re: Using Hive UDFs

2021-04-27 Thread Shengkai Fang
Hi.

The order in which the modules are loaded may influence how the function is resolved; see [1].

[1] https://issues.apache.org/jira/browse/FLINK-22383

Youngwoo Kim (김영우) wrote on Wed, Apr 28, 2021 at 10:50 AM:

> Hi,
>
> I've configured Hive metastore to use HiveCatalog in streaming
> application. So far, most of the features are working fine in hive
> integration.
>
> However, I have a problem in using Hive UDFs. Already done prerequisites
> to use Hive geospatial UDFs[1]
>
> To sanity check, I did run a query like below:
>
> tableEnv.executeSql("SELECT ST_AsText(ST_Point(1, 2))");
>
>
> Got an exception like this:
>
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: SQL validation failed. From line 1, column 18 to
> line 1, column 63: No match found for function signature
> ST_Point(, )
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
>
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
>
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>
> at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at javax.security.auth.Subject.doAs(Subject.java:422)
>
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
>
> at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>
> Caused by: org.apache.flink.table.api.ValidationException: SQL validation
> failed. From line 1, column 18 to line 1, column 63: No match found for
> function signature ST_Point(, )
>
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
>
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
>
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
>
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
>
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
>
> at com.skt.chiron.FlinkApp.main(FlinkApp.java:67)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
>
> ... 11 more
>
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line
> 1, column 18 to line 1, column 63: No match found for function signature
> ST_Point(, )
>
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
>
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
>
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
>
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5043)
>
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1838)
>
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:321)
>
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)
>
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5882)
>
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5869)
>
> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1756)
>
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1741)
>
> at
> org.apache.calcite.sql.SqlOperator.constructArgTypeList(SqlOperator.java:606)
>
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:244)
>
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)
>
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor

Re: Using Hive UDFs

2021-04-27 Thread Rui Li
Hi Youngwoo,

The catalog function is associated with a catalog and DB. Assuming you have
created the function ST_Point in your metastore, could you verify whether
the current catalog is your HiveCatalog and the current database is the
database in which ST_Point is registered?

On Wed, Apr 28, 2021 at 12:24 PM Shengkai Fang  wrote:

> Hi.
>
> The order of the module may influence the load of the function.
>
> [1] https://issues.apache.org/jira/browse/FLINK-22383
>
> Youngwoo Kim (김영우) wrote on Wed, Apr 28, 2021 at 10:50 AM:
>
>> Hi,
>>
>> I've configured Hive metastore to use HiveCatalog in streaming
>> application. So far, most of the features are working fine in hive
>> integration.
>>
>> However, I have a problem in using Hive UDFs. Already done prerequisites
>> to use Hive geospatial UDFs[1]
>>
>> To sanity check, I did run a query like below:
>>
>> tableEnv.executeSql("SELECT ST_AsText(ST_Point(1, 2))");
>>
>>
>> Got an exception like this:
>>
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: SQL validation failed. From line 1, column 18 to
>> line 1, column 63: No match found for function signature
>> ST_Point(, )
>>
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
>>
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
>>
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>>
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>>
>> at java.security.AccessController.doPrivileged(Native Method)
>>
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
>>
>> at
>> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>>
>> Caused by: org.apache.flink.table.api.ValidationException: SQL validation
>> failed. From line 1, column 18 to line 1, column 63: No match found for
>> function signature ST_Point(, )
>>
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
>>
>> at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
>>
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
>>
>> at
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
>>
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
>>
>> at com.skt.chiron.FlinkApp.main(FlinkApp.java:67)
>>
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>
>> at java.lang.reflect.Method.invoke(Method.java:498)
>>
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
>>
>> ... 11 more
>>
>> Caused by: org.apache.calcite.runtime.CalciteContextException: From line
>> 1, column 18 to line 1, column 63: No match found for function signature
>> ST_Point(, )
>>
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>
>> at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>
>> at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>
>> at
>> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
>>
>> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
>>
>> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
>>
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5043)
>>
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1838)
>>
>> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:321)
>>
>> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)
>>
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5882)
>>
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5869)
>>
>> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>>
>> at
>> org.apache.calcite.sq

Re: Using Hive UDFs

2021-04-27 Thread 김영우
Thanks Shengkai and Rui for looking into this.

A snippet from my app looks like the following:

HiveCatalog hive = new HiveCatalog("flink-hive", "default",
"/tmp/hive");

tableEnv.registerCatalog("flink-hive", hive);


tableEnv.useCatalog("flink-hive");

tableEnv.loadModule("flink-hive", new HiveModule("3.1.2"));


tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);


tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS flink_gaia");

tableEnv.executeSql("USE flink_gaia");

tableEnv.executeSql("SHOW CURRENT CATALOG").print();

tableEnv.executeSql("SHOW CURRENT DATABASE").print();

tableEnv.executeSql("SHOW TABLES").print();

tableEnv.executeSql("SHOW FUNCTIONS").print();



// Test Hive UDF

tableEnv.executeSql("SELECT ST_AsText(ST_Point(1, 2))");


And I got the following output and exception:


+--+

| current catalog name |

+--+

|   flink-hive |

+--+

1 row in set

+---+

| current database name |

+---+

|flink_gaia |

+---+

1 row in set

+--+

|   table name |

+--+

| geofence |

| lcap |

| lcap_temporal_fenced |

+--+


++

|  function name |

++

|   regr_sxy |

..


380 rows in set


(snip)


org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: SQL validation failed. From line 1, column 18 to line 1,
column 31: No match found for function signature ST_Point(,
)

at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)

at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)

at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)

at
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)

at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)

at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)




Thanks,

Youngwoo



On Wed, Apr 28, 2021 at 1:44 PM Rui Li  wrote:

> Hi Youngwoo,
>
> The catalog function is associated with a catalog and DB. Assuming you
> have created the function ST_Point in your metastore, could you verify
> whether the current catalog is your HiveCatalog and the current database is
> the database in which ST_Point is registered?
>
> On Wed, Apr 28, 2021 at 12:24 PM Shengkai Fang  wrote:
>
>> Hi.
>>
>> The order of the module may influence the load of the function.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-22383
>>
>> Youngwoo Kim (김영우) wrote on Wed, Apr 28, 2021 at 10:50 AM:
>>
>>> Hi,
>>>
>>> I've configured Hive metastore to use HiveCatalog in streaming
>>> application. So far, most of the features are working fine in hive
>>> integration.
>>>
>>> However, I have a problem in using Hive UDFs. Already done prerequisites
>>> to use Hive geospatial UDFs[1]
>>>
>>> To sanity check, I did run a query like below:
>>>
>>> tableEnv.executeSql("SELECT ST_AsText(ST_Point(1, 2))");
>>>
>>>
>>> Got an exception like this:
>>>
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error: SQL validation failed. From line 1, column 18 to
>>> line 1, column 63: No match found for function signature
>>> ST_Point(, )
>>>
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
>>>
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
>>>
>>> at
>>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>>>
>>> at
>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>>>
>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>>>
>>> at
>>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>>>
>>> at
>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>>>
>>> at java.security.AccessController.doPrivileged(Native Method)
>>>
>>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>>
>>> at
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
>>>
>>> at
>>> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(H

Re: Using Hive UDFs

2021-04-27 Thread Rui Li
Hi Youngwoo,

Could you please share the function jar and DDL you used to create the
function? I can try reproducing this issue locally.

On Wed, Apr 28, 2021 at 1:33 PM Youngwoo Kim (김영우)  wrote:

> Thanks Shengkai and Rui for looking into this.
>
> A snippet from my app. looks like following:
>
> HiveCatalog hive = *new* HiveCatalog("flink-hive", "default",
> "/tmp/hive");
>
> tableEnv.registerCatalog("flink-hive", hive);
>
>
> tableEnv.useCatalog("flink-hive");
>
> tableEnv.loadModule("flink-hive", *new* HiveModule("3.1.2"));
>
>
> tableEnv.getConfig().setSqlDialect(SqlDialect.*HIVE*);
>
>
> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS flink_gaia");
>
> tableEnv.executeSql("USE flink_gaia");
>
> tableEnv.executeSql("SHOW CURRENT CATALOG").print();
>
> tableEnv.executeSql("SHOW CURRENT DATABASE").print();
>
> tableEnv.executeSql("SHOW TABLES").print();
>
> tableEnv.executeSql("SHOW FUNCTIONS").print();
>
>
>
> // Test Hive UDF
>
> tableEnv.executeSql("SELECT ST_AsText(ST_Point(1, 2))");
>
>
> And I got the following output and exception:
>
>
> +--+
>
> | current catalog name |
>
> +--+
>
> |   flink-hive |
>
> +--+
>
> 1 row in set
>
> +---+
>
> | current database name |
>
> +---+
>
> |flink_gaia |
>
> +---+
>
> 1 row in set
>
> +--+
>
> |   table name |
>
> +--+
>
> | geofence |
>
> | lcap |
>
> | lcap_temporal_fenced |
>
> +--+
>
>
> ++
>
> |  function name |
>
> ++
>
> |   regr_sxy |
>
> ..
>
>
> 380 rows in set
>
>
> (snip)
>
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: SQL validation failed. From line 1, column 18 to
> line 1, column 31: No match found for function signature
> ST_Point(, )
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
>
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
>
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>
> at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at javax.security.auth.Subject.doAs(Subject.java:422)
>
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
>
> at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>
>
>
>
> Thanks,
>
> Youngwoo
>
>
>
> On Wed, Apr 28, 2021 at 1:44 PM Rui Li  wrote:
>
>> Hi Youngwoo,
>>
>> The catalog function is associated with a catalog and DB. Assuming you
>> have created the function ST_Point in your metastore, could you verify
>> whether the current catalog is your HiveCatalog and the current database is
>> the database in which ST_Point is registered?
>>
>> On Wed, Apr 28, 2021 at 12:24 PM Shengkai Fang  wrote:
>>
>>> Hi.
>>>
>>> The order of the module may influence the load of the function.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-22383
>>>
>>> Youngwoo Kim (김영우) wrote on Wed, Apr 28, 2021 at 10:50 AM:
>>>
 Hi,

 I've configured Hive metastore to use HiveCatalog in streaming
 application. So far, most of the features are working fine in hive
 integration.

 However, I have a problem in using Hive UDFs. Already done
 prerequisites to use Hive geospatial UDFs[1]

 To sanity check, I did run a query like below:

 tableEnv.executeSql("SELECT ST_AsText(ST_Point(1, 2))");


 Got an exception like this:


 org.apache.flink.client.program.ProgramInvocationException: The main
 method caused an error: SQL validation failed. From line 1, column 18 to
 line 1, column 63: No match found for function signature
 ST_Point(, )

 at
 org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)

 at
 org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)

 at
 org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)

 at
 org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)

 at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)

 at
 org.apache.flink.client.cli

Re: Using Hive UDFs

2021-04-27 Thread 김영우
Hey Rui,

For the geospatial UDFs, I've added these jars to my Flink deployment:

# Flink-Hive

RUN wget -q -O
/opt/flink/lib/flink-sql-connector-hive-3.1.2_2.12-1.12.2.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.2_2.12/1.12.2/flink-sql-connector-hive-3.1.2_2.12-1.12.2.jar
\

  && wget -q -O /opt/flink/lib/hive-exec-3.1.2.jar
https://repo1.maven.org/maven2/org/apache/hive/hive-exec/3.1.2/hive-exec-3.1.2.jar
\

  && wget -q -O /opt/flink/lib/libfb303-0.9.3.jar
http://databus.dbpedia.org:8081/repository/internal/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar


# Hive geospatial udf, https://github.com/Esri/spatial-framework-for-hadoop

RUN wget -q -O /opt/flink/lib/spatial-sdk-hive.jar
https://github.com/Esri/spatial-framework-for-hadoop/releases/download/v2.2.0/spatial-sdk-hive-2.2.0.jar
\

  && wget -q -O /opt/flink/lib/spatial-sdk-json.jar
https://github.com/Esri/spatial-framework-for-hadoop/releases/download/v2.2.0/spatial-sdk-json-2.2.0.jar
\

  && wget -q -O /opt/flink/lib/esri-geometry-api.jar
https://repo1.maven.org/maven2/com/esri/geometry/esri-geometry-api/2.2.4/esri-geometry-api-2.2.4.jar



As I mentioned above, I did not register the functions explicitly because
the 'CREATE FUNCTION ...' statement did not work for me. If I run the
statement "CREATE FUNCTION ST_GeomFromText AS
'com.esri.hadoop.hive.ST_GeomFromText'", I get:


org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Function flink_gaia.ST_GeomFromText already exists in
Catalog flink-hive.

at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)

at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)

at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)

at
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)

at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)

at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)

Caused by: org.apache.flink.table.api.ValidationException: Function
flink_gaia.ST_GeomFromText already exists in Catalog flink-hive.

at
org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalogFunction(TableEnvironmentImpl.java:1459)

at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1009)

at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)

at com.skt.chiron.FlinkApp.main(FlinkApp.java:58)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)

... 11 more



Thanks,

Youngwoo
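
One way to avoid the "already exists" failure above is to guard the registration so that re-submitting the job does not fail when the function survived from an earlier run. A minimal sketch, written as a continuation of the tableEnv snippet from earlier in the thread, assuming DROP FUNCTION IF EXISTS is accepted by the dialect in use (the class name is the one from the user's own DDL):

```java
// Drop a stale registration (if any) before re-creating it, so that a
// restarted job does not hit "Function ... already exists in Catalog".
tableEnv.executeSql("DROP FUNCTION IF EXISTS ST_GeomFromText");
tableEnv.executeSql(
    "CREATE FUNCTION ST_GeomFromText AS 'com.esri.hadoop.hive.ST_GeomFromText'");
```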



On Wed, Apr 28, 2021 at 3:05 PM Rui Li  wrote:

> Hi Youngwoo,
>
> Could you please share the function jar and DDL you used to create the
> function? I can try reproducing this issue locally.
>
> On Wed, Apr 28, 2021 at 1:33 PM Youngwoo Kim (김영우) 
> wrote:
>
>> Thanks Shengkai and Rui for looking into this.
>>
>> A snippet from my app. looks like following:
>>
>> HiveCatalog hive = *new* HiveCatalog("flink-hive", "default",
>> "/tmp/hive");
>>
>> tableEnv.registerCatalog("flink-hive", hive);
>>
>>
>> tableEnv.useCatalog("flink-hive");
>>
>> tableEnv.loadModule("flink-hive", *new* HiveModule("3.1.2"));
>>
>>
>> tableEnv.getConfig().setSqlDialect(SqlDialect.*HIVE*);
>>
>>
>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS flink_gaia");
>>
>> tableEnv.executeSql("USE flink_gaia");
>>
>> tableEnv.executeSql("SHOW CURRENT CATALOG").print();
>>
>> tableEnv.executeSql("SHOW CURRENT DATABASE").print();
>>
>> tableEnv.executeSql("SHOW TABLES").print();
>>
>> tableEnv.executeSql("SHOW FUNCTIONS").print();
>>
>>
>>
>> // Test Hive UDF
>>
>> tableEnv.executeSql("SELECT ST_AsText(ST_Point(1, 2))");
>>
>>
>> And I got the following output and exception:
>>
>>
>> +--+
>>
>> | current catalog name |
>>
>> +--+
>>
>> |   flink-hive |
>>
>> +--+
>>
>> 1 row in set
>>
>> +---+
>>
>> | current database name |
>>
>> +---+
>>
>> |flink_