Hi!

HDFS is mentioned in the docs but not explicitly listed as a requirement:
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/python.html#project-setup

I suppose the Python API could also distribute its libraries through
Flink's BlobServer.
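
To make the rule from the thread below concrete, here is a rough Python
sketch. It is my own illustration, not code from the Flink Python API. It
assumes that only `hdfs` URIs count as reachable from every worker, and the
names `SHARED_SCHEMES`, `visible_to_all_workers` and `check_staging_location`
are hypothetical.

```python
from urllib.parse import urlparse

# Assumption: only these URI schemes are treated as reachable from every
# worker node. Extend the set for other shared filesystems you actually use.
SHARED_SCHEMES = {"hdfs"}


def visible_to_all_workers(uri):
    """Return True if the URI scheme points at a shared filesystem."""
    scheme = urlparse(uri).scheme.lower()
    return scheme in SHARED_SCHEMES


def check_staging_location(uri, local):
    """Describe whether files staged at `uri` can be used for this run.

    `local` mirrors the local flag passed to env.execute() in the thread.
    """
    if local:
        # Local execution: files only need to exist on the submitting machine.
        return "ok: local execution, files stay on this machine"
    if visible_to_all_workers(uri):
        return "ok: workers can fetch files from " + uri
    return "error: " + uri + " is only visible on the submitting node"


if __name__ == "__main__":
    print(check_staging_location("file:/tmp/flink", local=False))
    print(check_staging_location("hdfs:/tmp/flink", local=False))
```

The first call corresponds to the failing setup described below (cluster
execution with files staged only in a node-local tmp directory), the second
to the working one.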

Cheers,
Max

On Tue, Jul 19, 2016 at 9:24 AM, Chesnay Schepler <ches...@apache.org>
wrote:

> Glad to hear it! The HDFS requirement should most definitely be
> documented; I assumed it already was, actually...
>
>
> On 19.07.2016 03:42, Geoffrey Mon wrote:
>
> Hello Chesnay,
>
> Thank you very much! With your help I've managed to set up a Flink cluster
> that can run Python jobs successfully. I solved my issue by removing
> local=True and installing HDFS in a separate cluster.
>
> I don't think it was clearly mentioned in the documentation that HDFS was
> required for Python-running clusters. Would it be a good idea to include
> that in the documentation?
>
> Cheers,
> Geoffrey
>
> On Sun, Jul 17, 2016 at 11:58 AM Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> Well, now I know what the problem could be.
>>
>> You are trying to execute a job on a cluster (== not local), but have set
>> the local flag to true.
>>     env.execute(local=True)
>>
>> Due to this flag the files are only copied into the tmp directory of the
>> node where you execute the plan, and are thus not accessible from other
>> worker nodes.
>>
>> In order to use the Python API on a cluster you *must* have a filesystem
>> that is accessible by all workers (like HDFS) to which the files can be
>> copied. From there they can be distributed to the nodes via the
>> DistributedCache (DC).
>>
>>
>> On 17.07.2016 17:33, Geoffrey Mon wrote:
>>
>> I haven't yet figured out how to write a Java job to test
>> DistributedCache functionality between machines; I've only gotten worker
>> nodes to create caches from local files (on the same worker nodes), rather
>> than from files on the master node. The DistributedCache test I've been
>> using (based on the DistributedCacheTest unit test) is here:
>> https://gist.github.com/GEOFBOT/041d76b47f08919305493f57ebdde0f7
>>
>> I realized that this test only tested local files because I was getting
>> an error that the file used for the cache was not found until I created
>> that file on the worker node in the location specified in the plan.
>>
>> I've been trying to run a simple Python example that does word counting:
>> https://gist.github.com/GEOFBOT/dbdc30120fb4d71383d9e3eff5f93c1f
>>
>> I've tried three different setups so far: I've tried virtual machines,
>> AWS virtual machine instances, and physical machines. With each setup, I
>> get the same errors.
>>
>> Although with all three of these setups, basic Java jobs can be run (like
>> WordCount, PageRank), Python programs cannot be run because the files
>> needed to run them are not properly distributed to the worker nodes. I've
>> found that although the master node reads the Python libraries and plan
>> files (presumably to send them to the worker), the worker node never writes
>> any of those files to disk, despite the files being added to the list of
>> files in the distributed cache via DistributedCache.writeFileInfoToConfig
>> (which I found via remote debugging).
>>
>> When a Python program is run via pyflink, it executes but crashes as soon
>> as there is any sort of operation requiring mapping. The following
>> exception is thrown:
>>
>> 2016-07-17 09:39:50,857 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - MapPartition (PythonFlatMap -> PythonMap) (1/1) (12fbc21f0424ab87f3ef579fbe73b0b3) switched from RUNNING to FAILED
>> 2016-07-17 09:39:50,863 INFO  org.apache.flink.runtime.jobmanager.JobManager - Status of job 3685c180d7eb004154e8f9d94996e0cf (Flink Java Job at Sun Jul 17 09:39:49 EDT 2016) changed to FAILING.
>> java.lang.Exception: The user defined 'open()' method caused an exception: An error occurred while copying the file.
>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
>>     <snip>
>> Caused by: java.lang.RuntimeException: An error occurred while copying the file.
>>     at org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78)
>>     at org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:102)
>>     <snip>
>> Caused by: java.io.FileNotFoundException: File file:/tmp/flink does not exist or the user running Flink ('gmon') has insufficient permissions to access it.
>>     at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:109)
>>     at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:242)
>>     at org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:322)
>>     <snip>
>>     ... 1 more
>>
>> If the pyflink library is manually copied into place at /tmp/flink, that
>> error will be replaced by the following:
>>
>> 2016-07-17 00:10:54,342 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - MapPartition (PythonFlatMap -> PythonMap) (1/1) (23591303d5b571a6b3e9b68ef51c5a8e) switched from RUNNING to FAILED
>> 2016-07-17 00:10:54,348 INFO  org.apache.flink.runtime.jobmanager.JobManager - Status of job e072403ffec32bd14b54416b53cb46ae (Flink Java Job at Sun Jul 17 00:10:51 EDT 2016) changed to FAILING.
>> java.lang.Exception: The user defined 'open()' method caused an exception: External process for task MapPartition (PythonFlatMap -> PythonMap) terminated prematurely.
>> python3: can't open file '/tmp/flink-dist-cache-724a9274-f984-429c-a0bf-4f5f23c5cfbc/e072403ffec32bd14b54416b53cb46ae/flink/plan.py': [Errno 2] No such file or directory
>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
>>     <snip>
>> Caused by: java.lang.RuntimeException: External process for task MapPartition (PythonFlatMap -> PythonMap) terminated prematurely.
>> python3: can't open file '/tmp/flink-dist-cache-724a9274-f984-429c-a0bf-4f5f23c5cfbc/e072403ffec32bd14b54416b53cb46ae/flink/plan.py': [Errno 2] No such file or directory
>>     at org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:144)
>>     at org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:92)
>>     at org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:48)
>>     <snip>
>>     ... 3 more
>>
>> Both of these exceptions point to Flink not properly copying the
>> requested files. Has anyone else successfully run Python jobs on a Flink
>> cluster, or is there a bug preventing successful operation? Unfortunately,
>> I am relying on using a Flink cluster to run a Python job for some
>> scientific data that needs to be completed soon.
>>
>> Thanks for your assistance,
>> Geoffrey
>>
>> On Sun, Jul 17, 2016 at 4:04 AM Chesnay Schepler <ches...@apache.org>
>> wrote:
>>
>>> Please also post the job you're trying to run.
>>>
>>>
>>> On 17.07.2016 08:43, Geoffrey Mon wrote:
>>>
>>> The Java program I used to test DistributedCache was faulty since it
>>> actually created the cache from files on the machine on which the program
>>> was running (i.e. the worker node).
>>>
>>> I tried implementing a cluster again, this time using two actual
>>> machines instead of virtual machines. I found the same error of the Python
>>> libraries and plan file not being found in the temporary directory. Has
>>> anyone else been able to successfully set up a Flink cluster to run Python
>>> jobs? I'm beginning to suspect that Flink itself has a bug that affects
>>> running Python jobs on clusters.
>>>
>>> Cheers,
>>> Geoffrey
>>>
>>> On Fri, Jul 15, 2016 at 11:28 AM Geoffrey Mon <geof...@gmail.com> wrote:
>>>
>>>> I wrote a simple Java plan that reads a file in the distributed cache
>>>> and uses the first line from that file in a map operation. Sure enough, it
>>>> works locally, but fails when the job is sent to a taskmanager on a worker
>>>> node. Since DistributedCache seems to work for everyone else, I'm thinking
>>>> that maybe some sort of file permissions are not properly set such that
>>>> Flink is not able to successfully write distributed cache files.
>>>>
>>>> I used inotify-tools to watch the temporary files directory on both the
>>>> master node and worker node. When the plan is being prepared, the
>>>> jobmanager node wrote the Python modules and plan file to the temporary
>>>> files directory. However, on the worker node, the directory tree was
>>>> created, but the job failed before Flink even attempted to write any of
>>>> the module or plan files. Interestingly enough, there were no error
>>>> messages or warnings about the cache.
>>>>
>>>> Cheers,
>>>> Geoffrey
>>>>
>>>> On Fri, Jul 15, 2016 at 4:15 AM Chesnay Schepler <ches...@apache.org> wrote:
>>>>
>>>>> Could you write a Java job that uses the DistributedCache to
>>>>> distribute files?
>>>>>
>>>>> If this fails, then the DC is faulty; if it doesn't, something in the
>>>>> Python API is wrong.
>>>>>
>>>>>
>>>>> On 15.07.2016 08:06, Geoffrey Mon wrote:
>>>>>
>>>>> I've come across similar issues when trying to set up Flink on Amazon
>>>>> EC2 instances. Presumably there is something wrong with my setup? Here is
>>>>> the flink-conf.yaml I am using:
>>>>>
>>>>> https://gist.githubusercontent.com/GEOFBOT/3ffc9b21214174ae750cc3fdb2625b71/raw/flink-conf.yaml
>>>>>
>>>>> Thanks,
>>>>> Geoffrey
>>>>>
>>>>> On Wed, Jul 13, 2016 at 1:15 PM Geoffrey Mon <geof...@gmail.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> Here is the TaskManager log on pastebin:
>>>>>> http://pastebin.com/XAJ56gn4
>>>>>>
>>>>>> I will look into whether the files were created.
>>>>>>
>>>>>> By the way, the cluster is made with virtual machines running on
>>>>>> BlueData EPIC. I don't know if that might be related to the problem.
>>>>>>
>>>>>> Thanks,
>>>>>> Geoffrey
>>>>>>
>>>>>>
>>>>>> On Wed, Jul 13, 2016 at 6:12 AM Chesnay Schepler <ches...@apache.org> wrote:
>>>>>>
>>>>>>> Hello Geoffrey,
>>>>>>>
>>>>>>> How often does this occur?
>>>>>>>
>>>>>>> Flink distributes the user-code and the python library using the
>>>>>>> Distributed Cache.
>>>>>>>
>>>>>>> Either the file is deleted right after being created for some
>>>>>>> reason, or the DC returns a file name before the file was created (which
>>>>>>> shouldn't happen; it should block until the file is available).
>>>>>>>
>>>>>>> If you are up to debugging this, I would suggest looking into the
>>>>>>> FileCache class and verifying whether the file in question is in fact
>>>>>>> created.
>>>>>>>
>>>>>>> The logs of the TaskManager on which the exception occurs could be
>>>>>>> of interest too; could you send them to me?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Chesnay
>>>>>>>
>>>>>>>
>>>>>>> On 13.07.2016 04:11, Geoffrey Mon wrote:
>>>>>>>
>>>>>>> Hello all,
>>>>>>>
>>>>>>> I've set up Flink on a very small cluster of one master node and
>>>>>>> five worker nodes, following the instructions in the documentation
>>>>>>> (https://ci.apache.org/projects/flink/flink-docs-master/setup/cluster_setup.html).
>>>>>>> I can run the included examples like WordCount and PageRank across the
>>>>>>> entire cluster, but when I try to run simple Python examples, I sometimes
>>>>>>> get a strange error on the first PythonMapPartition about the temporary
>>>>>>> folders that contain the streams of data between Python and Java.
>>>>>>>
>>>>>>> If I run jobs on only the taskmanager on the master node, Python
>>>>>>> examples run fine. However, if the jobs use the worker nodes, then I get
>>>>>>> the following error:
>>>>>>>
>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>>>>>>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)
>>>>>>>     <snip>
>>>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:806)
>>>>>>>     <snip>
>>>>>>> Caused by: java.lang.Exception: The user defined 'open()' method caused an exception: External process for task MapPartition (PythonMap) terminated prematurely.
>>>>>>> python: can't open file '/home/bluedata/flink/tmp/flink-dist-cache-d1958549-3d58-4554-b286-cb4b1cdb9060/52feefa8892e61ba9a187f7684c84ada/flink/plan.py': [Errno 2] No such file or directory
>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
>>>>>>>     <snip>
>>>>>>> Caused by: java.lang.RuntimeException: External process for task MapPartition (PythonMap) terminated prematurely.
>>>>>>> python: can't open file '/home/bluedata/flink/tmp/flink-dist-cache-d1958549-3d58-4554-b286-cb4b1cdb9060/52feefa8892e61ba9a187f7684c84ada/flink/plan.py': [Errno 2] No such file or directory
>>>>>>>     at org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:144)
>>>>>>>     at org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:92)
>>>>>>>     at org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:48)
>>>>>>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:477)
>>>>>>>     ... 5 more
>>>>>>>
>>>>>>> I suspect this issue has something to do with the data transfer
>>>>>>> between the master and the workers, but I haven't been able to find any
>>>>>>> solutions. Presumably the temporary files weren't received properly and
>>>>>>> thus were not created properly?
>>>>>>>
>>>>>>> Thanks in advance.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Geoffrey
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>>>
>>>
>>
>
