cloud-fan commented on code in PR #55371:
URL: https://github.com/apache/spark/pull/55371#discussion_r3137364745
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala:
##########
@@ -34,7 +34,7 @@ import org.apache.spark.util.AccumulatorContext.internOption
*/
class SQLMetric(
val metricType: String,
- initValue: Long = 0L) extends AccumulatorV2[Long, Long] {
+ val initValue: Long = 0L) extends AccumulatorV2[Long, Long] {
Review Comment:
Follow-up on
https://github.com/apache/spark/pull/55371#discussion_r3109651264: you
mentioned the `val initValue` widening here "could actually just be reverted, I
must have needed it earlier", but the revert is not in the current tree.
Reverting `val initValue` back to `initValue` (plain constructor param) should
still let `SQLLastAttemptMetric.copy` / `newDriverQueryExecutionAcc` compile,
because Scala auto-synthesizes a private field for primary-constructor params
when they're referenced outside the constructor body. Could you either apply
the revert, or update the thread explaining that it's needed after all?
Non-blocking — late catch on my side from the prior round.
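For reference, here is a minimal sketch of the Scala rule the comment relies on. `SketchMetric` is a hypothetical stand-in, not the real `SQLMetric`. `initValue` is a plain constructor parameter, but `reset()` and `copy()` read it after construction, so the compiler keeps it as an object-private (`private[this]`) field. No `val` is needed for code inside the class.

```scala
// Hypothetical stand-in for SQLMetric: `initValue` has no `val`,
// yet it is read after construction by `reset()` and `copy()`.
class SketchMetric(val metricType: String, initValue: Long = 0L) {
  private var current: Long = initValue

  def add(v: Long): Unit = current += v
  def value: Long = current

  // Referencing `initValue` outside the constructor body makes scalac
  // retain it as an object-private field.
  def reset(): Unit = current = initValue

  def copy(): SketchMetric = {
    val c = new SketchMetric(metricType, initValue)
    c.current = current // `private` (class-private) allows access to another instance
    c
  }
}
```

The synthesized field is not a public or protected member. This sketch therefore only covers reads from within the declaring class itself.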
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLLastAttemptAccumulator.scala:
##########
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.metric
+
+import scala.collection.mutable
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.{LogEntry, Logging}
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.execution.{BaseSubqueryExec, QueryExecution, SparkPlan, SubqueryAdaptiveBroadcastExec, SubqueryBroadcastExec, SubqueryExec, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, BroadcastExchangeLike, ReusedExchangeExec, ShuffleExchangeExec, ShuffleExchangeLike}
+import org.apache.spark.util.{AccumulatorV2, LastAttemptAccumulator}
+
+/*
+ * SQLLastAttemptAccumulator is a LastAttemptAccumulator that allows tracking the last attempt
+ * updates that happened in the scope of execution of a plan created by a specific Dataset's
+ * QueryExecution.
+ *
+ * Tracking RDDs belonging to a Dataset execution.
+ * -----------------------------------------------
+ * Dataset executes executedPlan from its QueryExecution. Each SparkPlan node in the
+ * executedPlan saves the RDD with its execution (executeRDD or executeColumnarRDD). However,
+ * the root RDD of a Spark Stage that actually gets submitted and executed is not necessarily
+ * that RDD. It may be an ephemeral RDD created on the fly when submitting the job, e.g.:
+ * - for result stages, there may be additional transformations to format the results, like
+ * apply an Encoder (e.g. turn InternalRows into Rows)
+ * or transformations for Arrow
+ * - for map stages, there may be some additional transformations to prepare the shuffle
+ * data in correct format.
+ * - operations like dataframe caching may wrap the plan results to format and write to cache.
+ * We therefore cannot track the metrics updates just by RDD id. However, each SparkPlan also
+ * creates an RDDOperationScope, and wraps the execution it submits by that scope.
+ * The completed Tasks should have RDDOperationScope of the SparkPlan that submitted the
+ * Stage. We need to extract the RDDOperationScopes from Dataset.queryExecution.executedPlan
+ * to track last attempt metric updates coming from that execution.
+ *
+ * Additionally, it is possible that the same queryExecution.executedPlan is reused. For
+ * example, when collect() is called multiple times on the same Dataset.
+ * - Part of the execution (e.g. the shuffles) should then be reused. Accumulator should still
+ * keep their partial values associated to its RDDOperationScope, and return it for this
+ * new attempt.
+ * - Some of the execution (e.g. the result stage) may be recomputed. Since the SparkPlan will
+ * be the same, RDDOperationScope will be the same, and this should become a newer execution
+ * of the same RDD, which should replace the previous one.
+ *
+ * AQE plan changes
+ * ----------------
+ * AQE re-optimizes LogicalPlan and creates new SparkPlan. If the new plan doesn't contain
+ * some of the QueryStages from the previous plan, they can be cancelled while they already
+ * started running and accumulated some metric results.
+ * If the metric is part of SparkPlan.metrics, then the newly created plan will have new
+ * metrics and the old metrics would have been discarded; so nothing needs to be tracked here.
+ * But if the metric is coming from outside, it can be reused by the new SparkPlan.
+ * A new plan will have a new RDD and a new RDDOperationScope, so by tracking these for the
+ * final AQE plan, only values from the final plan and execution should be aggregated.
+ *
+ * It can also happen that the new AQE plan reuses SparkPlan instances from the old plan,
+ * see CancelShuffleStageInBroadcastJoin. However, in that case, the old plan will be put
+ * under some new plan in newly submitted Stages. Since we only truly track the plans that
+ * submit Stages, these should be different and enough to disambiguate.
+ *
+ * Driver only updates
+ * -------------------
+ * The metric can be updated directly on the driver side, during the execution of catalyst
+ * optimizer. One example is [[ConvertToLocalRelation]] optimization rule, which constant folds
+ * pieces of the plan.
+ * Execution in this scope is tagged with [[QueryExecution.id]] using
+ * [[SparkContext.DATASET_QUERY_EXECUTION_ID_KEY]] property, and this metric is tracking
+ * the metric value separately for each QueryExecution.
+ * Like with LastAttemptAccumulator, the metric will bail out if it's updated both from the driver
+ * and from executor Tasks.
+ *
+ * Cached / Checkpointed plans
+ * ---------------------------
+ * If the metric was used inside a cached (df.cache, df.persist) or checkpointed (df.checkpoint,
+ * df.localCheckpoint) plan, which is then turned into an RDDScanExec or InMemoryTableScanExec
+ * in the Dataset's executedPlan, [[lastAttemptValueForDataset]] and
+ * [[lastAttemptValueForQueryExecution]] are declared undefined behavior. In this case,
+ * [[lastAttemptValueForHighestRDDId()]] should be used instead, which returns the value from
+ * the execution in which the plan was cached/checkpointed.
+ *
+ * The main issue is if the metric is in the top stage of the cached plan. When that plan is
+ * executed in some Dataset (as lazy execution), the metric will be executed in the scope of the
+ * stage that contains the InMemoryTableScanExec / RDDScanExec, which will be some parent of that
+ * plan, and not plan of the cached plan. So if the cached plan is then used in another Dataset,
+ * that Dataset will not have information about that parent.
+ * There could be some hacks done to fix it by recording in the InMemoryRelation the scopes in
+ * which it was materialized. There are also other issues, like that checkpoint throws away the
+ * plan, so it would also have to record the RDD scopes used during checkpointing. This gets
+ * further complicated if recomputations are involved, and are done in yet another scope.
+ * It was declared undefined behavior instead of pursuing this.
+ */
+
+/**
+ * A trait that can be mixed into a subclass of [[AccumulatorV2]] to track the "logical"
+ * value of the "last attempt" of the execution using the accumulator.
+ * In addition to what [[LastAttemptAccumulator]] does, it allows tracking the last attempt
+ * executed in the scope of a Dataset's QueryExecution, via
+ * [[lastAttemptValueForDataset]] and [[lastAttemptValueForQueryExecution]] methods.
+ */
+trait SQLLastAttemptAccumulator[IN, OUT, PARTIAL, DRIVER_ACC]
+ extends LastAttemptAccumulator[IN, OUT, PARTIAL] {
+ this: AccumulatorV2[IN, OUT] =>
+
+ /** Create a fresh accumulator to hold driver-side values for one QueryExecution. */
+ protected def newDriverQueryExecutionAcc(): DRIVER_ACC
+ /** Add a value to a driver-side per-QueryExecution accumulator. */
+ protected def addToDriverAcc(acc: DRIVER_ACC, value: IN): Unit
+ /** Set the value of a driver-side per-QueryExecution accumulator. */
+ protected def setDriverAcc(acc: DRIVER_ACC, value: OUT): Unit
+ /** Read the value of a driver-side per-QueryExecution accumulator. */
+ protected def driverAccValue(acc: DRIVER_ACC): OUT
+
+ @transient
+ private var lastAttemptDirectDriverQueryExecutionValues: mutable.Map[String, DRIVER_ACC] = _
+
+ override def initializeLastAttemptAccumulator()(implicit ct: ClassTag[PARTIAL]): Unit = try {
+ super.initializeLastAttemptAccumulator()(ct)
+ lastAttemptDirectDriverQueryExecutionValues = new mutable.HashMap[String, DRIVER_ACC]()
+ } catch {
+ case NonFatal(e) =>
+ unexpectedLastAttemptMetricOperation(
+ invalidate = true,
+ reason = "Unexpected exception in initializeLastAttemptAccumulator",
+ exception = Some(e))
+ }
+
+ override def resetLastAttemptAccumulator(): Unit = try {
+ super.resetLastAttemptAccumulator()
+ lastAttemptDirectDriverQueryExecutionValues = new mutable.HashMap[String, DRIVER_ACC]()
+ } catch {
+ case NonFatal(e) =>
+ unexpectedLastAttemptMetricOperation(
+ invalidate = true,
+ reason = "Unexpected exception in resetLastAttemptAccumulator",
+ exception = Some(e))
+ }
+
+ override protected def assertValid() = {
+ super.assertValid()
+ assert(lastAttemptDirectDriverQueryExecutionValues != null)
+ }
+
+ protected def getOrCreateDirectDriverQueryExecutionValue(queryExecutionId: String): DRIVER_ACC = {
+ lastAttemptDirectDriverQueryExecutionValues.synchronized {
+ if (!lastAttemptDirectDriverQueryExecutionValues.contains(queryExecutionId)) {
+ lastAttemptDirectDriverQueryExecutionValues.put(
+ queryExecutionId, newDriverQueryExecutionAcc())
+ }
+ lastAttemptDirectDriverQueryExecutionValues(queryExecutionId)
+ }
+ }
+
+ protected def getActiveDatasetQueryExecutionId: Option[String] = {
+ SparkContext
+ .getActive
+ .flatMap(sc => Option(sc.getLocalProperty(SparkContext.DATASET_QUERY_EXECUTION_ID_KEY)))
+ }
+
+ /**
+ * Check if the value is added on the driver side, not from within a task.
+ * If it is set in the scope of a Dataset's QueryExecution, associate it with that scope.
+ * This must be called from `add` methods of any AccumulatorV2 subclass supporting
+ * SQL last attempt metrics to set what the `value` of the metric is after the operation.
+ * This should be called there after [[setValueIfOnDriverSide]].
+ */
+ protected def addQueryExecutionValueIfOnDriverSide(value: IN): Unit = try {
+ // Note: setValueIfOnDriverSide will already make it invalid if there are also RDD updates.
+ if (isAtDriverSide && lastAttemptAccumulatorInitialized && !lastAttemptAccumulatorInvalid) {
+ // Direct update on the driver, not from within a task.
+ getActiveDatasetQueryExecutionId match {
+ case Some(qeId) =>
+ addToDriverAcc(getOrCreateDirectDriverQueryExecutionValue(qeId), value)
+ case None => // pass
+ }
+ }
+ } catch {
+ case NonFatal(e) =>
+ unexpectedLastAttemptMetricOperation(
+ invalidate = true,
+ reason = "Unexpected exception in addQueryExecutionValueIfOnDriverSide",
+ exception = Some(e))
+ }
+
+ /**
+ * Like [[addQueryExecutionValueIfOnDriverSide]], but for set operations.
+ */
+ protected def setQueryExecutionValueIfOnDriverSide(value: OUT): Unit = try {
+ if (isAtDriverSide && lastAttemptAccumulatorInitialized && !lastAttemptAccumulatorInvalid) {
+ getActiveDatasetQueryExecutionId match {
+ case Some(qeId) =>
+ setDriverAcc(getOrCreateDirectDriverQueryExecutionValue(qeId), value)
+ case None => // pass
+ }
+ }
+ } catch {
+ case NonFatal(e) =>
+ unexpectedLastAttemptMetricOperation(
+ invalidate = true,
+ reason = "Unexpected exception in setQueryExecutionValueIfOnDriverSide",
+ exception = Some(e))
+ }
+
+ override def logAccumulatorState: LogEntry = try {
+ val driverQEVals = Option(lastAttemptDirectDriverQueryExecutionValues)
+ .map(_.map { case (key, acc) => s"$key -> ${driverAccValue(acc)}" }.mkString("\n"))
+ .getOrElse("<not initialized>")
+ super.logAccumulatorState +
+ log"""
+ |Direct driver QE values:
+ |${MDC(logKeyAccumulatorState, driverQEVals)}
+ """.stripMargin
+ } catch {
+ case NonFatal(e) =>
+ logWarning(log"Unexpected exception in logAccumulatorState", e)
+ log"<Unexpected exception in logAccumulatorState>"
+ }
+
+ /**
+ * Returns the last attempt value of this accumulator, aggregated from the last execution of this
+ * QueryExecution.
+ *
+ * @note The output of this method is undefined if this metric was used inside a part of the plan
+ * which was either checkpointed (e.g. df.localCheckpoint(), df.checkpoint()) or cached
+ * (e.g. df.cache(), df.persist()).
+ * [[lastAttemptValueForHighestRDDId()]] should return the value from when the execution in
+ * which the plan was cached/checkpointed.
Review Comment:
Sentence fragment: "should return the value from when the execution in which
the plan was cached/checkpointed." The file-header version at lines 94–97 was
rewritten in this round to "should be used instead, which returns the value
from the execution in which the plan was cached/checkpointed", but this method
`@note` (and the identical one on `lastAttemptValueForDataset` at :276–277)
wasn't updated in parallel. Late catch from prior round.
```suggestion
* [[lastAttemptValueForHighestRDDId()]] should be used instead, which returns the
* value from the execution in which the plan was cached/checkpointed.
```
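The "Driver only updates" section and `getOrCreateDirectDriverQueryExecutionValue` quoted above keep driver-side values keyed by QueryExecution id. The following is a minimal sketch of that bookkeeping under a few assumptions. `DriverQeValues` is a hypothetical name, not part of the PR. The per-QueryExecution accumulator (`DRIVER_ACC`) is simplified to a `Long` sum. An explicit `qeId: Option[String]` argument stands in for reading the `DATASET_QUERY_EXECUTION_ID_KEY` local property.

```scala
import scala.collection.mutable

// Hypothetical model of the per-QueryExecution driver-side values.
// DRIVER_ACC is simplified to a Long; `qeId` stands in for the value of the
// DATASET_QUERY_EXECUTION_ID_KEY local property (None when it is not set).
final class DriverQeValues {
  private val byQe = mutable.HashMap.empty[String, Long]

  // Driver-side add: only tracked when running in the scope of a QueryExecution.
  def add(qeId: Option[String], value: Long): Unit = qeId.foreach { id =>
    byQe.synchronized { byQe(id) = byQe.getOrElse(id, 0L) + value }
  }

  // Driver-side set: overwrites the value tracked for that QueryExecution.
  def set(qeId: Option[String], value: Long): Unit = qeId.foreach { id =>
    byQe.synchronized { byQe(id) = value }
  }

  // Reads may happen concurrently with updates, hence the lock.
  def valueFor(qeId: String): Option[Long] = byQe.synchronized { byQe.get(qeId) }
}
```

If the per-QueryExecution value is a mutable accumulator object rather than a `Long`, `mutable.Map.getOrElseUpdate` can express the create-on-first-use step of `getOrCreateDirectDriverQueryExecutionValue` in a single call.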
##########
core/src/main/scala/org/apache/spark/util/LastAttemptAccumulator.scala:
##########
@@ -0,0 +1,867 @@
+package org.apache.spark.util
+
+import scala.math.Ordering.Implicits._
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.{LogEntry, Logging, LogKey, LogKeys}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.TaskInfo
+
+/*
+ * Last Attempt Accumulators are Accumulators that track the value of a metric aggregated across
+ * the "last execution" that produced the values. "Last execution" can be defined as:
+ * - For RDDs: the last execution of a given RDD partition, in the latest Stage and Stage attempt
+ * that recomputed it.
+ * - Across RDDs: lastAttemptValueForRDDId, lastAttemptValueForRDDIds, lastAttemptValueForAllRDDs,
+ * lastAttemptValueForHighestRDDId let specify that only values from specific RDDs should be
+ * aggregated.
+ * - For Spark SQL Execution: In SQLLastAttemptAccumulator, lastAttemptValueForDataset,
+ * lastAttemptValueForQueryExecution let specify that only values from the last SQL execution of
+ * a specific Dataset (or QueryExecution) should be aggregated.
+ *
+ * In specific situations the last attempt value cannot be computed. This is both because of known
+ * specific user actions (e.g. mixing driver updates with task updates), and because the
+ * accumulator performs (and logs) various internal sanity checks and bails out if it detects an
+ * unexpected situation. Therefore, all the lastAttempt methods return an Option[OUT], where None
+ * means that it has bailed out.
+ *
+ * Updates to the accumulator from completed Tasks are merged in mergeLastAttempt, called from
+ * DAGScheduler.updateAccumulators, called from DAGScheduler.handleTaskCompletion in the single
+ * threaded DAGScheduler event loop. Therefore, we don't need to worry about concurrency control
+ * when updating the accumulator values. However, reading of the last attempt value can potentially
+ * be done concurrently, so we use synchronization. When there is normally no contention, JVM
+ * synchronization should be very low overhead.
+ *
+ * In order to be able to provide last attempt value, we need to keep track of partial metric
+ * values, so that after a partial re-attempt the partial value can be updated, and then
+ * re-aggregated.
+ * There are various sources of re-attempts that we have to track:
+ *
+ * 1. Spark Core.
+ * ==============
+ * - Updates from failed tasks are filtered in Task.collectAccumulatorUpdates before they are
+ * even passed back to the driver. We don't need to worry about them here.
+ * - We should not get results from two successful attempts of a Task in the same Stage attempt.
+ * TaskSetManager.handleSuccessfulTask ensures that.
+ * - Therefore we only need to track Stage retries. The Last Attempt Metric will aggregate the
+ * metric value of a given RDD partition from the last attempt of the Stage with the highest
+ * stageId.
+ * Normally recomputation creates a new stageAttemptId in the same Stage, but there can also
+ * be multiple new Stages due to:
+ * - In AQE, a materialized QueryStage is submitted as a new Stage, which would normally get
+ * skipped, as it is already materialized. However, if results of that stage have been lost,
+ * the recomputation will happen in that Stage.
+ * - If the same Dataset with the same QueryExecution and same executedPlan is reused for
+ * another execution (e.g. again calling collect()). All map stages should be materialized,
+ * so like with AQE, they should be skipped, unless the results have been lost. Then,
+ * recomputation will happen in that Stage. The result stage computing the action will be
+ * fully re-executed.
+ * - Due to the async nature of cancellation, there can be tasks from previous attempts that
+ * arrive later than the last attempt. Therefore, we need to track and compare stageId and
+ * stageAttemptId of every computed RDD partition, in order to discard late comers.
Review Comment:
Minor: "late comers" is one word — "latecomers".
```suggestion
* stageAttemptId of every computed RDD partition, in order to discard latecomers.
```
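The re-attempt rules in the header above can be sketched with a per-partition map. The rules are: keep each RDD partition's partial value from the highest (stageId, stageAttemptId) seen, discard latecomers from older attempts, and re-aggregate on read. `PartitionKey` and `LastAttemptPartials` are hypothetical names. The PR's actual bookkeeping, which also tracks operation scopes and bail-out conditions, is not reproduced here. Tuple comparison uses `scala.math.Ordering.Implicits._`, which the quoted file also imports.

```scala
import scala.collection.mutable
import scala.math.Ordering.Implicits._

// Hypothetical key: one computed partition of one RDD.
final case class PartitionKey(rddId: Int, partition: Int)

final class LastAttemptPartials {
  // Per partition: the (stageId, stageAttemptId) that produced it and its partial value.
  private val partials = mutable.HashMap.empty[PartitionKey, ((Int, Int), Long)]

  /** Merges one task's update; returns false if it was discarded as a latecomer. */
  def merge(key: PartitionKey, stageId: Int, stageAttemptId: Int, value: Long): Boolean = {
    val attempt = (stageId, stageAttemptId)
    partials.get(key) match {
      case Some((seen, _)) if attempt < seen => false // older attempt arriving late
      case _ =>
        partials(key) = (attempt, value) // newer attempt replaces the partial value
        true
    }
  }

  /** Re-aggregates the latest partial value of every partition of `rddId`. */
  def lastAttemptValueForRDD(rddId: Int): Long =
    partials.iterator.collect { case (PartitionKey(`rddId`, _), (_, v)) => v }.sum
}
```

Comparing the pair lexicographically gives the ordering the header describes. A newer stage always wins, and within the same stage a higher stageAttemptId wins.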
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLLastAttemptAccumulator.scala:
##########
@@ -0,0 +1,435 @@
+ /**
+ * Returns the last attempt value of this accumulator, aggregated from the last execution of this
+ * QueryExecution.
+ *
+ * @note The output of this method is undefined if this metric was used inside a part of the plan
+ * which was either checkpointed (e.g. df.localCheckpoint(), df.checkpoint()) or cached
+ * (e.g. df.cache(), df.persist()).
+ * [[lastAttemptValueForHighestRDDId()]] should return the value from when the execution in
+ * which the plan was cached/checkpointed.
+ *
+ * @return None if the last attempt value cannot be established, Some(value) otherwise.
+ */
+ def lastAttemptValueForQueryExecution(qe: QueryExecution): Option[OUT] = {
+ if (lastAttemptAccumulatorInvalid) return None
+ assertValid()
+ // If there was a driver set value defined in the scope of this QueryExecution, return that.
+ lastAttemptDirectDriverQueryExecutionValues.get(qe.id.toString) match {
+ case Some(acc) => return Some(driverAccValue(acc))
+ case None => // pass
+ }
+ // Otherwise, gather the RDD scoped from the plan and find metric updates from these scopes.
Review Comment:
Typo: `scoped` → `scopes` (the object being gathered is a `Seq[String]` of
scope IDs).
```suggestion
// Otherwise, gather the RDD scopes from the plan and find metric updates from these scopes.
```
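The lookup order in `lastAttemptValueForQueryExecution` can be modelled with plain maps. First it returns None if the accumulator has bailed out. Next it prefers a driver-side value recorded for this QueryExecution. Otherwise it aggregates the updates from the RDD operation scopes gathered from the plan. `ScopedLastAttemptView` and the scope-id strings are hypothetical. The plan walk that collects the scopes is replaced by an explicit `Seq[String]` argument, and values are simplified to `Long` sums.

```scala
// Hypothetical, simplified model of the lookup order.
final class ScopedLastAttemptView(
    invalid: Boolean,
    driverValuesByQe: Map[String, Long],
    lastAttemptValueByScope: Map[String, Long]) {

  def valueForQueryExecution(qeId: String, planScopes: Seq[String]): Option[Long] = {
    if (invalid) None // the accumulator has bailed out
    else driverValuesByQe.get(qeId) match {
      case Some(v) => Some(v) // value set on the driver in this QueryExecution's scope
      case None =>
        // Otherwise, sum the last attempt values of the scopes found in the executed plan.
        Some(planScopes.distinct.flatMap(lastAttemptValueByScope.get).sum)
    }
  }
}
```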
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.