Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22313#discussion_r214744306
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala ---
    @@ -55,19 +59,52 @@ import org.apache.spark.sql.types._
      * known to be convertible.
      */
     private[orc] object OrcFilters extends Logging {
    +  case class FilterWithTypeMap(filter: Filter, typeMap: Map[String, DataType])
    +
    +  private lazy val cacheExpireTimeout =
    +    org.apache.spark.sql.execution.datasources.orc.OrcFilters.cacheExpireTimeout
    +
    +  private lazy val searchArgumentCache = CacheBuilder.newBuilder()
    +    .expireAfterAccess(cacheExpireTimeout, TimeUnit.SECONDS)
    +    .build(
    +      new CacheLoader[FilterWithTypeMap, Option[Builder]]() {
    +        override def load(typeMapAndFilter: FilterWithTypeMap): Option[Builder] = {
    +          buildSearchArgument(
    +            typeMapAndFilter.typeMap, typeMapAndFilter.filter, SearchArgumentFactory.newBuilder())
    +        }
    +      })
    +
    +  private def getOrBuildSearchArgumentWithNewBuilder(
    +      dataTypeMap: Map[String, DataType],
    +      expression: Filter): Option[Builder] = {
    +    // When `spark.sql.orc.cache.sarg.timeout` is 0, cache is disabled.
    +    if (cacheExpireTimeout > 0) {
    +      searchArgumentCache.get(FilterWithTypeMap(expression, dataTypeMap))
    +    } else {
    +      buildSearchArgument(dataTypeMap, expression, SearchArgumentFactory.newBuilder())
    +    }
    +  }
    +
       def createFilter(schema: StructType, filters: Array[Filter]): Option[SearchArgument] = {
         val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
     
         // First, tries to convert each filter individually to see whether it's convertible, and then
         // collect all convertible ones to build the final `SearchArgument`.
         val convertibleFilters = for {
           filter <- filters
    -      _ <- buildSearchArgument(dataTypeMap, filter, SearchArgumentFactory.newBuilder())
    +      _ <- getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, filter)
         } yield filter
     
         for {
           // Combines all convertible filters using `And` to produce a single conjunction
    -      conjunction <- convertibleFilters.reduceOption(And)
    +      conjunction <- convertibleFilters.reduceOption { (x, y) =>
    +        val newFilter = org.apache.spark.sql.sources.And(x, y)
    +        if (cacheExpireTimeout > 0) {
    +          // Build in a bottom-up manner
    +          getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, newFilter)
    +        }
    --- End diff ---
    
    Final conjunction? Since the conjunction is built bottom-up, all sub-function results will be cached in the end.
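    
    To illustrate the point, here is a minimal, self-contained sketch (not Spark's actual code) of the pattern in the diff. It assumes simplified `Filter`/`EqualTo`/`And` stand-ins for the `org.apache.spark.sql.sources` classes, a plain `String` value in place of the real `Builder`, and a hard-coded 60-second timeout; it shows how the bottom-up reduce populates the cache for every intermediate conjunction, not just the final one.
    
    ```scala
    import java.util.concurrent.TimeUnit
    import com.google.common.cache.{CacheBuilder, CacheLoader}
    
    object ConjunctionCacheSketch {
      // Hypothetical stand-ins for org.apache.spark.sql.sources filters,
      // used only for illustration.
      sealed trait Filter
      case class EqualTo(attribute: String, value: Any) extends Filter
      case class And(left: Filter, right: Filter) extends Filter
    
      // A Guava loading cache analogous to `searchArgumentCache` in the diff;
      // case-class keys provide the equals/hashCode that cache lookups need.
      private val cache = CacheBuilder.newBuilder()
        .expireAfterAccess(60L, TimeUnit.SECONDS)
        .build(new CacheLoader[Filter, String]() {
          override def load(f: Filter): String = {
            println(s"cache miss, building: $f")
            f.toString // stand-in for buildSearchArgument(...)
          }
        })
    
      def main(args: Array[String]): Unit = {
        val filters = Seq[Filter](EqualTo("a", 1), EqualTo("b", 2), EqualTo("c", 3))
    
        // Bottom-up reduce: each intermediate conjunction is looked up, so
        // And(a, b) is cached before And(And(a, b), c) is built.
        val conjunction = filters.reduce { (x, y) =>
          val combined = And(x, y)
          cache.get(combined) // populates the cache at this level
          combined
        }
        println(s"final conjunction: $conjunction")
      }
    }
    ```
    
    Because every intermediate `And` node is looked up during the reduce, a later call with the same filter set hits the cache at every level, which is what makes the bottom-up order pay off.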

