HyukjinKwon commented on a change in pull request #29145: URL: https://github.com/apache/spark/pull/29145#discussion_r458750581
########## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/OrderedFilters.scala ########## @@ -94,7 +91,7 @@ class CSVFilters(filters: Seq[sources.Filter], requiredSchema: StructType) predicate != null && !predicate.eval(row) Review comment: Max, should we maybe add a variable that increases one by one and assert against `index`? We can set it to `0` in `reset`. My biggest worry in here and `JsonFilters` is that it's a bit error-prone ... ########## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/OrderedFilters.scala ########## @@ -48,33 +47,31 @@ class CSVFilters(filters: Seq[sources.Filter], requiredSchema: StructType) private val predicates: Array[BasePredicate] = { val len = requiredSchema.fields.length val groupedPredicates = Array.fill[BasePredicate](len)(null) - if (SQLConf.get.csvFilterPushDown) { - val groupedFilters = Array.fill(len)(Seq.empty[sources.Filter]) - for (filter <- filters) { - val refs = filter.references - val index = if (refs.isEmpty) { - // For example, `AlwaysTrue` and `AlwaysFalse` doesn't have any references - // Filters w/o refs always return the same result. Taking into account - // that predicates are combined via `And`, we can apply such filters only - // once at the position 0. - 0 - } else { - // readSchema must contain attributes of all filters. - // Accordingly, `fieldIndex()` returns a valid index always. - refs.map(requiredSchema.fieldIndex).max - } - groupedFilters(index) :+= filter + val groupedFilters = Array.fill(len)(Seq.empty[sources.Filter]) + for (filter <- filters) { + val refs = filter.references + val index = if (refs.isEmpty) { + // For example, `AlwaysTrue` and `AlwaysFalse` doesn't have any references + // Filters w/o refs always return the same result. Taking into account + // that predicates are combined via `And`, we can apply such filters only + // once at the position 0. + 0 + } else { + // readSchema must contain attributes of all filters. 
+ // Accordingly, `fieldIndex()` returns a valid index always. + refs.map(requiredSchema.fieldIndex).max } - if (len > 0 && !groupedFilters(0).isEmpty) { - // We assume that filters w/o refs like `AlwaysTrue` and `AlwaysFalse` - // can be evaluated faster that others. We put them in front of others. - val (literals, others) = groupedFilters(0).partition(_.references.isEmpty) - groupedFilters(0) = literals ++ others - } - for (i <- 0 until len) { - if (!groupedFilters(i).isEmpty) { - groupedPredicates(i) = toPredicate(groupedFilters(i)) - } + groupedFilters(index) :+= filter + } + if (len > 0 && !groupedFilters(0).isEmpty) { Review comment: nit: `!groupedFilters(i).isEmpty` -> `groupedFilters(i).nonEmpty` ########## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala ########## @@ -98,7 +99,11 @@ class UnivocityParser( legacyFormat = FAST_DATE_FORMAT, isParsing = true) - private val csvFilters = new CSVFilters(filters, requiredSchema) + private val csvFilters = if (SQLConf.get.csvFilterPushDown) { Review comment: Shall we match it on the JSON side too? ########## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/OrderedFilters.scala ########## @@ -48,33 +47,31 @@ class CSVFilters(filters: Seq[sources.Filter], requiredSchema: StructType) private val predicates: Array[BasePredicate] = { val len = requiredSchema.fields.length val groupedPredicates = Array.fill[BasePredicate](len)(null) - if (SQLConf.get.csvFilterPushDown) { - val groupedFilters = Array.fill(len)(Seq.empty[sources.Filter]) - for (filter <- filters) { - val refs = filter.references - val index = if (refs.isEmpty) { - // For example, `AlwaysTrue` and `AlwaysFalse` doesn't have any references - // Filters w/o refs always return the same result. Taking into account - // that predicates are combined via `And`, we can apply such filters only - // once at the position 0. - 0 - } else { - // readSchema must contain attributes of all filters. 
- // Accordingly, `fieldIndex()` returns a valid index always. - refs.map(requiredSchema.fieldIndex).max - } - groupedFilters(index) :+= filter + val groupedFilters = Array.fill(len)(Seq.empty[sources.Filter]) + for (filter <- filters) { + val refs = filter.references + val index = if (refs.isEmpty) { + // For example, `AlwaysTrue` and `AlwaysFalse` doesn't have any references + // Filters w/o refs always return the same result. Taking into account + // that predicates are combined via `And`, we can apply such filters only + // once at the position 0. + 0 + } else { + // readSchema must contain attributes of all filters. + // Accordingly, `fieldIndex()` returns a valid index always. + refs.map(requiredSchema.fieldIndex).max } - if (len > 0 && !groupedFilters(0).isEmpty) { - // We assume that filters w/o refs like `AlwaysTrue` and `AlwaysFalse` - // can be evaluated faster that others. We put them in front of others. - val (literals, others) = groupedFilters(0).partition(_.references.isEmpty) - groupedFilters(0) = literals ++ others - } - for (i <- 0 until len) { - if (!groupedFilters(i).isEmpty) { - groupedPredicates(i) = toPredicate(groupedFilters(i)) - } + groupedFilters(index) :+= filter + } + if (len > 0 && !groupedFilters(0).isEmpty) { + // We assume that filters w/o refs like `AlwaysTrue` and `AlwaysFalse` + // can be evaluated faster that others. We put them in front of others. + val (literals, others) = groupedFilters(0).partition(_.references.isEmpty) + groupedFilters(0) = literals ++ others + } + for (i <- 0 until len) { + if (!groupedFilters(i).isEmpty) { Review comment: nit: `nonEmpty` too ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org