[ https://issues.apache.org/jira/browse/SPARK-40048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17579231#comment-17579231 ]
sam commented on SPARK-40048:
-----------------------------

[~hyukjin.kwon] Unfortunately, bumping to 3.2.1 did not fix the issue:

```
traverseCounts: List(2, 2, 1, 1)
```

I didn't expect 3.2.1 to fix this anyway, because I think this behaviour is a
Spark "feature" introduced around the time DataFrames, Spark SQL, etc. were
added. My belief is that Spark intentionally skips caching some RDDs when it
judges recomputation to be cheaper. Prior to Spark SQL, Spark was much more
deterministic in its behaviour, and accumulators were therefore a much more
reliable feature. (A self-contained repro sketch is at the bottom of this
message, below the quoted issue.)

> Partitions are traversed multiple times invalidating Accumulator consistency
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-40048
>                 URL: https://issues.apache.org/jira/browse/SPARK-40048
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.4
>            Reporter: sam
>            Priority: Major
>
> We are trying to use Accumulators to count RDDs without having to force
> `.count()` on them, for efficiency reasons. We are aware that tasks can fail
> and re-run, which would invalidate the value of the accumulator, so we also
> count the number of times each partition has been traversed, allowing us to
> detect this.
>
> The problem is that partitions are being traversed multiple times even though:
> - We cache the RDD in memory _after we have applied the logic below_
> - No tasks are failing and no executors are dying
> - There is plenty of memory (no RDD eviction)
>
> The code we use:
> ```
> // Assumes sc: SparkContext and numPartitions are in scope.
> val count: LongAccumulator = sc.longAccumulator("count")
> val partitionTraverseCounts: List[LongAccumulator] =
>   List.tabulate(numPartitions)(i => sc.longAccumulator(s"traverse-$i"))
>
> def increment(): Unit = count.add(1)
>
> def incrementTimesCalled(partitionIndex: Int): Unit =
>   partitionTraverseCounts(partitionIndex).add(1)
>
> // Wraps a partition's iterator: bumps the traverse count for the partition
> // once, and the record count for every element that flows through.
> def incrementForPartition[T](index: Int, it: Iterator[T]): Iterator[T] = {
>   incrementTimesCalled(index)
>   it.map { x =>
>     increment()
>     x
>   }
> }
> ```
>
> How we use the above:
> ```
> rdd.mapPartitionsWithIndex(safeCounter.incrementForPartition)
> ```
>
> We have a 50-partition RDD, and we frequently see odd traverse counts:
> ```
> traverseCounts: List(2, 1, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
> 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2,
> 2, 2, 2, 1, 2)
> ```
>
> As you can see, some partitions are traversed twice, while others are
> traversed only once.
>
> To confirm there are no task failures:
> ```
> cat job.log | grep -i task | grep -i fail
> ```
>
> To confirm there are no memory issues:
> ```
> cat job.log | grep -i memory
> ```
>
> Every log line shows multiple GB of memory free, and we see no errors or
> exceptions.
>
> Questions:
> 1. Why is Spark traversing a cached RDD multiple times?
> 2. Is there any way to disable this behaviour?
>
> Many thanks,
> Sam
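P.S. For anyone who wants to reproduce this, below is a minimal,
self-contained sketch of the pattern from the ticket. It is only a sketch:
the object name, the local[4] master, and the partition/record counts are
illustrative assumptions, not taken from our actual job.

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.LongAccumulator

object TraverseCountRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]") // illustrative; any master works
      .appName("traverse-count-repro")
      .getOrCreate()
    val sc = spark.sparkContext

    val numPartitions = 8
    // One accumulator per partition: any value > 1 means that partition's
    // iterator was run more than once.
    val traverseCounts: IndexedSeq[LongAccumulator] =
      (0 until numPartitions).map(i => sc.longAccumulator(s"traverse-$i"))

    val rdd = sc
      .parallelize(1 to 10000, numPartitions)
      .mapPartitionsWithIndex { (index, it) =>
        traverseCounts(index).add(1)
        it
      }
      .cache()

    rdd.count() // first action: materializes the RDD and populates the cache
    rdd.count() // second action: should be served entirely from the cache

    println("traverseCounts: " + traverseCounts.map(_.value).toList)
    spark.stop()
  }
}
```

If the cache behaves, every entry of traverseCounts ends up at exactly 1
across both actions; a value of 2 with no task failures in the logs matches
the behaviour described above.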