Reynold Xin updated SPARK-4672:
-------------------------------
    Comment: was deleted

(was: User 'JerryLead' has created a pull request for this issue: https://github.com/apache/spark/pull/3544)

Cut off the super long serialization chain in GraphX to avoid the StackOverflow error
--------------------------------------------------------------------------------------

Key: SPARK-4672
URL: https://issues.apache.org/jira/browse/SPARK-4672
Project: Spark
Issue Type: Bug
Components: GraphX, Spark Core
Affects Versions: 1.1.0
Reporter: Lijie Xu
Priority: Critical
Fix For: 1.2.0

While running iterative algorithms in GraphX, a StackOverflowError reliably occurs in the serialization phase at around the 300th iteration. In general, these kinds of algorithms have two things in common:
# They have a long computing chain.
{code:borderStyle=solid}
(e.g., "degreeGraph => subGraph => degreeGraph => subGraph => ... =>")
{code}
# They iterate many times before converging. An example:
{code:borderStyle=solid}
// K-Core algorithm: repeatedly drop vertices whose degree is below kNum.
val kNum = 5
var degreeGraph = graph.outerJoinVertices(graph.degrees) {
  (vid, vd, degree) => degree.getOrElse(0)
}.cache()

var isConverged = false
do {
  val subGraph = degreeGraph.subgraph(
    vpred = (vid, degree) => degree >= kNum
  ).cache()
  val newDegreeGraph = subGraph.degrees
  degreeGraph = subGraph.outerJoinVertices(newDegreeGraph) {
    (vid, vd, degree) => degree.getOrElse(0)
  }.cache()
  isConverged = check(degreeGraph)
} while (!isConverged)
{code}
After about 300 iterations, a StackOverflowError occurs with the following stack trace:
{code:borderStyle=solid}
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError
java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1275)
java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1230)
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1426)
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
{code}
It is a very tricky bug that only shows up after enough iterations. Since it took us a long time to find its causes, we detail them in the following three sections.

h3. Phase 1: Try using checkpoint() to shorten the lineage

It is easy to suspect that the long lineage is the cause. For some RDDs, the lineage grows with each iteration; moreover, because of some "magical" references, the lineage length never decreases and eventually becomes very long. As a result, the call stack of the task's serialization()/deserialization() methods becomes very deep too, which finally exhausts the whole JVM stack.

Indeed, the lineage of some RDDs (e.g., EdgeRDD.partitionsRDD) grows by 3 OneToOne dependencies per iteration in the above example. Here, lineage length refers to the maximum length of the OneToOne dependency chain (e.g., from the final RDD back to the ShuffledRDD) in each stage.

To shorten the lineage, we perform a checkpoint() every N (e.g., 10) iterations, so that the lineage drops back down whenever it reaches a certain length (e.g., 33).
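A minimal sketch of this periodic checkpoint, as it plugs into the K-Core loop above; the interval value and the checkpoint directory are illustrative, not taken from our actual runs:
{code:borderStyle=solid}
// Sketch: checkpoint every N iterations to truncate the lineage.
// Assumes sc, degreeGraph, and check() from the K-Core example above.
sc.setCheckpointDir("/tmp/spark-checkpoints") // illustrative path

val checkpointInterval = 10 // illustrative N
var iteration = 0
var isConverged = false
do {
  // ... one K-Core iteration producing a new cached degreeGraph ...
  iteration += 1
  if (iteration % checkpointInterval == 0) {
    degreeGraph.vertices.checkpoint()
    degreeGraph.edges.checkpoint()
    // checkpoint() is lazy: an action must run afterwards so that the data
    // is actually saved and the lineage is cut at this point.
    degreeGraph.vertices.count()
    degreeGraph.edges.count()
  }
  isConverged = check(degreeGraph)
} while (!isConverged)
{code}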
However, the StackOverflowError still occurs after 300+ iterations!

h3. Phase 2: Abnormal f closure leads to an unbreakable serialization chain

After a long debugging session, we found that an abnormal _*f*_ function closure, together with a potential bug in GraphX (detailed in Phase 3), is "Suspect Zero". Together they build another serialization chain that bypasses the lineage cut made by checkpoint() (as shown in Figure 1). In other words, the serialization chain can be as long as the original lineage before checkpoint().

Figure 1 shows how the unbreakable serialization chain is generated. Yes, the OneToOneDependency can be cut off by checkpoint(). However, the serialization chain can still reach the previous RDDs through the (1)->(2) reference chain. As a result, the checkpoint() action is meaningless and the lineage stays as long as before.

!https://raw.githubusercontent.com/JerryLead/Misc/master/SparkPRFigures/g1.png|width=100%!

The (1)->(2) chain can be observed in the debug view (Figure 2):
{code:borderStyle=solid}
_rdd (i.e., A in Figure 1, checkpointed) -> f -> $outer (VertexRDD) -> partitionsRDD:MapPartitionsRDD -> RDDs in the previous iterations
{code}
!https://raw.githubusercontent.com/JerryLead/Misc/master/SparkPRFigures/g2.png|width=100%!

More detail: while an RDD is being serialized, its f function, e.g.,
{code:borderStyle=solid}
f: (Iterator[A], Iterator[B]) => Iterator[V] in ZippedPartitionsRDD2
{code}
is serialized too. This is very dangerous if the f closure has a member "$outer" that references its outer class (as shown in Figure 1). This reference is a second path, besides the OneToOneDependency, through which one RDD (e.g., PartitionsRDD) can reach other RDDs (e.g., VertexRDD). Note that checkpoint() only cuts off the direct lineage; the function reference is still kept, so serialization() can still reach the other RDDs along the f references.
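As a minimal, self-contained illustration of this $outer capture (all class names below are hypothetical, not from GraphX): a function literal defined inside a class that touches a member must capture the enclosing instance, so serializing just the function drags the whole object graph along with it.
{code:borderStyle=solid}
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical stand-ins for illustration only.
class Inner(val payload: Array[Byte]) extends Serializable

class Outer(val inner: Inner) extends Serializable {
  // Because this function literal reads the member `inner`, the compiled
  // closure keeps a reference to the enclosing Outer instance (the field
  // is literally named "$outer" in Scala 2.11-style closure classes).
  val f: Int => Int = x => x + inner.payload.length
}

object OuterDemo {
  def main(args: Array[String]): Unit = {
    val o = new Outer(new Inner(new Array[Byte](16)))
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(o.f) // serializes f, its $outer (Outer), and Inner
    out.close()
    println("f was serialized together with its $outer chain")
  }
}
{code}
In GraphX, the same mechanism lets the f stored in a ZippedPartitionsRDD2 reach back into the VertexRDD that defined it, and from there into the RDDs of the previous iterations.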
h3. Phase 3: Non-transient member variable of VertexRDD makes things worse

"Reference (1)" in Figure 1 is caused by the abnormal f closure, while "Reference (2)" is caused by a potential bug in GraphX: *partitionsRDD is a non-transient member variable of VertexRDD*.

With this _small_ bug, the f closure by itself (without any OneToOne dependency) can cause the StackOverflowError, as shown in the red box in Figure 3:
# While _vertices:VertexRDD_ is being serialized, its member _partitionsRDD_ is serialized too.
# Next, while serializing this _partitionsRDD_, serialization() simultaneously serializes the $outer referenced by its f. Here, it is another _partitionsRDD_.
# Finally, the chain
{code:borderStyle=solid}
"f => f$3 => f$3 => $outer => vertices: VertexRDD => partitionsRDD => … => ShuffledRDD"
{code}
takes shape. As a result, the serialization chain can be as long as the original lineage and finally triggers the StackOverflowError.

!https://raw.githubusercontent.com/JerryLead/Misc/master/SparkPRFigures/g3.png|width=100%!

h2. Conclusions

In conclusion, the root cause of the StackOverflowError is the long serialization chain, which cannot be cut off by _checkpoint()_. This long chain is caused by multiple factors:
# long lineage
# the $outer reference in the f closure
# a non-transient member variable

h2. How to fix this error

We propose the following three pull requests to solve this problem thoroughly; a condensed sketch of their combined shape follows the list.
# PR-3544
In this PR, we make "val partitionsRDD" transient in EdgeRDDImpl and VertexRDDImpl. As a result, while _vertices:VertexRDD_ is being serialized, its member _partitionsRDD_ is not serialized. In other words, "Reference (2)" in Figure 1 is cut off.
# PR-3545
In this PR, we set "f = null" when a ZippedPartitionsRDD is checkpointed. As a result, when partitionsRDD is checkpointed, its f closure is cleared and "Reference (1)" (i.e., f => $outer) in Figure 1 no longer exists.
# PR-3549
To cut off the long lineage, checkpoint() needs to be performed on partitionsRDD. However, the current checkpoint() is performed on VertexRDD and EdgeRDD themselves. We therefore override the checkpoint() methods in VertexRDDImpl and EdgeRDDImpl to perform checkpoint() on partitionsRDD.
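To make the three changes concrete, here is a condensed sketch of their shape. All class and member names below are simplified stand-ins for illustration, not the actual Spark sources:
{code:borderStyle=solid}
// Minimal stand-in for an RDD that can be checkpointed.
trait Checkpointable { def checkpoint(): Unit }

// PR-3544: @transient tells Java serialization to skip the field, so
// serializing the wrapper no longer walks into partitionsRDD
// (cutting "Reference (2)" in Figure 1).
class VertexRDDLike(@transient val partitionsRDD: Checkpointable)
    extends Serializable {

  // PR-3549: forward checkpoint() to the RDD whose lineage actually grows,
  // instead of checkpointing the thin wrapper itself.
  def checkpoint(): Unit = partitionsRDD.checkpoint()
}

// PR-3545: once a zipped RDD is checkpointed, its data is read back from
// stable storage and the combine function is never invoked again, so the
// closure can be released, cutting "Reference (1)" (f => $outer).
class ZippedLike[A, B, V](
    var f: (Iterator[A], Iterator[B]) => Iterator[V]) extends Serializable {
  def clearDependencies(): Unit = {
    f = null
  }
}
{code}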