John Wesley created SPARK-33359:
-----------------------------------

             Summary: foreachBatch sink outputs wrong metrics
                 Key: SPARK-33359
                 URL: https://issues.apache.org/jira/browse/SPARK-33359
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 3.0.0
         Environment: Spark on Kubernetes cluster with the spark-3.0.0 image. The CRD is ScheduledSparkApplication.
            Reporter: John Wesley


I created two similar jobs:
1) The first job reads from Kafka and writes to the console sink in append mode.
2) The second job reads from Kafka and writes to a foreachBatch sink, which then writes the data in Parquet format to S3.

The progress metrics logged for the console sink show correct values for numInputRows and numOutputRows, whereas the values logged for the foreachBatch sink are wrong.

With foreachBatch:
1) numInputRows is one more than the number of rows actually present (11 is reported, while the Kafka end offset for partition 0 is 10).
2) numOutputRows is always -1.

/// Console sink
//====================================
20/11/05 13:36:21 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "775aa543-58bf-4cf7-b274-390da640b6ae",
  "runId" : "e5eac4ca-0b29-4ed4-be35-b70bd20906d5",
  "name" : null,
  "timestamp" : "2020-11-05T13:36:08.921Z",
  "batchId" : 0,
  "numInputRows" : 10,
  "processedRowsPerSecond" : 0.7759757895553658,
  "durationMs" : {
    "addBatch" : 7735,
    "getBatch" : 152,
    "latestOffset" : 2037,
    "queryPlanning" : 1010,
    "triggerExecution" : 12886,
    "walCommit" : 938
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[testedr7]]",
    "startOffset" : null,
    "endOffset" : {
      "testedr7" : {
        "0" : 10
      }
    },
    "numInputRows" : 10,
    "processedRowsPerSecond" : 0.7759757895553658
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@38c3a814",
    "numOutputRows" : 10
  }
}


/// foreachBatch sink
//====================================
20/11/05 13:43:38 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "789f9a00-2f2a-4f75-b643-fea201088b4a",
  "runId" : "b5e695c5-3a2e-4ad2-9dbf-11b69f368f61",
  "name" : null,
  "timestamp" : "2020-11-05T13:43:15.421Z",
  "batchId" : 0,
  "numInputRows" : 11,
  "processedRowsPerSecond" : 0.4833252779120348,
  "durationMs" : {
    "addBatch" : 17689,
    "getBatch" : 135,
    "latestOffset" : 2121,
    "queryPlanning" : 880,
    "triggerExecution" : 22758,
    "walCommit" : 876
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[testedr7]]",
    "startOffset" : null,
    "endOffset" : {
      "testedr7" : {
        "0" : 10
      }
    },
    "numInputRows" : 11,
    "processedRowsPerSecond" : 0.4833252779120348
  } ],
  "sink" : {
    "description" : "ForeachBatchSink",
    "numOutputRows" : -1
  }
}
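To spot the same two discrepancies in other progress reports, here is a minimal Python sketch of a hypothetical helper, `check_progress` (not part of Spark). It compares each source's numInputRows with the size of its Kafka offset range and flags a sink numOutputRows of -1. It assumes that a null startOffset means every partition started at offset 0, and that offsets are contiguous (no log compaction, retention deletions, or transaction markers), so end offset minus start offset equals the number of records read.

```python
import json
from typing import List


def check_progress(progress_json: str) -> List[str]:
    """Return human-readable problems found in one streaming progress report.

    Hypothetical helper. Assumes:
    - a null startOffset means each partition started at offset 0;
    - Kafka offsets are contiguous, so (end - start) == records read.
    """
    progress = json.loads(progress_json)
    problems = []
    for source in progress["sources"]:
        start = source["startOffset"] or {}  # null on the first batch
        end = source["endOffset"]            # {topic: {partition: offset}}
        expected = 0
        for topic, partitions in end.items():
            for partition, end_offset in partitions.items():
                start_offset = start.get(topic, {}).get(partition, 0)
                expected += end_offset - start_offset
        if source["numInputRows"] != expected:
            problems.append(
                f"{source['description']}: numInputRows={source['numInputRows']}, "
                f"but the offset range covers {expected} records"
            )
    sink = progress["sink"]
    if sink["numOutputRows"] == -1:
        problems.append(f"{sink['description']}: numOutputRows=-1 (no row count reported)")
    return problems


if __name__ == "__main__":
    # Trimmed copy of the foreachBatch progress report from this issue.
    report = """{
      "sources": [{"description": "KafkaV2[Subscribe[testedr7]]",
                   "startOffset": null,
                   "endOffset": {"testedr7": {"0": 10}},
                   "numInputRows": 11}],
      "sink": {"description": "ForeachBatchSink", "numOutputRows": -1}
    }"""
    for problem in check_progress(report):
        print(problem)
```

Run against a trimmed copy of the foreachBatch report above, the sketch should flag both problems (11 input rows for a 10-record offset range, and numOutputRows of -1), while the console report produces no findings.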



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
