HeartSaVioR commented on a change in pull request #28040: [SPARK-31278][SS] Fix StreamingQuery output rows metric
URL: https://github.com/apache/spark/pull/28040#discussion_r399723128
 
 

 ##########
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
 ##########
 @@ -253,9 +253,11 @@ abstract class FileStreamSinkSuite extends StreamTest {
 
       addTimestamp(104, 123) // watermark = 90 before this, watermark = 123 - 
10 = 113 after this
       check((100L, 105L) -> 2L)  // no-data-batch emits results on 100-105,
+      assert(query.lastProgress.sink.numOutputRows === 1)
 
 Review comment:
   The value is -1, not 0, when the sink doesn't support output metrics. As the 
error message in the Jenkins build shows, the value here is 0 instead of -1, 
because the patch overwrites the value with 0 when the batch hasn't run. So yes, 
the last progress here is for a "no data & no run" trigger; the new commit 
should fix this problem.
   
   > V1 suite
   
   ```
   {
     "id" : "1bbf91ac-0a24-4da7-bfe8-54ce1ac63e0f",
     "runId" : "ac738c58-a976-4c87-9547-b4a1ee1c2560",
     "name" : null,
     "timestamp" : "2020-03-28T23:33:08.567Z",
     "batchId" : 0,
     "numInputRows" : 1,
     "inputRowsPerSecond" : 83.33333333333333,
     "processedRowsPerSecond" : 0.3835826620636747,
     "durationMs" : {
       "addBatch" : 2055,
       "getBatch" : 2,
       "latestOffset" : 0,
       "queryPlanning" : 449,
       "triggerExecution" : 2607,
       "walCommit" : 49
     },
     "eventTime" : {
       "avg" : "1970-01-01T00:01:40.000Z",
       "max" : "1970-01-01T00:01:40.000Z",
       "min" : "1970-01-01T00:01:40.000Z",
       "watermark" : "1970-01-01T00:00:00.000Z"
     },
     "stateOperators" : [ {
       "numRowsTotal" : 1,
       "numRowsUpdated" : 1,
       "memoryUsedBytes" : 1400,
       "customMetrics" : {
         "loadedMapCacheHitCount" : 0,
         "loadedMapCacheMissCount" : 0,
         "stateOnCurrentVersionSizeBytes" : 680
       }
     } ],
     "sources" : [ {
       "description" : "MemoryStream[value#1L]",
       "startOffset" : null,
       "endOffset" : 0,
       "numInputRows" : 1,
       "inputRowsPerSecond" : 83.33333333333333,
       "processedRowsPerSecond" : 0.3835826620636747
     } ],
     "sink" : {
       "description" : 
"FileSink[/private/var/folders/wn/3hpqx8015hjbmq43hmrw78z40000gn/T/stream.output-cf800c40-1e18-405e-b48f-71a08348a298]",
       "numOutputRows" : -1
     }
   }
   {
     "id" : "1bbf91ac-0a24-4da7-bfe8-54ce1ac63e0f",
     "runId" : "ac738c58-a976-4c87-9547-b4a1ee1c2560",
     "name" : null,
     "timestamp" : "2020-03-28T23:33:11.185Z",
     "batchId" : 1,
     "numInputRows" : 0,
     "inputRowsPerSecond" : 0.0,
     "processedRowsPerSecond" : 0.0,
     "durationMs" : {
       "addBatch" : 935,
       "getBatch" : 0,
       "latestOffset" : 0,
       "queryPlanning" : 52,
       "triggerExecution" : 1101,
       "walCommit" : 70
     },
     "eventTime" : {
       "watermark" : "1970-01-01T00:01:30.000Z"
     },
     "stateOperators" : [ {
       "numRowsTotal" : 1,
       "numRowsUpdated" : 0,
       "memoryUsedBytes" : 2272,
       "customMetrics" : {
         "loadedMapCacheHitCount" : 10,
         "loadedMapCacheMissCount" : 0,
         "stateOnCurrentVersionSizeBytes" : 720
       }
     } ],
     "sources" : [ {
       "description" : "MemoryStream[value#1L]",
       "startOffset" : 0,
       "endOffset" : 0,
       "numInputRows" : 0,
       "inputRowsPerSecond" : 0.0,
       "processedRowsPerSecond" : 0.0
     } ],
     "sink" : {
       "description" : 
"FileSink[/private/var/folders/wn/3hpqx8015hjbmq43hmrw78z40000gn/T/stream.output-cf800c40-1e18-405e-b48f-71a08348a298]",
       "numOutputRows" : -1
     }
   }
   {
     "id" : "1bbf91ac-0a24-4da7-bfe8-54ce1ac63e0f",
     "runId" : "ac738c58-a976-4c87-9547-b4a1ee1c2560",
     "name" : null,
     "timestamp" : "2020-03-28T23:33:12.287Z",
     "batchId" : 2,
     "numInputRows" : 0,
     "inputRowsPerSecond" : 0.0,
     "processedRowsPerSecond" : 0.0,
     "durationMs" : {
       "latestOffset" : 0,
       "triggerExecution" : 0
     },
     "eventTime" : {
       "watermark" : "1970-01-01T00:01:30.000Z"
     },
     "stateOperators" : [ {
       "numRowsTotal" : 1,
       "numRowsUpdated" : 0,
       "memoryUsedBytes" : 2272,
       "customMetrics" : {
         "loadedMapCacheHitCount" : 10,
         "loadedMapCacheMissCount" : 0,
         "stateOnCurrentVersionSizeBytes" : 720
       }
     } ],
     "sources" : [ {
       "description" : "MemoryStream[value#1L]",
       "startOffset" : 0,
       "endOffset" : 0,
       "numInputRows" : 0,
       "inputRowsPerSecond" : 0.0,
       "processedRowsPerSecond" : 0.0
     } ],
     "sink" : {
       "description" : 
"FileSink[/private/var/folders/wn/3hpqx8015hjbmq43hmrw78z40000gn/T/stream.output-cf800c40-1e18-405e-b48f-71a08348a298]",
       "numOutputRows" : 0
     }
   }
   {
     "id" : "1bbf91ac-0a24-4da7-bfe8-54ce1ac63e0f",
     "runId" : "ac738c58-a976-4c87-9547-b4a1ee1c2560",
     "name" : null,
     "timestamp" : "2020-03-28T23:33:13.066Z",
     "batchId" : 2,
     "numInputRows" : 2,
     "inputRowsPerSecond" : 153.84615384615384,
     "processedRowsPerSecond" : 3.2258064516129035,
     "durationMs" : {
       "addBatch" : 482,
       "getBatch" : 0,
       "latestOffset" : 0,
       "queryPlanning" : 50,
       "triggerExecution" : 620,
       "walCommit" : 44
     },
     "eventTime" : {
       "avg" : "1970-01-01T00:01:53.500Z",
       "max" : "1970-01-01T00:02:03.000Z",
       "min" : "1970-01-01T00:01:44.000Z",
       "watermark" : "1970-01-01T00:01:30.000Z"
     },
     "stateOperators" : [ {
       "numRowsTotal" : 2,
       "numRowsUpdated" : 2,
       "memoryUsedBytes" : 2584,
       "customMetrics" : {
         "loadedMapCacheHitCount" : 20,
         "loadedMapCacheMissCount" : 0,
         "stateOnCurrentVersionSizeBytes" : 920
       }
     } ],
     "sources" : [ {
       "description" : "MemoryStream[value#1L]",
       "startOffset" : 0,
       "endOffset" : 1,
       "numInputRows" : 2,
       "inputRowsPerSecond" : 153.84615384615384,
       "processedRowsPerSecond" : 3.2258064516129035
     } ],
     "sink" : {
       "description" : 
"FileSink[/private/var/folders/wn/3hpqx8015hjbmq43hmrw78z40000gn/T/stream.output-cf800c40-1e18-405e-b48f-71a08348a298]",
       "numOutputRows" : -1
     }
   }
   {
     "id" : "1bbf91ac-0a24-4da7-bfe8-54ce1ac63e0f",
     "runId" : "ac738c58-a976-4c87-9547-b4a1ee1c2560",
     "name" : null,
     "timestamp" : "2020-03-28T23:33:13.688Z",
     "batchId" : 3,
     "numInputRows" : 0,
     "inputRowsPerSecond" : 0.0,
     "processedRowsPerSecond" : 0.0,
     "durationMs" : {
       "addBatch" : 987,
       "getBatch" : 0,
       "latestOffset" : 0,
       "queryPlanning" : 43,
       "triggerExecution" : 1117,
       "walCommit" : 44
     },
     "eventTime" : {
       "watermark" : "1970-01-01T00:01:53.000Z"
     },
     "stateOperators" : [ {
       "numRowsTotal" : 1,
       "numRowsUpdated" : 0,
       "memoryUsedBytes" : 2512,
       "customMetrics" : {
         "loadedMapCacheHitCount" : 30,
         "loadedMapCacheMissCount" : 0,
         "stateOnCurrentVersionSizeBytes" : 720
       }
     } ],
     "sources" : [ {
       "description" : "MemoryStream[value#1L]",
       "startOffset" : 1,
       "endOffset" : 1,
       "numInputRows" : 0,
       "inputRowsPerSecond" : 0.0,
       "processedRowsPerSecond" : 0.0
     } ],
     "sink" : {
       "description" : 
"FileSink[/private/var/folders/wn/3hpqx8015hjbmq43hmrw78z40000gn/T/stream.output-cf800c40-1e18-405e-b48f-71a08348a298]",
       "numOutputRows" : -1
     }
   }
   {
     "id" : "1bbf91ac-0a24-4da7-bfe8-54ce1ac63e0f",
     "runId" : "ac738c58-a976-4c87-9547-b4a1ee1c2560",
     "name" : null,
     "timestamp" : "2020-03-28T23:33:14.806Z",
     "batchId" : 4,
     "numInputRows" : 0,
     "inputRowsPerSecond" : 0.0,
     "processedRowsPerSecond" : 0.0,
     "durationMs" : {
       "latestOffset" : 0,
       "triggerExecution" : 0
     },
     "eventTime" : {
       "watermark" : "1970-01-01T00:01:53.000Z"
     },
     "stateOperators" : [ {
       "numRowsTotal" : 1,
       "numRowsUpdated" : 0,
       "memoryUsedBytes" : 2512,
       "customMetrics" : {
         "loadedMapCacheHitCount" : 30,
         "loadedMapCacheMissCount" : 0,
         "stateOnCurrentVersionSizeBytes" : 720
       }
     } ],
     "sources" : [ {
       "description" : "MemoryStream[value#1L]",
       "startOffset" : 1,
       "endOffset" : 1,
       "numInputRows" : 0,
       "inputRowsPerSecond" : 0.0,
       "processedRowsPerSecond" : 0.0
     } ],
     "sink" : {
       "description" : 
"FileSink[/private/var/folders/wn/3hpqx8015hjbmq43hmrw78z40000gn/T/stream.output-cf800c40-1e18-405e-b48f-71a08348a298]",
       "numOutputRows" : 0
     }
   }
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to