Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9468#discussion_r44105125
  
    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala ---
    @@ -70,43 +76,297 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
         }
       }
     
    -  private val writer = testDF.write.option("dataSchema", dataSchema.json).format(dataSourceName)
    -  private val reader = sqlContext.read.option("dataSchema", dataSchema.json).format(dataSourceName)
    -
    -  test("unhandledFilters") {
    -    withTempPath { dir =>
    -
    -      val path = dir.getCanonicalPath
    -      writer.save(s"$path/p=0")
    -      writer.save(s"$path/p=1")
    -
    -      val isOdd = udf((_: Int) % 2 == 1)
    -      val df = reader.load(path)
    -        .filter(
    -          // This filter is inconvertible
    -          isOdd('a) &&
    -            // This filter is convertible but unhandled
    -            'a > 1 &&
    -            // This filter is convertible and handled
    -            'b > "val_1" &&
    -          // This filter references a partition column, won't be pushed down
    -            'p === 1
    -        ).select('a, 'p)
    -      val rawScan = df.queryExecution.executedPlan collect {
    +  private var tempPath: File = _
    +
    +  private var partitionedDF: DataFrame = _
    +
    +  private val partitionedDataSchema: StructType = StructType('a.int :: 'b.int :: 'c.string :: Nil)
    +
    +  protected override def beforeAll(): Unit = {
    +    this.tempPath = Utils.createTempDir()
    +
    +    val df = sqlContext.range(10).select(
    +      'id cast IntegerType as 'a,
    +      ('id cast IntegerType) * 2 as 'b,
    +      concat(lit("val_"), 'id) as 'c
    +    )
    +
    +    partitionedWriter(df).save(s"${tempPath.getCanonicalPath}/p=0")
    +    partitionedWriter(df).save(s"${tempPath.getCanonicalPath}/p=1")
    +
    +    partitionedDF = partitionedReader.load(tempPath.getCanonicalPath)
    +  }
    +
    +  override protected def afterAll(): Unit = {
    +    Utils.deleteRecursively(tempPath)
    +  }
    +
    +  private def partitionedWriter(df: DataFrame) =
    +    df.write.option("dataSchema", partitionedDataSchema.json).format(dataSourceName)
    +
    +  private def partitionedReader =
    +    sqlContext.read.option("dataSchema", partitionedDataSchema.json).format(dataSourceName)
    +
    +  /**
    +   * Constructs test cases that test column pruning and filter push-down.
    +   *
    +   * For filter push-down, the following filters are not pushed down.
    +   *
    +   * 1. Partitioning filters don't participate in filter push-down; they are handled separately in
    +   *    `DataSourceStrategy`
    +   *
    +   * 2. Catalyst filter `Expression`s that cannot be converted to data source `Filter`s are not
    +   *    pushed down (e.g. UDFs and filters referencing multiple columns).
    +   *
    +   * 3. Catalyst filter `Expression`s that can be converted to data source `Filter`s but cannot be
    +   *    handled by the underlying data source are not pushed down (e.g. those returned from
    +   *    `BaseRelation.unhandledFilters()`).
    +   *
    +   *    Note that for [[SimpleTextRelation]], all data source [[Filter]]s other than [[GreaterThan]]
    +   *    are unhandled.  We made this assumption in [[SimpleTextRelation.unhandledFilters()]] only
    +   *    for testing purposes.
    +   *
    +   * @param projections Projection list of the query
    +   * @param filter Filter condition of the query
    +   * @param requiredColumns Expected names of required columns
    +   * @param pushedFilters Expected data source [[Filter]]s that are pushed down
    +   * @param inconvertibleFilters Expected Catalyst filter [[Expression]]s that cannot be converted
    +   *        to data source [[Filter]]s
    +   * @param unhandledFilters Expected Catalyst filter [[Expression]]s that can be converted to data
    +   *        source [[Filter]]s but cannot be handled by the data source relation
    +   * @param partitioningFilters Expected Catalyst filter [[Expression]]s that reference partition
    +   *        columns
    +   * @param expectedRawScanAnswer Expected query result of the raw table scan returned by the data
    +   *        source relation
    +   * @param expectedAnswer Expected query result of the full query
    +   */
    +  def testPruningAndFiltering(
    +      projections: Seq[Column],
    +      filter: Column,
    +      requiredColumns: Seq[String],
    +      pushedFilters: Seq[Filter],
    +      inconvertibleFilters: Seq[Column],
    +      unhandledFilters: Seq[Column],
    +      partitioningFilters: Seq[Column])(
    +      expectedRawScanAnswer: => Seq[Row])(
    +      expectedAnswer: => Seq[Row]): Unit = {
    +    test(s"pruning and filtering: df.select(${projections.mkString(", 
")}).where($filter)") {
    +      val df = partitionedDF.where(filter).select(projections: _*)
    +      val queryExecution = df.queryExecution
    +      val executedPlan = queryExecution.executedPlan
    +
    +      val rawScan = executedPlan.collect {
             case p: PhysicalRDD => p
           } match {
    -        case Seq(p) => p
    +        case Seq(scan) => scan
    +        case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
           }
     
    -      val outputSchema = new StructType().add("a", IntegerType).add("p", IntegerType)
    +      markup("Checking raw scan answer")
    +      checkAnswer(
    +        DataFrame(sqlContext, LogicalRDD(rawScan.output, rawScan.rdd)(sqlContext)),
    +        expectedRawScanAnswer)
    +
    +      markup("Checking full query answer")
    +      checkAnswer(df, expectedAnswer)
    +
    +      markup("Checking required columns")
    +      assert(requiredColumns === SimpleTextRelation.requiredColumns)
    +
    +      val nonPushedFilters = {
    +        val boundFilters = executedPlan.collect {
    +          case f: execution.Filter => f
    +        } match {
    +          case Nil => Nil
    +          case Seq(f) => splitConjunctivePredicates(f.condition)
    +          case _ => fail(s"More than one PhysicalRDD 
found\n$queryExecution")
    +        }
     
    -      assertResult(Set((2, 1), (3, 1))) {
    -        rawScan.execute().collect()
    -          .map { CatalystTypeConverters.convertToScala(_, outputSchema) }
    -          .map { case Row(a, p) => (a, p) }.toSet
    +        // Unbind these bound filters so that we can easily compare them with expected results.
    +        boundFilters.map {
    +          _.transform { case a: AttributeReference => UnresolvedAttribute(a.name) }
    +        }.toSet
           }
     
    -      checkAnswer(df, Row(3, 1))
    +      markup("Checking pushed filters")
    +      assert(SimpleTextRelation.pushedFilters === pushedFilters.toSet)
    +
    +      val expectedInconvertibleFilters = inconvertibleFilters.map(_.expr).toSet
    +      val expectedUnhandledFilters = unhandledFilters.map(_.expr).toSet
    +      val expectedPartitioningFilters = partitioningFilters.map(_.expr).toSet
    +
    +      markup("Checking unhandled, inconvertible, and partitioning filters")
    +      assert(expectedInconvertibleFilters ++ expectedUnhandledFilters === nonPushedFilters)
    +      // Partitioning filters are handled separately and don't participate in filter push-down, so they
    +      // shouldn't be part of non-pushed filters.
    +      assert(expectedPartitioningFilters.intersect(nonPushedFilters).isEmpty)
         }
       }
    +
    +  testPruningAndFiltering(
    +    projections = Seq('*),
    +    filter = 'p > 0,
    +    requiredColumns = Seq("a", "b", "c"),
    +    pushedFilters = Nil,
    +    inconvertibleFilters = Nil,
    +    unhandledFilters = Nil,
    +    partitioningFilters = Nil
    +  ) {
    +    Seq(
    +      Row(0, 0, "val_0", 1),
    +      Row(1, 2, "val_1", 1),
    +      Row(2, 4, "val_2", 1),
    +      Row(3, 6, "val_3", 1),
    +      Row(4, 8, "val_4", 1),
    +      Row(5, 10, "val_5", 1),
    +      Row(6, 12, "val_6", 1),
    +      Row(7, 14, "val_7", 1),
    +      Row(8, 16, "val_8", 1),
    +      Row(9, 18, "val_9", 1))
    +  } {
    +    Seq(
    +      Row(0, 0, "val_0", 1),
    +      Row(1, 2, "val_1", 1),
    +      Row(2, 4, "val_2", 1),
    +      Row(3, 6, "val_3", 1),
    +      Row(4, 8, "val_4", 1),
    +      Row(5, 10, "val_5", 1),
    +      Row(6, 12, "val_6", 1),
    +      Row(7, 14, "val_7", 1),
    +      Row(8, 16, "val_8", 1),
    +      Row(9, 18, "val_9", 1))
    +  }
    +
    +  testPruningAndFiltering(
    +    projections = Seq('c, 'p),
    +    filter = 'a < 3 && 'p > 0,
    +    requiredColumns = Seq("c", "a"),
    +    pushedFilters = Nil,
    +    inconvertibleFilters = Nil,
    +    unhandledFilters = Seq('a < 3),
    +    partitioningFilters = Nil
    --- End diff ---
    
    Should we have `'p > 0`?
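    
    That is, since `p` is the partition column and, per the Scaladoc above, partitioning filters are handled separately in `DataSourceStrategy` rather than pushed down, the expectation for this case might look like the sketch below. This is a hypothetical revision for illustration only, not code from this diff, and the answer blocks are left elided just as the diff excerpt is:
    
        testPruningAndFiltering(
          projections = Seq('c, 'p),
          filter = 'a < 3 && 'p > 0,
          requiredColumns = Seq("c", "a"),
          pushedFilters = Nil,
          inconvertibleFilters = Nil,
          unhandledFilters = Seq('a < 3),
          // 'p > 0 references the partition column `p`, so it would be expected
          // here rather than among the pushed or other non-pushed filters.
          partitioningFilters = Seq('p > 0)
        ) {
          ... // expected raw scan answer, elided as in the diff above
        } {
          ... // expected full query answer, elided as in the diff above
        }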

