Repository: spark Updated Branches: refs/heads/master 3eee9e024 -> faf73dcd3
[SPARK-25559][FOLLOW-UP] Add comments for partial pushdown of conjuncts in Parquet ## What changes were proposed in this pull request? This is a follow up of https://github.com/apache/spark/pull/22574. Renamed the parameter and added comments. ## How was this patch tested? N/A Closes #22679 from gatorsmile/followupSPARK-25559. Authored-by: gatorsmile <gatorsm...@gmail.com> Signed-off-by: DB Tsai <d_t...@apple.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/faf73dcd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/faf73dcd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/faf73dcd Branch: refs/heads/master Commit: faf73dcd33d04365c28c2846d3a1f845785f69df Parents: 3eee9e0 Author: gatorsmile <gatorsm...@gmail.com> Authored: Tue Oct 9 21:10:33 2018 +0000 Committer: DB Tsai <d_t...@apple.com> Committed: Tue Oct 9 21:10:33 2018 +0000 ---------------------------------------------------------------------- .../datasources/parquet/ParquetFilters.scala | 31 ++++++++++++++------ 1 file changed, 22 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/faf73dcd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 44a0d20..21ab9c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -394,13 +394,22 @@ private[parquet] class ParquetFilters( */ def createFilter(schema: MessageType, predicate: sources.Filter): 
Option[FilterPredicate] = { val nameToParquetField = getFieldMap(schema) - createFilterHelper(nameToParquetField, predicate, canRemoveOneSideInAnd = true) + createFilterHelper(nameToParquetField, predicate, canPartialPushDownConjuncts = true) } + /** + * @param nameToParquetField a map from the field name to its field name and data type. + * This only includes the root fields whose types are primitive types. + * @param predicate the input filter predicates. Not all the predicates can be pushed down. + * @param canPartialPushDownConjuncts whether a subset of conjuncts of predicates can be pushed + * down safely. Pushing ONLY one side of an AND down is safe + * at the top level, or when none of its ancestors is NOT or OR. + * @return the Parquet-native filter predicates that are eligible for pushdown. + */ private def createFilterHelper( nameToParquetField: Map[String, ParquetField], predicate: sources.Filter, - canRemoveOneSideInAnd: Boolean): Option[FilterPredicate] = { + canPartialPushDownConjuncts: Boolean): Option[FilterPredicate] = { // Decimal type must make sure that filter value's scale matched the file. // If doesn't matched, which would cause data corruption. def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match { @@ -505,24 +514,28 @@ private[parquet] class ParquetFilters( // Pushing one side of AND down is only safe to do at the top level or in the child // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate // can be safely removed. 
- val lhsFilterOption = createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd) - val rhsFilterOption = createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd) + val lhsFilterOption = + createFilterHelper(nameToParquetField, lhs, canPartialPushDownConjuncts) + val rhsFilterOption = + createFilterHelper(nameToParquetField, rhs, canPartialPushDownConjuncts) (lhsFilterOption, rhsFilterOption) match { case (Some(lhsFilter), Some(rhsFilter)) => Some(FilterApi.and(lhsFilter, rhsFilter)) - case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter) - case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter) + case (Some(lhsFilter), None) if canPartialPushDownConjuncts => Some(lhsFilter) + case (None, Some(rhsFilter)) if canPartialPushDownConjuncts => Some(rhsFilter) case _ => None } case sources.Or(lhs, rhs) => for { - lhsFilter <- createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd = false) - rhsFilter <- createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd = false) + lhsFilter <- + createFilterHelper(nameToParquetField, lhs, canPartialPushDownConjuncts = false) + rhsFilter <- + createFilterHelper(nameToParquetField, rhs, canPartialPushDownConjuncts = false) } yield FilterApi.or(lhsFilter, rhsFilter) case sources.Not(pred) => - createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false) + createFilterHelper(nameToParquetField, pred, canPartialPushDownConjuncts = false) .map(FilterApi.not) case sources.In(name, values) if canMakeFilterOn(name, values.head) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org