[ https://issues.apache.org/jira/browse/SPARK-40048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17578369#comment-17578369 ]
sam commented on SPARK-40048:
-----------------------------

Also confirmed that no eviction seems to be happening, using https://stackoverflow.com/questions/69596441/is-there-any-stable-method-on-the-sparksession-sparkcontext-rdd-that-we-can-call

Though it is hard to tell, since we only run this code at the _end_ of the job.

> Partitions are traversed multiple times invalidating Accumulator consistency
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-40048
>                 URL: https://issues.apache.org/jira/browse/SPARK-40048
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.4
>            Reporter: sam
>            Priority: Major
>
> We are trying to use Accumulators to count RDDs without having to force
> `.count()` on them, for efficiency reasons. We are aware that tasks can fail
> and re-run, which would invalidate the value of the accumulator, so we also
> count the number of times each partition has been traversed, so we can
> detect this.
>
> The problem is that partitions are being traversed multiple times even though:
> - We cache the RDD in memory
> - No tasks are failing, no executors are dying.
> - There is plenty of memory (no RDD eviction)
>
> The code we use:
> ```
> val count: LongAccumulator
> val partitionTraverseCounts: List[LongAccumulator]
>
> def increment(): Unit = count.add(1)
>
> def incrementTimesCalled(partitionIndex: Int): Unit =
>   partitionTraverseCounts(partitionIndex).add(1)
>
> def incrementForPartition[T](index: Int, it: Iterator[T]): Iterator[T] = {
>   incrementTimesCalled(index)
>   it.map { x =>
>     increment()
>     x
>   }
> }
> ```
> How we use the above:
> ```
> rdd.mapPartitionsWithIndex(safeCounter.incrementForPartition)
> ```
> We have a 50-partition RDD, and we frequently see odd traverse counts:
> ```
> traverseCounts: List(2, 1, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
> 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2,
> 2, 2, 2, 1, 2)
> ```
> As you can see, some partitions are traversed twice, while others are
> traversed only once.
>
> To confirm no task failures:
> ```
> cat job.log | grep -i task | grep -i fail
> ```
> To confirm no memory issues:
> ```
> cat job.log | grep -i memory
> ```
> Every log line shows multiple GB of memory free. We also don't see any
> errors or exceptions.
>
> Questions:
> 1. Why is Spark traversing a cached RDD multiple times?
> 2. Is there any way to disable this?
>
> Many thanks,
> Sam

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
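The counting scheme described in the quoted ticket can be exercised locally without a Spark cluster. The sketch below substitutes `java.util.concurrent.atomic.AtomicLong` for Spark's `LongAccumulator`, and the partition count and data are invented for illustration; it shows the invariant the reporter relies on, namely that under a single clean traversal every per-partition counter ends at exactly 1:

```scala
import java.util.concurrent.atomic.AtomicLong

// Plain AtomicLongs stand in for Spark LongAccumulators; numPartitions
// and the partition contents are made up for illustration.
val numPartitions = 4
val count = new AtomicLong(0)
val partitionTraverseCounts = List.fill(numPartitions)(new AtomicLong(0))

// Same shape as the ticket's helper: bump the per-partition counter once
// per traversal, and the element counter once per element consumed.
def incrementForPartition[T](index: Int, it: Iterator[T]): Iterator[T] = {
  partitionTraverseCounts(index).incrementAndGet()
  it.map { x => count.incrementAndGet(); x }
}

// Simulate one clean pass over four partitions of ten elements each.
val partitions = List.fill(numPartitions)((0 until 10).iterator)
partitions.zipWithIndex.foreach { case (it, i) =>
  incrementForPartition(i, it).foreach(_ => ()) // force the lazy iterator
}

println(count.get())                          // 40
println(partitionTraverseCounts.map(_.get())) // List(1, 1, 1, 1)
```

If Spark re-runs a task (failure, speculation, or recomputation of an uncached partition), the corresponding counter exceeds 1, which is exactly the anomaly reported in the traverse-count list above.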