cloud-fan commented on code in PR #44222:
URL: https://github.com/apache/spark/pull/44222#discussion_r1426175626


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala:
##########
@@ -37,36 +37,47 @@ import org.apache.spark.util.AccumulatorContext.internOption
  * the executor side are automatically propagated and shown in the SQL UI through metrics. Updates
  * on the driver side must be explicitly posted using [[SQLMetrics.postDriverMetricUpdates()]].
  */
-class SQLMetric(val metricType: String, initValue: Long = 0L) extends AccumulatorV2[Long, Long] {
-  // This is a workaround for SPARK-11013.
-  // We may use -1 as initial value of the accumulator, if the accumulator is valid, we will
-  // update it at the end of task and the value will be at least 0. Then we can filter out the -1
-  // values before calculate max, min, etc.
-  private[this] var _value = initValue
-  private var _zeroValue = initValue
+class SQLMetric(val metricType: String,
+                initValue: Long = 0L,
+                zeroValue: Long = 0L) extends AccumulatorV2[Long, Long] {
+  // initValue defines the initial value of the metric. zeroValue defines the lowest value
+  // considered valid. If a SQLMetric is invalid, it is set to zeroValue upon receiving any
+  // updates, and it also reports zeroValue as its value to avoid exposing it to the user
+  // programmatically.
+  //
+  // For many SQLMetrics, we use initValue = -1 and zeroValue = 0 to indicate that the metric is
+  // by default invalid. At the end of a task, we will update the metric making it valid, and the
+  // invalid metrics will be filtered out when calculating min, max, etc. as a workaround for
+  // SPARK-11013.
+  private var _value = initValue
 
   override def copy(): SQLMetric = {
-    val newAcc = new SQLMetric(metricType, _value)
-    newAcc._zeroValue = initValue
+    val newAcc = new SQLMetric(metricType, initValue, zeroValue)
+    newAcc._value = _value
     newAcc
   }
 
-  override def reset(): Unit = _value = _zeroValue
+  override def reset(): Unit = _value = initValue
 
   override def merge(other: AccumulatorV2[Long, Long]): Unit = other match {
     case o: SQLMetric =>
-      if (o.value > 0) {
-        if (_value < 0) _value = 0
+      if (o.isValid) {
+        if (!isValid) _value = zeroValue
         _value += o.value
       }
     case _ => throw QueryExecutionErrors.cannotMergeClassWithOtherClassError(
       this.getClass.getName, other.getClass.getName)
   }
 
-  override def isZero: Boolean = _value == _zeroValue
+  // This is used to filter out metrics. Metrics with value equal to initValue should
+  // be filtered out, since they are either invalid or safe to filter without changing
+  // the aggregation defined in [[SQLMetrics.stringValue]].
+  override def isZero: Boolean = _value == initValue

Review Comment:
   Let's enrich the comment to highlight that we may want to collect the 0 value when calculating min/max/avg. We can still link to SPARK-11013.
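
   To make that point concrete, here is a minimal standalone sketch (not Spark's actual SQLMetrics.stringValue aggregation; the object name and sample values are made up) of why filtering on initValue rather than on value > 0 matters for min/max/avg:

   object MetricAggregationSketch {
     def main(args: Array[String]): Unit = {
       // Hypothetical per-task values for a metric created with initValue = -1, zeroValue = 0:
       // two tasks never updated it (still -1), one task legitimately recorded 0, one recorded 7.
       val initValue = -1L
       val taskValues = Seq(-1L, -1L, 0L, 7L)

       // Filtering on `== initValue` (what the new isZero expresses) keeps the genuine 0,
       // so it still contributes to min/max/avg.
       val valid = taskValues.filterNot(_ == initValue)
       println(s"min=${valid.min} max=${valid.max} avg=${valid.sum.toDouble / valid.size}")
       // => min=0 max=7 avg=3.5

       // Filtering on `> 0` (the old SPARK-11013 workaround) would also drop the genuine 0
       // and skew min/avg upwards, which is the pitfall the enriched comment should call out.
       val positiveOnly = taskValues.filter(_ > 0)
       println(s"min=${positiveOnly.min} max=${positiveOnly.max} avg=${positiveOnly.sum.toDouble / positiveOnly.size}")
       // => min=7 max=7 avg=7.0
     }
   }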



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


