Hi Tyson,

Thanks for reporting it! I took a quick look at the related scheduler code
but couldn't find an obvious place that could go wrong with a cached RDD.

Sean said that he couldn't reproduce it, but that the second job fails. That
is actually expected: completely fixing this problem needs a lot more
changes, so the current fix is to fail the job if the scheduler needs to
retry an indeterminate shuffle map stage.
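To make that concrete, here is a rough, self-contained sketch of the decision
described above. It is not the real DAGScheduler code, and every name in it is
illustrative only:

object IndeterminateRetrySketch {
  // Stand-in for a shuffle map stage; isIndeterminate would be true when the
  // stage's output (e.g. a repartition over recomputed input) can change
  // between attempts.
  final case class MapStage(id: Int, isIndeterminate: Boolean)

  // Normally a fetch failure just resubmits the lost map tasks. If the stage
  // is indeterminate, a partial rerun can silently move rows to different
  // partitions, so the conservative behavior today is to fail the job.
  def onFetchFailure(stage: MapStage): String =
    if (stage.isIndeterminate)
      s"abort job: cannot safely retry indeterminate shuffle map stage ${stage.id}"
    else
      s"resubmit lost tasks of shuffle map stage ${stage.id}"

  def main(args: Array[String]): Unit = {
    println(onFetchFailure(MapStage(3, isIndeterminate = true)))
    println(onFetchFailure(MapStage(4, isIndeterminate = false)))
  }
}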

It would be great to know if we can reproduce this bug with the master
branch.

Thanks,
Wenchen

On Sun, Aug 11, 2019 at 7:22 AM Xiao Li <lix...@databricks.com> wrote:

> Hi, Tyson,
>
> Could you open a new JIRA with the correctness label? SPARK-23207 might not
> cover all the scenarios, especially when you are using cache.
>
> Cheers,
>
> Xiao
>
> On Fri, Aug 9, 2019 at 9:26 AM <tcon...@gmail.com> wrote:
>
>> Hi Sean,
>>
>> To finish the job, I did need to set spark.stage.maxConsecutiveAttempts
>> to a large number, e.g., 100, following a suggestion from Jiang Xingbo.
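>>
>> For reference, one way to raise that limit (a hedged example: the setting
>> is read when the application starts, so set it before the SparkSession is
>> created, and adjust the value and app name to your own setup):
>>
>> import org.apache.spark.sql.SparkSession
>>
>> val spark = SparkSession.builder()
>>   .appName("SPARK-23207 repro")  // illustrative name
>>   .config("spark.stage.maxConsecutiveAttempts", "100")
>>   .getOrCreate()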
>>
>> I haven't seen any recent movement/PRs on this issue, but I'll see if we
>> can repro with a more recent version of Spark.
>>
>> Best regards,
>> Tyson
>>
>> -----Original Message-----
>> From: Sean Owen <sro...@gmail.com>
>> Sent: Friday, August 9, 2019 7:49 AM
>> To: tcon...@gmail.com
>> Cc: dev <dev@spark.apache.org>
>> Subject: Re: [SPARK-23207] Repro
>>
>> Interesting, but I'd put this on the JIRA, and also test vs. master first.
>> It's entirely possible this is something else that was subsequently fixed,
>> and maybe even backported for 2.4.4.
>> (I can't quite reproduce it; it just makes the second job fail, which is
>> also puzzling.)
>>
>> On Fri, Aug 9, 2019 at 8:11 AM <tcon...@gmail.com> wrote:
>> >
>> > Hi,
>> >
>> > We are able to reproduce this bug in Spark 2.4 using the following
>> > program:
>> >
>> > import scala.sys.process._
>> > import org.apache.spark.TaskContext
>> >
>> > val res = spark.range(0, 10000 * 10000, 1)
>> >   .map { x => (x % 1000, x) }
>> >   .repartition(20)
>> > res.distinct.count
>> >
>> > // kill an executor in the stage that performs repartition(239)
>> > val df = res.repartition(113).cache.repartition(239).map { x =>
>> >   if (TaskContext.get.attemptNumber == 0 &&
>> >       TaskContext.get.partitionId < 1) {
>> >     throw new Exception("pkill -f java".!!)
>> >   }
>> >   x
>> > }
>> > df.distinct.count()
>> >
>> > The first df.distinct.count correctly produces 100000000.
>> >
>> > The second df.distinct.count incorrectly produces 99999769.
>> >
>> > If the cache step is removed, then the bug does not reproduce.
>> >
>> >
>> >
>> > Best regards,
>> >
>> > Tyson
>> >
>> >
>>
>>
>>
>>
>
>
