Issue with running Flink Python jobs on cluster

2016-07-12 Thread Geoffrey Mon
Hello all,

I've set up Flink on a very small cluster of one master node and five
worker nodes, following the instructions in the documentation (
https://ci.apache.org/projects/flink/flink-docs-master/setup/cluster_setup.html).
I can run the included examples like WordCount and PageRank across the
entire cluster, but when I try to run simple Python examples, I sometimes
get a strange error on the first PythonMapPartition about the temporary
folders that contain the streams of data between Python and Java.

If I run jobs only on the TaskManager on the master node, the Python examples
run fine. However, if the jobs use the worker nodes, then I get the
following error:

org.apache.flink.client.program.ProgramInvocationException: The program
execution failed: Job execution failed.
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:806)

Caused by: java.lang.Exception: The user defined 'open()' method caused an
exception: External process for task MapPartition (PythonMap) terminated
prematurely.
python: can't open file
'/home/bluedata/flink/tmp/flink-dist-cache-d1958549-3d58-4554-b286-cb4b1cdb9060/52feefa8892e61ba9a187f7684c84ada/flink/plan.py':
[Errno 2] No such file or directory
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)

Caused by: java.lang.RuntimeException: External process for task
MapPartition (PythonMap) terminated prematurely.
python: can't open file
'/home/bluedata/flink/tmp/flink-dist-cache-d1958549-3d58-4554-b286-cb4b1cdb9060/52feefa8892e61ba9a187f7684c84ada/flink/plan.py':
[Errno 2] No such file or directory
at
org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:144)
at
org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:92)
at
org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:48)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:477)
... 5 more

I suspect this issue has something to do with how data is sent between
the master and the workers, but I haven't been able to find any solutions.
Presumably the temporary files weren't received properly and therefore were
never created on the workers?

Thanks in advance.

Cheers,
Geoffrey


Re: Issue with running Flink Python jobs on cluster

2016-07-13 Thread Geoffrey Mon
Hello,

Here is the TaskManager log on pastebin:
http://pastebin.com/XAJ56gn4

I will look into whether the files were created.

By the way, the cluster is made with virtual machines running on BlueData
EPIC. I don't know if that might be related to the problem.

Thanks,
Geoffrey

On Wed, Jul 13, 2016 at 6:12 AM Chesnay Schepler  wrote:

> Hello Geoffrey,
>
> How often does this occur?
>
> Flink distributes the user-code and the python library using the
> Distributed Cache.
>
> Either the file is deleted right after being created for some reason, or
> the DC returns a file name before the file was created (which shouldn't
> happen; it should block until the file is available).
>
> If you are up to debugging this, I would suggest looking into the FileCache
> class and verifying whether the file in question is in fact created.
>
> The logs of the TaskManager on which the exception occurs could be of
> interest too; could you send them to me?
>
> Regards,
> Chesnay
>
>
> On 13.07.2016 04:11, Geoffrey Mon wrote:
>
> Hello all,
>
> I've set up Flink on a very small cluster of one master node and five
> worker nodes, following the instructions in the documentation (
> https://ci.apache.org/projects/flink/flink-docs-master/setup/cluster_setup.html).
> I can run the included examples like WordCount and PageRank across the
> entire cluster, but when I try to run simple Python examples, I sometimes
> get a strange error on the first PythonMapPartition about the temporary
> folders that contain the streams of data between Python and Java.
>
> If I run jobs on only the taskmanager on the master node, Python examples
> run fine. However, if the jobs use the worker nodes, then I get the
> following error:
>
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Job execution failed.
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)
> 
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:806)
> 
> Caused by: java.lang.Exception: The user defined 'open()' method caused an
> exception: External process for task MapPartition (PythonMap) terminated
> prematurely.
> python: can't open file
> '/home/bluedata/flink/tmp/flink-dist-cache-d1958549-3d58-4554-b286-cb4b1cdb9060/52feefa8892e61ba9a187f7684c84ada/flink/plan.py':
> [Errno 2] No such file or directory
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
> 
> Caused by: java.lang.RuntimeException: External process for task
> MapPartition (PythonMap) terminated prematurely.
> python: can't open file
> '/home/bluedata/flink/tmp/flink-dist-cache-d1958549-3d58-4554-b286-cb4b1cdb9060/52feefa8892e61ba9a187f7684c84ada/flink/plan.py':
> [Errno 2] No such file or directory
> at
> org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:144)
> at
> org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:92)
> at
> org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:477)
> ... 5 more
>
> I'm suspecting this issue has something to do with the data sending
> between the master and the workers, but I haven't been able to find any
> solutions. Presumably the temporary files weren't received properly and
> thus were not created properly?
>
> Thanks in advance.
>
> Cheers,
> Geoffrey
>
>
>


Re: Issue with running Flink Python jobs on cluster

2016-07-14 Thread Geoffrey Mon
I've come across similar issues when trying to set up Flink on Amazon EC2
instances. Is there perhaps something wrong with my setup? Here is the
flink-conf.yaml I am using:
https://gist.githubusercontent.com/GEOFBOT/3ffc9b21214174ae750cc3fdb2625b71/raw/flink-conf.yaml

Thanks,
Geoffrey

On Wed, Jul 13, 2016 at 1:15 PM Geoffrey Mon  wrote:

> Hello,
>
> Here is the TaskManager log on pastebin:
> http://pastebin.com/XAJ56gn4
>
> I will look into whether the files were created.
>
> By the way, the cluster is made with virtual machines running on BlueData
> EPIC. I don't know if that might be related to the problem.
>
> Thanks,
> Geoffrey
>
>
> On Wed, Jul 13, 2016 at 6:12 AM Chesnay Schepler 
> wrote:
>
>> Hello Geoffrey,
>>
>> How often does this occur?
>>
>> Flink distributes the user-code and the python library using the
>> Distributed Cache.
>>
>> Either the file is deleted right after being created for some reason, or
>> the DC returns a file name before the file was created (which shouldn't
>> happen, it should block it is available).
>>
>> If you are up to debugging this i would suggest looking into FileCache
>> class and verifying whether the file in question is in fact created.
>>
>> The logs of the TaskManager of which the exception occurs could be of
>> interest too; could you send them to me?
>>
>> Regards,
>> Chesnay
>>
>>
>> On 13.07.2016 04:11, Geoffrey Mon wrote:
>>
>> Hello all,
>>
>> I've set up Flink on a very small cluster of one master node and five
>> worker nodes, following the instructions in the documentation (
>> https://ci.apache.org/projects/flink/flink-docs-master/setup/cluster_setup.html).
>> I can run the included examples like WordCount and PageRank across the
>> entire cluster, but when I try to run simple Python examples, I sometimes
>> get a strange error on the first PythonMapPartition about the temporary
>> folders that contain the streams of data between Python and Java.
>>
>> If I run jobs on only the taskmanager on the master node, Python examples
>> run fine. However, if the jobs use the worker nodes, then I get the
>> following error:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The program
>> execution failed: Job execution failed.
>> at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)
>> 
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>> execution failed.
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:806)
>> 
>> Caused by: java.lang.Exception: The user defined 'open()' method caused
>> an exception: External process for task MapPartition (PythonMap) terminated
>> prematurely.
>> python: can't open file
>> '/home/bluedata/flink/tmp/flink-dist-cache-d1958549-3d58-4554-b286-cb4b1cdb9060/52feefa8892e61ba9a187f7684c84ada/flink/plan.py':
>> [Errno 2] No such file or directory
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
>> 
>> Caused by: java.lang.RuntimeException: External process for task
>> MapPartition (PythonMap) terminated prematurely.
>> python: can't open file
>> '/home/bluedata/flink/tmp/flink-dist-cache-d1958549-3d58-4554-b286-cb4b1cdb9060/52feefa8892e61ba9a187f7684c84ada/flink/plan.py':
>> [Errno 2] No such file or directory
>> at
>> org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:144)
>> at
>> org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:92)
>> at
>> org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:48)
>> at
>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:477)
>> ... 5 more
>>
>> I'm suspecting this issue has something to do with the data sending
>> between the master and the workers, but I haven't been able to find any
>> solutions. Presumably the temporary files weren't received properly and
>> thus were not created properly?
>>
>> Thanks in advance.
>>
>> Cheers,
>> Geoffrey
>>
>>
>>


Re: Issue with running Flink Python jobs on cluster

2016-07-15 Thread Geoffrey Mon
I wrote a simple Java plan that reads a file in the distributed cache and
uses the first line from that file in a map operation. Sure enough, it
works locally, but fails when the job is sent to a TaskManager on a worker
node. Since DistributedCache seems to work for everyone else, I'm thinking
that maybe some file permissions are not set properly, so that Flink is
unable to write the distributed cache files successfully.
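
For reference, the test plan is roughly the following (a simplified sketch;
the file path and names are placeholders, not my exact code):

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class DistributedCacheTestSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Register a file (path on the submitting machine) under a lookup name.
        env.registerCachedFile("file:///home/bluedata/cache-input.txt", "testFile");

        DataSet<String> result = env.fromElements("a", "b", "c")
            .map(new RichMapFunction<String, String>() {
                private String firstLine;

                @Override
                public void open(Configuration parameters) throws Exception {
                    // By the time open() runs, the file should have been copied
                    // to the TaskManager via the distributed cache.
                    File f = getRuntimeContext().getDistributedCache().getFile("testFile");
                    try (BufferedReader reader = new BufferedReader(new FileReader(f))) {
                        firstLine = reader.readLine();
                    }
                }

                @Override
                public String map(String value) {
                    return firstLine + ":" + value;
                }
            });

        result.print();
    }
}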

I used inotify-tools to watch the temporary files directory on both the
master node and a worker node. While the plan was being prepared, the
JobManager node wrote the Python modules and the plan file to its temporary
files directory. On the worker node, however, the directory tree was
created, but the job failed before any attempt was made to write the module
or plan files. Interestingly enough, there were no error messages or
warnings about the cache.

Cheers,
Geoffrey

On Fri, Jul 15, 2016 at 4:15 AM Chesnay Schepler  wrote:

> Could you write a Java job that uses the Distributed Cache to distribute
> files?
>
> If this fails, then the DC is faulty; if it doesn't, something in the Python
> API is wrong.
>
>
> On 15.07.2016 08:06, Geoffrey Mon wrote:
>
> I've come across similar issues when trying to set up Flink on Amazon EC2
> instances. Presumably there is something wrong with my setup? Here is the
> flink-conf.yaml I am using:
>
> https://gist.githubusercontent.com/GEOFBOT/3ffc9b21214174ae750cc3fdb2625b71/raw/flink-conf.yaml
>
> Thanks,
> Geoffrey
>
> On Wed, Jul 13, 2016 at 1:15 PM Geoffrey Mon < 
> geof...@gmail.com> wrote:
>
>> Hello,
>>
>> Here is the TaskManager log on pastebin:
>> http://pastebin.com/XAJ56gn4
>>
>> I will look into whether the files were created.
>>
>> By the way, the cluster is made with virtual machines running on BlueData
>> EPIC. I don't know if that might be related to the problem.
>>
>> Thanks,
>> Geoffrey
>>
>>
>> On Wed, Jul 13, 2016 at 6:12 AM Chesnay Schepler 
>> wrote:
>>
>>> Hello Geoffrey,
>>>
>>> How often does this occur?
>>>
>>> Flink distributes the user-code and the python library using the
>>> Distributed Cache.
>>>
>>> Either the file is deleted right after being created for some reason, or
>>> the DC returns a file name before the file was created (which shouldn't
>>> happen, it should block it is available).
>>>
>>> If you are up to debugging this i would suggest looking into FileCache
>>> class and verifying whether the file in question is in fact created.
>>>
>>> The logs of the TaskManager of which the exception occurs could be of
>>> interest too; could you send them to me?
>>>
>>> Regards,
>>> Chesnay
>>>
>>>
>>> On 13.07.2016 04:11, Geoffrey Mon wrote:
>>>
>>> Hello all,
>>>
>>> I've set up Flink on a very small cluster of one master node and five
>>> worker nodes, following the instructions in the documentation (
>>> https://ci.apache.org/projects/flink/flink-docs-master/setup/cluster_setup.html).
>>> I can run the included examples like WordCount and PageRank across the
>>> entire cluster, but when I try to run simple Python examples, I sometimes
>>> get a strange error on the first PythonMapPartition about the temporary
>>> folders that contain the streams of data between Python and Java.
>>>
>>> If I run jobs on only the taskmanager on the master node, Python
>>> examples run fine. However, if the jobs use the worker nodes, then I get
>>> the following error:
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The program
>>> execution failed: Job execution failed.
>>> at
>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)
>>> 
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>>> execution failed.
>>> at
>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:806)
>>> 
>>> Caused by: java.lang.Exception: The user defined 'open()' method caused
>>> an exception: External process for task MapPartition (PythonMap) terminated
>>> prematurely.
>>> python: can't open file
>>> '/home/bluedata/flink/tmp/flink-dist-cache-d1958549-3d58-4554-b286-cb4b1cdb9060/52feefa8892e61ba9a187f7684c84ada/flink/plan.py':
>>> [Errno 2] No such file or directory
>>>

Re: Issue with running Flink Python jobs on cluster

2016-07-16 Thread Geoffrey Mon
The Java program I used to test DistributedCache was faulty, since it
actually created the cache from files on the machine on which the program
was running (i.e. the worker node).

I tried setting up a cluster again, this time using two actual machines
instead of virtual machines. I ran into the same error: the Python libraries
and plan file are not found in the temporary directory. Has anyone else
been able to successfully set up a Flink cluster to run Python jobs? I'm
beginning to suspect that there may be a bug in Flink itself that prevents
Python jobs from running on clusters.

Cheers,
Geoffrey

On Fri, Jul 15, 2016 at 11:28 AM Geoffrey Mon  wrote:

> I wrote a simple Java plan that reads a file in the distributed cache and
> uses the first line from that file in a map operation. Sure enough, it
> works locally, but fails when the job is sent to a taskmanager on a worker
> node. Since DistributedCache seems to work for everyone else, I'm thinking
> that maybe some sort of file permissions are not properly set such that
> Flink is not able to successfully write distributed cache files.
>
> I used inotify-tools to watch the temporary files directory on both the
> master node and worker node. When the plan is being prepared, the
> jobmanager node wrote the Python modules and plan file to the temporary
> files directory. However, on the worker node, the directory tree was
> created, but the job failed before any of the module or plan files were
> even attempted to be written. Interestingly enough, there were no error
> messages or warnings about the cache.
>
> Cheers,
> Geoffrey
>
> On Fri, Jul 15, 2016 at 4:15 AM Chesnay Schepler 
> wrote:
>
>> Could you write a java job that uses the Distributed cache to distribute
>> files?
>>
>> If this fails then the DC is faulty, if it doesn't something in the
>> Python API is wrong.
>>
>>
>> On 15.07.2016 08:06, Geoffrey Mon wrote:
>>
>> I've come across similar issues when trying to set up Flink on Amazon EC2
>> instances. Presumably there is something wrong with my setup? Here is the
>> flink-conf.yaml I am using:
>>
>> https://gist.githubusercontent.com/GEOFBOT/3ffc9b21214174ae750cc3fdb2625b71/raw/flink-conf.yaml
>>
>> Thanks,
>> Geoffrey
>>
>> On Wed, Jul 13, 2016 at 1:15 PM Geoffrey Mon < 
>> geof...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Here is the TaskManager log on pastebin:
>>> http://pastebin.com/XAJ56gn4
>>>
>>> I will look into whether the files were created.
>>>
>>> By the way, the cluster is made with virtual machines running on
>>> BlueData EPIC. I don't know if that might be related to the problem.
>>>
>>> Thanks,
>>> Geoffrey
>>>
>>>
>>> On Wed, Jul 13, 2016 at 6:12 AM Chesnay Schepler 
>>> wrote:
>>>
>>>> Hello Geoffrey,
>>>>
>>>> How often does this occur?
>>>>
>>>> Flink distributes the user-code and the python library using the
>>>> Distributed Cache.
>>>>
>>>> Either the file is deleted right after being created for some reason,
>>>> or the DC returns a file name before the file was created (which shouldn't
>>>> happen, it should block it is available).
>>>>
>>>> If you are up to debugging this i would suggest looking into FileCache
>>>> class and verifying whether the file in question is in fact created.
>>>>
>>>> The logs of the TaskManager of which the exception occurs could be of
>>>> interest too; could you send them to me?
>>>>
>>>> Regards,
>>>> Chesnay
>>>>
>>>>
>>>> On 13.07.2016 04:11, Geoffrey Mon wrote:
>>>>
>>>> Hello all,
>>>>
>>>> I've set up Flink on a very small cluster of one master node and five
>>>> worker nodes, following the instructions in the documentation (
>>>> https://ci.apache.org/projects/flink/flink-docs-master/setup/cluster_setup.html).
>>>> I can run the included examples like WordCount and PageRank across the
>>>> entire cluster, but when I try to run simple Python examples, I sometimes
>>>> get a strange error on the first PythonMapPartition about the temporary
>>>> folders that contain the streams of data between Python and Java.
>>>>
>>>> If I run jobs on only the taskmanager on the master node, Python
>>>> examples run fine. However, if the jobs use the worker nodes, then I get
>>>> t

Re: Issue with running Flink Python jobs on cluster

2016-07-17 Thread Geoffrey Mon
I haven't yet figured out how to write a Java job to test DistributedCache
functionality between machines; I've only gotten worker nodes to create
caches from local files (on the same worker nodes), rather than from files
on the master node. The DistributedCache test I've been using (based on the
DistributedCacheTest unit test) is here:
https://gist.github.com/GEOFBOT/041d76b47f08919305493f57ebdde0f7

I realized that this test only exercised local files because I kept getting
an error that the file used for the cache could not be found, until I
created that file on the worker node in the location specified in the plan.

I've been trying to run a simple Python example that does word counting:
https://gist.github.com/GEOFBOT/dbdc30120fb4d71383d9e3eff5f93c1f

I've tried three different setups so far: virtual machines, AWS virtual
machine instances, and physical machines. With each setup, I get the same
errors.

With all three of these setups, basic Java jobs (like WordCount and
PageRank) run fine, but Python programs cannot run because the files they
need are not properly distributed to the worker nodes. I've found that
although the master node reads the Python libraries and plan files
(presumably to send them to the workers), the worker node never writes any
of those files to disk, despite the files being added to the list of
distributed cache files via DistributedCache.writeFileInfoToConfig (which I
found via remote debugging).

When a Python program is run via pyflink, it starts executing but crashes as
soon as any operation that requires mapping is reached. The following
exception is thrown:

2016-07-17 09:39:50,857 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph-
MapPartition (PythonFlatMap -> PythonMap) (1/1)
(12fbc21f0424ab87f3ef579fbe73b0b3) switched from RUNNING to FAILED
2016-07-17 09:39:50,863 INFO
 org.apache.flink.runtime.jobmanager.JobManager- Status of
job 3685c180d7eb004154e8f9d94996e0cf (Flink Java Job at Sun Jul 17 09:39:49
EDT 2016) changed to FAILING.
java.lang.Exception: The user defined 'open()' method caused an exception:
An error occurred while copying the file.
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)

Caused by: java.lang.RuntimeException: An error occurred while copying the
file.
at
org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78)
at
org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:102)

Caused by: java.io.FileNotFoundException: File file:/tmp/flink does not
exist or the user running Flink ('gmon') has insufficient permissions to
access it.
at
org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:109)
at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:242)
at
org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:322)

... 1 more

If the pyflink library is manually copied into place at /tmp/flink, that
error will be replaced by the following:

2016-07-17 00:10:54,342 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph-
MapPartition (PythonFlatMap -> PythonMap) (1/1)
(23591303d5b571a6b3e9b68ef51c5a8e) switched from RUNNING to FAILED
2016-07-17 00:10:54,348 INFO
 org.apache.flink.runtime.jobmanager.JobManager- Status of
job e072403ffec32bd14b54416b53cb46ae (Flink Java Job at Sun Jul 17 00:10:51
EDT 2016) changed to FAILING.
java.lang.Exception: The user defined 'open()' method caused an exception:
External process for task MapPartition (PythonFlatMap -> PythonMap)
terminated prematurely.
python3: can't open file
'/tmp/flink-dist-cache-724a9274-f984-429c-a0bf-4f5f23c5cfbc/e072403ffec32bd14b54416b53cb46ae/flink/plan.py':
[Errno 2] No such file or directory
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)

Caused by: java.lang.RuntimeException: External process for task
MapPartition (PythonFlatMap -> PythonMap) terminated prematurely.
python3: can't open file
'/tmp/flink-dist-cache-724a9274-f984-429c-a0bf-4f5f23c5cfbc/e072403ffec32bd14b54416b53cb46ae/flink/plan.py':
[Errno 2] No such file or directory
at
org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:144)
at
org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:92)
at
org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:48)

... 3 more

Both of these exceptions point to Flink not properly copying the requested
files. Has anyone else successfully run Python jobs on a Flink cluster, or
is there a bug preventing this from working? Unfortunately, I am relying on
a Flink cluster to run a Python job on some scientific data, and that work
needs to be completed soon.

Thanks for your assistance,
Geoffrey

On Sun, Jul 17, 2016 at 4:04 AM Chesnay Schep

Re: Issue with running Flink Python jobs on cluster

2016-07-18 Thread Geoffrey Mon
Hello Chesnay,

Thank you very much! With your help I've managed to set up a Flink cluster
that can run Python jobs successfully. I solved my issue by removing
local=True and installing HDFS in a separate cluster.

I don't think the documentation clearly mentions that HDFS (or another
filesystem accessible by all workers) is required for running Python jobs on
a cluster. Would it be a good idea to include that in the documentation?

Cheers,
Geoffrey

On Sun, Jul 17, 2016 at 11:58 AM Chesnay Schepler 
wrote:

> Well, now I know what the problem could be.
>
> You are trying to execute a job on a cluster (== not local), but have set
> the local flag to true.
> env.execute(local=True)
>
> Due to this flag the files are only copied into the tmp directory of the
> node where you execute the plan, and are thus not accessible from other
> worker nodes.
>
> In order to use the Python API on a cluster you *must* have a filesystem
> that is accessible by all workers (like HDFS) to which the files can be
> copied. From there they can be distributed to the nodes via the DC.
>
>
> On 17.07.2016 17:33, Geoffrey Mon wrote:
>
> I haven't yet figured out how to write a Java job to test DistributedCache
> functionality between machines; I've only gotten worker nodes to create
> caches from local files (on the same worker nodes), rather than on files
> from the master node. The DistributedCache test I've been using (based on
> the DistributedCacheTest unit test) is here:
> https://gist.github.com/GEOFBOT/041d76b47f08919305493f57ebdde0f7
>
> I realized that this test only tested local files because I was getting an
> error that the file used for the cache was not found until I created that
> file on the worker node in the location specified in the plan.
>
> I've been trying to run a simple Python example that does word counting:
> https://gist.github.com/GEOFBOT/dbdc30120fb4d71383d9e3eff5f93c1f
>
> I've tried three different setups so far: I've tried virtual machines, AWS
> virtual machine instances, and physical machines. With each setup, I get
> the same errors.
>
> Although with all three of these setups, basic Java jobs can be run (like
> WordCount, PageRank), Python programs cannot be run because the files
> needed to run them are not properly distributed to the worker nodes. I've
> found that although the master node reads the Python libraries and plan
> files (presumably to send them to the worker), the worker node never writes
> any of those files to disk, despite the files being added to the list of
> files in the distributed cache via DistributedCache.writeFileInfotoConfig
> (which I found via remote debugging).
>
> When a Python program is run via pyflink, it executes but crashes as soon
> as there is any sort of operation requiring mapping. The following
> exception is thrown:
>
> 2016-07-17 09:39:50,857 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph-
> MapPartition (PythonFlatMap -> PythonMap) (1/1)
> (12fbc21f0424ab87f3ef579fbe73b0b3) switched from RUNNING to FAILED
> 2016-07-17 09:39:50,863 INFO
>  org.apache.flink.runtime.jobmanager.JobManager- Status of
> job 3685c180d7eb004154e8f9d94996e0cf (Flink Java Job at Sun Jul 17 09:39:49
> EDT 2016) changed to FAILING.
> java.lang.Exception: The user defined 'open()' method caused an exception:
> An error occurred while copying the file.
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
> 
> Caused by: java.lang.RuntimeException: An error occurred while copying the
> file.
> at
> org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78)
> at
> org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:102)
> 
> Caused by: java.io.FileNotFoundException: File file:/tmp/flink does not
> exist or the user running Flink ('gmon') has insufficient permissions to
> access it.
> at
> org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:109)
> at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:242)
> at
> org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:322)
> 
> ... 1 more
>
> If the pyflink library is manually copied into place at /tmp/flink, that
> error will be replaced by the following:
>
> 2016-07-17 00:10:54,342 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph-
> MapPartition (PythonFlatMap -> PythonMap) (1/1)
> (23591303d5b571a6b3e9b68ef51c5a8e) switched from RUNNING to FAILED
> 2016-07-17 00:10:54,348 INFO
>  org.apache.flink.runtime.jobmanager.JobManager- Status of
> job e072403ffec32bd14b54416b53cb46ae (Flink Java Job at Sun Jul 17 00:10:51
> 

Many operations cause StackOverflowError with AWS EMR YARN cluster

2016-11-13 Thread Geoffrey Mon
Hello all,

I have a pretty complicated plan file using the Flink Python API, running on
an AWS EMR cluster of m3.xlarge instances using YARN. The plan is for a
dictionary learning algorithm and has to run a sequence of operations many
times; each sequence involves bulk iterations with join operations and
other more intensive operations, and depends on the results of the previous
sequence. I have found that when the number of times to run this sequence
of operations is high (e.g. 20), I get this exception:

Uncaught error from thread
[flink-akka.remote.default-remote-dispatcher-7] shutting down JVM
since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[flink]
java.lang.StackOverflowError
at 
java.io.ObjectOutputStream$ReplaceTable.lookup(ObjectOutputStream.java:2399)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1113)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
..

I assume this problem is caused by having to send too many serialized
operations between Java and Python. When using a Java implementation of the
same operations, I also get:

java.lang.OutOfMemoryError: GC overhead limit exceeded
at 
org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:106)
at 
org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:99)
at 
org.apache.flink.optimizer.plan.DualInputPlanNode.(DualInputPlanNode.java:90)
at 
org.apache.flink.optimizer.plan.DualInputPlanNode.(DualInputPlanNode.java:69)
at 
org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties.instantiate(HashJoinBuildSecondProperties.java:81)
at 
org.apache.flink.optimizer.dag.TwoInputNode.instantiate(TwoInputNode.java:607)


The problem seems to be caused by YARN's handling of memory, because I have
gotten the same Python implementation to work on a smaller, local virtual
cluster that is not using YARN, even though my local cluster has far fewer
computing resources than the 15 GB RAM m3.xlarge AWS instances that EMR is
using. After the YARN job has failed, sometimes a Python process is left on
the cluster using up most of the RAM.

How can I solve this issue? I am unsure of how to reduce the number of
operations while keeping the same functionality.

Thanks,
Geoffrey


Re: Many operations cause StackOverflowError with AWS EMR YARN cluster

2016-11-14 Thread Geoffrey Mon
Hi Ufuk,

The master instance of the cluster was also an m3.xlarge instance with 15 GB
RAM, which I would've expected to be enough. I have gotten the program to
run successfully on a personal virtual cluster where each node has 8 GB RAM
and where the master node was also a worker node, so the problem appears to
have something to do with YARN's memory behavior (such as on EMR).

Nevertheless, it would probably be a good idea to modify my code to reduce
its memory usage; when running my code on my local cluster, performance was
probably bottlenecked by the limited resources.

The job does use a for loop to run the core operations a specific number of
times, specified as a command-line parameter. If it helps, here is my code:
Python: https://github.com/quinngroup/pyflink-r1dl/blob/master/R1DL_Flink.py
(L260 is the core for loop)
Java: https://github.com/quinngroup/flink-r1dl/blob/master/src/main/java/com/github/quinngroup/R1DL.java
(L120 is the core for loop)
I would expect the join operations to be a big cause of the excessive
memory usage.

Thanks!

Geoffrey

On Mon, Nov 14, 2016 at 5:13 AM Ufuk Celebi  wrote:

> The Python API is in alpha state currently, so we would have to check if
> it is related specifically to that. Looping in Chesnay who worked on that.
>
> The JVM GC error happens on the client side as that's where the optimizer
> runs. How much memory does the client submitting the job have?
>
> How do you compose the job? Do you have nested loops, e.g. for() { ...
> bulk iteration Flink program }?
>
> – Ufuk
>
> On 14 November 2016 at 08:02:26, Geoffrey Mon (geof...@gmail.com) wrote:
> > Hello all,
> >
> > I have a pretty complicated plan file using the Flink Python API running
> on
> > a AWS EMR cluster of m3.xlarge instances using YARN. The plan is for a
> > dictionary learning algorithm and has to run a sequence of operations
> many
> > times; each sequence involves bulk iterations with join operations and
> > other more intensive operations, and depends on the results of the
> previous
> > sequence. I have found that when the number of times to run this sequence
> > of operations is high (e.g. 20) I get this exception:
> >
> > Uncaught error from thread
> > [flink-akka.remote.default-remote-dispatcher-7] shutting down JVM
> > since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[flink]
> > java.lang.StackOverflowError
> > at
> java.io.ObjectOutputStream$ReplaceTable.lookup(ObjectOutputStream.java:2399)
> > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1113)
> > at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> > at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> > at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> > at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> > ..
> >
> > I assume this problem is caused by having to send too many serialized
> > operations between Java and Python. When using a Java implementation of
> the
> > same operations, I also get:
> >
> > java.lang.OutOfMemoryError: GC overhead limit exceeded
> > at
> org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:106)
> > at
> org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:99)
> > at
> org.apache.flink.optimizer.plan.DualInputPlanNode.(DualInputPlanNode.java:90)
> > at
> org.apache.flink.optimizer.plan.DualInputPlanNode.(DualInputPlanNode.java:69)
> > at
> org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties.instantiate(HashJoinBuildSecondProperties.java:81)
> > at
> org.apache.flink.optimizer.dag.TwoInputNode.instantiate(TwoInputNode.java:607)
> > 
> >
> > The problem seems to caused by YARN's handling of memory, because I have
> > gotten the same Python implementation to work on a smaller, local virtual
> > cluster that is not using YARN, even though my local cluster has far
> fewer
> > computing resources than the 15 GB RAM m3.xlarge AWS instances that EMR
> is
> > using. After the YARN job has failed, sometimes a python process is left
> on
> > the cluster using up most of the RAM.
> >
> > How can I solve this issue? I am unsure of how to reduce the number of
> > operations while keeping the same functionality.
> >
> > Thanks,
> > Geoffrey
> >
>
>


Re: Many operations cause StackOverflowError with AWS EMR YARN cluster

2016-11-20 Thread Geoffrey Mon
Hello,

I know that the reuse of the data set in my plan is causing the problem
(after one dictionary atom is learned using the data set "S", "S" is
updated for use with the next dictionary atom). When I comment out the line
updating the data set "S", I have no problem and the plan processing phase
takes substantially less time.

I assume that this is because updating and reusing "S" makes the graph of
transformations much more complicated and forces the optimizer to do much
more work, since for example the final value of "S" depends on all previous
operations combined. Is there a way to replace the for loop in my plan so
that I don't cause this complication and so that memory usage is
manageable? I considered making "S" an iterative data set, but I need to
save each dictionary atom to a file, and I wouldn't be able to do that if
"S" was iterative and not finalized.

Perhaps I would be able to collect "S" at the end of each dictionary atom
and then build the new "S" directly from these values. This, however, would
require that "collect" be implemented in the Python API.

In addition, I don't think the problem is YARN-specific anymore because I
have been able to reproduce it on a local machine.

Cheers,
Geoffrey

On Mon, Nov 14, 2016 at 11:38 AM Geoffrey Mon  wrote:

> Hi Ufuk,
>
> The master instance of the cluster was also a m3.xlarge instance with 15
> GB RAM, which I would've expected to be enough. I have gotten the program
> to run successfully on a personal virtual cluster where each node has 8 GB
> RAM and where the master node was also a worker node, so the problem
> appears to have something to do with YARN's memory behavior (such as on
> EMR).
>
> Nevertheless, it would probably be a good idea to modify my code to reduce
> its memory usage. When running my code on my local cluster, performance was
> probably bottlenecked.
>
> The job does use a for loop to run the core operations for a specific
> number of times, specified as a command line parameter. If it helps, here
> is my code:
> Python:
> https://github.com/quinngroup/pyflink-r1dl/blob/master/R1DL_Flink.py (L260
> is the core for loop)
> Java:
> https://github.com/quinngroup/flink-r1dl/blob/master/src/main/java/com/github/quinngroup/R1DL.java
>  (L120
> is the core for loop)
> I would expect the join operations to be a big cause of the excessive
> memory usage.
>
> Thanks!
>
> Geoffrey
>
>
> On Mon, Nov 14, 2016 at 5:13 AM Ufuk Celebi  wrote:
>
> The Python API is in alpha state currently, so we would have to check if
> it is related specifically to that. Looping in Chesnay who worked on that.
>
> The JVM GC error happens on the client side as that's where the optimizer
> runs. How much memory does the client submitting the job have?
>
> How do you compose the job? Do you have nested loops, e.g. for() { ...
> bulk iteration Flink program }?
>
> – Ufuk
>
> On 14 November 2016 at 08:02:26, Geoffrey Mon (geof...@gmail.com) wrote:
> > Hello all,
> >
> > I have a pretty complicated plan file using the Flink Python API running
> on
> > a AWS EMR cluster of m3.xlarge instances using YARN. The plan is for a
> > dictionary learning algorithm and has to run a sequence of operations
> many
> > times; each sequence involves bulk iterations with join operations and
> > other more intensive operations, and depends on the results of the
> previous
> > sequence. I have found that when the number of times to run this sequence
> > of operations is high (e.g. 20) I get this exception:
> >
> > Uncaught error from thread
> > [flink-akka.remote.default-remote-dispatcher-7] shutting down JVM
> > since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[flink]
> > java.lang.StackOverflowError
> > at
> java.io.ObjectOutputStream$ReplaceTable.lookup(ObjectOutputStream.java:2399)
> > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1113)
> > at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> > at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> > at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> > at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> > ..
> >
> > I assume this problem is caused by having to send too many serialized
> > operations between Java and Python. When using a Java implementation of
> the
&

Re: Many operations cause StackOverflowError with AWS EMR YARN cluster

2017-01-26 Thread Geoffrey Mon
Hello Chesnay,

Thanks for the advice. I've begun adding support for multiple jobs per
Python plan file here: https://issues.apache.org/jira/browse/FLINK-5183 and
https://github.com/GEOFBOT/flink/tree/FLINK-5183

The functionality of the patch works: I am able to run multiple jobs per
file successfully, but the process doesn't exit once the jobs are done. The
main issue I am encountering is a race condition. I added a check so that
PythonPlanBinder keeps looking for more jobs from the Python process until
that process exits, but the Python process usually exits only after Java has
already checked whether it is still running. Therefore, unless you use a
debugger to pause the Java process until the Python process exits, or some
other circumstance makes Python exit fast enough, the Java process thinks
that the Python process is still alive and ends up waiting indefinitely for
more Python jobs.

One other minor comment about my own patch: I used global variables to keep
track of the number of execution environments and to differentiate between
different environments. Is there a better way to do this?
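
(For reference, the per-pass approach Chesnay suggests below, where each
iteration of the for loop runs as its own job and saves its result to a
file, would look roughly like the following in Java. This is an illustrative
sketch only; the paths, sizes, and per-pass "learning" logic are
placeholders, not the actual R1DL code.)

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem.WriteMode;

public class PerPassJobsSketch {
    public static void main(String[] args) throws Exception {
        int numAtoms = 3;                        // illustrative
        String dir = "file:///tmp/r1dl-sketch";  // illustrative path

        // Seed pass 0 with some data so the loop below has something to read.
        ExecutionEnvironment seed = ExecutionEnvironment.getExecutionEnvironment();
        seed.generateSequence(0, 99).map(new ToTuple())
            .writeAsCsv(dir + "/S-0", WriteMode.OVERWRITE);
        seed.execute("seed");

        for (int m = 0; m < numAtoms; m++) {
            // A fresh environment per pass: each pass is submitted as its own job.
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Tuple2<Long, Double>> s =
                env.readCsvFile(dir + "/S-" + m).types(Long.class, Double.class);

            // Stand-in for "learn one atom from S and update S" (real logic omitted).
            DataSet<Tuple2<Long, Double>> atom = s.map(new Halve());
            atom.writeAsCsv(dir + "/atom-" + m, WriteMode.OVERWRITE);
            atom.writeAsCsv(dir + "/S-" + (m + 1), WriteMode.OVERWRITE);

            env.execute("pass " + m);  // the plan of each job stays small
        }
    }

    static class ToTuple implements MapFunction<Long, Tuple2<Long, Double>> {
        public Tuple2<Long, Double> map(Long i) { return new Tuple2<>(i, 1.0); }
    }
    static class Halve implements MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
        public Tuple2<Long, Double> map(Tuple2<Long, Double> v) { return new Tuple2<>(v.f0, v.f1 / 2); }
    }
}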

Thanks!

Cheers,
Geoffrey

On Wed, Nov 23, 2016 at 5:41 AM, Chesnay Schepler 
wrote:

Hello,

Implementing collect() in Python is not that trivial, and the gain is
questionable. There is an inherent size limit (think 10 MB), and it is
a bit at odds with the deployment model of the Python API.

Something easier would be to execute each iteration of the for-loop as a
separate job and save the result in a file.
Note that right now the Python API can't execute multiple jobs from the
same file; we would need some modifications
in the PythonPlanBinder to allow this.

Regards,
Chesnay


On 20.11.2016 23:54, Geoffrey Mon wrote:

Hello,

I know that the reuse of the data set in my plan is causing the problem
(after one dictionary atom is learned using the data set "S", "S" is
updated for use with the next dictionary atom). When I comment out the line
updating the data set "S", I have no problem and the plan processing phase
takes substantially less time.

I assume that this is because updating and reusing "S" makes the graph of
transformations much more complicated and forces the optimizer to do much
more work, since for example the final value of "S" depends on all previous
operations combined. Is there a way to replace the for loop in my plan so
that I don't cause this complication and so that memory usage is
manageable? I considered making "S" an iterative data set, but I need to
save each dictionary atom to a file, and I wouldn't be able to do that if
"S" was iterative and not finalized.

Perhaps I would be able to collect "S" at the end of each dictionary atom
and then make the new "S" directly from these values. This however would
require that "collect" be implemented in the Python API.

In addition, I don't think the problem is YARN-specific anymore because I
have been able to reproduce it on a local machine.

Cheers,
Geoffrey

On Mon, Nov 14, 2016 at 11:38 AM Geoffrey Mon  wrote:

Hi Ufuk,

The master instance of the cluster was also a m3.xlarge instance with 15 GB
RAM, which I would've expected to be enough. I have gotten the program to
run successfully on a personal virtual cluster where each node has 8 GB RAM
and where the master node was also a worker node, so the problem appears to
have something to do with YARN's memory behavior (such as on EMR).

Nevertheless, it would probably be a good idea to modify my code to reduce
its memory usage. When running my code on my local cluster, performance was
probably bottlenecked.

The job does use a for loop to run the core operations for a specific
number of times, specified as a command line parameter. If it helps, here
is my code:
Python: https://github.com/quinngroup/pyflink-r1dl/blob/master/R1DL_Flink.py
(L260
is the core for loop)
Java:
https://github.com/quinngroup/flink-r1dl/blob/master/src/main/java/com/github/quinngroup/R1DL.java
(L120
is the core for loop)
I would expect the join operations to be a big cause of the excessive
memory usage.

Thanks!

Geoffrey


On Mon, Nov 14, 2016 at 5:13 AM Ufuk Celebi  wrote:

The Python API is in alpha state currently, so we would have to check if it
is related specifically to that. Looping in Chesnay who worked on that.

The JVM GC error happens on the client side as that's where the optimizer
runs. How much memory does the client submitting the job have?

How do you compose the job? Do you have nested loops, e.g. for() { ... bulk
iteration Flink program }?

– Ufuk

On 14 November 2016 at 08:02:26, Geoffrey Mon ( 
geof...@gmail.com) wrote:
> Hello all,
>
> I have a pretty complicated plan file using the Flink Python API running
on
> a AWS EMR cluster of m3.xlarge instances using YARN. The plan is for a
> dictionary learning algori

"Job submission to the JobManager timed out" on EMR YARN cluster with multiple jobs

2017-02-09 Thread Geoffrey Mon
Hello all,

I'm running a Flink plan made up of multiple jobs. The source for my job
can be found here if it would help in any way:
https://github.com/quinngroup/flink-r1dl/blob/master/src/main/java/com/github/quinngroup/R1DL.java
Each of the jobs (except for the first job) depends on files generated by
the previous job; I'm running it on an AWS EMR cluster using YARN.

When I submit the plan file, the first job runs as planned. After it
completes, the second job is submitted by the YARN client:


02/09/2017 16:39:43 DataSink (CsvSink)(4/5) switched to FINISHED
02/09/2017 16:39:43 Job execution switched to status FINISHED.
2017-02-09 16:40:26,470 INFO  org.apache.flink.yarn.YarnClusterClient
- Waiting until all TaskManagers have connected
Waiting until all TaskManagers have connected
2017-02-09 16:40:26,476 INFO  org.apache.flink.yarn.YarnClusterClient
- TaskManager status (5/5)
TaskManager status (5/5)
2017-02-09 16:40:26,476 INFO  org.apache.flink.yarn.YarnClusterClient
- All TaskManagers are connected
All TaskManagers are connected
2017-02-09 16:40:26,480 INFO  org.apache.flink.yarn.YarnClusterClient
- Submitting job with JobID:
b226f5f18a78bc386bd1b1b6d30515ea. Waiting for job completion.
Submitting job with JobID: b226f5f18a78bc386bd1b1b6d30515ea. Waiting for
job completion.
Connected to JobManager at Actor[akka.tcp://flink@
.ec2.internal:35598/user/jobmanager#68430682]

If the input file is small and the first job runs quickly (~1 minute works
for me), then the second job runs fine. However, if the input file for my
first job is large and the first job takes more than a minute or so to
complete, Flink will not acknowledge receiving the next job; the Flink web
console does not show any new jobs, and the Flink logs do not mention
receiving any new jobs after the first job has completed. The YARN client's
job submission times out when Flink does not respond:

Caused by:
org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException:
Job submission to the JobManager timed out. You may increase
'akka.client.timeout' in case the JobManager needs more time to configure
and confirm the job submission.
at
org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
at
org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:239)
at
org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
at
org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
at
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)

I have tried increasing akka.client.timeout to large values such as 1200s
(20 minutes), but even then Flink does not acknowledge or execute any other
jobs and there is the same timeout error. Does anyone know how I can get
Flink to execute all of the jobs properly?

Cheers,
Geoffrey Mon


Re: "Job submission to the JobManager timed out" on EMR YARN cluster with multiple jobs

2017-02-13 Thread Geoffrey Mon
Just to clarify, is Flink designed to allow submitting multiple jobs from a
single program class when using a YARN cluster? I wasn't sure based on the
documentation.

Cheers,
Geoffrey

On Thu, Feb 9, 2017 at 6:34 PM Geoffrey Mon  wrote:

> Hello all,
>
> I'm running a Flink plan made up of multiple jobs. The source for my job
> can be found here if it would help in any way:
> https://github.com/quinngroup/flink-r1dl/blob/master/src/main/java/com/github/quinngroup/R1DL.java
> Each of the jobs (except for the first job) depends on files generated by
> the previous job; I'm running it on an AWS EMR cluster using YARN.
>
> When I submit the plan file, the first job runs as planned. After it
> completes, the second job is submitted by the YARN client:
>
> 
> 02/09/2017 16:39:43 DataSink (CsvSink)(4/5) switched to FINISHED
> 02/09/2017 16:39:43 Job execution switched to status FINISHED.
> 2017-02-09 16:40:26,470 INFO  org.apache.flink.yarn.YarnClusterClient
>   - Waiting until all TaskManagers have connected
> Waiting until all TaskManagers have connected
> 2017-02-09 16:40:26,476 INFO  org.apache.flink.yarn.YarnClusterClient
>   - TaskManager status (5/5)
> TaskManager status (5/5)
> 2017-02-09 16:40:26,476 INFO  org.apache.flink.yarn.YarnClusterClient
>   - All TaskManagers are connected
> All TaskManagers are connected
> 2017-02-09 16:40:26,480 INFO  org.apache.flink.yarn.YarnClusterClient
>   - Submitting job with JobID:
> b226f5f18a78bc386bd1b1b6d30515ea. Waiting for job completion.
> Submitting job with JobID: b226f5f18a78bc386bd1b1b6d30515ea. Waiting for
> job completion.
> Connected to JobManager at Actor[akka.tcp://flink@
> .ec2.internal:35598/user/jobmanager#68430682]
>
> If the input file is small and the first job runs quickly (~1 minute works
> for me), then the second job runs fine. However, if the input file for my
> first job is large and the first job takes more than a minute or so to
> complete, Flink will not acknowledge receiving the next job; the web Flink
> console does not show any new jobs and Flink logs do not mention receiving
> any new jobs after the first job has completed. The YARN client's job
> submission times out after Flink does not respond:
>
> Caused by:
> org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException:
> Job submission to the JobManager timed out. You may increase
> 'akka.client.timeout' in case the JobManager needs more time to configure
> and confirm the job submission.
> at
> org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
> at
> org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:239)
> at
> org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
> at
> org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
> at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>
> I have tried increasing akka.client.timeout to large values such as 1200s
> (20 minutes), but even then Flink does not acknowledge or execute any other
> jobs and there is the same timeout error. Does anyone know how I can get
> Flink to execute all of the jobs properly?
>
> Cheers,
> Geoffrey Mon
>


Re: "Job submission to the JobManager timed out" on EMR YARN cluster with multiple jobs

2017-02-16 Thread Geoffrey Mon
Hi Robert,

Thanks for your reply. I've done some further testing and (hopefully)
solved the issue; this turned out to be a red herring.  After discovering
that the same issue manifested itself when testing on my local machine, I
found that multiple jobs can be submitted from a main() function for both
temporary and permanent Flink YARN clusters, and that the issue was not
with Flink or with YARN, but with my job file.

In one part of my job, I need to fill in missing components of a vector
with zeroes. I did this by combining the vector DataSet with another
DataSet containing indexed zeroes, using a union operation and an
aggregation operation. In my problematic job, I used
ExecutionEnvironment#fromElements to make a DataSet out of an ArrayList of
Tuples containing an index and a zero. However, for input files with very
large parameters, I needed to generate very long DataSets of zeroes, and
since I was using fromElements, the client needed to send the Flink runtime
all of the elements with which to create the DataSet (lots and lots of
zeroes). This caused the job to time out before execution, making me think
that the job had not been properly received by the runtime.

I've replaced this with ExecutionEnvironment#generateSequence and a map
function that maps each number of the generated sequence to a tuple with a
zero. This has solved the issue and my job seems to be running fine for now.
(https://github.com/quinngroup/flink-r1dl/blame/9bb651597405fcea1fc4c7ba3e4229ca45305176/src/main/java/com/github/quinngroup/R1DL.java#L370)
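
The relevant change boils down to something like this (a simplified,
self-contained sketch with a placeholder size, not the exact code from the
repository linked above):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class ZeroVectorSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        long n = 1000000L;  // illustrative size

        // The zeroes are produced on the cluster; only the bounds of the
        // sequence travel with the submitted plan, instead of n serialized tuples.
        DataSet<Tuple2<Long, Double>> zeros = env.generateSequence(0, n - 1)
            .map(new MapFunction<Long, Tuple2<Long, Double>>() {
                public Tuple2<Long, Double> map(Long i) {
                    return new Tuple2<>(i, 0.0);
                }
            });

        zeros.first(5).print();  // a sink just so the sketch can run
    }
}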

Again, thank you very much for your help.

Sincerely,
Geoffrey

On Wed, Feb 15, 2017 at 7:43 AM Robert Metzger  wrote:

Hi Geoffrey,

I think the "per job yarn cluster" feature does probably not work for one
main() function submitting multiple jobs.
If you have a yarn session + regular "flink run" it should work.

On Mon, Feb 13, 2017 at 5:37 PM, Geoffrey Mon  wrote:

Just to clarify, is Flink designed to allow submitting multiple jobs from a
single program class when using a YARN cluster? I wasn't sure based on the
documentation.

Cheers,
Geoffrey


On Thu, Feb 9, 2017 at 6:34 PM Geoffrey Mon  wrote:

Hello all,

I'm running a Flink plan made up of multiple jobs. The source for my job
can be found here if it would help in any way:
https://github.com/quinngroup/flink-r1dl/blob/master/src/main/java/com/github/quinngroup/R1DL.java
Each of the jobs (except for the first job) depends on files generated by
the previous job; I'm running it on an AWS EMR cluster using YARN.

When I submit the plan file, the first job runs as planned. After it
completes, the second job is submitted by the YARN client:


02/09/2017 16:39:43 DataSink (CsvSink)(4/5) switched to FINISHED
02/09/2017 16:39:43 Job execution switched to status FINISHED.
2017-02-09 16:40:26,470 INFO  org.apache.flink.yarn.YarnClusterClient
- Waiting until all TaskManagers have connected
Waiting until all TaskManagers have connected
2017-02-09 16:40:26,476 INFO  org.apache.flink.yarn.YarnClusterClient
- TaskManager status (5/5)
TaskManager status (5/5)
2017-02-09 16:40:26,476 INFO  org.apache.flink.yarn.YarnClusterClient
- All TaskManagers are connected
All TaskManagers are connected
2017-02-09 16:40:26,480 INFO  org.apache.flink.yarn.YarnClusterClient
- Submitting job with JobID:
b226f5f18a78bc386bd1b1b6d30515ea. Waiting for job completion.
Submitting job with JobID: b226f5f18a78bc386bd1b1b6d30515ea. Waiting for
job completion.
Connected to JobManager at Actor[akka.tcp://flink@
.ec2.internal:35598/user/jobmanager#68430682]

If the input file is small and the first job runs quickly (~1 minute works
for me), then the second job runs fine. However, if the input file for my
first job is large and the first job takes more than a minute or so to
complete, Flink will not acknowledge receiving the next job; the web Flink
console does not show any new jobs and Flink logs do not mention receiving
any new jobs after the first job has completed. The YARN client's job
submission times out after Flink does not respond:

Caused by:
org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException:
Job submission to the JobManager timed out. You may increase
'akka.client.timeout' in case the JobManager needs more time to configure
and confirm the job submission.
at
org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
at
org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:239)
at
org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
at
org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
at
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)

I have tried increasing akka.client.timeout to large values such as 1200s
(20 minut

Re: "Job submission to the JobManager timed out" on EMR YARN cluster with multiple jobs

2017-02-17 Thread Geoffrey Mon
Hi Gordon,

I was using a Flink session that lasted as long as the plan jar was still
running (which I believe would be a "per job yarn cluster"), by submitting
a command to EMR that looked like:
flink run -m yarn-cluster -yn 5 [jar] [jar arguments]

Cheers,
Geoffrey

On Fri, Feb 17, 2017 at 12:09 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi Geoffrey,
>
> Thanks for investigating and updating on this. Good to know that it is
> working!
>
> Just to clarify, was your series of jobs submitted to a “yarn session +
> regular bin/flink run”, or “per job yarn cluster”?
> I’m asking just to make sure of the limitations Robert mentioned.
>
> Cheers,
> Gordon
>
>
> On February 17, 2017 at 3:37:27 AM, Geoffrey Mon (geof...@gmail.com)
> wrote:
>
> Hi Robert,
>
> Thanks for your reply. I've done some further testing and (hopefully)
> solved the issue; this turned out to be a red herring.  After discovering
> that the same issue manifested itself when testing on my local machine, I
> found that multiple jobs can be submitted from a main() function for both
> temporary and permanent Flink YARN clusters, and that the issue was not
> with Flink or with YARN, but with my job file.
>
> In one part of my job, I need to fill in missing components of a vector
> with zeroes. I did this by combining the vector DataSet with another
> DataSet containing indexed zeroes using a union operation and an
> aggregation operation. In my problematic job, I used
> ExecutionEnvironment#fromElements to make a DataSet out of an ArrayList of
> Tuples containing an index and a zero. However, for input files with very
> large parameters, I needed to generate very large length DataSets of
> zeroes, and since I was using fromElements, the client needed to send the
> Flink runtime all of the elements with which to create the DataSet (lots
> and lots of zeroes). This caused the job to time out before execution,
> making me think that the job had not been properly received by the runtime.
>
> I've replaced this with ExecutionEnvironment#generateSequence and a map
> function mapping each number of the generated sequence to a tuple with a
> zero. This has solved the issue and my job seems to be running fine for now.
> (
> https://github.com/quinngroup/flink-r1dl/blame/9bb651597405fcea1fc4c7ba3e4229ca45305176/src/main/java/com/github/quinngroup/R1DL.java#L370
> )
>
> Again, thank you very much for your help.
>
> Sincerely,
> Geoffrey
>
> On Wed, Feb 15, 2017 at 7:43 AM Robert Metzger 
> wrote:
>
> Hi Geoffrey,
>
> I think the "per job yarn cluster" feature does probably not work for one
> main() function submitting multiple jobs.
> If you have a yarn session + regular "flink run" it should work.
>
> On Mon, Feb 13, 2017 at 5:37 PM, Geoffrey Mon  wrote:
>
> Just to clarify, is Flink designed to allow submitting multiple jobs from
> a single program class when using a YARN cluster? I wasn't sure based on
> the documentation.
>
> Cheers,
> Geoffrey
>
>
> On Thu, Feb 9, 2017 at 6:34 PM Geoffrey Mon  wrote:
>
> Hello all,
>
> I'm running a Flink plan made up of multiple jobs. The source for my job
> can be found here if it would help in any way:
> https://github.com/quinngroup/flink-r1dl/blob/master/src/main/java/com/github/quinngroup/R1DL.java
> Each of the jobs (except for the first job) depends on files generated by
> the previous job; I'm running it on an AWS EMR cluster using YARN.
>
> When I submit the plan file, the first job runs as planned. After it
> completes, the second job is submitted by the YARN client:
>
> 
> 02/09/2017 16:39:43 DataSink (CsvSink)(4/5) switched to FINISHED
> 02/09/2017 16:39:43 Job execution switched to status FINISHED.
> 2017-02-09 16:40:26,470 INFO  org.apache.flink.yarn.YarnClusterClient
>   - Waiting until all TaskManagers have connected
> Waiting until all TaskManagers have connected
> 2017-02-09 16:40:26,476 INFO  org.apache.flink.yarn.YarnClusterClient
>   - TaskManager status (5/5)
> TaskManager status (5/5)
> 2017-02-09 16:40:26,476 INFO  org.apache.flink.yarn.YarnClusterClient
>   - All TaskManagers are connected
> All TaskManagers are connected
> 2017-02-09 16:40:26,480 INFO  org.apache.flink.yarn.YarnClusterClient
>   - Submitting job with JobID:
> b226f5f18a78bc386bd1b1b6d30515ea. Waiting for job completion.
> Submitting job with JobID: b226f5f18a78bc386bd1b1b6d30515ea. Waiting for
> job completion.
> Connected to JobManager at Actor[akka.tcp://flink@
> .ec2.internal:35598/user/jobmanager#68430682]
>
> If the input file is small and the first job runs quickly (~1 minute works
> for me), then the sec