[GitHub] spark pull request #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd shou...

2018-10-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22621


---




[GitHub] spark pull request #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd shou...

2018-10-04 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22621#discussion_r222642107
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -517,4 +517,57 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   test("writing data out metrics with dynamic partition: parquet") {
     testMetricsDynamicPartition("parquet", "parquet", "t1")
   }
+
+  test("SPARK-25602: SparkPlan.getByteArrayRdd should not consume the input when not necessary") {
+    def checkFilterAndRangeMetrics(
+        df: DataFrame,
+        filterNumOutputs: Int,
+        rangeNumOutputs: Int): Unit = {
+      var filter: FilterExec = null
--- End diff --

If we need to catch more node types in the future, we should abstract it. But for now it's only Range and Filter, so I think it's OK.
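
A minimal sketch of what such an abstraction might look like (the `findNode` helper here is hypothetical, not part of this PR or of Spark's API):

```
import scala.reflect.ClassTag

import org.apache.spark.sql.execution.SparkPlan

// Hypothetical generalization: find the first node of a given type anywhere
// in the plan tree, so catching a new node class needs no extra var or case.
def findNode[T <: SparkPlan : ClassTag](plan: SparkPlan): Option[T] =
  plan.collectFirst { case t: T => t }

// Usage for the two cases this test needs today:
//   val filter = findNode[FilterExec](df.queryExecution.executedPlan)
//   val range  = findNode[RangeExec](df.queryExecution.executedPlan)
```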


---




[GitHub] spark pull request #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd shou...

2018-10-04 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22621#discussion_r222601659
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -517,4 +517,57 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   test("writing data out metrics with dynamic partition: parquet") {
     testMetricsDynamicPartition("parquet", "parquet", "t1")
   }
+
+  test("SPARK-25602: SparkPlan.getByteArrayRdd should not consume the input when not necessary") {
+    def checkFilterAndRangeMetrics(
+        df: DataFrame,
+        filterNumOutputs: Int,
+        rangeNumOutputs: Int): Unit = {
+      var filter: FilterExec = null
--- End diff --

what about something like this:
```
def collectExecNode[T](pf: PartialFunction[SparkPlan, T]): PartialFunction[SparkPlan, T] = {
  pf.orElse {
    case w: WholeStageCodegenExec =>
      w.child.collect(pf).head
  }
}

val range = df.queryExecution.executedPlan.collectFirst(
  collectExecNode { case r: RangeExec => r })
val filter = df.queryExecution.executedPlan.collectFirst(
  collectExecNode { case f: FilterExec => f })
```
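
For context, a sketch of how the collected nodes could then feed the metric checks (hypothetical continuation; `filterNumOutputs` and `rangeNumOutputs` are the test parameters from the diff above):

```
// Assuming `filter` and `range` were collected with the helper above:
assert(filter.isDefined && range.isDefined)
assert(filter.get.metrics("numOutputRows").value == filterNumOutputs)
assert(range.get.metrics("numOutputRows").value == rangeNumOutputs)
```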


---
