Repository: spark
Updated Branches:
  refs/heads/master 3eee9e024 -> faf73dcd3


[SPARK-25559][FOLLOW-UP] Add comments for partial pushdown of conjuncts in 
Parquet

## What changes were proposed in this pull request?
This is a follow up of https://github.com/apache/spark/pull/22574. Renamed the 
parameter and added comments.

## How was this patch tested?
N/A

Closes #22679 from gatorsmile/followupSPARK-25559.

Authored-by: gatorsmile <gatorsm...@gmail.com>
Signed-off-by: DB Tsai <d_t...@apple.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/faf73dcd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/faf73dcd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/faf73dcd

Branch: refs/heads/master
Commit: faf73dcd33d04365c28c2846d3a1f845785f69df
Parents: 3eee9e0
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Tue Oct 9 21:10:33 2018 +0000
Committer: DB Tsai <d_t...@apple.com>
Committed: Tue Oct 9 21:10:33 2018 +0000

----------------------------------------------------------------------
 .../datasources/parquet/ParquetFilters.scala    | 31 ++++++++++++++------
 1 file changed, 22 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/faf73dcd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 44a0d20..21ab9c7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -394,13 +394,22 @@ private[parquet] class ParquetFilters(
    */
   def createFilter(schema: MessageType, predicate: sources.Filter): 
Option[FilterPredicate] = {
     val nameToParquetField = getFieldMap(schema)
-    createFilterHelper(nameToParquetField, predicate, canRemoveOneSideInAnd = 
true)
+    createFilterHelper(nameToParquetField, predicate, 
canPartialPushDownConjuncts = true)
   }
 
+  /**
+   * @param nameToParquetField a map from the field name to its field name and 
data type.
+   *                           This only includes the root fields whose types 
are primitive types.
+   * @param predicate the input filter predicates. Not all the predicates can 
be pushed down.
+   * @param canPartialPushDownConjuncts whether a subset of conjuncts of 
predicates can be pushed
+   *                                    down safely. Pushing ONLY one side of 
AND down is safe to
+   *                                    do at the top level or when none of its 
ancestors is a NOT or an OR.
+   * @return the Parquet-native filter predicates that are eligible for 
pushdown.
+   */
   private def createFilterHelper(
       nameToParquetField: Map[String, ParquetField],
       predicate: sources.Filter,
-      canRemoveOneSideInAnd: Boolean): Option[FilterPredicate] = {
+      canPartialPushDownConjuncts: Boolean): Option[FilterPredicate] = {
     // Decimal type must make sure that the filter value's scale matches the file's.
     // If it doesn't match, it could cause data corruption.
     def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = 
value match {
@@ -505,24 +514,28 @@ private[parquet] class ParquetFilters(
         // Pushing one side of AND down is only safe to do at the top level or 
in the child
         // AND before hitting NOT or OR conditions, and in this case, the 
unsupported predicate
         // can be safely removed.
-        val lhsFilterOption = createFilterHelper(nameToParquetField, lhs, 
canRemoveOneSideInAnd)
-        val rhsFilterOption = createFilterHelper(nameToParquetField, rhs, 
canRemoveOneSideInAnd)
+        val lhsFilterOption =
+          createFilterHelper(nameToParquetField, lhs, 
canPartialPushDownConjuncts)
+        val rhsFilterOption =
+          createFilterHelper(nameToParquetField, rhs, 
canPartialPushDownConjuncts)
 
         (lhsFilterOption, rhsFilterOption) match {
           case (Some(lhsFilter), Some(rhsFilter)) => 
Some(FilterApi.and(lhsFilter, rhsFilter))
-          case (Some(lhsFilter), None) if canRemoveOneSideInAnd => 
Some(lhsFilter)
-          case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => 
Some(rhsFilter)
+          case (Some(lhsFilter), None) if canPartialPushDownConjuncts => 
Some(lhsFilter)
+          case (None, Some(rhsFilter)) if canPartialPushDownConjuncts => 
Some(rhsFilter)
           case _ => None
         }
 
       case sources.Or(lhs, rhs) =>
         for {
-          lhsFilter <- createFilterHelper(nameToParquetField, lhs, 
canRemoveOneSideInAnd = false)
-          rhsFilter <- createFilterHelper(nameToParquetField, rhs, 
canRemoveOneSideInAnd = false)
+          lhsFilter <-
+            createFilterHelper(nameToParquetField, lhs, 
canPartialPushDownConjuncts = false)
+          rhsFilter <-
+            createFilterHelper(nameToParquetField, rhs, 
canPartialPushDownConjuncts = false)
         } yield FilterApi.or(lhsFilter, rhsFilter)
 
       case sources.Not(pred) =>
-        createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = 
false)
+        createFilterHelper(nameToParquetField, pred, 
canPartialPushDownConjuncts = false)
           .map(FilterApi.not)
 
       case sources.In(name, values) if canMakeFilterOn(name, values.head)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to