[ 
https://issues.apache.org/jira/browse/SPARK-13704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Saisai Shao updated SPARK-13704:
--------------------------------
    Description: 
In some cases, TaskSchedulerImpl.createTaskSetManager can be expensive. For 
example, in a Yarn cluster, it may call the topology script for rack awareness. 
When submit a very large job in a very large Yarn cluster, the topology script 
may take signifiant time to run. And this blocks receiving executors' 
heartbeats, which may result in lost executors

Stacktraces we observed which is related to this issue:
{code}https://issues.apache.org/jira/browse/SPARK-13704#
"dag-scheduler-event-loop" daemon prio=10 tid=0x00007f8392875800 nid=0x26e8 
runnable [0x00007f83576f4000]
   java.lang.Thread.State: RUNNABLE
        at java.io.FileInputStream.readBytes(Native Method)
        at java.io.FileInputStream.read(FileInputStream.java:272)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
        - locked <0x00000000f551f460> (a 
java.lang.UNIXProcess$ProcessPipeInputStream)
        at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283)
        at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325)
        at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177)
        - locked <0x00000000f5529740> (a java.io.InputStreamReader)
        at java.io.InputStreamReader.read(InputStreamReader.java:184)
        at java.io.BufferedReader.fill(BufferedReader.java:154)
        at java.io.BufferedReader.read1(BufferedReader.java:205)
        at java.io.BufferedReader.read(BufferedReader.java:279)
        - locked <0x00000000f5529740> (a java.io.InputStreamReader)
        at 
org.apache.hadoop.util.Shell$ShellCommandExecutor.parseExecResult(Shell.java:728)
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:524)
        at org.apache.hadoop.util.Shell.run(Shell.java:455)
        at 
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
        at 
org.apache.hadoop.net.ScriptBasedMapping$RawScriptBasedMapping.runResolveCommand(ScriptBasedMapping.java:251)
        at 
org.apache.hadoop.net.ScriptBasedMapping$RawScriptBasedMapping.resolve(ScriptBasedMapping.java:188)
        at 
org.apache.hadoop.net.CachedDNSToSwitchMapping.resolve(CachedDNSToSwitchMapping.java:119)
        at 
org.apache.hadoop.yarn.util.RackResolver.coreResolve(RackResolver.java:101)
        at 
org.apache.hadoop.yarn.util.RackResolver.resolve(RackResolver.java:81)
        at 
org.apache.spark.scheduler.cluster.YarnScheduler.getRackForHost(YarnScheduler.scala:38)
        at 
org.apache.spark.scheduler.TaskSetManager$$anonfun$org$apache$spark$scheduler$TaskSetManager$$addPendingTask$1.apply(TaskSetManager.scala:210)
        at 
org.apache.spark.scheduler.TaskSetManager$$anonfun$org$apache$spark$scheduler$TaskSetManager$$addPendingTask$1.apply(TaskSetManager.scala:189)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at 
org.apache.spark.scheduler.TaskSetManager.org$apache$spark$scheduler$TaskSetManager$$addPendingTask(TaskSetManager.scala:189)
        at 
org.apache.spark.scheduler.TaskSetManager$$anonfun$1.apply$mcVI$sp(TaskSetManager.scala:158)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at 
org.apache.spark.scheduler.TaskSetManager.<init>(TaskSetManager.scala:157)
        at 
org.apache.spark.scheduler.TaskSchedulerImpl.createTaskSetManager(TaskSchedulerImpl.scala:187)
        at 
org.apache.spark.scheduler.TaskSchedulerImpl.submitTasks(TaskSchedulerImpl.scala:161)
        - locked <0x00000000ea3b8a88> (a 
org.apache.spark.scheduler.cluster.YarnScheduler)
        at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:872)
        at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778)
        at 
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

"sparkDriver-akka.actor.default-dispatcher-15" daemon prio=10 
tid=0x00007f829c020000 nid=0x2737 waiting for monitor entry [0x00007f8355ebd000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at 
org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:362)
        - waiting to lock <0x00000000ea3b8a88> (a 
org.apache.spark.scheduler.cluster.YarnScheduler)
        at 
org.apache.spark.HeartbeatReceiver$$anonfun$receiveWithLogging$1.applyOrElse(HeartbeatReceiver.scala:46)
        at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at 
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
        at 
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at 
org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}

  was:
