Hello all,

I have a pretty complicated plan file using the Flink Python API, running on
an AWS EMR cluster of m3.xlarge instances using YARN. The plan is for a
dictionary learning algorithm and has to run a sequence of operations many
times; each sequence involves bulk iterations with join operations and
other more intensive operations, and depends on the results of the previous
sequence (a stripped-down sketch of the structure follows the stack trace
below). I have found that when this sequence has to run many times (e.g. 20),
I get this exception:

Uncaught error from thread [flink-akka.remote.default-remote-dispatcher-7] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[flink]
java.lang.StackOverflowError
        at java.io.ObjectOutputStream$ReplaceTable.lookup(ObjectOutputStream.java:2399)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1113)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
..........<snip similar traces>....................
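
To give a sense of the plan's structure: each pass of my outer loop extends
the same plan, rather than submitting a separate job per pass. A stripped-down
sketch is below; the operator and the method names are only illustrative (they
roughly follow the Python DataSet API as I understand it), not my actual code:

from flink.plan.Environment import get_environment
from flink.functions.JoinFunction import JoinFunction

class UpdateDictionary(JoinFunction):
    # stand-in for my real dictionary-update logic
    def join(self, dict_entry, sample):
        return (dict_entry[0], dict_entry[1] + sample[1])

env = get_environment()
dictionary = env.from_elements((0, 1.0), (1, 2.0))  # toy stand-in for the initial dictionary
samples = env.from_elements((0, 5.0), (1, 6.0))     # toy stand-in for the input data

# Every pass of the outer loop appends another bulk iteration (with joins
# inside) onto the same plan, so the plan graph grows with each pass.
for _ in range(20):                      # the "number of times" mentioned above
    iteration = dictionary.iterate(10)   # bulk iteration with some fixed inner step count
    updated = iteration.join(samples).where(0).equal_to(0).using(UpdateDictionary())
    dictionary = iteration.close_with(updated)

dictionary.write_csv("/tmp/dictionary-out")
env.execute()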

I assumed this problem was caused by having to send too many serialized
operations between Java and Python, but when using a Java implementation of
the same operations I also get:

java.lang.OutOfMemoryError: GC overhead limit exceeded
        at org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:106)
        at org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:99)
        at org.apache.flink.optimizer.plan.DualInputPlanNode.<init>(DualInputPlanNode.java:90)
        at org.apache.flink.optimizer.plan.DualInputPlanNode.<init>(DualInputPlanNode.java:69)
        at org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties.instantiate(HashJoinBuildSecondProperties.java:81)
        at org.apache.flink.optimizer.dag.TwoInputNode.instantiate(TwoInputNode.java:607)
......<snip>......

The problem seems to be caused by YARN's handling of memory, because I have
gotten the same Python implementation to work on a smaller, local virtual
cluster that is not using YARN, even though my local cluster has far fewer
computing resources than the 15 GB RAM m3.xlarge AWS instances that EMR is
using. After the YARN job has failed, sometimes a Python process is left on
the cluster using up most of the RAM.

How can I solve this issue? I am unsure of how to reduce the number of
operations while keeping the same functionality.
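
The only alternative that occurs to me is to break the job up so that each
sequence becomes its own, much smaller plan, persisting the intermediate
dictionary between executions, roughly like the sketch below (reusing the
illustrative names from the sketch above; the intermediate_path helper is
likewise hypothetical). I do not know whether that is a reasonable way to use
Flink or how much the extra I/O would cost:

def intermediate_path(i):
    # hypothetical helper: where round i's dictionary gets persisted
    return "hdfs:///tmp/dictionary-round-%d" % i

SAMPLES_PATH = "hdfs:///data/samples.csv"   # illustrative input location

for i in range(20):
    env = get_environment()
    # read back the previous round's result (type arguments omitted for brevity)
    dictionary = env.read_csv(intermediate_path(i))
    samples = env.read_csv(SAMPLES_PATH)
    iteration = dictionary.iterate(10)
    updated = iteration.join(samples).where(0).equal_to(0).using(UpdateDictionary())
    iteration.close_with(updated).write_csv(intermediate_path(i + 1))
    env.execute()   # each round is now an independent, much smaller job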

Thanks,
Geoffrey
