[ https://issues.apache.org/jira/browse/SPARK-40048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sam updated SPARK-40048:
------------------------
    Description: 
We are trying to use Accumulators to count the records in our RDDs without having to force a 
`.count()` on them, for efficiency reasons. We are aware that tasks can fail and be re-run, 
which would invalidate the accumulator values, so we also count the number of times each 
partition has been traversed so that we can detect when this happens.

The problem is that partitions are being traversed multiple times even though:

 - We cache the RDD in memory _after we have applied the logic below_.
 - No tasks are failing and no executors are dying.
 - There is plenty of memory (no RDD eviction).

The code we use:

```
// Members of our safeCounter helper; the accumulators themselves are
// created and registered with the SparkContext elsewhere.
val count: LongAccumulator
val partitionTraverseCounts: List[LongAccumulator]

// Counts every record seen across the RDD.
def increment(): Unit = count.add(1)

// Counts how many times a given partition has been traversed.
def incrementTimesCalled(partitionIndex: Int): Unit =
  partitionTraverseCounts(partitionIndex).add(1)

// Wraps a partition's iterator: records the traversal up front,
// then counts each element as the iterator is consumed.
def incrementForPartition[T](index: Int, it: Iterator[T]): Iterator[T] = {
  incrementTimesCalled(index)

  it.map { x =>
    increment()
    x
  }
}
```
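
For completeness, the accumulators above are ordinary named `LongAccumulator`s; the block only 
shows their declarations. They are created on the driver before the job runs, roughly like this 
(the accumulator names are illustrative, not our exact ones):

```
import org.apache.spark.util.LongAccumulator

// Roughly how the fields above are initialised: one record counter plus one
// traverse counter per partition of the 50-partition RDD. `sc` is the SparkContext.
val count: LongAccumulator = sc.longAccumulator("recordCount")
val partitionTraverseCounts: List[LongAccumulator] =
  List.tabulate(50)(i => sc.longAccumulator(s"traverse-partition-$i"))
```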

How we use the above:

```
rdd.mapPartitionsWithIndex(safeCounter.incrementForPartition)
```
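
We cache the result of this call, run our actions against the cached RDD, and then read the 
traverse counts back on the driver. Schematically (the `MEMORY_ONLY` storage level and the 
trailing `foreach` action are illustrative stand-ins for our real job):

```
import org.apache.spark.storage.StorageLevel

// The counting logic is applied first, then the resulting RDD is cached in memory.
val counted = rdd
  .mapPartitionsWithIndex(safeCounter.incrementForPartition)
  .persist(StorageLevel.MEMORY_ONLY)

// All downstream actions run against the cached RDD.
counted.foreach(_ => ())

// Read the per-partition traverse counts back on the driver after the action completes.
println(s"traverseCounts: ${safeCounter.partitionTraverseCounts.map(_.value)}")
```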


We have a 50-partition RDD, and we frequently see odd traverse counts:

```
traverseCounts: List(2, 1, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 
2, 2, 2, 1, 2)
```

As you can see, some partitions are traversed twice, while others are traversed 
only once.

To confirm no task failures:

```
cat job.log | grep -i task | grep -i fail
```

To confirm no memory issues:

```
cat job.log | grep -i memory
```

Every matching log line shows multiple GB of memory free.

We also don't see any errors or exceptions.

Questions:

1. Why is Spark traversing a cached RDD multiple times?
2. Is there any way to disable this?

Many thanks,

Sam


> Partitions are traversed multiple times invalidating Accumulator consistency
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-40048
>                 URL: https://issues.apache.org/jira/browse/SPARK-40048
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.4, 3.2.1
>            Reporter: sam
>            Priority: Major
>


