[GitHub] spark pull request #20745: [SPARK-23288][SS] Fix output metrics with parquet...

2018-03-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20745


---




[GitHub] spark pull request #20745: [SPARK-23288][SS] Fix output metrics with parquet...

2018-03-20 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20745#discussion_r175994267
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---
@@ -405,4 +406,55 @@ class FileStreamSinkSuite extends StreamTest {
   }
 }
   }
+
+  test("SPARK-23288 writing and checking output metrics") {
+    Seq("parquet", "orc", "text", "json").foreach { format =>
+      val inputData = MemoryStream[String]
+      val df = inputData.toDF()
+
+      withTempDir { outputDir =>
+        withTempDir { checkpointDir =>
+
+          var query: StreamingQuery = null
+
+          var numTasks = 0
+          var recordsWritten: Long = 0L
+          var bytesWritten: Long = 0L
+          try {
+            spark.sparkContext.addSparkListener(new SparkListener() {
+              override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+                val outputMetrics = taskEnd.taskMetrics.outputMetrics
+                recordsWritten += outputMetrics.recordsWritten
+                bytesWritten += outputMetrics.bytesWritten
--- End diff --

Without registering statsTrackers, output metrics are not filled, and `assert(recordsWritten === 5)` and `assert(bytesWritten > 0)` blow up.
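For context, a minimal sketch of the tail of the test that consumes these listener counters (the assertions are the ones quoted above; draining the listener bus follows the reviewer suggestion further down this thread):

```
// Sketch only: drain listener events, then check the metrics the listener accumulated.
// The 5 records correspond to the two addData calls ("1", "2", "3" and "4", "5") in the test.
spark.sparkContext.listenerBus.waitUntilEmpty(streamingTimeout.toMillis)
assert(numTasks > 0)
assert(recordsWritten === 5)
assert(bytesWritten > 0)
```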


---




[GitHub] spark pull request #20745: [SPARK-23288][SS] Fix output metrics with parquet...

2018-03-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20745#discussion_r175992781
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---
@@ -405,4 +406,55 @@ class FileStreamSinkSuite extends StreamTest {
   }
 }
   }
+
+  test("SPARK-23288 writing and checking output metrics") {
+    Seq("parquet", "orc", "text", "json").foreach { format =>
+      val inputData = MemoryStream[String]
+      val df = inputData.toDF()
+
+      withTempDir { outputDir =>
+        withTempDir { checkpointDir =>
+
+          var query: StreamingQuery = null
+
+          var numTasks = 0
+          var recordsWritten: Long = 0L
+          var bytesWritten: Long = 0L
+          try {
+            spark.sparkContext.addSparkListener(new SparkListener() {
+              override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+                val outputMetrics = taskEnd.taskMetrics.outputMetrics
+                recordsWritten += outputMetrics.recordsWritten
+                bytesWritten += outputMetrics.bytesWritten
--- End diff --

how does it test https://github.com/apache/spark/pull/20745/files#diff-bfa54a3a7c3a41ecbb805d45dcfef2a1R101?


---




[GitHub] spark pull request #20745: [SPARK-23288][SS] Fix output metrics with parquet...

2018-03-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20745#discussion_r175978136
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---
@@ -405,4 +406,53 @@ class FileStreamSinkSuite extends StreamTest {
   }
 }
   }
+
+  test("SPARK-23288 writing and checking output metrics") {
+    Seq("parquet", "orc", "text", "json").foreach { format =>
+      val inputData = MemoryStream[String]
+      val df = inputData.toDF()
+
+      val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
--- End diff --

we should use `withTempDir` to clean up the temp directory at the end
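For illustration, a minimal sketch of the suggested nesting (roughly what the later revision of the test does; `withTempDir` hands the block a `java.io.File` and deletes it afterwards):

```
// Sketch: withTempDir removes the directories when the block exits, even on failure.
withTempDir { outputDir =>
  withTempDir { checkpointDir =>
    // run the streaming query with
    //   .option("checkpointLocation", checkpointDir.getCanonicalPath)
    //   .start(outputDir.getCanonicalPath)
  }
}
```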


---




[GitHub] spark pull request #20745: [SPARK-23288][SS] Fix output metrics with parquet...

2018-03-16 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20745#discussion_r175186820
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---
@@ -405,4 +406,52 @@ class FileStreamSinkSuite extends StreamTest {
   }
 }
   }
+
+  test("SPARK-23288 writing and checking output metrics") {
+    Seq("parquet", "orc", "text", "json").foreach { format =>
+      val inputData = MemoryStream[String]
+      val df = inputData.toDF()
+
+      val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+      val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+      var query: StreamingQuery = null
+
+      var numTasks = 0
+      var recordsWritten: Long = 0L
+      var bytesWritten: Long = 0L
+      try {
+        spark.sparkContext.addSparkListener(new SparkListener() {
+          override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+            val outputMetrics = taskEnd.taskMetrics.outputMetrics
+            recordsWritten += outputMetrics.recordsWritten
+            bytesWritten += outputMetrics.bytesWritten
+            numTasks += 1
+          }
+        })
+
+        query =
+          df.writeStream
+            .option("checkpointLocation", checkpointDir)
+            .format(format)
+            .start(outputDir)
+
+        inputData.addData("1", "2", "3")
+        inputData.addData("4", "5")
+
+        failAfter(streamingTimeout) {
+          query.processAllAvailable()
+        }
+
+        assert(numTasks === 2)
--- End diff --

Fixed.


---




[GitHub] spark pull request #20745: [SPARK-23288][SS] Fix output metrics with parquet...

2018-03-16 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20745#discussion_r175186800
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---
@@ -405,4 +406,52 @@ class FileStreamSinkSuite extends StreamTest {
   }
 }
   }
+
+  test("SPARK-23288 writing and checking output metrics") {
+    Seq("parquet", "orc", "text", "json").foreach { format =>
+      val inputData = MemoryStream[String]
+      val df = inputData.toDF()
+
+      val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+      val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+      var query: StreamingQuery = null
+
+      var numTasks = 0
+      var recordsWritten: Long = 0L
+      var bytesWritten: Long = 0L
+      try {
+        spark.sparkContext.addSparkListener(new SparkListener() {
+          override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+            val outputMetrics = taskEnd.taskMetrics.outputMetrics
+            recordsWritten += outputMetrics.recordsWritten
+            bytesWritten += outputMetrics.bytesWritten
+            numTasks += 1
+          }
+        })
+
+        query =
+          df.writeStream
+            .option("checkpointLocation", checkpointDir)
+            .format(format)
+            .start(outputDir)
+
+        inputData.addData("1", "2", "3")
+        inputData.addData("4", "5")
+
+        failAfter(streamingTimeout) {
+          query.processAllAvailable()
+        }
+
--- End diff --

Added.


---




[GitHub] spark pull request #20745: [SPARK-23288][SS] Fix output metrics with parquet...

2018-03-16 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/20745#discussion_r175178609
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---
@@ -405,4 +406,52 @@ class FileStreamSinkSuite extends StreamTest {
   }
 }
   }
+
+  test("SPARK-23288 writing and checking output metrics") {
+    Seq("parquet", "orc", "text", "json").foreach { format =>
+      val inputData = MemoryStream[String]
+      val df = inputData.toDF()
+
+      val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+      val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+      var query: StreamingQuery = null
+
+      var numTasks = 0
+      var recordsWritten: Long = 0L
+      var bytesWritten: Long = 0L
+      try {
+        spark.sparkContext.addSparkListener(new SparkListener() {
+          override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+            val outputMetrics = taskEnd.taskMetrics.outputMetrics
+            recordsWritten += outputMetrics.recordsWritten
+            bytesWritten += outputMetrics.bytesWritten
+            numTasks += 1
+          }
+        })
+
+        query =
+          df.writeStream
+            .option("checkpointLocation", checkpointDir)
+            .format(format)
+            .start(outputDir)
+
+        inputData.addData("1", "2", "3")
+        inputData.addData("4", "5")
+
+        failAfter(streamingTimeout) {
+          query.processAllAvailable()
+        }
+
+        assert(numTasks === 2)
--- End diff --

I would just check `numTasks > 0` since it depends on the configurations and the number of CPU cores.
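In other words, something like the following instead of the exact-count check:

```
// Only require that at least one write task ran, regardless of parallelism.
assert(numTasks > 0)
```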


---




[GitHub] spark pull request #20745: [SPARK-23288][SS] Fix output metrics with parquet...

2018-03-16 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/20745#discussion_r175178230
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---
@@ -405,4 +406,52 @@ class FileStreamSinkSuite extends StreamTest {
   }
 }
   }
+
+  test("SPARK-23288 writing and checking output metrics") {
+    Seq("parquet", "orc", "text", "json").foreach { format =>
+      val inputData = MemoryStream[String]
+      val df = inputData.toDF()
+
+      val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+      val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+      var query: StreamingQuery = null
+
+      var numTasks = 0
+      var recordsWritten: Long = 0L
+      var bytesWritten: Long = 0L
+      try {
+        spark.sparkContext.addSparkListener(new SparkListener() {
+          override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+            val outputMetrics = taskEnd.taskMetrics.outputMetrics
+            recordsWritten += outputMetrics.recordsWritten
+            bytesWritten += outputMetrics.bytesWritten
+            numTasks += 1
+          }
+        })
+
+        query =
+          df.writeStream
+            .option("checkpointLocation", checkpointDir)
+            .format(format)
+            .start(outputDir)
+
+        inputData.addData("1", "2", "3")
+        inputData.addData("4", "5")
+
+        failAfter(streamingTimeout) {
+          query.processAllAvailable()
+        }
+
--- End diff --

nit: it's better to add the following statement here to avoid flakiness.
```
spark.sparkContext.listenerBus.waitUntilEmpty(streamingTimeout.toMillis)
```


---




[GitHub] spark pull request #20745: [SPARK-23288][SS] Fix output metrics with parquet...

2018-03-05 Thread gaborgsomogyi
GitHub user gaborgsomogyi opened a pull request:

https://github.com/apache/spark/pull/20745

[SPARK-23288][SS] Fix output metrics with parquet sink

## What changes were proposed in this pull request?

Output metrics were not filled when the parquet sink was used.

This PR fixes the problem by passing a `BasicWriteJobStatsTracker` in `FileStreamSink`.
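A rough sketch of the idea (not the exact diff; the constructor arguments and the `BasicWriteJobStatsTracker.metrics` helper shown here are assumptions based on how the tracker is used elsewhere in `sql/core`):

```
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
import org.apache.spark.util.SerializableConfiguration

// Inside FileStreamSink (sketch): build a stats tracker from the Hadoop conf and hand it to
// FileFormatWriter.write(..., statsTrackers = Seq(basicWriteJobStatsTracker), ...) so that
// task output metrics (records/bytes written) get populated for the file stream sink.
private def basicWriteJobStatsTracker: BasicWriteJobStatsTracker = {
  val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
  new BasicWriteJobStatsTracker(serializableHadoopConf, BasicWriteJobStatsTracker.metrics)
}
```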

## How was this patch tested?

Additional unit test added.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gaborgsomogyi/spark SPARK-23288

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20745.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20745


commit 22e6ca1576bdeee2092afc8bc82a743e0700a959
Author: Gabor Somogyi 
Date:   2018-02-19T23:43:46Z

[SPARK-23288][SS] Fix output metrics with parquet sink

commit 55aa8bca96b112a33cabb352afb4168c2d8f355c
Author: Gabor Somogyi 
Date:   2018-02-28T22:50:47Z

Merge branch 'master' into SPARK-23288




---
