[jira] [Updated] (SPARK-19326) Speculated task attempts do not get launched in few scenarios

2017-02-02 Thread Tejas Patil (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-19326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tejas Patil updated SPARK-19326:

Description: 
Speculated copies of tasks do not get launched in some cases.

Examples:
- All the running executors have no CPU slots left to accommodate a speculated copy of the task(s). If all the running executors reside on a set of slow / bad hosts, they will keep the job running for a long time.
- `spark.task.cpus` > 1 and the running executor has not filled up all its CPU slots. Those free slots cannot be used because [speculated copies of tasks must run on a different host|https://github.com/apache/spark/blob/2e139eed3194c7b8814ff6cf007d4e8a874c1e4d/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L283] than the one where the first copy was launched (see the sketch after this list).
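
The different-host requirement is what bites in the second case. Below is a minimal Scala sketch of that check (simplified from the linked `TaskSetManager` code; `TaskAttempt` and the method name are illustrative stand-ins, not Spark's actual API):

{code}
// A speculated copy of a task may only be offered to a host that is not
// already running an attempt of that same task.
case class TaskAttempt(taskIndex: Int, host: String)

def canLaunchSpeculativeCopy(
    taskIndex: Int,
    offerHost: String,
    runningAttempts: Seq[TaskAttempt]): Boolean = {
  // Reject the offer if any attempt of this task already runs on offerHost,
  // even when that host still has free CPU slots.
  !runningAttempts.exists(a => a.taskIndex == taskIndex && a.host == offerHost)
}
{code}

If the only executor with free slots sits on the host of the original attempt, every resource offer fails this check and the speculated copy never launches.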

In both these cases, `ExecutorAllocationManager` does not know about the pending speculative task attempts and thinks that all the resource demands are already taken care of ([relevant code|https://github.com/apache/spark/blob/6ee28423ad1b2e6089b82af64a31d77d3552bb38/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L265]; see the sketch below).
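
For reference, here is a simplified sketch of the demand calculation at that link (parameter names are illustrative; the real method reads these counts from a scheduler listener). Pending speculative attempts contribute to neither input, so they can never raise the executor target:

{code}
// Ceiling division over regular pending + running tasks only; pending
// speculative task attempts are invisible to this calculation.
def maxNumExecutorsNeeded(
    totalPendingTasks: Int,   // non-speculative pending tasks
    totalRunningTasks: Int,
    tasksPerExecutor: Int): Int = {
  val numRunningOrPendingTasks = totalPendingTasks + totalRunningTasks
  (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
}
{code}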

This adds variation to job completion times and, more importantly, causes SLA misses :( In prod, with a large number of jobs, I see this happening more often than one would think. Chasing down the bad hosts or the reasons for slowness doesn't scale.

Here is a tiny repro. Note that you need to launch this on a cluster (Mesos, YARN, or standalone deploy mode) with `--conf spark.speculation=true --conf spark.executor.cores=4 --conf spark.dynamicAllocation.maxExecutors=100`.

{code}
val n = 100
val someRDD = sc.parallelize(1 to n, n)
someRDD.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => {
  if (index == 1) {
    Thread.sleep(Long.MaxValue)  // fake long-running task(s)
  }
  it.toList.map(x => index + ", " + x).iterator
}).collect
{code}
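
When this runs, the task for partition 1 hangs and eventually qualifies for speculation, yet no speculated copy gets launched and no extra executors get requested for it, which is exactly the behavior described above.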


  was:
Speculated copies of tasks do not get launched in some cases.

Examples:
- All the running executors have no CPU slots left to accommodate a speculated copy of the task(s). If all the running executors reside on a set of slow / bad hosts, they will keep the job running for a long time.
- `spark.task.cpus` > 1 and the running executor has not filled up all its CPU slots. Those free slots cannot be used because [speculated copies of tasks must run on a different host|https://github.com/apache/spark/blob/2e139eed3194c7b8814ff6cf007d4e8a874c1e4d/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L283] than the one where the first copy was launched.

In both these cases, `ExecutorAllocationManager` does not know about the pending speculative task attempts and thinks that all the resource demands are already taken care of. ([relevant code|https://github.com/apache/spark/blob/6ee28423ad1b2e6089b82af64a31d77d3552bb38/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L265])

This adds variation to job completion times and, more importantly, causes SLA misses :( In prod, with a large number of jobs, I see this happening more often than one would think. Chasing down the bad hosts or the reasons for slowness doesn't scale.

Here is a tiny repro. Note that you need to launch this on a cluster (Mesos, YARN, or standalone deploy mode) with `--conf spark.speculation=true --conf spark.executor.cores=4 --conf spark.dynamicAllocation.maxExecutors=100`.

{code}
val someRDD = sc.parallelize(1 to 8, 8)
someRDD.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => {
  if (index == 7) {
    Thread.sleep(Long.MaxValue)  // fake long-running task(s)
  }
  it.toList.map(x => index + ", " + x).iterator
}).collect
{code}



> Speculated task attempts do not get launched in few scenarios
> -------------------------------------------------------------
>
> Key: SPARK-19326
> URL: https://issues.apache.org/jira/browse/SPARK-19326
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
> Affects Versions: 2.0.2, 2.1.0
> Reporter: Tejas Patil
>

[jira] [Updated] (SPARK-19326) Speculated task attempts do not get launched in few scenarios

2017-02-02 Thread Tejas Patil (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-19326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tejas Patil updated SPARK-19326:

Description: 
Speculated copies of tasks do not get launched in some cases.

Examples:
- All the running executors have no CPU slots left to accommodate a speculated copy of the task(s). If all the running executors reside on a set of slow / bad hosts, they will keep the job running for a long time.
- `spark.task.cpus` > 1 and the running executor has not filled up all its CPU slots. Those free slots cannot be used because [speculated copies of tasks must run on a different host|https://github.com/apache/spark/blob/2e139eed3194c7b8814ff6cf007d4e8a874c1e4d/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L283] than the one where the first copy was launched.

In both these cases, `ExecutorAllocationManager` does not know about the pending speculative task attempts and thinks that all the resource demands are already taken care of. ([relevant code|https://github.com/apache/spark/blob/6ee28423ad1b2e6089b82af64a31d77d3552bb38/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L265])

This adds variation to job completion times and, more importantly, causes SLA misses :( In prod, with a large number of jobs, I see this happening more often than one would think. Chasing down the bad hosts or the reasons for slowness doesn't scale.

Here is a tiny repro. Note that you need to launch this on a cluster (Mesos, YARN, or standalone deploy mode) with `--conf spark.speculation=true --conf spark.executor.cores=4 --conf spark.dynamicAllocation.maxExecutors=100`.

{code}
val someRDD = sc.parallelize(1 to 8, 8)
someRDD.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => {
  if (index == 7) {
    Thread.sleep(Long.MaxValue)  // fake long-running task(s)
  }
  it.toList.map(x => index + ", " + x).iterator
}).collect
{code}


  was:
Speculated copies of tasks do not get launched in some cases.

Examples:
- All the running executors have no CPU slots left to accommodate a speculated copy of the task(s). If all the running executors reside on a set of slow / bad hosts, they will keep the job running for a long time.
- `spark.task.cpus` > 1 and the running executor has not filled up all its CPU slots. Those free slots cannot be used because [speculated copies of tasks must run on a different host|https://github.com/apache/spark/blob/2e139eed3194c7b8814ff6cf007d4e8a874c1e4d/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L283] than the one where the first copy was launched.

In both these cases, `ExecutorAllocationManager` does not know about the pending speculative task attempts and thinks that all the resource demands are already taken care of. ([relevant code|https://github.com/apache/spark/blob/6ee28423ad1b2e6089b82af64a31d77d3552bb38/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L265])

This adds variation to job completion times and, more importantly, causes SLA misses :( In prod, with a large number of jobs, I see this happening more often than one would think. Chasing down the bad hosts or the reasons for slowness doesn't scale.

Here is a tiny repro. Note that you need to launch this on a cluster (Mesos, YARN, or standalone deploy mode) with `spark.speculation=true`.

{code}
val someRDD = sc.parallelize(1 to 8, 8)
someRDD.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => {
  if (index == 7) {
    Thread.sleep(Long.MaxValue)  // fake long-running task(s)
  }
  it.toList.map(x => index + ", " + x).iterator
}).collect
{code}




[jira] [Updated] (SPARK-19326) Speculated task attempts do not get launched in few scenarios

2017-01-21 Thread Tejas Patil (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-19326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tejas Patil updated SPARK-19326:

Description: 
Speculated copies of tasks do not get launched in some cases.

Examples:
- All the running executors have no CPU slots left to accommodate a speculated copy of the task(s). If all the running executors reside on a set of slow / bad hosts, they will keep the job running for a long time.
- `spark.task.cpus` > 1 and the running executor has not filled up all its CPU slots. Those free slots cannot be used because [speculated copies of tasks must run on a different host|https://github.com/apache/spark/blob/2e139eed3194c7b8814ff6cf007d4e8a874c1e4d/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L283] than the one where the first copy was launched.

In both these cases, `ExecutorAllocationManager` does not know about the pending speculative task attempts and thinks that all the resource demands are already taken care of. ([relevant code|https://github.com/apache/spark/blob/6ee28423ad1b2e6089b82af64a31d77d3552bb38/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L265])

This adds variation to job completion times and, more importantly, causes SLA misses :( In prod, with a large number of jobs, I see this happening more often than one would think. Chasing down the bad hosts or the reasons for slowness doesn't scale.

Here is a tiny repro. Note that you need to launch this on a cluster (Mesos, YARN, or standalone deploy mode) with `spark.speculation=true`.

{code}
val someRDD = sc.parallelize(1 to 8, 8)
someRDD.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => {
  if (index == 7) {
    Thread.sleep(Long.MaxValue)  // fake long-running task(s)
  }
  it.toList.map(x => index + ", " + x).iterator
}).collect
{code}


  was:
Speculated copies of tasks do not get launched in some cases.

Examples:
- All the running executors have no CPU slots left to accommodate a speculated copy of the task(s). If all the running executors reside on a set of slow / bad hosts, they will keep the job running for a long time.
- `spark.task.cpus` > 1 and the running executor has not filled up all its CPU slots. Those free slots cannot be used because [speculated copies of tasks must run on a different host|https://github.com/apache/spark/blob/2e139eed3194c7b8814ff6cf007d4e8a874c1e4d/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L283] than the one where the first copy was launched.

In both these cases, `ExecutorAllocationManager` does not know about the pending speculative task attempts and thinks that all the resource demands are already taken care of. ([relevant code|https://github.com/apache/spark/blob/6ee28423ad1b2e6089b82af64a31d77d3552bb38/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L265])

This adds variation to job completion times and, more importantly, causes SLA misses :( In prod, with a large number of jobs, I see this happening more often than one would think. Chasing down the bad hosts or the reasons for slowness doesn't scale.

Here is a tiny repro. Note that you need to launch this on a cluster (Mesos, YARN, or standalone deploy mode) with `spark.speculation=true`.

{code}
val someRDD = sc.parallelize(1 to 8, 8)
someRDD.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => {
  if (index == 8) {
    Thread.sleep(Long.MaxValue)  // fake long-running task(s)
  }
  it.toList.map(x => index + ", " + x).iterator
}).collect
{code}


