Re: Many operations cause StackOverflowError with AWS EMR YARN cluster

2017-01-26 Thread Geoffrey Mon
Hello Chesnay,

Thanks for the advice. I've begun adding support for multiple jobs per Python
plan file here: https://issues.apache.org/jira/browse/FLINK-5183 and
https://github.com/GEOFBOT/flink/tree/FLINK-5183
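
For context, "multiple jobs per plan file" means a plan file along these
lines (a minimal sketch assuming the 1.x DataSet Python API;
from_elements/output/execute are used only for illustration):

    from flink.plan.Environment import get_environment

    # first job in the plan file
    env1 = get_environment()
    env1.from_elements(1, 2, 3).output()
    env1.execute(local=True)

    # second job in the same plan file
    env2 = get_environment()
    env2.from_elements(4, 5, 6).output()
    env2.execute(local=True)

The idea is that each execute() call is submitted as its own job, and the
plan file only finishes once the last environment has been executed.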

The functionality of the patch works: I am able to run multiple jobs per
file successfully, but the process doesn't exit once the jobs are done. The
main issue is a race condition. I added a check so that PythonPlanBinder
keeps looking for more jobs from the Python process until that process
exits, but the Python process usually exits only after Java has already
checked whether it is still running. Unless you pause the Java process with
a debugger until the Python process has exited (or Python happens to exit
quickly enough on its own), the Java side concludes that the Python process
is still alive and ends up waiting indefinitely for more Python jobs.
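
One possible way to avoid relying on process liveness would be an explicit
end-of-jobs handshake, where the Python side tells Java that no more plans
are coming before it exits. The following is only a rough sketch in plain
Python; the message ids, helper names, and port are made up and are not part
of the actual PythonPlanBinder protocol:

    import socket
    import struct

    MSG_NEW_PLAN = 0   # hypothetical id: another plan follows on this connection
    MSG_DONE = 1       # hypothetical id: no more plans, the Java side may shut down

    def send_message(sock, msg_id):
        # length-prefixed single-byte message, purely illustrative
        sock.sendall(struct.pack(">ib", 1, msg_id))

    def run_plans(plans, host="localhost", port=25000):
        with socket.create_connection((host, port)) as sock:
            for plan in plans:
                send_message(sock, MSG_NEW_PLAN)
                plan()                      # build and execute one environment
            send_message(sock, MSG_DONE)    # Java blocks on this instead of polling liveness

With something like this, the Java side would wait for the next message
rather than checking whether the Python process is alive, so the race would
go away.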

Another minor comment on my own patch: I used global variables to keep
track of the number of execution environments and to differentiate between
different environments. Is there a better way to do this?
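
For example, instead of module-level globals, the bookkeeping could live in
a small registry object owned by whatever creates the environments. This is
just a sketch; the class and method names are hypothetical and not part of
the current API:

    import itertools

    class EnvironmentRegistry:
        """Hands out ids for execution environments and tracks how many exist."""

        def __init__(self):
            self._ids = itertools.count()
            self._envs = {}

        def register(self, env):
            env_id = next(self._ids)
            self._envs[env_id] = env
            return env_id

        def count(self):
            return len(self._envs)

    # usage: a single registry instance, e.g. held by the factory that wraps
    # get_environment()
    # registry = EnvironmentRegistry()
    # env_id = registry.register(get_environment())

I'm not sure whether something like that fits how the plan binder expects to
find the environments, though.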

Thanks!

Cheers,
Geoffrey

On Wed, Nov 23, 2016 at 5:41 AM, Chesnay Schepler wrote:

Hello,

implementing collect() in python is not that trivial and the gain is
questionable. There is an inherent size limit (think 10mb), and it is
a bit at odds with the deployment model of the Python API.

Something easier would be to execute each iteration of the for-loop as a
separate job and save the result in a file.
Note that right now the Python API can't execute multiple jobs from the
same file; we would need some modifications
in the PythonPlanBinder to allow this.

Regards,
Chesnay


On 20.11.2016 23:54, Geoffrey Mon wrote:

Hello,

I know that the reuse of the data set in my plan is causing the problem
(after one dictionary atom is learned using the data set "S", "S" is
updated for use with the next dictionary atom). When I comment out the line
updating the data set "S", I have no problem and the plan processing phase
takes substantially less time.

I assume that this is because updating and reusing "S" makes the graph of
transformations much more complicated and forces the optimizer to do much
more work, since for example the final value of "S" depends on all previous
operations combined. Is there a way to replace the for loop in my plan so
that I don't cause this complication and so that memory usage is
manageable? I considered making "S" an iterative data set, but I need to
save each dictionary atom to a file, and I wouldn't be able to do that if
"S" was iterative and not finalized.

Perhaps I would be able to collect "S" at the end of each dictionary atom
and then make the new "S" directly from these values. This however would
require that "collect" be implemented in the Python API.

In addition, I don't think the problem is YARN-specific anymore because I
have been able to reproduce it on a local machine.

Cheers,
Geoffrey

On Mon, Nov 14, 2016 at 11:38 AM Geoffrey Mon  wrote:

Hi Ufuk,

The master instance of the cluster was also an m3.xlarge instance with 15 GB
RAM, which I would've expected to be enough. I have gotten the program to
run successfully on a personal virtual cluster where each node has 8 GB RAM
and where the master node was also a worker node, so the problem appears to
have something to do with YARN's memory behavior (such as on EMR).

Nevertheless, it would probably be a good idea to modify my code to reduce
its memory usage. When running my code on my local cluster, performance was
probably bottlenecked.

The job does use a for loop to run the core operations for a specific
number of times, specified as a command line parameter. If it helps, here
is my code:
Python: https://github.com/quinngroup/pyflink-r1dl/blob/master/R1DL_Flink.py
(L260
is the core for loop)
Java:
https://github.com/quinngroup/flink-r1dl/blob/master/src/main/java/com/github/quinngroup/R1DL.java
(L120
is the core for loop)
I would expect the join operations to be a big cause of the excessive
memory usage.

Thanks!

Geoffrey


On Mon, Nov 14, 2016 at 5:13 AM Ufuk Celebi  wrote:

The Python API is in alpha state currently, so we would have to check if it
is related specifically to that. Looping in Chesnay who worked on that.

The JVM GC error happens on the client side as that's where the optimizer
runs. How much memory does the client submitting the job have?

How do you compose the job? Do you have nested loops, e.g. for() { ... bulk
iteration Flink program }?

– Ufuk

On 14 November 2016 at 08:02:26, Geoffrey Mon (geof...@gmail.com) wrote:
> Hello all,
>
> I have a pretty complicated plan file using the Flink Python API running on
> an AWS EMR cluster of m3.xlarge instances using YARN. The plan is for a
> dictionary learning algorithm and has to run a sequence of operations many
> times; each sequence involves bulk 

Re: Many operations cause StackOverflowError with AWS EMR YARN cluster

2016-11-23 Thread Chesnay Schepler

Hello,

implementing collect() in python is not that trivial and the gain is
questionable. There is an inherent size limit (think 10mb), and it is
a bit at odds with the deployment model of the Python API.

Something easier would be to execute each iteration of the for-loop as a
separate job and save the result in a file.
Note that right now the Python API can't execute multiple jobs from the
same file; we would need some modifications
in the PythonPlanBinder to allow this.
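
Once multiple jobs per file are possible, the driver loop would look roughly
like this (a sketch against the DataSet Python API; read_text/write_text are
assumed to behave like their Java counterparts, and update_s / num_atoms are
placeholders for the actual dictionary-learning step):

    from flink.plan.Environment import get_environment

    num_atoms = 5              # placeholder for the real atom count

    def update_s(s):
        return s               # placeholder for the real update of "S"

    def run_step(in_path, out_path):
        env = get_environment()
        s = env.read_text(in_path)          # previous "S", written by the last job
        update_s(s).write_text(out_path)    # result of this iteration
        env.execute(local=True)             # one self-contained job per iteration

    for i in range(num_atoms):
        run_step("s_%d.txt" % i, "s_%d.txt" % (i + 1))

Each iteration then only carries the operators of that single step in its
plan, instead of the whole history.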

Regards,
Chesnay

On 20.11.2016 23:54, Geoffrey Mon wrote:

Hello,

I know that the reuse of the data set in my plan is causing the 
problem (after one dictionary atom is learned using the data set "S", 
"S" is updated for use with the next dictionary atom). When I comment 
out the line updating the data set "S", I have no problem and the plan 
processing phase takes substantially less time.


I assume that this is because updating and reusing "S" makes the graph 
of transformations much more complicated and forces the optimizer to 
do much more work, since for example the final value of "S" depends on 
all previous operations combined. Is there a way to replace the for 
loop in my plan so that I don't cause this complication and so that 
memory usage is manageable? I considered making "S" an iterative data 
set, but I need to save each dictionary atom to a file, and I wouldn't 
be able to do that if "S" was iterative and not finalized.


Perhaps I would be able to collect "S" at the end of each dictionary 
atom and then make the new "S" directly from these values. This 
however would require that "collect" be implemented in the Python API.


In addition, I don't think the problem is YARN-specific anymore 
because I have been able to reproduce it on a local machine.


Cheers,
Geoffrey

On Mon, Nov 14, 2016 at 11:38 AM Geoffrey Mon wrote:


Hi Ufuk,

The master instance of the cluster was also an m3.xlarge instance
with 15 GB RAM, which I would've expected to be enough. I have
gotten the program to run successfully on a personal virtual
cluster where each node has 8 GB RAM and where the master node was
also a worker node, so the problem appears to have something to do
with YARN's memory behavior (such as on EMR).

Nevertheless, it would probably be a good idea to modify my code
to reduce its memory usage. When running my code on my local
cluster, performance was probably bottlenecked.

The job does use a for loop to run the core operations for a
specific number of times, specified as a command line parameter.
If it helps, here is my code:
Python:
https://github.com/quinngroup/pyflink-r1dl/blob/master/R1DL_Flink.py (L260
is the core for loop)
Java:

https://github.com/quinngroup/flink-r1dl/blob/master/src/main/java/com/github/quinngroup/R1DL.java
 (L120
is the core for loop)
I would expect the join operations to be a big cause of the
excessive memory usage.

Thanks!

Geoffrey


On Mon, Nov 14, 2016 at 5:13 AM Ufuk Celebi wrote:

The Python API is in alpha state currently, so we would have
to check if it is related specifically to that. Looping in
Chesnay who worked on that.

The JVM GC error happens on the client side as that's where
the optimizer runs. How much memory does the client submitting
the job have?

How do you compose the job? Do you have nested loops, e.g.
for() { ... bulk iteration Flink program }?

– Ufuk

On 14 November 2016 at 08:02:26, Geoffrey Mon (geof...@gmail.com) wrote:
> Hello all,
>
> I have a pretty complicated plan file using the Flink Python
API running on
> an AWS EMR cluster of m3.xlarge instances using YARN. The
plan is for a
> dictionary learning algorithm and has to run a sequence of
operations many
> times; each sequence involves bulk iterations with join
operations and
> other more intensive operations, and depends on the results
of the previous
> sequence. I have found that when the number of times to run
this sequence
> of operations is high (e.g. 20) I get this exception:
>
> Uncaught error from thread
> [flink-akka.remote.default-remote-dispatcher-7] shutting
down JVM
> since 'akka.jvm-exit-on-fatal-error' is enabled for
ActorSystem[flink]
> java.lang.StackOverflowError
> at

java.io.ObjectOutputStream$ReplaceTable.lookup(ObjectOutputStream.java:2399)
> at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1113)
> at

java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at

Re: Many operations cause StackOverflowError with AWS EMR YARN cluster

2016-11-20 Thread Geoffrey Mon
Hello,

I know that the reuse of the data set in my plan is causing the problem
(after one dictionary atom is learned using the data set "S", "S" is
updated for use with the next dictionary atom). When I comment out the line
updating the data set "S", I have no problem and the plan processing phase
takes substantially less time.

I assume that this is because updating and reusing "S" makes the graph of
transformations much more complicated and forces the optimizer to do much
more work, since for example the final value of "S" depends on all previous
operations combined. Is there a way to replace the for loop in my plan so
that I don't cause this complication and so that memory usage is
manageable? I considered making "S" an iterative data set, but I need to
save each dictionary atom to a file, and I wouldn't be able to do that if
"S" was iterative and not finalized.

Perhaps I would be able to collect "S" at the end of each dictionary atom
and then make the new "S" directly from these values. This however would
require that "collect" be implemented in the Python API.

In addition, I don't think the problem is YARN-specific anymore because I
have been able to reproduce it on a local machine.

Cheers,
Geoffrey

On Mon, Nov 14, 2016 at 11:38 AM Geoffrey Mon  wrote:

> Hi Ufuk,
>
> The master instance of the cluster was also an m3.xlarge instance with 15
> GB RAM, which I would've expected to be enough. I have gotten the program
> to run successfully on a personal virtual cluster where each node has 8 GB
> RAM and where the master node was also a worker node, so the problem
> appears to have something to do with YARN's memory behavior (such as on
> EMR).
>
> Nevertheless, it would probably be a good idea to modify my code to reduce
> its memory usage. When running my code on my local cluster, performance was
> probably bottlenecked.
>
> The job does use a for loop to run the core operations for a specific
> number of times, specified as a command line parameter. If it helps, here
> is my code:
> Python:
> https://github.com/quinngroup/pyflink-r1dl/blob/master/R1DL_Flink.py (L260
> is the core for loop)
> Java:
> https://github.com/quinngroup/flink-r1dl/blob/master/src/main/java/com/github/quinngroup/R1DL.java
>  (L120
> is the core for loop)
> I would expect the join operations to be a big cause of the excessive
> memory usage.
>
> Thanks!
>
> Geoffrey
>
>
> On Mon, Nov 14, 2016 at 5:13 AM Ufuk Celebi  wrote:
>
> The Python API is in alpha state currently, so we would have to check if
> it is related specifically to that. Looping in Chesnay who worked on that.
>
> The JVM GC error happens on the client side as that's where the optimizer
> runs. How much memory does the client submitting the job have?
>
> How do you compose the job? Do you have nested loops, e.g. for() { ...
> bulk iteration Flink program }?
>
> – Ufuk
>
> On 14 November 2016 at 08:02:26, Geoffrey Mon (geof...@gmail.com) wrote:
> > Hello all,
> >
> > I have a pretty complicated plan file using the Flink Python API running
> on
> > an AWS EMR cluster of m3.xlarge instances using YARN. The plan is for a
> > dictionary learning algorithm and has to run a sequence of operations
> many
> > times; each sequence involves bulk iterations with join operations and
> > other more intensive operations, and depends on the results of the
> previous
> > sequence. I have found that when the number of times to run this sequence
> > of operations is high (e.g. 20) I get this exception:
> >
> > Uncaught error from thread
> > [flink-akka.remote.default-remote-dispatcher-7] shutting down JVM
> > since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[flink]
> > java.lang.StackOverflowError
> > at
> java.io.ObjectOutputStream$ReplaceTable.lookup(ObjectOutputStream.java:2399)
> > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1113)
> > at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> > at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> > at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> > at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> > ..
> >
> > I assume this problem is caused by having to send too many serialized
> > operations between Java and Python. When using a Java implementation of
> the
> > same operations, I also get:
> >
> > java.lang.OutOfMemoryError: GC overhead limit exceeded
> > at
> org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:106)
> > at
> org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:99)
> > at
> org.apache.flink.optimizer.plan.DualInputPlanNode.<init>(DualInputPlanNode.java:90)
> > at
> org.apache.flink.optimizer.plan.DualInputPlanNode.<init>(DualInputPlanNode.java:69)
> > at
> 

Re: Many operations cause StackOverflowError with AWS EMR YARN cluster

2016-11-14 Thread Geoffrey Mon
Hi Ufuk,

The master instance of the cluster was also an m3.xlarge instance with 15 GB
RAM, which I would've expected to be enough. I have gotten the program to
run successfully on a personal virtual cluster where each node has 8 GB RAM
and where the master node was also a worker node, so the problem appears to
have something to do with YARN's memory behavior (such as on EMR).

Nevertheless, it would probably be a good idea to modify my code to reduce
its memory usage. When running my code on my local cluster, performance was
probably bottlenecked.

The job does use a for loop to run the core operations for a specific
number of times, specified as a command line parameter. If it helps, here
is my code:
Python: https://github.com/quinngroup/pyflink-r1dl/blob/master/R1DL_Flink.py
(L260
is the core for loop)
Java:
https://github.com/quinngroup/flink-r1dl/blob/master/src/main/java/com/github/quinngroup/R1DL.java
(L120
is the core for loop)
I would expect the join operations to be a big cause of the excessive
memory usage.

Thanks!

Geoffrey

On Mon, Nov 14, 2016 at 5:13 AM Ufuk Celebi  wrote:

> The Python API is in alpha state currently, so we would have to check if
> it is related specifically to that. Looping in Chesnay who worked on that.
>
> The JVM GC error happens on the client side as that's where the optimizer
> runs. How much memory does the client submitting the job have?
>
> How do you compose the job? Do you have nested loops, e.g. for() { ...
> bulk iteration Flink program }?
>
> – Ufuk
>
> On 14 November 2016 at 08:02:26, Geoffrey Mon (geof...@gmail.com) wrote:
> > Hello all,
> >
> > I have a pretty complicated plan file using the Flink Python API running
> on
> > an AWS EMR cluster of m3.xlarge instances using YARN. The plan is for a
> > dictionary learning algorithm and has to run a sequence of operations
> many
> > times; each sequence involves bulk iterations with join operations and
> > other more intensive operations, and depends on the results of the
> previous
> > sequence. I have found that when the number of times to run this sequence
> > of operations is high (e.g. 20) I get this exception:
> >
> > Uncaught error from thread
> > [flink-akka.remote.default-remote-dispatcher-7] shutting down JVM
> > since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[flink]
> > java.lang.StackOverflowError
> > at
> java.io.ObjectOutputStream$ReplaceTable.lookup(ObjectOutputStream.java:2399)
> > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1113)
> > at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> > at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> > at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> > at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> > ..
> >
> > I assume this problem is caused by having to send too many serialized
> > operations between Java and Python. When using a Java implementation of
> the
> > same operations, I also get:
> >
> > java.lang.OutOfMemoryError: GC overhead limit exceeded
> > at
> org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:106)
> > at
> org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:99)
> > at
> org.apache.flink.optimizer.plan.DualInputPlanNode.<init>(DualInputPlanNode.java:90)
> > at
> org.apache.flink.optimizer.plan.DualInputPlanNode.<init>(DualInputPlanNode.java:69)
> > at
> org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties.instantiate(HashJoinBuildSecondProperties.java:81)
> > at
> org.apache.flink.optimizer.dag.TwoInputNode.instantiate(TwoInputNode.java:607)
> > 
> >
> > The problem seems to be caused by YARN's handling of memory, because I have
> > gotten the same Python implementation to work on a smaller, local virtual
> > cluster that is not using YARN, even though my local cluster has far
> fewer
> > computing resources than the 15 GB RAM m3.xlarge AWS instances that EMR
> is
> > using. After the YARN job has failed, sometimes a python process is left
> on
> > the cluster using up most of the RAM.
> >
> > How can I solve this issue? I am unsure of how to reduce the number of
> > operations while keeping the same functionality.
> >
> > Thanks,
> > Geoffrey
> >
>
>


Re: Many operations cause StackOverflowError with AWS EMR YARN cluster

2016-11-14 Thread Ufuk Celebi
The Python API is in alpha state currently, so we would have to check if it is 
related specifically to that. Looping in Chesnay who worked on that.

The JVM GC error happens on the client side as that's where the optimizer runs. 
How much memory does the client submitting the job have?

How do you compose the job? Do you have nested loops, e.g. for() { ... bulk 
iteration Flink program }?
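
For illustration, the shape I mean is roughly the following (method names
and exact signatures here are from memory of the DataSet-era Python API, so
treat them as assumptions rather than working code):

    from flink.plan.Environment import get_environment
    from flink.functions.MapFunction import MapFunction

    class Halve(MapFunction):
        def map(self, value):
            return value * 0.5

    env = get_environment()
    data = env.from_elements(1.0, 2.0, 3.0)

    for outer in range(3):                  # driver-side for() loop
        iteration = data.iterate(10)        # Flink bulk iteration nested inside it
        step = iteration.map(Halve())
        data = iteration.close_with(step)

    data.output()
    env.execute(local=True)

Every pass of the outer loop adds another bulk iteration to the same plan,
which can make the plan very large.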

– Ufuk

On 14 November 2016 at 08:02:26, Geoffrey Mon (geof...@gmail.com) wrote:
> Hello all,
>  
> I have a pretty complicated plan file using the Flink Python API running on
> an AWS EMR cluster of m3.xlarge instances using YARN. The plan is for a
> dictionary learning algorithm and has to run a sequence of operations many
> times; each sequence involves bulk iterations with join operations and
> other more intensive operations, and depends on the results of the previous
> sequence. I have found that when the number of times to run this sequence
> of operations is high (e.g. 20) I get this exception:
>  
> Uncaught error from thread
> [flink-akka.remote.default-remote-dispatcher-7] shutting down JVM
> since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[flink]
> java.lang.StackOverflowError
> at java.io.ObjectOutputStream$ReplaceTable.lookup(ObjectOutputStream.java:2399)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1113)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> ..
>  
> I assume this problem is caused by having to send too many serialized
> operations between Java and Python. When using a Java implementation of the
> same operations, I also get:
>  
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:106)
> at org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:99)
> at org.apache.flink.optimizer.plan.DualInputPlanNode.<init>(DualInputPlanNode.java:90)
> at org.apache.flink.optimizer.plan.DualInputPlanNode.<init>(DualInputPlanNode.java:69)
> at org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties.instantiate(HashJoinBuildSecondProperties.java:81)
> at org.apache.flink.optimizer.dag.TwoInputNode.instantiate(TwoInputNode.java:607)
> 
>  
> The problem seems to be caused by YARN's handling of memory, because I have
> gotten the same Python implementation to work on a smaller, local virtual
> cluster that is not using YARN, even though my local cluster has far fewer
> computing resources than the 15 GB RAM m3.xlarge AWS instances that EMR is
> using. After the YARN job has failed, sometimes a python process is left on
> the cluster using up most of the RAM.
>  
> How can I solve this issue? I am unsure of how to reduce the number of
> operations while keeping the same functionality.
>  
> Thanks,
> Geoffrey
>  



Many operations cause StackOverflowError with AWS EMR YARN cluster

2016-11-13 Thread Geoffrey Mon
Hello all,

I have a pretty complicated plan file using the Flink Python API running on
an AWS EMR cluster of m3.xlarge instances using YARN. The plan is for a
dictionary learning algorithm and has to run a sequence of operations many
times; each sequence involves bulk iterations with join operations and
other more intensive operations, and depends on the results of the previous
sequence. I have found that when the number of times to run this sequence
of operations is high (e.g. 20) I get this exception:

Uncaught error from thread
[flink-akka.remote.default-remote-dispatcher-7] shutting down JVM
since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[flink]
java.lang.StackOverflowError
at java.io.ObjectOutputStream$ReplaceTable.lookup(ObjectOutputStream.java:2399)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1113)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
..

I assume this problem is caused by having to send too many serialized
operations between Java and Python. When using a Java implementation of the
same operations, I also get:

java.lang.OutOfMemoryError: GC overhead limit exceeded
at org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:106)
at org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:99)
at org.apache.flink.optimizer.plan.DualInputPlanNode.<init>(DualInputPlanNode.java:90)
at org.apache.flink.optimizer.plan.DualInputPlanNode.<init>(DualInputPlanNode.java:69)
at org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties.instantiate(HashJoinBuildSecondProperties.java:81)
at org.apache.flink.optimizer.dag.TwoInputNode.instantiate(TwoInputNode.java:607)


The problem seems to be caused by YARN's handling of memory, because I have
gotten the same Python implementation to work on a smaller, local virtual
cluster that is not using YARN, even though my local cluster has far fewer
computing resources than the 15 GB RAM m3.xlarge AWS instances that EMR is
using. After the YARN job has failed, sometimes a python process is left on
the cluster using up most of the RAM.

How can I solve this issue? I am unsure of how to reduce the number of
operations while keeping the same functionality.

Thanks,
Geoffrey