Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22621#discussion_r222510006
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
    @@ -517,4 +517,93 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       test("writing data out metrics with dynamic partition: parquet") {
         testMetricsDynamicPartition("parquet", "parquet", "t1")
       }
    +
    +  test("SPARK-25602: range metrics can be wrong if the result rows are not 
fully consumed") {
    +    val df = spark.range(0, 30, 1, 2).toDF().filter('id % 3 === 0)
    +
    +    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
    +      df.collect()
    +      df.queryExecution.executedPlan.foreach {
    +        case w: WholeStageCodegenExec =>
    +          w.child.foreach {
    +            case f: FilterExec => assert(f.metrics("numOutputRows").value 
== 10L)
    +            case r: RangeExec => assert(r.metrics("numOutputRows").value 
== 30L)
    +            case _ =>
    +          }
    +
    +        case _ =>
    +      }
    +    }
    +
    +    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
    +      df.collect()
    +      df.queryExecution.executedPlan.foreach {
    +        case f: FilterExec => assert(f.metrics("numOutputRows").value == 
10L)
    +        case r: RangeExec => assert(r.metrics("numOutputRows").value == 
30L)
    +        case _ =>
    +      }
    +    }
    +
    +    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
    +      df.queryExecution.executedPlan.foreach(_.resetMetrics())
    +      // For each partition, we get 2 rows. Then the Filter should produce 
2 rows, and Range should
    +      // produce 4 rows(0, 1, 2, 3).
    +      df.queryExecution.toRdd.mapPartitions(_.take(2)).collect()
    +      df.queryExecution.executedPlan.foreach {
    +        case w: WholeStageCodegenExec =>
    +          w.child.foreach {
    +            // Range has 2 partitions, so the expected metrics for filter 
should be 2 * 2, for range
    +            // should be 4 * 2.
    +            case f: FilterExec => assert(f.metrics("numOutputRows").value 
== 4L)
    +            case r: RangeExec => assert(r.metrics("numOutputRows").value 
== 8L)
    +            case _ =>
    +          }
    +
    +        case _ =>
    +      }
    +    }
    +
    +    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
    +      df.queryExecution.executedPlan.foreach(_.resetMetrics())
    +      // For each partition, we get 2 rows. Then the Filter should produce 
2 rows, and Range should
    +      // produce 4 rows(0, 1, 2, 3).
    +      df.queryExecution.toRdd.mapPartitions(_.take(2)).collect()
    +      df.queryExecution.executedPlan.foreach {
    +        // Range has 2 partitions, so the expected metrics for filter 
should be 2 * 2, for range
    +        // should be 4 * 2.
    +        case f: FilterExec => assert(f.metrics("numOutputRows").value == 
4L)
    +        case r: RangeExec => assert(r.metrics("numOutputRows").value == 8L)
    +        case _ =>
    +      }
    +    }
    +
    +    val df2 = df.limit(2)
    +
    +    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
    +      // Top-most limit will only run the first task, so totally the 
Filter produces 2 rows, and
    +      // Range produces 4 rows(0, 1, 2, 3).
    +      df2.collect()
    +      df2.queryExecution.executedPlan.foreach {
    +        case w: WholeStageCodegenExec =>
    +          w.child.foreach {
    +            case f: FilterExec => assert(f.metrics("numOutputRows").value 
== 2L)
    +            case r: RangeExec => assert(r.metrics("numOutputRows").value 
== 4L)
    +            case _ =>
    +          }
    +
    +        case _ =>
    +      }
    +    }
    +
    +    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
    +      // Top-most limit will only run the first task, so totally the 
Filter produces 2 rows, and
    --- End diff --
    
    yes


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to