cloud-fan commented on code in PR #55371:
URL: https://github.com/apache/spark/pull/55371#discussion_r3137364745


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala:
##########
@@ -34,7 +34,7 @@ import org.apache.spark.util.AccumulatorContext.internOption
  */
 class SQLMetric(
     val metricType: String,
-    initValue: Long = 0L) extends AccumulatorV2[Long, Long] {
+    val initValue: Long = 0L) extends AccumulatorV2[Long, Long] {

Review Comment:
   Follow-up on https://github.com/apache/spark/pull/55371#discussion_r3109651264:
   you mentioned that the `val initValue` widening here "could actually just be
   reverted, I must have needed it earlier", but the revert is not in the current
   tree. Reverting `val initValue` back to `initValue` (a plain constructor
   param) should still let `SQLLastAttemptMetric.copy` /
   `newDriverQueryExecutionAcc` compile, because Scala keeps a
   primary-constructor param as a private field when it is referenced outside
   the constructor body. Could you either apply the revert, or update the thread
   to explain why it is needed after all? Non-blocking; this is a late catch on
   my side from the prior round.
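
   For reference, a minimal sketch of the Scala rule this relies on. `Metric`
   and `LastAttemptMetric` are hypothetical stand-ins, not the real `SQLMetric`
   hierarchy, and whether the real subclass takes its own `initValue` param is
   an assumption here. A plain constructor param that a method references is
   retained as a field private to the class, so `copy` compiles without `val`;
   a subclass cannot read the parent's plain param and forwards its own instead.

   ```scala
   // Hypothetical stand-ins for illustration only.
   class Metric(val metricType: String, initValue: Long = 0L) {
     private var current: Long = initValue

     // `initValue` has no `val`, but it is referenced from method bodies,
     // so the compiler keeps it as a field private to this class.
     def copy(): Metric = {
       val m = new Metric(metricType, initValue)
       m.current = current
       m
     }

     def reset(): Unit = current = initValue
     def add(v: Long): Unit = current += v
     def value: Long = current
   }

   // The subclass takes its own `init` param and forwards it to the parent;
   // it never needs to read the parent's `initValue`.
   class LastAttemptMetric(tpe: String, init: Long = 0L) extends Metric(tpe, init) {
     override def copy(): LastAttemptMetric = {
       val m = new LastAttemptMetric(metricType, init)
       m.add(value - init)
       m
     }
   }
   ```

   If the subclass instead reads the parent's `initValue` directly, the `val`
   (or a `protected val`) would be needed after all.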



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLLastAttemptAccumulator.scala:
##########
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.metric
+
+import scala.collection.mutable
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.{LogEntry, Logging}
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.execution.{BaseSubqueryExec, QueryExecution, SparkPlan, SubqueryAdaptiveBroadcastExec, SubqueryBroadcastExec, SubqueryExec, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, BroadcastExchangeLike, ReusedExchangeExec, ShuffleExchangeExec, ShuffleExchangeLike}
+import org.apache.spark.util.{AccumulatorV2, LastAttemptAccumulator}
+
+/*
+ * SQLLastAttemptAccumulator is a LastAttemptAccumulator that allows tracking the last attempt
+ * updates that happened in the scope of execution of a plan created by a specific Dataset's
+ * QueryExecution.
+ *
+ * Tracking RDDs belonging to a Dataset execution.
+ * -----------------------------------------------
+ * Dataset executes executedPlan from its QueryExecution. Each SparkPlan node in the
+ * executedPlan saves the RDD with its execution (executeRDD or executeColumnarRDD). However,
+ * the root RDD of a Spark Stage that actually gets submitted and executed is not necessarily
+ * that RDD. It may be an ephemeral RDD created on the fly when submitting the job, e.g.:
+ * - for result stages, there may be additional transformations to format the results, like
+ *   apply an Encoder (e.g. turn InternalRows into Rows)
+ *   or transformations for Arrow
+ * - for map stages, there may be some additional transformations to prepare the shuffle
+ *   data in correct format.
+ * - operations like dataframe caching may wrap the plan results to format and write to cache.
+ * We therefore cannot track the metrics updates just by RDD id. However, each SparkPlan also
+ * creates an RDDOperationScope, and wraps the execution it submits by that scope.
+ * The completed Tasks should have RDDOperationScope of the SparkPlan that submitted the
+ * Stage. We need to extract the RDDOperationScopes from Dataset.queryExecution.executedPlan
+ * to track last attempt metric updates coming from that execution.
+ *
+ * Additionally, it is possible that the same queryExecution.executedPlan is reused. For
+ * example, when collect() is called multiple times on the same Dataset.
+ * - Part of the execution (e.g. the shuffles) should then be reused. Accumulator should still
+ *   keep their partial values associated to its RDDOperationScope, and return it for this
+ *   new attempt.
+ * - Some of the execution (e.g. the result stage) may be recomputed. Since the SparkPlan will
+ *   be the same, RDDOperationScope will be the same, and this should become a newer execution
+ *   of the same RDD, which should replace the previous one.
+ *
+ * AQE plan changes
+ * ----------------
+ * AQE re-optimizes LogicalPlan and creates new SparkPlan. If the new plan doesn't contain
+ * some of the QueryStages from the previous plan, they can be cancelled while they already
+ * started running and accumulated some metric results.
+ * If the metric is part of SparkPlan.metrics, then the newly created plan will have new
+ * metrics and the old metrics would have been discarded; so nothing needs to be tracked here.
+ * But if the metric is coming from outside, it can be reused by the new SparkPlan.
+ * A new plan will have a new RDD and a new RDDOperationScope, so by tracking these for the
+ * final AQE plan, only values from the final plan and execution should be aggregated.
+ *
+ * It can also happen that the new AQE plan reuses SparkPlan instances from the old plan,
+ * see CancelShuffleStageInBroadcastJoin. However, in that case, the old plan will be put
+ * under some new plan in newly submitted Stages. Since we only truly track the plans that
+ * submit Stages, these should be different and enough to disambiguate.
+ *
+ * Driver only updates
+ * -------------------
+ * The metric can be updated directly on the driver side, during the execution of catalyst
+ * optimizer. One example is [[ConvertToLocalRelation]] optimization rule, which constant folds
+ * pieces of the plan.
+ * Execution in this scope is tagged with [[QueryExecution.id]] using
+ * [[SparkContext.DATASET_QUERY_EXECUTION_ID_KEY]] property, and this metric is tracking
+ * the metric value separately for each QueryExecution.
+ * Like with LastAttemptAccumulator, the metric will bail out if it's updated both from the driver
+ * and from executor Tasks.
+ *
+ * Cached / Checkpointed plans
+ * ---------------------------
+ * If the metric was used inside a cached (df.cache, df.persist) or checkpointed (df.checkpoint,
+ * df.localCheckpoint) plan, which is then turned into an RDDScanExec or InMemoryTableScanExec
+ * in the Dataset's executedPlan, [[lastAttemptValueForDataset]] and
+ * [[lastAttemptValueForQueryExecution]] are declared undefined behavior. In this case,
+ * [[lastAttemptValueForHighestRDDId()]] should be used instead, which returns the value from
+ * the execution in which the plan was cached/checkpointed.
+ *
+ * The main issue is if the metric is in the top stage of the cached plan. When that plan is
+ * executed in some Dataset (as lazy execution), the metric will be executed in the scope of the
+ * stage that contains the InMemoryTableScanExec / RDDScanExec, which will be some parent of that
+ * plan, and not plan of the cached plan. So if the cached plan is then used in another Dataset,
+ * that Dataset will not have information about that parent.
+ * There could be some hacks done to fix it by recording in the InMemoryRelation the scopes in
+ * which it was materialized. There are also other issues, like that checkpoint throws away the
+ * plan, so it would also have to record the RDD scopes used during checkpointing. This gets
+ * further complicated if recomputations are involved, and are done in yet another scope.
+ * It was declared undefined behavior instead of pursuing this.
+ */
+
+/**
+ * A trait that can be mixed into a subclass of [[AccumulatorV2]] to track the "logical"
+ * value of the "last attempt" of the execution using the accumulator.
+ * In addition to what [[LastAttemptAccumulator]] does, it allows tracking the last attempt
+ * executed in the scope of a Dataset's QueryExecution, via
+ * [[lastAttemptValueForDataset]] and [[lastAttemptValueForQueryExecution]] methods.
+ */
+trait SQLLastAttemptAccumulator[IN, OUT, PARTIAL, DRIVER_ACC]
+    extends LastAttemptAccumulator[IN, OUT, PARTIAL] {
+  this: AccumulatorV2[IN, OUT] =>
+
+  /** Create a fresh accumulator to hold driver-side values for one QueryExecution. */
+  protected def newDriverQueryExecutionAcc(): DRIVER_ACC
+  /** Add a value to a driver-side per-QueryExecution accumulator. */
+  protected def addToDriverAcc(acc: DRIVER_ACC, value: IN): Unit
+  /** Set the value of a driver-side per-QueryExecution accumulator. */
+  protected def setDriverAcc(acc: DRIVER_ACC, value: OUT): Unit
+  /** Read the value of a driver-side per-QueryExecution accumulator. */
+  protected def driverAccValue(acc: DRIVER_ACC): OUT
+
+  @transient
+  private var lastAttemptDirectDriverQueryExecutionValues: mutable.Map[String, DRIVER_ACC] = _
+
+  override def initializeLastAttemptAccumulator()(implicit ct: ClassTag[PARTIAL]): Unit = try {
+    super.initializeLastAttemptAccumulator()(ct)
+    lastAttemptDirectDriverQueryExecutionValues = new mutable.HashMap[String, DRIVER_ACC]()
+  } catch {
+    case NonFatal(e) =>
+      unexpectedLastAttemptMetricOperation(
+        invalidate = true,
+        reason = "Unexpected exception in initializeLastAttemptAccumulator",
+        exception = Some(e))
+  }
+
+  override def resetLastAttemptAccumulator(): Unit = try {
+    super.resetLastAttemptAccumulator()
+    lastAttemptDirectDriverQueryExecutionValues = new mutable.HashMap[String, DRIVER_ACC]()
+  } catch {
+    case NonFatal(e) =>
+      unexpectedLastAttemptMetricOperation(
+        invalidate = true,
+        reason = "Unexpected exception in resetLastAttemptAccumulator",
+        exception = Some(e))
+  }
+
+  override protected def assertValid() = {
+    super.assertValid()
+    assert(lastAttemptDirectDriverQueryExecutionValues != null)
+  }
+
+  protected def getOrCreateDirectDriverQueryExecutionValue(queryExecutionId: String): DRIVER_ACC = {
+    lastAttemptDirectDriverQueryExecutionValues.synchronized {
+      if (!lastAttemptDirectDriverQueryExecutionValues.contains(queryExecutionId)) {
+        lastAttemptDirectDriverQueryExecutionValues.put(
+          queryExecutionId, newDriverQueryExecutionAcc())
+      }
+      lastAttemptDirectDriverQueryExecutionValues(queryExecutionId)
+    }
+  }
+
+  protected def getActiveDatasetQueryExecutionId: Option[String] = {
+    SparkContext
+      .getActive
+      .flatMap(sc => Option(sc.getLocalProperty(SparkContext.DATASET_QUERY_EXECUTION_ID_KEY)))
+  }
+
+  /**
+   * Check if the value is added on the driver side, not from within a task.
+   * If it is set in the scope of a Dataset's QueryExecution, associate it with that scope.
+   * This must be called from `add` methods of any AccumulatorV2 subclass supporting
+   * SQL last attempt metrics to set what the `value` of the metric is after the operation.
+   * This should be called there after [[setValueIfOnDriverSide]].
+   */
+  protected def addQueryExecutionValueIfOnDriverSide(value: IN): Unit = try {
+    // Note: setValueIfOnDriverSide will already make it invalid if there are also RDD updates.
+    if (isAtDriverSide && lastAttemptAccumulatorInitialized && !lastAttemptAccumulatorInvalid) {
+      // Direct update on the driver, not from within a task.
+      getActiveDatasetQueryExecutionId match {
+        case Some(qeId) =>
+          addToDriverAcc(getOrCreateDirectDriverQueryExecutionValue(qeId), value)
+        case None => // pass
+      }
+    }
+  } catch {
+    case NonFatal(e) =>
+      unexpectedLastAttemptMetricOperation(
+        invalidate = true,
+        reason = "Unexpected exception in addQueryExecutionValueIfOnDriverSide",
+        exception = Some(e))
+  }
+
+  /**
+   * Like [[addQueryExecutionValueIfOnDriverSide]], but for set operations.
+   */
+  protected def setQueryExecutionValueIfOnDriverSide(value: OUT): Unit = try {
+    if (isAtDriverSide && lastAttemptAccumulatorInitialized && !lastAttemptAccumulatorInvalid) {
+      getActiveDatasetQueryExecutionId match {
+        case Some(qeId) =>
+          setDriverAcc(getOrCreateDirectDriverQueryExecutionValue(qeId), value)
+        case None => // pass
+      }
+    }
+  } catch {
+    case NonFatal(e) =>
+      unexpectedLastAttemptMetricOperation(
+        invalidate = true,
+        reason = "Unexpected exception in setQueryExecutionValueIfOnDriverSide",
+        exception = Some(e))
+  }
+
+  override def logAccumulatorState: LogEntry = try {
+    val driverQEVals = Option(lastAttemptDirectDriverQueryExecutionValues)
+      .map(_.map { case (key, acc) => s"$key -> ${driverAccValue(acc)}" }.mkString("\n"))
+      .getOrElse("<not initialized>")
+    super.logAccumulatorState +
+      log"""
+         |Direct driver QE values:
+         |${MDC(logKeyAccumulatorState, driverQEVals)}
+         """.stripMargin
+  } catch {
+    case NonFatal(e) =>
+      logWarning(log"Unexpected exception in logAccumulatorState", e)
+      log"<Unexpected exception in logAccumulatorState>"
+  }
+
+  /**
+   * Returns the last attempt value of this accumulator, aggregated from the last execution of this
+   * QueryExecution.
+   *
+   * @note The output of this method is undefined if this metric was used inside a part of the plan
+   *       which was either checkpointed (e.g. df.localCheckpoint(), df.checkpoint()) or cached
+   *       (e.g. df.cache(), df.persist()).
+   *       [[lastAttemptValueForHighestRDDId()]] should return the value from when the execution in
+   *       which the plan was cached/checkpointed.

Review Comment:
   Sentence fragment: "should return the value from when the execution in which
   the plan was cached/checkpointed." The file-header version at lines 94–97 was
   rewritten in this round to "should be used instead, which returns the value
   from the execution in which the plan was cached/checkpointed", but this
   method's `@note` (and the identical one on `lastAttemptValueForDataset` at
   :276–277) wasn't updated in parallel. Late catch from prior round.
   ```suggestion
      *       [[lastAttemptValueForHighestRDDId()]] should be used instead, which returns the
      *       value from the execution in which the plan was cached/checkpointed.
   ```



##########
core/src/main/scala/org/apache/spark/util/LastAttemptAccumulator.scala:
##########
@@ -0,0 +1,867 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import scala.math.Ordering.Implicits._
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.{LogEntry, Logging, LogKey, LogKeys}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.TaskInfo
+
+/*
+ *  Last Attempt Accumulators are Accumulators that track the value of a metric aggregated across
+ *  the "last execution" that produced the values. "Last execution" can be defined as:
+ *  - For RDDs: the last execution of a given RDD partition, in the latest Stage and Stage attempt
+ *    that recomputed it.
+ *  - Across RDDs: lastAttemptValueForRDDId, lastAttemptValueForRDDIds, lastAttemptValueForAllRDDs,
+ *    lastAttemptValueForHighestRDDId let specify that only values from specific RDDs should be
+ *    aggregated.
+ *  - For Spark SQL Execution: In SQLLastAttemptAccumulator, lastAttemptValueForDataset,
+ *    lastAttemptValueForQueryExecution let specify that only values from the last SQL execution of
+ *    a specific Dataset (or QueryExecution) should be aggregated.
+ *
+ *  In specific situations the last attempt value cannot be computed. This is both because of known
+ *  specific user actions (e.g. mixing driver updates with task updates), and because the
+ *  accumulator performs (and logs) various internal sanity checks and bails out if it detects an
+ *  unexpected situation. Therefore, all the lastAttempt methods return an Option[OUT], where None
+ *  means that it has bailed out.
+ *
+ *  Updates to the accumulator from completed Tasks are merged in mergeLastAttempt, called from
+ *  DAGScheduler.updateAccumulators, called from DAGScheduler.handleTaskCompletion in the single
+ *  threaded DAGScheduler event loop. Therefore, we don't need to worry about concurrency control
+ *  when updating the accumulator values. However, reading of the last attempt value can potentially
+ *  be done concurrently, so we use synchronization. When there is normally no contention, JVM
+ *  synchronization should be very low overhead.
+ *
+ *  In order to be able to provide last attempt value, we need to keep track of partial metric
+ *  values, so that after a partial re-attempt the partial value can be updated, and then
+ *  re-aggregated.
+ *  There are various sources of re-attempts that we have to track:
+ *
+ *  1. Spark Core.
+ *  ==============
+ *    - Updates from failed tasks are filtered in Task.collectAccumulatorUpdates before they are
+ *      even passed back to the driver. We don't need to worry about them here.
+ *    - We should not get results from two successful attempts of a Task in the same Stage attempt.
+ *      TaskSetManager.handleSuccessfulTask ensures that.
+ *    - Therefore we only need to track Stage retries. The Last Attempt Metric will aggregate the
+ *      metric value of a given RDD partition from the last attempt of the Stage with the highest
+ *      stageId.
+ *      Normally recomputation creates a new stageAttemptId in the same Stage, but there can also
+ *      be multiple new Stages due to:
+ *      - In AQE, a materialized QueryStage is submitted as a new Stage, which would normally get
+ *        skipped, as it is already materialized. However, if results of that stage have been lost,
+ *        the recomputation will happen in that Stage.
+ *      - If the same Dataset with the same QueryExecution and same executedPlan is reused for
+ *        another execution (e.g. again calling collect()). All map stages should be materialized,
+ *        so like with AQE, they should be skipped, unless the results have been lost. Then,
+ *        recomputation will happen in that Stage. The result stage computing the action will be
+ *        fully re-executed.
+ *    - Due to the async nature of cancellation, there can be tasks from previous attempts that
+ *      arrive later than the last attempt. Therefore, we need to track and compare stageId and
+ *      stageAttemptId of every computed RDD partition, in order to discard late comers.

Review Comment:
   Minor: "late comers" is one word: "latecomers".
   ```suggestion
    *      stageAttemptId of every computed RDD partition, in order to discard latecomers.
   ```
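
   As an illustration of the comparison the quoted paragraph describes, here is
   a hypothetical sketch; `LastAttemptSketch`, `PartitionValue` and `merge` are
   made-up names and this is not the PR's implementation. Each partition keeps
   the (stageId, stageAttemptId) that produced its value, and an update from an
   older pair is dropped as a latecomer. Treating an equal pair as a replacement
   is an assumption of the sketch.

   ```scala
   import scala.collection.mutable

   // Hypothetical per-partition record: which attempt produced the value.
   final case class PartitionValue(stageId: Int, stageAttemptId: Int, value: Long)

   class LastAttemptSketch {
     private val byPartition = mutable.Map.empty[Int, PartitionValue]

     /** Merge one completed task's update; returns false if it was discarded. */
     def merge(partition: Int, stageId: Int, stageAttemptId: Int, value: Long): Boolean =
       synchronized {
         val incoming = (stageId, stageAttemptId)
         // Tuples are ordered lexicographically: stageId first, then stageAttemptId.
         val isCurrent = byPartition.get(partition).forall { prev =>
           Ordering[(Int, Int)].gteq(incoming, (prev.stageId, prev.stageAttemptId))
         }
         if (isCurrent) byPartition(partition) = PartitionValue(stageId, stageAttemptId, value)
         isCurrent
       }

     /** Re-aggregate the retained per-partition values. */
     def lastAttemptValue: Long = synchronized { byPartition.values.map(_.value).sum }
   }
   ```

   With this shape, a retry of partition 0 in a newer stage attempt replaces its
   earlier value, while a late result from the older attempt leaves the
   aggregate unchanged.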



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLLastAttemptAccumulator.scala:
##########
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.metric
+
+import scala.collection.mutable
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.{LogEntry, Logging}
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.execution.{BaseSubqueryExec, QueryExecution, SparkPlan, SubqueryAdaptiveBroadcastExec, SubqueryBroadcastExec, SubqueryExec, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, BroadcastExchangeLike, ReusedExchangeExec, ShuffleExchangeExec, ShuffleExchangeLike}
+import org.apache.spark.util.{AccumulatorV2, LastAttemptAccumulator}
+
+/*
+ * SQLLastAttemptAccumulator is a LastAttemptAccumulator that allows tracking the last attempt
+ * updates that happened in the scope of execution of a plan created by a specific Dataset's
+ * QueryExecution.
+ *
+ * Tracking RDDs belonging to a Dataset execution.
+ * -----------------------------------------------
+ * Dataset executes executedPlan from its QueryExecution. Each SparkPlan node in the
+ * executedPlan saves the RDD with its execution (executeRDD or executeColumnarRDD). However,
+ * the root RDD of a Spark Stage that actually gets submitted and executed is not necessarily
+ * that RDD. It may be an ephemeral RDD created on the fly when submitting the job, e.g.:
+ * - for result stages, there may be additional transformations to format the results, like
+ *   apply an Encoder (e.g. turn InternalRows into Rows)
+ *   or transformations for Arrow
+ * - for map stages, there may be some additional transformations to prepare the shuffle
+ *   data in correct format.
+ * - operations like dataframe caching may wrap the plan results to format and write to cache.
+ * We therefore cannot track the metrics updates just by RDD id. However, each SparkPlan also
+ * creates an RDDOperationScope, and wraps the execution it submits by that scope.
+ * The completed Tasks should have RDDOperationScope of the SparkPlan that submitted the
+ * Stage. We need to extract the RDDOperationScopes from Dataset.queryExecution.executedPlan
+ * to track last attempt metric updates coming from that execution.
+ *
+ * Additionally, it is possible that the same queryExecution.executedPlan is reused. For
+ * example, when collect() is called multiple times on the same Dataset.
+ * - Part of the execution (e.g. the shuffles) should then be reused. Accumulator should still
+ *   keep their partial values associated to its RDDOperationScope, and return it for this
+ *   new attempt.
+ * - Some of the execution (e.g. the result stage) may be recomputed. Since the SparkPlan will
+ *   be the same, RDDOperationScope will be the same, and this should become a newer execution
+ *   of the same RDD, which should replace the previous one.
+ *
+ * AQE plan changes
+ * ----------------
+ * AQE re-optimizes LogicalPlan and creates new SparkPlan. If the new plan doesn't contain
+ * some of the QueryStages from the previous plan, they can be cancelled while they already
+ * started running and accumulated some metric results.
+ * If the metric is part of SparkPlan.metrics, then the newly created plan will have new
+ * metrics and the old metrics would have been discarded; so nothing needs to be tracked here.
+ * But if the metric is coming from outside, it can be reused by the new SparkPlan.
+ * A new plan will have a new RDD and a new RDDOperationScope, so by tracking these for the
+ * final AQE plan, only values from the final plan and execution should be aggregated.
+ *
+ * It can also happen that the new AQE plan reuses SparkPlan instances from the old plan,
+ * see CancelShuffleStageInBroadcastJoin. However, in that case, the old plan will be put
+ * under some new plan in newly submitted Stages. Since we only truly track the plans that
+ * submit Stages, these should be different and enough to disambiguate.
+ *
+ * Driver only updates
+ * -------------------
+ * The metric can be updated directly on the driver side, during the execution of catalyst
+ * optimizer. One example is [[ConvertToLocalRelation]] optimization rule, which constant folds
+ * pieces of the plan.
+ * Execution in this scope is tagged with [[QueryExecution.id]] using
+ * [[SparkContext.DATASET_QUERY_EXECUTION_ID_KEY]] property, and this metric is tracking
+ * the metric value separately for each QueryExecution.
+ * Like with LastAttemptAccumulator, the metric will bail out if it's updated both from the driver
+ * and from executor Tasks.
+ *
+ * Cached / Checkpointed plans
+ * ---------------------------
+ * If the metric was used inside a cached (df.cache, df.persist) or checkpointed (df.checkpoint,
+ * df.localCheckpoint) plan, which is then turned into an RDDScanExec or InMemoryTableScanExec
+ * in the Dataset's executedPlan, [[lastAttemptValueForDataset]] and
+ * [[lastAttemptValueForQueryExecution]] are declared undefined behavior. In this case,
+ * [[lastAttemptValueForHighestRDDId()]] should be used instead, which returns the value from
+ * the execution in which the plan was cached/checkpointed.
+ *
+ * The main issue is if the metric is in the top stage of the cached plan. When that plan is
+ * executed in some Dataset (as lazy execution), the metric will be executed in the scope of the
+ * stage that contains the InMemoryTableScanExec / RDDScanExec, which will be some parent of that
+ * plan, and not plan of the cached plan. So if the cached plan is then used in another Dataset,
+ * that Dataset will not have information about that parent.
+ * There could be some hacks done to fix it by recording in the InMemoryRelation the scopes in
+ * which it was materialized. There are also other issues, like that checkpoint throws away the
+ * plan, so it would also have to record the RDD scopes used during checkpointing. This gets
+ * further complicated if recomputations are involved, and are done in yet another scope.
+ * It was declared undefined behavior instead of pursuing this.
+ */
+
+/**
+ * A trait that can be mixed into a subclass of [[AccumulatorV2]] to track the "logical"
+ * value of the "last attempt" of the execution using the accumulator.
+ * In addition to what [[LastAttemptAccumulator]] does, it allows tracking the last attempt
+ * executed in the scope of a Dataset's QueryExecution, via
+ * [[lastAttemptValueForDataset]] and [[lastAttemptValueForQueryExecution]] methods.
+ */
+trait SQLLastAttemptAccumulator[IN, OUT, PARTIAL, DRIVER_ACC]
+    extends LastAttemptAccumulator[IN, OUT, PARTIAL] {
+  this: AccumulatorV2[IN, OUT] =>
+
+  /** Create a fresh accumulator to hold driver-side values for one QueryExecution. */
+  protected def newDriverQueryExecutionAcc(): DRIVER_ACC
+  /** Add a value to a driver-side per-QueryExecution accumulator. */
+  protected def addToDriverAcc(acc: DRIVER_ACC, value: IN): Unit
+  /** Set the value of a driver-side per-QueryExecution accumulator. */
+  protected def setDriverAcc(acc: DRIVER_ACC, value: OUT): Unit
+  /** Read the value of a driver-side per-QueryExecution accumulator. */
+  protected def driverAccValue(acc: DRIVER_ACC): OUT
+
+  @transient
+  private var lastAttemptDirectDriverQueryExecutionValues: mutable.Map[String, DRIVER_ACC] = _
+
+  override def initializeLastAttemptAccumulator()(implicit ct: ClassTag[PARTIAL]): Unit = try {
+    super.initializeLastAttemptAccumulator()(ct)
+    lastAttemptDirectDriverQueryExecutionValues = new mutable.HashMap[String, DRIVER_ACC]()
+  } catch {
+    case NonFatal(e) =>
+      unexpectedLastAttemptMetricOperation(
+        invalidate = true,
+        reason = "Unexpected exception in initializeLastAttemptAccumulator",
+        exception = Some(e))
+  }
+
+  override def resetLastAttemptAccumulator(): Unit = try {
+    super.resetLastAttemptAccumulator()
+    lastAttemptDirectDriverQueryExecutionValues = new mutable.HashMap[String, DRIVER_ACC]()
+  } catch {
+    case NonFatal(e) =>
+      unexpectedLastAttemptMetricOperation(
+        invalidate = true,
+        reason = "Unexpected exception in resetLastAttemptAccumulator",
+        exception = Some(e))
+  }
+
+  override protected def assertValid() = {
+    super.assertValid()
+    assert(lastAttemptDirectDriverQueryExecutionValues != null)
+  }
+
+  protected def getOrCreateDirectDriverQueryExecutionValue(queryExecutionId: String): DRIVER_ACC = {
+    lastAttemptDirectDriverQueryExecutionValues.synchronized {
+      if (!lastAttemptDirectDriverQueryExecutionValues.contains(queryExecutionId)) {
+        lastAttemptDirectDriverQueryExecutionValues.put(
+          queryExecutionId, newDriverQueryExecutionAcc())
+      }
+      lastAttemptDirectDriverQueryExecutionValues(queryExecutionId)
+    }
+  }
+
+  protected def getActiveDatasetQueryExecutionId: Option[String] = {
+    SparkContext
+      .getActive
+      .flatMap(sc => Option(sc.getLocalProperty(SparkContext.DATASET_QUERY_EXECUTION_ID_KEY)))
+  }
+
+  /**
+   * Check if the value is added on the driver side, not from within a task.
+   * If it is set in the scope of a Dataset's QueryExecution, associate it with that scope.
+   * This must be called from `add` methods of any AccumulatorV2 subclass supporting
+   * SQL last attempt metrics to set what the `value` of the metric is after the operation.
+   * This should be called there after [[setValueIfOnDriverSide]].
+   */
+  protected def addQueryExecutionValueIfOnDriverSide(value: IN): Unit = try {
+    // Note: setValueIfOnDriverSide will already make it invalid if there are also RDD updates.
+    if (isAtDriverSide && lastAttemptAccumulatorInitialized && !lastAttemptAccumulatorInvalid) {
+      // Direct update on the driver, not from within a task.
+      getActiveDatasetQueryExecutionId match {
+        case Some(qeId) =>
+          addToDriverAcc(getOrCreateDirectDriverQueryExecutionValue(qeId), value)
+        case None => // pass
+      }
+    }
+  } catch {
+    case NonFatal(e) =>
+      unexpectedLastAttemptMetricOperation(
+        invalidate = true,
+        reason = "Unexpected exception in addQueryExecutionValueIfOnDriverSide",
+        exception = Some(e))
+  }
+
+  /**
+   * Like [[addQueryExecutionValueIfOnDriverSide]], but for set operations.
+   */
+  protected def setQueryExecutionValueIfOnDriverSide(value: OUT): Unit = try {
+    if (isAtDriverSide && lastAttemptAccumulatorInitialized && !lastAttemptAccumulatorInvalid) {
+      getActiveDatasetQueryExecutionId match {
+        case Some(qeId) =>
+          setDriverAcc(getOrCreateDirectDriverQueryExecutionValue(qeId), value)
+        case None => // pass
+      }
+    }
+  } catch {
+    case NonFatal(e) =>
+      unexpectedLastAttemptMetricOperation(
+        invalidate = true,
+        reason = "Unexpected exception in setQueryExecutionValueIfOnDriverSide",
+        exception = Some(e))
+  }
+
+  override def logAccumulatorState: LogEntry = try {
+    val driverQEVals = Option(lastAttemptDirectDriverQueryExecutionValues)
+      .map(_.map { case (key, acc) => s"$key -> ${driverAccValue(acc)}" }.mkString("\n"))
+      .getOrElse("<not initialized>")
+    super.logAccumulatorState +
+      log"""
+         |Direct driver QE values:
+         |${MDC(logKeyAccumulatorState, driverQEVals)}
+         """.stripMargin
+  } catch {
+    case NonFatal(e) =>
+      logWarning(log"Unexpected exception in logAccumulatorState", e)
+      log"<Unexpected exception in logAccumulatorState>"
+  }
+
+  /**
+   * Returns the last attempt value of this accumulator, aggregated from the last execution of this
+   * QueryExecution.
+   *
+   * @note The output of this method is undefined if this metric was used inside a part of the plan
+   *       which was either checkpointed (e.g. df.localCheckpoint(), df.checkpoint()) or cached
+   *       (e.g. df.cache(), df.persist()).
+   *       [[lastAttemptValueForHighestRDDId()]] should return the value from when the execution in
+   *       which the plan was cached/checkpointed.
+   *
+   * @return None if the last attempt value cannot be established, Some(value) otherwise.
+   */
+  def lastAttemptValueForQueryExecution(qe: QueryExecution): Option[OUT] = {
+    if (lastAttemptAccumulatorInvalid) return None
+    assertValid()
+    // If there was a driver set value defined in the scope of this QueryExecution, return that.
+    lastAttemptDirectDriverQueryExecutionValues.get(qe.id.toString) match {
+      case Some(acc) => return Some(driverAccValue(acc))
+      case None => // pass
+    }
+    // Otherwise, gather the RDD scoped from the plan and find metric updates from these scopes.

Review Comment:
   Typo: `scoped` → `scopes` (the object being gathered is a `Seq[String]` of
   scope IDs).
   ```suggestion
       // Otherwise, gather the RDD scopes from the plan and find metric updates from these scopes.
   ```
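
   For readers following the lookup order in the quoted method, a simplified,
   hypothetical sketch: `LookupSketch` and its maps are stand-ins. The hunk
   ends before the scope-based aggregation, so summing per-scope values below
   is an assumption rather than what the PR does.

   ```scala
   // Hypothetical stand-in with plain maps instead of the accumulator's state.
   final class LookupSketch(
       driverValues: Map[String, Long], // driver-side values keyed by QueryExecution id
       scopeValues: Map[String, Long],  // task-side values keyed by RDD scope id
       invalid: Boolean = false) {

     def lastAttemptValue(qeId: String, planScopes: Seq[String]): Option[Long] = {
       if (invalid) {
         None // the accumulator has bailed out
       } else {
         driverValues.get(qeId) match {
           case Some(v) => Some(v) // a driver-side value for this QueryExecution wins
           case None =>
             // Assumed aggregation: sum whatever was recorded for the plan's scopes.
             Some(planScopes.flatMap(s => scopeValues.get(s)).sum)
         }
       }
     }
   }
   ```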



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