In some cases, TaskSchedulerImpl.createTaskSetManager can be expensive. For 
example, in a Yarn cluster, it may call the topology script for rack awareness. 
When submit a very large job in a very large Yarn cluster, the topology script 
may take signifiant time to run. And this blocks receiving executors' 
heartbeats, which may result in lost executors

Stacktraces we observed which is related to this issue:
{code}
"dag-scheduler-event-loop" daemon prio=10 tid=0x00007f8392875800 nid=0x26e8 
runnable [0x00007f83576f4000]
   java.lang.Thread.State: RUNNABLE
        at java.io.FileInputStream.readBytes(Native Method)
        at java.io.FileInputStream.read(FileInputStream.java:272)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
        - locked <0x00000000f551f460> (a 
java.lang.UNIXProcess$ProcessPipeInputStream)
        at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283)
        at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325)
        at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177)
        - locked <0x00000000f5529740> (a java.io.InputStreamReader)
        at java.io.InputStreamReader.read(InputStreamReader.java:184)
        at java.io.BufferedReader.fill(BufferedReader.java:154)
        at java.io.BufferedReader.read1(BufferedReader.java:205)
        at java.io.BufferedReader.read(BufferedReader.java:279)
        - locked <0x00000000f5529740> (a java.io.InputStreamReader)
        at 
org.apache.hadoop.util.Shell$ShellCommandExecutor.parseExecResult(Shell.java:728)
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:524)
        at org.apache.hadoop.util.Shell.run(Shell.java:455)
        at 
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
        at 
org.apache.hadoop.net.ScriptBasedMapping$RawScriptBasedMapping.runResolveCommand(ScriptBasedMapping.java:251)
        at 
org.apache.hadoop.net.ScriptBasedMapping$RawScriptBasedMapping.resolve(ScriptBasedMapping.java:188)
        at 
org.apache.hadoop.net.CachedDNSToSwitchMapping.resolve(CachedDNSToSwitchMapping.java:119)
        at 
org.apache.hadoop.yarn.util.RackResolver.coreResolve(RackResolver.java:101)
        at 
org.apache.hadoop.yarn.util.RackResolver.resolve(RackResolver.java:81)
        at 
org.apache.spark.scheduler.cluster.YarnScheduler.getRackForHost(YarnScheduler.scala:38)
        at 
org.apache.spark.scheduler.TaskSetManager$$anonfun$org$apache$spark$scheduler$TaskSetManager$$addPendingTask$1.apply(TaskSetManager.scala:210)
        at 
org.apache.spark.scheduler.TaskSetManager$$anonfun$org$apache$spark$scheduler$TaskSetManager$$addPendingTask$1.apply(TaskSetManager.scala:189)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at 
org.apache.spark.scheduler.TaskSetManager.org$apache$spark$scheduler$TaskSetManager$$addPendingTask(TaskSetManager.scala:189)
        at 
org.apache.spark.scheduler.TaskSetManager$$anonfun$1.apply$mcVI$sp(TaskSetManager.scala:158)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at 
org.apache.spark.scheduler.TaskSetManager.<init>(TaskSetManager.scala:157)
        at 
org.apache.spark.scheduler.TaskSchedulerImpl.createTaskSetManager(TaskSchedulerImpl.scala:187)
        at 
org.apache.spark.scheduler.TaskSchedulerImpl.submitTasks(TaskSchedulerImpl.scala:161)
        - locked <0x00000000ea3b8a88> (a 
org.apache.spark.scheduler.cluster.YarnScheduler)
        at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:872)
        at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778)
        at 
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

"sparkDriver-akka.actor.default-dispatcher-15" daemon prio=10 
tid=0x00007f829c020000 nid=0x2737 waiting for monitor entry [0x00007f8355ebd000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at 
org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:362)
        - waiting to lock <0x00000000ea3b8a88> (a 
org.apache.spark.scheduler.cluster.YarnScheduler)
        at 
org.apache.spark.HeartbeatReceiver$$anonfun$receiveWithLogging$1.applyOrElse(HeartbeatReceiver.scala:46)
        at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at 
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
        at 
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at 
org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}


