Hello Chesnay,

Thank you very much! With your help I've managed to set up a Flink cluster that can run Python jobs successfully. I solved my issue by removing local=True and installing HDFS in a separate cluster.
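For anyone hitting the same problem: the distinction boils down to where the plan and library files are staged. This is a plain-Python sketch of that behaviour (the helper and paths are illustrative, not Flink's actual implementation): with local=True the files land only in the submitting node's tmp directory, while on a cluster they must be staged on storage every worker can read, which is what HDFS provides.

```python
import os
import shutil
import tempfile

def stage_job_files(plan_file, shared_dir=None):
    """Illustrative only: where the Python support files end up.

    shared_dir=None mimics env.execute(local=True): the files are
    copied into a tmp directory on the submitting node, which other
    worker nodes cannot see.  Passing a directory on a shared
    filesystem mimics cluster mode, where every worker can read the
    staged files.
    """
    if shared_dir is None:
        # node-local tmp directory: invisible to other workers
        target = tempfile.mkdtemp(prefix="flink-local-")
    else:
        # shared storage: visible to all workers
        os.makedirs(shared_dir, exist_ok=True)
        target = shared_dir
    return shutil.copy(plan_file, target)
```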
I don't think it was clearly mentioned in the documentation that HDFS was required for Python-running clusters. Would it be a good idea to include that in the documentation?

Cheers,
Geoffrey

On Sun, Jul 17, 2016 at 11:58 AM Chesnay Schepler <ches...@apache.org> wrote:

> Well, now I know what the problem could be.
>
> You are trying to execute a job on a cluster (== not local), but have set
> the local flag to true:
> env.execute(local=True)
>
> Due to this flag the files are only copied into the tmp directory of the
> node where you execute the plan, and are thus not accessible from other
> worker nodes.
>
> In order to use the Python API on a cluster you *must* have a filesystem
> that is accessible by all workers (like HDFS) to which the files can be
> copied. From there they can be distributed to the nodes via the DC.
>
> On 17.07.2016 17:33, Geoffrey Mon wrote:
>
> I haven't yet figured out how to write a Java job to test DistributedCache
> functionality between machines; I've only gotten worker nodes to create
> caches from local files (on the same worker nodes), rather than from files
> on the master node. The DistributedCache test I've been using (based on
> the DistributedCacheTest unit test) is here:
> https://gist.github.com/GEOFBOT/041d76b47f08919305493f57ebdde0f7
>
> I realized that this test only tested local files because I was getting an
> error that the file used for the cache was not found until I created that
> file on the worker node in the location specified in the plan.
>
> I've been trying to run a simple Python example that does word counting:
> https://gist.github.com/GEOFBOT/dbdc30120fb4d71383d9e3eff5f93c1f
>
> I've tried three different setups so far: virtual machines, AWS virtual
> machine instances, and physical machines. With each setup, I get the same
> errors.
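As an aside for anyone reproducing this: the word-count example linked above is a flat_map of lines into words followed by a grouped sum. Independent of the pyflink API, the same logic in plain Python is just:

```python
from collections import Counter

def word_count(lines):
    """flat_map each line into lowercase words, then sum the count
    per word -- the same pipeline the linked pyflink plan expresses."""
    counts = Counter()
    for line in lines:
        counts.update(line.lower().split())
    return dict(counts)
```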
>
> Although with all three of these setups basic Java jobs can be run (like
> WordCount and PageRank), Python programs cannot be run because the files
> needed to run them are not properly distributed to the worker nodes. I've
> found that although the master node reads the Python libraries and plan
> files (presumably to send them to the workers), the worker node never
> writes any of those files to disk, despite the files being added to the
> list of files in the distributed cache via
> DistributedCache.writeFileInfoToConfig (which I found via remote
> debugging).
>
> When a Python program is run via pyflink, it executes but crashes as soon
> as there is any sort of operation requiring mapping. The following
> exception is thrown:
>
> 2016-07-17 09:39:50,857 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph -
>     MapPartition (PythonFlatMap -> PythonMap) (1/1) (12fbc21f0424ab87f3ef579fbe73b0b3) switched from RUNNING to FAILED
> 2016-07-17 09:39:50,863 INFO org.apache.flink.runtime.jobmanager.JobManager -
>     Status of job 3685c180d7eb004154e8f9d94996e0cf (Flink Java Job at Sun Jul 17 09:39:49 EDT 2016) changed to FAILING.
> java.lang.Exception: The user defined 'open()' method caused an exception: An error occurred while copying the file.
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
>     <snip>
> Caused by: java.lang.RuntimeException: An error occurred while copying the file.
>     at org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78)
>     at org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:102)
>     <snip>
> Caused by: java.io.FileNotFoundException: File file:/tmp/flink does not exist or the user running Flink ('gmon') has insufficient permissions to access it.
>     at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:109)
>     at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:242)
>     at org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:322)
>     <snip>
>     ... 1 more
>
> If the pyflink library is manually copied into place at /tmp/flink, that
> error is replaced by the following:
>
> 2016-07-17 00:10:54,342 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph -
>     MapPartition (PythonFlatMap -> PythonMap) (1/1) (23591303d5b571a6b3e9b68ef51c5a8e) switched from RUNNING to FAILED
> 2016-07-17 00:10:54,348 INFO org.apache.flink.runtime.jobmanager.JobManager -
>     Status of job e072403ffec32bd14b54416b53cb46ae (Flink Java Job at Sun Jul 17 00:10:51 EDT 2016) changed to FAILING.
> java.lang.Exception: The user defined 'open()' method caused an exception: External process for task MapPartition (PythonFlatMap -> PythonMap) terminated prematurely.
> python3: can't open file '/tmp/flink-dist-cache-724a9274-f984-429c-a0bf-4f5f23c5cfbc/e072403ffec32bd14b54416b53cb46ae/flink/plan.py': [Errno 2] No such file or directory
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
>     <snip>
> Caused by: java.lang.RuntimeException: External process for task MapPartition (PythonFlatMap -> PythonMap) terminated prematurely.
> python3: can't open file '/tmp/flink-dist-cache-724a9274-f984-429c-a0bf-4f5f23c5cfbc/e072403ffec32bd14b54416b53cb46ae/flink/plan.py': [Errno 2] No such file or directory
>     at org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:144)
>     at org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:92)
>     at org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:48)
>     <snip>
>     ... 3 more
>
> Both of these exceptions point to Flink not properly copying the requested
> files.
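For readers skimming the traces: the first failure is a plain existence/permission check on the staged path, before the Python process even starts. Roughly the equivalent of (a plain-Python sketch of the check, not Flink's actual code):

```python
import os

def get_cached_file(path, user="flink"):
    """Sketch of the check behind the FileNotFoundException above:
    if the file was never copied to this node, or is unreadable by
    the user running Flink, the lookup fails immediately."""
    if not os.path.exists(path) or not os.access(path, os.R_OK):
        raise FileNotFoundError(
            "File %s does not exist or the user running Flink ('%s') "
            "has insufficient permissions to access it." % (path, user))
    return path
```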
> Has anyone else successfully run Python jobs on a Flink cluster, or is
> there a bug preventing successful operation? Unfortunately, I am relying
> on using a Flink cluster to run a Python job for some scientific data
> that needs to be processed soon.
>
> Thanks for your assistance,
> Geoffrey
>
> On Sun, Jul 17, 2016 at 4:04 AM Chesnay Schepler <ches...@apache.org> wrote:
>
>> Please also post the job you're trying to run.
>>
>> On 17.07.2016 08:43, Geoffrey Mon wrote:
>>
>> The Java program I used to test DistributedCache was faulty since it
>> actually created the cache from files on the machine on which the
>> program was running (i.e. the worker node).
>>
>> I tried implementing a cluster again, this time using two actual
>> machines instead of virtual machines. I found the same error: the
>> Python libraries and plan file were not found in the temporary
>> directory. Has anyone else been able to successfully set up a Flink
>> cluster to run Python jobs? I'm beginning to suspect that there may be
>> an issue in Flink itself that prevents Python jobs from running on
>> clusters.
>>
>> Cheers,
>> Geoffrey
>>
>> On Fri, Jul 15, 2016 at 11:28 AM Geoffrey Mon <geof...@gmail.com> wrote:
>>
>>> I wrote a simple Java plan that reads a file from the distributed
>>> cache and uses the first line of that file in a map operation. Sure
>>> enough, it works locally, but fails when the job is sent to a
>>> taskmanager on a worker node. Since DistributedCache seems to work for
>>> everyone else, I'm thinking that maybe some file permissions are not
>>> properly set, such that Flink is not able to successfully write
>>> distributed cache files.
>>>
>>> I used inotify-tools to watch the temporary files directory on both
>>> the master node and worker node. When the plan is being prepared, the
>>> jobmanager node wrote the Python modules and plan file to the
>>> temporary files directory.
>>> However, on the worker node, the directory tree was created, but the
>>> job failed before any of the module or plan files were even attempted
>>> to be written. Interestingly enough, there were no error messages or
>>> warnings about the cache.
>>>
>>> Cheers,
>>> Geoffrey
>>>
>>> On Fri, Jul 15, 2016 at 4:15 AM Chesnay Schepler <ches...@apache.org> wrote:
>>>
>>>> Could you write a Java job that uses the DistributedCache to
>>>> distribute files?
>>>>
>>>> If this fails then the DC is faulty; if it doesn't, something in the
>>>> Python API is wrong.
>>>>
>>>> On 15.07.2016 08:06, Geoffrey Mon wrote:
>>>>
>>>> I've come across similar issues when trying to set up Flink on Amazon
>>>> EC2 instances. Presumably there is something wrong with my setup?
>>>> Here is the flink-conf.yaml I am using:
>>>> https://gist.githubusercontent.com/GEOFBOT/3ffc9b21214174ae750cc3fdb2625b71/raw/flink-conf.yaml
>>>>
>>>> Thanks,
>>>> Geoffrey
>>>>
>>>> On Wed, Jul 13, 2016 at 1:15 PM Geoffrey Mon <geof...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> Here is the TaskManager log on pastebin:
>>>>> http://pastebin.com/XAJ56gn4
>>>>>
>>>>> I will look into whether the files were created.
>>>>>
>>>>> By the way, the cluster is made with virtual machines running on
>>>>> BlueData EPIC. I don't know if that might be related to the problem.
>>>>>
>>>>> Thanks,
>>>>> Geoffrey
>>>>>
>>>>> On Wed, Jul 13, 2016 at 6:12 AM Chesnay Schepler <ches...@apache.org> wrote:
>>>>>
>>>>>> Hello Geoffrey,
>>>>>>
>>>>>> How often does this occur?
>>>>>>
>>>>>> Flink distributes the user code and the Python library using the
>>>>>> Distributed Cache.
>>>>>>
>>>>>> Either the file is deleted right after being created for some
>>>>>> reason, or the DC returns a file name before the file was created
>>>>>> (which shouldn't happen; it should block until the file is
>>>>>> available).
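For context on the gist linked above: the configuration in question follows the standard flink-conf.yaml shape for a small standalone cluster. A minimal sketch of such a file, with hostname, heap sizes, and slot counts as placeholders rather than the actual values from the gist:

```yaml
# Minimal flink-conf.yaml for a small standalone cluster
# (hostname, heap sizes, and slot count are placeholders)
jobmanager.rpc.address: master-node
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 1024
taskmanager.heap.mb: 2048
taskmanager.numberOfTaskSlots: 4
parallelism.default: 4
# Directory for temporary files, including the distributed-cache copies
taskmanager.tmp.dirs: /tmp
```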
>>>>>>
>>>>>> If you are up to debugging this, I would suggest looking into the
>>>>>> FileCache class and verifying whether the file in question is in
>>>>>> fact created.
>>>>>>
>>>>>> The logs of the TaskManager on which the exception occurs could be
>>>>>> of interest too; could you send them to me?
>>>>>>
>>>>>> Regards,
>>>>>> Chesnay
>>>>>>
>>>>>> On 13.07.2016 04:11, Geoffrey Mon wrote:
>>>>>>
>>>>>> Hello all,
>>>>>>
>>>>>> I've set up Flink on a very small cluster of one master node and
>>>>>> five worker nodes, following the instructions in the documentation (
>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/setup/cluster_setup.html).
>>>>>> I can run the included examples like WordCount and PageRank across
>>>>>> the entire cluster, but when I try to run simple Python examples, I
>>>>>> sometimes get a strange error on the first PythonMapPartition about
>>>>>> the temporary folders that contain the streams of data between
>>>>>> Python and Java.
>>>>>>
>>>>>> If I run jobs only on the taskmanager on the master node, Python
>>>>>> examples run fine. However, if the jobs use the worker nodes, then
>>>>>> I get the following error:
>>>>>>
>>>>>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>>>>>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)
>>>>>>     <snip>
>>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:806)
>>>>>>     <snip>
>>>>>> Caused by: java.lang.Exception: The user defined 'open()' method caused an exception: External process for task MapPartition (PythonMap) terminated prematurely.
>>>>>> python: can't open file '/home/bluedata/flink/tmp/flink-dist-cache-d1958549-3d58-4554-b286-cb4b1cdb9060/52feefa8892e61ba9a187f7684c84ada/flink/plan.py': [Errno 2] No such file or directory
>>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
>>>>>>     <snip>
>>>>>> Caused by: java.lang.RuntimeException: External process for task MapPartition (PythonMap) terminated prematurely.
>>>>>> python: can't open file '/home/bluedata/flink/tmp/flink-dist-cache-d1958549-3d58-4554-b286-cb4b1cdb9060/52feefa8892e61ba9a187f7684c84ada/flink/plan.py': [Errno 2] No such file or directory
>>>>>>     at org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:144)
>>>>>>     at org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:92)
>>>>>>     at org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:48)
>>>>>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:477)
>>>>>>     ... 5 more
>>>>>>
>>>>>> I suspect this issue has something to do with the data sent between
>>>>>> the master and the workers, but I haven't been able to find any
>>>>>> solutions. Presumably the temporary files weren't received properly
>>>>>> and thus were not created?
>>>>>>
>>>>>> Thanks in advance.
>>>>>>
>>>>>> Cheers,
>>>>>> Geoffrey