HyukjinKwon commented on a change in pull request #29145:
URL: https://github.com/apache/spark/pull/29145#discussion_r458750581



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/OrderedFilters.scala
##########
@@ -94,7 +91,7 @@ class CSVFilters(filters: Seq[sources.Filter], 
requiredSchema: StructType)
     predicate != null && !predicate.eval(row)

Review comment:
       Max, should we maybe add a variable that increases one by one and assert 
it against `index`? We can set it to `0` in `reset`. My biggest worry here and 
in `JsonFilters` is that it's a bit error-prone ...

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/OrderedFilters.scala
##########
@@ -48,33 +47,31 @@ class CSVFilters(filters: Seq[sources.Filter], 
requiredSchema: StructType)
   private val predicates: Array[BasePredicate] = {
     val len = requiredSchema.fields.length
     val groupedPredicates = Array.fill[BasePredicate](len)(null)
-    if (SQLConf.get.csvFilterPushDown) {
-      val groupedFilters = Array.fill(len)(Seq.empty[sources.Filter])
-      for (filter <- filters) {
-        val refs = filter.references
-        val index = if (refs.isEmpty) {
-          // For example, `AlwaysTrue` and `AlwaysFalse` doesn't have any 
references
-          // Filters w/o refs always return the same result. Taking into 
account
-          // that predicates are combined via `And`, we can apply such filters 
only
-          // once at the position 0.
-          0
-        } else {
-          // readSchema must contain attributes of all filters.
-          // Accordingly, `fieldIndex()` returns a valid index always.
-          refs.map(requiredSchema.fieldIndex).max
-        }
-        groupedFilters(index) :+= filter
+    val groupedFilters = Array.fill(len)(Seq.empty[sources.Filter])
+    for (filter <- filters) {
+      val refs = filter.references
+      val index = if (refs.isEmpty) {
+        // For example, `AlwaysTrue` and `AlwaysFalse` doesn't have any 
references
+        // Filters w/o refs always return the same result. Taking into account
+        // that predicates are combined via `And`, we can apply such filters 
only
+        // once at the position 0.
+        0
+      } else {
+        // readSchema must contain attributes of all filters.
+        // Accordingly, `fieldIndex()` returns a valid index always.
+        refs.map(requiredSchema.fieldIndex).max
       }
-      if (len > 0 && !groupedFilters(0).isEmpty) {
-        // We assume that filters w/o refs like `AlwaysTrue` and `AlwaysFalse`
-        // can be evaluated faster that others. We put them in front of others.
-        val (literals, others) = 
groupedFilters(0).partition(_.references.isEmpty)
-        groupedFilters(0) = literals ++ others
-      }
-      for (i <- 0 until len) {
-        if (!groupedFilters(i).isEmpty) {
-          groupedPredicates(i) = toPredicate(groupedFilters(i))
-        }
+      groupedFilters(index) :+= filter
+    }
+    if (len > 0 && !groupedFilters(0).isEmpty) {

Review comment:
       nit: `!groupedFilters(i).isEmpty`-> `groupedFilters(i).nonEmpty`

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
##########
@@ -98,7 +99,11 @@ class UnivocityParser(
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = true)
 
-  private val csvFilters = new CSVFilters(filters, requiredSchema)
+  private val csvFilters = if (SQLConf.get.csvFilterPushDown) {

Review comment:
       Shall we match it on the JSON side too?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/OrderedFilters.scala
##########
@@ -48,33 +47,31 @@ class CSVFilters(filters: Seq[sources.Filter], 
requiredSchema: StructType)
   private val predicates: Array[BasePredicate] = {
     val len = requiredSchema.fields.length
     val groupedPredicates = Array.fill[BasePredicate](len)(null)
-    if (SQLConf.get.csvFilterPushDown) {
-      val groupedFilters = Array.fill(len)(Seq.empty[sources.Filter])
-      for (filter <- filters) {
-        val refs = filter.references
-        val index = if (refs.isEmpty) {
-          // For example, `AlwaysTrue` and `AlwaysFalse` doesn't have any 
references
-          // Filters w/o refs always return the same result. Taking into 
account
-          // that predicates are combined via `And`, we can apply such filters 
only
-          // once at the position 0.
-          0
-        } else {
-          // readSchema must contain attributes of all filters.
-          // Accordingly, `fieldIndex()` returns a valid index always.
-          refs.map(requiredSchema.fieldIndex).max
-        }
-        groupedFilters(index) :+= filter
+    val groupedFilters = Array.fill(len)(Seq.empty[sources.Filter])
+    for (filter <- filters) {
+      val refs = filter.references
+      val index = if (refs.isEmpty) {
+        // For example, `AlwaysTrue` and `AlwaysFalse` doesn't have any 
references
+        // Filters w/o refs always return the same result. Taking into account
+        // that predicates are combined via `And`, we can apply such filters 
only
+        // once at the position 0.
+        0
+      } else {
+        // readSchema must contain attributes of all filters.
+        // Accordingly, `fieldIndex()` returns a valid index always.
+        refs.map(requiredSchema.fieldIndex).max
       }
-      if (len > 0 && !groupedFilters(0).isEmpty) {
-        // We assume that filters w/o refs like `AlwaysTrue` and `AlwaysFalse`
-        // can be evaluated faster that others. We put them in front of others.
-        val (literals, others) = 
groupedFilters(0).partition(_.references.isEmpty)
-        groupedFilters(0) = literals ++ others
-      }
-      for (i <- 0 until len) {
-        if (!groupedFilters(i).isEmpty) {
-          groupedPredicates(i) = toPredicate(groupedFilters(i))
-        }
+      groupedFilters(index) :+= filter
+    }
+    if (len > 0 && !groupedFilters(0).isEmpty) {
+      // We assume that filters w/o refs like `AlwaysTrue` and `AlwaysFalse`
+      // can be evaluated faster that others. We put them in front of others.
+      val (literals, others) = 
groupedFilters(0).partition(_.references.isEmpty)
+      groupedFilters(0) = literals ++ others
+    }
+    for (i <- 0 until len) {
+      if (!groupedFilters(i).isEmpty) {

Review comment:
       nit: `nonEmpty` too




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to