> TaskSchedulerImpl.createTaskSetManager can be expensive, and result in lost 
> executors due to blocked heartbeats
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-13704
>                 URL: https://issues.apache.org/jira/browse/SPARK-13704
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 1.3.1, 1.4.1, 1.5.2, 1.6.0
>            Reporter: Zhong Wang
>            Assignee: Lantao Jin
>            Priority: Major
>             Fix For: 3.0.0
>
>
> In some cases, TaskSchedulerImpl.createTaskSetManager can be expensive. For 
> example, in a Yarn cluster, it may call the topology script for rack 
> awareness. When submit a very large job in a very large Yarn cluster, the 
> topology script may take signifiant time to run. And this blocks receiving 
> executors' heartbeats, which may result in lost executors
> Stacktraces we observed which is related to this issue:
> {code}https://issues.apache.org/jira/browse/SPARK-13704#
> "dag-scheduler-event-loop" daemon prio=10 tid=0x00007f8392875800 nid=0x26e8 
> runnable [0x00007f83576f4000]
>    java.lang.Thread.State: RUNNABLE
>         at java.io.FileInputStream.readBytes(Native Method)
>         at java.io.FileInputStream.read(FileInputStream.java:272)
>         at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>         at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>         - locked <0x00000000f551f460> (a 
> java.lang.UNIXProcess$ProcessPipeInputStream)
>         at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283)
>         at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325)
>         at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177)
>         - locked <0x00000000f5529740> (a java.io.InputStreamReader)
>         at java.io.InputStreamReader.read(InputStreamReader.java:184)
>         at java.io.BufferedReader.fill(BufferedReader.java:154)
>         at java.io.BufferedReader.read1(BufferedReader.java:205)
>         at java.io.BufferedReader.read(BufferedReader.java:279)
>         - locked <0x00000000f5529740> (a java.io.InputStreamReader)
>         at 
> org.apache.hadoop.util.Shell$ShellCommandExecutor.parseExecResult(Shell.java:728)
>         at org.apache.hadoop.util.Shell.runCommand(Shell.java:524)
>         at org.apache.hadoop.util.Shell.run(Shell.java:455)
>         at 
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
>         at 
> org.apache.hadoop.net.ScriptBasedMapping$RawScriptBasedMapping.runResolveCommand(ScriptBasedMapping.java:251)
>         at 
> org.apache.hadoop.net.ScriptBasedMapping$RawScriptBasedMapping.resolve(ScriptBasedMapping.java:188)
>         at 
> org.apache.hadoop.net.CachedDNSToSwitchMapping.resolve(CachedDNSToSwitchMapping.java:119)
>         at 
> org.apache.hadoop.yarn.util.RackResolver.coreResolve(RackResolver.java:101)
>         at 
> org.apache.hadoop.yarn.util.RackResolver.resolve(RackResolver.java:81)
>         at 
> org.apache.spark.scheduler.cluster.YarnScheduler.getRackForHost(YarnScheduler.scala:38)
>         at 
> org.apache.spark.scheduler.TaskSetManager$$anonfun$org$apache$spark$scheduler$TaskSetManager$$addPendingTask$1.apply(TaskSetManager.scala:210)
>         at 
> org.apache.spark.scheduler.TaskSetManager$$anonfun$org$apache$spark$scheduler$TaskSetManager$$addPendingTask$1.apply(TaskSetManager.scala:189)
>         at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at 
> org.apache.spark.scheduler.TaskSetManager.org$apache$spark$scheduler$TaskSetManager$$addPendingTask(TaskSetManager.scala:189)
>         at 
> org.apache.spark.scheduler.TaskSetManager$$anonfun$1.apply$mcVI$sp(TaskSetManager.scala:158)
>         at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>         at 
> org.apache.spark.scheduler.TaskSetManager.<init>(TaskSetManager.scala:157)
>         at 
> org.apache.spark.scheduler.TaskSchedulerImpl.createTaskSetManager(TaskSchedulerImpl.scala:187)
>         at 
> org.apache.spark.scheduler.TaskSchedulerImpl.submitTasks(TaskSchedulerImpl.scala:161)
>         - locked <0x00000000ea3b8a88> (a 
> org.apache.spark.scheduler.cluster.YarnScheduler)
>         at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:872)
>         at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778)
>         at 
> org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762)
>         at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
>         at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> "sparkDriver-akka.actor.default-dispatcher-15" daemon prio=10 
> tid=0x00007f829c020000 nid=0x2737 waiting for monitor entry 
> [0x00007f8355ebd000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>         at 
> org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:362)
>         - waiting to lock <0x00000000ea3b8a88> (a 
> org.apache.spark.scheduler.cluster.YarnScheduler)
>         at 
> org.apache.spark.HeartbeatReceiver$$anonfun$receiveWithLogging$1.applyOrElse(HeartbeatReceiver.scala:46)
>         at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>         at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>         at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>         at 
> org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
>         at 
> org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
>         at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>         at 
> org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>         at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>         at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to