[ https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17646825#comment-17646825 ]
Mridul Muralidharan edited comment on SPARK-41497 at 12/13/22 9:20 PM:
-----------------------------------------------------------------------

> For example, a task is constructed by `rdd1.cache().rdd2`. So if the task
> fails due to rdd2's computation, I think rdd1's cache should still be able to
> be reused.

We have three cases here. For some initial task T1:

a) Computation of rdd2 within T1 should have no issues, since the initial computation would cache the block locally - and rdd2's computation uses the result of that local read iterator [1].

b) T1 fails and T2 now executes for the same partition - as proposed, T2 will not use the result of T1's cache, since it is not marked as usable (even though the block might still be marked as cached [2]).

c) Replication and/or decommission, and then T2 runs - same as case (b).

[1] We should special-case and allow reads from the same task that cached the block - even if it has not yet been marked usable.

[2] When T1 failed, we would drop the blocks that were cached by it. But even in the case of a race, the flag prevents the use of the cached block.
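To make the proposal above concrete, here is a minimal sketch of the idea: a cached block is tracked together with the task attempt that wrote it, becomes usable for other attempts only once that attempt is reported successful, and is dropped if that attempt fails; the writing attempt itself may always read its own block (case (a)). The BlockUsabilityTracker class, its fields, and the call sites are hypothetical illustrations, not Spark's actual BlockInfoManager API:

{code:scala}
import scala.collection.concurrent.TrieMap

// Hypothetical tracker: a cached block is readable by other task attempts only after
// the attempt that wrote it has been reported as successful.
class BlockUsabilityTracker {
  // blockId -> (task attempt id that cached it, whether that attempt is known to have succeeded)
  private val state = TrieMap.empty[String, (Long, Boolean)]

  // Called when a task attempt finishes writing a cached block.
  def onBlockCached(blockId: String, taskAttemptId: Long): Unit =
    state.put(blockId, (taskAttemptId, false))

  // Called when the driver confirms the task attempt succeeded: mark its blocks usable.
  def onTaskSuccess(taskAttemptId: Long): Unit =
    state.foreach { case (blockId, (owner, _)) =>
      if (owner == taskAttemptId) state.put(blockId, (owner, true))
    }

  // Called when the task attempt fails: return (and forget) its not-yet-usable blocks so
  // the caller can drop them from storage.
  def onTaskFailure(taskAttemptId: Long): Iterable[String] = {
    val toDrop = state.collect { case (blockId, (owner, false)) if owner == taskAttemptId => blockId }
    toDrop.foreach(state.remove)
    toDrop
  }

  // Case (a): the caching attempt may read its own block before it is marked usable.
  // Cases (b)/(c): other attempts may read it only once it is marked usable.
  def canRead(blockId: String, readerTaskAttemptId: Long): Boolean =
    state.get(blockId) match {
      case Some((owner, usable)) => usable || owner == readerTaskAttemptId
      case None                  => false
    }
}
{code}

Whether the "usable" transition happens on the executor or on the driver determines how the race with asynchronous block reporting is handled; the sketch deliberately leaves that out.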
> Accumulator undercounting in the case of retry task with rdd cache
> ------------------------------------------------------------------
>
>                 Key: SPARK-41497
>                 URL: https://issues.apache.org/jira/browse/SPARK-41497
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>            Reporter: wuyi
>            Priority: Major
>
> An accumulator can be undercounted when a retried task has an rdd cache. See
> the example below; a complete, reproducible example is also available at
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors.
>   val conf = new SparkConf()
>     .setMaster("local-cluster[2, 1, 1024]")
>     .setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>
>   // Set up a custom task scheduler. The scheduler will fail the first task attempt of the
>   // job submitted below. In particular, the failed first attempt succeeds in its computation
>   // (accumulator accounting, result caching) but fails to report its success status due to
>   // the concurrent executor loss. The second task attempt succeeds.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>
>   val myAcc = sc.longAccumulator("myAcc")
>   // Initiate an rdd with only one partition so there's only one task, and use storage level
>   // MEMORY_ONLY_2 so that the rdd result is cached on both executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
>     myAcc.add(100)
>     iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>
>   // This will pass since the second task attempt succeeds.
>   assert(rdd.count() === 10)
>
>   // This will fail because `myAcc.add(100)` is not executed during the second task attempt:
>   // the second attempt loads the rdd cache directly instead of executing the task function,
>   // so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> }
> {code}
>
> We can also hit this issue with decommission, even if the rdd has only one copy. For example,
> decommission could migrate the rdd cache block to another executor (the result is effectively
> the same as having 2 copies) and the decommissioned executor is lost before the task reports
> its success status to the driver.
>
> The issue is more complicated to fix than expected. I have tried several fixes, but none of
> them is ideal:
>
> Option 1: Clean up any rdd cache related to the failed task. In practice, this already fixes
> the issue in most cases. However, an rdd cache could theoretically be reported to the driver
> right after the driver cleans up the failed task's caches, due to asynchronous communication,
> so this option can't resolve the issue thoroughly.
>
> Option 2: Disallow rdd cache reuse across task attempts of the same task. This fixes the issue
> completely, but it also affects cases where the rdd cache could safely be reused across
> attempts (e.g., when there is no accumulator operation in the task), which can cause a perf
> regression.
>
> Option 3: Introduce an accumulator cache. First, this requires a new framework for supporting
> an accumulator cache; second, the driver needs extra logic to decide whether the cached
> accumulator value should be reported to the user, to avoid overcounting. For example, in the
> case of a task retry the value should be reported, but in the case of rdd cache reuse it
> shouldn't be (should it?).
>
> Option 4: Validate task success when a task tries to load the rdd cache. This defines an rdd
> cache as valid/accessible only if the task that produced it succeeded. It could be either
> overkill or a bit complex, because Spark currently cleans up task state once the task is
> finished, so we would need to maintain a structure recording whether a task once succeeded.
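For illustration, a minimal sketch of how the Option 2 restriction could look, simplified to the single-task retry scenario in the quoted test above: the cached partition remembers the attempt number that produced it, and an attempt with a different attempt number recomputes instead of reading the cache. The names (CacheReusePolicy, getOrCompute) and the keying by (rddId, partitionId) are hypothetical and do not correspond to Spark's actual BlockManager / RDD compute path:

{code:scala}
import scala.collection.concurrent.TrieMap

object CacheReusePolicy {
  // (rddId, partitionId) -> attempt number of the task attempt that cached the partition.
  private val producer = TrieMap.empty[(Int, Int), Int]

  def recordCached(rddId: Int, partitionId: Int, attemptNumber: Int): Unit =
    producer.put((rddId, partitionId), attemptNumber)

  // Option 2 (simplified): only the attempt that produced the cache may reuse it; a retried
  // attempt of the same task (different attempt number) falls back to recomputation. Real Spark
  // would also need to distinguish retries from unrelated tasks that legitimately read the cache.
  def mayReuse(rddId: Int, partitionId: Int, attemptNumber: Int): Boolean =
    producer.get((rddId, partitionId)).contains(attemptNumber)

  // Sketch of a compute-or-read-cache path consulting the policy.
  def getOrCompute[T](rddId: Int, partitionId: Int, attemptNumber: Int,
                      readCache: () => Option[Iterator[T]],
                      compute: () => Iterator[T]): Iterator[T] = {
    val cached = if (mayReuse(rddId, partitionId, attemptNumber)) readCache() else None
    cached.getOrElse {
      // Recomputing re-runs the task function, so accumulator updates are applied again.
      val result = compute()
      recordCached(rddId, partitionId, attemptNumber)
      result
    }
  }
}
{code}

The recomputation branch is where accumulator updates get re-applied, which is what would restore the count in the failing assertion above; it is also where Option 2's perf regression comes from, since a retried attempt recomputes even when the cached data would have been safe to reuse.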