Prabhu Joseph created FLINK-33529:
-------------------------------------
Summary: PyFlink fails with "No module named 'cloudpickle'"
Key: FLINK-33529
URL: https://issues.apache.org/jira/browse/FLINK-33529
Project: Flink
Issue Type: Bug
Components: API / Python
Affects Versions: 1.18.0
Environment: Python 3.7.16 or Python 3.9, YARN
Reporter: Prabhu Joseph
PyFlink fails with "No module named 'cloudpickle'" on Flink 1.18, while the same
program works fine on Flink 1.17. The failure started with the change in
FLINK-32034 (https://issues.apache.org/jira/browse/FLINK-32034).
*Repro:*
{code}
[hadoop@ip-1-2-3-4 ~]$ python --version
Python 3.7.16
[hadoop@ip-1-2-3-4 ~]$ rpm -qa | grep flink
flink-1.18.0-1.amzn2.x86_64
[hadoop@ip-1-2-3-4 ~]$ flink-yarn-session -d
[hadoop@ip-1-2-3-4 ~]$ flink run -py /tmp/batch_wc.py --output s3://prabhuflinks3/OUT2/
{code}
*Error*
{code}
ModuleNotFoundError: No module named 'cloudpickle'
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:656)
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:281)
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:57)
at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:92)
at org.apache.flink.table.runtime.operators.python.table.PythonTableFunctionOperator.open(PythonTableFunctionOperator.java:114)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
{code}
*Analysis*
1. On Flink 1.17 with Python 3.7.16,
PythonEnvironmentManagerUtils#getSitePackagesPath returned the following two
paths:
{code}
[root@ip-172-31-45-97 tmp]# python flink1.17-get_site_packages.py /tmp
/tmp/lib/python3.7/site-packages
/tmp/lib64/python3.7/site-packages
{code}
whereas in Flink 1.18, FLINK-32034 changed
PythonEnvironmentManagerUtils#getSitePackagesPath so that only one path is returned:
{code}
[root@ip-172-31-45-97 tmp]# python flink1.18-get_site_packages.py /tmp
/tmp/lib64/python3.7/site-packages
{code}
The PyFlink dependencies are installed in "/tmp/lib/python3.7/site-packages",
which getSitePackagesPath no longer returns in Flink 1.18, so the PyFlink job
fails.
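The attached helper scripts are not reproduced here. The outputs above are consistent with the 1.17 helper reporting both the pure-Python ("purelib") and the platform-specific ("platlib") install locations, and the 1.18 helper reporting only the platform-specific one, which this Python build places under lib64. Assuming that reading is correct, a lookup that returns both locations could look like the following sketch. It relies only on the standard-library sysconfig module with the "posix_prefix" install scheme; the function name get_site_packages_paths is hypothetical, and this is not the actual Flink code or fix.

```python
import sys
import sysconfig


def get_site_packages_paths(prefix):
    """Return the distinct site-packages directories under ``prefix``.

    Hypothetical helper, not Flink's implementation. It assumes the
    dependencies may live in either the pure-Python ("purelib") or the
    platform-specific ("platlib") location, which some Linux builds place
    under lib/ and lib64/ respectively.
    """
    # Substitute the install prefix for both base directories of the scheme.
    scheme_vars = {"base": prefix, "platbase": prefix}
    paths = []
    for key in ("purelib", "platlib"):
        path = sysconfig.get_path(key, scheme="posix_prefix", vars=scheme_vars)
        # Keep the original order, but report a directory only once when
        # purelib and platlib resolve to the same place.
        if path not in paths:
            paths.append(path)
    return paths


if __name__ == "__main__":
    # Usage: python get_site_packages.py <prefix>; falls back to sys.prefix.
    target = sys.argv[1] if len(sys.argv) > 1 else sys.prefix
    for site_dir in get_site_packages_paths(target):
        print(site_dir)
```

On a host like the one in this report, running it with /tmp as the prefix would be expected to print both the lib and lib64 directories; where the two locations coincide, it prints a single path.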
Attached: batch_wc.py, flink1.17-get_site_packages.py and
flink1.18-get_site_packages.py.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)