Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21469#discussion_r206755595 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -48,12 +49,24 @@ class StateOperatorProgress private[sql]( def prettyJson: String = pretty(render(jsonValue)) private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress = - new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes) + new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, customMetrics) private[sql] def jsonValue: JValue = { - ("numRowsTotal" -> JInt(numRowsTotal)) ~ - ("numRowsUpdated" -> JInt(numRowsUpdated)) ~ - ("memoryUsedBytes" -> JInt(memoryUsedBytes)) + def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = { + if (map.isEmpty) return JNothing + val keys = map.keySet.asScala.toSeq.sorted + keys.map { k => k -> valueToJValue(map.get(k)) : JObject }.reduce(_ ~ _) + } + + val jsonVal = ("numRowsTotal" -> JInt(numRowsTotal)) ~ + ("numRowsUpdated" -> JInt(numRowsUpdated)) ~ + ("memoryUsedBytes" -> JInt(memoryUsedBytes)) + + if (!customMetrics.isEmpty) { --- End diff -- Actually didn't notice that. Thanks for letting me know! Will simplify.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org