Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21469#discussion_r194613720

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala ---
@@ -112,14 +122,19 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
         val storeMetrics = store.metrics
         longMetric("numTotalStateRows") += storeMetrics.numKeys
         longMetric("stateMemory") += storeMetrics.memoryUsedBytes
-        storeMetrics.customMetrics.foreach { case (metric, value) =>
-          longMetric(metric.name) += value
+        storeMetrics.customMetrics.foreach {
+          case (metric: StateStoreCustomAverageMetric, value) =>
+            longMetric(metric.name).set(value * 1.0d)
--- End diff --

We would do better to think about the actual benefit of exposing the value, rather than about how to expose it somewhere. If we define it as a count and aggregate by summation, the aggregated value will be `(partition count * versions)`, which end users may find hard to interpret.

I'm afraid that exposing this to StreamingQuery as an average is not trivial, especially since SQLMetric is defined as `AccumulatorV2[Long, Long]`, so only a single Long value can be passed. Under that restriction, we cannot define a `merge` operation for an average metric.
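To illustrate the point about `merge`: a correct distributed average must carry both a running sum and a count, because averaging partial averages is not associative. The sketch below is hypothetical (the `AvgState` type is not part of Spark); it only demonstrates why a metric constrained to a single `Long` per partition, as `AccumulatorV2[Long, Long]` is, cannot support a well-defined average merge.

```scala
// Hypothetical sketch, not Spark code: an average needs (sum, count) to merge.
// SQLMetric (AccumulatorV2[Long, Long]) keeps only one Long, so the count is
// lost and two partial averages cannot be combined correctly.
final case class AvgState(sum: Long, count: Long) {
  def add(v: Long): AvgState = AvgState(sum + v, count + 1)
  // Correct merge: combine sums and counts, then divide once at the end.
  def merge(other: AvgState): AvgState =
    AvgState(sum + other.sum, count + other.count)
  def value: Double = if (count == 0) 0.0 else sum.toDouble / count
}

object AvgDemo {
  def main(args: Array[String]): Unit = {
    val p1 = AvgState(0, 0).add(10).add(20) // partition 1: average 15.0
    val p2 = AvgState(0, 0).add(40)         // partition 2: average 40.0

    val correct = p1.merge(p2).value        // 70 / 3
    // Naive merge of the single exposed values: (15.0 + 40.0) / 2 = 27.5,
    // which is wrong because the per-partition counts differ.
    val naive = (p1.value + p2.value) / 2

    println(s"correct = $correct, naive = $naive")
  }
}
```

With only the collapsed `Double` (or `Long`) per partition available, the driver has no way to recover the counts, which is exactly the restriction described above.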