Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21617#discussion_r197981651

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
    @@ -48,12 +49,13 @@ class StateOperatorProgress private[sql](
       def prettyJson: String = pretty(render(jsonValue))

       private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress =
    -    new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes)
    +    new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, numLateInputRows)

       private[sql] def jsonValue: JValue = {
         ("numRowsTotal" -> JInt(numRowsTotal)) ~
         ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
    -    ("memoryUsedBytes" -> JInt(memoryUsedBytes))
    +    ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
    +    ("numLateInputRows" -> JInt(numLateInputRows))
    --- End diff --

    @arunmahadevan

    > Here you are measuring the number of "keys" filtered out of the state store since they have crossed the late threshold correct ?

    No, it is based on "input" rows which are filtered out because they fall behind the watermark threshold. Note that the meaning of "input" here is relative: it does not refer to the input rows of the overall query, but to the input rows of the state operator.

    > Its better if we could rather expose the actual number of events that were late.

    I think this comment overlooks one point: to get a correct count of late events, we would have to filter out late events in the first phase of the query (not in the state operator). As it stands, filters applied earlier in the query affect the count.
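To make the distinction above concrete, here is a hypothetical, self-contained Scala sketch (not Spark's actual implementation) of what the state operator does: rows whose event time falls at or below the watermark are dropped before reaching the state store, and `numLateInputRows` counts only those dropped rows. The names `InputRow` and `filterByWatermark` are invented for illustration.

```scala
// Hypothetical stand-in for a row arriving at a state operator,
// carrying an event-time timestamp in milliseconds.
case class InputRow(key: String, eventTimeMs: Long)

// Simplified sketch of watermark filtering inside a state operator:
// rows with eventTimeMs <= watermarkMs are considered late and dropped.
// The second element of the result plays the role of numLateInputRows.
def filterByWatermark(
    rows: Seq[InputRow],
    watermarkMs: Long): (Seq[InputRow], Long) = {
  val (kept, late) = rows.partition(_.eventTimeMs > watermarkMs)
  (kept, late.size.toLong)
}
```

Because this count is taken at the state operator, any filter applied earlier in the query would remove rows before they are ever seen here, which is why the comment argues the metric is a count of late *operator inputs* rather than late *query inputs*.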