Re: [SPARK-23207] Repro

2019-08-09 Thread Sean Owen
Interesting, but I'd put this on the JIRA, and also test vs. master
first. It's entirely possible this is something else that was
subsequently fixed, and maybe even backported for 2.4.4.
(I can't quite reproduce it - it just makes the second job fail, which is
also puzzling.)

On Fri, Aug 9, 2019 at 8:11 AM  wrote:
>
> Hi,
>
>
>
> We are able to reproduce this bug in Spark 2.4 using the following program:
>
>
>
> import scala.sys.process._
> import org.apache.spark.TaskContext
>
> val res = spark.range(0, 1 * 1, 1).map{ x => (x % 1000, x)}.repartition(20)
> res.distinct.count
>
> // kill an executor in the stage that performs repartition(239)
> val df = res.repartition(113).cache.repartition(239).map { x =>
>   if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1) {
>     throw new Exception("pkill -f java".!!)
>   }
>   x
> }
> df.distinct.count()
>
>
>
> The first df.distinct.count correctly produces 1
>
> The second df.distinct.count incorrectly produces 9769
>
>
>
> If the cache step is removed then the bug does not reproduce.
>
>
>
> Best regards,
>
> Tyson
>
>




RE: [SPARK-23207] Repro

2019-08-09 Thread tcondie
Hi Sean,

To finish the job, I did need to set spark.stage.maxConsecutiveAttempts to a
large number, e.g. 100, following a suggestion from Jiang Xingbo.
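
For reference, a minimal sketch of how that setting can be supplied before the
SparkContext is created (the value 100 is just the illustrative number from
above, and the app name is hypothetical):

import org.apache.spark.sql.SparkSession

// Set the scheduler config on the session builder before the context exists;
// it can equally be passed via spark-shell/spark-submit --conf.
val spark = SparkSession.builder()
  .appName("spark-23207-repro")
  .config("spark.stage.maxConsecutiveAttempts", "100")
  .getOrCreate()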

I haven't seen any recent movement/PRs on this issue, but I'll see if we can 
repro with a more recent version of Spark. 

Best regards,
Tyson




Re: [SPARK-23207] Repro

2019-08-10 Thread Xiao Li
Hi, Tyson,

Could you open a new JIRA with the correctness label? SPARK-23207 might not
cover all the scenarios, especially when you are using cache.

Cheers,

Xiao



Re: [SPARK-23207] Repro

2019-08-12 Thread Wenchen Fan
Hi Tyson,

Thanks for reporting it! I quickly checked the related scheduler code but
can't find an obvious place that could go wrong with a cached RDD.

Sean said that he can't reproduce it and that the second job just fails. That
failure is actually expected. We need a lot more changes to completely fix this
problem, so currently the fix is to fail the job if the scheduler needs to
retry an indeterminate shuffle map stage.
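
To illustrate the hazard in plain Scala (a hypothetical sketch, not Spark's
scheduler code): round-robin repartitioning assigns rows to output partitions
by arrival order, so a retried task that sees its input in a different order
maps rows to different partitions than the surviving tasks already did.

// Hypothetical sketch: round-robin assignment is order-sensitive, which is
// what makes a retried repartition() task "indeterminate".
def roundRobin(rows: Seq[Int], numPartitions: Int): Map[Int, Seq[Int]] =
  rows.zipWithIndex
    .groupBy { case (_, idx) => idx % numPartitions }
    .map { case (p, rs) => p -> rs.map(_._1) }

val original  = Seq(1, 2, 3, 4, 5, 6)
val reordered = Seq(6, 5, 4, 3, 2, 1)  // same rows, different arrival order

// The two mappings disagree, so recomputing only a lost partition after a
// failure cannot reproduce the data the surviving partitions were built from.
println(roundRobin(original, 2))   // e.g. Map(0 -> List(1, 3, 5), 1 -> List(2, 4, 6))
println(roundRobin(reordered, 2))  // e.g. Map(0 -> List(6, 4, 2), 1 -> List(5, 3, 1))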

It would be great to know if we can reproduce this bug with the master
branch.

Thanks,
Wenchen



Re: [SPARK-23207] Repro

2019-08-12 Thread Yuanjian Li
Hi Tyson,

Thanks for the report!
I reproduced this locally based on your code, with some changes so that only
the job that produces the wrong answer is kept. The code is below:

import scala.sys.process._
import org.apache.spark.TaskContext

val res = spark.range(0, 1 * 1, 1).map{ x => (x % 1000, x)}
// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).cache.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 &&
      TaskContext.get.partitionId < 1 &&
      TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val r2 = df.distinct.count()

I think the reason for the wrong answer is that, in the CachedRDDBuilder, we
miss propagating the `isOrderSensitive` characteristic to the newly created
MapPartitionsRDD. Jira created:
https://issues.apache.org/jira/browse/SPARK-28699.
The fix will be based on Wenchen's work in SPARK-23243. Currently, we make the
job fail when we find an indeterminate stage retry. Feel free to review.
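
As a rough illustration of that bookkeeping (a hypothetical, simplified model;
the real DeterministicLevel and MapPartitionsRDD live in Spark's internals and
have different shapes):

object DetLevel extends Enumeration {
  val DETERMINATE, UNORDERED, INDETERMINATE = Value
}

// How a map-like RDD could derive its determinism from its parent: an
// order-sensitive function (such as repartition's round-robin) over an
// UNORDERED parent must be treated as INDETERMINATE. If the cache wrapper
// drops isOrderSensitive, the result incorrectly stays at the parent's level.
def outputLevel(parentLevel: DetLevel.Value, isOrderSensitive: Boolean): DetLevel.Value =
  if (isOrderSensitive && parentLevel == DetLevel.UNORDERED) DetLevel.INDETERMINATE
  else parentLevel

// A shuffle delivers its output in no particular order:
val afterShuffle = DetLevel.UNORDERED
assert(outputLevel(afterShuffle, isOrderSensitive = true)  == DetLevel.INDETERMINATE)
assert(outputLevel(afterShuffle, isOrderSensitive = false) == DetLevel.UNORDERED)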

Support for rerunning the indeterminate stage in Spark will be done after
SPARK-25341 <https://issues.apache.org/jira/browse/SPARK-25341>. If you need
indeterminate stages after a cache operation right now, you can test on this
branch <https://github.com/xuanyuanking/spark/tree/SPARK-28699-RERUN>.

Best,
Yuanjian
