cloud-fan commented on code in PR #55585:
URL: https://github.com/apache/spark/pull/55585#discussion_r3160927423


##########
core/src/main/scala/org/apache/spark/util/LastAttemptAccumulator.scala:
##########
@@ -171,67 +171,170 @@ private class LastAttemptRDDVals[@specialized T](
   // however should not happen in practice and would likely produce other unexpected effects.
   var lastSqlExecutionId: Option[Long] = None
 
-  def numPartitions: Int = stageIds.length
+  // Common (stageId, stageAttemptId, taskAttemptNumber) shared by the majority of computed
+  // partitions. In the common case (no stage retries), every computed partition has the same
+  // attempt tuple, so we store it once at the RDD level instead of allocating three N-sized int
+  // arrays. The values are set on the first update and never changed; partitions whose attempt
+  // differs are recorded in the override arrays below.
+  // EMPTY_ID until the first update.
+  private var commonStageId: Int = EMPTY_ID
+  private var commonStageAttemptId: Int = EMPTY_ID
+  private var commonTaskAttemptNumber: Int = EMPTY_ID
+
+  // Bitmap of partitions that have been computed, one bit per partition packed into longs.
+  // A bit is set when a partition receives its first update; a partition with a clear bit has not
+  // been computed (e.g. early stop in take/limit, AQE task coalescing).
+  // Reads of an individual long are atomic on 64-bit JVMs, matching the loose concurrency
+  // semantics of the original per-partition int arrays.
+  private val computedBitmap: Array[Long] =
+    new Array[Long]((partitionPartialVals.length + 63) >>> 6)
+
+  // Per-partition override arrays for each component of the attempt tuple. Each is allocated
+  // lazily and independently the first time some partition's value for that component diverges
+  // from the common; until then the field is null and no per-partition state is kept for that
+  // component. Once allocated, an array is sized [[numPartitions]]: entries equal to EMPTY_ID
+  // mean "match the common value" and any other value is the per-partition override. This way:
+  //  - RDDs without retries pay zero per-partition allocations (all three fields stay null).
+  //  - A pure stage retry (new stageAttemptId, same stageId, taskAttemptNumber resets to 0)
+  //    allocates only [[overrideStageAttemptIds]].
+  //  - A mid-stage retry (executor lost, some tasks restart with a higher taskAttemptNumber)
+  //    allocates only [[overrideTaskAttemptNumbers]].
+  //  - Whole-stage cross-Stage retry (new stageId) allocates [[overrideStageIds]] too.
+  //
+  // Concurrency: update() is called only from the DAGScheduler scheduler loop. Some readers of
+  // the state can run concurrently (e.g. logAccumulatorState formatting). The fields are
+  // declared @volatile, and the new array is fully populated before the field is assigned, so a
+  // reader either sees null (use common) or sees an array whose Array.fill initialization is
+  // visible. In-place element writes for subsequent overrides are plain ints; readers may see
+  // them eventually, matching the loose semantics of the original per-partition int arrays.
+  @volatile private var overrideStageIds: Array[Int] = null
+  @volatile private var overrideStageAttemptIds: Array[Int] = null
+  @volatile private var overrideTaskAttemptNumbers: Array[Int] = null
+
+  def numPartitions: Int = partitionPartialVals.length
 
   def isEmptyAt(partitionId: Int): Boolean = {
-    if (stageIds(partitionId) == LastAttemptRDDVals.EMPTY_ID) {
-      assert(stageAttemptIds(partitionId) == LastAttemptRDDVals.EMPTY_ID)
-      assert(taskAttemptNumbers(partitionId) == LastAttemptRDDVals.EMPTY_ID)
-      true
+    val word = computedBitmap(partitionId >>> 6)
+    ((word >>> (partitionId & 63)) & 1L) == 0L
+  }
+
+  private def setComputedBit(partitionId: Int): Unit = {
+    val idx = partitionId >>> 6
+    computedBitmap(idx) = computedBitmap(idx) | (1L << (partitionId & 63))
+  }
+
+  /**
+   * Records a new value for one component (stageId / stageAttemptId / taskAttemptNumber) of the
+   * attempt tuple at `partitionId`, allocating the override array on first divergence. Returns
+   * the array reference the caller should write back to the @volatile field - either a freshly
+   * allocated and populated array (first override for this component) or the existing array
+   * after an in-place update. Once the array exists, the value is always written, even when it
+   * matches the common - lookupComponent returns it correctly either way.

Review Comment:
   Small wording: "the common" reads as an incomplete noun phrase. Suggest "the common value". The same issue appears at line 194 in the override-arrays comment block ("diverges from the common"); line 196 already uses the full phrase, so this change also makes the doc internally consistent.
   
   ```suggestion
      * after an in-place update. Once the array exists, the value is always written, even when it
      * matches the common value - lookupComponent returns it correctly either way.
   ```
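
For readers following the quoted hunk, the computed-partition bitmap it introduces can be sketched on its own. This is only an illustration under stated assumptions: `ComputedBitmap`, `setComputed`, and `wordCount` are hypothetical names that do not appear in the PR, and the bit arithmetic is copied from `isEmptyAt` and `setComputedBit` as quoted above.

```scala
// Hypothetical standalone sketch; ComputedBitmap is not a class in the PR.
final class ComputedBitmap(numPartitions: Int) {
  // One bit per partition, 64 partitions packed into each Long word.
  private val words = new Array[Long]((numPartitions + 63) >>> 6)

  // True while the partition's bit is still clear (no update recorded yet).
  def isEmptyAt(partitionId: Int): Boolean = {
    val word = words(partitionId >>> 6)
    ((word >>> (partitionId & 63)) & 1L) == 0L
  }

  // Marks the partition as computed; setting an already-set bit changes nothing.
  def setComputed(partitionId: Int): Unit = {
    val idx = partitionId >>> 6
    words(idx) = words(idx) | (1L << (partitionId & 63))
  }

  // Number of Long words backing the bitmap (exposed only for illustration).
  def wordCount: Int = words.length
}
```

With 130 partitions the sketch allocates three words, and setting partition 64 touches only the second word, leaving partitions 63 and 65 reported as empty.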



##########
core/src/main/scala/org/apache/spark/util/LastAttemptAccumulator.scala:
##########
@@ -171,67 +171,170 @@ private class LastAttemptRDDVals[@specialized T](
   // however should not happen in practice and would likely produce other unexpected effects.
   var lastSqlExecutionId: Option[Long] = None
 
-  def numPartitions: Int = stageIds.length
+  // Common (stageId, stageAttemptId, taskAttemptNumber) shared by the majority of computed
+  // partitions. In the common case (no stage retries), every computed partition has the same
+  // attempt tuple, so we store it once at the RDD level instead of allocating three N-sized int
+  // arrays. The values are set on the first update and never changed; partitions whose attempt
+  // differs are recorded in the override arrays below.
+  // EMPTY_ID until the first update.
+  private var commonStageId: Int = EMPTY_ID
+  private var commonStageAttemptId: Int = EMPTY_ID
+  private var commonTaskAttemptNumber: Int = EMPTY_ID
+
+  // Bitmap of partitions that have been computed, one bit per partition packed into longs.
+  // A bit is set when a partition receives its first update; a partition with a clear bit has not
+  // been computed (e.g. early stop in take/limit, AQE task coalescing).
+  // Reads of an individual long are atomic on 64-bit JVMs, matching the loose concurrency
+  // semantics of the original per-partition int arrays.
+  private val computedBitmap: Array[Long] =
+    new Array[Long]((partitionPartialVals.length + 63) >>> 6)
+
+  // Per-partition override arrays for each component of the attempt tuple. Each is allocated
+  // lazily and independently the first time some partition's value for that component diverges
+  // from the common; until then the field is null and no per-partition state is kept for that
+  // component. Once allocated, an array is sized [[numPartitions]]: entries equal to EMPTY_ID
+  // mean "match the common value" and any other value is the per-partition override. This way:
+  //  - RDDs without retries pay zero per-partition allocations (all three fields stay null).
+  //  - A pure stage retry (new stageAttemptId, same stageId, taskAttemptNumber resets to 0)
+  //    allocates only [[overrideStageAttemptIds]].
+  //  - A mid-stage retry (executor lost, some tasks restart with a higher taskAttemptNumber)
+  //    allocates only [[overrideTaskAttemptNumbers]].
+  //  - Whole-stage cross-Stage retry (new stageId) allocates [[overrideStageIds]] too.
+  //
+  // Concurrency: update() is called only from the DAGScheduler scheduler loop. Some readers of
+  // the state can run concurrently (e.g. logAccumulatorState formatting). The fields are
+  // declared @volatile, and the new array is fully populated before the field is assigned, so a
+  // reader either sees null (use common) or sees an array whose Array.fill initialization is
+  // visible. In-place element writes for subsequent overrides are plain ints; readers may see
+  // them eventually, matching the loose semantics of the original per-partition int arrays.
+  @volatile private var overrideStageIds: Array[Int] = null
+  @volatile private var overrideStageAttemptIds: Array[Int] = null
+  @volatile private var overrideTaskAttemptNumbers: Array[Int] = null
+
+  def numPartitions: Int = partitionPartialVals.length
 
   def isEmptyAt(partitionId: Int): Boolean = {
-    if (stageIds(partitionId) == LastAttemptRDDVals.EMPTY_ID) {
-      assert(stageAttemptIds(partitionId) == LastAttemptRDDVals.EMPTY_ID)
-      assert(taskAttemptNumbers(partitionId) == LastAttemptRDDVals.EMPTY_ID)
-      true
+    val word = computedBitmap(partitionId >>> 6)
+    ((word >>> (partitionId & 63)) & 1L) == 0L
+  }
+
+  private def setComputedBit(partitionId: Int): Unit = {
+    val idx = partitionId >>> 6
+    computedBitmap(idx) = computedBitmap(idx) | (1L << (partitionId & 63))
+  }
+
+  /**
+   * Records a new value for one component (stageId / stageAttemptId / taskAttemptNumber) of the
+   * attempt tuple at `partitionId`, allocating the override array on first divergence. Returns
+   * the array reference the caller should write back to the @volatile field - either a freshly
+   * allocated and populated array (first override for this component) or the existing array
+   * after an in-place update. Once the array exists, the value is always written, even when it
+   * matches the common - lookupComponent returns it correctly either way.
+   */
+  private def setOverrideComponent(
+      array: Array[Int],
+      partitionId: Int,
+      value: Int,
+      common: Int): Array[Int] = {
+    if (array == null) {
+      if (value == common) null
+      else {
+        val newArr = Array.fill(partitionPartialVals.length)(EMPTY_ID)
+        newArr(partitionId) = value
+        newArr
+      }
     } else {
-      false
+      array(partitionId) = value
+      array
+    }
+  }
+
+  /** Reads one component's value at `partitionId`, falling back to `common` when the override
+   *  array is null or the entry is still EMPTY_ID (slot never written - typically a partition
+   *  that was computed before this component's override array was allocated). */

Review Comment:
   The parenthetical mixes subjects ("slot never written … typically a partition…") and covers only one of the two cases that leave a slot at EMPTY_ID. The other case is a partition that had not yet been computed when this override array was first allocated for a different partition. Suggest:
   
   ```suggestion
     /** Reads one component's value at `partitionId`, falling back to `common` when the override
      *  array is null or the entry is still EMPTY_ID (the slot was either not yet written, or was
      *  initialized to EMPTY_ID and never overwritten because the partition's value matched the
      *  common value when the array was first allocated for a different partition). */
   ```
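
To make the two EMPTY_ID cases concrete, here is a minimal sketch of the write and read paths. It rests on several assumptions: `OverrideSketch` is a hypothetical wrapper, `EMPTY_ID = -1` is a guessed sentinel (the actual value is not shown in the hunk), `numPartitions` is passed explicitly instead of being read from `partitionPartialVals`, and the body of `lookupComponent` is inferred from its doc comment because the quoted diff ends before it. Only `setOverrideComponent` follows the quoted code.

```scala
// Hypothetical wrapper object; not part of the PR.
object OverrideSketch {
  // Assumption: sentinel value chosen for illustration only.
  val EMPTY_ID: Int = -1

  // Follows setOverrideComponent from the hunk, with numPartitions made a parameter.
  def setOverrideComponent(
      array: Array[Int],
      numPartitions: Int,
      partitionId: Int,
      value: Int,
      common: Int): Array[Int] = {
    if (array == null) {
      // No divergence yet: keep the field null and allocate nothing.
      if (value == common) null
      else {
        val newArr = Array.fill(numPartitions)(EMPTY_ID)
        newArr(partitionId) = value
        newArr
      }
    } else {
      // Array already exists: always write, even if value equals common.
      array(partitionId) = value
      array
    }
  }

  // Assumed reader, inferred from the doc comment under review.
  def lookupComponent(array: Array[Int], partitionId: Int, common: Int): Int = {
    if (array == null) common
    else {
      val v = array(partitionId)
      if (v == EMPTY_ID) common else v
    }
  }
}
```

In this sketch, if partition 0 reports the common value and partition 1 then diverges, the array is allocated with slot 0 (computed before allocation) and slot 2 (not yet computed) both left at EMPTY_ID, and `lookupComponent` returns the common value for each. A later update for partition 2 is written into the array even when it equals the common value, and the lookup result is the same either way.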



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
