[ https://issues.apache.org/jira/browse/SPARK-13704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Saisai Shao updated SPARK-13704: -------------------------------- Description: In some cases, TaskSchedulerImpl.createTaskSetManager can be expensive. For example, in a Yarn cluster, it may call the topology script for rack awareness. When submit a very large job in a very large Yarn cluster, the topology script may take signifiant time to run. And this blocks receiving executors' heartbeats, which may result in lost executors Stacktraces we observed which is related to this issue: {code}https://issues.apache.org/jira/browse/SPARK-13704# "dag-scheduler-event-loop" daemon prio=10 tid=0x00007f8392875800 nid=0x26e8 runnable [0x00007f83576f4000] java.lang.Thread.State: RUNNABLE at java.io.FileInputStream.readBytes(Native Method) at java.io.FileInputStream.read(FileInputStream.java:272) at java.io.BufferedInputStream.read1(BufferedInputStream.java:273) at java.io.BufferedInputStream.read(BufferedInputStream.java:334) - locked <0x00000000f551f460> (a java.lang.UNIXProcess$ProcessPipeInputStream) at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283) at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325) at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177) - locked <0x00000000f5529740> (a java.io.InputStreamReader) at java.io.InputStreamReader.read(InputStreamReader.java:184) at java.io.BufferedReader.fill(BufferedReader.java:154) at java.io.BufferedReader.read1(BufferedReader.java:205) at java.io.BufferedReader.read(BufferedReader.java:279) - locked <0x00000000f5529740> (a java.io.InputStreamReader) at org.apache.hadoop.util.Shell$ShellCommandExecutor.parseExecResult(Shell.java:728) at org.apache.hadoop.util.Shell.runCommand(Shell.java:524) at org.apache.hadoop.util.Shell.run(Shell.java:455) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715) at org.apache.hadoop.net.ScriptBasedMapping$RawScriptBasedMapping.runResolveCommand(ScriptBasedMapping.java:251) at org.apache.hadoop.net.ScriptBasedMapping$RawScriptBasedMapping.resolve(ScriptBasedMapping.java:188) at org.apache.hadoop.net.CachedDNSToSwitchMapping.resolve(CachedDNSToSwitchMapping.java:119) at org.apache.hadoop.yarn.util.RackResolver.coreResolve(RackResolver.java:101) at org.apache.hadoop.yarn.util.RackResolver.resolve(RackResolver.java:81) at org.apache.spark.scheduler.cluster.YarnScheduler.getRackForHost(YarnScheduler.scala:38) at org.apache.spark.scheduler.TaskSetManager$$anonfun$org$apache$spark$scheduler$TaskSetManager$$addPendingTask$1.apply(TaskSetManager.scala:210) at org.apache.spark.scheduler.TaskSetManager$$anonfun$org$apache$spark$scheduler$TaskSetManager$$addPendingTask$1.apply(TaskSetManager.scala:189) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.TaskSetManager.org$apache$spark$scheduler$TaskSetManager$$addPendingTask(TaskSetManager.scala:189) at org.apache.spark.scheduler.TaskSetManager$$anonfun$1.apply$mcVI$sp(TaskSetManager.scala:158) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.scheduler.TaskSetManager.<init>(TaskSetManager.scala:157) at org.apache.spark.scheduler.TaskSchedulerImpl.createTaskSetManager(TaskSchedulerImpl.scala:187) at org.apache.spark.scheduler.TaskSchedulerImpl.submitTasks(TaskSchedulerImpl.scala:161) - locked <0x00000000ea3b8a88> (a org.apache.spark.scheduler.cluster.YarnScheduler) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:872) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778) at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) "sparkDriver-akka.actor.default-dispatcher-15" daemon prio=10 tid=0x00007f829c020000 nid=0x2737 waiting for monitor entry [0x00007f8355ebd000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:362) - waiting to lock <0x00000000ea3b8a88> (a org.apache.spark.scheduler.cluster.YarnScheduler) at org.apache.spark.HeartbeatReceiver$$anonfun$receiveWithLogging$1.applyOrElse(HeartbeatReceiver.scala:46) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53) at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) {code} was: In some cases, TaskSchedulerImpl.createTaskSetManager can be expensive. For example, in a Yarn cluster, it may call the topology script for rack awareness. When submit a very large job in a very large Yarn cluster, the topology script may take signifiant time to run. And this blocks receiving executors' heartbeats, which may result in lost executors Stacktraces we observed which is related to this issue: {code} "dag-scheduler-event-loop" daemon prio=10 tid=0x00007f8392875800 nid=0x26e8 runnable [0x00007f83576f4000] java.lang.Thread.State: RUNNABLE at java.io.FileInputStream.readBytes(Native Method) at java.io.FileInputStream.read(FileInputStream.java:272) at java.io.BufferedInputStream.read1(BufferedInputStream.java:273) at java.io.BufferedInputStream.read(BufferedInputStream.java:334) - locked <0x00000000f551f460> (a java.lang.UNIXProcess$ProcessPipeInputStream) at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283) at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325) at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177) - locked <0x00000000f5529740> (a java.io.InputStreamReader) at java.io.InputStreamReader.read(InputStreamReader.java:184) at java.io.BufferedReader.fill(BufferedReader.java:154) at java.io.BufferedReader.read1(BufferedReader.java:205) at java.io.BufferedReader.read(BufferedReader.java:279) - locked <0x00000000f5529740> (a java.io.InputStreamReader) at org.apache.hadoop.util.Shell$ShellCommandExecutor.parseExecResult(Shell.java:728) at org.apache.hadoop.util.Shell.runCommand(Shell.java:524) at org.apache.hadoop.util.Shell.run(Shell.java:455) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715) at org.apache.hadoop.net.ScriptBasedMapping$RawScriptBasedMapping.runResolveCommand(ScriptBasedMapping.java:251) at org.apache.hadoop.net.ScriptBasedMapping$RawScriptBasedMapping.resolve(ScriptBasedMapping.java:188) at org.apache.hadoop.net.CachedDNSToSwitchMapping.resolve(CachedDNSToSwitchMapping.java:119) at org.apache.hadoop.yarn.util.RackResolver.coreResolve(RackResolver.java:101) at org.apache.hadoop.yarn.util.RackResolver.resolve(RackResolver.java:81) at org.apache.spark.scheduler.cluster.YarnScheduler.getRackForHost(YarnScheduler.scala:38) at org.apache.spark.scheduler.TaskSetManager$$anonfun$org$apache$spark$scheduler$TaskSetManager$$addPendingTask$1.apply(TaskSetManager.scala:210) at org.apache.spark.scheduler.TaskSetManager$$anonfun$org$apache$spark$scheduler$TaskSetManager$$addPendingTask$1.apply(TaskSetManager.scala:189) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.TaskSetManager.org$apache$spark$scheduler$TaskSetManager$$addPendingTask(TaskSetManager.scala:189) at org.apache.spark.scheduler.TaskSetManager$$anonfun$1.apply$mcVI$sp(TaskSetManager.scala:158) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.scheduler.TaskSetManager.<init>(TaskSetManager.scala:157) at org.apache.spark.scheduler.TaskSchedulerImpl.createTaskSetManager(TaskSchedulerImpl.scala:187) at org.apache.spark.scheduler.TaskSchedulerImpl.submitTasks(TaskSchedulerImpl.scala:161) - locked <0x00000000ea3b8a88> (a org.apache.spark.scheduler.cluster.YarnScheduler) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:872) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778) at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) "sparkDriver-akka.actor.default-dispatcher-15" daemon prio=10 tid=0x00007f829c020000 nid=0x2737 waiting for monitor entry [0x00007f8355ebd000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:362) - waiting to lock <0x00000000ea3b8a88> (a org.apache.spark.scheduler.cluster.YarnScheduler) at org.apache.spark.HeartbeatReceiver$$anonfun$receiveWithLogging$1.applyOrElse(HeartbeatReceiver.scala:46) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53) at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) {code} > TaskSchedulerImpl.createTaskSetManager can be expensive, and result in lost > executors due to blocked heartbeats > --------------------------------------------------------------------------------------------------------------- > > Key: SPARK-13704 > URL: https://issues.apache.org/jira/browse/SPARK-13704 > Project: Spark > Issue Type: Improvement > Components: Spark Core > Affects Versions: 1.3.1, 1.4.1, 1.5.2, 1.6.0 > Reporter: Zhong Wang > Assignee: Lantao Jin > Priority: Major > Fix For: 3.0.0 > > > In some cases, TaskSchedulerImpl.createTaskSetManager can be expensive. For > example, in a Yarn cluster, it may call the topology script for rack > awareness. When submit a very large job in a very large Yarn cluster, the > topology script may take signifiant time to run. And this blocks receiving > executors' heartbeats, which may result in lost executors > Stacktraces we observed which is related to this issue: > {code}https://issues.apache.org/jira/browse/SPARK-13704# > "dag-scheduler-event-loop" daemon prio=10 tid=0x00007f8392875800 nid=0x26e8 > runnable [0x00007f83576f4000] > java.lang.Thread.State: RUNNABLE > at java.io.FileInputStream.readBytes(Native Method) > at java.io.FileInputStream.read(FileInputStream.java:272) > at java.io.BufferedInputStream.read1(BufferedInputStream.java:273) > at java.io.BufferedInputStream.read(BufferedInputStream.java:334) > - locked <0x00000000f551f460> (a > java.lang.UNIXProcess$ProcessPipeInputStream) > at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283) > at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325) > at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177) > - locked <0x00000000f5529740> (a java.io.InputStreamReader) > at java.io.InputStreamReader.read(InputStreamReader.java:184) > at java.io.BufferedReader.fill(BufferedReader.java:154) > at java.io.BufferedReader.read1(BufferedReader.java:205) > at java.io.BufferedReader.read(BufferedReader.java:279) > - locked <0x00000000f5529740> (a java.io.InputStreamReader) > at > org.apache.hadoop.util.Shell$ShellCommandExecutor.parseExecResult(Shell.java:728) > at org.apache.hadoop.util.Shell.runCommand(Shell.java:524) > at org.apache.hadoop.util.Shell.run(Shell.java:455) > at > org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715) > at > org.apache.hadoop.net.ScriptBasedMapping$RawScriptBasedMapping.runResolveCommand(ScriptBasedMapping.java:251) > at > org.apache.hadoop.net.ScriptBasedMapping$RawScriptBasedMapping.resolve(ScriptBasedMapping.java:188) > at > org.apache.hadoop.net.CachedDNSToSwitchMapping.resolve(CachedDNSToSwitchMapping.java:119) > at > org.apache.hadoop.yarn.util.RackResolver.coreResolve(RackResolver.java:101) > at > org.apache.hadoop.yarn.util.RackResolver.resolve(RackResolver.java:81) > at > org.apache.spark.scheduler.cluster.YarnScheduler.getRackForHost(YarnScheduler.scala:38) > at > org.apache.spark.scheduler.TaskSetManager$$anonfun$org$apache$spark$scheduler$TaskSetManager$$addPendingTask$1.apply(TaskSetManager.scala:210) > at > org.apache.spark.scheduler.TaskSetManager$$anonfun$org$apache$spark$scheduler$TaskSetManager$$addPendingTask$1.apply(TaskSetManager.scala:189) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > org.apache.spark.scheduler.TaskSetManager.org$apache$spark$scheduler$TaskSetManager$$addPendingTask(TaskSetManager.scala:189) > at > org.apache.spark.scheduler.TaskSetManager$$anonfun$1.apply$mcVI$sp(TaskSetManager.scala:158) > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) > at > org.apache.spark.scheduler.TaskSetManager.<init>(TaskSetManager.scala:157) > at > org.apache.spark.scheduler.TaskSchedulerImpl.createTaskSetManager(TaskSchedulerImpl.scala:187) > at > org.apache.spark.scheduler.TaskSchedulerImpl.submitTasks(TaskSchedulerImpl.scala:161) > - locked <0x00000000ea3b8a88> (a > org.apache.spark.scheduler.cluster.YarnScheduler) > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:872) > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778) > at > org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > "sparkDriver-akka.actor.default-dispatcher-15" daemon prio=10 > tid=0x00007f829c020000 nid=0x2737 waiting for monitor entry > [0x00007f8355ebd000] > java.lang.Thread.State: BLOCKED (on object monitor) > at > org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:362) > - waiting to lock <0x00000000ea3b8a88> (a > org.apache.spark.scheduler.cluster.YarnScheduler) > at > org.apache.spark.HeartbeatReceiver$$anonfun$receiveWithLogging$1.applyOrElse(HeartbeatReceiver.scala:46) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53) > at > org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) > at akka.actor.ActorCell.invoke(ActorCell.scala:456) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) > at akka.dispatch.Mailbox.run(Mailbox.scala:219) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org