Fwd: spark graphx storage RDD memory leak
yes, I use version 1.6, and thanks Ted

> Begin forwarded message:
>
> From: Robin East <robin.e...@xense.co.uk>
> Subject: Re: spark graphx storage RDD memory leak
> Date: April 12, 2016 at 2:13:10 AM GMT+8
> To: zhang juntao <juntao.zhang...@gmail.com>
> Cc: Ted Yu <yuzhih...@gmail.com>, dev@spark.apache.org
>
> this looks like https://issues.apache.org/jira/browse/SPARK-12655 fixed in 2.0
> ---
> Robin East
> Spark GraphX in Action Michael Malak and Robin East
> Manning Publications Co.
> http://www.manning.com/books/spark-graphx-in-action
Re: spark graphx storage RDD memory leak
this looks like https://issues.apache.org/jira/browse/SPARK-12655 fixed in 2.0
---
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action

> On 11 Apr 2016, at 07:23, zhang juntao <juntao.zhang...@gmail.com> wrote:
Fwd: spark graphx storage RDD memory leak
thanks Ted for replying,

These three lines can't release the param graph's cache; they only release g (graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()). In ConnectedComponents.scala, the param graph stays cached through ccGraph and won't be released in Pregel:

    def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
      val ccGraph = graph.mapVertices { case (vid, _) => vid }
      def sendMessage(edge: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, VertexId)] = {
        if (edge.srcAttr < edge.dstAttr) {
          Iterator((edge.dstId, edge.srcAttr))
        } else if (edge.srcAttr > edge.dstAttr) {
          Iterator((edge.srcId, edge.dstAttr))
        } else {
          Iterator.empty
        }
      }
      val initialMessage = Long.MaxValue
      Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)(
        vprog = (id, attr, msg) => math.min(attr, msg),
        sendMsg = sendMessage,
        mergeMsg = (a, b) => math.min(a, b))
    } // end of connectedComponents
    }

thanks
juntao

> Begin forwarded message:
>
> From: Ted Yu <yuzhih...@gmail.com>
> Subject: Re: spark graphx storage RDD memory leak
> Date: April 11, 2016 at 1:15:23 AM GMT+8
> To: zhang juntao <juntao.zhang...@gmail.com>
> Cc: "dev@spark.apache.org" <dev@spark.apache.org>
>
> I see the following code toward the end of the method:
>
>     // Unpersist the RDDs hidden by newly-materialized RDDs
>     oldMessages.unpersist(blocking = false)
>     prevG.unpersistVertices(blocking = false)
>     prevG.edges.unpersist(blocking = false)
>
> Wouldn't the above achieve the same effect?
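To make the quoted `ConnectedComponents.run` easier to follow, here is a minimal in-memory sketch of the same min-label propagation in plain Scala, with no Spark involved (`ToyConnectedComponents` is a hypothetical name, not a GraphX API). Each vertex starts labelled with its own id, as in `ccGraph`; every superstep applies the `sendMessage` rule to each edge, merges messages with `min` (like `mergeMsg`), and applies `vprog` as `min(attr, msg)`. Unlike GraphX's Pregel, this sketch rescans every edge each superstep instead of only edges next to vertices that just changed, and it skips the initial `Long.MaxValue` message, which leaves every label unchanged anyway.

```scala
object ToyConnectedComponents {
  type VertexId = Long

  // Pregel-style min-label propagation over an in-memory edge list. The rules
  // mirror vprog / sendMsg / mergeMsg in ConnectedComponents.run; no Spark here.
  def run(vertices: Seq[VertexId], edges: Seq[(VertexId, VertexId)]): Map[VertexId, VertexId] = {
    // Like ccGraph: every vertex starts labelled with its own id.
    var labels: Map[VertexId, VertexId] = vertices.map(v => v -> v).toMap

    // Same rule as sendMessage: the endpoint with the larger label
    // is sent the smaller label; equal labels send nothing.
    def sendMessage(src: VertexId, dst: VertexId): List[(VertexId, VertexId)] = {
      val (s, d) = (labels(src), labels(dst))
      if (s < d) List((dst, s))
      else if (s > d) List((src, d))
      else Nil
    }

    // One superstep's worth of messages, read against the current labels.
    def collectMessages(): List[(VertexId, VertexId)] =
      edges.toList.flatMap { case (src, dst) => sendMessage(src, dst) }

    var messages = collectMessages()
    while (messages.nonEmpty) {
      // mergeMsg = min over all messages addressed to the same vertex
      val merged: Map[VertexId, VertexId] =
        messages.groupBy(_._1).map { case (v, ms) => v -> ms.map(_._2).min }
      // vprog = min(attr, msg)
      labels = labels ++ merged.map { case (v, m) => v -> math.min(labels(v), m) }
      messages = collectMessages()
    }
    labels
  }
}
```

For example, with vertices 1 to 6 and edges (1,2), (2,3), (4,5), the result maps 1, 2 and 3 to 1, maps 4 and 5 to 4, and leaves 6 as 6: each vertex ends up labelled with the smallest id in its component.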
Re: spark graphx storage RDD memory leak
I see the following code toward the end of the method:

    // Unpersist the RDDs hidden by newly-materialized RDDs
    oldMessages.unpersist(blocking = false)
    prevG.unpersistVertices(blocking = false)
    prevG.edges.unpersist(blocking = false)

Wouldn't the above achieve the same effect?

On Sun, Apr 10, 2016 at 9:08 AM, zhang juntao wrote:
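The code comment Ted quotes describes an ordering rule: an older cached dataset can be dropped once the newer datasets that depend on it have been materialized (forced by an action), because from then on nothing needs to read it. The toy model below is plain Scala with no Spark API; `ToyDataset` and `RollingCache` are hypothetical names that loosely mirror RDD semantics, where a dataset that is not cached, or has been unpersisted, is recomputed from its parent on demand. It counts how many step evaluations a run of supersteps costs when the previous dataset is unpersisted after versus before the new one is materialized.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy lazily-evaluated dataset with an optional cache (an assumption-level
// model of RDD caching, not Spark code).
final class ToyDataset(parent: Option[ToyDataset], step: Int => Int) {
  private var cacheEnabled = false
  private var cached: Option[Int] = None
  var computeCount = 0 // how many times this dataset's step function ran

  def cache(): ToyDataset = { cacheEnabled = true; this }
  def unpersist(): Unit = { cacheEnabled = false; cached = None }

  // Like an action: reuse the cached value if present, otherwise recompute
  // from the parent (which may itself recompute recursively).
  def materialize(): Int = cached.getOrElse {
    computeCount += 1
    val v = step(parent.map(_.materialize()).getOrElse(0))
    if (cacheEnabled) cached = Some(v)
    v
  }
}

object RollingCache {
  // Runs `iterations` supersteps, each deriving a cached child of the previous
  // dataset. Returns (final value, total step evaluations across all datasets).
  def run(iterations: Int, materializeBeforeUnpersist: Boolean): (Int, Int) = {
    val all = ArrayBuffer.empty[ToyDataset]
    var g = new ToyDataset(None, _ => 0).cache()
    all += g
    g.materialize()
    for (_ <- 1 to iterations) {
      val prevG = g
      g = new ToyDataset(Some(prevG), _ + 1).cache()
      all += g
      if (materializeBeforeUnpersist) {
        g.materialize()   // g is now cached, so prevG is "hidden"
        prevG.unpersist()
      } else {
        prevG.unpersist() // dropped too early: g must recompute prevG's lineage
        g.materialize()
      }
    }
    (g.materialize(), all.map(_.computeCount).sum)
  }
}
```

Under these assumptions, five supersteps cost 6 step evaluations (one per dataset) when the previous dataset is unpersisted after materialization, and 21 when it is unpersisted first, because each superstep then recomputes the whole chain back to the start.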
spark graphx storage RDD memory leak
hi experts,

I'm reporting a problem with Spark GraphX. I use Zeppelin to submit Spark jobs; note that the Scala environment shares the same SparkContext and SQLContext instance. I call the connected components algorithm for some business processing, and found that every time a job finished, some graph storage RDDs weren't released. After several runs there were a lot of storage RDDs remaining, even though all the jobs had finished.

So I checked the code of connectedComponents and found that there may be a problem in Pregel.scala: when the param graph has been cached, there isn't any way to unpersist it. So I added the two unpersist calls right after the cache() call below (shown in red font in the original mail) to solve the problem:

    def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
       (graph: Graph[VD, ED],
        initialMsg: A,
        maxIterations: Int = Int.MaxValue,
        activeDirection: EdgeDirection = EdgeDirection.Either)
       (vprog: (VertexId, VD, A) => VD,
        sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
        mergeMsg: (A, A) => A)
      : Graph[VD, ED] =
    {
      ..
      var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
      graph.unpersistVertices(blocking = false)
      graph.edges.unpersist(blocking = false)
      ..
    } // end of apply

I'm not sure if this is a bug. Thank you for your time,
juntao
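The leak described above can be illustrated without Spark. The sketch below is a toy model, not GraphX code: `ToyStorage` and `LeakModel` are hypothetical names that only track which datasets are marked as cached. Each simulated job caches its input graph, then runs a Pregel-like loop that caches a new graph per superstep and unpersists the previous one. With `releaseInput = false`, every job leaves its input cached, matching the growing set of storage RDDs reported above; with `releaseInput = true`, mimicking the two added unpersist calls, nothing is left once the caller drops each result.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Spark's block storage: it only records which
// named datasets are currently marked as cached. No Spark API is involved.
final class ToyStorage {
  private val cached = mutable.LinkedHashSet.empty[String]
  def cache(name: String): String = { cached += name; name }
  def unpersist(name: String): Unit = { cached -= name }
  def persisted: Set[String] = cached.toSet
}

object LeakModel {
  // Simulates one job: the caller's input graph is cached, then a Pregel-like
  // loop caches a new graph each superstep and unpersists the previous one.
  // releaseInput = true mimics the two extra unpersist calls in the proposed patch.
  def runJob(storage: ToyStorage, job: Int, iterations: Int, releaseInput: Boolean): String = {
    val input = storage.cache(s"job$job-input")
    var g = storage.cache(s"job$job-g0")
    if (releaseInput) storage.unpersist(input)
    for (i <- 1 to iterations) {
      val prevG = g
      g = storage.cache(s"job$job-g$i")
      storage.unpersist(prevG) // like prevG.unpersistVertices / prevG.edges.unpersist
    }
    g // the final graph is returned to the caller, still cached
  }

  // Runs several jobs against one shared storage (like the shared SparkContext
  // in Zeppelin); the caller unpersists each result, so only leaks remain.
  def leftovers(jobs: Int, releaseInput: Boolean): Set[String] = {
    val storage = new ToyStorage
    for (j <- 1 to jobs) {
      val result = runJob(storage, j, iterations = 5, releaseInput = releaseInput)
      storage.unpersist(result)
    }
    storage.persisted
  }
}
```

One trade-off worth weighing: unpersisting `graph` inside `Pregel.apply` also drops a cache the caller may still want for later work, whereas releasing an intermediate graph in the code that created it (such as `ccGraph` in `ConnectedComponents.run`) leaves the caller's data alone.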