[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-07-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r126056717
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 ---
@@ -314,21 +339,40 @@ object FileFormatWriter extends Logging {
 
   recordsInFile = 0
   releaseResources()
+  numOutputRows += recordsInFile
   newOutputWriter(fileCounter)
 }
 
 val internalRow = iter.next()
+val startTime = System.nanoTime()
 currentWriter.write(internalRow)
+timeOnCurrentFile += (System.nanoTime() - startTime)
--- End diff --

Yeah, I also considered this option.


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-07-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r126048467
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 ---
@@ -314,21 +339,40 @@ object FileFormatWriter extends Logging {
 
   recordsInFile = 0
   releaseResources()
+  numOutputRows += recordsInFile
   newOutputWriter(fileCounter)
 }
 
 val internalRow = iter.next()
+val startTime = System.nanoTime()
 currentWriter.write(internalRow)
+timeOnCurrentFile += (System.nanoTime() - startTime)
--- End diff --

It seems impossible to track writing time without per-row tracking. Shall we just remove the `writeTime` metric?


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-07-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r126039846
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 ---
@@ -314,21 +339,40 @@ object FileFormatWriter extends Logging {
 
   recordsInFile = 0
   releaseResources()
+  numOutputRows += recordsInFile
   newOutputWriter(fileCounter)
 }
 
 val internalRow = iter.next()
+val startTime = System.nanoTime()
 currentWriter.write(internalRow)
+timeOnCurrentFile += (System.nanoTime() - startTime)
--- End diff --

OK. I'll get rid of per-row tracking in a follow-up PR. Thanks @rxin.


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-07-06 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r126015755
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 ---
@@ -314,21 +339,40 @@ object FileFormatWriter extends Logging {
 
   recordsInFile = 0
   releaseResources()
+  numOutputRows += recordsInFile
   newOutputWriter(fileCounter)
 }
 
 val internalRow = iter.next()
+val startTime = System.nanoTime()
 currentWriter.write(internalRow)
+timeOnCurrentFile += (System.nanoTime() - startTime)
--- End diff --

We should absolutely NOT do per-row tracking.



[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-07-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/18159


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-07-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r125803128
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 ---
@@ -314,21 +339,40 @@ object FileFormatWriter extends Logging {
 
   recordsInFile = 0
   releaseResources()
+  numOutputRows += recordsInFile
   newOutputWriter(fileCounter)
 }
 
 val internalRow = iter.next()
+val startTime = System.nanoTime()
 currentWriter.write(internalRow)
+timeOnCurrentFile += (System.nanoTime() - startTime)
--- End diff --

I'm neutral about the per-row time tracking. Compared with the actual data writing, the tracking itself should be much cheaper, so IMHO the performance penalty can probably be ignored.

In both `SingleDirectoryWriteTask` and `DynamicPartitionWriteTask`, each iteration first pulls the row from the iterator and then writes the row out (which is the part we measure). So the time spent pulling rows from the data pipeline should already be excluded. That's why I'm not worried it may be inaccurate due to data pipelining.
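
A minimal sketch of the per-row timing pattern under discussion (names such as `iter`, `currentWriter`, and `timeOnCurrentFile` follow the diff quoted above; the loop itself is only illustrative):

```
while (iter.hasNext) {
  val internalRow = iter.next()                      // pulling from the upstream pipeline is NOT timed
  val startTime = System.nanoTime()
  currentWriter.write(internalRow)                   // only the write call is timed
  timeOnCurrentFile += (System.nanoTime() - startTime)
}
```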




[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-07-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r125461897
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 ---
@@ -314,21 +339,40 @@ object FileFormatWriter extends Logging {
 
   recordsInFile = 0
   releaseResources()
+  numOutputRows += recordsInFile
   newOutputWriter(fileCounter)
 }
 
 val internalRow = iter.next()
+val startTime = System.nanoTime()
 currentWriter.write(internalRow)
+timeOnCurrentFile += (System.nanoTime() - startTime)
--- End diff --

I'm worried that this per-row time tracking may have a performance penalty, and may also be inaccurate because the real data flow is pipelined. cc @rxin, who has more experience in this area.
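
To make the overhead concern concrete, a rough micro-benchmark sketch (purely illustrative, not part of the PR) estimating the cost of the two extra `System.nanoTime()` calls a per-row measurement adds:

```
val iterations = 10000000
var sink = 0L                                       // keep a result so the timer calls are not optimized away
var i = 0
val start = System.nanoTime()
while (i < iterations) {
  sink += System.nanoTime() - System.nanoTime()     // the two calls a per-row measurement would add
  i += 1
}
val perRowNs = (System.nanoTime() - start).toDouble / iterations
println(s"~$perRowNs ns of timing overhead per row (sink=$sink)")
```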


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-07-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r125418557
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala 
---
@@ -47,10 +56,73 @@ trait RunnableCommand extends logical.Command {
 }
 
 /**
+ * A special `RunnableCommand` which writes data out and updates metrics.
+ */
+trait DataWritingCommand extends RunnableCommand {
+
+  override lazy val metrics: Map[String, SQLMetric] = {
+val sparkContext = SparkContext.getActive.get
+Map(
+  "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing 
time (ms)"),
+  "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of 
written files"),
+  "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of 
written output"),
+  "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+  "numParts" -> SQLMetrics.createMetric(sparkContext, "number of 
dynamic part")
+)
+  }
+
+  /**
+   * Callback function that update metrics collected from the writing 
operation.
+   */
+  protected def updateWritingMetrics(writeSummaries: 
Seq[ExecutedWriteSummary]): Unit = {
+val sparkContext = SparkContext.getActive.get
+var numPartitions = 0
+var numFiles = 0
+var totalNumBytes: Long = 0L
+var totalNumOutput: Long = 0L
+var totalWritingTime: Long = 0L
+var numFilesNonZeroWritingTime = 0
+
+writeSummaries.foreach { summary =>
+  numPartitions += summary.updatedPartitions.size
+  numFiles += summary.numOutputFile
+  totalNumBytes += summary.numOutputBytes
+  totalNumOutput += summary.numOutputRows
+  totalWritingTime += summary.totalWritingTime
+  numFilesNonZeroWritingTime += summary.numFilesWithNonZeroWritingTime
+}
+
+// We only count non-zero writing time when averaging total writing 
time.
+// The time for writing individual file can be zero if it's less than 
1 ms. Zero values can
--- End diff --

OK. Since it should be rare, I removed this part so the code can be simpler.


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-07-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r125400761
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 ---
@@ -314,21 +339,40 @@ object FileFormatWriter extends Logging {
 
   recordsInFile = 0
   releaseResources()
+  numOutputRows += recordsInFile
   newOutputWriter(fileCounter)
 }
 
 val internalRow = iter.next()
+val startTime = System.nanoTime()
 currentWriter.write(internalRow)
+timeOnCurrentFile += (System.nanoTime() - startTime)
--- End diff --

If we track the time in `newOutputWriter`, the measured time will contain not only the writing time but also the time spent pulling rows while iterating. If the query plan is complicated, I guess that makes the writing time inaccurate?
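
A small sketch of that alternative (illustrative only; `maxRecordsPerFile` and the loop shape are assumptions, not the PR code), showing why timing only at file boundaries would also count the iterator pulls:

```
val fileStart = System.nanoTime()
while (iter.hasNext && recordsInFile < maxRecordsPerFile) {
  currentWriter.write(iter.next())   // the upstream pull and the write both fall inside the timed span
  recordsInFile += 1
}
totalWritingTime += System.nanoTime() - fileStart
```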


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-07-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r125400399
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala 
---
@@ -47,10 +56,73 @@ trait RunnableCommand extends logical.Command {
 }
 
 /**
+ * A special `RunnableCommand` which writes data out and updates metrics.
+ */
+trait DataWritingCommand extends RunnableCommand {
--- End diff --

Ok. Sure.


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-07-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r125400321
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala 
---
@@ -47,10 +56,73 @@ trait RunnableCommand extends logical.Command {
 }
 
 /**
+ * A special `RunnableCommand` which writes data out and updates metrics.
+ */
+trait DataWritingCommand extends RunnableCommand {
+
+  override lazy val metrics: Map[String, SQLMetric] = {
+val sparkContext = SparkContext.getActive.get
+Map(
+  "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing 
time (ms)"),
+  "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of 
written files"),
+  "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of 
written output"),
+  "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+  "numParts" -> SQLMetrics.createMetric(sparkContext, "number of 
dynamic part")
+)
+  }
+
+  /**
+   * Callback function that update metrics collected from the writing 
operation.
+   */
+  protected def updateWritingMetrics(writeSummaries: 
Seq[ExecutedWriteSummary]): Unit = {
+val sparkContext = SparkContext.getActive.get
+var numPartitions = 0
+var numFiles = 0
+var totalNumBytes: Long = 0L
+var totalNumOutput: Long = 0L
+var totalWritingTime: Long = 0L
+var numFilesNonZeroWritingTime = 0
+
+writeSummaries.foreach { summary =>
+  numPartitions += summary.updatedPartitions.size
+  numFiles += summary.numOutputFile
+  totalNumBytes += summary.numOutputBytes
+  totalNumOutput += summary.numOutputRows
+  totalWritingTime += summary.totalWritingTime
+  numFilesNonZeroWritingTime += summary.numFilesWithNonZeroWritingTime
+}
+
+// We only count non-zero writing time when averaging total writing 
time.
+// The time for writing individual file can be zero if it's less than 
1 ms. Zero values can
--- End diff --

Yeah, it's for small partitions like the one in the test case.


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-07-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r125390211
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala 
---
@@ -47,10 +56,73 @@ trait RunnableCommand extends logical.Command {
 }
 
 /**
+ * A special `RunnableCommand` which writes data out and updates metrics.
+ */
+trait DataWritingCommand extends RunnableCommand {
+
+  override lazy val metrics: Map[String, SQLMetric] = {
+val sparkContext = SparkContext.getActive.get
+Map(
+  "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing 
time (ms)"),
+  "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of 
written files"),
+  "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of 
written output"),
+  "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+  "numParts" -> SQLMetrics.createMetric(sparkContext, "number of 
dynamic part")
+)
+  }
+
+  /**
+   * Callback function that update metrics collected from the writing 
operation.
+   */
+  protected def updateWritingMetrics(writeSummaries: 
Seq[ExecutedWriteSummary]): Unit = {
+val sparkContext = SparkContext.getActive.get
+var numPartitions = 0
+var numFiles = 0
+var totalNumBytes: Long = 0L
+var totalNumOutput: Long = 0L
+var totalWritingTime: Long = 0L
+var numFilesNonZeroWritingTime = 0
+
+writeSummaries.foreach { summary =>
+  numPartitions += summary.updatedPartitions.size
+  numFiles += summary.numOutputFile
+  totalNumBytes += summary.numOutputBytes
+  totalNumOutput += summary.numOutputRows
+  totalWritingTime += summary.totalWritingTime
+  numFilesNonZeroWritingTime += summary.numFilesWithNonZeroWritingTime
+}
+
+// We only count non-zero writing time when averaging total writing 
time.
+// The time for writing individual file can be zero if it's less than 
1 ms. Zero values can
--- End diff --

I guess this should be rare?


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-07-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r125389877
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala 
---
@@ -47,10 +56,73 @@ trait RunnableCommand extends logical.Command {
 }
 
 /**
+ * A special `RunnableCommand` which writes data out and updates metrics.
+ */
+trait DataWritingCommand extends RunnableCommand {
+
+  override lazy val metrics: Map[String, SQLMetric] = {
+val sparkContext = SparkContext.getActive.get
+Map(
+  "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing 
time (ms)"),
+  "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of 
written files"),
+  "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of 
written output"),
+  "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+  "numParts" -> SQLMetrics.createMetric(sparkContext, "number of 
dynamic part")
+)
+  }
+
+  /**
+   * Callback function that update metrics collected from the writing 
operation.
+   */
+  protected def updateWritingMetrics(writeSummaries: 
Seq[ExecutedWriteSummary]): Unit = {
+val sparkContext = SparkContext.getActive.get
+var numPartitions = 0
+var numFiles = 0
+var totalNumBytes: Long = 0L
+var totalNumOutput: Long = 0L
+var totalWritingTime: Long = 0L
+var numFilesNonZeroWritingTime = 0
+
+writeSummaries.foreach { summary =>
+  numPartitions += summary.updatedPartitions.size
+  numFiles += summary.numOutputFile
+  totalNumBytes += summary.numOutputBytes
+  totalNumOutput += summary.numOutputRows
+  totalWritingTime += summary.totalWritingTime
+  numFilesNonZeroWritingTime += summary.numFilesWithNonZeroWritingTime
+}
+
+// We only count non-zero writing time when averaging total writing 
time.
+// The time for writing individual file can be zero if it's less than 
1 ms. Zero values can
--- End diff --

This only happens if a partition is very small, right?


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-07-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r125389753
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 ---
@@ -314,21 +339,40 @@ object FileFormatWriter extends Logging {
 
   recordsInFile = 0
   releaseResources()
+  numOutputRows += recordsInFile
   newOutputWriter(fileCounter)
 }
 
 val internalRow = iter.next()
+val startTime = System.nanoTime()
 currentWriter.write(internalRow)
+timeOnCurrentFile += (System.nanoTime() - startTime)
--- End diff --

Instead of tracking the time here, how about we do it in `newOutputWriter`?
```
var startTime = -1L
def newOutputWriter(): Unit = {
  if (startTime == -1L) {
    startTime = System.nanoTime()
  } else {
    val currentTime = System.nanoTime()
    totalWritingTime += currentTime - startTime
    startTime = currentTime
  }
}
```


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-07-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r125389285
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala 
---
@@ -47,10 +56,73 @@ trait RunnableCommand extends logical.Command {
 }
 
 /**
+ * A special `RunnableCommand` which writes data out and updates metrics.
+ */
+trait DataWritingCommand extends RunnableCommand {
--- End diff --

Let's move it to a new file.


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-07-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r125218995
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 ---
@@ -273,12 +271,36 @@ object FileFormatWriter extends Logging {
* automatically trigger task aborts.
*/
   private trait ExecuteWriteTask {
+
 /**
- * Writes data out to files, and then returns the list of partition 
strings written out.
- * The list of partitions is sent back to the driver and used to 
update the catalog.
+ * The data structures used to measure metrics during writing.
  */
-def execute(iterator: Iterator[InternalRow]): Set[String]
+protected val writingTimePerFile: mutable.ArrayBuffer[Long] = 
mutable.ArrayBuffer.empty
--- End diff --

Ok. Will update accordingly.


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-07-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r125218806
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 ---
@@ -273,12 +271,36 @@ object FileFormatWriter extends Logging {
* automatically trigger task aborts.
*/
   private trait ExecuteWriteTask {
+
 /**
- * Writes data out to files, and then returns the list of partition 
strings written out.
- * The list of partitions is sent back to the driver and used to 
update the catalog.
+ * The data structures used to measure metrics during writing.
  */
-def execute(iterator: Iterator[InternalRow]): Set[String]
+protected val writingTimePerFile: mutable.ArrayBuffer[Long] = 
mutable.ArrayBuffer.empty
--- End diff --

I think we should do so; sending `writingTimePerFile` seems expensive.


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-07-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r125218623
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 ---
@@ -273,12 +271,36 @@ object FileFormatWriter extends Logging {
* automatically trigger task aborts.
*/
   private trait ExecuteWriteTask {
+
 /**
- * Writes data out to files, and then returns the list of partition 
strings written out.
- * The list of partitions is sent back to the driver and used to 
update the catalog.
+ * The data structures used to measure metrics during writing.
  */
-def execute(iterator: Iterator[InternalRow]): Set[String]
+protected val writingTimePerFile: mutable.ArrayBuffer[Long] = 
mutable.ArrayBuffer.empty
--- End diff --

Yap.


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-07-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r125218388
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 ---
@@ -273,12 +271,36 @@ object FileFormatWriter extends Logging {
* automatically trigger task aborts.
*/
   private trait ExecuteWriteTask {
+
 /**
- * Writes data out to files, and then returns the list of partition 
strings written out.
- * The list of partitions is sent back to the driver and used to 
update the catalog.
+ * The data structures used to measure metrics during writing.
  */
-def execute(iterator: Iterator[InternalRow]): Set[String]
+protected val writingTimePerFile: mutable.ArrayBuffer[Long] = 
mutable.ArrayBuffer.empty
--- End diff --

OK, so we need to send back `totalWriteTime` and `numFilesWithNonZeroWritingTime`, right?
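
A minimal sketch of the driver-side aggregation this implies (field names follow the `updateWritingMetrics` diff quoted elsewhere in this thread; illustrative, not the exact PR code):

```
val totalWritingTime = writeSummaries.map(_.totalWritingTime).sum
val filesWithTime = writeSummaries.map(_.numFilesWithNonZeroWritingTime).sum
val avgWritingTimeMs = if (filesWithTime > 0) totalWritingTime / filesWithTime else 0L
```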


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-07-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r125218214
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 ---
@@ -273,12 +271,36 @@ object FileFormatWriter extends Logging {
* automatically trigger task aborts.
*/
   private trait ExecuteWriteTask {
+
 /**
- * Writes data out to files, and then returns the list of partition 
strings written out.
- * The list of partitions is sent back to the driver and used to 
update the catalog.
+ * The data structures used to measure metrics during writing.
  */
-def execute(iterator: Iterator[InternalRow]): Set[String]
+protected val writingTimePerFile: mutable.ArrayBuffer[Long] = 
mutable.ArrayBuffer.empty
--- End diff --

So I send back `writingTimePerFile` and keep only the values greater than 0 before computing the average over them.


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-07-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r125217300
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
 ---
@@ -53,11 +55,22 @@ case class InsertIntoHadoopFsRelationCommand(
 mode: SaveMode,
 catalogTable: Option[CatalogTable],
 fileIndex: Option[FileIndex])
-  extends RunnableCommand {
+  extends RunnableCommand with MetricUpdater {
   import 
org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
 
   override def children: Seq[LogicalPlan] = query :: Nil
 
+  override lazy val metrics: Map[String, SQLMetric] = {
--- End diff --

Sure.


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-07-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r125217353
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala 
---
@@ -47,10 +56,56 @@ trait RunnableCommand extends logical.Command {
 }
 
 /**
+ * A trait for classes that can update its metrics of data writing 
operation.
+ */
+trait MetricUpdater {
+
+  val metrics: Map[String, SQLMetric]
+
+  /**
+   * Callback function that update metrics collected from the writing 
operation.
+   */
+  protected def callbackMetricsUpdater(writeSummaries: 
Seq[ExecutedWriteSummary]): Unit = {
--- End diff --

Good for me.


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-07-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r125217322
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala 
---
@@ -47,10 +56,56 @@ trait RunnableCommand extends logical.Command {
 }
 
 /**
+ * A trait for classes that can update its metrics of data writing 
operation.
+ */
+trait MetricUpdater {
--- End diff --

Sounds good.


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-07-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r125217117
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 ---
@@ -273,12 +271,36 @@ object FileFormatWriter extends Logging {
* automatically trigger task aborts.
*/
   private trait ExecuteWriteTask {
+
 /**
- * Writes data out to files, and then returns the list of partition 
strings written out.
- * The list of partitions is sent back to the driver and used to 
update the catalog.
+ * The data structures used to measure metrics during writing.
  */
-def execute(iterator: Iterator[InternalRow]): Set[String]
+protected val writingTimePerFile: mutable.ArrayBuffer[Long] = 
mutable.ArrayBuffer.empty
--- End diff --

Now I exclude the zero values of writing time (actually < 1 ms), so I don't compute totalWritingTime / numFiles.

totalWritingTime / numFiles can easily result in a zero writing time. For example, the total writing time for (1ms, 1ms, 0ms, 0ms) is 2ms, and totalWritingTime / numFiles = 2ms / 4 = 0ms (the smallest unit is 1ms).
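
The same example as a small worked sketch (values in whole milliseconds, illustrative only):

```
val perFileMs = Seq(1L, 1L, 0L, 0L)
val naiveAvg = perFileMs.sum / perFileMs.size     // 2 / 4 = 0 ms at integer millisecond granularity
val nonZero = perFileMs.filter(_ > 0)
val avgOverNonZero = nonZero.sum / nonZero.size   // 2 / 2 = 1 ms
```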




[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-07-02 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r125213358
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 ---
@@ -273,12 +271,36 @@ object FileFormatWriter extends Logging {
* automatically trigger task aborts.
*/
   private trait ExecuteWriteTask {
+
 /**
- * Writes data out to files, and then returns the list of partition 
strings written out.
- * The list of partitions is sent back to the driver and used to 
update the catalog.
+ * The data structures used to measure metrics during writing.
  */
-def execute(iterator: Iterator[InternalRow]): Set[String]
+protected val writingTimePerFile: mutable.ArrayBuffer[Long] = 
mutable.ArrayBuffer.empty
--- End diff --

Since we only care about the average writing time, why do we send back `writingTimePerFile`? Can we just send back the total writing time and numFiles?


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-07-02 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r125213106
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
 ---
@@ -53,11 +55,22 @@ case class InsertIntoHadoopFsRelationCommand(
 mode: SaveMode,
 catalogTable: Option[CatalogTable],
 fileIndex: Option[FileIndex])
-  extends RunnableCommand {
+  extends RunnableCommand with MetricUpdater {
   import 
org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
 
   override def children: Seq[LogicalPlan] = query :: Nil
 
+  override lazy val metrics: Map[String, SQLMetric] = {
--- End diff --

Can we move this to the parent trait?


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-07-02 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r125212516
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala 
---
@@ -47,10 +56,56 @@ trait RunnableCommand extends logical.Command {
 }
 
 /**
+ * A trait for classes that can update its metrics of data writing 
operation.
+ */
+trait MetricUpdater {
+
+  val metrics: Map[String, SQLMetric]
+
+  /**
+   * Callback function that update metrics collected from the writing 
operation.
+   */
+  protected def callbackMetricsUpdater(writeSummaries: 
Seq[ExecutedWriteSummary]): Unit = {
--- End diff --

how about `updateWritingMetrics`?


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-07-02 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r125212464
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala 
---
@@ -47,10 +56,56 @@ trait RunnableCommand extends logical.Command {
 }
 
 /**
+ * A trait for classes that can update its metrics of data writing 
operation.
+ */
+trait MetricUpdater {
--- End diff --

I'd like to call it `trait InsertionCommand extends RunnableCommand`, as we are updating `avgTime`, `numFiles`, etc., which are specific to insertion.
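
A rough sketch of that shape (the trait name comes from this suggestion; the metric set mirrors the `DataWritingCommand` diff quoted elsewhere in the thread, so this is illustrative rather than final code):

```
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

trait InsertionCommand extends RunnableCommand {
  override lazy val metrics: Map[String, SQLMetric] = {
    val sparkContext = SparkContext.getActive.get
    Map(
      "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing time (ms)"),
      "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files"),
      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
  }
}
```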


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-07-01 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r125158558
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala 
---
@@ -19,24 +19,65 @@ package org.apache.spark.sql.execution.command
 
 import java.util.UUID
 
+import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
 import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
 import org.apache.spark.sql.execution.debug._
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, 
OffsetSeqMetadata}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 /**
  * A logical command that is executed for its side-effects.  
`RunnableCommand`s are
  * wrapped in `ExecutedCommand` during execution.
  */
 trait RunnableCommand extends logical.Command {
+
+  // The map used to record the metrics of running the command. This will 
be passed to
+  // `ExecutedCommand` during query planning.
+  private[sql] lazy val metrics: Map[String, SQLMetric] = Map.empty
+
+  /**
+   * Callback function that update metrics collected from the writing 
operation.
+   */
+  protected def callbackMetricsUpdater(writeSummaries: 
Seq[ExecutedWriteSummary]): Unit = {
--- End diff --

Ok. I created a trait for them.


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-07-01 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r125158551
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala 
---
@@ -19,24 +19,65 @@ package org.apache.spark.sql.execution.command
 
 import java.util.UUID
 
+import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
 import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
 import org.apache.spark.sql.execution.debug._
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, 
OffsetSeqMetadata}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 /**
  * A logical command that is executed for its side-effects.  
`RunnableCommand`s are
  * wrapped in `ExecutedCommand` during execution.
  */
 trait RunnableCommand extends logical.Command {
+
+  // The map used to record the metrics of running the command. This will 
be passed to
+  // `ExecutedCommand` during query planning.
+  private[sql] lazy val metrics: Map[String, SQLMetric] = Map.empty
--- End diff --

Oh. got it.


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-30 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r125151880
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala 
---
@@ -19,24 +19,65 @@ package org.apache.spark.sql.execution.command
 
 import java.util.UUID
 
+import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
 import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
 import org.apache.spark.sql.execution.debug._
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, 
OffsetSeqMetadata}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 /**
  * A logical command that is executed for its side-effects.  
`RunnableCommand`s are
  * wrapped in `ExecutedCommand` during execution.
  */
 trait RunnableCommand extends logical.Command {
+
+  // The map used to record the metrics of running the command. This will 
be passed to
+  // `ExecutedCommand` during query planning.
+  private[sql] lazy val metrics: Map[String, SQLMetric] = Map.empty
+
+  /**
+   * Callback function that update metrics collected from the writing 
operation.
+   */
+  protected def callbackMetricsUpdater(writeSummaries: 
Seq[ExecutedWriteSummary]): Unit = {
--- End diff --

I think it's more reasonable to do this in `InsertIntoHadoopFsRelation`. If you are worried about duplicated code, we can create a trait for `InsertIntoHadoopFsRelation` and `InsertIntoHive`.


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-30 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r125151815
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala 
---
@@ -19,24 +19,65 @@ package org.apache.spark.sql.execution.command
 
 import java.util.UUID
 
+import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
 import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
 import org.apache.spark.sql.execution.debug._
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, 
OffsetSeqMetadata}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 /**
  * A logical command that is executed for its side-effects.  
`RunnableCommand`s are
  * wrapped in `ExecutedCommand` during execution.
  */
 trait RunnableCommand extends logical.Command {
+
+  // The map used to record the metrics of running the command. This will 
be passed to
+  // `ExecutedCommand` during query planning.
+  private[sql] lazy val metrics: Map[String, SQLMetric] = Map.empty
--- End diff --

Usually we don't need `private[sql]` under the `execution` package.


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-30 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r124982973
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala 
---
@@ -19,25 +19,70 @@ package org.apache.spark.sql.execution.command
 
 import java.util.UUID
 
+import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
 import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
 import org.apache.spark.sql.execution.debug._
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, 
OffsetSeqMetadata}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 /**
  * A logical command that is executed for its side-effects.  
`RunnableCommand`s are
  * wrapped in `ExecutedCommand` during execution.
  */
 trait RunnableCommand extends logical.Command {
-  def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] 
= {
+
+  def metrics(sparkContext: SparkContext): Map[String, SQLMetric] = 
Map.empty
+
+  /**
+   * Callback function that update metrics collected from the writing 
operation.
+   */
+  private[sql] def prepareMetricsUpdater(
--- End diff --

Sounds good.


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-30 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r124979471
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -2659,6 +2668,21 @@ private[spark] object Utils extends Logging {
 redact(redactionPattern, kvs.toArray)
   }
 
+  /**
+   * Computes the average of all elements in an `Iterable`. If there is no 
element, returns 0.
+   */
+  def average[T](ts: Iterable[T])(implicit num: Numeric[T]): Double = {
+if (ts.isEmpty) {
+  0.0
+} else {
+  var count = 0
+  val sum = ts.reduce { (sum, ele) =>
+count += 1
+num.plus(sum, ele)
+  }
+  num.toDouble(sum) / (count + 1)
--- End diff --

Is this better? Now we don't call `size` on the `Iterable`.
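
For comparison, a single-pass alternative sketch using `foldLeft` (not the code in the PR) that also avoids calling `size` and drops the mutable counter:

```
def average[T](ts: Iterable[T])(implicit num: Numeric[T]): Double = {
  val (sum, count) = ts.foldLeft((num.zero, 0L)) { case ((s, c), e) => (num.plus(s, e), c + 1) }
  if (count == 0) 0.0 else num.toDouble(sum) / count
}
```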


[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r124740444
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala 
---
@@ -19,25 +19,70 @@ package org.apache.spark.sql.execution.command
 
 import java.util.UUID
 
+import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
 import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
 import org.apache.spark.sql.execution.debug._
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, 
OffsetSeqMetadata}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 /**
  * A logical command that is executed for its side-effects.  
`RunnableCommand`s are
  * wrapped in `ExecutedCommand` during execution.
  */
 trait RunnableCommand extends logical.Command {
-  def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] 
= {
+
+  def metrics(sparkContext: SparkContext): Map[String, SQLMetric] = 
Map.empty
+
+  /**
+   * Callback function that update metrics collected from the writing 
operation.
+   */
+  private[sql] def prepareMetricsUpdater(
--- End diff --

We can remove this abstraction if we can get the `SparkContext` in the logical plan instead of passing it via `RunnableCommand.run`. I think this is doable via `SparkContext.getActive.get`.
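A rough sketch of that direction (illustrative only: the trait name and the single metric are placeholders, and `SparkContext.getActive` is `private[spark]`, so this only compiles from an `org.apache.spark` package):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

trait CommandWithMetricsSketch extends logical.Command {
  // Resolve the active SparkContext lazily instead of threading it through run().
  lazy val metrics: Map[String, SQLMetric] = {
    val sparkContext = SparkContext.getActive.get
    Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
  }
}
```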







[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r124739262
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -2659,6 +2668,13 @@ private[spark] object Utils extends Logging {
 redact(redactionPattern, kvs.toArray)
   }
 
+  def average[T](ts: Iterable[T])(implicit num: Numeric[T]): Double = {
--- End diff --

shall we require an array type parameter? We are calling `size` twice in this method, which can be very slow if the input is not an indexed seq.





[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-14 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r121940285
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/FileWritingCommand.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.util.Utils
+
+/**
+ * A logical command specialized for writing data out. 
`FileWritingCommand`s are
+ * wrapped in `FileWritingCommandExec` during execution.
+ */
+trait FileWritingCommand extends logical.Command {
+  def run(
+sparkSession: SparkSession,
+children: Seq[SparkPlan],
+fileCommandExec: FileWritingCommandExec): Seq[Row]
+}
+
+/**
+ * A physical operator specialized to execute the run method of a 
`FileWritingCommand`,
+ * save the result to prevent multiple executions, and record necessary 
metrics for UI.
+ */
+case class FileWritingCommandExec(
+cmd: FileWritingCommand,
+children: Seq[SparkPlan],
+givenMetrics: Option[Map[String, SQLMetric]] = None) extends 
CommandExec {
+
+  override val metrics = givenMetrics.getOrElse {
+val sparkContext = sqlContext.sparkContext
+Map(
+  // General metrics.
+  "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing 
time (ms)"),
+  "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of 
written files"),
+  "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of 
written output"),
+  "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+  "numParts" -> SQLMetrics.createMetric(sparkContext, "number of 
dynamic part")
+)
+  }
+
+  /**
+   * Callback function that update metrics collected from the writing 
operation.
+   */
+  private[sql] def postDriverMetrics(writeSummaries: 
Seq[ExecutedWriteSummary]): Unit = {
+var numPartitions = 0
+var numFiles = 0
+var totalNumBytes: Long = 0L
+var totalNumOutput: Long = 0L
+
+writeSummaries.foreach { summary =>
+  numPartitions += summary.updatedPartitions.size
+  numFiles += summary.numOutputFile
+  totalNumBytes += summary.numOutputBytes
+  totalNumOutput += summary.numOutputRows
+}
+
+// The time for writing individual file can be zero if it's less than 
1 ms. Zero values can
+// lower actual time of writing to zero when calculating average, so 
excluding them.
+val avgWritingTime =
+  Utils.average(writeSummaries.flatMap(_.writingTimePerFile.filter(_ > 
0))).toLong
+// Note: for simplifying metric values assignment, we put the values 
as the alphabetically
+// sorted of the metric keys.
+val metricsNames = metrics.keys.toSeq.sorted
--- End diff --

`givenMetrics` comes from another `FileWritingCommandExec` or is an empty map. When it's an empty map, that means the wrapped command won't call this callback. But I agree this is a loose guarantee. I'll update this in the next commit.



[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-14 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r121927605
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/FileWritingCommand.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.util.Utils
+
+/**
+ * A logical command specialized for writing data out. 
`FileWritingCommand`s are
+ * wrapped in `FileWritingCommandExec` during execution.
+ */
+trait FileWritingCommand extends logical.Command {
+  def run(
+sparkSession: SparkSession,
+children: Seq[SparkPlan],
+fileCommandExec: FileWritingCommandExec): Seq[Row]
+}
+
+/**
+ * A physical operator specialized to execute the run method of a 
`FileWritingCommand`,
+ * save the result to prevent multiple executions, and record necessary 
metrics for UI.
+ */
+case class FileWritingCommandExec(
+cmd: FileWritingCommand,
+children: Seq[SparkPlan],
+givenMetrics: Option[Map[String, SQLMetric]] = None) extends 
CommandExec {
+
+  override val metrics = givenMetrics.getOrElse {
+val sparkContext = sqlContext.sparkContext
+Map(
+  // General metrics.
+  "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing 
time (ms)"),
+  "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of 
written files"),
+  "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of 
written output"),
+  "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+  "numParts" -> SQLMetrics.createMetric(sparkContext, "number of 
dynamic part")
+)
+  }
+
+  /**
+   * Callback function that update metrics collected from the writing 
operation.
+   */
+  private[sql] def postDriverMetrics(writeSummaries: 
Seq[ExecutedWriteSummary]): Unit = {
+var numPartitions = 0
+var numFiles = 0
+var totalNumBytes: Long = 0L
+var totalNumOutput: Long = 0L
+
+writeSummaries.foreach { summary =>
+  numPartitions += summary.updatedPartitions.size
+  numFiles += summary.numOutputFile
+  totalNumBytes += summary.numOutputBytes
+  totalNumOutput += summary.numOutputRows
+}
+
+// The time for writing individual file can be zero if it's less than 
1 ms. Zero values can
+// lower actual time of writing to zero when calculating average, so 
excluding them.
+val avgWritingTime =
+  Utils.average(writeSummaries.flatMap(_.writingTimePerFile.filter(_ > 
0))).toLong
+// Note: for simplifying metric values assignment, we put the values 
as the alphabetically
+// sorted of the metric keys.
--- End diff --

Ok.





[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-14 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r121927602
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala 
---
@@ -60,14 +56,11 @@ case class ExecutedCommandExec(cmd: RunnableCommand, 
children: Seq[SparkPlan]) e
*/
   protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
 val converter = 
CatalystTypeConverters.createToCatalystConverter(schema)
-val rows = if (children.isEmpty) {
-  cmd.run(sqlContext.sparkSession)
-} else {
-  cmd.run(sqlContext.sparkSession, children)
-}
-rows.map(converter(_).asInstanceOf[InternalRow])
+invokeCommand.map(converter(_).asInstanceOf[InternalRow])
   }
 
+  protected[sql] val invokeCommand: Seq[Row]
--- End diff --

Sure. Sounds good.





[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r121890420
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/FileWritingCommand.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.util.Utils
+
+/**
+ * A logical command specialized for writing data out. 
`FileWritingCommand`s are
+ * wrapped in `FileWritingCommandExec` during execution.
+ */
+trait FileWritingCommand extends logical.Command {
+  def run(
+sparkSession: SparkSession,
+children: Seq[SparkPlan],
+fileCommandExec: FileWritingCommandExec): Seq[Row]
+}
+
+/**
+ * A physical operator specialized to execute the run method of a 
`FileWritingCommand`,
+ * save the result to prevent multiple executions, and record necessary 
metrics for UI.
+ */
+case class FileWritingCommandExec(
+cmd: FileWritingCommand,
+children: Seq[SparkPlan],
+givenMetrics: Option[Map[String, SQLMetric]] = None) extends 
CommandExec {
+
+  override val metrics = givenMetrics.getOrElse {
+val sparkContext = sqlContext.sparkContext
+Map(
+  // General metrics.
+  "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing 
time (ms)"),
+  "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of 
written files"),
+  "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of 
written output"),
+  "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+  "numParts" -> SQLMetrics.createMetric(sparkContext, "number of 
dynamic part")
+)
+  }
+
+  /**
+   * Callback function that update metrics collected from the writing 
operation.
+   */
+  private[sql] def postDriverMetrics(writeSummaries: 
Seq[ExecutedWriteSummary]): Unit = {
+var numPartitions = 0
+var numFiles = 0
+var totalNumBytes: Long = 0L
+var totalNumOutput: Long = 0L
+
+writeSummaries.foreach { summary =>
+  numPartitions += summary.updatedPartitions.size
+  numFiles += summary.numOutputFile
+  totalNumBytes += summary.numOutputBytes
+  totalNumOutput += summary.numOutputRows
+}
+
+// The time for writing individual file can be zero if it's less than 
1 ms. Zero values can
+// lower actual time of writing to zero when calculating average, so 
excluding them.
+val avgWritingTime =
+  Utils.average(writeSummaries.flatMap(_.writingTimePerFile.filter(_ > 
0))).toLong
+// Note: for simplifying metric values assignment, we put the values 
as the alphabetically
+// sorted of the metric keys.
+val metricsNames = metrics.keys.toSeq.sorted
--- End diff --

how do you guarantee that `metrics` contains avgTime, numFiles, etc., given that it's created via `givenMetrics.getOrElse`?
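One way to make that explicit, purely as an illustration (not code in this PR), would be to fail fast inside `postDriverMetrics` when a supplied map is missing the expected keys:

```scala
// Hypothetical guard; the key names follow the default metrics map quoted above.
val requiredKeys = Set("avgTime", "numFiles", "numOutputBytes", "numOutputRows", "numParts")
require(requiredKeys.subsetOf(metrics.keySet),
  s"metrics map is missing keys: ${requiredKeys -- metrics.keySet}")
```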





[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r121889646
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala 
---
@@ -60,14 +56,11 @@ case class ExecutedCommandExec(cmd: RunnableCommand, 
children: Seq[SparkPlan]) e
*/
   protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
 val converter = 
CatalystTypeConverters.createToCatalystConverter(schema)
-val rows = if (children.isEmpty) {
-  cmd.run(sqlContext.sparkSession)
-} else {
-  cmd.run(sqlContext.sparkSession, children)
-}
-rows.map(converter(_).asInstanceOf[InternalRow])
+invokeCommand.map(converter(_).asInstanceOf[InternalRow])
   }
 
+  protected[sql] val invokeCommand: Seq[Row]
--- End diff --

the name looks weird for a `val`; shall we make it a `def`?
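For example (sketch only):

```scala
// An abstract def defers evaluation to each concrete exec node and reads more
// naturally for something that actually runs the command.
protected[sql] def invokeCommand: Seq[Row]
```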





[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r121889358
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/FileWritingCommand.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.util.Utils
+
+/**
+ * A logical command specialized for writing data out. 
`FileWritingCommand`s are
+ * wrapped in `FileWritingCommandExec` during execution.
+ */
+trait FileWritingCommand extends logical.Command {
+  def run(
+sparkSession: SparkSession,
+children: Seq[SparkPlan],
+fileCommandExec: FileWritingCommandExec): Seq[Row]
+}
+
+/**
+ * A physical operator specialized to execute the run method of a 
`FileWritingCommand`,
+ * save the result to prevent multiple executions, and record necessary 
metrics for UI.
+ */
+case class FileWritingCommandExec(
+cmd: FileWritingCommand,
+children: Seq[SparkPlan],
+givenMetrics: Option[Map[String, SQLMetric]] = None) extends 
CommandExec {
+
+  override val metrics = givenMetrics.getOrElse {
+val sparkContext = sqlContext.sparkContext
+Map(
+  // General metrics.
+  "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing 
time (ms)"),
+  "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of 
written files"),
+  "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of 
written output"),
+  "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+  "numParts" -> SQLMetrics.createMetric(sparkContext, "number of 
dynamic part")
+)
+  }
+
+  /**
+   * Callback function that update metrics collected from the writing 
operation.
+   */
+  private[sql] def postDriverMetrics(writeSummaries: 
Seq[ExecutedWriteSummary]): Unit = {
+var numPartitions = 0
+var numFiles = 0
+var totalNumBytes: Long = 0L
+var totalNumOutput: Long = 0L
+
+writeSummaries.foreach { summary =>
+  numPartitions += summary.updatedPartitions.size
+  numFiles += summary.numOutputFile
+  totalNumBytes += summary.numOutputBytes
+  totalNumOutput += summary.numOutputRows
+}
+
+// The time for writing individual file can be zero if it's less than 
1 ms. Zero values can
+// lower actual time of writing to zero when calculating average, so 
excluding them.
+val avgWritingTime =
+  Utils.average(writeSummaries.flatMap(_.writingTimePerFile.filter(_ > 
0))).toLong
+// Note: for simplifying metric values assignment, we put the values 
as the alphabetically
+// sorted of the metric keys.
--- End diff --

I think it's simpler to just write the five lines that set these five metrics explicitly.
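That is, something along these lines (a sketch reusing the names from the diff above, not the PR's final code):

```scala
// Update each metric explicitly by key, with no dependence on alphabetical ordering.
metrics("avgTime").add(avgWritingTime)
metrics("numFiles").add(numFiles)
metrics("numOutputBytes").add(totalNumBytes)
metrics("numOutputRows").add(totalNumOutput)
metrics("numParts").add(numPartitions)
```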





[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r120506519
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/FileWritingCommand.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.util.Utils
+
+/**
+ * A logical command specialized for writing data out. 
`FileWritingCommand`s are
+ * wrapped in `FileWritingCommandExec` during execution.
+ */
+trait FileWritingCommand extends logical.Command {
+
+  // The caller of `FileWritingCommand` can replace the metrics location 
by providing this external
+  // metrics structure.
+  private var _externalMetrics: Option[Map[String, SQLMetric]] = None
+  private[sql] def withExternalMetrics(map: Map[String, SQLMetric]): 
this.type = {
+_externalMetrics = Option(map)
+this
+  }
+
+  /**
+   * Those metrics will be updated once the command finishes writing data 
out. Those metrics will
+   * be taken by `FileWritingCommandExec` as its metrics when showing in 
UI.
+   */
+  def metrics(sparkContext: SparkContext): Map[String, SQLMetric] = 
_externalMetrics.getOrElse {
+Map(
+  // General metrics.
+  "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing 
time (ms)"),
+  "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of 
written files"),
+  "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of 
written output"),
+  "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+  "numParts" -> SQLMetrics.createMetric(sparkContext, "number of 
dynamic part")
+)
+  }
+
+  /**
+   * Callback function that update metrics collected from the writing 
operation.
+   */
+  private[sql] def postDriverMetrics(sparkContext: SparkContext, metrics: 
Map[String, SQLMetric])
+  (writeSummaries: Seq[ExecutedWriteSummary]): Unit = {
+var numPartitions = 0
+var numFiles = 0
+var totalNumBytes: Long = 0L
+var totalNumOutput: Long = 0L
+
+writeSummaries.foreach { summary =>
+  numPartitions += summary.updatedPartitions.size
+  numFiles += summary.numOutputFile
+  totalNumBytes += summary.numOutputBytes
+  totalNumOutput += summary.numOutputRows
+}
+
+// The time for writing individual file can be zero if it's less than 
1 ms. Zero values can
+// lower actual time of writing when calculating average, so excluding 
them.
+val writingTime =
+  Utils.average(writeSummaries.flatMap(_.writingTimePerFile.filter(_ > 
0))).toLong
+
+val metricsNames = metrics.keys.toSeq.sorted
+val metricsValues = Seq(writingTime, numFiles, totalNumBytes, 
totalNumOutput, numPartitions)
+metricsNames.zip(metricsValues).foreach(x => metrics(x._1).add(x._2))
--- End diff --

For the metrics purpose alone, this change is a bit too large. I'd rather not increase the complexity for now. I added a comment for this.



[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-06 Thread adrian-ionescu
Github user adrian-ionescu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r120377948
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -446,7 +451,12 @@ case class DataSource(
 dataSource.createRelation(
   sparkSession.sqlContext, mode, caseInsensitiveOptions, 
Dataset.ofRows(sparkSession, data))
   case format: FileFormat =>
-
sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, 
data)).toRdd
+val qe = 
sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, 
data))
+qe.executedPlan.transform {
+  case f: FileWritingCommandExec =>
+val newCmd = 
f.cmd.withExternalMetrics(externalMetrics.getOrElse(null))
+FileWritingCommandExec(newCmd, f.children)
+}.execute()
--- End diff --

Yeah, I agree that's not a good way to go.





[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-06 Thread adrian-ionescu
Github user adrian-ionescu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r120375471
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/FileWritingCommand.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.util.Utils
+
+/**
+ * A logical command specialized for writing data out. 
`FileWritingCommand`s are
+ * wrapped in `FileWritingCommandExec` during execution.
+ */
+trait FileWritingCommand extends logical.Command {
+
+  // The caller of `FileWritingCommand` can replace the metrics location 
by providing this external
+  // metrics structure.
+  private var _externalMetrics: Option[Map[String, SQLMetric]] = None
+  private[sql] def withExternalMetrics(map: Map[String, SQLMetric]): 
this.type = {
+_externalMetrics = Option(map)
+this
+  }
+
+  /**
+   * Those metrics will be updated once the command finishes writing data 
out. Those metrics will
+   * be taken by `FileWritingCommandExec` as its metrics when showing in 
UI.
+   */
+  def metrics(sparkContext: SparkContext): Map[String, SQLMetric] = 
_externalMetrics.getOrElse {
+Map(
+  // General metrics.
+  "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing 
time (ms)"),
+  "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of 
written files"),
+  "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of 
written output"),
+  "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+  "numParts" -> SQLMetrics.createMetric(sparkContext, "number of 
dynamic part")
+)
+  }
+
+  /**
+   * Callback function that update metrics collected from the writing 
operation.
+   */
+  private[sql] def postDriverMetrics(sparkContext: SparkContext, metrics: 
Map[String, SQLMetric])
+  (writeSummaries: Seq[ExecutedWriteSummary]): Unit = {
+var numPartitions = 0
+var numFiles = 0
+var totalNumBytes: Long = 0L
+var totalNumOutput: Long = 0L
+
+writeSummaries.foreach { summary =>
+  numPartitions += summary.updatedPartitions.size
+  numFiles += summary.numOutputFile
+  totalNumBytes += summary.numOutputBytes
+  totalNumOutput += summary.numOutputRows
+}
+
+// The time for writing individual file can be zero if it's less than 
1 ms. Zero values can
+// lower actual time of writing when calculating average, so excluding 
them.
+val writingTime =
+  Utils.average(writeSummaries.flatMap(_.writingTimePerFile.filter(_ > 
0))).toLong
+
+val metricsNames = metrics.keys.toSeq.sorted
+val metricsValues = Seq(writingTime, numFiles, totalNumBytes, 
totalNumOutput, numPartitions)
+metricsNames.zip(metricsValues).foreach(x => metrics(x._1).add(x._2))
--- End diff --

I meant user as in the caller of this function. This function only works when the input satisfies some requirements and silently fails otherwise. I get that it's a private method, but it's very error-prone.

Imagine I want to extend this by adding another metric called `avgNumFilesPerPart`. If I add it to the end of the `metricsValues` Seq, then all the metrics will get messed up (because of the ordering).

At the very least, leave a comment saying that the `metricsValues` need to be ordered to match the alphabetically sorted metric keys (and rename `writingTime` to 

[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r120373405
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/FileWritingCommand.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.util.Utils
+
+/**
+ * A logical command specialized for writing data out. 
`FileWritingCommand`s are
+ * wrapped in `FileWritingCommandExec` during execution.
+ */
+trait FileWritingCommand extends logical.Command {
+
+  // The caller of `FileWritingCommand` can replace the metrics location 
by providing this external
+  // metrics structure.
+  private var _externalMetrics: Option[Map[String, SQLMetric]] = None
+  private[sql] def withExternalMetrics(map: Map[String, SQLMetric]): 
this.type = {
+_externalMetrics = Option(map)
+this
+  }
--- End diff --

I should have a better code comment on this part too. I'll add it in the next commit.





[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-06 Thread adrian-ionescu
Github user adrian-ionescu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r120370076
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/FileWritingCommand.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.util.Utils
+
+/**
+ * A logical command specialized for writing data out. 
`FileWritingCommand`s are
+ * wrapped in `FileWritingCommandExec` during execution.
+ */
+trait FileWritingCommand extends logical.Command {
+
+  // The caller of `FileWritingCommand` can replace the metrics location 
by providing this external
+  // metrics structure.
+  private var _externalMetrics: Option[Map[String, SQLMetric]] = None
+  private[sql] def withExternalMetrics(map: Map[String, SQLMetric]): 
this.type = {
+_externalMetrics = Option(map)
+this
+  }
+
+  /**
+   * Those metrics will be updated once the command finishes writing data 
out. Those metrics will
+   * be taken by `FileWritingCommandExec` as its metrics when showing in 
UI.
+   */
+  def metrics(sparkContext: SparkContext): Map[String, SQLMetric] = 
_externalMetrics.getOrElse {
+Map(
+  // General metrics.
+  "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing 
time (ms)"),
+  "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of 
written files"),
+  "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of 
written output"),
+  "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+  "numParts" -> SQLMetrics.createMetric(sparkContext, "number of 
dynamic part")
+)
+  }
+
+  /**
+   * Callback function that update metrics collected from the writing 
operation.
+   */
+  private[sql] def postDriverMetrics(sparkContext: SparkContext, metrics: 
Map[String, SQLMetric])
+  (writeSummaries: Seq[ExecutedWriteSummary]): Unit = {
+var numPartitions = 0
+var numFiles = 0
+var totalNumBytes: Long = 0L
+var totalNumOutput: Long = 0L
+
+writeSummaries.foreach { summary =>
+  numPartitions += summary.updatedPartitions.size
+  numFiles += summary.numOutputFile
+  totalNumBytes += summary.numOutputBytes
+  totalNumOutput += summary.numOutputRows
+}
+
+// The time for writing individual file can be zero if it's less than 
1 ms. Zero values can
+// lower actual time of writing when calculating average, so excluding 
them.
+val writingTime =
+  Utils.average(writeSummaries.flatMap(_.writingTimePerFile.filter(_ > 
0))).toLong
+
+val metricsNames = metrics.keys.toSeq.sorted
+val metricsValues = Seq(writingTime, numFiles, totalNumBytes, 
totalNumOutput, numPartitions)
+metricsNames.zip(metricsValues).foreach(x => metrics(x._1).add(x._2))
+
+val executionId = 
sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, 
metricsNames.map(metrics(_)))
+  }
+
+  def run(
+sparkSession: SparkSession,
+children: Seq[SparkPlan],
+metrics: Map[String, SQLMetric],
+metricsCallback: (Seq[ExecutedWriteSummary]) => Unit): Seq[Row]
--- End diff --

I understand that, but I think it can still be simplified. See below: 
https://github.com/apache/spark/pull/18159#discussion_r120359550



[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r120369741
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -446,7 +451,12 @@ case class DataSource(
 dataSource.createRelation(
   sparkSession.sqlContext, mode, caseInsensitiveOptions, 
Dataset.ofRows(sparkSession, data))
   case format: FileFormat =>
-
sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, 
data)).toRdd
+val qe = 
sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, 
data))
+qe.executedPlan.transform {
+  case f: FileWritingCommandExec =>
+val newCmd = 
f.cmd.withExternalMetrics(externalMetrics.getOrElse(null))
+FileWritingCommandExec(newCmd, f.children)
+}.execute()
--- End diff --

`InsertIntoTable` is a general logical node representing inserting some data into a table. It is not a `logical.Command`, so it shouldn't be a `FileWritingCommand`. And since it's unresolved, making it extend `FileWritingCommand` and carry metrics doesn't make sense either.





[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-06 Thread adrian-ionescu
Github user adrian-ionescu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r120368953
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/FileWritingCommand.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.util.Utils
+
+/**
+ * A logical command specialized for writing data out. 
`FileWritingCommand`s are
+ * wrapped in `FileWritingCommandExec` during execution.
+ */
+trait FileWritingCommand extends logical.Command {
+
+  // The caller of `FileWritingCommand` can replace the metrics location 
by providing this external
+  // metrics structure.
+  private var _externalMetrics: Option[Map[String, SQLMetric]] = None
+  private[sql] def withExternalMetrics(map: Map[String, SQLMetric]): 
this.type = {
+_externalMetrics = Option(map)
+this
+  }
--- End diff --

Indeed, I missed it at first, but then I got it. See below: 
https://github.com/apache/spark/pull/18159#discussion_r120359550





[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-06 Thread adrian-ionescu
Github user adrian-ionescu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r120359550
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -446,7 +451,12 @@ case class DataSource(
 dataSource.createRelation(
   sparkSession.sqlContext, mode, caseInsensitiveOptions, 
Dataset.ofRows(sparkSession, data))
   case format: FileFormat =>
-
sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, 
data)).toRdd
+val qe = 
sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, 
data))
+qe.executedPlan.transform {
+  case f: FileWritingCommandExec =>
+val newCmd = 
f.cmd.withExternalMetrics(externalMetrics.getOrElse(null))
+FileWritingCommandExec(newCmd, f.children)
+}.execute()
--- End diff --

Hm..

So it looks like you do this in 2 places: `CreateHiveTableAsSelectCommand` 
and `CreateDataSourceTableAsSelectCommand.saveDataIntoTable() -> 
DataSource.writeAndRead()`

For the latter case, I'd pass 
`CreateDataSourceTableAsSelectCommand.metrics` through 
`planForWritingFileFormat()` to `InsertIntoHadoopFsRelationCommand` which 
already extends `FileWritingCommand` and could just take those existing metrics 
as a constructor argument.

For the former, I'm not sure, as I see `InsertIntoTable` does not extend 
`FileWritingCommand`. Maybe it should? Then a similar solution would apply 
there.
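A generic sketch of the constructor-argument idea (hypothetical names; the real `InsertIntoHadoopFsRelationCommand` has many more fields, and `planForWritingFileFormat()` would need an extra parameter to thread the map through):

```scala
import org.apache.spark.sql.execution.metric.SQLMetric

// The writing command accepts metrics owned by the wrapping command and would
// fall back to creating its own map only when none are supplied.
case class WriteCommandSketch(externalMetrics: Option[Map[String, SQLMetric]] = None) {
  def resolvedMetrics(createOwn: => Map[String, SQLMetric]): Map[String, SQLMetric] =
    externalMetrics.getOrElse(createOwn)
}
```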





[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r120355478
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/FileWritingCommand.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.util.Utils
+
+/**
+ * A logical command specialized for writing data out. 
`FileWritingCommand`s are
+ * wrapped in `FileWritingCommandExec` during execution.
+ */
+trait FileWritingCommand extends logical.Command {
+
+  // The caller of `FileWritingCommand` can replace the metrics location 
by providing this external
+  // metrics structure.
+  private var _externalMetrics: Option[Map[String, SQLMetric]] = None
+  private[sql] def withExternalMetrics(map: Map[String, SQLMetric]): 
this.type = {
+_externalMetrics = Option(map)
+this
+  }
+
+  /**
+   * Those metrics will be updated once the command finishes writing data 
out. Those metrics will
+   * be taken by `FileWritingCommandExec` as its metrics when showing in 
UI.
+   */
+  def metrics(sparkContext: SparkContext): Map[String, SQLMetric] = 
_externalMetrics.getOrElse {
+Map(
+  // General metrics.
+  "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing 
time (ms)"),
+  "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of 
written files"),
+  "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of 
written output"),
+  "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+  "numParts" -> SQLMetrics.createMetric(sparkContext, "number of 
dynamic part")
+)
+  }
+
+  /**
+   * Callback function that update metrics collected from the writing 
operation.
+   */
+  private[sql] def postDriverMetrics(sparkContext: SparkContext, metrics: 
Map[String, SQLMetric])
+  (writeSummaries: Seq[ExecutedWriteSummary]): Unit = {
+var numPartitions = 0
+var numFiles = 0
+var totalNumBytes: Long = 0L
+var totalNumOutput: Long = 0L
+
+writeSummaries.foreach { summary =>
+  numPartitions += summary.updatedPartitions.size
+  numFiles += summary.numOutputFile
+  totalNumBytes += summary.numOutputBytes
+  totalNumOutput += summary.numOutputRows
+}
+
+// The time for writing individual file can be zero if it's less than 
1 ms. Zero values can
+// lower actual time of writing when calculating average, so excluding 
them.
--- End diff --

Excluding 0 values because they can drag the average writing time down to 0. It doesn't really make sense to show a writing time of 0; it would suggest the writing cost no time at all.

A 1 ms value also brings the average down, but you still get a meaningful time value.
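A toy illustration of the effect (made-up numbers): per-file times under 1 ms are recorded as 0 and pull the average toward 0 unless they are filtered out.

```scala
val reportedMs  = Seq(0L, 0L, 12L)  // two sub-millisecond files, one 12 ms file
val naiveAvg    = reportedMs.sum.toDouble / reportedMs.size            // 4.0 ms
val nonZero     = reportedMs.filter(_ > 0)
val filteredAvg =
  if (nonZero.isEmpty) 0.0 else nonZero.sum.toDouble / nonZero.size    // 12.0 ms
```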





[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r120354430
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/FileWritingCommand.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.util.Utils
+
+/**
+ * A logical command specialized for writing data out. 
`FileWritingCommand`s are
+ * wrapped in `FileWritingCommandExec` during execution.
+ */
+trait FileWritingCommand extends logical.Command {
+
+  // The caller of `FileWritingCommand` can replace the metrics location 
by providing this external
+  // metrics structure.
+  private var _externalMetrics: Option[Map[String, SQLMetric]] = None
+  private[sql] def withExternalMetrics(map: Map[String, SQLMetric]): 
this.type = {
+_externalMetrics = Option(map)
+this
+  }
--- End diff --

I've explained the purpose of this in the comments below.

Once a command invokes another command to write data, we have to update the metrics in the physical plan of the first command, not the second.

Before we invoke the second command, we have to replace the metrics in the second command's physical plan with the metrics of the first one (i.e., the external metrics).

It seems to me that you're missing that there are two different commands involved here.





[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r120352370
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/FileWritingCommand.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.util.Utils
+
+/**
+ * A logical command specialized for writing data out. 
`FileWritingCommand`s are
+ * wrapped in `FileWritingCommandExec` during execution.
+ */
+trait FileWritingCommand extends logical.Command {
+
+  // The caller of `FileWritingCommand` can replace the metrics location 
by providing this external
+  // metrics structure.
+  private var _externalMetrics: Option[Map[String, SQLMetric]] = None
+  private[sql] def withExternalMetrics(map: Map[String, SQLMetric]): 
this.type = {
+_externalMetrics = Option(map)
+this
+  }
+
+  /**
+   * Those metrics will be updated once the command finishes writing data 
out. Those metrics will
+   * be taken by `FileWritingCommandExec` as its metrics when showing in 
UI.
+   */
+  def metrics(sparkContext: SparkContext): Map[String, SQLMetric] = 
_externalMetrics.getOrElse {
+Map(
+  // General metrics.
+  "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing 
time (ms)"),
+  "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of 
written files"),
+  "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of 
written output"),
+  "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+  "numParts" -> SQLMetrics.createMetric(sparkContext, "number of 
dynamic part")
+)
+  }
+
+  /**
+   * Callback function that update metrics collected from the writing 
operation.
+   */
+  private[sql] def postDriverMetrics(sparkContext: SparkContext, metrics: 
Map[String, SQLMetric])
+  (writeSummaries: Seq[ExecutedWriteSummary]): Unit = {
+var numPartitions = 0
+var numFiles = 0
+var totalNumBytes: Long = 0L
+var totalNumOutput: Long = 0L
+
+writeSummaries.foreach { summary =>
+  numPartitions += summary.updatedPartitions.size
+  numFiles += summary.numOutputFile
+  totalNumBytes += summary.numOutputBytes
+  totalNumOutput += summary.numOutputRows
+}
+
+// The time for writing individual file can be zero if it's less than 
1 ms. Zero values can
+// lower actual time of writing when calculating average, so excluding 
them.
+val writingTime =
+  Utils.average(writeSummaries.flatMap(_.writingTimePerFile.filter(_ > 
0))).toLong
+
+val metricsNames = metrics.keys.toSeq.sorted
+val metricsValues = Seq(writingTime, numFiles, totalNumBytes, 
totalNumOutput, numPartitions)
+metricsNames.zip(metricsValues).foreach(x => metrics(x._1).add(x._2))
--- End diff --

I am not sure I correctly understand your point.

The external metrics always come from the same trait; they are not provided by
the user, so how is this a user-provided map?

The order of keys returned by `map.keys` is not deterministic, as far as I can
tell. Without sorting, how do we make sure we match the metric values with the
correct metric keys?
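
For the record, if the sorted-order dependency is the concern, a sketch like the
following (with a simplified stand-in for `SQLMetric`; not the patch's code) ties
each value to its key explicitly:

```
// Sketch: update metrics by explicit key instead of relying on the ordering of
// `map.keys`. `Metric` is a simplified stand-in for SQLMetric.
object MetricUpdateSketch {
  final class Metric { var value = 0L; def add(v: Long): Unit = value += v }

  def updateMetrics(
      metrics: Map[String, Metric],
      writingTime: Long,
      numFiles: Long,
      totalNumBytes: Long,
      totalNumOutput: Long,
      numPartitions: Long): Unit = {
    val updates = Seq(
      "avgTime" -> writingTime,
      "numFiles" -> numFiles,
      "numOutputBytes" -> totalNumBytes,
      "numOutputRows" -> totalNumOutput,
      "numParts" -> numPartitions)
    // Each value is tied to its key, so no ordering assumption is needed, and
    // unknown keys in an externally supplied map are simply left untouched.
    updates.foreach { case (name, value) => metrics.get(name).foreach(_.add(value)) }
  }
}
```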


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.

[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r120350843
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/FileWritingCommand.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.util.Utils
+
+/**
+ * A logical command specialized for writing data out. 
`FileWritingCommand`s are
+ * wrapped in `FileWritingCommandExec` during execution.
+ */
+trait FileWritingCommand extends logical.Command {
+
+  // The caller of `FileWritingCommand` can replace the metrics location 
by providing this external
+  // metrics structure.
+  private var _externalMetrics: Option[Map[String, SQLMetric]] = None
+  private[sql] def withExternalMetrics(map: Map[String, SQLMetric]): 
this.type = {
+_externalMetrics = Option(map)
+this
+  }
+
+  /**
+   * Those metrics will be updated once the command finishes writing data 
out. Those metrics will
+   * be taken by `FileWritingCommandExec` as its metrics when showing in 
UI.
+   */
+  def metrics(sparkContext: SparkContext): Map[String, SQLMetric] = 
_externalMetrics.getOrElse {
+Map(
+  // General metrics.
+  "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing 
time (ms)"),
+  "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of 
written files"),
+  "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of 
written output"),
+  "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+  "numParts" -> SQLMetrics.createMetric(sparkContext, "number of 
dynamic part")
+)
+  }
+
+  /**
+   * Callback function that update metrics collected from the writing 
operation.
+   */
+  private[sql] def postDriverMetrics(sparkContext: SparkContext, metrics: 
Map[String, SQLMetric])
+  (writeSummaries: Seq[ExecutedWriteSummary]): Unit = {
+var numPartitions = 0
+var numFiles = 0
+var totalNumBytes: Long = 0L
+var totalNumOutput: Long = 0L
+
+writeSummaries.foreach { summary =>
+  numPartitions += summary.updatedPartitions.size
+  numFiles += summary.numOutputFile
+  totalNumBytes += summary.numOutputBytes
+  totalNumOutput += summary.numOutputRows
+}
+
+// The time for writing individual file can be zero if it's less than 
1 ms. Zero values can
+// lower actual time of writing when calculating average, so excluding 
them.
+val writingTime =
+  Utils.average(writeSummaries.flatMap(_.writingTimePerFile.filter(_ > 
0))).toLong
+
+val metricsNames = metrics.keys.toSeq.sorted
+val metricsValues = Seq(writingTime, numFiles, totalNumBytes, 
totalNumOutput, numPartitions)
+metricsNames.zip(metricsValues).foreach(x => metrics(x._1).add(x._2))
+
+val executionId = 
sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, 
metricsNames.map(metrics(_)))
+  }
+
+  def run(
+sparkSession: SparkSession,
+children: Seq[SparkPlan],
+metrics: Map[String, SQLMetric],
+metricsCallback: (Seq[ExecutedWriteSummary]) => Unit): Seq[Row]
--- End diff --

The case is more complicated...

We have commands which don't write the data themselves but invoke other 
commands to do that. The execution data for showing on 

[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r120349943
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -446,7 +451,12 @@ case class DataSource(
 dataSource.createRelation(
   sparkSession.sqlContext, mode, caseInsensitiveOptions, 
Dataset.ofRows(sparkSession, data))
   case format: FileFormat =>
-
sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, 
data)).toRdd
+val qe = 
sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, 
data))
+qe.executedPlan.transform {
+  case f: FileWritingCommandExec =>
+val newCmd = 
f.cmd.withExternalMetrics(externalMetrics.getOrElse(null))
+FileWritingCommandExec(newCmd, f.children)
+}.execute()
--- End diff --

Can you provide a specific suggestion regarding this? I don't think there is
a way to avoid it.

Those commands invoke other commands to write the data out. The metrics we want
updated are the ones in the original physical node, not in the invoked one. To
do this, we either pass the callback or the metrics of the original plan into
the invoked one. Do you think we have another choice?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-06 Thread adrian-ionescu
Github user adrian-ionescu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r120332795
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/FileWritingCommand.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.util.Utils
+
+/**
+ * A logical command specialized for writing data out. 
`FileWritingCommand`s are
+ * wrapped in `FileWritingCommandExec` during execution.
+ */
+trait FileWritingCommand extends logical.Command {
+
+  // The caller of `FileWritingCommand` can replace the metrics location 
by providing this external
+  // metrics structure.
+  private var _externalMetrics: Option[Map[String, SQLMetric]] = None
+  private[sql] def withExternalMetrics(map: Map[String, SQLMetric]): 
this.type = {
+_externalMetrics = Option(map)
+this
+  }
--- End diff --

It's not clear to me why we need this and the comment is not helpful.
Looks to me like this is currently either null/None, or the map that's 
defined below, in `def metrics`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-06 Thread adrian-ionescu
Github user adrian-ionescu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r120334376
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/FileWritingCommand.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.util.Utils
+
+/**
+ * A logical command specialized for writing data out. 
`FileWritingCommand`s are
+ * wrapped in `FileWritingCommandExec` during execution.
+ */
+trait FileWritingCommand extends logical.Command {
+
+  // The caller of `FileWritingCommand` can replace the metrics location 
by providing this external
+  // metrics structure.
+  private var _externalMetrics: Option[Map[String, SQLMetric]] = None
+  private[sql] def withExternalMetrics(map: Map[String, SQLMetric]): 
this.type = {
+_externalMetrics = Option(map)
+this
+  }
+
+  /**
+   * Those metrics will be updated once the command finishes writing data 
out. Those metrics will
+   * be taken by `FileWritingCommandExec` as its metrics when showing in 
UI.
+   */
+  def metrics(sparkContext: SparkContext): Map[String, SQLMetric] = 
_externalMetrics.getOrElse {
+Map(
+  // General metrics.
+  "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing 
time (ms)"),
+  "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of 
written files"),
+  "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of 
written output"),
+  "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+  "numParts" -> SQLMetrics.createMetric(sparkContext, "number of 
dynamic part")
+)
+  }
+
+  /**
+   * Callback function that update metrics collected from the writing 
operation.
+   */
+  private[sql] def postDriverMetrics(sparkContext: SparkContext, metrics: 
Map[String, SQLMetric])
+  (writeSummaries: Seq[ExecutedWriteSummary]): Unit = {
+var numPartitions = 0
+var numFiles = 0
+var totalNumBytes: Long = 0L
+var totalNumOutput: Long = 0L
+
+writeSummaries.foreach { summary =>
+  numPartitions += summary.updatedPartitions.size
+  numFiles += summary.numOutputFile
+  totalNumBytes += summary.numOutputBytes
+  totalNumOutput += summary.numOutputRows
+}
+
+// The time for writing individual file can be zero if it's less than 
1 ms. Zero values can
+// lower actual time of writing when calculating average, so excluding 
them.
+val writingTime =
+  Utils.average(writeSummaries.flatMap(_.writingTimePerFile.filter(_ > 
0))).toLong
+
+val metricsNames = metrics.keys.toSeq.sorted
+val metricsValues = Seq(writingTime, numFiles, totalNumBytes, 
totalNumOutput, numPartitions)
+metricsNames.zip(metricsValues).foreach(x => metrics(x._1).add(x._2))
+
+val executionId = 
sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, 
metricsNames.map(metrics(_)))
+  }
+
+  def run(
+sparkSession: SparkSession,
+children: Seq[SparkPlan],
+metrics: Map[String, SQLMetric],
+metricsCallback: (Seq[ExecutedWriteSummary]) => Unit): Seq[Row]
--- End diff --

Why does this take `metrics` as a parameter, when the trait already has a 
`metrics` member?

A big part of this patch is just about passing down 

[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-06 Thread adrian-ionescu
Github user adrian-ionescu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r120319308
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/FileWritingCommand.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.util.Utils
+
+/**
+ * A logical command specialized for writing data out. 
`FileWritingCommand`s are
+ * wrapped in `FileWritingCommandExec` during execution.
+ */
+trait FileWritingCommand extends logical.Command {
+
+  // The caller of `FileWritingCommand` can replace the metrics location 
by providing this external
+  // metrics structure.
+  private var _externalMetrics: Option[Map[String, SQLMetric]] = None
+  private[sql] def withExternalMetrics(map: Map[String, SQLMetric]): 
this.type = {
+_externalMetrics = Option(map)
+this
+  }
+
+  /**
+   * Those metrics will be updated once the command finishes writing data 
out. Those metrics will
+   * be taken by `FileWritingCommandExec` as its metrics when showing in 
UI.
+   */
+  def metrics(sparkContext: SparkContext): Map[String, SQLMetric] = 
_externalMetrics.getOrElse {
+Map(
+  // General metrics.
+  "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing 
time (ms)"),
+  "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of 
written files"),
+  "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of 
written output"),
+  "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+  "numParts" -> SQLMetrics.createMetric(sparkContext, "number of 
dynamic part")
+)
+  }
+
+  /**
+   * Callback function that update metrics collected from the writing 
operation.
+   */
+  private[sql] def postDriverMetrics(sparkContext: SparkContext, metrics: 
Map[String, SQLMetric])
+  (writeSummaries: Seq[ExecutedWriteSummary]): Unit = {
+var numPartitions = 0
+var numFiles = 0
+var totalNumBytes: Long = 0L
+var totalNumOutput: Long = 0L
+
+writeSummaries.foreach { summary =>
+  numPartitions += summary.updatedPartitions.size
+  numFiles += summary.numOutputFile
+  totalNumBytes += summary.numOutputBytes
+  totalNumOutput += summary.numOutputRows
+}
+
+// The time for writing individual file can be zero if it's less than 
1 ms. Zero values can
+// lower actual time of writing when calculating average, so excluding 
them.
--- End diff --

Seems arbitrary to exclude 0 values. 1ms values also bring the average 
down, yet you're keeping those.
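
One way to sidestep this entirely (a sketch assuming the per-file durations are
recorded in nanoseconds somewhere upstream; not this patch's code) is to average
in nanoseconds and only convert the result to milliseconds:

```
// Sketch: averaging before truncating to milliseconds lets sub-millisecond files
// contribute their real cost instead of being dropped or rounded down to zero.
object WriteTimingSketch {
  def averageWriteTimeMs(writingTimePerFileNs: Seq[Long]): Long =
    if (writingTimePerFileNs.isEmpty) 0L
    else (writingTimePerFileNs.sum / writingTimePerFileNs.size) / 1000000L

  def main(args: Array[String]): Unit = {
    // Two fast files (0.4 ms and 0.6 ms) and one slow file (10 ms).
    println(averageWriteTimeMs(Seq(400000L, 600000L, 10000000L))) // 3 (ms)
  }
}
```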


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-06 Thread adrian-ionescu
Github user adrian-ionescu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r120311829
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/FileWritingCommand.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.util.Utils
+
+/**
+ * A logical command specialized for writing data out. 
`FileWritingCommand`s are
+ * wrapped in `FileWritingCommandExec` during execution.
+ */
+trait FileWritingCommand extends logical.Command {
+
+  // The caller of `FileWritingCommand` can replace the metrics location 
by providing this external
+  // metrics structure.
+  private var _externalMetrics: Option[Map[String, SQLMetric]] = None
+  private[sql] def withExternalMetrics(map: Map[String, SQLMetric]): 
this.type = {
+_externalMetrics = Option(map)
+this
+  }
+
+  /**
+   * Those metrics will be updated once the command finishes writing data 
out. Those metrics will
+   * be taken by `FileWritingCommandExec` as its metrics when showing in 
UI.
+   */
+  def metrics(sparkContext: SparkContext): Map[String, SQLMetric] = 
_externalMetrics.getOrElse {
+Map(
+  // General metrics.
+  "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing 
time (ms)"),
+  "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of 
written files"),
+  "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of 
written output"),
+  "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+  "numParts" -> SQLMetrics.createMetric(sparkContext, "number of 
dynamic part")
+)
+  }
+
+  /**
+   * Callback function that update metrics collected from the writing 
operation.
+   */
+  private[sql] def postDriverMetrics(sparkContext: SparkContext, metrics: 
Map[String, SQLMetric])
+  (writeSummaries: Seq[ExecutedWriteSummary]): Unit = {
+var numPartitions = 0
+var numFiles = 0
+var totalNumBytes: Long = 0L
+var totalNumOutput: Long = 0L
+
+writeSummaries.foreach { summary =>
+  numPartitions += summary.updatedPartitions.size
+  numFiles += summary.numOutputFile
+  totalNumBytes += summary.numOutputBytes
+  totalNumOutput += summary.numOutputRows
+}
+
+// The time for writing individual file can be zero if it's less than 
1 ms. Zero values can
+// lower actual time of writing when calculating average, so excluding 
them.
+val writingTime =
+  Utils.average(writeSummaries.flatMap(_.writingTimePerFile.filter(_ > 
0))).toLong
+
+val metricsNames = metrics.keys.toSeq.sorted
+val metricsValues = Seq(writingTime, numFiles, totalNumBytes, 
totalNumOutput, numPartitions)
+metricsNames.zip(metricsValues).foreach(x => metrics(x._1).add(x._2))
--- End diff --

This doesn't make sense if `_externalMetrics` are provided, as there's no 
guarantee that the user-provided map is exactly as you expect. It just happens 
to work when it's an empty map, but it's very fragile, not to mention the fact 
that it relies on the alphabetical ordering of the keys. Please rework this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.

[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-06 Thread adrian-ionescu
Github user adrian-ionescu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r120339182
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -446,7 +451,12 @@ case class DataSource(
 dataSource.createRelation(
   sparkSession.sqlContext, mode, caseInsensitiveOptions, 
Dataset.ofRows(sparkSession, data))
   case format: FileFormat =>
-
sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, 
data)).toRdd
+val qe = 
sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, 
data))
+qe.executedPlan.transform {
+  case f: FileWritingCommandExec =>
+val newCmd = 
f.cmd.withExternalMetrics(externalMetrics.getOrElse(null))
+FileWritingCommandExec(newCmd, f.children)
+}.execute()
--- End diff --

I don't think this is much of an improvement over the previous hack. See the
above comments; try to avoid changing this code at all.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r120134028
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -446,7 +450,12 @@ case class DataSource(
 dataSource.createRelation(
   sparkSession.sqlContext, mode, caseInsensitiveOptions, 
Dataset.ofRows(sparkSession, data))
   case format: FileFormat =>
-
sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, 
data)).toRdd
+val qe = 
sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, 
data))
+val insertCommand = qe.executedPlan.collect {
+  case w: WrittenFileCommandExec => w
+}.head
+insertCommand.cmd.run(sparkSession, insertCommand.children,
+  metricsCallback.getOrElse(_ => ()))
--- End diff --

Because here we need to invoke the command and pass in the metrics-updating
callback function; `.execute()` on the plan doesn't allow us to do this. I'll
think about whether there is a better way to do it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-05 Thread adrian-ionescu
Github user adrian-ionescu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r120117616
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/WrittenFileCommandExec.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+
+/**
+ * A logical command specialized for writing data out. 
`WriteOutFileCommand`s are
+ * wrapped in `WrittenFileCommandExec` during execution.
+ */
+trait WriteOutFileCommand extends logical.Command {
+
+  /**
+   * Those metrics will be updated once the command finishes writing data 
out. Those metrics will
+   * be taken by `WrittenFileCommandExe` as its metrics when showing in UI.
+   */
+  def metrics(sparkContext: SparkContext): Map[String, SQLMetric] =
+Map(
+  // General metrics.
+  "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+  "numParts" -> SQLMetrics.createMetric(sparkContext, "number of 
dynamic part"),
+  "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of 
written files"),
+  "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of 
written output"),
+  "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing 
time (ms)")
+)
+
+  def run(
+  sparkSession: SparkSession,
+  children: Seq[SparkPlan],
+  metricsCallback: (Seq[ExecutedWriteSummary]) => Unit): Seq[Row] = {
+throw new NotImplementedError
+  }
+}
+
+/**
+ * A physical operator specialized to execute the run method of a 
`WriteOutFileCommand`,
+ * save the result to prevent multiple executions, and record necessary 
metrics for UI.
+ */
+case class WrittenFileCommandExec(
+cmd: WriteOutFileCommand,
+children: Seq[SparkPlan]) extends CommandExec {
+
+  override lazy val metrics = cmd.metrics(sqlContext.sparkContext)
+
+  /**
+   * The callback function used to update metrics returned from the 
operation of writing data out.
+   */
+  private def updateDriverMetrics(writeSummaries: 
Seq[ExecutedWriteSummary]): Unit = {
+var numPartitions = 0
+var numFiles = 0
+var totalNumBytes: Long = 0L
+var totalNumOutput: Long = 0L
+
+writeSummaries.foreach { summary =>
+  numPartitions += summary.updatedPartitions.size
+  numFiles += summary.numOutputFile
+  totalNumBytes += summary.numOutputBytes
+  totalNumOutput += summary.numOutputRows
+}
+
+val times = writeSummaries.flatMap(_.writingTimePerFile.filter(_ > 0))
+val avgWritingTime = if (times.size > 0) {
+  times.sum / times.size
+} else {
+  0
+}
+
+val metricsNames = Seq("numParts", "numFiles", "numOutputBytes", 
"numOutputRows", "avgTime")
--- End diff --

should just be `metrics.keys`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-05 Thread adrian-ionescu
Github user adrian-ionescu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r120115651
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/WrittenFileCommandExec.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+
+/**
+ * A logical command specialized for writing data out. 
`WriteOutFileCommand`s are
+ * wrapped in `WrittenFileCommandExec` during execution.
+ */
+trait WriteOutFileCommand extends logical.Command {
+
+  /**
+   * Those metrics will be updated once the command finishes writing data 
out. Those metrics will
+   * be taken by `WrittenFileCommandExe` as its metrics when showing in UI.
+   */
+  def metrics(sparkContext: SparkContext): Map[String, SQLMetric] =
+Map(
+  // General metrics.
+  "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+  "numParts" -> SQLMetrics.createMetric(sparkContext, "number of 
dynamic part"),
+  "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of 
written files"),
+  "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of 
written output"),
+  "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing 
time (ms)")
+)
+
+  def run(
+  sparkSession: SparkSession,
+  children: Seq[SparkPlan],
+  metricsCallback: (Seq[ExecutedWriteSummary]) => Unit): Seq[Row] = {
+throw new NotImplementedError
+  }
+}
+
+/**
+ * A physical operator specialized to execute the run method of a 
`WriteOutFileCommand`,
+ * save the result to prevent multiple executions, and record necessary 
metrics for UI.
+ */
+case class WrittenFileCommandExec(
--- End diff --

for consistency, better call this `Exec`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-05 Thread adrian-ionescu
Github user adrian-ionescu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r120123638
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala 
---
@@ -58,15 +56,7 @@ case class ExecutedCommandExec(cmd: RunnableCommand, 
children: Seq[SparkPlan]) e
* The `execute()` method of all the physical command classes should 
reference `sideEffectResult`
* so that the command can be executed eagerly right after the command 
query is created.
*/
-  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
-val converter = 
CatalystTypeConverters.createToCatalystConverter(schema)
-val rows = if (children.isEmpty) {
-  cmd.run(sqlContext.sparkSession)
-} else {
-  cmd.run(sqlContext.sparkSession, children)
-}
-rows.map(converter(_).asInstanceOf[InternalRow])
-  }
+  protected[sql] val sideEffectResult: Seq[InternalRow]
--- End diff --

By making this abstract, you ended up writing two very similar
implementations, which only differ in the arguments to `cmd.run()`.
Consider extracting only the differing part into an abstract function and
then having this be something like
```
  val converter = CatalystTypeConverters.createToCatalystConverter(schema)
  val rows = <call to the abstract function>
  rows.map(converter(_).asInstanceOf[InternalRow])
```
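
A rough standalone sketch of that shape, with hypothetical names and `Seq[Any]`
standing in for the row types (so it compiles on its own; it is not this patch's
code):

```
// Hypothetical sketch of the suggested refactor: the base trait defines
// sideEffectResult once, and subclasses override only executeCommand(), which is
// the part that actually differs. Types are simplified stand-ins.
trait BaseCommandExecSketch {
  type Row = Seq[Any]
  type InternalRow = Seq[Any]

  // Stand-in for CatalystTypeConverters.createToCatalystConverter(schema).
  protected def converter(row: Row): InternalRow = row

  // The only abstract piece: how the underlying command is run.
  protected def executeCommand(): Seq[Row]

  protected lazy val sideEffectResult: Seq[InternalRow] =
    executeCommand().map(converter)
}

class LeafCommandExecSketch(rows: Seq[Seq[Any]]) extends BaseCommandExecSketch {
  override protected def executeCommand(): Seq[Row] = rows
}
```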


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-05 Thread adrian-ionescu
Github user adrian-ionescu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r120129166
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -446,7 +450,12 @@ case class DataSource(
 dataSource.createRelation(
   sparkSession.sqlContext, mode, caseInsensitiveOptions, 
Dataset.ofRows(sparkSession, data))
   case format: FileFormat =>
-
sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, 
data)).toRdd
+val qe = 
sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, 
data))
+val insertCommand = qe.executedPlan.collect {
+  case w: WrittenFileCommandExec => w
+}.head
+insertCommand.cmd.run(sparkSession, insertCommand.children,
+  metricsCallback.getOrElse(_ => ()))
--- End diff --

This looks like a hack and you did it in 3 different places.
There has to be a way to get it to work via `.execute()` and revert this.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-05 Thread adrian-ionescu
Github user adrian-ionescu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r120121507
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/WrittenFileCommandExec.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+
+/**
+ * A logical command specialized for writing data out. 
`WriteOutFileCommand`s are
+ * wrapped in `WrittenFileCommandExec` during execution.
+ */
+trait WriteOutFileCommand extends logical.Command {
+
+  /**
+   * Those metrics will be updated once the command finishes writing data 
out. Those metrics will
+   * be taken by `WrittenFileCommandExe` as its metrics when showing in UI.
+   */
+  def metrics(sparkContext: SparkContext): Map[String, SQLMetric] =
+Map(
+  // General metrics.
+  "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+  "numParts" -> SQLMetrics.createMetric(sparkContext, "number of 
dynamic part"),
+  "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of 
written files"),
+  "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of 
written output"),
+  "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing 
time (ms)")
+)
+
+  def run(
+  sparkSession: SparkSession,
+  children: Seq[SparkPlan],
+  metricsCallback: (Seq[ExecutedWriteSummary]) => Unit): Seq[Row] = {
+throw new NotImplementedError
+  }
+}
+
+/**
+ * A physical operator specialized to execute the run method of a 
`WriteOutFileCommand`,
+ * save the result to prevent multiple executions, and record necessary 
metrics for UI.
+ */
+case class WrittenFileCommandExec(
+cmd: WriteOutFileCommand,
+children: Seq[SparkPlan]) extends CommandExec {
+
+  override lazy val metrics = cmd.metrics(sqlContext.sparkContext)
+
+  /**
+   * The callback function used to update metrics returned from the 
operation of writing data out.
+   */
+  private def updateDriverMetrics(writeSummaries: 
Seq[ExecutedWriteSummary]): Unit = {
+var numPartitions = 0
+var numFiles = 0
+var totalNumBytes: Long = 0L
+var totalNumOutput: Long = 0L
+
+writeSummaries.foreach { summary =>
+  numPartitions += summary.updatedPartitions.size
+  numFiles += summary.numOutputFile
+  totalNumBytes += summary.numOutputBytes
+  totalNumOutput += summary.numOutputRows
+}
+
+val times = writeSummaries.flatMap(_.writingTimePerFile.filter(_ > 0))
--- End diff --

leave a note explaining how a `writeTime` of <=0 can occur and why we're 
excluding it here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-05 Thread adrian-ionescu
Github user adrian-ionescu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r120120540
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/WrittenFileCommandExec.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+
+/**
+ * A logical command specialized for writing data out. 
`WriteOutFileCommand`s are
+ * wrapped in `WrittenFileCommandExec` during execution.
+ */
+trait WriteOutFileCommand extends logical.Command {
+
+  /**
+   * Those metrics will be updated once the command finishes writing data 
out. Those metrics will
+   * be taken by `WrittenFileCommandExe` as its metrics when showing in UI.
+   */
+  def metrics(sparkContext: SparkContext): Map[String, SQLMetric] =
+Map(
+  // General metrics.
+  "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+  "numParts" -> SQLMetrics.createMetric(sparkContext, "number of 
dynamic part"),
+  "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of 
written files"),
+  "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of 
written output"),
+  "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing 
time (ms)")
+)
+
+  def run(
+  sparkSession: SparkSession,
+  children: Seq[SparkPlan],
+  metricsCallback: (Seq[ExecutedWriteSummary]) => Unit): Seq[Row] = {
+throw new NotImplementedError
+  }
+}
+
+/**
+ * A physical operator specialized to execute the run method of a 
`WriteOutFileCommand`,
+ * save the result to prevent multiple executions, and record necessary 
metrics for UI.
+ */
+case class WrittenFileCommandExec(
+cmd: WriteOutFileCommand,
+children: Seq[SparkPlan]) extends CommandExec {
+
+  override lazy val metrics = cmd.metrics(sqlContext.sparkContext)
+
+  /**
+   * The callback function used to update metrics returned from the 
operation of writing data out.
+   */
+  private def updateDriverMetrics(writeSummaries: 
Seq[ExecutedWriteSummary]): Unit = {
+var numPartitions = 0
+var numFiles = 0
+var totalNumBytes: Long = 0L
+var totalNumOutput: Long = 0L
+
+writeSummaries.foreach { summary =>
+  numPartitions += summary.updatedPartitions.size
+  numFiles += summary.numOutputFile
+  totalNumBytes += summary.numOutputBytes
+  totalNumOutput += summary.numOutputRows
+}
+
+val times = writeSummaries.flatMap(_.writingTimePerFile.filter(_ > 0))
+val avgWritingTime = if (times.size > 0) {
+  times.sum / times.size
+} else {
+  0
+}
--- End diff --

Is there no library/Utils function that computes the average value of a 
list? If not, better add one and use it here.
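
Something like this minimal sketch would do (hypothetical; not claiming this is
exactly what should land in `Utils`):

```
// Hypothetical average helper: returns 0 for empty input so callers need no
// special-casing.
object AverageSketch {
  def average[T](xs: Iterable[T])(implicit num: Numeric[T]): Double =
    if (xs.isEmpty) 0.0 else num.toDouble(xs.sum) / xs.size

  def main(args: Array[String]): Unit = {
    println(average(Seq(2L, 4L, 9L))) // 5.0
    println(average(Seq.empty[Long])) // 0.0
  }
}
```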


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-05 Thread adrian-ionescu
Github user adrian-ionescu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r120116201
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/WrittenFileCommandExec.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+
+/**
+ * A logical command specialized for writing data out. 
`WriteOutFileCommand`s are
+ * wrapped in `WrittenFileCommandExec` during execution.
+ */
+trait WriteOutFileCommand extends logical.Command {
--- End diff --

FileWritingCommand ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-05 Thread adrian-ionescu
Github user adrian-ionescu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r120115392
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/WrittenFileCommandExec.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+
+/**
+ * A logical command specialized for writing data out. 
`WriteOutFileCommand`s are
+ * wrapped in `WrittenFileCommandExec` during execution.
+ */
+trait WriteOutFileCommand extends logical.Command {
+
+  /**
+   * Those metrics will be updated once the command finishes writing data 
out. Those metrics will
+   * be taken by `WrittenFileCommandExe` as its metrics when showing in UI.
+   */
+  def metrics(sparkContext: SparkContext): Map[String, SQLMetric] =
+Map(
+  // General metrics.
+  "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+  "numParts" -> SQLMetrics.createMetric(sparkContext, "number of 
dynamic part"),
+  "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of 
written files"),
+  "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of 
written output"),
+  "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing 
time (ms)")
+)
+
+  def run(
+  sparkSession: SparkSession,
+  children: Seq[SparkPlan],
+  metricsCallback: (Seq[ExecutedWriteSummary]) => Unit): Seq[Row] = {
+throw new NotImplementedError
--- End diff --

better omit the body altogether


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-05 Thread adrian-ionescu
Github user adrian-ionescu commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r120120649
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/WrittenFileCommandExec.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+
+/**
+ * A logical command specialized for writing data out. 
`WriteOutFileCommand`s are
+ * wrapped in `WrittenFileCommandExec` during execution.
+ */
+trait WriteOutFileCommand extends logical.Command {
+
+  /**
+   * Those metrics will be updated once the command finishes writing data 
out. Those metrics will
+   * be taken by `WrittenFileCommandExe` as its metrics when showing in UI.
+   */
+  def metrics(sparkContext: SparkContext): Map[String, SQLMetric] =
+Map(
+  // General metrics.
+  "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+  "numParts" -> SQLMetrics.createMetric(sparkContext, "number of 
dynamic part"),
+  "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of 
written files"),
+  "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of 
written output"),
+  "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing 
time (ms)")
+)
+
+  def run(
+  sparkSession: SparkSession,
+  children: Seq[SparkPlan],
+  metricsCallback: (Seq[ExecutedWriteSummary]) => Unit): Seq[Row] = {
+throw new NotImplementedError
+  }
+}
+
+/**
+ * A physical operator specialized to execute the run method of a 
`WriteOutFileCommand`,
+ * save the result to prevent multiple executions, and record necessary 
metrics for UI.
+ */
+case class WrittenFileCommandExec(
+cmd: WriteOutFileCommand,
+children: Seq[SparkPlan]) extends CommandExec {
+
+  override lazy val metrics = cmd.metrics(sqlContext.sparkContext)
+
+  /**
+   * The callback function used to update metrics returned from the operation of writing data out.
+   */
+  private def updateDriverMetrics(writeSummaries: Seq[ExecutedWriteSummary]): Unit = {
+var numPartitions = 0
+var numFiles = 0
+var totalNumBytes: Long = 0L
+var totalNumOutput: Long = 0L
--- End diff --

I feel this function belongs in the trait, next to the method that creates these metrics.
Right now you're spelling out the list of metric names in two different places (here and in the trait).
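Something along these lines would keep the keys in one place — just a rough sketch of the idea, where the `updateMetrics` name and the `ExecutedWriteSummary` fields used for aggregation (`numFiles`, `numBytes`, `numRows`) are placeholders, not the actual API:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

trait WriteOutFileCommand extends logical.Command {

  // Single source of truth for the metric keys.
  def metrics(sparkContext: SparkContext): Map[String, SQLMetric] = Map(
    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
    "numParts" -> SQLMetrics.createMetric(sparkContext, "number of dynamic part"),
    "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files"),
    "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of written output"),
    "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing time (ms)"))

  // Aggregates the per-task write summaries into the metrics defined above, so
  // the key names are only spelled out in this trait. The summary fields used
  // here are placeholders for illustration.
  def updateMetrics(
      metrics: Map[String, SQLMetric],
      writeSummaries: Seq[ExecutedWriteSummary]): Unit = {
    var totalFiles = 0
    var totalBytes = 0L
    var totalRows = 0L
    writeSummaries.foreach { summary =>
      totalFiles += summary.numFiles
      totalBytes += summary.numBytes
      totalRows += summary.numRows
    }
    metrics("numFiles").add(totalFiles)
    metrics("numOutputBytes").add(totalBytes)
    metrics("numOutputRows").add(totalRows)
  }
}
```

`WrittenFileCommandExec` could then just forward its callback to `cmd.updateMetrics(metrics, writeSummaries)` instead of repeating the key names.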





[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r12935
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala ---
@@ -17,38 +17,97 @@
 
 package org.apache.spark.sql.execution.command
 
+import scala.collection.mutable
+
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.{Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
 import org.apache.spark.sql.execution.debug._
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types._
 
 /**
- * A logical command that is executed for its side-effects.  `RunnableCommand`s are
- * wrapped in `ExecutedCommand` during execution.
+ * A logical command specialized for writing data out. `WriteOutFileCommand`s are
+ * wrapped in `WrittenFileCommandExec` during execution.
  */
-trait RunnableCommand extends logical.Command {
-  def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
+trait WriteOutFileCommand extends logical.Command {
+
+  /**
+   * Those metrics will be updated once the command finishes writing data out. Those metrics will
+   * be taken by `WrittenFileCommandExe` as its metrics when showing in UI.
+   */
+  def metrics(sqlContext: SQLContext): Map[String, SQLMetric] = {
+val sparkContext = sqlContext.sparkContext
+
+Map(
+  // General metrics.
--- End diff --

@cloud-fan I removed the specialized per-file/partition metrics. The PR is now about 600 lines, of which about 180 are tests. Do you think it's okay?





[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r11871
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala ---
@@ -17,38 +17,97 @@
 
 package org.apache.spark.sql.execution.command
 
+import scala.collection.mutable
+
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.{Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
 import org.apache.spark.sql.execution.debug._
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types._
 
 /**
- * A logical command that is executed for its side-effects.  `RunnableCommand`s are
- * wrapped in `ExecutedCommand` during execution.
+ * A logical command specialized for writing data out. `WriteOutFileCommand`s are
+ * wrapped in `WrittenFileCommandExec` during execution.
  */
-trait RunnableCommand extends logical.Command {
-  def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
+trait WriteOutFileCommand extends logical.Command {
+
+  /**
+   * Those metrics will be updated once the command finishes writing data out. Those metrics will
+   * be taken by `WrittenFileCommandExe` as its metrics when showing in UI.
+   */
+  def metrics(sqlContext: SQLContext): Map[String, SQLMetric] = {
+val sparkContext = sqlContext.sparkContext
+
+Map(
+  // General metrics.
--- End diff --

Ok. Let me revert the specialized metrics.





[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r11816
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala ---
@@ -17,38 +17,97 @@
 
 package org.apache.spark.sql.execution.command
 
+import scala.collection.mutable
+
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.{Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
 import org.apache.spark.sql.execution.debug._
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types._
 
 /**
- * A logical command that is executed for its side-effects.  `RunnableCommand`s are
- * wrapped in `ExecutedCommand` during execution.
+ * A logical command specialized for writing data out. `WriteOutFileCommand`s are
+ * wrapped in `WrittenFileCommandExec` during execution.
  */
-trait RunnableCommand extends logical.Command {
-  def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
+trait WriteOutFileCommand extends logical.Command {
+
+  /**
+   * Those metrics will be updated once the command finishes writing data out. Those metrics will
+   * be taken by `WrittenFileCommandExe` as its metrics when showing in UI.
+   */
+  def metrics(sqlContext: SQLContext): Map[String, SQLMetric] = {
+val sparkContext = sqlContext.sparkContext
+
+Map(
+  // General metrics.
--- End diff --

shall we just add general metrics first? I hope this can make the PR 
smaller...
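For example (a rough sketch only — the command and the extra metric below are hypothetical, just to illustrate the direction), a concrete write command could keep inheriting the general metrics from the trait now and layer its specialized ones on top in a follow-up:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// Hypothetical command, for illustration only: it inherits the general metrics
// defined in WriteOutFileCommand and adds one specialized metric in a later change.
case class MyWriteFilesCommand(outputPath: String) extends WriteOutFileCommand {
  override def metrics(sparkContext: SparkContext): Map[String, SQLMetric] =
    super.metrics(sparkContext) ++ Map(
      "numDynamicParts" ->
        SQLMetrics.createMetric(sparkContext, "number of dynamic partitions written"))
}
```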





[GitHub] spark pull request #18159: [SPARK-20703][SQL] Associate metrics with data wr...

2017-06-03 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18159#discussion_r119995109
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala ---
@@ -17,38 +17,97 @@
 
 package org.apache.spark.sql.execution.command
 
+import scala.collection.mutable
+
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.{Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
 import org.apache.spark.sql.execution.debug._
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types._
 
 /**
- * A logical command that is executed for its side-effects.  `RunnableCommand`s are
- * wrapped in `ExecutedCommand` during execution.
+ * A logical command specialized for writing data out. `WriteOutFileCommand`s are
+ * wrapped in `WrittenFileCommandExec` during execution.
  */
-trait RunnableCommand extends logical.Command {
-  def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
+trait WriteOutFileCommand extends logical.Command {
--- End diff --

move it to a new file?

