GitHub user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21469#discussion_r194613720
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala ---
    @@ -112,14 +122,19 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
         val storeMetrics = store.metrics
         longMetric("numTotalStateRows") += storeMetrics.numKeys
         longMetric("stateMemory") += storeMetrics.memoryUsedBytes
    -    storeMetrics.customMetrics.foreach { case (metric, value) =>
    -      longMetric(metric.name) += value
    +    storeMetrics.customMetrics.foreach {
    +      case (metric: StateStoreCustomAverageMetric, value) =>
    +        longMetric(metric.name).set(value * 1.0d)
    --- End diff --
    
    We should think about the actual benefit of exposing this value to end 
users, rather than about how to expose it. If we define it as a count and 
aggregate by summation, the aggregated value will be `(partition count * 
versions)`, which end users may find hard to interpret.
    
    I'm afraid that exposing this to StreamingQuery as an average is not 
trivial, because SQLMetric is defined as `AccumulatorV2[Long, Long]`, so only a 
single Long value can be passed. Under that restriction, we can't define a 
`merge` operation for an average metric.
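    
    To illustrate the restriction, here is a minimal sketch in plain Scala. It 
is not Spark code: `LongOnlyMetric`, `AvgState`, and the sample values (3 
partitions with 2 versions each) are hypothetical names and numbers I made up 
for the example. It only shows why a single Long sums to `(partition count * 
versions)`, while a mergeable average would need to carry both a sum and a 
count.

```scala
// Hypothetical sketch, not Spark code: every name here is illustrative.
object AverageMergeSketch {

  // Long-only accumulator, in the spirit of AccumulatorV2[Long, Long]:
  // a single Long is all the state that travels between partitions.
  final class LongOnlyMetric {
    private var total: Long = 0L
    def add(v: Long): Unit = total += v
    def merge(other: LongOnlyMetric): Unit = total += other.total
    def value: Long = total
  }

  // An average needs two numbers (sum and count) to merge correctly.
  final case class AvgState(sum: Long, count: Long) {
    def merge(other: AvgState): AvgState =
      AvgState(sum + other.sum, count + other.count)
    def average: Double = if (count == 0L) 0.0 else sum.toDouble / count
  }

  // Assumed input: 3 partitions, each reporting 2 cached versions.
  val versionsPerPartition: Seq[Long] = Seq(2L, 2L, 2L)

  // Summation through the long-only metric: partition count * versions.
  val summed: Long = {
    val acc = new LongOnlyMetric
    versionsPerPartition.foreach { v =>
      val partial = new LongOnlyMetric
      partial.add(v)
      acc.merge(partial)
    }
    acc.value
  }

  // Merging (sum, count) pairs recovers the per-partition average.
  val merged: AvgState =
    versionsPerPartition.map(v => AvgState(v, 1L)).reduce(_ merge _)

  def main(args: Array[String]): Unit = {
    println(s"summed = $summed")
    println(s"average = ${merged.average}")
  }
}
```

    With these assumed inputs the long-only metric ends up at 6 (3 partitions 
* 2 versions), while the `(sum, count)` pair merges to an average of 2.0. The 
second form needs two values per partition, which is exactly what a single 
Long cannot carry.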

