[ https://issues.apache.org/jira/browse/SPARK-2647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen updated SPARK-2647:
-----------------------------
    Component/s:     (was: Spark Core)
                     Scheduler

> DAGScheduler plugs others when processing one JobSubmitted event
> ----------------------------------------------------------------
>
>                 Key: SPARK-2647
>                 URL: https://issues.apache.org/jira/browse/SPARK-2647
>             Project: Spark
>          Issue Type: Improvement
>          Components: Scheduler
>            Reporter: YanTang Zhai
>
> If a few jobs are submitted, the DAGScheduler blocks the others while it processes one JobSubmitted event.
> For example, one JobSubmitted event is processed as follows and takes a long time:
> "spark-akka.actor.default-dispatcher-67" daemon prio=10 tid=0x00007f75ec001000 nid=0x7dd6 in Object.wait() [0x00007f76063e1000]
>    java.lang.Thread.State: WAITING (on object monitor)
>         at java.lang.Object.wait(Native Method)
>         at java.lang.Object.wait(Object.java:503)
>         at org.apache.hadoopcdh3.ipc.Client.call(Client.java:1130)
>         - locked <0x0000000783b17330> (a org.apache.hadoopcdh3.ipc.Client$Call)
>         at org.apache.hadoopcdh3.ipc.RPC$Invoker.invoke(RPC.java:241)
>         at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
>         at sun.reflect.GeneratedMethodAccessor86.invoke(Unknown Source)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at org.apache.hadoopcdh3.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:83)
>         at org.apache.hadoopcdh3.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:60)
>         at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
>         at org.apache.hadoopcdh3.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1472)
>         at org.apache.hadoopcdh3.hdfs.DFSClient.getBlockLocations(DFSClient.java:1498)
>         at org.apache.hadoopcdh3.hdfs.Cdh3DistributedFileSystem$1.doCall(Cdh3DistributedFileSystem.java:208)
>         at org.apache.hadoopcdh3.hdfs.Cdh3DistributedFileSystem$1.doCall(Cdh3DistributedFileSystem.java:204)
>         at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>         at org.apache.hadoopcdh3.hdfs.Cdh3DistributedFileSystem.getFileBlockLocations(Cdh3DistributedFileSystem.java:204)
>         at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1812)
>         at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1797)
>         at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:233)
>         at StorageEngineClient.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:141)
>         at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:172)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>         at scala.Option.getOrElse(Option.scala:120)
>         at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
>         at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>         at scala.Option.getOrElse(Option.scala:120)
>         at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
>         at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>         at scala.Option.getOrElse(Option.scala:120)
>         at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
>         at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:54)
>         at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:54)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.immutable.List.foreach(List.scala:318)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>         at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:54)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>         at scala.Option.getOrElse(Option.scala:120)
>         at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
>         at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>         at scala.Option.getOrElse(Option.scala:120)
>         at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
>         at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>         at scala.Option.getOrElse(Option.scala:120)
>         at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
>         at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>         at scala.Option.getOrElse(Option.scala:120)
>         at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
>         at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>         at scala.Option.getOrElse(Option.scala:120)
>         at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:197)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:272)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:269)
>         at scala.collection.immutable.List.foreach(List.scala:318)
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:269)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:274)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:269)
>         at scala.collection.immutable.List.foreach(List.scala:318)
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:269)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:274)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:269)
>         at scala.collection.immutable.List.foreach(List.scala:318)
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:269)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:274)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:269)
>         at scala.collection.immutable.List.foreach(List.scala:318)
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:269)
>         at org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:279)
>         at org.apache.spark.scheduler.DAGScheduler.newStage(DAGScheduler.scala:219)
>         at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:676)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1180)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> The other JobSubmitted event hangs as follows:
> "pool-8-thread-31" prio=10 tid=0x00007f78a8287800 nid=0x8cc in Object.wait() [0x00007f7585654000]
>    java.lang.Thread.State: WAITING (on object monitor)
>         at java.lang.Object.wait(Native Method)
>         at java.lang.Object.wait(Object.java:503)
>         at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
>         - locked <0x00000007a59453d8> (a org.apache.spark.scheduler.JobWaiter)
>         at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:451)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1048)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1066)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1087)
>         at shark.execution.FileSinkOperator.execute(FileSinkOperator.scala:165)
>         at shark.execution.SparkTask.execute(SparkTask.scala:99)
>         at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:144)
>         at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:57)
>         at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1362)
>         at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1146)
>         at org.apache.hadoop.hive.ql.Driver.run(Driver.java:952)
>         at shark.SharkServerHandler.execute(SharkServer.scala:284)
>         at shark.GatedSharkServerHandler.execute(SharkServer.scala:240)
>         at org.apache.hadoop.hive.service.ThriftHive$Processor$execute.getResult(ThriftHive.java:644)
>         at org.apache.hadoop.hive.service.ThriftHive$Processor$execute.getResult(ThriftHive.java:628)
>         at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
>         at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
>         at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:206)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:744)
> I think the DAGScheduler could use a separate thread to run handleJobSubmitted, so that one slow submission does not block the others.
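
Below is a minimal, self-contained Scala sketch of that suggestion. It does not touch the real DAGScheduler internals; FakeJobSubmitted, resolvePartitions, and the pool size of 8 are illustrative stand-ins. The idea is to push the expensive part of handleJobSubmitted (computing the final RDD's partitions, which the first stack trace shows blocking on HDFS getBlockLocations inside getSplits) onto a dedicated thread pool, so the single event-processing loop can keep dequeuing other JobSubmitted events.

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

// Illustrative stand-in for the scheduler's JobSubmitted event; the real
// event carries an RDD, the job's function, partitions, listeners, etc.
case class FakeJobSubmitted(jobId: Int, resolvePartitions: () => Int)

class NonBlockingJobSubmissionLoop {
  // Dedicated pool so that slow partition resolution (e.g. HDFS
  // getBlockLocations calls made from getSplits) cannot stall the loop.
  private val pool = Executors.newFixedThreadPool(8)
  private implicit val submitEc: ExecutionContext =
    ExecutionContext.fromExecutorService(pool)

  def process(event: Any): Unit = event match {
    case job: FakeJobSubmitted =>
      // Offload the expensive work; process() returns immediately and the
      // loop can pick up the next queued event (e.g. another JobSubmitted).
      Future {
        val n = job.resolvePartitions() // may block on HDFS for a long time
        println(s"job ${job.jobId}: $n partitions resolved, stages can be built")
      }
    case _ =>
      () // other event types would be handled inline as before
  }

  def stop(): Unit = pool.shutdown()
}

object Demo extends App {
  val loop = new NonBlockingJobSubmissionLoop
  // The slow submission no longer delays the fast one queued behind it.
  loop.process(FakeJobSubmitted(1, () => { Thread.sleep(3000); 200 }))
  loop.process(FakeJobSubmitted(2, () => 4))
  Thread.sleep(4000) // wait for the demo output, then release the pool
  loop.stop()
}

A related caller-side mitigation, again only a sketch of the idea, is to force rdd.partitions on the submitting thread before calling SparkContext.runJob; since RDD.partitions memoizes its result, the later traversal inside the DAGScheduler actor then finds the partitions already computed and stays cheap.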