Glad to hear it! The HDFS requirement should most definitely be documented; I assumed it already was, actually...

On 19.07.2016 03:42, Geoffrey Mon wrote:
Hello Chesnay,

Thank you very much! With your help I've managed to set up a Flink cluster that can run Python jobs successfully. I solved my issue by removing local=True and installing HDFS in a separate cluster.

I don't think it was clearly mentioned in the documentation that HDFS was required for Python-running clusters. Would it be a good idea to include that in the documentation?

Cheers,
Geoffrey

On Sun, Jul 17, 2016 at 11:58 AM Chesnay Schepler <ches...@apache.org> wrote:

    Well, now I know what the problem could be.

    You are trying to execute a job on a cluster (== not local), but
    have set the local flag to true.
        env.execute(local=True)

    Due to this flag the files are only copied into the tmp directory
    of the node where you execute the plan, and are thus not
    accessible from other worker nodes.

    In order to use the Python API on a cluster you *must* have a
    filesystem that is accessible by all workers (like HDFS) to which
    the files can be copied. From there they can be distributed to the
    nodes via the DC.


    On 17.07.2016 17:33, Geoffrey Mon wrote:
    I haven't yet figured out how to write a Java job to test
    DistributedCache functionality between machines; I've only gotten
    worker nodes to create caches from local files (on the same
    worker nodes), rather than on files from the master node. The
    DistributedCache test I've been using (based on the
    DistributedCacheTest unit test) is here:
    https://gist.github.com/GEOFBOT/041d76b47f08919305493f57ebdde0f7

    I realized that this test only tested local files because I was
    getting an error that the file used for the cache was not found
    until I created that file on the worker node in the location
    specified in the plan.

    I've been trying to run a simple Python example that does word
    counting:
    https://gist.github.com/GEOFBOT/dbdc30120fb4d71383d9e3eff5f93c1f

    I've tried three different setups so far: I've tried virtual
    machines, AWS virtual machine instances, and physical machines.
    With each setup, I get the same errors.

    Although with all three of these setups, basic Java jobs can be
    run (like WordCount, PageRank), Python programs cannot be run
    because the files needed to run them are not properly distributed
    to the worker nodes. I've found that although the master node
    reads the Python libraries and plan files (presumably to send
    them to the worker), the worker node never writes any of those
    files to disk, despite the files being added to the list of files
    in the distributed cache via
    DistributedCache.writeFileInfoToConfig (which I found via remote
    debugging).

    When a Python program is run via pyflink, it executes but crashes
    as soon as there is any sort of operation requiring mapping. The
    following exception is thrown:

    2016-07-17 09:39:50,857 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - MapPartition (PythonFlatMap -> PythonMap) (1/1) (12fbc21f0424ab87f3ef579fbe73b0b3) switched from RUNNING to FAILED
    2016-07-17 09:39:50,863 INFO org.apache.flink.runtime.jobmanager.JobManager - Status of job 3685c180d7eb004154e8f9d94996e0cf (Flink Java Job at Sun Jul 17 09:39:49 EDT 2016) changed to FAILING.
    java.lang.Exception: The user defined 'open()' method caused an exception: An error occurred while copying the file.
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
        <snip>
    Caused by: java.lang.RuntimeException: An error occurred while copying the file.
        at org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78)
        at org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:102)
        <snip>
    Caused by: java.io.FileNotFoundException: File file:/tmp/flink does not exist or the user running Flink ('gmon') has insufficient permissions to access it.
        at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:109)
        at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:242)
        at org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:322)
        <snip>
        ... 1 more
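
The FileNotFoundException above names two possible causes: the path does not exist, or the user running Flink cannot read it. A small Python sketch can tell the two apart, assuming it is run on the worker node as the same user that runs Flink ('gmon' in the log). `diagnose_path` is a hypothetical helper and not part of Flink.

```python
import os


def diagnose_path(path):
    """Return 'missing', 'unreadable', or 'ok' for a local filesystem path."""
    if not os.path.exists(path):
        # Also reported when a parent directory cannot be searched.
        return "missing"
    if not os.access(path, os.R_OK):
        return "unreadable"
    return "ok"


if __name__ == "__main__":
    # /tmp/flink is the path named in the exception above.
    print(diagnose_path("/tmp/flink"))
```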

    If the pyflink library is manually copied into place at
    /tmp/flink, that error will be replaced by the following:

    2016-07-17 00:10:54,342 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - MapPartition (PythonFlatMap -> PythonMap) (1/1) (23591303d5b571a6b3e9b68ef51c5a8e) switched from RUNNING to FAILED
    2016-07-17 00:10:54,348 INFO org.apache.flink.runtime.jobmanager.JobManager - Status of job e072403ffec32bd14b54416b53cb46ae (Flink Java Job at Sun Jul 17 00:10:51 EDT 2016) changed to FAILING.
    java.lang.Exception: The user defined 'open()' method caused an exception: External process for task MapPartition (PythonFlatMap -> PythonMap) terminated prematurely.
    python3: can't open file '/tmp/flink-dist-cache-724a9274-f984-429c-a0bf-4f5f23c5cfbc/e072403ffec32bd14b54416b53cb46ae/flink/plan.py': [Errno 2] No such file or directory
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
        <snip>
    Caused by: java.lang.RuntimeException: External process for task MapPartition (PythonFlatMap -> PythonMap) terminated prematurely.
    python3: can't open file '/tmp/flink-dist-cache-724a9274-f984-429c-a0bf-4f5f23c5cfbc/e072403ffec32bd14b54416b53cb46ae/flink/plan.py': [Errno 2] No such file or directory
        at org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:144)
        at org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:92)
        at org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:48)
        <snip>
        ... 3 more

    Both of these exceptions point to Flink not properly copying the
    requested files. Has anyone else successfully run Python jobs on
    a Flink cluster, or is there a bug preventing successful
    operation? Unfortunately, I am relying on using a Flink cluster
    to run a Python job for some scientific data that needs to be
    completed soon.

    Thanks for your assistance,
    Geoffrey

    On Sun, Jul 17, 2016 at 4:04 AM Chesnay Schepler
    <ches...@apache.org> wrote:

        Please also post the job you're trying to run.


        On 17.07.2016 08:43, Geoffrey Mon wrote:
        The Java program I used to test DistributedCache was faulty
        since it actually created the cache from files on the
        machine on which the program was running (i.e. the worker node).

        I tried implementing a cluster again, this time using two
        actual machines instead of virtual machines. I found the
        same error of the Python libraries and plan file not being
        found in the temporary directory. Has anyone else been able
        to successfully set up a Flink cluster to run Python jobs?
        I'm beginning to suspect that Flink itself may have some
        issues with running Python jobs on clusters.

        Cheers,
        Geoffrey

        On Fri, Jul 15, 2016 at 11:28 AM Geoffrey Mon
        <geof...@gmail.com> wrote:

            I wrote a simple Java plan that reads a file in the
            distributed cache and uses the first line from that file
            in a map operation. Sure enough, it works locally, but
            fails when the job is sent to a taskmanager on a worker
            node. Since DistributedCache seems to work for everyone
            else, I'm thinking that maybe some sort of file
            permissions are not properly set such that Flink is not
            able to successfully write distributed cache files.

            I used inotify-tools to watch the temporary files
            directory on both the master node and worker node. When
            the plan is being prepared, the jobmanager node wrote
            the Python modules and plan file to the temporary files
            directory. However, on the worker node, the directory
            tree was created, but the job failed before any of the
            module or plan files were even attempted to be written.
            Interestingly enough, there were no error messages or
            warnings about the cache.
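
As a complement to watching the directory with inotify-tools, the following Python sketch lists the job directories in a temporary directory whose plan file never arrived. It assumes the layout seen in the error messages in this thread (`flink-dist-cache-<id>/<job id>/flink/plan.py`), and `find_missing_plans` is a hypothetical helper, not part of Flink.

```python
import glob
import os


def find_missing_plans(tmp_dir):
    """Return job directories under flink-dist-cache-* that lack flink/plan.py.

    The directory layout is an assumption taken from the paths in the
    error messages; adjust the pattern if your setup differs.
    """
    missing = []
    pattern = os.path.join(tmp_dir, "flink-dist-cache-*", "*")
    for job_dir in sorted(glob.glob(pattern)):
        if not os.path.isdir(job_dir):
            continue
        plan_file = os.path.join(job_dir, "flink", "plan.py")
        if not os.path.isfile(plan_file):
            missing.append(job_dir)
    return missing
```

Running `find_missing_plans("/tmp")` on a worker node, with the argument replaced by the temporary directory configured for that TaskManager, returns an empty list if every job directory contains its plan file.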

            Cheers,
            Geoffrey

            On Fri, Jul 15, 2016 at 4:15 AM Chesnay Schepler
            <ches...@apache.org> wrote:

                Could you write a Java job that uses the Distributed
                Cache to distribute files?

                If this fails, then the DC is faulty; if it doesn't,
                something in the Python API is wrong.


                On 15.07.2016 08:06, Geoffrey Mon wrote:
                I've come across similar issues when trying to set
                up Flink on Amazon EC2 instances. Presumably there
                is something wrong with my setup? Here is the
                flink-conf.yaml I am using:
                https://gist.githubusercontent.com/GEOFBOT/3ffc9b21214174ae750cc3fdb2625b71/raw/flink-conf.yaml

                Thanks,
                Geoffrey

                On Wed, Jul 13, 2016 at 1:15 PM Geoffrey Mon
                <geof...@gmail.com> wrote:

                    Hello,

                    Here is the TaskManager log on pastebin:
                    http://pastebin.com/XAJ56gn4

                    I will look into whether the files were created.

                    By the way, the cluster is made with virtual
                    machines running on BlueData EPIC. I don't know
                    if that might be related to the problem.

                    Thanks,
                    Geoffrey


                    On Wed, Jul 13, 2016 at 6:12 AM Chesnay Schepler
                    <ches...@apache.org> wrote:

                        Hello Geoffrey,

                        How often does this occur?

                        Flink distributes the user code and the
                        Python library using the Distributed Cache.

                        Either the file is deleted right after
                        being created for some reason, or the DC
                        returns a file name before the file was
                        created (which shouldn't happen; it should
                        block until the file is available).

                        If you are up to debugging this, I would
                        suggest looking into the FileCache class and
                        verifying whether the file in question is
                        in fact created.

                        The logs of the TaskManager on which the
                        exception occurs could be of interest too;
                        could you send them to me?

                        Regards,
                        Chesnay


                        On 13.07.2016 04:11, Geoffrey Mon wrote:
                        Hello all,

                        I've set up Flink on a very small cluster
                        of one master node and five worker nodes,
                        following the instructions in the
                        documentation
                        (https://ci.apache.org/projects/flink/flink-docs-master/setup/cluster_setup.html).
                        I can run the included examples like
                        WordCount and PageRank across the entire
                        cluster, but when I try to run simple
                        Python examples, I sometimes get a strange
                        error on the first PythonMapPartition
                        about the temporary folders that contain
                        the streams of data between Python and Java.

                        If I run jobs on only the taskmanager on
                        the master node, Python examples run fine.
                        However, if the jobs use the worker nodes,
                        then I get the following error:

                        
                        org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
                            at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)
                            <snip>
                        Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
                            at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:806)
                            <snip>
                        Caused by: java.lang.Exception: The user defined 'open()' method caused an exception: External process for task MapPartition (PythonMap) terminated prematurely.
                        python: can't open file '/home/bluedata/flink/tmp/flink-dist-cache-d1958549-3d58-4554-b286-cb4b1cdb9060/52feefa8892e61ba9a187f7684c84ada/flink/plan.py': [Errno 2] No such file or directory
                            at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
                            <snip>
                        Caused by: java.lang.RuntimeException: External process for task MapPartition (PythonMap) terminated prematurely.
                        python: can't open file '/home/bluedata/flink/tmp/flink-dist-cache-d1958549-3d58-4554-b286-cb4b1cdb9060/52feefa8892e61ba9a187f7684c84ada/flink/plan.py': [Errno 2] No such file or directory
                            at org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:144)
                            at org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:92)
                            at org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:48)
                            at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
                            at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:477)
                            ... 5 more

                        I suspect this issue has something to do
                        with the transfer of data between the
                        master and the workers, but I haven't been
                        able to find any solutions. Presumably the
                        temporary files weren't received properly
                        and thus were not created properly?

                        Thanks in advance.

                        Cheers,
                        Geoffrey





