Re: [SPARK-23207] Repro
Hi Tyson,

Thanks for reporting this! I reproduced it locally based on your code, with some changes that keep only the wrong-answer job. The code is below:

import scala.sys.process._
import org.apache.spark.TaskContext

val res = spark.range(0, 1 * 1, 1).map{ x => (x % 1000, x)}

// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).cache.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 &&
      TaskContext.get.partitionId < 1 &&
      TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}

val r2 = df.distinct.count()

I think the reason for the wrong answer is that, in the CachedRDDBuilder, we fail to carry the `isOrderSensitive` property over to the newly created MapPartitionsRDD. Jira created: https://issues.apache.org/jira/browse/SPARK-28699. The fix builds on Wenchen's work in SPARK-23243: currently, we fail the job when we detect an indeterminate stage retry. Feel free to review. Support for rerunning the indeterminate stage will be done after SPARK-25341 <https://issues.apache.org/jira/browse/SPARK-25341>. If you need indeterminate-stage support after a cache operation right now, you can test on this branch <https://github.com/xuanyuanking/spark/tree/SPARK-28699-RERUN>.

Best,
Yuanjian

On Mon, Aug 12, 2019 at 8:19 PM, Wenchen Fan wrote:
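The order sensitivity Yuanjian mentions can be illustrated without Spark. `repartition` distributes records round-robin, so a record's target partition depends on its position in the input iterator; this is what `isOrderSensitive` marks. A minimal sketch in plain Scala, where `roundRobin` is a hypothetical stand-in for the shuffle-write side of `repartition`:

```scala
// Round-robin assignment, as repartition's shuffle write does:
// each record's target partition depends on its position in the input.
def roundRobin[T](records: Seq[T], numPartitions: Int): Map[Int, Seq[T]] =
  records.zipWithIndex
    .groupBy { case (_, i) => i % numPartitions }
    .map { case (p, rs) => p -> rs.map(_._1) }

// First attempt: partition 0 gets Seq(a, c), partition 1 gets Seq(b, d).
val firstAttempt = roundRobin(Seq("a", "b", "c", "d"), 2)

// A retried task may see the same records in a different order (e.g. because
// a cached block was lost and recomputed): now partition 0 gets Seq(b, c)
// and partition 1 gets Seq(a, d). Same data, different layout.
val retriedAttempt = roundRobin(Seq("b", "a", "c", "d"), 2)
```

If the property is not propagated through the cached RDD, the scheduler does not know the stage output is indeterminate and will not take the precautions described in this thread.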
Re: [SPARK-23207] Repro
Hi Tyson,

Thanks for reporting it! I quickly checked the related scheduler code but can't find an obvious place that can go wrong with cached RDDs.

Sean said that he can't reproduce it, but that the second job fails. This is actually expected: we need many more changes to completely fix this problem, so the current fix is to fail the job if the scheduler needs to retry an indeterminate shuffle map stage.

It would be great to know if we can reproduce this bug with the master branch.

Thanks,
Wenchen

On Sun, Aug 11, 2019 at 7:22 AM Xiao Li wrote:
Re: [SPARK-23207] Repro
Hi, Tyson,

Could you open a new JIRA with the correctness label? SPARK-23207 might not cover all the scenarios, especially when you use cache.

Cheers,

Xiao

On Fri, Aug 9, 2019 at 9:26 AM wrote:
RE: [SPARK-23207] Repro
Hi Sean,

To finish the job, I did need to set spark.stage.maxConsecutiveAttempts to a large number, e.g., 100 (a suggestion from Jiang Xingbo).

I haven't seen any recent movement/PRs on this issue, but I'll see if we can repro with a more recent version of Spark.

Best regards,
Tyson

-----Original Message-----
From: Sean Owen
Sent: Friday, August 9, 2019 7:49 AM
To: tcon...@gmail.com
Cc: dev
Subject: Re: [SPARK-23207] Repro
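For reference, a sketch of how that retry cap can be raised when running the repro interactively (the default for spark.stage.maxConsecutiveAttempts is 4, which is quickly exhausted once the injected failures force repeated stage retries; the master URL here is a placeholder):

```shell
spark-shell \
  --master "local[4]" \
  --conf spark.stage.maxConsecutiveAttempts=100
```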
Re: [SPARK-23207] Repro
Interesting, but I'd put this on the JIRA, and also test against master first. It's entirely possible this is something else that was subsequently fixed, and maybe even backported for 2.4.4. (I can't quite reproduce it; it just makes the second job fail, which is also puzzling.)

On Fri, Aug 9, 2019 at 8:11 AM wrote:
[SPARK-23207] Repro
Hi,

We are able to reproduce this bug in Spark 2.4 using the following program:

import scala.sys.process._
import org.apache.spark.TaskContext

val res = spark.range(0, 1 * 1, 1).map{ x => (x % 1000, x)}.repartition(20)

res.distinct.count

// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).cache.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1) {
    throw new Exception("pkill -f java".!!)
  }
  x
}

df.distinct.count()

The first df.distinct.count correctly produces 1.

The second df.distinct.count incorrectly produces 9769.

If the cache step is removed, then the bug does not reproduce.

Best regards,

Tyson
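To see why a partial stage retry corrupts a round-robin repartition, here is a plain-Scala sketch (no Spark; `shuffleWrite` is a hypothetical stand-in for the map side of `repartition`). If some reduce tasks fetched the first map attempt's output before the executor died, while others fetch the output of a retried attempt that saw its recomputed input in a different order, rows end up duplicated and lost:

```scala
// Position-based (round-robin) partitioning: which partition a record
// lands in depends on the order the map task sees its input.
def shuffleWrite(records: Seq[Int], numPartitions: Int): Map[Int, Seq[Int]] =
  records.zipWithIndex
    .groupBy { case (_, i) => i % numPartitions }
    .map { case (p, rs) => p -> rs.map(_._1) }

val attempt1 = shuffleWrite(Seq(1, 2, 3, 4), 2) // first map attempt
val attempt2 = shuffleWrite(Seq(2, 1, 3, 4), 2) // retry saw a different order

// Reducer 0 fetched attempt 1's output before the executor died;
// reducer 1 fetches the retried attempt's output afterwards.
val mixed = attempt1(0) ++ attempt2(1)

mixed.distinct.size // 3, not 4: row 1 is duplicated and row 2 is lost
```

This is the same effect, in miniature, as the distinct count above coming out wrong after the injected executor kill.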