Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21617#discussion_r197981651
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
    @@ -48,12 +49,13 @@ class StateOperatorProgress private[sql](
       def prettyJson: String = pretty(render(jsonValue))
     
       private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress =
     -    new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes)
     +    new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, numLateInputRows)
     
       private[sql] def jsonValue: JValue = {
         ("numRowsTotal" -> JInt(numRowsTotal)) ~
         ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
    -    ("memoryUsedBytes" -> JInt(memoryUsedBytes))
    +    ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
    +    ("numLateInputRows" -> JInt(numLateInputRows))
    --- End diff --
    
    @arunmahadevan 
    
    > Here you are measuring the number of "keys" filtered out of the state store since they have crossed the late threshold correct ?
    
    No, it is based on "input" rows that are filtered out due to the watermark
    threshold. Note that the meaning of "input" here is relative: it does not
    refer to the input rows of the overall query, but to the input rows of the
    state operator.
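    
    For reference, a minimal sketch of how this metric could be read once the field lands (the helper name `reportLateInputRows` is just for illustration; `numLateInputRows` is the field this PR adds):
    
    ```scala
    import org.apache.spark.sql.streaming.StreamingQuery
    
    // Sketch: report the per-operator metric from the most recent progress.
    def reportLateInputRows(query: StreamingQuery): Unit = {
      // lastProgress is null before the first trigger completes.
      Option(query.lastProgress).foreach { progress =>
        progress.stateOperators.zipWithIndex.foreach { case (op, i) =>
          // numLateInputRows (added by this PR) counts the input rows to this
          // stateful operator dropped by the watermark threshold -- it is not
          // a query-wide count of late events.
          println(s"state operator #$i: numLateInputRows = ${op.numLateInputRows}")
        }
      }
    }
    ```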
    
    > Its better if we could rather expose the actual number of events that were late.
    
    I guess the comment is based on a misunderstanding, but I would think the
    correct approach is to filter out late events in the first phase of the query
    (not in the state operator), so that we get an accurate count of late events.
    As it stands, any filters between the source and the state operator affect
    the count.
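    
    To make that caveat concrete, here is a hypothetical pipeline (the `events` DataFrame and its columns are assumptions; the APIs are standard Structured Streaming):
    
    ```scala
    import org.apache.spark.sql.functions.{col, window}
    
    // `events` is assumed to be a streaming DataFrame with an `eventTime`
    // timestamp column and a numeric `value` column.
    val counts = events
      .withWatermark("eventTime", "10 minutes")
      // A late row that also fails this filter never reaches the stateful
      // aggregation, so the operator-level metric undercounts the late
      // events seen by the query as a whole.
      .filter(col("value") > 0)
      .groupBy(window(col("eventTime"), "5 minutes"))
      .count()
    ```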

