Fwd: spark graphx storage RDD memory leak

2016-04-10 Thread zhang juntao
Thanks, Ted, for replying.
Those three lines can't release the cache of the graph passed in as the graph parameter; they only release g
( graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache() ).
In ConnectedComponents.scala, the graph parameter is cached as ccGraph, and Pregel never
releases it:
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
    val ccGraph = graph.mapVertices { case (vid, _) => vid }
    def sendMessage(edge: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, VertexId)] = {
      if (edge.srcAttr < edge.dstAttr) {
        Iterator((edge.dstId, edge.srcAttr))
      } else if (edge.srcAttr > edge.dstAttr) {
        Iterator((edge.srcId, edge.dstAttr))
      } else {
        // Both endpoints already carry the same component id: nothing to send
        Iterator.empty
      }
    }
    val initialMessage = Long.MaxValue
    Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)(
      vprog = (id, attr, msg) => math.min(attr, msg),
      sendMsg = sendMessage,
      mergeMsg = (a, b) => math.min(a, b))
  } // end of connectedComponents
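
A caller-side sketch of the cleanup discussed here, assuming it is acceptable to copy the connected-components logic into user code instead of patching Pregel.scala. The object name ConnectedComponentsWithCleanup is hypothetical and not part of GraphX; it only uses the public Graph, Pregel and RDD calls already shown in this thread. Whether this frees every RDD listed on the Storage tab depends on GraphX internals and has not been verified here.

```scala
import scala.reflect.ClassTag
import org.apache.spark.graphx._

// Hypothetical helper (not part of GraphX): same algorithm as
// ConnectedComponents.run, plus an explicit unpersist of the intermediate ccGraph.
object ConnectedComponentsWithCleanup {
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
    // Every vertex starts out labelled with its own id.
    val ccGraph = graph.mapVertices { case (vid, _) => vid }

    // Send the smaller label across the edge; stay silent when labels agree.
    def sendMessage(edge: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, VertexId)] = {
      if (edge.srcAttr < edge.dstAttr) Iterator((edge.dstId, edge.srcAttr))
      else if (edge.srcAttr > edge.dstAttr) Iterator((edge.srcId, edge.dstAttr))
      else Iterator.empty
    }

    val result = Pregel(ccGraph, Long.MaxValue, activeDirection = EdgeDirection.Either)(
      vprog = (id, attr, msg) => math.min(attr, msg),
      sendMsg = sendMessage,
      mergeMsg = (a, b) => math.min(a, b))

    // Materialize the result before dropping the graph it was derived from,
    // so the cleanup does not force a recomputation right away.
    result.vertices.count()
    result.edges.count()

    // The step Pregel does not perform for its input graph.
    ccGraph.unpersistVertices(blocking = false)
    ccGraph.edges.unpersist(blocking = false)
    result
  }
}
```

Usage would be ConnectedComponentsWithCleanup.run(myGraph) in place of myGraph.connectedComponents(); the caller still owns myGraph and the returned graph and should unpersist them when done.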

> Begin forwarded message:
> From: Ted Yu 
> Subject: Re: spark graphx storage RDD memory leak
> Date: April 11, 2016 at 1:15:23 AM GMT+8
> To: zhang juntao 
> Cc: "dev@spark.apache.org" 
> I see the following code toward the end of the method:
>   // Unpersist the RDDs hidden by newly-materialized RDDs
>   oldMessages.unpersist(blocking = false)
>   prevG.unpersistVertices(blocking = false)
>   prevG.edges.unpersist(blocking = false)
> Wouldn't the above achieve same effect ?
> On Sun, Apr 10, 2016 at 9:08 AM, zhang juntao wrote:
> Hi experts,
> I'm reporting a problem with Spark GraphX. I use Zeppelin to submit Spark jobs;
> note that the Scala environment shares the same SparkContext and SQLContext instance.
> I call the connected components algorithm for some business logic and
> found that every time a job finished, some graph storage RDDs weren't
> released.
> After several runs, a lot of storage RDDs remained even
> though all the jobs had finished.
> So I checked the code of connectedComponents and found what may be a problem in
> Pregel.scala.
> When the graph parameter has been cached, there isn't any way to unpersist it,
> so I added the code shown in red font to solve the problem:
> def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
>    (graph: Graph[VD, ED],
>     initialMsg: A,
>     maxIterations: Int = Int.MaxValue,
>     activeDirection: EdgeDirection = EdgeDirection.Either)
>    (vprog: (VertexId, VD, A) => VD,
>     sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
>     mergeMsg: (A, A) => A)
>   : Graph[VD, ED] =
> {
>   ..
>   var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
>   graph.unpersistVertices(blocking = false)
>   graph.edges.unpersist(blocking = false)
>   ..
> } // end of apply
> I'm not sure if this is a bug, 
> and thank you for your time,
> juntao

RE: Spark Sql on large number of files (~500Megs each) fails after couple of hours

2016-04-10 Thread Yu, Yucai
This may not be the first failure. Could you increase the setting below and rerun?
spark.yarn.executor.memoryOverhead   4096

In my experience, Netty sometimes uses a lot of off-heap memory, which can push the
container past its memory limit and get it killed by YARN's NodeManager.
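
To put numbers on that suggestion, here is a rough sketch of how the per-executor container request changes. It assumes the default documented for Spark 1.x on YARN (overhead = 10% of executor memory, with a 384 MB minimum) and the --executor-memory 10G setting quoted in this thread; ContainerSizing and its methods are hypothetical names, and YARN's own rounding to its allocation increment is not modelled.

```scala
// Sketch only: approximate YARN container request for one executor, in MB.
object ContainerSizing {
  // Assumed defaults from the Spark 1.x "Running on YARN" documentation.
  val OverheadFactor = 0.10
  val OverheadMinMb = 384

  // Overhead used when spark.yarn.executor.memoryOverhead is not set.
  def defaultOverheadMb(executorMemoryMb: Int): Int =
    math.max((OverheadFactor * executorMemoryMb).toInt, OverheadMinMb)

  // Heap plus overhead; an explicit overhead replaces the default.
  def containerRequestMb(executorMemoryMb: Int, overheadMb: Option[Int] = None): Int =
    executorMemoryMb + overheadMb.getOrElse(defaultOverheadMb(executorMemoryMb))

  def main(args: Array[String]): Unit = {
    val executorMb = 10 * 1024 // --executor-memory 10G
    println(containerRequestMb(executorMb))             // 11264 with the assumed default
    println(containerRequestMb(executorMb, Some(4096))) // 14336 with memoryOverhead 4096
  }
}
```

Under these assumptions the off-heap headroom grows from about 1 GB to 4 GB per executor, so each NodeManager must have room for the larger containers.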


From: Yash Sharma [mailto:yash...@gmail.com]
Sent: Monday, April 11, 2016 11:51 AM
To: Yu, Yucai 
Cc: dev@spark.apache.org
Subject: Re: Spark Sql on large number of files (~500Megs each) fails after 
couple of hours

Hi Yucai,
Thanks for the info. I have explored the container logs but did not get much
information from them.

I have seen these errors in the logs of a few containers, but I am not sure of their cause:
1. java.lang.NullPointerException (DiskBlockManager.scala:167)
2. java.lang.ClassCastException: RegisterExecutorFailed

Attaching the log for reference.

16/04/07 13:05:43 INFO storage.MemoryStore: MemoryStore started with capacity 
2.6 GB
16/04/07 13:05:43 INFO executor.CoarseGrainedExecutorBackend: Connecting to 
16/04/07 13:05:43 ERROR executor.CoarseGrainedExecutorBackend: Cannot register 
with driver: 
java.lang.ClassCastException: Cannot cast 
at java.lang.Class.cast(Class.java:3186)
at scala.concurrent.Future$$anonfun$mapTo$1.apply(Future.scala:405)
at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
at scala.util.Try$.apply(Try.scala:161)
at scala.util.Success.map(Try.scala:206)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:254)
at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:249)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:266)
at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:89)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
16/04/07 13:05:44 INFO storage.DiskBlockManager: Shutdown hook called
16/04/07 13:05:44 ERROR util.Utils: Uncaught exception in thread T

Re: Spark Sql on large number of files (~500Megs each) fails after couple of hours

2016-04-10 Thread Yash Sharma
Hi Yucai,
Thanks for the info. I have explored the container logs but did not get much
information from them.

I have seen these errors in the logs of a few containers, but I am not sure of their cause:
1. java.lang.NullPointerException (DiskBlockManager.scala:167)
2. java.lang.ClassCastException: RegisterExecutorFailed

Attaching the log for reference.

> 16/04/07 13:05:43 INFO storage.MemoryStore: MemoryStore started with capacity 2.6 GB
> 16/04/07 13:05:43 INFO executor.CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://sparkDriver@
> 16/04/07 13:05:43 ERROR executor.CoarseGrainedExecutorBackend: Cannot register with driver: akka.tcp://sparkDriver@
> java.lang.ClassCastException: Cannot cast org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages$RegisterExecutorFailed to org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages$RegisteredExecutor$
>   at java.lang.Class.cast(Class.java:3186)
>   at scala.concurrent.Future$$anonfun$mapTo$1.apply(Future.scala:405)
>   at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
>   at scala.util.Try$.apply(Try.scala:161)
>   at scala.util.Success.map(Try.scala:206)
>   at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
>   at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>   at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.processBatch$1(Future.scala:643)
>   at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply$mcV$sp(Future.scala:658)
>   at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635)
>   at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635)
>   at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>   at scala.concurrent.Future$InternalCallbackExecutor$Batch.run(Future.scala:634)
>   at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
>   at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:685)
>   at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>   at scala.concurrent.impl.Promise$KeptPromise.onComplete(Promise.scala:333)
>   at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:254)
>   at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:249)
>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>   at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
>   at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
>   at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>   at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>   at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:266)
>   at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:89)
>   at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:935)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>   at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 16/04/07 13:05:44 INFO storage.DiskBlockManager: Shutdown hook called
> 16/04/07 13:05:44 ERROR util.Utils: Uncaught exception in thread Thread-2
> java.lang.NullPointerException
>   at org.apache.spark.storage.DiskBlockManager.org$apache$spark$storage$DiskBlockManager$$doStop(DiskBlockManager.scala:167)
>   at org.apache.spark.storage.DiskBlockManager$$anonfun$addShutdownHook$1.apply$mcV$sp(DiskBlockManager.scala:149)
>   at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:264)
>   at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scal

RE: Spark Sql on large number of files (~500Megs each) fails after couple of hours

2016-04-10 Thread Yu, Yucai
Hi Yash,

How about checking the executor (YARN container) log? Most of the time it shows
more details. We are using CDH, and the log is at:

[yucai@sr483 container_1457699919227_0094_01_14]$ pwd
[yucai@sr483 container_1457699919227_0094_01_14]$ ls -tlr
total 408
-rw-r--r-- 1 yucai DP 382676 Mar 13 18:04 stderr
-rw-r--r-- 1 yucai DP  22302 Mar 13 18:04 stdout

Please note that you had better check the container that failed first.


From: Yash Sharma [mailto:yash...@gmail.com]
Sent: Monday, April 11, 2016 10:46 AM
To: dev@spark.apache.org
Subject: Spark Sql on large number of files (~500Megs each) fails after couple 
of hours

Hi All,
I am trying Spark SQL on a ~16 TB dataset with a large number of files (~50K).
Each file is roughly 400-500 MB.

I am issuing a fairly simple Hive query on the dataset with just filters (no
groupBys or joins), and the job is very slow. It runs for 7-8 hours and
processes about 80-100 GB on a 12-node cluster.

I have experimented with different values of spark.sql.shuffle.partitions, from
20 to 4000, but haven't seen much difference.

From the logs, I have attached the YARN error at the end [1]. The Spark configs
for the job are below [2].

Is there any other tuning I need to look into? Any tips would be appreciated.


2. Spark config -
--master yarn-client
--driver-memory 1G
--executor-memory 10G
--executor-cores 5
--conf spark.dynamicAllocation.enabled=true
--conf spark.shuffle.service.enabled=true
--conf spark.dynamicAllocation.initialExecutors=2
--conf spark.dynamicAllocation.minExecutors=2

1. Yarn Error:

16/04/07 13:05:37 INFO yarn.YarnAllocator: Container marked as failed: 
container_1459747472046_1618_02_03. Exit status: 1. Diagnostics: Exception 
from container-launch.
Container id: container_1459747472046_1618_02_03
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.lang.Thread.run(Thread.java:745)

Container exited with a non-zero exit code 1

Spark Jenkins test configurations

2016-04-10 Thread cherry_zhang
We are running unit tests on our own Jenkins server, but we have run into some
problems. Could someone give me a detailed list of the Jenkins server
configuration used for Spark? Thanks.


Spark Sql on large number of files (~500Megs each) fails after couple of hours

2016-04-10 Thread Yash Sharma
Hi All,
I am trying Spark SQL on a ~16 TB dataset with a large number of files (~50K).
Each file is roughly 400-500 MB.

I am issuing a fairly simple Hive query on the dataset with just filters
(no groupBys or joins), and the job is very slow. It runs for 7-8 hours
and processes about 80-100 GB on a 12-node cluster.

I have experimented with different values of spark.sql.shuffle.partitions,
from 20 to 4000, but haven't seen much difference.

From the logs, I have attached the YARN error at the end [1]. The Spark configs
for the job are below [2].

Is there any other tuning I need to look into? Any tips would be appreciated.


2. Spark config -
--master yarn-client
--driver-memory 1G
--executor-memory 10G
--executor-cores 5
--conf spark.dynamicAllocation.enabled=true
--conf spark.shuffle.service.enabled=true
--conf spark.dynamicAllocation.initialExecutors=2
--conf spark.dynamicAllocation.minExecutors=2

1. Yarn Error:

> 16/04/07 13:05:37 INFO yarn.YarnAllocator: Container marked as failed: container_1459747472046_1618_02_03. Exit status: 1. Diagnostics: Exception from container-launch.
> Container id: container_1459747472046_1618_02_03
> Exit code: 1
> Stack trace: ExitCodeException exitCode=1:
>   at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
>   at org.apache.hadoop.util.Shell.run(Shell.java:455)
>   at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
>   at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
>   at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
>   at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Container exited with a non-zero exit code 1

Re: spark graphx storage RDD memory leak

2016-04-10 Thread Ted Yu
I see the following code toward the end of the method:

  // Unpersist the RDDs hidden by newly-materialized RDDs
  oldMessages.unpersist(blocking = false)
  prevG.unpersistVertices(blocking = false)
  prevG.edges.unpersist(blocking = false)

Wouldn't the above achieve same effect ?

On Sun, Apr 10, 2016 at 9:08 AM, zhang juntao wrote:

> Hi experts,
> I'm reporting a problem with Spark GraphX. I use Zeppelin to submit Spark
> jobs; note that the Scala environment shares the same SparkContext and
> SQLContext instance.
> I call the connected components algorithm for some business logic and
> found that every time a job finished, some graph storage RDDs
> weren't released.
> After several runs, a lot of storage RDDs remained even
> though all the jobs had finished.
> So I checked the code of connectedComponents and found what may be a problem
> in Pregel.scala.
> When the graph parameter has been cached, there isn't any way to unpersist it,
> so I added the code shown in red font to solve the problem:
> def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
>    (graph: Graph[VD, ED],
>     initialMsg: A,
>     maxIterations: Int = Int.MaxValue,
>     activeDirection: EdgeDirection = EdgeDirection.Either)
>    (vprog: (VertexId, VD, A) => VD,
>     sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
>     mergeMsg: (A, A) => A)
>   : Graph[VD, ED] =
> {
>   ..
>   var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
>   graph.unpersistVertices(blocking = false)
>   graph.edges.unpersist(blocking = false)
>   ..
> } // end of apply
> I'm not sure if this is a bug, and thank you for your time, juntao

spark graphx storage RDD memory leak

2016-04-10 Thread zhang juntao
Hi experts,

I'm reporting a problem with Spark GraphX. I use Zeppelin to submit Spark jobs;
note that the Scala environment shares the same SparkContext and SQLContext instance.
I call the connected components algorithm for some business logic and
found that every time a job finished, some graph storage RDDs weren't
released.
After several runs, a lot of storage RDDs remained even though
all the jobs had finished.

So I checked the code of connectedComponents and found what may be a problem in
Pregel.scala.
When the graph parameter has been cached, there isn't any way to unpersist it,
so I added the code shown in red font to solve the problem:
def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
   (graph: Graph[VD, ED],
    initialMsg: A,
    maxIterations: Int = Int.MaxValue,
    activeDirection: EdgeDirection = EdgeDirection.Either)
   (vprog: (VertexId, VD, A) => VD,
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    mergeMsg: (A, A) => A)
  : Graph[VD, ED] =
{
  ..
  var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
  graph.unpersistVertices(blocking = false)
  graph.edges.unpersist(blocking = false)
  ..
} // end of apply

I'm not sure if this is a bug, 
and thank you for your time,