Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22313#discussion_r214744306

    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala ---
    @@ -55,19 +59,52 @@ import org.apache.spark.sql.types._
      * known to be convertible.
      */
     private[orc] object OrcFilters extends Logging {
    +  case class FilterWithTypeMap(filter: Filter, typeMap: Map[String, DataType])
    +
    +  private lazy val cacheExpireTimeout =
    +    org.apache.spark.sql.execution.datasources.orc.OrcFilters.cacheExpireTimeout
    +
    +  private lazy val searchArgumentCache = CacheBuilder.newBuilder()
    +    .expireAfterAccess(cacheExpireTimeout, TimeUnit.SECONDS)
    +    .build(
    +      new CacheLoader[FilterWithTypeMap, Option[Builder]]() {
    +        override def load(typeMapAndFilter: FilterWithTypeMap): Option[Builder] = {
    +          buildSearchArgument(
    +            typeMapAndFilter.typeMap, typeMapAndFilter.filter, SearchArgumentFactory.newBuilder())
    +        }
    +      })
    +
    +  private def getOrBuildSearchArgumentWithNewBuilder(
    +      dataTypeMap: Map[String, DataType],
    +      expression: Filter): Option[Builder] = {
    +    // When `spark.sql.orc.cache.sarg.timeout` is 0, cache is disabled.
    +    if (cacheExpireTimeout > 0) {
    +      searchArgumentCache.get(FilterWithTypeMap(expression, dataTypeMap))
    +    } else {
    +      buildSearchArgument(dataTypeMap, expression, SearchArgumentFactory.newBuilder())
    +    }
    +  }
    +
       def createFilter(schema: StructType, filters: Array[Filter]): Option[SearchArgument] = {
         val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
    
         // First, tries to convert each filter individually to see whether it's convertible, and then
         // collect all convertible ones to build the final `SearchArgument`.
         val convertibleFilters = for {
           filter <- filters
    -      _ <- buildSearchArgument(dataTypeMap, filter, SearchArgumentFactory.newBuilder())
    +      _ <- getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, filter)
         } yield filter
    
         for {
           // Combines all convertible filters using `And` to produce a single conjunction
    -      conjunction <- convertibleFilters.reduceOption(And)
    +      conjunction <- convertibleFilters.reduceOption { (x, y) =>
    +        val newFilter = org.apache.spark.sql.sources.And(x, y)
    +        if (cacheExpireTimeout > 0) {
    +          // Build in a bottom-up manner
    +          getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, newFilter)
    +        }
    --- End diff --

    Final conjunction? All sub-function results will be cached in the end.
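To make the pattern under review concrete, the following is a minimal, self-contained sketch (not the actual Spark code) of the same memoization technique the diff applies to SearchArgument builders: a Guava LoadingCache with expireAfterAccess. The key and value types are simplified to String here (the real patch keys on a FilterWithTypeMap and caches Option[Builder]), and the timeout constant is a hypothetical stand-in for the value read from `spark.sql.orc.cache.sarg.timeout`.

    import java.util.concurrent.TimeUnit

    import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

    object SearchArgumentCacheSketch {
      // Hypothetical timeout in seconds; the patch reads this from a Spark conf.
      private val cacheExpireTimeout = 60L

      // Stand-in for the real buildSearchArgument(...) conversion logic.
      private def buildSearchArgument(filter: String): Option[String] =
        Some("SARG(" + filter + ")")

      private val searchArgumentCache: LoadingCache[String, Option[String]] =
        CacheBuilder.newBuilder()
          .expireAfterAccess(cacheExpireTimeout, TimeUnit.SECONDS)
          .build(new CacheLoader[String, Option[String]]() {
            // Called once per key on a cache miss; the result is memoized until
            // the entry has been idle for longer than cacheExpireTimeout.
            override def load(filter: String): Option[String] =
              buildSearchArgument(filter)
          })

      def getOrBuild(filter: String): Option[String] =
        if (cacheExpireTimeout > 0) searchArgumentCache.get(filter) // cached path
        else buildSearchArgument(filter)                            // cache disabled

      def main(args: Array[String]): Unit = {
        println(getOrBuild("a = 1")) // miss: computed via load()
        println(getOrBuild("a = 1")) // hit: served from the cache
      }
    }

With expireAfterAccess, entries that keep getting hit (e.g. the same filter resubmitted across queries) stay cached, while filters that are no longer used age out instead of growing the cache without bound — which is why the reviewer's question about caching every intermediate conjunction, not just the final one, matters for cache size.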