John Wesley created SPARK-33359:
-----------------------------------

             Summary: foreachBatch sink outputs wrong metrics
                 Key: SPARK-33359
                 URL: https://issues.apache.org/jira/browse/SPARK-33359
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 3.0.0
         Environment: Spark on Kubernetes cluster with spark-3.0.0 image. The CRD is ScheduledSparkApplication
            Reporter: John Wesley

I created two similar jobs:

1) The first job reads from Kafka and writes to the console sink in append mode.
2) The second job reads from Kafka and writes to a foreachBatch sink (which then writes Parquet files to S3).

The metrics in the log for the console sink show correct values for numInputRows and numOutputRows, whereas they are wrong for foreachBatch. With foreachBatch:

- numInputRows is 1 more than the number of rows actually present
- numOutputRows is always -1

///Console sink
//====================================
20/11/05 13:36:21 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "775aa543-58bf-4cf7-b274-390da640b6ae",
  "runId" : "e5eac4ca-0b29-4ed4-be35-b70bd20906d5",
  "name" : null,
  "timestamp" : "2020-11-05T13:36:08.921Z",
  "batchId" : 0,
  "numInputRows" : 10,
  "processedRowsPerSecond" : 0.7759757895553658,
  "durationMs" : {
    "addBatch" : 7735,
    "getBatch" : 152,
    "latestOffset" : 2037,
    "queryPlanning" : 1010,
    "triggerExecution" : 12886,
    "walCommit" : 938
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[testedr7]]",
    "startOffset" : null,
    "endOffset" : {
      "testedr7" : {
        "0" : 10
      }
    },
    "numInputRows" : 10,
    "processedRowsPerSecond" : 0.7759757895553658
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@38c3a814",
    "numOutputRows" : 10
  }
}

///ForEachBatch Sink
//====================================
20/11/05 13:43:38 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "789f9a00-2f2a-4f75-b643-fea201088b4a",
  "runId" : "b5e695c5-3a2e-4ad2-9dbf-11b69f368f61",
  "name" : null,
  "timestamp" : "2020-11-05T13:43:15.421Z",
  "batchId" : 0,
  "numInputRows" : 11,
  "processedRowsPerSecond" : 0.4833252779120348,
  "durationMs" : {
    "addBatch" : 17689,
    "getBatch" : 135,
    "latestOffset" : 2121,
    "queryPlanning" : 880,
    "triggerExecution" : 22758,
    "walCommit" : 876
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[testedr7]]",
    "startOffset" : null,
    "endOffset" : {
      "testedr7" : {
        "0" : 10
      }
    },
    "numInputRows" : 11,
    "processedRowsPerSecond" : 0.4833252779120348
  } ],
  "sink" : {
    "description" : "ForeachBatchSink",
    "numOutputRows" : -1
  }
}
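One way to cross-check the reported metrics is to compare numInputRows against the number of records implied by the Kafka offset range in the same progress event. The following is a minimal sketch, not part of the original report: the helper names `expected_rows` and `check_progress` are hypothetical, the two events are trimmed to the fields used, and it assumes that a null startOffset means the partition was read from offset 0 (which only holds if no records were removed by retention or compaction).

```python
import json


def expected_rows(progress: dict) -> int:
    """Sum (endOffset - startOffset) over all partitions of all sources."""
    total = 0
    for source in progress["sources"]:
        start = source.get("startOffset") or {}
        end = source.get("endOffset") or {}
        for topic, partitions in end.items():
            for partition, end_offset in partitions.items():
                # Assumption: a missing start offset means "read from offset 0".
                start_offset = start.get(topic, {}).get(partition, 0)
                total += end_offset - start_offset
    return total


def check_progress(progress: dict) -> dict:
    """Put the offset-derived row count next to the reported metrics."""
    return {
        "sink": progress["sink"]["description"],
        "expected": expected_rows(progress),
        "numInputRows": progress["numInputRows"],
        "numOutputRows": progress["sink"]["numOutputRows"],
    }


# Progress events from the logs above, trimmed to the relevant fields.
CONSOLE = json.loads("""
{
  "numInputRows": 10,
  "sources": [{"startOffset": null, "endOffset": {"testedr7": {"0": 10}}}],
  "sink": {"description": "ConsoleTable", "numOutputRows": 10}
}
""")

FOREACH_BATCH = json.loads("""
{
  "numInputRows": 11,
  "sources": [{"startOffset": null, "endOffset": {"testedr7": {"0": 10}}}],
  "sink": {"description": "ForeachBatchSink", "numOutputRows": -1}
}
""")

if __name__ == "__main__":
    for event in (CONSOLE, FOREACH_BATCH):
        print(check_progress(event))
```

Under that assumption, both events cover offsets 0 to 10 of partition 0, so 10 rows are expected in each case; only the foreachBatch event reports a different numInputRows.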