The Python API is currently in an alpha state, so we would have to check whether the issue is specific to it. Looping in Chesnay, who worked on that.
The JVM GC error happens on the client side, as that is where the optimizer runs. How much memory does the client submitting the job have? How do you compose the job? Do you have nested loops, e.g. for() { ... bulk iteration Flink program }? (A sketch of that pattern is included after the quoted message below.)

– Ufuk

On 14 November 2016 at 08:02:26, Geoffrey Mon (geof...@gmail.com) wrote:

> Hello all,
>
> I have a pretty complicated plan file using the Flink Python API running on
> an AWS EMR cluster of m3.xlarge instances using YARN. The plan is for a
> dictionary learning algorithm and has to run a sequence of operations many
> times; each sequence involves bulk iterations with join operations and
> other more intensive operations, and depends on the results of the previous
> sequence. I have found that when the number of times to run this sequence
> of operations is high (e.g. 20), I get this exception:
>
> Uncaught error from thread
> [flink-akka.remote.default-remote-dispatcher-7] shutting down JVM
> since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[flink]
> java.lang.StackOverflowError
> at java.io.ObjectOutputStream$ReplaceTable.lookup(ObjectOutputStream.java:2399)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1113)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> ..............................
>
> I assume this problem is caused by having to send too many serialized
> operations between Java and Python. When using a Java implementation of the
> same operations, I also get:
>
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:106)
> at org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:99)
> at org.apache.flink.optimizer.plan.DualInputPlanNode.<init>(DualInputPlanNode.java:90)
> at org.apache.flink.optimizer.plan.DualInputPlanNode.<init>(DualInputPlanNode.java:69)
> at org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties.instantiate(HashJoinBuildSecondProperties.java:81)
> at org.apache.flink.optimizer.dag.TwoInputNode.instantiate(TwoInputNode.java:607)
> ............
>
> The problem seems to be caused by YARN's handling of memory, because I have
> gotten the same Python implementation to work on a smaller, local virtual
> cluster that is not using YARN, even though my local cluster has far fewer
> computing resources than the 15 GB RAM m3.xlarge AWS instances that EMR is
> using. After the YARN job has failed, sometimes a Python process is left on
> the cluster using up most of the RAM.
>
> How can I solve this issue? I am unsure of how to reduce the number of
> operations while keeping the same functionality.
>
> Thanks,
> Geoffrey
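For concreteness, here is a minimal Java sketch of the composition pattern asked about above: a driver for-loop that wraps the previous result in yet another bulk iteration (with a join inside), so that all passes accumulate in one ever-deeper plan. The class name, data set, key fields, and iteration counts are invented purely for illustration; only the shape of the program matters.

// Hypothetical sketch: a driver loop that nests bulk iterations in a single plan.
// Data, keys, and counts are made up; the point is the growing plan graph.
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;

public class NestedLoopPlanSketch {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<Long, Double>> current =
        env.fromElements(new Tuple2<>(0L, 1.0), new Tuple2<>(1L, 2.0));

    // Outer driver loop: each pass appends another bulk iteration (with a
    // join) to the same plan instead of starting from a fresh program.
    for (int pass = 0; pass < 20; pass++) {
      IterativeDataSet<Tuple2<Long, Double>> loop = current.iterate(50);

      DataSet<Tuple2<Long, Double>> step = loop
          .join(current).where(0).equalTo(0)
          .with(new JoinFunction<Tuple2<Long, Double>, Tuple2<Long, Double>,
                                 Tuple2<Long, Double>>() {
            @Override
            public Tuple2<Long, Double> join(Tuple2<Long, Double> a,
                                             Tuple2<Long, Double> b) {
              return new Tuple2<>(a.f0, a.f1 + b.f1);
            }
          });

      current = loop.closeWith(step);
    }

    // Triggers plan translation and optimization on the client; with a deep
    // plan, this is where stack or heap problems would surface.
    current.print();
  }
}

Each outer pass adds more operator nodes to the one plan that is serialized and optimized on the client at execution time, which would be consistent with the optimizer frames (DualInputPlanNode, TwoInputNode.instantiate) in the GC-overhead trace above. That is why the question of how the job is composed, and how much memory the submitting client has, matters here.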