Hello Chesnay,

Thank you very much! With your help I've managed to set up a Flink cluster
that can run Python jobs successfully. I solved my issue by removing
local=True and installing HDFS in a separate cluster.
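
For anyone who finds this thread later, here is a toy Python sketch of the
failure mode Chesnay describes in the quoted message. It does not use Flink
at all: the three directories stand in for the submitting node's tmp
directory, a worker's tmp directory, and a shared filesystem such as HDFS,
and the helpers submit_plan, worker_can_read, and demo are hypothetical names
made up for this illustration.

```python
import os
import shutil
import tempfile


def submit_plan(plan_src, client_tmp, shared_dir=None):
    """Copy the plan file to where it will be picked up (toy model, not Flink).

    With shared_dir=None this mimics local=True: the file only lands in the
    submitting node's tmp directory. Otherwise it is copied to storage that
    every node can read (standing in for HDFS).
    """
    target_dir = shared_dir if shared_dir is not None else client_tmp
    target = os.path.join(target_dir, "plan.py")
    shutil.copy(plan_src, target)
    return target


def worker_can_read(path, worker_tmp, shared_dir=None):
    """Assume a worker sees only its own tmp directory plus the shared one."""
    visible_roots = [worker_tmp] + ([shared_dir] if shared_dir else [])
    real = os.path.realpath(path)
    inside = any(
        real.startswith(os.path.realpath(root) + os.sep) for root in visible_roots
    )
    return inside and os.path.isfile(real)


def demo():
    """Return (readable with local-only copy, readable with shared copy)."""
    base = tempfile.mkdtemp()
    try:
        dirs = {}
        for name in ("client_tmp", "worker_tmp", "shared"):
            dirs[name] = os.path.join(base, name)
            os.mkdir(dirs[name])
        plan_src = os.path.join(base, "plan_src.py")
        with open(plan_src, "w") as f:
            f.write("print('word count plan')\n")

        local_copy = submit_plan(plan_src, dirs["client_tmp"])
        shared_copy = submit_plan(plan_src, dirs["client_tmp"], dirs["shared"])
        return (
            worker_can_read(local_copy, dirs["worker_tmp"]),
            worker_can_read(shared_copy, dirs["worker_tmp"], dirs["shared"]),
        )
    finally:
        shutil.rmtree(base)


if __name__ == "__main__":
    print(demo())  # prints (False, True)
```

The first value corresponds to my broken setup (files only in the tmp
directory of the node that executed the plan), the second to the working one
(files on a filesystem that all workers can reach).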

I don't think it was clearly mentioned in the documentation that HDFS was
required for Python-running clusters. Would it be a good idea to include
that in the documentation?

Cheers,
Geoffrey

On Sun, Jul 17, 2016 at 11:58 AM Chesnay Schepler <ches...@apache.org>
wrote:

> Well, now I know what the problem could be.
>
> You are trying to execute a job on a cluster (== not local), but have set
> the local flag to true.
>     env.execute(local=True)
>
> Due to this flag the files are only copied into the tmp directory of the
> node where you execute the plan, and are thus not accessible from other
> worker nodes.
>
> In order to use the Python API on a cluster you *must* have a filesystem
> that is accessible by all workers (like HDFS) to which the files can be
> copied. From there they can be distributed to the nodes via the DC.
>
>
> On 17.07.2016 17:33, Geoffrey Mon wrote:
>
> I haven't yet figured out how to write a Java job to test DistributedCache
> functionality between machines; I've only gotten worker nodes to create
> caches from local files (on the same worker nodes), rather than from files
> on the master node. The DistributedCache test I've been using (based on
> the DistributedCacheTest unit test) is here:
> https://gist.github.com/GEOFBOT/041d76b47f08919305493f57ebdde0f7
>
> I realized that this test only tested local files because I was getting an
> error that the file used for the cache was not found until I created that
> file on the worker node in the location specified in the plan.
>
> I've been trying to run a simple Python example that does word counting:
> https://gist.github.com/GEOFBOT/dbdc30120fb4d71383d9e3eff5f93c1f
>
> I've tried three different setups so far: I've tried virtual machines, AWS
> virtual machine instances, and physical machines. With each setup, I get
> the same errors.
>
> With all three of these setups, basic Java jobs (like WordCount and
> PageRank) run, but Python programs do not, because the files needed to
> run them are not properly distributed to the worker nodes. I've
> found that although the master node reads the Python libraries and plan
> files (presumably to send them to the worker), the worker node never writes
> any of those files to disk, despite the files being added to the list of
> files in the distributed cache via DistributedCache.writeFileInfoToConfig
> (which I found via remote debugging).
>
> When a Python program is run via pyflink, it executes but crashes as soon
> as there is any sort of operation requiring mapping. The following
> exception is thrown:
>
> 2016-07-17 09:39:50,857 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> MapPartition (PythonFlatMap -> PythonMap) (1/1)
> (12fbc21f0424ab87f3ef579fbe73b0b3) switched from RUNNING to FAILED
> 2016-07-17 09:39:50,863 INFO
>  org.apache.flink.runtime.jobmanager.JobManager                - Status of
> job 3685c180d7eb004154e8f9d94996e0cf (Flink Java Job at Sun Jul 17 09:39:49
> EDT 2016) changed to FAILING.
> java.lang.Exception: The user defined 'open()' method caused an exception:
> An error occurred while copying the file.
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
> <snip>
> Caused by: java.lang.RuntimeException: An error occurred while copying the
> file.
> at
> org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78)
> at
> org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:102)
> <snip>
> Caused by: java.io.FileNotFoundException: File file:/tmp/flink does not
> exist or the user running Flink ('gmon') has insufficient permissions to
> access it.
> at
> org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:109)
> at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:242)
> at
> org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:322)
> <snip>
> ... 1 more
>
> If the pyflink library is manually copied into place at /tmp/flink, that
> error will be replaced by the following:
>
> 2016-07-17 00:10:54,342 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> MapPartition (PythonFlatMap -> PythonMap) (1/1)
> (23591303d5b571a6b3e9b68ef51c5a8e) switched from RUNNING to FAILED
> 2016-07-17 00:10:54,348 INFO
>  org.apache.flink.runtime.jobmanager.JobManager                - Status of
> job e072403ffec32bd14b54416b53cb46ae (Flink Java Job at Sun Jul 17 00:10:51
> EDT 2016) changed to FAILING.
> java.lang.Exception: The user defined 'open()' method caused an exception:
> External process for task MapPartition (PythonFlatMap -> PythonMap)
> terminated prematurely.
> python3: can't open file
> '/tmp/flink-dist-cache-724a9274-f984-429c-a0bf-4f5f23c5cfbc/e072403ffec32bd14b54416b53cb46ae/flink/plan.py':
> [Errno 2] No such file or directory
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
> <snip>
> Caused by: java.lang.RuntimeException: External process for task
> MapPartition (PythonFlatMap -> PythonMap) terminated prematurely.
> python3: can't open file
> '/tmp/flink-dist-cache-724a9274-f984-429c-a0bf-4f5f23c5cfbc/e072403ffec32bd14b54416b53cb46ae/flink/plan.py':
> [Errno 2] No such file or directory
> at
> org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:144)
> at
> org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:92)
> at
> org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:48)
> <snip>
> ... 3 more
>
> Both of these exceptions point to Flink not properly copying the requested
> files. Has anyone else successfully run Python jobs on a Flink cluster, or
> is there a bug preventing successful operation? Unfortunately, I am relying
> on using a Flink cluster to run a Python job for some scientific data that
> needs to be completed soon.
>
> Thanks for your assistance,
> Geoffrey
>
> On Sun, Jul 17, 2016 at 4:04 AM Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> Please also post the job you're trying to run.
>>
>>
>> On 17.07.2016 08:43, Geoffrey Mon wrote:
>>
>> The Java program I used to test DistributedCache was faulty since it
>> actually created the cache from files on the machine on which the program
>> was running (i.e. the worker node).
>>
>> I tried implementing a cluster again, this time using two actual machines
>> instead of virtual machines. I found the same error of the Python libraries
>> and plan file not being found in the temporary directory. Has anyone else
>> been able to successfully set up a Flink cluster to run Python jobs? I'm
>> beginning to suspect that Flink itself has issues with running Python
>> jobs on clusters.
>>
>> Cheers,
>> Geoffrey
>>
>> On Fri, Jul 15, 2016 at 11:28 AM Geoffrey Mon <geof...@gmail.com> wrote:
>>
>>> I wrote a simple Java plan that reads a file in the distributed cache
>>> and uses the first line from that file in a map operation. Sure enough, it
>>> works locally, but fails when the job is sent to a taskmanager on a worker
>>> node. Since DistributedCache seems to work for everyone else, I'm thinking
>>> that maybe some sort of file permissions are not properly set such that
>>> Flink is not able to successfully write distributed cache files.
>>>
>>> I used inotify-tools to watch the temporary files directory on both the
>>> master node and worker node. When the plan is being prepared, the
>>> jobmanager node wrote the Python modules and plan file to the temporary
>>> files directory. However, on the worker node, the directory tree was
>>> created, but the job failed before any of the module or plan files were
>>> even attempted to be written. Interestingly enough, there were no error
>>> messages or warnings about the cache.
>>>
>>> Cheers,
>>> Geoffrey
>>>
>>> On Fri, Jul 15, 2016 at 4:15 AM Chesnay Schepler <ches...@apache.org>
>>> wrote:
>>>
>>>> Could you write a Java job that uses the Distributed Cache to
>>>> distribute files?
>>>>
>>>> If this fails, then the DC is faulty; if it doesn't, something in the
>>>> Python API is wrong.
>>>>
>>>>
>>>> On 15.07.2016 08:06, Geoffrey Mon wrote:
>>>>
>>>> I've come across similar issues when trying to set up Flink on Amazon
>>>> EC2 instances. Presumably there is something wrong with my setup? Here is
>>>> the flink-conf.yaml I am using:
>>>>
>>>> https://gist.githubusercontent.com/GEOFBOT/3ffc9b21214174ae750cc3fdb2625b71/raw/flink-conf.yaml
>>>>
>>>> Thanks,
>>>> Geoffrey
>>>>
>>>> On Wed, Jul 13, 2016 at 1:15 PM Geoffrey Mon <geof...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> Here is the TaskManager log on pastebin:
>>>>> http://pastebin.com/XAJ56gn4
>>>>>
>>>>> I will look into whether the files were created.
>>>>>
>>>>> By the way, the cluster is made with virtual machines running on
>>>>> BlueData EPIC. I don't know if that might be related to the problem.
>>>>>
>>>>> Thanks,
>>>>> Geoffrey
>>>>>
>>>>>
>>>>> On Wed, Jul 13, 2016 at 6:12 AM Chesnay Schepler <ches...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hello Geoffrey,
>>>>>>
>>>>>> How often does this occur?
>>>>>>
>>>>>> Flink distributes the user-code and the python library using the
>>>>>> Distributed Cache.
>>>>>>
>>>>>> Either the file is deleted right after being created for some reason,
>>>>>> or the DC returns a file name before the file was created (which
>>>>>> shouldn't happen; it should block until the file is available).
>>>>>>
>>>>>> If you are up for debugging this, I would suggest looking into the
>>>>>> FileCache class and verifying whether the file in question is in fact
>>>>>> created.
>>>>>>
>>>>>> The logs of the TaskManager on which the exception occurs could be of
>>>>>> interest too; could you send them to me?
>>>>>>
>>>>>> Regards,
>>>>>> Chesnay
>>>>>>
>>>>>>
>>>>>> On 13.07.2016 04:11, Geoffrey Mon wrote:
>>>>>>
>>>>>> Hello all,
>>>>>>
>>>>>> I've set up Flink on a very small cluster of one master node and five
>>>>>> worker nodes, following the instructions in the documentation (
>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/setup/cluster_setup.html).
>>>>>> I can run the included examples like WordCount and PageRank across the
>>>>>> entire cluster, but when I try to run simple Python examples, I sometimes
>>>>>> get a strange error on the first PythonMapPartition about the temporary
>>>>>> folders that contain the streams of data between Python and Java.
>>>>>>
>>>>>> If I run jobs on only the taskmanager on the master node, Python
>>>>>> examples run fine. However, if the jobs use the worker nodes, then I get
>>>>>> the following error:
>>>>>>
>>>>>> org.apache.flink.client.program.ProgramInvocationException: The
>>>>>> program execution failed: Job execution failed.
>>>>>> at
>>>>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)
>>>>>> <snip>
>>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>>>>>> execution failed.
>>>>>> at
>>>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:806)
>>>>>> <snip>
>>>>>> Caused by: java.lang.Exception: The user defined 'open()' method
>>>>>> caused an exception: External process for task MapPartition (PythonMap)
>>>>>> terminated prematurely.
>>>>>> python: can't open file
>>>>>> '/home/bluedata/flink/tmp/flink-dist-cache-d1958549-3d58-4554-b286-cb4b1cdb9060/52feefa8892e61ba9a187f7684c84ada/flink/plan.py':
>>>>>> [Errno 2] No such file or directory
>>>>>> at
>>>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
>>>>>> <snip>
>>>>>> Caused by: java.lang.RuntimeException: External process for task
>>>>>> MapPartition (PythonMap) terminated prematurely.
>>>>>> python: can't open file
>>>>>> '/home/bluedata/flink/tmp/flink-dist-cache-d1958549-3d58-4554-b286-cb4b1cdb9060/52feefa8892e61ba9a187f7684c84ada/flink/plan.py':
>>>>>> [Errno 2] No such file or directory
>>>>>> at
>>>>>> org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:144)
>>>>>> at
>>>>>> org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:92)
>>>>>> at
>>>>>> org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:48)
>>>>>> at
>>>>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>>>>>> at
>>>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:477)
>>>>>> ... 5 more
>>>>>>
>>>>>> I suspect this issue has something to do with how data is sent
>>>>>> between the master and the workers, but I haven't been able to find any
>>>>>> solutions. Presumably the temporary files weren't received properly and
>>>>>> thus were not created properly?
>>>>>>
>>>>>> Thanks in advance.
>>>>>>
>>>>>> Cheers,
>>>>>> Geoffrey
>>>>>>
>>>>>>
>>>>>>
>>>>
>>>>
>>
>
