Well, now I know what the problem could be.

You are trying to execute a job on a cluster (i.e. not locally), but have set the local flag to true:
    env.execute(local=True)

Due to this flag the files are only copied into the tmp directory of the node where you execute the plan, and are thus not accessible from other worker nodes.

In order to use the Python API on a cluster you *must* have a filesystem that is accessible by all workers (like HDFS) to which the files can be copied. From there they can be distributed to the nodes via the DistributedCache (DC).
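
As a rough sketch of the point above (not part of the Python API itself), the local flag can be derived from where the job is going to run instead of being hard-coded. The helper name execute_plan and the FakeEnvironment stand-in are made up for illustration so that the snippet runs without Flink; only the env.execute(local=True) call is taken from your job.

```python
def execute_plan(env, on_cluster):
    """Run the plan, setting the local flag only for single-machine runs.

    `env` is assumed to be the environment object of the Python API,
    i.e. the object on which execute(local=True) was called in the job.
    """
    # With local=True the files stay in the tmp directory of the node that
    # executes the plan, so it is only safe when nothing runs elsewhere.
    return env.execute(local=not on_cluster)


class FakeEnvironment:
    """Hypothetical stand-in that only records the flag it was given."""

    def __init__(self):
        self.local_flag = None

    def execute(self, local=False):
        self.local_flag = local
        return local


env = FakeEnvironment()
execute_plan(env, on_cluster=True)
print(env.local_flag)  # prints False
```

In a real job you would pass the actual environment object instead of FakeEnvironment; on a cluster this amounts to calling env.execute() without local=True.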

On 17.07.2016 17:33, Geoffrey Mon wrote:
I haven't yet figured out how to write a Java job to test DistributedCache functionality between machines; I've only gotten worker nodes to create caches from local files (on the same worker nodes), rather than from files on the master node. The DistributedCache test I've been using (based on the DistributedCacheTest unit test) is here:
https://gist.github.com/GEOFBOT/041d76b47f08919305493f57ebdde0f7

I realized that this test only tested local files because I was getting an error that the file used for the cache was not found until I created that file on the worker node in the location specified in the plan.

I've been trying to run a simple Python example that does word counting:
https://gist.github.com/GEOFBOT/dbdc30120fb4d71383d9e3eff5f93c1f

I've tried three different setups so far: virtual machines, AWS virtual machine instances, and physical machines. With each setup, I get the same errors.

Although basic Java jobs (like WordCount and PageRank) can be run with all three of these setups, Python programs cannot be run because the files needed to run them are not properly distributed to the worker nodes. I've found that although the master node reads the Python libraries and plan files (presumably to send them to the worker), the worker node never writes any of those files to disk, despite the files being added to the list of files in the distributed cache via DistributedCache.writeFileInfoToConfig (which I found via remote debugging).

When a Python program is run via pyflink, it executes but crashes as soon as there is any sort of operation requiring mapping. The following exception is thrown:

2016-07-17 09:39:50,857 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - MapPartition (PythonFlatMap -> PythonMap) (1/1) (12fbc21f0424ab87f3ef579fbe73b0b3) switched from RUNNING to FAILED
2016-07-17 09:39:50,863 INFO org.apache.flink.runtime.jobmanager.JobManager - Status of job 3685c180d7eb004154e8f9d94996e0cf (Flink Java Job at Sun Jul 17 09:39:49 EDT 2016) changed to FAILING.
java.lang.Exception: The user defined 'open()' method caused an exception: An error occurred while copying the file.
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
<snip>
Caused by: java.lang.RuntimeException: An error occurred while copying the file.
at org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78)
at org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:102)
<snip>
Caused by: java.io.FileNotFoundException: File file:/tmp/flink does not exist or the user running Flink ('gmon') has insufficient permissions to access it.
at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:109)
at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:242)
at org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:322)
<snip>
... 1 more

If the pyflink library is manually copied into place at /tmp/flink, that error will be replaced by the following:

2016-07-17 00:10:54,342 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - MapPartition (PythonFlatMap -> PythonMap) (1/1) (23591303d5b571a6b3e9b68ef51c5a8e) switched from RUNNING to FAILED
2016-07-17 00:10:54,348 INFO org.apache.flink.runtime.jobmanager.JobManager - Status of job e072403ffec32bd14b54416b53cb46ae (Flink Java Job at Sun Jul 17 00:10:51 EDT 2016) changed to FAILING.
java.lang.Exception: The user defined 'open()' method caused an exception: External process for task MapPartition (PythonFlatMap -> PythonMap) terminated prematurely.
python3: can't open file '/tmp/flink-dist-cache-724a9274-f984-429c-a0bf-4f5f23c5cfbc/e072403ffec32bd14b54416b53cb46ae/flink/plan.py': [Errno 2] No such file or directory
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
<snip>
Caused by: java.lang.RuntimeException: External process for task MapPartition (PythonFlatMap -> PythonMap) terminated prematurely.
python3: can't open file '/tmp/flink-dist-cache-724a9274-f984-429c-a0bf-4f5f23c5cfbc/e072403ffec32bd14b54416b53cb46ae/flink/plan.py': [Errno 2] No such file or directory
at org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:144)
at org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:92)
at org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:48)
<snip>
... 3 more

Both of these exceptions point to Flink not properly copying the requested files. Has anyone else successfully run Python jobs on a Flink cluster, or is there a bug preventing successful operation? Unfortunately, I am relying on using a Flink cluster to run a Python job for some scientific data that needs to be completed soon.

Thanks for your assistance,
Geoffrey

On Sun, Jul 17, 2016 at 4:04 AM Chesnay Schepler <ches...@apache.org> wrote:

    Please also post the job you're trying to run.


    On 17.07.2016 08:43, Geoffrey Mon wrote:
    The Java program I used to test DistributedCache was faulty since
    it actually created the cache from files on the machine on which
    the program was running (i.e. the worker node).

    I tried implementing a cluster again, this time using two actual
    machines instead of virtual machines. I found the same error of
    the Python libraries and plan file not being found in the
    temporary directory. Has anyone else been able to successfully
    set up a Flink cluster to run Python jobs? I'm beginning to
    suspect that there is an issue in Flink itself with running
    Python jobs on clusters.

    Cheers,
    Geoffrey

    On Fri, Jul 15, 2016 at 11:28 AM Geoffrey Mon <geof...@gmail.com> wrote:

        I wrote a simple Java plan that reads a file in the
        distributed cache and uses the first line from that file in a
        map operation. Sure enough, it works locally, but fails when
        the job is sent to a taskmanager on a worker node. Since
        DistributedCache seems to work for everyone else, I'm
        thinking that maybe some sort of file permissions are not
        properly set such that Flink is not able to successfully
        write distributed cache files.

        I used inotify-tools to watch the temporary files directory
        on both the master node and worker node. When the plan is
        being prepared, the jobmanager node wrote the Python modules
        and plan file to the temporary files directory. However, on
        the worker node, the directory tree was created, but the job
        failed before any attempt was made to write the module or
        plan files. Interestingly enough, there were no error
        messages or warnings about the cache.

        Cheers,
        Geoffrey

        On Fri, Jul 15, 2016 at 4:15 AM Chesnay Schepler <ches...@apache.org> wrote:

            Could you write a Java job that uses the DistributedCache
            to distribute files?

            If this fails, then the DC is faulty; if it doesn't,
            something in the Python API is wrong.


            On 15.07.2016 08:06, Geoffrey Mon wrote:
            I've come across similar issues when trying to set up
            Flink on Amazon EC2 instances. Presumably there is
            something wrong with my setup? Here is the
            flink-conf.yaml I am using:
            https://gist.githubusercontent.com/GEOFBOT/3ffc9b21214174ae750cc3fdb2625b71/raw/flink-conf.yaml

            Thanks,
            Geoffrey

            On Wed, Jul 13, 2016 at 1:15 PM Geoffrey Mon <geof...@gmail.com> wrote:

                Hello,

                Here is the TaskManager log on pastebin:
                http://pastebin.com/XAJ56gn4

                I will look into whether the files were created.

                By the way, the cluster is made with virtual
                machines running on BlueData EPIC. I don't know if
                that might be related to the problem.

                Thanks,
                Geoffrey


                On Wed, Jul 13, 2016 at 6:12 AM Chesnay Schepler <ches...@apache.org> wrote:

                    Hello Geoffrey,

                    How often does this occur?

                    Flink distributes the user-code and the python
                    library using the Distributed Cache.

                    Either the file is deleted right after being
                    created for some reason, or the DC returns a
                    file name before the file was created (which
                    shouldn't happen; it should block until it is
                    available).

                    If you are up for debugging this, I would suggest
                    looking into the FileCache class and verifying
                    whether the file in question is in fact created.

                    The logs of the TaskManager on which the
                    exception occurs could be of interest too; could
                    you send them to me?

                    Regards,
                    Chesnay


                    On 13.07.2016 04:11, Geoffrey Mon wrote:
                    Hello all,

                    I've set up Flink on a very small cluster of
                    one master node and five worker nodes,
                    following the instructions in the documentation
                    (https://ci.apache.org/projects/flink/flink-docs-master/setup/cluster_setup.html).
                    I can run the included examples like WordCount
                    and PageRank across the entire cluster, but
                    when I try to run simple Python examples, I
                    sometimes get a strange error on the first
                    PythonMapPartition about the temporary folders
                    that contain the streams of data between Python
                    and Java.

                    If I run jobs on only the taskmanager on the
                    master node, Python examples run fine. However,
                    if the jobs use the worker nodes, then I get
                    the following error:

                    org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
                    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)
                    <snip>
                    Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
                    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:806)
                    <snip>
                    Caused by: java.lang.Exception: The user defined 'open()' method caused an exception: External process for task MapPartition (PythonMap) terminated prematurely.
                    python: can't open file '/home/bluedata/flink/tmp/flink-dist-cache-d1958549-3d58-4554-b286-cb4b1cdb9060/52feefa8892e61ba9a187f7684c84ada/flink/plan.py': [Errno 2] No such file or directory
                    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
                    <snip>
                    Caused by: java.lang.RuntimeException: External process for task MapPartition (PythonMap) terminated prematurely.
                    python: can't open file '/home/bluedata/flink/tmp/flink-dist-cache-d1958549-3d58-4554-b286-cb4b1cdb9060/52feefa8892e61ba9a187f7684c84ada/flink/plan.py': [Errno 2] No such file or directory
                    at org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:144)
                    at org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:92)
                    at org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:48)
                    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
                    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:477)
                    ... 5 more

                    I suspect this issue has something to do with
                    how data is sent between the master and the
                    workers, but I haven't been able to find
                    any solutions. Presumably the temporary files
                    weren't received properly and thus were not
                    created properly?

                    Thanks in advance.

                    Cheers,
                    Geoffrey




