[ https://issues.apache.org/jira/browse/SPARK-40048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17579214#comment-17579214 ]

Hyukjin Kwon commented on SPARK-40048:
--------------------------------------

Spark 2.4 is EOL. Mind trying Spark 3.1+?

> Partitions are traversed multiple times invalidating Accumulator consistency
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-40048
>                 URL: https://issues.apache.org/jira/browse/SPARK-40048
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.4
>            Reporter: sam
>            Priority: Major
>
> We are trying to use Accumulators to count RDDs without having to force a 
> `.count()` on them, for efficiency reasons.  We are aware that tasks can fail 
> and be re-run, which would invalidate the value of the accumulator, so we also 
> count the number of times each partition is traversed, letting us detect when 
> this happens.
> The problem is that partitions are being traversed multiple times even though:
>  - We cache the RDD in memory _after we have applied the logic below_ (see the 
> pipeline sketch after the usage snippet).
>  - No tasks are failing and no executors are dying.
>  - There is plenty of memory (no RDD eviction).
> The code we use:
> ```
> import org.apache.spark.util.LongAccumulator
>
> // Members of our counter class (an excerpt); the accumulators are
> // created on the driver before the job runs.
> val count: LongAccumulator
> val partitionTraverseCounts: List[LongAccumulator]
>
> def increment(): Unit = count.add(1)
>
> def incrementTimesCalled(partitionIndex: Int): Unit =
>   partitionTraverseCounts(partitionIndex).add(1)
>
> def incrementForPartition[T](index: Int, it: Iterator[T]): Iterator[T] = {
>   incrementTimesCalled(index)   // once per traversal of this partition
>   it.map { x =>
>     increment()                 // once per element
>     x
>   }
> }
> ```
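> For reference, the accumulators are created on the driver roughly as follows. 
> This is a minimal sketch: the `spark` session name and the `numPartitions` 
> value are assumptions for illustration, not our exact code.
> ```
> import org.apache.spark.util.LongAccumulator
>
> val numPartitions = 50  // assumption: matches the partition count of the RDD
> val count: LongAccumulator = spark.sparkContext.longAccumulator("count")
> val partitionTraverseCounts: List[LongAccumulator] =
>   List.tabulate(numPartitions) { i =>
>     spark.sparkContext.longAccumulator(s"traverse-$i")
>   }
> ```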
> How we use the above:
> ```
> rdd.mapPartitionsWithIndex(safeCounter.incrementForPartition)
> ```
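> The surrounding pipeline looks roughly like this. Again a sketch: the storage 
> level and the materializing action are assumptions, not our exact code.
> ```
> import org.apache.spark.storage.StorageLevel
>
> // Apply the counting logic, cache the result, then materialize it once;
> // later actions should read from the cache rather than recompute.
> val counted = rdd
>   .mapPartitionsWithIndex(safeCounter.incrementForPartition)
>   .persist(StorageLevel.MEMORY_ONLY)
> counted.foreach(_ => ())  // first traversal populates the cache
> ```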
> We have a 50-partition RDD, and we frequently see odd traverse counts:
> ```
> traverseCounts: List(2, 1, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 
> 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 
> 2, 2, 2, 1, 2)
> ```
> As you can see, some partitions are traversed twice, while others are 
> traversed only once.
> To confirm no task failures:
> ```
> cat job.log | grep -i task | grep -i fail
> ```
> To confirm no memory issues:
> ```
> cat job.log | grep -i memory
> ```
> Every memory-related log line shows multiple GB of memory free.
> We also don't see any errors or exceptions.
> Questions:
> 1. Why is Spark traversing a cached RDD multiple times?
> 2. Is there any way to disable this?
> Many thanks,
> Sam


