Prabhu Joseph created FLINK-33529:
-------------------------------------

             Summary: PyFlink fails with "No module named 'cloudpickle'"
                 Key: FLINK-33529
                 URL: https://issues.apache.org/jira/browse/FLINK-33529
             Project: Flink
          Issue Type: Bug
          Components: API / Python
    Affects Versions: 1.18.0
         Environment: Python 3.7.16 or Python 3.9, YARN
            Reporter: Prabhu Joseph


PyFlink fails with "No module named 'cloudpickle'" on Flink 1.18, while the
same program works fine on Flink 1.17. The failure appeared after the change
in FLINK-32034 (https://issues.apache.org/jira/browse/FLINK-32034).

*Repro:*

{code}
[hadoop@ip-1-2-3-4 ~]$ python --version
Python 3.7.16

[hadoop@ip-1-2-3-4 ~]$ rpm -qa | grep flink
flink-1.18.0-1.amzn2.x86_64

[hadoop@ip-1-2-3-4 ~]$ flink-yarn-session -d

[hadoop@ip-1-2-3-4 ~]$ flink run -py /tmp/batch_wc.py --output s3://prabhuflinks3/OUT2/
{code}


*Error*

{code}
ModuleNotFoundError: No module named 'cloudpickle'

        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:656)
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:281)
        at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:57)
        at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:92)
        at org.apache.flink.table.runtime.operators.python.table.PythonTableFunctionOperator.open(PythonTableFunctionOperator.java:114)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
{code}

*Analysis*

1. On Flink 1.17 with Python 3.7.16,
PythonEnvironmentManagerUtils#getSitePackagesPath returned the following two
paths:

{code}
[root@ip-172-31-45-97 tmp]# python flink1.17-get_site_packages.py /tmp
/tmp/lib/python3.7/site-packages
/tmp/lib64/python3.7/site-packages
{code}

whereas Flink 1.18 (FLINK-32034) changed
PythonEnvironmentManagerUtils#getSitePackagesPath so that only one path is returned:

{code}
[root@ip-172-31-45-97 tmp]# python flink1.18-get_site_packages.py /tmp
/tmp/lib64/python3.7/site-packages
[root@ip-172-31-45-97 tmp]#
{code}
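For illustration, a lookup that keeps both locations could be sketched with Python's standard sysconfig module. This is not the attached flink1.17/flink1.18 scripts and not Flink's actual getSitePackagesPath. `site_packages_paths` is a hypothetical helper, and the sketch assumes the host's "posix_prefix" install scheme maps the lib/ and lib64/ directories to "purelib" and "platlib" respectively, which would match the Flink 1.17 output above.

{code:python}
import sys
import sysconfig
from typing import List


def site_packages_paths(prefix: str) -> List[str]:
    """Hypothetical helper: list every site-packages directory under `prefix`.

    Asks sysconfig for both the pure-Python ("purelib") and the
    platform-specific ("platlib") install locations and keeps both.
    Assumption: on hosts like the reporter's, purelib -> lib/ and
    platlib -> lib64/. On Python 3.9+ the platlib path is built from
    sys.platlibdir, which some distributions configure as "lib64".
    """
    paths = sysconfig.get_paths(
        scheme="posix_prefix", vars={"base": prefix, "platbase": prefix}
    )
    result: List[str] = []
    for key in ("purelib", "platlib"):
        path = paths[key]
        if path not in result:  # de-duplicate when purelib == platlib
            result.append(path)
    return result


if __name__ == "__main__":
    # Mirrors the invocation style above, e.g. "python sketch.py /tmp".
    for p in site_packages_paths(sys.argv[1] if len(sys.argv) > 1 else "/tmp"):
        print(p)
{code}

On a host where purelib and platlib coincide, this prints a single directory; where they differ, it prints both, so packages installed under either lib/ or lib64/ remain visible.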

The PyFlink dependencies, including cloudpickle, are installed in
"/tmp/lib/python3.7/site-packages", which getSitePackagesPath no longer returns
in Flink 1.18. As a result, the Python process cannot import them and the PyFlink
job fails.
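To confirm on an affected host which of the two directories actually holds cloudpickle, a small check such as the following can be used. `find_package` is a hypothetical helper, not part of PyFlink. The two paths are the ones from the outputs above, and the check only looks for a `cloudpickle/` package directory or a `cloudpickle.py` module file.

{code:python}
import os
from typing import Dict, List


def find_package(dirs: List[str], package: str = "cloudpickle") -> Dict[str, bool]:
    """Hypothetical helper: report whether `package` lives in each directory.

    A package counts as present if the directory contains either a package
    folder (e.g. cloudpickle/) or a single-file module (e.g. cloudpickle.py).
    Missing directories are simply reported as not containing the package.
    """
    found: Dict[str, bool] = {}
    for d in dirs:
        found[d] = os.path.isdir(os.path.join(d, package)) or os.path.isfile(
            os.path.join(d, package + ".py")
        )
    return found


if __name__ == "__main__":
    dirs = [
        "/tmp/lib/python3.7/site-packages",
        "/tmp/lib64/python3.7/site-packages",
    ]
    for d, ok in find_package(dirs).items():
        print(f"{d}: {'found' if ok else 'missing'}")
{code}

If the analysis above holds, this would report the lib/ directory as containing cloudpickle and the lib64/ directory as not containing it.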

Attached are batch_wc.py, flink1.17-get_site_packages.py and
flink1.18-get_site_packages.py.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
