[jira] [Comment Edited] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2023-01-02 Thread Juliusz Sompolski (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17653648#comment-17653648
 ] 

Juliusz Sompolski edited comment on SPARK-41497 at 1/2/23 4:22 PM:
---

Note that this issue leads to a correctness issue in Delta Merge, because Delta 
Merge depends on a SetAccumulator as a side output channel for gathering the files 
that need to be rewritten by the Merge: 
https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala#L445-L449
Delta assumes that Spark accumulators can overcount (in cases where task retries 
apply duplicate updates), but it was assumed that they would never undercount and 
lose output like this.

Missing some files there can result in duplicate records being inserted instead 
of existing records being updated.
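
To make the failure mode concrete, here is a minimal sketch of the side-channel 
pattern (hypothetical names and file paths, not Delta's actual MergeIntoCommand 
code), showing where a reused rdd cache silently drops accumulator updates:

{code:scala}
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

// Hypothetical sketch, not Delta's code: an accumulator used as a side output
// channel to collect which files were touched, while the scanned data is cached.
val sc = SparkContext.getOrCreate()
val touchedFiles = sc.collectionAccumulator[String]("touchedFiles")

val scanned = sc
  .parallelize(Seq(("file-1.parquet", 1), ("file-2.parquet", 2)), 2)
  .mapPartitions { iter =>
    iter.map { case (file, value) =>
      touchedFiles.add(file) // side output: this file needs to be rewritten
      value
    }
  }
  .persist(StorageLevel.MEMORY_ONLY_2)

scanned.count()
// If a retried task reads the cached block instead of re-running mapPartitions,
// touchedFiles.add(file) is skipped and touchedFiles.value misses some files; a
// Merge-style rewrite would then leave those files untouched, so their rows get
// duplicated instead of updated.
val files = touchedFiles.value
{code}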


was (Author: juliuszsompolski):
Note that this issue leads to a correctness issue in Delta Merge, because it 
depends on a SetAccumulator as a side output channel for gathering files that 
need to be rewritten by the Merge: 
https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala#L445-L449

Missing some files there can result in duplicate records being inserted instead 
of existing records being updated.

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
> Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
> Reporter: wuyi
> Priority: Major
>
> Accumulators could be undercounted when the retried task has an rdd cache.  See 
> the example below; you can also find a complete, reproducible example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
>     .setMaster("local-cluster[2, 1, 1024]")
>     .setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task attempt of
>   // the job submitted below. In particular, the first attempt will succeed in its
>   // computation (accumulator accounting, result caching) but fail to report its success
>   // status due to the concurrent executor loss. The second task attempt will succeed.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Initialize an rdd with only one partition so there's only one task, and use
>   // storage level MEMORY_ONLY_2 so that the rdd result is cached on both executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
>     myAcc.add(100)
>     iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail because `myAcc.add(100)` is not executed during the second task
>   // attempt: the second attempt loads the rdd cache directly instead of executing
>   // the task function, so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> } {code}
>  
> We could also hit this issue with decommissioning, even if the rdd has only one 
> copy. For example, decommissioning could migrate the rdd cache block to another 
> executor (the result is effectively the same as having 2 copies) and the 
> decommissioned executor could be lost before the task reports its success status 
> to the driver. 
>  
> The issue is a bit more complicated to fix than expected. I have tried to come 
> up with several fixes, but none of them is ideal:
> Option 1: Clean up any rdd cache related to the failed task: in practice, 
> this option can already fix the issue in most cases. However, theoretically, 
> the rdd cache could be reported to the driver right after the driver cleans up 
> the failed task's caches, due to asynchronous communication. So this option 
> can’t resolve the issue thoroughly;
> Option 2: Disallow rdd cache reuse across task attempts for the same 
> task: this option can 100% fix the issue. The problem is that it also affects 
> cases where the rdd cache could safely be reused across attempts (e.g., when 
> there is no accumulator operation in the task), which can cause a perf 
> regression;
> Option 3: Introduce an accumulator cache: first, this requires a new framework 
> for supporting an accumulator cache; second, the driver needs extra logic 
> to distinguish whether the cached accumulator value should be reported to the 
> user, to avoid overcounting. For example, in the case of a task retry, the value 
> should be reported. However, in the case of rdd cache reuse, the value 
> shouldn’t be reported (should it?);
> Option 4: Do task success validation when a task tries to load the rdd 
> cache: this approach defines that an rdd cache is only valid/accessible if the task has 

[jira] [Comment Edited] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-14 Thread Mridul Muralidharan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646972#comment-17646972
 ] 

Mridul Muralidharan edited comment on SPARK-41497 at 12/14/22 8:03 AM:
---

[~Ngone51] Agree, that is what I was not sure of (whether we can detect this 
scenario about the use of accumulators which might be updated subsequently). Note 
that updates to the same accumulator can happen both before and after a cache in 
user code - so we might only be able to catch the scenario where there are no 
accumulators at all.
If I am not wrong, SQL makes very heavy use of accumulators, so most stages 
will end up having them anyway - right?

I would expect this scenario (even without accumulators) to be low-frequency 
enough that the cost of the extra recomputation might be fine.
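
For illustration, a minimal sketch (a hypothetical job, not from the issue) of the 
shape described above, where the same task updates one accumulator before the 
cache point and another after it:

{code:scala}
import org.apache.spark.SparkContext

val sc = SparkContext.getOrCreate()
val accBefore = sc.longAccumulator("accBefore") // updated while computing the cached rdd
val accAfter = sc.longAccumulator("accAfter")   // updated downstream of the cache

val cached = sc.parallelize(0 until 10, 1)
  .map { x => accBefore.add(1); x } // update before the cache point
  .cache()

cached
  .map { x => accAfter.add(1); x + 1 } // update after the cache point, same task
  .count()

// A retry that reuses the cached block skips only accBefore's updates, so a check
// like "does this task use accumulators?" catches both and can only safely allow
// cache reuse when there are no accumulators at all.
{code}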


was (Author: mridulm80):
[~Ngone51] Agree, that is what I was not sure of.
I would expect this scenario (even without accumulator) to be fairly low 
frequency enough that the cost of extra recomputation might be fine.


[jira] [Comment Edited] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-13 Thread wuyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646971#comment-17646971
 ] 

wuyi edited comment on SPARK-41497 at 12/14/22 7:31 AM:


I'm wondering if we could improve the improved Option 4 by changing the rdd 
cache reuse condition a bit:

If no accumulator values (probably external accumulators only) changed after the 
rdd computation, then the rdd's cache should be marked as usable/visible no 
matter whether the task succeeds or fails;

If accumulator values changed after the rdd computation, then the rdd's cache 
should be marked as usable/visible only when the task succeeds.

(let me think further and see if it's doable..)
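
A minimal sketch of this reuse condition, with hypothetical names (not Spark's 
actual BlockManager bookkeeping):

{code:scala}
// Hypothetical per-block bookkeeping: whether any (external) accumulator value
// changed while the block's partition was being computed.
case class CachedBlockInfo(accumulatorsChangedDuringCompute: Boolean)

// The proposed rule: reusable right away if no accumulators changed,
// otherwise visible only once the computing task has succeeded.
def markVisible(block: CachedBlockInfo, taskSucceeded: Boolean): Boolean =
  if (!block.accumulatorsChangedDuringCompute) true
  else taskSucceeded
{code}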


was (Author: ngone51):
I'm thinking if we could improve the improved Option 4 by changing the rdd 
cache reuse condition a bit:

 

if there're no accumulators (external only probably) values changed after the 
rdd computation, then the rdd's cache should be marked as usable/visible no 
matter whether the task succeeds or fail.

 

(let me think further and see if it's doable..)


[jira] [Comment Edited] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-13 Thread Mridul Muralidharan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646825#comment-17646825
 ] 

Mridul Muralidharan edited comment on SPARK-41497 at 12/13/22 9:20 PM:
---

> For example, a task is constructed by `rdd1.cache().rdd2`. So if the task 
> fails due to rdd2's computation, I think rdd1's cache should still be able to 
> reuse.

We have three cases here.
For some initial task T1:

a) Computation of rdd2 within T1 should have no issues, since the initial 
computation would result in caching it locally - and rdd2's computation uses 
the result of that local read iterator [1].
b) Failure of T1 and now T2 executes for the same partition - as proposed, T2 
will not use the result of T1's cache, since it is not marked as usable (even 
though the block might still be marked as cached [2]).
c) Replication and/or decommission and T2 runs - same as case (b).


[1] We should special case and allow reads from the same task which cached the 
block - even if it has not yet been marked usable.
[2] When T1 failed, we would drop the blocks which got cached due to it. But 
even in the case of a race, the flag prevents the use of the cached block.


was (Author: mridulm80):
> For example, a task is constructed by `rdd1.cache().rdd2`. So if the task 
> fails due to rdd2's computation, I think rdd1's cache should still be able to 
> reuse.

We have three cases here.
For some initial task T1:

a) Computation of rdd2 within T1 should have no issues, since initial 
computation would result in caching it locally - and rdd2 computation is using 
the result of that iterator [1].
b) Failure of T1 and now T2 executes for the same partition - as proposed, T2 
will not use result of T1's cache, since it not marked as usable (even though 
the block might be still marked cached [2]).
c) Replication and/or decom and T2 runs - same case as (b).


[1] We should special case and allow reads from the same task which cached the 
block - even if it has not yet been marked usable.
[2] When T1 failed, we would drop the blocks which got cached due to it. But 
even in the case of a race, the flag prevents the use of the cached block.


[jira] [Comment Edited] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-13 Thread Mridul Muralidharan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646825#comment-17646825
 ] 

Mridul Muralidharan edited comment on SPARK-41497 at 12/13/22 9:20 PM:
---

> For example, a task is constructed by `rdd1.cache().rdd2`. So if the task 
> fails due to rdd2's computation, I think rdd1's cache should still be able to 
> reuse.

We have three cases here.
For some initial task T1:

a) Computation of rdd2 within T1 should have no issues, since the initial 
computation would result in caching it locally - and rdd2's computation uses 
the result of that local read iterator [1].
b) Failure of T1 and now T2 executes for the same partition - as proposed, T2 
will not use the result of T1's cache, since it is not marked as usable (even 
though the block might still be marked as cached [2]).
c) Replication and/or decommission and T2 runs - same as case (b).

Probably 'usable' is the wrong term - 'visible' might be better? That is, is this 
block visible to others (outside of the generating task)?

[1] We should special case and allow reads from the same task which cached the 
block - even if it has not yet been marked usable.
[2] When T1 failed, we would drop the blocks which got cached due to it. But 
even in the case of a race, the flag prevents the use of the cached block.
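
For reference, a sketch of the `rdd1.cache().rdd2` shape being discussed 
(illustrative only, not from the issue): T1 fully caches rdd1's partition through 
the local read iterator before rdd2's computation fails, and T2 then has to decide 
whether that block is visible to it:

{code:scala}
import org.apache.spark.{SparkContext, TaskContext}

val sc = SparkContext.getOrCreate()

val rdd1 = sc.parallelize(0 until 10, 1).map(identity).cache()

// rdd2 consumes rdd1's freshly cached iterator within the same task (case a), then
// fails only on the first attempt, so T1 fails after rdd1's block is already cached
// and T2 reruns for the same partition (cases b/c).
val rdd2 = rdd1.map { x =>
  if (x == 0 && TaskContext.get().attemptNumber() == 0) {
    throw new RuntimeException("simulated rdd2 failure in T1")
  }
  x + 1
}
// Requires task retries to be enabled (e.g. a local-cluster master, as in the
// repro above). Whether T2 may read rdd1's block produced by the failed T1 is
// exactly the visibility question discussed here.
rdd2.count()
{code}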


was (Author: mridulm80):
> For example, a task is constructed by `rdd1.cache().rdd2`. So if the task 
> fails due to rdd2's computation, I think rdd1's cache should still be able to 
> reuse.

We have three cases here.
For some initial task T1:

a) Computation of rdd2 within T1 should have no issues, since initial 
computation would result in caching it locally - and rdd2 computation is using 
the result of that local read iterator [1].
b) Failure of T1 and now T2 executes for the same partition - as proposed, T2 
will not use result of T1's cache, since it not marked as usable (even though 
the block might be still marked cached [2]).
c) Replication and/or decom and T2 runs - same case as (b).


[1] We should special case and allow reads from the same task which cached the 
block - even if it has not yet been marked usable.
[2] When T1 failed, we would drop the blocks which got cached due to it. But 
even in the case of a race, the flag prevents the use of the cached block.


[jira] [Comment Edited] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-12 Thread Mridul Muralidharan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646477#comment-17646477
 ] 

Mridul Muralidharan edited comment on SPARK-41497 at 12/13/22 7:41 AM:
---

Agree - there appear to be a number of scenarios where this can be triggered.
Essentially, whenever the block save succeeds but the task itself fails, we can end 
up with this scenario.
As detailed, this could be a storage level with replication > 1, block 
decommissioning, etc. - where an executor or the driver has replicated the data.
I think this can also happen when the task itself fails after persisting the 
data (even without replication or decommissioning) - for example, due to some 
local/transient shuffle write issue, a CommitDenied, etc.

For the options listed:
Agree, option 1 does not solve the issue.
I am also not inclined towards option 2 due to the potential perf impact - 
though I would expect this to be a rare scenario.
Option 3 looks like a very involved approach, and I am not sure we can cover 
all the corner cases.

I am wondering if we can modify option 4 so that it helps.
There are multiple possible approaches - one strawman proposal (a rough sketch 
follows below):
a) Add a bit to BlockStatus indicating whether the block can be used or not. This 
bit gets flipped when the task which computed the block completes successfully.
b) Maintain a taskId -> BlockStatus* mapping, which is cleaned up whenever a 
task completes (if successful, flip the bit; else remove its blocks - and 
replicas, if any).
c) Propagate the taskId in reportBlockStatus from doPutIterator, etc. - wherever a 
new block is getting created in the system.

Thoughts?
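
A rough sketch of that strawman, with hypothetical names (TrackedBlockStatus and 
BlockVisibilityTracker are stand-ins, not Spark's actual classes), just to make 
the bookkeeping concrete:

{code:scala}
import scala.collection.mutable

// Hypothetical driver-side bookkeeping for the strawman above.
case class BlockId(name: String)
class TrackedBlockStatus(val blockId: BlockId) {
  // (a) visibility bit: flipped only when the computing task completes successfully
  var visible: Boolean = false
}

class BlockVisibilityTracker {
  // (b) taskId -> blocks created by that task, cleaned up on task completion
  private val blocksByTask =
    mutable.Map.empty[Long, mutable.Buffer[TrackedBlockStatus]]

  // (c) reportBlockStatus would carry the taskId of the task that created the block
  def onBlockReported(taskId: Long, status: TrackedBlockStatus): Unit =
    blocksByTask.getOrElseUpdate(taskId, mutable.Buffer.empty[TrackedBlockStatus]) += status

  def onTaskCompleted(taskId: Long, succeeded: Boolean): Unit =
    blocksByTask.remove(taskId).foreach { statuses =>
      if (succeeded) statuses.foreach(_.visible = true)
      else statuses.foreach(s => removeBlockAndReplicas(s.blockId))
    }

  def isUsable(status: TrackedBlockStatus): Boolean = status.visible

  // Placeholder for "remove its blocks - and replicas (if any)".
  private def removeBlockAndReplicas(blockId: BlockId): Unit = ()
}
{code}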


was (Author: mridulm80):
Agree - there appears to be a bunch of scenarios where this can be triggered.
Essentially, whenever block save succeeds and task itself fails, we can end up 
with this scenario.
As detailed, this could be storage level with replication > 1, block 
decomissioning, etc - where executor or driver has replicated the data.
I think this can also happen when task itself fails - but after persisting the 
data (even without replication or decomm) - for example, due to some 
local/transient shuffle write issues (for example).

For the options listed:
Agree, option 1 does not solve the issue.
I am also not inclined towards option 2 due to the potential perf impact - 
though I would expect this to be a rare scenario.
Option 3 looks like a very involved approach, and I am not sure if we can cover 
all the corner cases.

I am wondering if we can modify option 4 such that it helps.
There are multiple approaches perhaps - one strawman proposal:
a) Add a bit to BlockStatus indicating whether block can be used or not. And 
currently this bit gets flipped when the task which computed it successfully 
completes.
b) Maintain a taskId -> BlockStatus* mapping - which is cleaned up whenever a 
task completes (if successful, then flip bit - else remove its blocks - and 
replicas (if any)).
c) Propagate taskId in reportBlockStatus from doPutIterator, etc - where new 
block is getting created in the system.

Thoughts ?
