[ https://issues.apache.org/jira/browse/SPARK-29878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16973637#comment-16973637 ]
Aman Omer commented on SPARK-29878:
-----------------------------------

[https://github.com/apache/spark/pull/7469] This PR introduced the cache() calls on newEdges and vertices.

Improper cache strategies in GraphX
-----------------------------------

                 Key: SPARK-29878
                 URL: https://issues.apache.org/jira/browse/SPARK-29878
             Project: Spark
          Issue Type: Improvement
          Components: GraphX
    Affects Versions: 3.0.0
            Reporter: Dong Wang
            Priority: Major

I have run examples.graphx.SSSPExample and looked through the RDD dependency graphs as well as the persist operations. There are some improper cache strategies in GraphX. The same situations also occur when I run ConnectedComponentsExample.

1. vertices.cache() and newEdges.cache() are unnecessary

In SSSPExample, a graph is initialized by GraphImpl.mapVertices(). In this method, a GraphImpl object is created using GraphImpl.apply(vertices, edges), and the RDDs vertices/newEdges are cached in apply(). But these two RDDs are not used directly anymore in SSSPExample (their child RDDs have already been cached), so the persists are unnecessary here.

However, other examples may need these two persists, so they cannot simply be removed. This might be hard to fix.

{code:scala}
def apply[VD: ClassTag, ED: ClassTag](
    vertices: VertexRDD[VD],
    edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
  vertices.cache() // Unnecessary for SSSPExample and ConnectedComponentsExample
  // Convert the vertex partitions in edges to the correct type
  val newEdges = edges.asInstanceOf[EdgeRDDImpl[ED, _]]
    .mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD])
    .cache() // Unnecessary for SSSPExample and ConnectedComponentsExample
  GraphImpl.fromExistingRDDs(vertices, newEdges)
}
{code}

2. Missing persist on newEdges

SSSPExample invokes Pregel to do the execution, and Pregel utilizes ReplicatedVertexView.upgrade(). I find that the RDD newEdges is used directly by multiple actions in Pregel, so newEdges should be persisted.

Same as the issue above, this one is also found in ConnectedComponentsExample. It is also hard to fix, because the added persist may be unnecessary for other examples.

{code:scala}
// Pregel.scala
// compute the messages
var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg) // newEdges is created here
val messageCheckpointer = new PeriodicRDDCheckpointer[(VertexId, A)](
  checkpointInterval, graph.vertices.sparkContext)
messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]])
var activeMessages = messages.count() // The first action to use newEdges
...
while (activeMessages > 0 && i < maxIterations) {
  // Receive the messages and update the vertices.
  prevG = g
  g = g.joinVertices(messages)(vprog) // Generating g depends on newEdges
  ...
  activeMessages = messages.count() // The second action to use newEdges.
                                    // newEdges should be unpersisted after this instruction.
{code}
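To see the reported pattern in isolation: an RDD that is consumed by more than one action should be persisted before the first action and unpersisted after the last one. A minimal standalone sketch of that pattern (plain Spark in local mode; the object and variable names are hypothetical, and this is not the GraphX-internal fix itself):

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Standalone illustration of the persist pattern the report asks for:
// one RDD, two actions, persisted before the first and released after the last.
object PersistAcrossActions {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("persist-demo").getOrCreate()
    val sc = spark.sparkContext

    val messages = sc.parallelize(1L to 100000L).map(i => (i, i * 2)) // stands in for newEdges/messages
    messages.persist(StorageLevel.MEMORY_ONLY)

    val activeMessages = messages.count() // first action: materializes the cache
    val preview = messages.take(3)        // second action: served from the cache

    messages.unpersist(blocking = false)  // release once no further action needs it
    println(s"activeMessages=$activeMessages preview=${preview.mkString(",")}")
    spark.stop()
  }
}
{code}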
The newEdges in question is created in ReplicatedVertexView.upgrade():

{code:scala}
// ReplicatedVertexView.scala
def upgrade(vertices: VertexRDD[VD], includeSrc: Boolean, includeDst: Boolean): Unit = {
  ...
  val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
    (ePartIter, shippedVertsIter) => ePartIter.map {
      case (pid, edgePartition) =>
        (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
    }
  })
  edges = newEdges // newEdges should be persisted
  hasSrcId = includeSrc
  hasDstId = includeDst
}
{code}

As I don't have much knowledge about GraphX, I don't know how to fix these issues well.

This issue is reported by our tool CacheCheck, which is used to dynamically detect persist()/unpersist() API misuses.
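For context, the computation SSSPExample exercises can be reproduced end to end. The sketch below follows the single-source shortest paths example from the GraphX programming guide, with an explicit application-level cache()/unpersist() pair added around the Pregel run; the cache placement is illustrative only and does not implement the internal fixes discussed above:

{code:scala}
import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.sql.SparkSession

// Single-source shortest paths via Pregel, as in SSSPExample.
// The explicit cache()/unpersist() pair is an application-level illustration only.
object SSSPCacheSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sssp-cache-sketch").getOrCreate()
    val sc = spark.sparkContext

    val sourceId: VertexId = 42
    val graph: Graph[Long, Double] =
      GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)

    // Initialize every vertex except the source to infinity, and cache the
    // graph because Pregel iterates over it repeatedly.
    val initialGraph = graph
      .mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
      .cache()

    val sssp = initialGraph.pregel(Double.PositiveInfinity)(
      (id, dist, newDist) => math.min(dist, newDist), // vertex program: keep the shorter distance
      triplet => {                                    // send a message along improving edges
        if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        } else {
          Iterator.empty
        }
      },
      (a, b) => math.min(a, b))                       // merge messages: shorter distance wins

    println(sssp.vertices.take(10).mkString("\n"))
    initialGraph.unpersist(blocking = false)          // release once the result is computed
    spark.stop()
  }
}
{code}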