Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19882#discussion_r155410507
  
    --- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
 ---
    @@ -0,0 +1,368 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources.orc
    +
    +import java.nio.charset.StandardCharsets
    +import java.sql.{Date, Timestamp}
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
    +
    +import org.apache.spark.sql.{Column, DataFrame}
    +import org.apache.spark.sql.catalyst.dsl.expressions._
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.planning.PhysicalOperation
    +import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, 
HadoopFsRelation, LogicalRelation}
    +import org.apache.spark.sql.test.SharedSQLContext
    +import org.apache.spark.sql.types._
    +
    +/**
    + * A test suite that tests Apache ORC filter API based filter pushdown 
optimization.
    + */
    +class OrcFilterSuite extends OrcTest with SharedSQLContext {
    +
    +  private def checkFilterPredicate(
    +      df: DataFrame,
    +      predicate: Predicate,
    +      checker: (SearchArgument) => Unit): Unit = {
    +    val output = predicate.collect { case a: Attribute => a }.distinct
    +    val query = df
    +      .select(output.map(e => Column(e)): _*)
    +      .where(Column(predicate))
    +
    +    var maybeRelation: Option[HadoopFsRelation] = None
    +    val maybeAnalyzedPredicate = 
query.queryExecution.optimizedPlan.collect {
    +      case PhysicalOperation(_, filters, LogicalRelation(orcRelation: 
HadoopFsRelation, _, _, _)) =>
    +        maybeRelation = Some(orcRelation)
    +        filters
    +    }.flatten.reduceLeftOption(_ && _)
    +    assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from 
the given query")
    +
    +    val (_, selectedFilters, _) =
    +      DataSourceStrategy.selectFilters(maybeRelation.get, 
maybeAnalyzedPredicate.toSeq)
    +    assert(selectedFilters.nonEmpty, "No filter is pushed down")
    +
    +    val maybeFilter = OrcFilters.createFilter(query.schema, 
selectedFilters)
    +    assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for 
$selectedFilters")
    +    checker(maybeFilter.get)
    +  }
    +
    +  private def checkFilterPredicate
    +      (predicate: Predicate, filterOperator: PredicateLeaf.Operator)
    +      (implicit df: DataFrame): Unit = {
    +    def checkComparisonOperator(filter: SearchArgument) = {
    +      val operator = filter.getLeaves.asScala
    +      assert(operator.map(_.getOperator).contains(filterOperator))
    +    }
    +    checkFilterPredicate(df, predicate, checkComparisonOperator)
    +  }
    +
    +  private def checkFilterPredicate
    +      (predicate: Predicate, stringExpr: String)
    +      (implicit df: DataFrame): Unit = {
    +    def checkLogicalOperator(filter: SearchArgument) = {
    +      assert(filter.toString == stringExpr)
    +    }
    +    checkFilterPredicate(df, predicate, checkLogicalOperator)
    +  }
    +
    +  private def checkNoFilterPredicate
    +      (predicate: Predicate)
    +      (implicit df: DataFrame): Unit = {
    +    val output = predicate.collect { case a: Attribute => a }.distinct
    +    val query = df
    +      .select(output.map(e => Column(e)): _*)
    +      .where(Column(predicate))
    +
    +    var maybeRelation: Option[HadoopFsRelation] = None
    +    val maybeAnalyzedPredicate = 
query.queryExecution.optimizedPlan.collect {
    +      case PhysicalOperation(_, filters, LogicalRelation(orcRelation: 
HadoopFsRelation, _, _, _)) =>
    +        maybeRelation = Some(orcRelation)
    +        filters
    +    }.flatten.reduceLeftOption(_ && _)
    +    assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from 
the given query")
    +
    +    val (_, selectedFilters, _) =
    +      DataSourceStrategy.selectFilters(maybeRelation.get, 
maybeAnalyzedPredicate.toSeq)
    +    assert(selectedFilters.nonEmpty, "No filter is pushed down")
    +
    +    val maybeFilter = OrcFilters.createFilter(query.schema, 
selectedFilters)
    +    assert(maybeFilter.isEmpty, s"Could generate filter predicate for 
$selectedFilters")
    +  }
    +
    +  test("filter pushdown - integer") {
    +    withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
    +      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
    +
    +      checkFilterPredicate('_1 === 1, PredicateLeaf.Operator.EQUALS)
    +      checkFilterPredicate('_1 <=> 1, 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
    +
    +      checkFilterPredicate('_1 < 2, PredicateLeaf.Operator.LESS_THAN)
    +      checkFilterPredicate('_1 > 3, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate('_1 <= 1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate('_1 >= 4, PredicateLeaf.Operator.LESS_THAN)
    +
    +      checkFilterPredicate(Literal(1) === '_1, 
PredicateLeaf.Operator.EQUALS)
    +      checkFilterPredicate(Literal(1) <=> '_1, 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
    +      checkFilterPredicate(Literal(2) > '_1, 
PredicateLeaf.Operator.LESS_THAN)
    +      checkFilterPredicate(Literal(3) < '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate(Literal(1) >= '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate(Literal(4) <= '_1, 
PredicateLeaf.Operator.LESS_THAN)
    +    }
    +  }
    +
    +  test("filter pushdown - long") {
    +    withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i.toLong)))) { 
implicit df =>
    +      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
    +
    +      checkFilterPredicate('_1 === 1, PredicateLeaf.Operator.EQUALS)
    +      checkFilterPredicate('_1 <=> 1, 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
    +
    +      checkFilterPredicate('_1 < 2, PredicateLeaf.Operator.LESS_THAN)
    +      checkFilterPredicate('_1 > 3, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate('_1 <= 1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate('_1 >= 4, PredicateLeaf.Operator.LESS_THAN)
    +
    +      checkFilterPredicate(Literal(1) === '_1, 
PredicateLeaf.Operator.EQUALS)
    +      checkFilterPredicate(Literal(1) <=> '_1, 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
    +      checkFilterPredicate(Literal(2) > '_1, 
PredicateLeaf.Operator.LESS_THAN)
    +      checkFilterPredicate(Literal(3) < '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate(Literal(1) >= '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate(Literal(4) <= '_1, 
PredicateLeaf.Operator.LESS_THAN)
    +    }
    +  }
    +
    +  test("filter pushdown - float") {
    +    withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i.toFloat)))) { 
implicit df =>
    +      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
    +
    +      checkFilterPredicate('_1 === 1, PredicateLeaf.Operator.EQUALS)
    +      checkFilterPredicate('_1 <=> 1, 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
    +
    +      checkFilterPredicate('_1 < 2, PredicateLeaf.Operator.LESS_THAN)
    +      checkFilterPredicate('_1 > 3, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate('_1 <= 1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate('_1 >= 4, PredicateLeaf.Operator.LESS_THAN)
    +
    +      checkFilterPredicate(Literal(1) === '_1, 
PredicateLeaf.Operator.EQUALS)
    +      checkFilterPredicate(Literal(1) <=> '_1, 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
    +      checkFilterPredicate(Literal(2) > '_1, 
PredicateLeaf.Operator.LESS_THAN)
    +      checkFilterPredicate(Literal(3) < '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate(Literal(1) >= '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate(Literal(4) <= '_1, 
PredicateLeaf.Operator.LESS_THAN)
    +    }
    +  }
    +
    +  test("filter pushdown - double") {
    +    withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i.toDouble)))) { 
implicit df =>
    +      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
    +
    +      checkFilterPredicate('_1 === 1, PredicateLeaf.Operator.EQUALS)
    +      checkFilterPredicate('_1 <=> 1, 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
    +
    +      checkFilterPredicate('_1 < 2, PredicateLeaf.Operator.LESS_THAN)
    +      checkFilterPredicate('_1 > 3, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate('_1 <= 1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate('_1 >= 4, PredicateLeaf.Operator.LESS_THAN)
    +
    +      checkFilterPredicate(Literal(1) === '_1, 
PredicateLeaf.Operator.EQUALS)
    +      checkFilterPredicate(Literal(1) <=> '_1, 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
    +      checkFilterPredicate(Literal(2) > '_1, 
PredicateLeaf.Operator.LESS_THAN)
    +      checkFilterPredicate(Literal(3) < '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate(Literal(1) >= '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate(Literal(4) <= '_1, 
PredicateLeaf.Operator.LESS_THAN)
    +    }
    +  }
    +
    +  test("filter pushdown - string") {
    +    withOrcDataFrame((1 to 4).map(i => Tuple1(i.toString))) { implicit df 
=>
    +      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
    +
    +      checkFilterPredicate('_1 === "1", PredicateLeaf.Operator.EQUALS)
    +      checkFilterPredicate('_1 <=> "1", 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
    +
    +      checkFilterPredicate('_1 < "2", PredicateLeaf.Operator.LESS_THAN)
    +      checkFilterPredicate('_1 > "3", 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate('_1 <= "1", 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate('_1 >= "4", PredicateLeaf.Operator.LESS_THAN)
    +
    +      checkFilterPredicate(Literal("1") === '_1, 
PredicateLeaf.Operator.EQUALS)
    +      checkFilterPredicate(Literal("1") <=> '_1, 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
    +      checkFilterPredicate(Literal("2") > '_1, 
PredicateLeaf.Operator.LESS_THAN)
    +      checkFilterPredicate(Literal("3") < '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate(Literal("1") >= '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate(Literal("4") <= '_1, 
PredicateLeaf.Operator.LESS_THAN)
    +    }
    +  }
    +
    +  test("filter pushdown - boolean") {
    +    withOrcDataFrame((true :: false :: Nil).map(b => 
Tuple1.apply(Option(b)))) { implicit df =>
    +      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
    +
    +      checkFilterPredicate('_1 === true, PredicateLeaf.Operator.EQUALS)
    +      checkFilterPredicate('_1 <=> true, 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
    +
    +      checkFilterPredicate('_1 < true, PredicateLeaf.Operator.LESS_THAN)
    +      checkFilterPredicate('_1 > false, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate('_1 <= false, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate('_1 >= false, PredicateLeaf.Operator.LESS_THAN)
    +
    +      checkFilterPredicate(Literal(false) === '_1, 
PredicateLeaf.Operator.EQUALS)
    +      checkFilterPredicate(Literal(false) <=> '_1, 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
    +      checkFilterPredicate(Literal(false) > '_1, 
PredicateLeaf.Operator.LESS_THAN)
    +      checkFilterPredicate(Literal(true) < '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate(Literal(true) >= '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate(Literal(true) <= '_1, 
PredicateLeaf.Operator.LESS_THAN)
    +    }
    +  }
    +
    +  test("filter pushdown - decimal") {
    +    withOrcDataFrame((1 to 4).map(i => 
Tuple1.apply(BigDecimal.valueOf(i)))) { implicit df =>
    +      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
    +
    +      checkFilterPredicate('_1 === BigDecimal.valueOf(1), 
PredicateLeaf.Operator.EQUALS)
    +      checkFilterPredicate('_1 <=> BigDecimal.valueOf(1), 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
    +
    +      checkFilterPredicate('_1 < BigDecimal.valueOf(2), 
PredicateLeaf.Operator.LESS_THAN)
    +      checkFilterPredicate('_1 > BigDecimal.valueOf(3), 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate('_1 <= BigDecimal.valueOf(1), 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate('_1 >= BigDecimal.valueOf(4), 
PredicateLeaf.Operator.LESS_THAN)
    +
    +      checkFilterPredicate(
    +        Literal(BigDecimal.valueOf(1)) === '_1, 
PredicateLeaf.Operator.EQUALS)
    +      checkFilterPredicate(
    +        Literal(BigDecimal.valueOf(1)) <=> '_1, 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
    +      checkFilterPredicate(
    +        Literal(BigDecimal.valueOf(2)) > '_1, 
PredicateLeaf.Operator.LESS_THAN)
    +      checkFilterPredicate(
    +        Literal(BigDecimal.valueOf(3)) < '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate(
    +        Literal(BigDecimal.valueOf(1)) >= '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate(
    +        Literal(BigDecimal.valueOf(4)) <= '_1, 
PredicateLeaf.Operator.LESS_THAN)
    +    }
    +  }
    +
    +  test("filter pushdown - timestamp") {
    +    val timeString = "2015-08-20 14:57:00"
    +    val timestamps = (1 to 4).map { i =>
    +      val milliseconds = Timestamp.valueOf(timeString).getTime + i * 3600
    +      new Timestamp(milliseconds)
    +    }
    +    withOrcDataFrame(timestamps.map(Tuple1(_))) { implicit df =>
    +      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
    +
    +      checkFilterPredicate('_1 === timestamps(0), 
PredicateLeaf.Operator.EQUALS)
    +      checkFilterPredicate('_1 <=> timestamps(0), 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
    +
    +      checkFilterPredicate('_1 < timestamps(1), 
PredicateLeaf.Operator.LESS_THAN)
    +      checkFilterPredicate('_1 > timestamps(2), 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate('_1 <= timestamps(0), 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate('_1 >= timestamps(3), 
PredicateLeaf.Operator.LESS_THAN)
    +
    +      checkFilterPredicate(Literal(timestamps(0)) === '_1, 
PredicateLeaf.Operator.EQUALS)
    +      checkFilterPredicate(Literal(timestamps(0)) <=> '_1, 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
    +      checkFilterPredicate(Literal(timestamps(1)) > '_1, 
PredicateLeaf.Operator.LESS_THAN)
    +      checkFilterPredicate(Literal(timestamps(2)) < '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate(Literal(timestamps(0)) >= '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
    +      checkFilterPredicate(Literal(timestamps(3)) <= '_1, 
PredicateLeaf.Operator.LESS_THAN)
    +    }
    +  }
    +
    +  test("filter pushdown - combinations with logical operators") {
    +    withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
    +      // Because `ExpressionTree` is not accessible at Hive 1.2.x, this 
should be checked
    --- End diff --
    
    I wrote the original tests here like this using `toString` partly because 
`ExpressionTree` (`SearchArgument.getExpression`) is inaccessible and the 
string format is easy to read.
    
    Although that tree seems to be available in ORC, I think it's okay to 
keep the tests like this. It's easy to read, but let's fix up the comments here. 
They don't seem to be about Hive anyway.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to