[GitHub] spark pull request #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not consume the input when not necessary
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22621
[GitHub] spark pull request #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not consume the input when not necessary
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22621#discussion_r222642107

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -517,4 +517,57 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with SharedSQLContext {
   test("writing data out metrics with dynamic partition: parquet") {
     testMetricsDynamicPartition("parquet", "parquet", "t1")
   }
+
+  test("SPARK-25602: SparkPlan.getByteArrayRdd should not consume the input when not necessary") {
+    def checkFilterAndRangeMetrics(
+        df: DataFrame,
+        filterNumOutputs: Int,
+        rangeNumOutputs: Int): Unit = {
+      var filter: FilterExec = null
--- End diff --

In the future, if we need to catch more nodes, we should abstract this. But for now it's only range and filter, so I think it's OK.
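For readers following the thread: the var-based shape under discussion collects the FilterExec and RangeExec nodes out of the executed plan and then checks their numOutputRows metrics. Below is a minimal sketch of that shape; only the signature comes from the diff above, while the traversal and the assertions are assumptions reconstructed from the test's parameters, not the PR's actual code.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.{FilterExec, RangeExec}

// Hedged sketch: the signature matches the diff above; the body is an
// assumption for illustration, not the code in the PR.
def checkFilterAndRangeMetrics(
    df: DataFrame,
    filterNumOutputs: Int,
    rangeNumOutputs: Int): Unit = {
  var filter: FilterExec = null
  var range: RangeExec = null
  // Walk every node of the physical plan, remembering the operators we care about.
  df.queryExecution.executedPlan.foreach {
    case f: FilterExec => filter = f
    case r: RangeExec => range = r
    case _ =>
  }
  assert(filter != null && range != null)
  assert(filter.metrics("numOutputRows").value == filterNumOutputs)
  assert(range.metrics("numOutputRows").value == rangeNumOutputs)
}
```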
[GitHub] spark pull request #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not consume the input when not necessary
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22621#discussion_r222601659

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -517,4 +517,57 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with SharedSQLContext {
   test("writing data out metrics with dynamic partition: parquet") {
     testMetricsDynamicPartition("parquet", "parquet", "t1")
   }
+
+  test("SPARK-25602: SparkPlan.getByteArrayRdd should not consume the input when not necessary") {
+    def checkFilterAndRangeMetrics(
+        df: DataFrame,
+        filterNumOutputs: Int,
+        rangeNumOutputs: Int): Unit = {
+      var filter: FilterExec = null
--- End diff --

What about something like this:

```
def collectExecNode[T](pf: PartialFunction[SparkPlan, T]): PartialFunction[SparkPlan, T] = {
  pf.orElse {
    case w: WholeStageCodegenExec => w.child.collect(pf).head
  }
}

val range = df.queryExecution.executedPlan.collectFirst(
  collectExecNode { case r: RangeExec => r })
val filter = df.queryExecution.executedPlan.collectFirst(
  collectExecNode { case f: FilterExec => f })
```
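If this suggestion were adopted, both lookups would return Options, so the metric checks would unwrap them before reading the SQL metrics. A possible wiring follows; the assertion lines are assumed from checkFilterAndRangeMetrics's parameters rather than quoted from the PR, and the collectExecNode helper from the suggestion above is taken to be in scope.

```scala
import org.apache.spark.sql.execution.{FilterExec, RangeExec}

// Usage sketch for the suggested helper; the assertions are an assumption
// based on the test's (filterNumOutputs, rangeNumOutputs) parameters, not
// code from the PR.
val range = df.queryExecution.executedPlan.collectFirst(
  collectExecNode { case r: RangeExec => r })
val filter = df.queryExecution.executedPlan.collectFirst(
  collectExecNode { case f: FilterExec => f })

// collectFirst yields Options, so unwrap before reading the metrics.
assert(filter.isDefined && range.isDefined)
assert(filter.get.metrics("numOutputRows").value == filterNumOutputs)
assert(range.get.metrics("numOutputRows").value == rangeNumOutputs)
```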