[jira] [Commented] (SPARK-2647) DAGScheduler plugs others when processing one JobSubmitted event
[ https://issues.apache.org/jira/browse/SPARK-2647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806554#comment-16806554 ]

Sujith Chacko commented on SPARK-2647:
--------------------------------------

[~joshrosen] - May I know the JIRA where this was already addressed, or where the work is in progress? I think the events are still processed sequentially in the DAGScheduler (I checked the master branch code). Please guide me. Thanks.

> DAGScheduler plugs others when processing one JobSubmitted event
> ----------------------------------------------------------------
>
>                 Key: SPARK-2647
>                 URL: https://issues.apache.org/jira/browse/SPARK-2647
>             Project: Spark
>          Issue Type: Improvement
>          Components: Scheduler
>            Reporter: YanTang Zhai
>            Priority: Major
>
> If a few jobs are submitted at once, the DAGScheduler blocks the others while it processes one JobSubmitted event.
> For example, one JobSubmitted event is processed as follows and takes a long time:
>
> "spark-akka.actor.default-dispatcher-67" daemon prio=10 tid=0x7f75ec001000 nid=0x7dd6 in Object.wait() [0x7f76063e1000]
>    java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     at java.lang.Object.wait(Object.java:503)
>     at org.apache.hadoopcdh3.ipc.Client.call(Client.java:1130)
>     - locked <0x000783b17330> (a org.apache.hadoopcdh3.ipc.Client$Call)
>     at org.apache.hadoopcdh3.ipc.RPC$Invoker.invoke(RPC.java:241)
>     at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
>     at sun.reflect.GeneratedMethodAccessor86.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.apache.hadoopcdh3.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:83)
>     at org.apache.hadoopcdh3.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:60)
>     at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
>     at org.apache.hadoopcdh3.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1472)
>     at org.apache.hadoopcdh3.hdfs.DFSClient.getBlockLocations(DFSClient.java:1498)
>     at org.apache.hadoopcdh3.hdfs.Cdh3DistributedFileSystem$1.doCall(Cdh3DistributedFileSystem.java:208)
>     at org.apache.hadoopcdh3.hdfs.Cdh3DistributedFileSystem$1.doCall(Cdh3DistributedFileSystem.java:204)
>     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>     at org.apache.hadoopcdh3.hdfs.Cdh3DistributedFileSystem.getFileBlockLocations(Cdh3DistributedFileSystem.java:204)
>     at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1812)
>     at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1797)
>     at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:233)
>     at StorageEngineClient.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:141)
>     at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:172)
>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>     at scala.Option.getOrElse(Option.scala:120)
>     at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
>     at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>     at scala.Option.getOrElse(Option.scala:120)
>     at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
>     at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>     at scala.Option.getOrElse(Option.scala:120)
>     at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
>     at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:54)
>     at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:54)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.immutable.List.foreach(List.scala:318)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>     at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:54)
>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>     at scala.Option.getOrElse(Option.scala:120)
>     at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
>     at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
>     at
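To make the failure mode concrete, here is a minimal Scala sketch of one possible caller-side mitigation (illustrative only, not the approach taken in the linked PR): because RDD.partitions memoizes its result, forcing it on the submitting thread moves the expensive FileInputFormat.getSplits / HDFS getBlockLocations work out of the DAGScheduler's event loop. The HDFS path, application name, and master setting are hypothetical.

{code:scala}
import org.apache.spark.{SparkConf, SparkContext}

object EagerPartitionsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("eager-partitions-sketch").setMaster("local[2]")
    val sc   = new SparkContext(conf)
    val rdd  = sc.textFile("hdfs:///some/large/dataset").map(_.length)

    // Force split computation here, on the submitting thread. RDD.partitions
    // caches its result, so the DAGScheduler's JobSubmitted handler later
    // reuses the cached array instead of blocking the event loop on HDFS RPCs.
    rdd.partitions

    // The action now submits a job whose splits are already known.
    println(rdd.count())
    sc.stop()
  }
}
{code}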
[jira] [Commented] (SPARK-2647) DAGScheduler plugs others when processing one JobSubmitted event
[ https://issues.apache.org/jira/browse/SPARK-2647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14147378#comment-14147378 ]

Nan Zhu commented on SPARK-2647:
--------------------------------

Isn't this the expected behaviour, given that we keep the DAGScheduler in a single-threaded mode?
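The behaviour Nan Zhu describes follows from the single-threaded event-loop design. Below is a self-contained toy sketch in Scala (not Spark's actual event-loop class; the event type and costs are made up) showing why one slow JobSubmitted handler delays every event queued behind it:

{code:scala}
import java.util.concurrent.LinkedBlockingQueue

object SingleThreadEventLoopSketch {
  sealed trait Event
  final case class JobSubmitted(id: Int, costMs: Long) extends Event

  private val queue = new LinkedBlockingQueue[Event]()

  // One thread drains the queue, so handlers run strictly one after another.
  private val loop = new Thread("event-loop") {
    override def run(): Unit =
      while (true) {
        queue.take() match {
          case JobSubmitted(id, costMs) =>
            Thread.sleep(costMs) // stands in for getPartitions / HDFS getBlockLocations
            println(s"finished job $id")
        }
      }
  }

  def main(args: Array[String]): Unit = {
    loop.setDaemon(true)
    loop.start()
    queue.put(JobSubmitted(1, costMs = 5000)) // slow split computation holds the loop
    queue.put(JobSubmitted(2, costMs = 10))   // cheap, but still waits ~5 seconds
    Thread.sleep(6000) // keep the JVM alive long enough to observe both jobs
  }
}
{code}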
[jira] [Commented] (SPARK-2647) DAGScheduler plugs others when processing one JobSubmitted event
[ https://issues.apache.org/jira/browse/SPARK-2647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14071755#comment-14071755 ]

YanTang Zhai commented on SPARK-2647:
-------------------------------------

I've created a PR: https://github.com/apache/spark/pull/1548. Please help review it. Thanks.
[jira] [Commented] (SPARK-2647) DAGScheduler plugs others when processing one JobSubmitted event
[ https://issues.apache.org/jira/browse/SPARK-2647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14071754#comment-14071754 ]

Apache Spark commented on SPARK-2647:
-------------------------------------

User 'YanTangZhai' has created a pull request for this issue: https://github.com/apache/spark/pull/1548