[ https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17646825#comment-17646825 ]

Mridul Muralidharan edited comment on SPARK-41497 at 12/13/22 9:20 PM:
-----------------------------------------------------------------------

> For example, a task is constructed by `rdd1.cache().rdd2`. So if the task
> fails due to rdd2's computation, I think rdd1's cache should still be
> reusable.

We have three cases here.
For some initial task T1:

a) Computation of rdd2 within T1 should have no issues, since the initial
computation would result in caching it locally - and rdd2's computation uses
the result of that local read iterator [1].
b) T1 fails and T2 now executes for the same partition - as proposed, T2 will
not use the result of T1's cache, since it is not marked as usable (even
though the block might still be marked as cached [2]).
c) Replication and/or decommission happens and T2 runs - same as case (b).


[1] We should special-case and allow reads from the same task which cached the
block - even if it has not yet been marked usable.
[2] When T1 fails, we would drop the blocks which got cached by it. But even
in the case of a race, the flag prevents the use of the cached block.
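
To make the above concrete, here is a minimal sketch of the bookkeeping
implied by (a)-(c) and footnotes [1]/[2]. It is not actual Spark code; every
name in it (BlockVisibility, CachedBlockTracker, etc.) is hypothetical:

{code:scala}
// Sketch only: a cached block becomes visible to other tasks only once the
// caching task has succeeded, but the caching task may always read its own block.
case class BlockVisibility(cachedBy: Long, usable: Boolean)

class CachedBlockTracker {
  private val blocks = scala.collection.mutable.Map[String, BlockVisibility]()

  // A task caches a block; it is not yet usable by other tasks.
  def recordCache(blockId: String, taskAttemptId: Long): Unit =
    blocks(blockId) = BlockVisibility(taskAttemptId, usable = false)

  // The caching task reported success, so the block becomes reusable (cases b/c).
  def markUsable(blockId: String): Unit =
    blocks.get(blockId).foreach(v => blocks(blockId) = v.copy(usable = true))

  // The caching task failed: drop its blocks (footnote [2]).
  def dropBlocksOf(taskAttemptId: Long): Unit = {
    val toDrop = blocks.collect { case (id, v) if v.cachedBy == taskAttemptId => id }
    toDrop.foreach(blocks.remove)
  }

  // A reader may use the block if it is usable, or if it is the very task
  // that cached it (footnote [1], case a).
  def canRead(blockId: String, readerTaskAttemptId: Long): Boolean =
    blocks.get(blockId).exists(v => v.usable || v.cachedBy == readerTaskAttemptId)
}
{code}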


> Accumulator undercounting in the case of retry task with rdd cache
> ------------------------------------------------------------------
>
>                 Key: SPARK-41497
>                 URL: https://issues.apache.org/jira/browse/SPARK-41497
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>            Reporter: wuyi
>            Priority: Major
>
> An accumulator could be undercounted when the retried task has an rdd cache.
> See the example below; you can also find the complete, reproducible example
> at
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
>     .setMaster("local-cluster[2, 1, 1024]")
>     .setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task
>   // attempt of the job submitted below. In particular, the failed first
>   // attempt succeeds in its computation (accumulator accounting, result
>   // caching) but fails to report its success status due to the concurrent
>   // executor loss. The second task attempt will succeed.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Create an rdd with only one partition so there's only one task, and
>   // specify the storage level MEMORY_ONLY_2 so that the rdd result will be
>   // cached on both executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
>     myAcc.add(100)
>     iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail because `myAcc.add(100)` won't be executed during the
>   // second task attempt's execution: the second attempt loads the rdd cache
>   // directly instead of executing the task function, so `myAcc.add(100)` is
>   // skipped.
>   assert(myAcc.value === 100)
> }
> {code}
>  
> We could also hit this issue with decommissioning even if the rdd has only
> one copy. For example, decommissioning could migrate the rdd cache block to
> another executor (the result is effectively the same as having 2 copies),
> and the decommissioned executor could be lost before the task reports its
> success status to the driver.
>  
> The issue is a bit more complicated to fix than expected. I have tried a few
> fixes, but none of them is ideal:
> Option 1: Clean up any rdd cache related to the failed task: in practice,
> this option already fixes the issue in most cases. However, theoretically,
> an rdd cache could be reported to the driver right after the driver cleans
> up the failed task's caches, due to asynchronous communication. So this
> option can't resolve the issue thoroughly;
> Option 2: Disallow rdd cache reuse across task attempts for the same task:
> this option can 100% fix the issue. The problem is that it also affects
> cases where the rdd cache could safely be reused across attempts (e.g., when
> there is no accumulator operation in the task), which can cause a perf
> regression;
> Option 3: Introduce an accumulator cache (a rough sketch follows after this
> list): first, this requires a new framework for supporting an accumulator
> cache; second, the driver would need to improve its logic to distinguish
> whether the cached accumulator value should be reported to the user, to
> avoid overcounting. For example, in the case of a task retry, the value
> should be reported. However, in the case of rdd cache reuse, the value
> shouldn't be reported (should it?);
> Option 4: Do task success validation when a task tries to load the rdd
> cache: this defines an rdd cache as only valid/accessible if the task that
> produced it has succeeded. This could be either overkill or a bit complex
> (because currently Spark cleans up the task state once it's finished, so we
> would need to maintain a structure to know whether a task once succeeded or
> not.)
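>
> A rough, hypothetical sketch of the accumulator cache from Option 3 (none of
> these names are existing Spark APIs): record the accumulator updates
> produced while a cached partition is first computed, and replay them when a
> later task attempt reuses the cache instead of recomputing.
> {code:scala}
> // Sketch only: block id -> (accumulator id -> update recorded at caching time).
> class AccumulatorCache {
>   private val updates = scala.collection.mutable.Map[String, Map[Long, Long]]()
>
>   // Called when a task caches a partition, together with the accumulator
>   // updates it produced while computing that partition.
>   def record(blockId: String, accUpdates: Map[Long, Long]): Unit =
>     updates(blockId) = accUpdates
>
>   // Called when a later attempt reuses the cached block. The hard part (as
>   // noted above) is deciding on the driver whether these replayed values
>   // should be folded into the user-visible accumulator or not.
>   def replay(blockId: String): Map[Long, Long] =
>     updates.getOrElse(blockId, Map.empty)
> }
> {code}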


