[GitHub] spark pull request #20745: [SPARK-23288][SS] Fix output metrics with parquet...
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20745
[GitHub] spark pull request #20745: [SPARK-23288][SS] Fix output metrics with parquet...
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20745#discussion_r175994267

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---
@@ -405,4 +406,55 @@ class FileStreamSinkSuite extends StreamTest {
       }
     }
   }
+
+  test("SPARK-23288 writing and checking output metrics") {
+    Seq("parquet", "orc", "text", "json").foreach { format =>
+      val inputData = MemoryStream[String]
+      val df = inputData.toDF()
+
+      withTempDir { outputDir =>
+        withTempDir { checkpointDir =>
+
+          var query: StreamingQuery = null
+
+          var numTasks = 0
+          var recordsWritten: Long = 0L
+          var bytesWritten: Long = 0L
+          try {
+            spark.sparkContext.addSparkListener(new SparkListener() {
+              override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+                val outputMetrics = taskEnd.taskMetrics.outputMetrics
+                recordsWritten += outputMetrics.recordsWritten
+                bytesWritten += outputMetrics.bytesWritten
--- End diff --

Without registering statsTrackers the output metrics are not filled in, so `assert(recordsWritten === 5)` and `assert(bytesWritten > 0)` blow up.
[GitHub] spark pull request #20745: [SPARK-23288][SS] Fix output metrics with parquet...
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20745#discussion_r175992781

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---
@@ -405,4 +406,55 @@ class FileStreamSinkSuite extends StreamTest {
       }
     }
   }
+
+  test("SPARK-23288 writing and checking output metrics") {
+    Seq("parquet", "orc", "text", "json").foreach { format =>
+      val inputData = MemoryStream[String]
+      val df = inputData.toDF()
+
+      withTempDir { outputDir =>
+        withTempDir { checkpointDir =>
+
+          var query: StreamingQuery = null
+
+          var numTasks = 0
+          var recordsWritten: Long = 0L
+          var bytesWritten: Long = 0L
+          try {
+            spark.sparkContext.addSparkListener(new SparkListener() {
+              override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+                val outputMetrics = taskEnd.taskMetrics.outputMetrics
+                recordsWritten += outputMetrics.recordsWritten
+                bytesWritten += outputMetrics.bytesWritten
--- End diff --

How does it test https://github.com/apache/spark/pull/20745/files#diff-bfa54a3a7c3a41ecbb805d45dcfef2a1R101 ?
[GitHub] spark pull request #20745: [SPARK-23288][SS] Fix output metrics with parquet...
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20745#discussion_r175978136

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---
@@ -405,4 +406,53 @@ class FileStreamSinkSuite extends StreamTest {
       }
     }
   }
+
+  test("SPARK-23288 writing and checking output metrics") {
+    Seq("parquet", "orc", "text", "json").foreach { format =>
+      val inputData = MemoryStream[String]
+      val df = inputData.toDF()
+
+      val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
--- End diff --

We should use `withTempDir` to clean up the temp directory at the end.
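For reference, the `withTempDir` pattern being suggested looks roughly like this (a sketch only; the helper comes from Spark's test mixins such as `SQLTestUtils`, and the closure bodies are placeholders):

```scala
withTempDir { outputDir =>
  withTempDir { checkpointDir =>
    // Run the streaming query here, writing to outputDir.getCanonicalPath and
    // checkpointing to checkpointDir.getCanonicalPath. Both directories are
    // removed when the closures return, even if the assertions fail.
  }
}
```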
[GitHub] spark pull request #20745: [SPARK-23288][SS] Fix output metrics with parquet...
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20745#discussion_r175186820

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---
@@ -405,4 +406,52 @@ class FileStreamSinkSuite extends StreamTest {
       }
     }
   }
+
+  test("SPARK-23288 writing and checking output metrics") {
+    Seq("parquet", "orc", "text", "json").foreach { format =>
+      val inputData = MemoryStream[String]
+      val df = inputData.toDF()
+
+      val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+      val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+      var query: StreamingQuery = null
+
+      var numTasks = 0
+      var recordsWritten: Long = 0L
+      var bytesWritten: Long = 0L
+      try {
+        spark.sparkContext.addSparkListener(new SparkListener() {
+          override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+            val outputMetrics = taskEnd.taskMetrics.outputMetrics
+            recordsWritten += outputMetrics.recordsWritten
+            bytesWritten += outputMetrics.bytesWritten
+            numTasks += 1
+          }
+        })
+
+        query =
+          df.writeStream
+            .option("checkpointLocation", checkpointDir)
+            .format(format)
+            .start(outputDir)
+
+        inputData.addData("1", "2", "3")
+        inputData.addData("4", "5")
+
+        failAfter(streamingTimeout) {
+          query.processAllAvailable()
+        }
+
+        assert(numTasks === 2)
--- End diff --

Fixed.
[GitHub] spark pull request #20745: [SPARK-23288][SS] Fix output metrics with parquet...
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20745#discussion_r175186800

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---
@@ -405,4 +406,52 @@ class FileStreamSinkSuite extends StreamTest {
       }
     }
   }
+
+  test("SPARK-23288 writing and checking output metrics") {
+    Seq("parquet", "orc", "text", "json").foreach { format =>
+      val inputData = MemoryStream[String]
+      val df = inputData.toDF()
+
+      val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+      val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+      var query: StreamingQuery = null
+
+      var numTasks = 0
+      var recordsWritten: Long = 0L
+      var bytesWritten: Long = 0L
+      try {
+        spark.sparkContext.addSparkListener(new SparkListener() {
+          override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+            val outputMetrics = taskEnd.taskMetrics.outputMetrics
+            recordsWritten += outputMetrics.recordsWritten
+            bytesWritten += outputMetrics.bytesWritten
+            numTasks += 1
+          }
+        })
+
+        query =
+          df.writeStream
+            .option("checkpointLocation", checkpointDir)
+            .format(format)
+            .start(outputDir)
+
+        inputData.addData("1", "2", "3")
+        inputData.addData("4", "5")
+
+        failAfter(streamingTimeout) {
+          query.processAllAvailable()
+        }
+
--- End diff --

Added.
[GitHub] spark pull request #20745: [SPARK-23288][SS] Fix output metrics with parquet...
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/20745#discussion_r175178609

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---
@@ -405,4 +406,52 @@ class FileStreamSinkSuite extends StreamTest {
       }
     }
   }
+
+  test("SPARK-23288 writing and checking output metrics") {
+    Seq("parquet", "orc", "text", "json").foreach { format =>
+      val inputData = MemoryStream[String]
+      val df = inputData.toDF()
+
+      val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+      val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+      var query: StreamingQuery = null
+
+      var numTasks = 0
+      var recordsWritten: Long = 0L
+      var bytesWritten: Long = 0L
+      try {
+        spark.sparkContext.addSparkListener(new SparkListener() {
+          override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+            val outputMetrics = taskEnd.taskMetrics.outputMetrics
+            recordsWritten += outputMetrics.recordsWritten
+            bytesWritten += outputMetrics.bytesWritten
+            numTasks += 1
+          }
+        })
+
+        query =
+          df.writeStream
+            .option("checkpointLocation", checkpointDir)
+            .format(format)
+            .start(outputDir)
+
+        inputData.addData("1", "2", "3")
+        inputData.addData("4", "5")
+
+        failAfter(streamingTimeout) {
+          query.processAllAvailable()
+        }
+
+        assert(numTasks === 2)
--- End diff --

I would just check `numTasks > 0` since it depends on the configurations and the number of CPU cores.
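A sketch of the relaxed check being suggested (variable names taken from the test diff above):

```scala
// The number of write tasks depends on the configuration and the number of
// CPU cores, so only assert that some tasks ran and that metrics were recorded.
assert(numTasks > 0)
assert(recordsWritten === 5)  // five records were added via MemoryStream above
assert(bytesWritten > 0)
```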
[GitHub] spark pull request #20745: [SPARK-23288][SS] Fix output metrics with parquet...
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/20745#discussion_r175178230

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---
@@ -405,4 +406,52 @@ class FileStreamSinkSuite extends StreamTest {
       }
     }
   }
+
+  test("SPARK-23288 writing and checking output metrics") {
+    Seq("parquet", "orc", "text", "json").foreach { format =>
+      val inputData = MemoryStream[String]
+      val df = inputData.toDF()
+
+      val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+      val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+      var query: StreamingQuery = null
+
+      var numTasks = 0
+      var recordsWritten: Long = 0L
+      var bytesWritten: Long = 0L
+      try {
+        spark.sparkContext.addSparkListener(new SparkListener() {
+          override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+            val outputMetrics = taskEnd.taskMetrics.outputMetrics
+            recordsWritten += outputMetrics.recordsWritten
+            bytesWritten += outputMetrics.bytesWritten
+            numTasks += 1
+          }
+        })
+
+        query =
+          df.writeStream
+            .option("checkpointLocation", checkpointDir)
+            .format(format)
+            .start(outputDir)
+
+        inputData.addData("1", "2", "3")
+        inputData.addData("4", "5")
+
+        failAfter(streamingTimeout) {
+          query.processAllAvailable()
+        }
+
--- End diff --

nit: it's better to add the statement below here to avoid flakiness.

```
spark.sparkContext.listenerBus.waitUntilEmpty(streamingTimeout.toMillis)
```
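In context, the suggested call sits between draining the query and the assertions, roughly like this (a sketch, not the exact final test code):

```scala
failAfter(streamingTimeout) {
  query.processAllAvailable()
}
// Drain the listener bus so the SparkListener registered above has processed
// every task-end event before the metrics are asserted on; listener events are
// delivered asynchronously, which is the source of the potential flakiness.
spark.sparkContext.listenerBus.waitUntilEmpty(streamingTimeout.toMillis)
```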
[GitHub] spark pull request #20745: [SPARK-23288][SS] Fix output metrics with parquet...
GitHub user gaborgsomogyi opened a pull request:

https://github.com/apache/spark/pull/20745

[SPARK-23288][SS] Fix output metrics with parquet sink

## What changes were proposed in this pull request?

Output metrics were not filled when the parquet sink was used. This PR fixes the problem by passing a `BasicWriteJobStatsTracker` in `FileStreamSink`.

## How was this patch tested?

An additional unit test was added.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gaborgsomogyi/spark SPARK-23288

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20745.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20745

commit 22e6ca1576bdeee2092afc8bc82a743e0700a959
Author: Gabor Somogyi
Date: 2018-02-19T23:43:46Z

    [SPARK-23288][SS] Fix output metrics with parquet sink

commit 55aa8bca96b112a33cabb352afb4168c2d8f355c
Author: Gabor Somogyi
Date: 2018-02-28T22:50:47Z

    Merge branch 'master' into SPARK-23288
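The idea behind the fix, sketched under some assumptions (the exact constructor of `BasicWriteJobStatsTracker` and the wiring inside `FileStreamSink.addBatch` may differ from the actual patch; the helper name below is illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.util.SerializableConfiguration

// Build a job-level stats tracker that can be handed to FileFormatWriter.write
// via its statsTrackers parameter. Its per-task trackers count the rows and
// bytes each task writes, which is what fills in taskMetrics.outputMetrics for
// the streaming file sink (per the discussion above), matching what the batch
// file write path already does.
def basicWriteJobStatsTracker(
    hadoopConf: Configuration,
    metrics: Map[String, SQLMetric]): BasicWriteJobStatsTracker = {
  val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
  new BasicWriteJobStatsTracker(serializableHadoopConf, metrics)
}
```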