Hi! HDFS is mentioned in the docs but not explicitly listed as a requirement: https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/python.html#project-setup
I suppose the Python API could also distribute its libraries through Flink's BlobServer.

Cheers,
Max

On Tue, Jul 19, 2016 at 9:24 AM, Chesnay Schepler <ches...@apache.org> wrote:
> Glad to hear it! The HDFS requirement should most definitely be
> documented; I assumed it already was, actually...
>
> On 19.07.2016 03:42, Geoffrey Mon wrote:
> > Hello Chesnay,
> >
> > Thank you very much! With your help I've managed to set up a Flink cluster
> > that can run Python jobs successfully. I solved my issue by removing
> > local=True and installing HDFS in a separate cluster.
> >
> > I don't think the documentation clearly mentions that HDFS is required
> > for clusters running Python jobs. Would it be a good idea to add that to
> > the documentation?
> >
> > Cheers,
> > Geoffrey
> >
> > On Sun, Jul 17, 2016 at 11:58 AM Chesnay Schepler <ches...@apache.org> wrote:
>> Well, now I know what the problem could be.
>>
>> You are trying to execute a job on a cluster (== not local), but have set
>> the local flag to true:
>> env.execute(local=True)
>>
>> Due to this flag the files are only copied into the tmp directory of the
>> node where you execute the plan, and are thus not accessible from other
>> worker nodes.
>>
>> In order to use the Python API on a cluster you *must* have a filesystem
>> that is accessible by all workers (like HDFS) to which the files can be
>> copied. From there they can be distributed to the nodes via the
>> DistributedCache.
>>
>> On 17.07.2016 17:33, Geoffrey Mon wrote:
>>
>> I haven't yet figured out how to write a Java job to test
>> DistributedCache functionality between machines; I've only gotten worker
>> nodes to create caches from local files (on the same worker nodes), rather
>> than from files on the master node.
>> The DistributedCache test I've been using (based on the
>> DistributedCacheTest unit test) is here:
>> https://gist.github.com/GEOFBOT/041d76b47f08919305493f57ebdde0f7
>>
>> I realized that this test only tested local files because I was getting
>> an error that the file used for the cache was not found until I created
>> that file on the worker node in the location specified in the plan.
>>
>> I've been trying to run a simple Python example that does word counting:
>> https://gist.github.com/GEOFBOT/dbdc30120fb4d71383d9e3eff5f93c1f
>>
>> I've tried three different setups so far: virtual machines, AWS virtual
>> machine instances, and physical machines. With each setup, I get the same
>> errors.
>>
>> Although basic Java jobs (like WordCount and PageRank) can be run on all
>> three of these setups, Python programs cannot, because the files needed to
>> run them are not properly distributed to the worker nodes. I've found that
>> although the master node reads the Python libraries and plan files
>> (presumably to send them to the worker), the worker node never writes any
>> of those files to disk, despite the files being added to the list of files
>> in the distributed cache via DistributedCache.writeFileInfoToConfig (which
>> I found via remote debugging).
>>
>> When a Python program is run via pyflink, it executes but crashes as soon
>> as there is any sort of operation requiring mapping. The following
>> exception is thrown:
>>
>> 2016-07-17 09:39:50,857 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
>>     - MapPartition (PythonFlatMap -> PythonMap) (1/1)
>>     (12fbc21f0424ab87f3ef579fbe73b0b3) switched from RUNNING to FAILED
>> 2016-07-17 09:39:50,863 INFO org.apache.flink.runtime.jobmanager.JobManager
>>     - Status of job 3685c180d7eb004154e8f9d94996e0cf (Flink Java Job at
>>     Sun Jul 17 09:39:49 EDT 2016) changed to FAILING.
>> java.lang.Exception: The user defined 'open()' method caused an
>> exception: An error occurred while copying the file.
>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
>>     <snip>
>> Caused by: java.lang.RuntimeException: An error occurred while copying the file.
>>     at org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78)
>>     at org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:102)
>>     <snip>
>> Caused by: java.io.FileNotFoundException: File file:/tmp/flink does not
>> exist or the user running Flink ('gmon') has insufficient permissions to
>> access it.
>>     at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:109)
>>     at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:242)
>>     at org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:322)
>>     <snip>
>>     ... 1 more
>>
>> If the pyflink library is manually copied into place at /tmp/flink, that
>> error will be replaced by the following:
>>
>> 2016-07-17 00:10:54,342 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
>>     - MapPartition (PythonFlatMap -> PythonMap) (1/1)
>>     (23591303d5b571a6b3e9b68ef51c5a8e) switched from RUNNING to FAILED
>> 2016-07-17 00:10:54,348 INFO org.apache.flink.runtime.jobmanager.JobManager
>>     - Status of job e072403ffec32bd14b54416b53cb46ae (Flink Java Job at
>>     Sun Jul 17 00:10:51 EDT 2016) changed to FAILING.
>> java.lang.Exception: The user defined 'open()' method caused an
>> exception: External process for task MapPartition (PythonFlatMap ->
>> PythonMap) terminated prematurely.
>> python3: can't open file
>> '/tmp/flink-dist-cache-724a9274-f984-429c-a0bf-4f5f23c5cfbc/e072403ffec32bd14b54416b53cb46ae/flink/plan.py':
>> [Errno 2] No such file or directory
>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
>>     <snip>
>> Caused by: java.lang.RuntimeException: External process for task
>> MapPartition (PythonFlatMap -> PythonMap) terminated prematurely.
>> python3: can't open file
>> '/tmp/flink-dist-cache-724a9274-f984-429c-a0bf-4f5f23c5cfbc/e072403ffec32bd14b54416b53cb46ae/flink/plan.py':
>> [Errno 2] No such file or directory
>>     at org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:144)
>>     at org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:92)
>>     at org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:48)
>>     <snip>
>>     ... 3 more
>>
>> Both of these exceptions point to Flink not properly copying the
>> requested files. Has anyone else successfully run Python jobs on a Flink
>> cluster, or is there a bug preventing successful operation? Unfortunately,
>> I am relying on a Flink cluster to run a Python job on some scientific
>> data that needs to be completed soon.
>>
>> Thanks for your assistance,
>> Geoffrey
>>
>> On Sun, Jul 17, 2016 at 4:04 AM Chesnay Schepler <ches...@apache.org> wrote:
>>> Please also post the job you're trying to run.
>>>
>>> On 17.07.2016 08:43, Geoffrey Mon wrote:
>>>
>>> The Java program I used to test DistributedCache was faulty, since it
>>> actually created the cache from files on the machine on which the program
>>> was running (i.e. the worker node).
>>>
>>> I tried implementing a cluster again, this time using two physical
>>> machines instead of virtual machines. I found the same error of the Python
>>> libraries and plan file not being found in the temporary directory. Has
>>> anyone else been able to successfully set up a Flink cluster to run Python
>>> jobs?
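[Editor's note: for reference, the word-counting job in the gist above amounts to the classic flatMap (split into words) → map (emit pairs) → groupBy/sum pipeline. Below is a plain-Python restatement for comparison, not the pyflink plan itself.]

```python
from collections import Counter


def word_count(lines):
    # flatMap: split each line into words; map: pair each word with 1;
    # groupBy(0).sum(1): fold the pairs into per-word counts.
    counts = Counter()
    for line in lines:
        for word in line.lower().split():
            counts[word] += 1
    return dict(counts)
```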
>>> I'm beginning to suspect that there may be a bug in Flink that
>>> prevents Python jobs from running on clusters.
>>>
>>> Cheers,
>>> Geoffrey
>>>
>>> On Fri, Jul 15, 2016 at 11:28 AM Geoffrey Mon <geof...@gmail.com> wrote:
>>>> I wrote a simple Java plan that reads a file in the distributed cache
>>>> and uses the first line from that file in a map operation. Sure enough, it
>>>> works locally, but fails when the job is sent to a taskmanager on a worker
>>>> node. Since DistributedCache seems to work for everyone else, I'm thinking
>>>> that maybe some file permissions are not set properly, so that Flink
>>>> cannot successfully write distributed cache files.
>>>>
>>>> I used inotify-tools to watch the temporary files directory on both the
>>>> master node and a worker node. When the plan was being prepared, the
>>>> jobmanager node wrote the Python modules and plan file to the temporary
>>>> files directory. However, on the worker node, the directory tree was
>>>> created, but the job failed before Flink even attempted to write any of
>>>> the module or plan files. Interestingly enough, there were no error
>>>> messages or warnings about the cache.
>>>>
>>>> Cheers,
>>>> Geoffrey
>>>>
>>>> On Fri, Jul 15, 2016 at 4:15 AM Chesnay Schepler <ches...@apache.org> wrote:
>>>>> Could you write a Java job that uses the DistributedCache to
>>>>> distribute files?
>>>>>
>>>>> If this fails then the DC is faulty; if it doesn't, something in the
>>>>> Python API is wrong.
>>>>>
>>>>> On 15.07.2016 08:06, Geoffrey Mon wrote:
>>>>>
>>>>> I've come across similar issues when trying to set up Flink on Amazon
>>>>> EC2 instances. Presumably there is something wrong with my setup?
>>>>> Here is the flink-conf.yaml I am using:
>>>>> https://gist.githubusercontent.com/GEOFBOT/3ffc9b21214174ae750cc3fdb2625b71/raw/flink-conf.yaml
>>>>>
>>>>> Thanks,
>>>>> Geoffrey
>>>>>
>>>>> On Wed, Jul 13, 2016 at 1:15 PM Geoffrey Mon <geof...@gmail.com> wrote:
>>>>>> Hello,
>>>>>>
>>>>>> Here is the TaskManager log on pastebin: http://pastebin.com/XAJ56gn4
>>>>>>
>>>>>> I will look into whether the files were created.
>>>>>>
>>>>>> By the way, the cluster is made of virtual machines running on
>>>>>> BlueData EPIC. I don't know if that might be related to the problem.
>>>>>>
>>>>>> Thanks,
>>>>>> Geoffrey
>>>>>>
>>>>>> On Wed, Jul 13, 2016 at 6:12 AM Chesnay Schepler <ches...@apache.org> wrote:
>>>>>>> Hello Geoffrey,
>>>>>>>
>>>>>>> How often does this occur?
>>>>>>>
>>>>>>> Flink distributes the user code and the Python library using the
>>>>>>> DistributedCache.
>>>>>>>
>>>>>>> Either the file is deleted right after being created for some
>>>>>>> reason, or the DC returns a file name before the file was created (which
>>>>>>> shouldn't happen; it should block until the file is available).
>>>>>>>
>>>>>>> If you are up to debugging this, I would suggest looking into the
>>>>>>> FileCache class and verifying whether the file in question is in fact
>>>>>>> created.
>>>>>>>
>>>>>>> The logs of the TaskManager on which the exception occurs could be
>>>>>>> of interest too; could you send them to me?
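[Editor's note: the block-until-available contract Chesnay describes can be sketched with a small standalone class — illustrative only, not Flink's FileCache: getFile() must not hand out a path until the background copy has finished.]

```python
import threading


class CacheEntry:
    """Sketch of a cache entry whose get_file() blocks until the
    background copy completes, mirroring the contract described above."""

    def __init__(self):
        self._path = None
        self._ready = threading.Event()

    def copy_finished(self, path):
        # Called by the copy process once the file is fully written.
        self._path = path
        self._ready.set()

    def get_file(self, timeout=None):
        # Block until the copy is done; returning early would hand the
        # Python process a path that does not exist yet.
        if not self._ready.wait(timeout):
            raise TimeoutError("cached file was never copied")
        return self._path
```

If the real cache returned a name before the copy finished (the second possibility Chesnay mentions), the external Python process would see exactly the "[Errno 2] No such file or directory" error from the traces in this thread.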
>>>>>>> Regards,
>>>>>>> Chesnay
>>>>>>>
>>>>>>> On 13.07.2016 04:11, Geoffrey Mon wrote:
>>>>>>>
>>>>>>> Hello all,
>>>>>>>
>>>>>>> I've set up Flink on a very small cluster of one master node and
>>>>>>> five worker nodes, following the instructions in the documentation
>>>>>>> (https://ci.apache.org/projects/flink/flink-docs-master/setup/cluster_setup.html).
>>>>>>> I can run the included examples like WordCount and PageRank across the
>>>>>>> entire cluster, but when I try to run simple Python examples, I sometimes
>>>>>>> get a strange error on the first PythonMapPartition about the temporary
>>>>>>> folders that contain the streams of data between Python and Java.
>>>>>>>
>>>>>>> If I run jobs on only the taskmanager on the master node, Python
>>>>>>> examples run fine. However, if the jobs use the worker nodes, then I get
>>>>>>> the following error:
>>>>>>>
>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The
>>>>>>> program execution failed: Job execution failed.
>>>>>>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)
>>>>>>>     <snip>
>>>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:806)
>>>>>>>     <snip>
>>>>>>> Caused by: java.lang.Exception: The user defined 'open()' method
>>>>>>> caused an exception: External process for task MapPartition (PythonMap)
>>>>>>> terminated prematurely.
>>>>>>> python: can't open file
>>>>>>> '/home/bluedata/flink/tmp/flink-dist-cache-d1958549-3d58-4554-b286-cb4b1cdb9060/52feefa8892e61ba9a187f7684c84ada/flink/plan.py':
>>>>>>> [Errno 2] No such file or directory
>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
>>>>>>>     <snip>
>>>>>>> Caused by: java.lang.RuntimeException: External process for task
>>>>>>> MapPartition (PythonMap) terminated prematurely.
>>>>>>> python: can't open file
>>>>>>> '/home/bluedata/flink/tmp/flink-dist-cache-d1958549-3d58-4554-b286-cb4b1cdb9060/52feefa8892e61ba9a187f7684c84ada/flink/plan.py':
>>>>>>> [Errno 2] No such file or directory
>>>>>>>     at org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:144)
>>>>>>>     at org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:92)
>>>>>>>     at org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:48)
>>>>>>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:477)
>>>>>>>     ... 5 more
>>>>>>>
>>>>>>> I suspect this issue has something to do with the data transfer
>>>>>>> between the master and the workers, but I haven't been able to find any
>>>>>>> solutions. Presumably the temporary files were not received properly and
>>>>>>> thus were never created?
>>>>>>>
>>>>>>> Thanks in advance.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Geoffrey