Github user gaborgsomogyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20745#discussion_r175994267
  
    --- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
 ---
    @@ -405,4 +406,55 @@ class FileStreamSinkSuite extends StreamTest {
           }
         }
       }
    +
    +  test("SPARK-23288 writing and checking output metrics") {
    +    Seq("parquet", "orc", "text", "json").foreach { format =>
    +      val inputData = MemoryStream[String]
    +      val df = inputData.toDF()
    +
    +      withTempDir { outputDir =>
    +        withTempDir { checkpointDir =>
    +
    +          var query: StreamingQuery = null
    +
    +          var numTasks = 0
    +          var recordsWritten: Long = 0L
    +          var bytesWritten: Long = 0L
    +          try {
    +            spark.sparkContext.addSparkListener(new SparkListener() {
    +              override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    +                val outputMetrics = taskEnd.taskMetrics.outputMetrics
    +                recordsWritten += outputMetrics.recordsWritten
    +                bytesWritten += outputMetrics.bytesWritten
    --- End diff --
    
    Without registering statsTrackers, the output metrics are not populated, so both
`assert(recordsWritten === 5)` and `assert(bytesWritten > 0)` fail.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to