[ https://issues.apache.org/jira/browse/SPARK-2647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen updated SPARK-2647:
-----------------------------
    Component/s:     (was: Spark Core)
                 Scheduler

> DAGScheduler plugs others when processing one JobSubmitted event
> ----------------------------------------------------------------
>
>                 Key: SPARK-2647
>                 URL: https://issues.apache.org/jira/browse/SPARK-2647
>             Project: Spark
>          Issue Type: Improvement
>          Components: Scheduler
>            Reporter: YanTang Zhai
>
> If a few jobs are submitted at once, the DAGScheduler blocks the others while it is
> processing one JobSubmitted event.
> For example, one JobSubmitted event is processed as follows and takes a long time:
> "spark-akka.actor.default-dispatcher-67" daemon prio=10 
> tid=0x00007f75ec001000 nid=0x7dd6 in Object.wait() [0x00007f76063e1000]
>    java.lang.Thread.State: WAITING (on object monitor)
>       at java.lang.Object.wait(Native Method)
>       at java.lang.Object.wait(Object.java:503)
>       at org.apache.hadoopcdh3.ipc.Client.call(Client.java:1130)
>       - locked <0x0000000783b17330> (a org.apache.hadoopcdh3.ipc.Client$Call)
>       at org.apache.hadoopcdh3.ipc.RPC$Invoker.invoke(RPC.java:241)
>       at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
>       at sun.reflect.GeneratedMethodAccessor86.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at 
> org.apache.hadoopcdh3.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:83)
>       at 
> org.apache.hadoopcdh3.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:60)
>       at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
>       at 
> org.apache.hadoopcdh3.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1472)
>       at 
> org.apache.hadoopcdh3.hdfs.DFSClient.getBlockLocations(DFSClient.java:1498)
>       at 
> org.apache.hadoopcdh3.hdfs.Cdh3DistributedFileSystem$1.doCall(Cdh3DistributedFileSystem.java:208)
>       at 
> org.apache.hadoopcdh3.hdfs.Cdh3DistributedFileSystem$1.doCall(Cdh3DistributedFileSystem.java:204)
>       at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>       at 
> org.apache.hadoopcdh3.hdfs.Cdh3DistributedFileSystem.getFileBlockLocations(Cdh3DistributedFileSystem.java:204)
>       at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1812)
>       at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1797)
>       at 
> org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:233)
>       at 
> StorageEngineClient.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:141)
>       at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:172)
>       at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>       at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>       at scala.Option.getOrElse(Option.scala:120)
>       at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
>       at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
>       at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>       at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>       at scala.Option.getOrElse(Option.scala:120)
>       at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
>       at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>       at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>       at scala.Option.getOrElse(Option.scala:120)
>       at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
>       at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:54)
>       at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:54)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>       at scala.collection.immutable.List.foreach(List.scala:318)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>       at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:54)
>       at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>       at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>       at scala.Option.getOrElse(Option.scala:120)
>       at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
>       at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>       at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>       at scala.Option.getOrElse(Option.scala:120)
>       at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
>       at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>       at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>       at scala.Option.getOrElse(Option.scala:120)
>       at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
>       at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>       at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>       at scala.Option.getOrElse(Option.scala:120)
>       at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
>       at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>       at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>       at scala.Option.getOrElse(Option.scala:120)
>       at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
>       at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:197)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:272)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:269)
>       at scala.collection.immutable.List.foreach(List.scala:318)
>       at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:269)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:274)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:269)
>       at scala.collection.immutable.List.foreach(List.scala:318)
>       at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:269)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:274)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:269)
>       at scala.collection.immutable.List.foreach(List.scala:318)
>       at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:269)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:274)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:269)
>       at scala.collection.immutable.List.foreach(List.scala:318)
>       at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:269)
>       at 
> org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:279)
>       at 
> org.apache.spark.scheduler.DAGScheduler.newStage(DAGScheduler.scala:219)
>       at 
> org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:676)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1180)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>       at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Meanwhile, the other JobSubmitted event hangs waiting, as this thread dump shows:
> "pool-8-thread-31" prio=10 tid=0x00007f78a8287800 nid=0x8cc in Object.wait() 
> [0x00007f7585654000]
>    java.lang.Thread.State: WAITING (on object monitor)
>       at java.lang.Object.wait(Native Method)
>       at java.lang.Object.wait(Object.java:503)
>       at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
>       - locked <0x00000007a59453d8> (a org.apache.spark.scheduler.JobWaiter)
>       at 
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:451)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:1048)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:1066)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:1087)
>       at shark.execution.FileSinkOperator.execute(FileSinkOperator.scala:165)
>       at shark.execution.SparkTask.execute(SparkTask.scala:99)
>       at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:144)
>       at 
> org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:57)
>       at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1362)
>       at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1146)
>       at org.apache.hadoop.hive.ql.Driver.run(Driver.java:952)
>       at shark.SharkServerHandler.execute(SharkServer.scala:284)
>       at shark.GatedSharkServerHandler.execute(SharkServer.scala:240)
>       at 
> org.apache.hadoop.hive.service.ThriftHive$Processor$execute.getResult(ThriftHive.java:644)
>       at 
> org.apache.hadoop.hive.service.ThriftHive$Processor$execute.getResult(ThriftHive.java:628)
>       at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
>       at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
>       at 
> org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:206)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:744)
> I think the DAGScheduler could use a separate thread to run handleJobSubmitted, so
> that one slow event does not block the others.
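> As a rough illustration, the slow stage-creation work could be handed off to a
> separate thread pool so the event loop only dispatches. This is a minimal sketch,
> not Spark's actual API: the names (EventLoopSketch, prepareStages) and the pool
> size are made up, and the real scheduler's shared job/stage state would still
> need synchronization.
>
>     import java.util.concurrent.Executors
>     import scala.concurrent.{ExecutionContext, Future}
>
>     object EventLoopSketch {
>       sealed trait Event
>       final case class JobSubmitted(jobId: Int, prepareStages: () => Unit) extends Event
>
>       // Stand-in pool for the proposed off-loop handling; size is arbitrary.
>       private val pool = Executors.newFixedThreadPool(4)
>       private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
>
>       // The event loop stays fast: it only schedules the slow work.
>       def onReceive(event: Event): Unit = event match {
>         case JobSubmitted(jobId, prepareStages) =>
>           Future {
>             prepareStages() // e.g. rdd.partitions, which may block on HDFS RPCs
>             println(s"job $jobId: stages created")
>           }
>       }
>
>       def main(args: Array[String]): Unit = {
>         onReceive(JobSubmitted(1, () => Thread.sleep(2000))) // mimics a slow getSplits
>         onReceive(JobSubmitted(2, () => ()))                 // not stuck behind job 1
>         Thread.sleep(2500)
>         pool.shutdown()
>       }
>     }
>
> With this shape, job 2's stages are created while job 1 is still blocked in HDFS,
> which is the behavior the two thread dumps above argue for.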


