[ https://issues.apache.org/jira/browse/SPARK-40048 ]
sam updated SPARK-40048:
------------------------
    Description:

We are trying to use Accumulators to count the records in our RDDs without forcing a `.count()` on them, for efficiency reasons. We are aware that tasks can fail and re-run, which would invalidate the value of the accumulator, so we also count the number of times each partition has been traversed, so that we can detect this.

The problem is that partitions are being traversed multiple times even though:

- We cache the RDD in memory _after we have applied the logic below_.
- No tasks are failing and no executors are dying.
- There is plenty of memory (no RDD eviction).

The code we use (with the imports and accumulator initialisation filled in):

```
import org.apache.spark.SparkContext
import org.apache.spark.util.LongAccumulator

// sc is only used at construction time, so the instance stays serializable.
class SafeCounter(sc: SparkContext, numPartitions: Int) extends Serializable {
  val count: LongAccumulator = sc.longAccumulator("count")
  // One accumulator per partition, recording how often that partition is traversed.
  val partitionTraverseCounts: List[LongAccumulator] =
    List.tabulate(numPartitions)(i => sc.longAccumulator(s"partition-$i"))

  def increment(): Unit = count.add(1)

  def incrementTimesCalled(partitionIndex: Int): Unit =
    partitionTraverseCounts(partitionIndex).add(1)

  def incrementForPartition[T](index: Int, it: Iterator[T]): Iterator[T] = {
    incrementTimesCalled(index)
    it.map { x =>
      increment()
      x
    }
  }
}
```

How we use the above (a self-contained sketch of the whole setup follows at the end of this description):

```
rdd.mapPartitionsWithIndex(safeCounter.incrementForPartition)
```

We have a 50-partition RDD, and we frequently see odd traverse counts:

```
traverseCounts: List(2, 1, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 1, 2)
```

As you can see, some partitions are traversed twice, while others are traversed only once.

To confirm there are no task failures:

```
cat job.log | grep -i task | grep -i fail
```

To confirm there are no memory issues:

```
cat job.log | grep -i memory
```

Every matching log line shows multiple GB of memory free, and we see no errors or exceptions anywhere in the logs.

Questions:

1. Why is Spark traversing a cached RDD multiple times?
2. Is there any way to disable this?
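For completeness, here is the self-contained sketch referred to above, wiring the `SafeCounter` class together with caching and downstream actions. The object name, the `local[4]` master, the generated input data, and the particular pair of actions are illustrative assumptions rather than our exact production job:

```
import org.apache.spark.sql.SparkSession

object TraverseCountRepro {
  def main(args: Array[String]): Unit = {
    // Illustrative local session; we observe the same behaviour on a real cluster.
    val spark = SparkSession.builder()
      .appName("traverse-count-repro")
      .master("local[4]")
      .getOrCreate()
    val sc = spark.sparkContext

    val numPartitions = 50
    val safeCounter = new SafeCounter(sc, numPartitions)

    // Stand-in input; the real job reads from external storage.
    val rdd = sc.parallelize(1 to 1000000, numPartitions)

    // Attach the counting logic, then cache the result in memory.
    val counted = rdd.mapPartitionsWithIndex(safeCounter.incrementForPartition).cache()

    // Two downstream actions over the cached RDD. With a healthy cache we
    // would expect each partition to be traversed exactly once in total.
    counted.reduce(_ + _)
    counted.max()

    println("traverseCounts: " + safeCounter.partitionTraverseCounts.map(_.value))
    spark.stop()
  }
}
```

Note that the Spark programming guide only guarantees exactly-once application of accumulator updates for accumulators used inside actions; updates made inside transformations, as here, may be applied more than once if tasks or stages are re-executed, which is precisely the situation the traverse counters are intended to detect.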
Many thanks,
Sam

> Partitions are traversed multiple times invalidating Accumulator consistency
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-40048
>                 URL: https://issues.apache.org/jira/browse/SPARK-40048
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.4, 3.2.1
>            Reporter: sam
>            Priority: Major