Github user gengliangwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21143#discussion_r183924792
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
    @@ -237,11 +239,16 @@ object DataSourceV2Relation {
             // the data source cannot guarantee the rows returned can pass the filter.
             // As a result we must return it so Spark can plan an extra filter operator.
             val unhandledFilters = filterSupport.pushFilters(translatedMap.values.toArray).toSet
    -        val (unhandledPredicates, pushedPredicates) = translatedMap.partition { case (_, f) =>
    +        val unhandledPredicates = translatedMap.filter { case (_, f) =>
               unhandledFilters.contains(f)
    -        }
    -
    -        (nonConvertiblePredicates ++ unhandledPredicates.keys, pushedPredicates.keys.toSeq)
    +        }.keys
    +        // The filters which are marked as pushed to this data source
    +        val pushedFilters = filterSupport.pushedFilters()
    +        val pushedPredicates = translatedMap.filter { case (_, f) =>
    +          pushedFilters.contains(f)
    +        }.keys.toSeq
    +
    +        (nonConvertiblePredicates ++ unhandledPredicates, pushedPredicates)
    --- End diff --
    
    Maybe we can reverse the map (key it by `Filter` instead of `Expression`) and make this simpler:
    ```
            val translatedMap: Map[Filter, Expression] = filters.flatMap { p =>
              DataSourceStrategy.translateFilter(p).map(f => f -> p)
            }.toMap

            // Catalyst predicate expressions that cannot be converted to data source filters.
            val convertiblePredicates = translatedMap.values.toSet
            val nonConvertiblePredicates = filters.filterNot(convertiblePredicates.contains)

            // Data source filters that cannot be pushed down. An unhandled filter means
            // the data source cannot guarantee the rows returned can pass the filter.
            // As a result we must return it so Spark can plan an extra filter operator.
            val unhandledPredicates =
              filterSupport.pushFilters(translatedMap.keys.toArray).map(translatedMap)
            // The filters which are marked as pushed to this data source
            val pushedPredicates = filterSupport.pushedFilters().map(translatedMap)

            (nonConvertiblePredicates ++ unhandledPredicates, pushedPredicates)
    ```
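    
    To make the type flow concrete, here is a minimal, self-contained sketch of the reversed-map idea. The `Expression` and `Filter` case classes, `translateFilter`, and the faked `pushFilters`/`pushedFilters` results below are simplified stand-ins for illustration only, not the real Catalyst or DataSourceV2 classes:
    ```scala
    object ReversedMapSketch {
      // Simplified stand-ins for Catalyst predicates and data source filters.
      case class Expression(name: String)
      case class Filter(name: String)
    
      // Hypothetical translation: only predicates named "conv*" convert to filters.
      def translateFilter(p: Expression): Option[Filter] =
        if (p.name.startsWith("conv")) Some(Filter(p.name)) else None
    
      def main(args: Array[String]): Unit = {
        val filters = Seq(Expression("convA"), Expression("convB"), Expression("opaque"))
    
        // Keyed by Filter, so results coming back from the data source (which
        // are Filters) map straight back to predicates via `map(translatedMap)`.
        val translatedMap: Map[Filter, Expression] = filters.flatMap { p =>
          translateFilter(p).map(f => f -> p)
        }.toMap
    
        val convertiblePredicates = translatedMap.values.toSet
        val nonConvertiblePredicates = filters.filterNot(convertiblePredicates.contains)
    
        // Faked data source responses: "convB" is left unhandled, "convA" was pushed.
        val unhandledFromSource: Array[Filter] = Array(Filter("convB"))
        val pushedFromSource: Array[Filter] = Array(Filter("convA"))
    
        val unhandledPredicates = unhandledFromSource.map(translatedMap)
        val pushedPredicates = pushedFromSource.map(translatedMap)
    
        println(nonConvertiblePredicates ++ unhandledPredicates) // the opaque and convB predicates
        println(pushedPredicates.toSeq)                          // the convA predicate
      }
    }
    ```
    Because a Scala `Map` is also a `Filter => Expression` function, `map(translatedMap)` converts the arrays returned by the data source straight back into predicates, which is what removes the two `filter`/`contains` passes from the diff above.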


---
