Hello all, I have a fairly complicated plan file written with the Flink Python API, running on an AWS EMR cluster of m3.xlarge instances under YARN. The plan implements a dictionary learning algorithm and has to run a sequence of operations many times. Each sequence involves bulk iterations with join operations and other, more intensive operations, and depends on the results of the previous sequence. When the number of times this sequence is run is high (e.g. 20), I get this exception:
Uncaught error from thread [flink-akka.remote.default-remote-dispatcher-7] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[flink]
java.lang.StackOverflowError
    at java.io.ObjectOutputStream$ReplaceTable.lookup(ObjectOutputStream.java:2399)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1113)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    ..........<snip similar traces>....................

I assume this problem is caused by having to send too many serialized operations between Java and Python. When using a Java implementation of the same operations, I also get:

java.lang.OutOfMemoryError: GC overhead limit exceeded
    at org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:106)
    at org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:99)
    at org.apache.flink.optimizer.plan.DualInputPlanNode.<init>(DualInputPlanNode.java:90)
    at org.apache.flink.optimizer.plan.DualInputPlanNode.<init>(DualInputPlanNode.java:69)
    at org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties.instantiate(HashJoinBuildSecondProperties.java:81)
    at org.apache.flink.optimizer.dag.TwoInputNode.instantiate(TwoInputNode.java:607)
    ......<snip>......

The problem seems to be caused by YARN's handling of memory, because I have gotten the same Python implementation to work on a smaller, local virtual cluster that does not use YARN, even though that local cluster has far fewer computing resources than the m3.xlarge AWS instances (15 GB RAM each) that EMR is using. After the YARN job has failed, a Python process is sometimes left on the cluster, using up most of the RAM.
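To make the shape of the plan concrete, here is a minimal pure-Python sketch. It is not my actual Flink code and does not use the Flink API: `Op`, `build_plan`, `depth`, and the choice of three operations per sequence are all hypothetical, made up for illustration. It only models the fact that each sequence is chained onto the result of the previous one, so the dependency chain in the plan gets longer with every repetition.

```python
# Toy model of the plan's shape. This is NOT Flink code; every name here
# (Op, build_plan, depth) is hypothetical and exists only for illustration.
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class Op:
    """One operation in the plan, pointing at the operations it consumes."""
    name: str
    inputs: Tuple["Op", ...] = ()


def build_plan(num_sequences: int, ops_per_sequence: int = 3) -> Op:
    """Chain num_sequences sequences; each one consumes the previous result."""
    current = Op("source")
    for s in range(num_sequences):
        for k in range(ops_per_sequence):
            current = Op(f"seq{s}_op{k}", (current,))
    return current


def depth(op: Op) -> int:
    """Length of the longest dependency chain ending at op.

    Written with an explicit stack instead of recursion, so measuring a
    very deep plan does not itself overflow the call stack.
    """
    memo = {}
    stack = [op]
    while stack:
        node = stack[-1]
        pending = [i for i in node.inputs if id(i) not in memo]
        if pending:
            stack.extend(pending)
        else:
            memo[id(node)] = 1 + max((memo[id(i)] for i in node.inputs), default=0)
            stack.pop()
    return memo[id(op)]


if __name__ == "__main__":
    for n in (1, 5, 20):
        print(n, "sequences -> chain length", depth(build_plan(n)))
```

In this toy model the chain length is 1 + 3 * (number of sequences), so 20 sequences give a chain of 61 operations. My real sequences contain bulk iterations and joins rather than three simple steps, so the actual plan is considerably larger than this sketch suggests.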
How can I solve this issue? I am not sure how to reduce the number of operations while keeping the same functionality.

Thanks,
Geoffrey