[ https://issues.apache.org/jira/browse/SPARK-40048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17578369#comment-17578369 ]

sam commented on SPARK-40048:
-----------------------------

Also confirmed that no eviction seems to be happening, using the approach from
https://stackoverflow.com/questions/69596441/is-there-any-stable-method-on-the-sparksession-sparkcontext-rdd-that-we-can-call

Though it's hard to tell, since we only run that check at the _end_ of the job.
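
For what it's worth, one way to spot-check eviction from the driver while the job is still running (rather than only at the end) is to poll the storage info. This is only a sketch against the Spark 2.x API; `sc` and `rdd` stand in for the job's actual SparkContext and cached RDD:

```scala
// Sketch: polling cached-partition counts from the driver (Spark 2.x API).
// `sc` and `rdd` are stand-ins for the job's SparkContext and cached RDD.
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Pure helper: an RDD is fully resident when every partition is cached.
def fullyCached(cachedPartitions: Int, totalPartitions: Int): Boolean =
  cachedPartitions == totalPartitions

def checkEviction(sc: SparkContext, rdd: RDD[_]): Unit =
  sc.getRDDStorageInfo            // storage status of all persisted RDDs
    .find(_.id == rdd.id)
    .foreach { info =>
      if (!fullyCached(info.numCachedPartitions, rdd.getNumPartitions))
        println(s"RDD ${info.id}: only ${info.numCachedPartitions} of " +
                s"${rdd.getNumPartitions} partitions cached (possible eviction)")
    }
```

A partition that was evicted and recomputed would traverse the iterator again, which would show up in the traverse counts exactly as described below.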

> Partitions are traversed multiple times invalidating Accumulator consistency
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-40048
>                 URL: https://issues.apache.org/jira/browse/SPARK-40048
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.4
>            Reporter: sam
>            Priority: Major
>
> We are trying to use Accumulators to count RDDs without forcing a `.count()`
> on them, for efficiency reasons.  We are aware that tasks can fail and re-run,
> which invalidates the accumulator's value, so we also count the number of times
> each partition has been traversed, which lets us detect this.
> The problem is that partitions are being traversed multiple times even though:
>  - We cache the RDD in memory
>  - No tasks are failing and no executors are dying
>  - There is plenty of memory (no RDD eviction)
> The code we use:
> ```
> val count: LongAccumulator
> val partitionTraverseCounts: List[LongAccumulator]
>
> def increment(): Unit = count.add(1)
>
> def incrementTimesCalled(partitionIndex: Int): Unit =
>   partitionTraverseCounts(partitionIndex).add(1)
>
> def incrementForPartition[T](index: Int, it: Iterator[T]): Iterator[T] = {
>   incrementTimesCalled(index)
>   it.map { x =>
>     increment()
>     x
>   }
> }
> ```
> How we use the above:
> ```
> rdd.mapPartitionsWithIndex(safeCounter.incrementForPartition)
> ```
> We have a 50 partition RDD, and we frequently see odd traverse counts:
> ```
> traverseCounts: List(2, 1, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 
> 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 
> 2, 2, 2, 1, 2)
> ```
> As you can see, some partitions are traversed twice, while others are 
> traversed only once.
> To confirm no task failures:
> ```
> cat job.log | grep -i task | grep -i fail
> ```
> To confirm no memory issues:
> ```
> cat job.log | grep -i memory
> ```
> Every such log line shows multiple GB of memory free.
> We also don't see any errors or exceptions.
> Questions:
> 1. Why is Spark traversing a cached RDD multiple times?
> 2. Is there any way to disable this?
> Many thanks,
> Sam
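
Following up on the detection logic above: if we additionally accumulated per-partition *element* counts (not just one global count), a double traversal would be recoverable rather than merely detectable, since each partition's true count is its accumulated count divided by its traverse count. A sketch in plain Scala (the helper name `consistentCount` is made up, not our job code or Spark API):

```scala
// Sketch: recovering a trustworthy total from per-partition counts.
// Assumes per-partition element counts are accumulated alongside traverse counts.
def consistentCount(elementCounts: List[Long],
                    traverseCounts: List[Long]): Option[Long] = {
  val perPartition = elementCounts.zip(traverseCounts).map {
    case (elems, traversals) =>
      // A partition traversed n times contributes n copies of its true count,
      // so the true count is elems / traversals -- only if it divides evenly.
      if (traversals > 0 && elems % traversals == 0) Some(elems / traversals)
      else None
  }
  if (perPartition.forall(_.isDefined)) Some(perPartition.flatten.sum)
  else None // a partition was never traversed, or counts are inconsistent
}
```

For example, a partition that accumulated 20 elements over 2 traversals contributes 10 to the corrected total; any partition whose counts don't divide evenly makes the whole result untrustworthy.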



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
