Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r222727599
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
    @@ -518,56 +521,81 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
         testMetricsDynamicPartition("parquet", "parquet", "t1")
       }
     
    +  private def collectNodeWithinWholeStage[T <: SparkPlan : ClassTag](plan: SparkPlan): Seq[T] = {
    +    val stages = plan.collect {
    +      case w: WholeStageCodegenExec => w
    +    }
    +    assert(stages.length == 1, "The query plan should have one and only one whole-stage.")
    +
    +    val cls = classTag[T].runtimeClass
    +    stages.head.collect {
    +      case n if n.getClass == cls => n.asInstanceOf[T]
    +    }
    +  }
    +
       test("SPARK-25602: SparkPlan.getByteArrayRdd should not consume the 
input when not necessary") {
         def checkFilterAndRangeMetrics(
             df: DataFrame,
             filterNumOutputs: Int,
             rangeNumOutputs: Int): Unit = {
    -      var filter: FilterExec = null
    -      var range: RangeExec = null
    -      val collectFilterAndRange: SparkPlan => Unit = {
    -        case f: FilterExec =>
    -          assert(filter == null, "the query should only have one Filter")
    -          filter = f
    -        case r: RangeExec =>
    -          assert(range == null, "the query should only have one Range")
    -          range = r
    -        case _ =>
    -      }
    -      if (SQLConf.get.wholeStageEnabled) {
    -        df.queryExecution.executedPlan.foreach {
    -          case w: WholeStageCodegenExec =>
    -            w.child.foreach(collectFilterAndRange)
    -          case _ =>
    -        }
    -      } else {
    -        df.queryExecution.executedPlan.foreach(collectFilterAndRange)
    -      }
    +      val plan = df.queryExecution.executedPlan
     
    -      assert(filter != null && range != null, "the query doesn't have Filter and Range")
    -      assert(filter.metrics("numOutputRows").value == filterNumOutputs)
    -      assert(range.metrics("numOutputRows").value == rangeNumOutputs)
    +      val filters = collectNodeWithinWholeStage[FilterExec](plan)
    +      assert(filters.length == 1, "The query plan should have one and only one Filter")
    +      assert(filters.head.metrics("numOutputRows").value == filterNumOutputs)
    +
    +      val ranges = collectNodeWithinWholeStage[RangeExec](plan)
    +      assert(ranges.length == 1, "The query plan should have one and only one Range")
    +      assert(ranges.head.metrics("numOutputRows").value == rangeNumOutputs)
         }
     
    -    val df = spark.range(0, 3000, 1, 2).toDF().filter('id % 3 === 0)
    -    val df2 = df.limit(2)
    -    Seq(true, false).foreach { wholeStageEnabled =>
    -      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> wholeStageEnabled.toString) {
    -        df.collect()
    -        checkFilterAndRangeMetrics(df, filterNumOutputs = 1000, rangeNumOutputs = 3000)
    -
    -        df.queryExecution.executedPlan.foreach(_.resetMetrics())
    -        // For each partition, we get 2 rows. Then the Filter should produce 2 rows per-partition,
    -        // and Range should produce 1000 rows (one batch) per-partition. Totally Filter produces
    -        // 4 rows, and Range produces 2000 rows.
    -        df.queryExecution.toRdd.mapPartitions(_.take(2)).collect()
    -        checkFilterAndRangeMetrics(df, filterNumOutputs = 4, rangeNumOutputs = 2000)
    -
    -        // Top-most limit will call `CollectLimitExec.executeCollect`, which will only run the first
    -        // task, so totally the Filter produces 2 rows, and Range produces 1000 rows (one batch).
    -        df2.collect()
    -        checkFilterAndRangeMetrics(df2, filterNumOutputs = 2, rangeNumOutputs = 1000)
    -      }
    +    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
    --- End diff ---
    
    I changed the test to check whole-stage mode only. The metrics are different between whole-stage and normal mode, and the bug only existed in whole-stage mode.
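    
    For reference, a minimal sketch of how the whole-stage-only check can be exercised with the new helper. The setup and the 1000-row expectation come from the old test's full-collect case; the exact body of the new test is cut off in the diff above, so treat this as illustrative rather than the PR's code:
    
        // Sketch only (assumed usage, not the exact test body from this PR):
        // pin whole-stage codegen on, then use the new helper to locate the single
        // Filter inside the one WholeStageCodegenExec and check its row-count metric.
        withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
          val df = spark.range(0, 3000, 1, 2).toDF().filter('id % 3 === 0)
          df.collect()
          val filters = collectNodeWithinWholeStage[FilterExec](df.queryExecution.executedPlan)
          // A full collect keeps every third id in [0, 3000), i.e. 1000 rows.
          assert(filters.head.metrics("numOutputRows").value == 1000)
        }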


---
