Fwd: spark graphx storage RDD memory leak

2016-04-11 Thread zhang juntao
Yes, I am using version 1.6, and thanks, Ted.

> Begin forwarded message:
> 
> From: Robin East <robin.e...@xense.co.uk>
> Subject: Re: spark graphx storage RDD memory leak
> Date: April 12, 2016 at 2:13:10 AM GMT+8
> To: zhang juntao <juntao.zhang...@gmail.com>
> Cc: Ted Yu <yuzhih...@gmail.com>, dev@spark.apache.org
> 
> This looks like SPARK-12655, fixed in 2.0:
> https://issues.apache.org/jira/browse/SPARK-12655
> ---
> Robin East
> Spark GraphX in Action Michael Malak and Robin East
> Manning Publications Co.
> http://www.manning.com/books/spark-graphx-in-action 



Re: spark graphx storage RDD memory leak

2016-04-11 Thread Robin East
This looks like SPARK-12655, fixed in 2.0:
https://issues.apache.org/jira/browse/SPARK-12655
---
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action 



Fwd: spark graphx storage RDD memory leak

2016-04-11 Thread zhang juntao
Thanks for replying, Ted.
The three unpersist lines you quoted can't release the cache of the graph
parameter; they only release g
(graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()).
In ConnectedComponents.scala, the graph parameter ends up cached in ccGraph and
is never released by Pregel:
import scala.reflect.ClassTag
import org.apache.spark.graphx._

object ConnectedComponents {
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
    // Each vertex starts labelled with its own id
    val ccGraph = graph.mapVertices { case (vid, _) => vid }
    // Send the smaller label across the edge to the endpoint holding the larger one
    def sendMessage(edge: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, VertexId)] = {
      if (edge.srcAttr < edge.dstAttr) {
        Iterator((edge.dstId, edge.srcAttr))
      } else if (edge.srcAttr > edge.dstAttr) {
        Iterator((edge.srcId, edge.dstAttr))
      } else {
        Iterator.empty
      }
    }
    val initialMessage = Long.MaxValue
    Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)(
      vprog = (id, attr, msg) => math.min(attr, msg),
      sendMsg = sendMessage,
      mergeMsg = (a, b) => math.min(a, b))
  } // end of connectedComponents
}
Thanks,
juntao
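The sendMessage / vprog / mergeMsg trio in ConnectedComponents above implements min-label propagation: every vertex starts with its own id as its label, and across each edge the larger label is replaced by the smaller one until no messages remain. As a hedged, Spark-free sketch (plain Scala collections standing in for GraphX; `MinLabelPropagation`, `SimpleEdge`, `messagesFor` and the loop structure are hypothetical names chosen for illustration, not the GraphX implementation), the supersteps can be simulated like this:

```scala
object MinLabelPropagation {
  // Hypothetical edge type for this sketch (not org.apache.spark.graphx.Edge).
  final case class SimpleEdge(src: Long, dst: Long)

  // Same rule as sendMessage in ConnectedComponents: the endpoint holding the
  // larger label receives the smaller one; equal labels send nothing.
  def sendMessage(e: SimpleEdge, labels: Map[Long, Long]): Iterator[(Long, Long)] = {
    val srcAttr = labels(e.src)
    val dstAttr = labels(e.dst)
    if (srcAttr < dstAttr) Iterator((e.dst, srcAttr))
    else if (srcAttr > dstAttr) Iterator((e.src, dstAttr))
    else Iterator.empty
  }

  // Collect all messages and merge the ones addressed to the same vertex with min (mergeMsg).
  def messagesFor(edges: Seq[SimpleEdge], labels: Map[Long, Long]): Map[Long, Long] =
    edges.flatMap(e => sendMessage(e, labels))
      .groupBy(_._1)
      .map { case (vid, msgs) => vid -> msgs.map(_._2).min }

  def run(vertices: Seq[Long], edges: Seq[SimpleEdge]): Map[Long, Long] = {
    // Like mapVertices { case (vid, _) => vid }: each vertex is labelled with its own id.
    var labels: Map[Long, Long] = vertices.map(v => v -> v).toMap
    var messages = messagesFor(edges, labels)
    // Simplification: every edge is re-examined each superstep, whereas Pregel
    // only looks at edges next to vertices that just received a message.
    while (messages.nonEmpty) {
      // vprog: keep the smaller of the current label and the merged message.
      labels = labels.map { case (vid, attr) =>
        vid -> messages.get(vid).fold(attr)(msg => math.min(attr, msg))
      }
      messages = messagesFor(edges, labels)
    }
    labels
  }

  def main(args: Array[String]): Unit = {
    val edges = Seq(SimpleEdge(1L, 2L), SimpleEdge(2L, 3L), SimpleEdge(4L, 5L))
    val result = run(Seq(1L, 2L, 3L, 4L, 5L, 6L), edges)
    // prints (1,1), (2,1), (3,1), (4,4), (5,4), (6,6)
    println(result.toSeq.sorted.mkString(", "))
  }
}
```

Each component ends up labelled with its smallest vertex id, and an isolated vertex (6 here) keeps its own id, which matches what the GraphX version is expected to return for the same input.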





Re: spark graphx storage RDD memory leak

2016-04-10 Thread Ted Yu
I see the following code toward the end of the method:

  // Unpersist the RDDs hidden by newly-materialized RDDs
  oldMessages.unpersist(blocking = false)
  prevG.unpersistVertices(blocking = false)
  prevG.edges.unpersist(blocking = false)

Wouldn't the above achieve the same effect?



spark graphx storage RDD memory leak

2016-04-10 Thread zhang juntao
Hi experts,

I'm reporting a problem with Spark GraphX. I use Zeppelin to submit Spark jobs;
note that the Scala environment shares the same SparkContext and SQLContext
instance. I call the connected components algorithm for some business logic, and
found that every time a job finished, some graph storage RDDs weren't released.
After several runs there were a lot of storage RDDs left, even though all the
jobs had finished.



So I checked the code of connectedComponents and found what may be a problem in
Pregel.scala: when the graph parameter has been cached, there isn't any way to
unpersist it. So I added the two unpersist lines below (shown in red font in the
original mail) to solve the problem:
def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
   (graph: Graph[VD, ED],
    initialMsg: A,
    maxIterations: Int = Int.MaxValue,
    activeDirection: EdgeDirection = EdgeDirection.Either)
   (vprog: (VertexId, VD, A) => VD,
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    mergeMsg: (A, A) => A)
  : Graph[VD, ED] =
{
  ..
  var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
  graph.unpersistVertices(blocking = false)  // added: release the input graph's cached vertices
  graph.edges.unpersist(blocking = false)    // added: release the input graph's cached edges
  ..

} // end of apply

I'm not sure whether this is a bug.
Thank you for your time,
juntao
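To make the difference between Ted's three unpersist lines and the patch above concrete, here is a toy model in plain Scala. It is not Spark: `CacheLifecycleToy`, `CacheRegistry` and `pregelLike` are hypothetical stand-ins for the storage layer and for Pregel's caching pattern. Each superstep caches the new graph generation and unpersists the previous one (the role of the prevG lines Ted quoted), while the graph handed in by the caller is only released when `releaseInput = true`, which models the two added graph.unpersist lines.

```scala
import scala.collection.mutable

object CacheLifecycleToy {
  // Hypothetical stand-in for the list of cached datasets (not a Spark API).
  final class CacheRegistry {
    private val cached = mutable.LinkedHashSet.empty[String]
    def cache(name: String): String = { cached += name; name }
    def unpersist(name: String): Unit = { cached -= name }
    def snapshot: List[String] = cached.toList
  }

  // Toy version of the caching pattern discussed in this thread: cache g0, then each
  // superstep caches a new generation and unpersists the previous one (like prevG).
  // releaseInput = true models the two extra graph.unpersist* lines from the patch.
  def pregelLike(reg: CacheRegistry, input: String, supersteps: Int, releaseInput: Boolean): String = {
    var g = reg.cache(s"$input-g0")
    if (releaseInput) reg.unpersist(input)
    for (i <- 1 to supersteps) {
      val prevG = g
      g = reg.cache(s"$input-g$i")
      reg.unpersist(prevG)
    }
    g
  }

  def main(args: Array[String]): Unit = {
    for (release <- Seq(false, true)) {
      val reg = new CacheRegistry
      // Per the report above, the graph passed to Pregel (ccGraph) ends up cached
      // and is never handed back to the caller.
      val input = reg.cache("ccGraph")
      val result = pregelLike(reg, input, supersteps = 3, releaseInput = release)
      // releaseInput=false -> cached: ccGraph, ccGraph-g3 (result: ccGraph-g3)
      // releaseInput=true -> cached: ccGraph-g3 (result: ccGraph-g3)
      println(s"releaseInput=$release -> cached: ${reg.snapshot.mkString(", ")} (result: $result)")
    }
  }
}
```

In the toy model the per-superstep unpersist never touches the input, so it stays cached after the call unless the extra lines run. Note that with `releaseInput = true` the input is dropped even if a caller cached it on purpose and still needs it afterwards, so where the unpersist belongs (inside Pregel or in the code that created the intermediate graph) is a design choice; the sketch does not show what SPARK-12655 actually changed.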