Repository: spark
Updated Branches:
  refs/heads/master ab05db0b4 -> 986b25140


[SPARK-16400][SQL] Remove InSet filter pushdown from Parquet

## What changes were proposed in this pull request?
This patch removes InSet filter pushdown from the Parquet data source, since
row-based pushdown is not beneficial to Spark and brings extra complexity to
the code base.

## How was this patch tested?
N/A

Author: Reynold Xin <r...@databricks.com>

Closes #14076 from rxin/SPARK-16400.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/986b2514
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/986b2514
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/986b2514

Branch: refs/heads/master
Commit: 986b2514013ed9ebab526f2cf3dc714cc9e480bf
Parents: ab05db0
Author: Reynold Xin <r...@databricks.com>
Authored: Thu Jul 7 18:09:18 2016 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Thu Jul 7 18:09:18 2016 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/types/StructType.scala |  7 ++-
 .../datasources/parquet/ParquetFilters.scala    | 57 +++++---------------
 .../parquet/ParquetFilterSuite.scala            | 30 -----------
 3 files changed, 18 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/986b2514/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index 0284ecc..0c2ebb0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -22,7 +22,7 @@ import scala.util.Try
 
 import org.json4s.JsonDSL._
 
-import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.SparkException
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, InterpretedOrdering}
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, 
LegacyTypeStringParser}
@@ -389,6 +389,11 @@ case class StructType(fields: Array[StructField]) extends 
DataType with Seq[Stru
 
 object StructType extends AbstractDataType {
 
+  /**
+   * A key used in field metadata to indicate that the field comes from the result of merging
+   * two different StructTypes that do not always contain the field. That is to say, the field
+   * might be missing (optional) from one of the StructTypes.
+   */
   private[sql] val metadataKeyForOptionalField = "_OPTIONAL_"
 
   override private[sql] def defaultConcreteType: DataType = new StructType

http://git-wip-us.apache.org/repos/asf/spark/blob/986b2514/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index e0a113a..426263f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.io.Serializable
-
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary
@@ -26,18 +24,10 @@ import org.apache.parquet.io.api.Binary
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types._
 
+/**
+ * Some utility function to convert Spark data source filters to Parquet filters.
+ */
 private[sql] object ParquetFilters {
-  case class SetInFilter[T <: Comparable[T]](
-    valueSet: Set[T]) extends UserDefinedPredicate[T] with Serializable {
-
-    override def keep(value: T): Boolean = {
-      value != null && valueSet.contains(value)
-    }
-
-    override def canDrop(statistics: Statistics[T]): Boolean = false
-
-    override def inverseCanDrop(statistics: Statistics[T]): Boolean = false
-  }
 
   private val makeEq: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
     case BooleanType =>
@@ -154,36 +144,16 @@ private[sql] object ParquetFilters {
         FilterApi.gtEq(binaryColumn(n), 
Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
   }
 
-  private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => 
FilterPredicate] = {
-    case IntegerType =>
-      (n: String, v: Set[Any]) =>
-        FilterApi.userDefined(intColumn(n), 
SetInFilter(v.asInstanceOf[Set[java.lang.Integer]]))
-    case LongType =>
-      (n: String, v: Set[Any]) =>
-        FilterApi.userDefined(longColumn(n), 
SetInFilter(v.asInstanceOf[Set[java.lang.Long]]))
-    case FloatType =>
-      (n: String, v: Set[Any]) =>
-        FilterApi.userDefined(floatColumn(n), 
SetInFilter(v.asInstanceOf[Set[java.lang.Float]]))
-    case DoubleType =>
-      (n: String, v: Set[Any]) =>
-        FilterApi.userDefined(doubleColumn(n), 
SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))
-    case StringType =>
-      (n: String, v: Set[Any]) =>
-        FilterApi.userDefined(binaryColumn(n),
-          SetInFilter(v.map(s => Binary.fromString(s.asInstanceOf[String]))))
-    case BinaryType =>
-      (n: String, v: Set[Any]) =>
-        FilterApi.userDefined(binaryColumn(n),
-          SetInFilter(v.map(e => 
Binary.fromReusedByteArray(e.asInstanceOf[Array[Byte]]))))
-  }
-
   /**
+   * Returns a map from name of the column to the data type, if predicate push down applies
+   * (i.e. not an optional field).
+   *
    * SPARK-11955: The optional fields will have metadata StructType.metadataKeyForOptionalField.
    * These fields only exist in one side of merged schemas. Due to that, we can't push down filters
-   * using such fields, otherwise Parquet library will throw exception. Here we filter out such
-   * fields.
+   * using such fields, otherwise Parquet library will throw exception (PARQUET-389).
+   * Here we filter out such fields.
    */
-  private def getFieldMap(dataType: DataType): Array[(String, DataType)] = 
dataType match {
+  private def getFieldMap(dataType: DataType): Map[String, DataType] = 
dataType match {
     case StructType(fields) =>
       // Here we don't flatten the fields in the nested schema but just look 
up through
       // root fields. Currently, accessing to nested fields does not push down 
filters
@@ -191,15 +161,15 @@ private[sql] object ParquetFilters {
       fields.filter { f =>
         !f.metadata.contains(StructType.metadataKeyForOptionalField) ||
           !f.metadata.getBoolean(StructType.metadataKeyForOptionalField)
-      }.map(f => f.name -> f.dataType)
-    case _ => Array.empty[(String, DataType)]
+      }.map(f => f.name -> f.dataType).toMap
+    case _ => Map.empty[String, DataType]
   }
 
   /**
    * Converts data sources filters to Parquet filter predicates.
    */
   def createFilter(schema: StructType, predicate: sources.Filter): 
Option[FilterPredicate] = {
-    val dataTypeOf = getFieldMap(schema).toMap
+    val dataTypeOf = getFieldMap(schema)
 
     // NOTE:
     //
@@ -242,9 +212,6 @@ private[sql] object ParquetFilters {
       case sources.GreaterThanOrEqual(name, value) if 
dataTypeOf.contains(name) =>
         makeGtEq.lift(dataTypeOf(name)).map(_(name, value))
 
-      case sources.In(name, valueSet) =>
-        makeInSet.lift(dataTypeOf(name)).map(_(name, valueSet.toSet))
-
       case sources.And(lhs, rhs) =>
         // At here, it is not safe to just convert one side if we do not 
understand the
         // other side. Here is an example used to explain the reason.

http://git-wip-us.apache.org/repos/asf/spark/blob/986b2514/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 84fdcfe..f59d474 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -514,36 +514,6 @@ class ParquetFilterSuite extends QueryTest with 
ParquetTest with SharedSQLContex
     }
   }
 
-  test("SPARK-11164: test the parquet filter in") {
-    import testImplicits._
-    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
-        withTempPath { dir =>
-          val path = s"${dir.getCanonicalPath}/table1"
-          (1 to 5).map(i => (i.toFloat, i%3)).toDF("a", 
"b").write.parquet(path)
-
-          // When a filter is pushed to Parquet, Parquet can apply it to every 
row.
-          // So, we can check the number of rows returned from the Parquet
-          // to make sure our filter pushdown work.
-          val df = spark.read.parquet(path).where("b in (0,2)")
-          assert(stripSparkFilter(df).count == 3)
-
-          val df1 = spark.read.parquet(path).where("not (b in (1))")
-          assert(stripSparkFilter(df1).count == 3)
-
-          val df2 = spark.read.parquet(path).where("not (b in (1,3) or a <= 
2)")
-          assert(stripSparkFilter(df2).count == 2)
-
-          val df3 = spark.read.parquet(path).where("not (b in (1,3) and a <= 
2)")
-          assert(stripSparkFilter(df3).count == 4)
-
-          val df4 = spark.read.parquet(path).where("not (a <= 2)")
-          assert(stripSparkFilter(df4).count == 3)
-        }
-      }
-    }
-  }
-
   test("SPARK-16371 Do not push down filters when inner name and outer name 
are the same") {
     withParquetDataFrame((1 to 4).map(i => Tuple1(Tuple1(i)))) { implicit df =>
       // Here the schema becomes as below:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to