Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r222727599

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
    @@ -518,56 +521,81 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
         testMetricsDynamicPartition("parquet", "parquet", "t1")
       }
     
    +  private def collectNodeWithinWholeStage[T <: SparkPlan : ClassTag](plan: SparkPlan): Seq[T] = {
    +    val stages = plan.collect {
    +      case w: WholeStageCodegenExec => w
    +    }
    +    assert(stages.length == 1, "The query plan should have one and only one whole-stage.")
    +
    +    val cls = classTag[T].runtimeClass
    +    stages.head.collect {
    +      case n if n.getClass == cls => n.asInstanceOf[T]
    +    }
    +  }
    +
       test("SPARK-25602: SparkPlan.getByteArrayRdd should not consume the input when not necessary") {
         def checkFilterAndRangeMetrics(
             df: DataFrame,
             filterNumOutputs: Int,
             rangeNumOutputs: Int): Unit = {
    -      var filter: FilterExec = null
    -      var range: RangeExec = null
    -      val collectFilterAndRange: SparkPlan => Unit = {
    -        case f: FilterExec =>
    -          assert(filter == null, "the query should only have one Filter")
    -          filter = f
    -        case r: RangeExec =>
    -          assert(range == null, "the query should only have one Range")
    -          range = r
    -        case _ =>
    -      }
    -      if (SQLConf.get.wholeStageEnabled) {
    -        df.queryExecution.executedPlan.foreach {
    -          case w: WholeStageCodegenExec =>
    -            w.child.foreach(collectFilterAndRange)
    -          case _ =>
    -        }
    -      } else {
    -        df.queryExecution.executedPlan.foreach(collectFilterAndRange)
    -      }
    +      val plan = df.queryExecution.executedPlan
     
    -      assert(filter != null && range != null, "the query doesn't have Filter and Range")
    -      assert(filter.metrics("numOutputRows").value == filterNumOutputs)
    -      assert(range.metrics("numOutputRows").value == rangeNumOutputs)
    +      val filters = collectNodeWithinWholeStage[FilterExec](plan)
    +      assert(filters.length == 1, "The query plan should have one and only one Filter")
    +      assert(filters.head.metrics("numOutputRows").value == filterNumOutputs)
    +
    +      val ranges = collectNodeWithinWholeStage[RangeExec](plan)
    +      assert(ranges.length == 1, "The query plan should have one and only one Range")
    +      assert(ranges.head.metrics("numOutputRows").value == rangeNumOutputs)
         }
     
    -    val df = spark.range(0, 3000, 1, 2).toDF().filter('id % 3 === 0)
    -    val df2 = df.limit(2)
    -    Seq(true, false).foreach { wholeStageEnabled =>
    -      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> wholeStageEnabled.toString) {
    -        df.collect()
    -        checkFilterAndRangeMetrics(df, filterNumOutputs = 1000, rangeNumOutputs = 3000)
    -
    -        df.queryExecution.executedPlan.foreach(_.resetMetrics())
    -        // For each partition, we get 2 rows. Then the Filter should produce 2 rows per-partition,
    -        // and Range should produce 1000 rows (one batch) per-partition. Totally Filter produces
    -        // 4 rows, and Range produces 2000 rows.
    -        df.queryExecution.toRdd.mapPartitions(_.take(2)).collect()
    -        checkFilterAndRangeMetrics(df, filterNumOutputs = 4, rangeNumOutputs = 2000)
    -
    -        // Top-most limit will call `CollectLimitExec.executeCollect`, which will only run the first
    -        // task, so totally the Filter produces 2 rows, and Range produces 1000 rows (one batch).
    -        df2.collect()
    -        checkFilterAndRangeMetrics(df2, filterNumOutputs = 2, rangeNumOutputs = 1000)
    -      }
    +    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
    --- End diff --
    
    I changed the test to check whole-stage mode only. The metrics are different between whole-stage and normal mode, and the bug only existed in whole-stage mode.
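    As an aside for readers less familiar with the ClassTag trick the new helper
    relies on: a generic method cannot pattern-match on the erased type T, so the
    helper captures T's runtime class via a ClassTag and compares classes
    explicitly. Below is a minimal standalone sketch of the same pattern (the
    object and method names are illustrative, not from the PR):

        import scala.reflect.{classTag, ClassTag}

        object ClassTagCollectDemo {
          // Collect every element whose runtime class is exactly T's class.
          // `getClass == cls` deliberately excludes subclasses, mirroring the
          // exact-match semantics of collectNodeWithinWholeStage in the diff.
          def collectExact[T: ClassTag](items: Seq[Any]): Seq[T] = {
            val cls = classTag[T].runtimeClass
            items.collect { case x if x.getClass == cls => x.asInstanceOf[T] }
          }

          def main(args: Array[String]): Unit = {
            val mixed: Seq[Any] = Seq(1, "a", 2.0, "b")
            println(collectExact[String](mixed)) // prints List(a, b)
          }
        }

    The exact-class comparison matters here because a plain `case n: T` (even with
    a ClassTag-backed extractor) would also match subclasses, whereas the test
    wants to find one specific operator such as FilterExec or RangeExec.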