[ 
https://issues.apache.org/jira/browse/SPARK-33359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227064#comment-17227064
 ] 

John Wesley commented on SPARK-33359:
-------------------------------------

Thanks for the clarification. How about numInputRows?

> foreachBatch sink outputs wrong metrics
> ---------------------------------------
>
>                 Key: SPARK-33359
>                 URL: https://issues.apache.org/jira/browse/SPARK-33359
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.0.0
>         Environment: Spark on Kubernetes cluster with spark-3.0.0 image. The 
> CRD is ScheduledSparkApplication
>            Reporter: John Wesley
>            Priority: Minor
>
> I created two similar jobs:
> 1) The first job reads from Kafka and writes to the console sink in append mode.
> 2) The second job reads from Kafka and writes to a foreachBatch sink (which then
> writes in Parquet format to S3).
> The metrics in the log for the console sink show correct values for numInputRows and
> numOutputRows, whereas they are wrong for foreachBatch.
> With foreachBatch:
> numInputRows is 1 more than the number of rows actually present.
> numOutputRows is always -1.
> ///Console sink
> //====================================
> 20/11/05 13:36:21 INFO MicroBatchExecution: Streaming query made progress: {
>   "id" : "775aa543-58bf-4cf7-b274-390da640b6ae",
>   "runId" : "e5eac4ca-0b29-4ed4-be35-b70bd20906d5",
>   "name" : null,
>   "timestamp" : "2020-11-05T13:36:08.921Z",
>   "batchId" : 0,
>   "numInputRows" : 10,
>   "processedRowsPerSecond" : 0.7759757895553658,
>   "durationMs" : {
>     "addBatch" : 7735,
>     "getBatch" : 152,
>     "latestOffset" : 2037,
>     "queryPlanning" : 1010,
>     "triggerExecution" : 12886,
>     "walCommit" : 938
>   },
>   "stateOperators" : [ ],
>   "sources" : [ {
>     "description" : "KafkaV2[Subscribe[testedr7]]",
>     "startOffset" : null,
>     "endOffset" : {
>       "testedr7" : {
>         "0" : 10
>       }
>     },
>     "numInputRows" : 10,
>     "processedRowsPerSecond" : 0.7759757895553658
>   } ],
>   "sink" : {
>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@38c3a814",
>     "numOutputRows" : 10
>   }
> }
> ///foreachBatch sink
> //====================================
> 20/11/05 13:43:38 INFO MicroBatchExecution: Streaming query made progress: {
>   "id" : "789f9a00-2f2a-4f75-b643-fea201088b4a",
>   "runId" : "b5e695c5-3a2e-4ad2-9dbf-11b69f368f61",
>   "name" : null,
>   "timestamp" : "2020-11-05T13:43:15.421Z",
>   "batchId" : 0,
>   "numInputRows" : 11,
>   "processedRowsPerSecond" : 0.4833252779120348,
>   "durationMs" : {
>     "addBatch" : 17689,
>     "getBatch" : 135,
>     "latestOffset" : 2121,
>     "queryPlanning" : 880,
>     "triggerExecution" : 22758,
>     "walCommit" : 876
>   },
>   "stateOperators" : [ ],
>   "sources" : [ {
>     "description" : "KafkaV2[Subscribe[testedr7]]",
>     "startOffset" : null,
>     "endOffset" : {
>       "testedr7" : {
>         "0" : 10
>       }
>     },
>     "numInputRows" : 11,
>     "processedRowsPerSecond" : 0.4833252779120348
>   } ],
>   "sink" : {
>     "description" : "ForeachBatchSink",
>     "numOutputRows" : -1
>   }
> }
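
The discrepancy in the two progress events quoted above can be checked mechanically. The following is only a minimal sketch in Python, not part of the original report: the helper names `rows_from_offsets` and `check_progress` are hypothetical, and it assumes that a null `startOffset` means every partition was read from offset 0. The JSON is a trimmed copy of the foreachBatch event from the log.

```python
import json


def rows_from_offsets(source):
    """Count the rows implied by a Kafka source's offset range.

    Assumption: a null startOffset means each partition started at offset 0.
    """
    start = source.get("startOffset") or {}
    end = source.get("endOffset") or {}
    total = 0
    for topic, partitions in end.items():
        for partition, end_offset in partitions.items():
            start_offset = start.get(topic, {}).get(partition, 0)
            total += end_offset - start_offset
    return total


def check_progress(progress):
    """Return a list of discrepancies found in one progress event."""
    problems = []
    for source in progress.get("sources", []):
        expected = rows_from_offsets(source)
        reported = source["numInputRows"]
        if reported != expected:
            problems.append(f"numInputRows={reported}, offsets imply {expected}")
    # Flag the -1 value that the report describes for the foreachBatch sink.
    if progress.get("sink", {}).get("numOutputRows") == -1:
        problems.append("numOutputRows=-1")
    return problems


# Trimmed copy of the foreachBatch progress event quoted above.
foreach_batch_progress = json.loads("""
{
  "batchId": 0,
  "numInputRows": 11,
  "sources": [{
    "description": "KafkaV2[Subscribe[testedr7]]",
    "startOffset": null,
    "endOffset": {"testedr7": {"0": 10}},
    "numInputRows": 11
  }],
  "sink": {"description": "ForeachBatchSink", "numOutputRows": -1}
}
""")

for problem in check_progress(foreach_batch_progress):
    print(problem)
```

Running the same check on the console-sink event, where `numInputRows` and the end offset are both 10 and `numOutputRows` is 10, should report no discrepancies under the same assumption.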



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
