[ https://issues.apache.org/jira/browse/SPARK-40048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17578447#comment-17578447 ]

sam commented on SPARK-40048:
-----------------------------

I've found a very dodgy hack to work around this:

```
// Lives in a wrapper class that holds `rdd: RDD[T]` plus our own `Logger`/`LogLevel`.
def forceCacheInDriver(numPartitions: Int)(implicit logLevel: LogLevel): RDD[T] = {
  Logger.info(s"Forcing the caching of RDD in driver's memory: ${rdd.name}")
  val collected = rdd.collect() // pulls the entire RDD into the driver
  Logger.info(s"Collected size: ${collected.length}, now making RDD from collected")
  rdd.sparkContext.makeRDD(collected, numSlices = numPartitions)
}
```

So now when Spark refuses to properly cache an RDD into memory, I use this.  Of 
course this isn't desirable since we'll need to ensure the driver has enough 
memory to hold the entire RDD.
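
For reference, here's the same trick as a self-contained sketch (the `local[*]` session, the object name, and the sample data are my own, purely for illustration):

```
import org.apache.spark.sql.SparkSession

object ForceCacheDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("force-cache-demo")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext

    val rdd = sc.parallelize(1 to 100000, numSlices = 50)

    // Collect to the driver and redistribute: the resulting RDD is backed by
    // an in-memory array with no upstream lineage, so later actions can never
    // recompute (and hence re-traverse) the original partitions.
    val collected = rdd.collect()
    val pinned = sc.makeRDD(collected, numSlices = 50)

    println(pinned.count())
    spark.stop()
  }
}
```

The point is that `makeRDD` over an already-collected array has no lineage to re-run, which is what makes it a (memory-hungry) substitute for a cache that actually sticks.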

Would really love to know why Spark refuses to cache my RDD properly! It's 
especially confusing because in our case the RDDs easily fit into memory, so I 
can't believe it has anything to do with eviction.
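
For anyone trying to reproduce, here is a self-contained version of the accumulator setup from the issue description below (the `local[4]` master, the sample data, and the names are my own choices for a minimal repro, not our production job):

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.LongAccumulator

object TraverseCountRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("traverse-count-repro")
      .master("local[4]")
      .getOrCreate()
    val sc = spark.sparkContext

    val numPartitions = 50
    val count: LongAccumulator = sc.longAccumulator("count")
    val partitionTraverseCounts: List[LongAccumulator] =
      List.tabulate(numPartitions)(i => sc.longAccumulator(s"traverse-$i"))

    def incrementForPartition[T](index: Int, it: Iterator[T]): Iterator[T] = {
      partitionTraverseCounts(index).add(1) // once per traversal of this partition
      it.map { x => count.add(1); x }       // once per element
    }

    val rdd = sc
      .parallelize(1 to 100000, numPartitions)
      .mapPartitionsWithIndex(incrementForPartition[Int])
      .cache()

    rdd.count() // first action materialises the cache
    rdd.count() // second action should hit the cache only

    println(s"count = ${count.value}")
    println(s"traverseCounts = ${partitionTraverseCounts.map(_.value)}")
    spark.stop()
  }
}
```

If caching behaved as expected, every traverse count would be exactly 1.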

> Partitions are traversed multiple times invalidating Accumulator consistency
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-40048
>                 URL: https://issues.apache.org/jira/browse/SPARK-40048
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.4
>            Reporter: sam
>            Priority: Major
>
> We are trying to use Accumulators to count the elements of RDDs without having 
> to force `.count()` on them, for efficiency reasons.  We are aware that tasks 
> can fail and re-run, which would invalidate the value of the accumulator, so we 
> also count the number of times each partition has been traversed, so that we 
> can detect this.
> The problem is that partitions are being traversed multiple times, even though:
>  - we cache the RDD in memory _after we have applied the logic below_
>  - no tasks are failing and no executors are dying
>  - there is plenty of memory (no RDD eviction)
> The code we use:
> ```
> val count: LongAccumulator
> val partitionTraverseCounts: List[LongAccumulator]
>
> def increment(): Unit = count.add(1)
>
> def incrementTimesCalled(partitionIndex: Int): Unit =
>   partitionTraverseCounts(partitionIndex).add(1)
>
> def incrementForPartition[T](index: Int, it: Iterator[T]): Iterator[T] = {
>   incrementTimesCalled(index)
>   it.map { x =>
>     increment()
>     x
>   }
> }
> ```
> How we use the above:
> ```
> rdd.mapPartitionsWithIndex(safeCounter.incrementForPartition)
> ```
> We have a 50-partition RDD, and we frequently see odd traverse counts:
> ```
> traverseCounts: List(2, 1, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 1, 2)
> ```
> As you can see, some partitions are traversed twice, while others are 
> traversed only once.
> To confirm no task failures:
> ```
> cat job.log | grep -i task | grep -i fail
> ```
> To confirm no memory issues:
> ```
> cat job.log | grep -i memory
> ```
> We see that every such log line reports multiple GB of memory free.
> We also don't see any errors or exceptions.
> Questions:
> 1. Why is Spark traversing a cached RDD multiple times?
> 2. Is there any way to disable this?
> Many thanks,
> Sam


