Repository: spark
Updated Branches:
  refs/heads/branch-2.2 db821fe55 -> 8b0cb3a7b
[SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names

## What changes were proposed in this pull request?

This is an alternative workaround that simply avoids predicate pushdown for columns having dots in their names. It takes a different approach from https://github.com/apache/spark/pull/17680.

The downside of this PR is that it does not push down filters on columns having dots in their names to Parquet at all (neither at the record level nor at the row-group level). The downside of the approach in that PR is that it does not use Parquet's API properly and instead supports this case in a hacky way.

I assume we prefer a safe way here that uses the Parquet API properly, but this does close that PR, as we are basically just avoiding the problem here. This looks like a simple workaround, and it is probably fine given that the problem is arguably a corner case (although it might end up reading whole row groups under the hood; neither approach looks ideal).

Currently, if a column name contains dots, predicate pushdown in Parquet appears to fail and the query returns incorrect results.

**With dots**

```scala
val path = "/tmp/abcde"
Seq(Some(1), None).toDF("col.dots").write.parquet(path)
spark.read.parquet(path).where("`col.dots` IS NOT NULL").show()
```

```
+--------+
|col.dots|
+--------+
+--------+
```

**Without dots**

```scala
val path = "/tmp/abcde"
Seq(Some(1), None).toDF("coldots").write.parquet(path)
spark.read.parquet(path).where("`coldots` IS NOT NULL").show()
```

```
+-------+
|coldots|
+-------+
|      1|
+-------+
```

**After**

```scala
val path = "/tmp/abcde"
Seq(Some(1), None).toDF("col.dots").write.parquet(path)
spark.read.parquet(path).where("`col.dots` IS NOT NULL").show()
```

```
+--------+
|col.dots|
+--------+
|       1|
+--------+
```

## How was this patch tested?

Unit tests added in `ParquetFilterSuite`.

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #18000 from HyukjinKwon/SPARK-20364-workaround.
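The code comment added in `ParquetFilters.scala` gives the cause. Parquet uses dots as a column path delimiter, and since Parquet 1.8.2 (PARQUET-389) it accepts filter predicates on missing columns. The following is a minimal, simplified sketch of how those two facts can produce the empty result shown under **With dots**. It is not Parquet's code. `DottedColumnSketch`, `toPath`, `lookup`, and `keepsRowGroupForIsNotNull` are hypothetical names. The sketch also makes three assumptions: the file schema is flat, a missing column reads as all nulls, and values are inspected directly rather than through row-group statistics.

```scala
// Hypothetical, simplified model of dotted-name resolution; not Parquet's actual API.
object DottedColumnSketch {
  // Assumption: a flat file schema whose single row group holds these values.
  val rowGroup: Map[String, Seq[Option[Int]]] =
    Map("col.dots" -> Seq(Some(1), None), "coldots" -> Seq(Some(1), None))

  // Assumption: the filter's column name is treated as a dot-delimited path.
  def toPath(name: String): List[String] = name.split('.').toList

  // In a flat schema only single-segment paths can match; anything else is "missing".
  def lookup(path: List[String]): Option[Seq[Option[Int]]] = path match {
    case single :: Nil => rowGroup.get(single)
    case _             => None
  }

  // Assumption (PARQUET-389 semantics): a missing column behaves as all nulls,
  // so IS NOT NULL can never match and the whole row group is skipped.
  def keepsRowGroupForIsNotNull(name: String): Boolean =
    lookup(toPath(name)) match {
      case Some(values) => values.exists(_.isDefined)
      case None         => false
    }

  def main(args: Array[String]): Unit = {
    println(toPath("col.dots"))                     // List(col, dots)
    println(keepsRowGroupForIsNotNull("col.dots"))  // false
    println(keepsRowGroupForIsNotNull("coldots"))   // true
  }
}
```

Under these assumptions, `col.dots` resolves to the two-segment path `col`, `dots`, which does not exist in the file. The `IS NOT NULL` predicate therefore drops the row group. `coldots` resolves to a real column and its non-null value survives.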
(cherry picked from commit 8fb3d5c6da30922458091837eec17ccca502098a)
Signed-off-by: Xiao Li <gatorsm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b0cb3a7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b0cb3a7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b0cb3a7

Branch: refs/heads/branch-2.2
Commit: 8b0cb3a7be138d0f2059731ed4bbd8d01f599497
Parents: db821fe
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Thu May 18 10:52:23 2017 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Thu May 18 10:52:32 2017 -0700

----------------------------------------------------------------------
 .../datasources/parquet/ParquetFilters.scala | 57 +++++++++++---------
 .../parquet/ParquetFilterSuite.scala         | 15 ++++++
 2 files changed, 47 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8b0cb3a7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index a6a6cef..763841e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -166,7 +166,14 @@ private[parquet] object ParquetFilters {
    * Converts data sources filters to Parquet filter predicates.
    */
   def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
-    val dataTypeOf = getFieldMap(schema)
+    val nameToType = getFieldMap(schema)
+
+    // Parquet does not allow dots in the column name because dots are used as a column path
+    // delimiter. Since Parquet 1.8.2 (PARQUET-389), Parquet accepts the filter predicates
+    // with missing columns. The incorrect results could be got from Parquet when we push down
+    // filters for the column having dots in the names. Thus, we do not push down such filters.
+    // See SPARK-20364.
+    def canMakeFilterOn(name: String): Boolean = nameToType.contains(name) && !name.contains(".")
 
     // NOTE:
     //
@@ -184,30 +191,30 @@ private[parquet] object ParquetFilters {
     // Probably I missed something and obviously this should be changed.
 
     predicate match {
-      case sources.IsNull(name) if dataTypeOf.contains(name) =>
-        makeEq.lift(dataTypeOf(name)).map(_(name, null))
-      case sources.IsNotNull(name) if dataTypeOf.contains(name) =>
-        makeNotEq.lift(dataTypeOf(name)).map(_(name, null))
-
-      case sources.EqualTo(name, value) if dataTypeOf.contains(name) =>
-        makeEq.lift(dataTypeOf(name)).map(_(name, value))
-      case sources.Not(sources.EqualTo(name, value)) if dataTypeOf.contains(name) =>
-        makeNotEq.lift(dataTypeOf(name)).map(_(name, value))
-
-      case sources.EqualNullSafe(name, value) if dataTypeOf.contains(name) =>
-        makeEq.lift(dataTypeOf(name)).map(_(name, value))
-      case sources.Not(sources.EqualNullSafe(name, value)) if dataTypeOf.contains(name) =>
-        makeNotEq.lift(dataTypeOf(name)).map(_(name, value))
-
-      case sources.LessThan(name, value) if dataTypeOf.contains(name) =>
-        makeLt.lift(dataTypeOf(name)).map(_(name, value))
-      case sources.LessThanOrEqual(name, value) if dataTypeOf.contains(name) =>
-        makeLtEq.lift(dataTypeOf(name)).map(_(name, value))
-
-      case sources.GreaterThan(name, value) if dataTypeOf.contains(name) =>
-        makeGt.lift(dataTypeOf(name)).map(_(name, value))
-      case sources.GreaterThanOrEqual(name, value) if dataTypeOf.contains(name) =>
-        makeGtEq.lift(dataTypeOf(name)).map(_(name, value))
+      case sources.IsNull(name) if canMakeFilterOn(name) =>
+        makeEq.lift(nameToType(name)).map(_(name, null))
+      case sources.IsNotNull(name) if canMakeFilterOn(name) =>
+        makeNotEq.lift(nameToType(name)).map(_(name, null))
+
+      case sources.EqualTo(name, value) if canMakeFilterOn(name) =>
+        makeEq.lift(nameToType(name)).map(_(name, value))
+      case sources.Not(sources.EqualTo(name, value)) if canMakeFilterOn(name) =>
+        makeNotEq.lift(nameToType(name)).map(_(name, value))
+
+      case sources.EqualNullSafe(name, value) if canMakeFilterOn(name) =>
+        makeEq.lift(nameToType(name)).map(_(name, value))
+      case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name) =>
+        makeNotEq.lift(nameToType(name)).map(_(name, value))
+
+      case sources.LessThan(name, value) if canMakeFilterOn(name) =>
+        makeLt.lift(nameToType(name)).map(_(name, value))
+      case sources.LessThanOrEqual(name, value) if canMakeFilterOn(name) =>
+        makeLtEq.lift(nameToType(name)).map(_(name, value))
+
+      case sources.GreaterThan(name, value) if canMakeFilterOn(name) =>
+        makeGt.lift(nameToType(name)).map(_(name, value))
+      case sources.GreaterThanOrEqual(name, value) if canMakeFilterOn(name) =>
+        makeGtEq.lift(nameToType(name)).map(_(name, value))
 
       case sources.And(lhs, rhs) =>
         // At here, it is not safe to just convert one side if we do not understand the


http://git-wip-us.apache.org/repos/asf/spark/blob/8b0cb3a7/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index dd53b56..98427cf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -538,6 +538,21 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       // scalastyle:on nonascii
     }
   }
+
+  test("SPARK-20364: Disable Parquet predicate pushdown for fields having dots in the names") {
+    import testImplicits._
+
+    Seq(true, false).foreach { vectorized =>
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString,
+        SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> true.toString) {
+        withTempPath { path =>
+          Seq(Some(1), None).toDF("col.dots").write.parquet(path.getAbsolutePath)
+          val readBack = spark.read.parquet(path.getAbsolutePath).where("`col.dots` IS NOT NULL")
+          assert(readBack.count() == 1)
+        }
+      }
+    }
+  }
 }
 
 class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {
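The guard added to `createFilter` can be sketched outside Spark as follows. This is a hedged, self-contained approximation, not Spark's code. The `Filter` ADT, the `nameToType` map, and the string results are hypothetical stand-ins for Spark's `sources.Filter`, the schema field map, and Parquet's `FilterPredicate`. `canMakeFilterOn` applies the same condition as the patch. The `And` case converts only when both sides convert, following the comment in `createFilter` that converting just one side is not safe.

```scala
object PushdownGuardSketch {
  // Hypothetical, simplified stand-ins for Spark's data source filters.
  sealed trait Filter
  final case class IsNotNull(attribute: String) extends Filter
  final case class EqualTo(attribute: String, value: Any) extends Filter
  final case class And(left: Filter, right: Filter) extends Filter

  // Hypothetical schema: column name -> type name (plays the role of nameToType).
  val nameToType: Map[String, String] = Map("col.dots" -> "int", "coldots" -> "int")

  // Same rule as canMakeFilterOn in the patch: the column must exist in the
  // schema and its name must not contain a dot.
  def canMakeFilterOn(name: String): Boolean =
    nameToType.contains(name) && !name.contains(".")

  // Returns a textual stand-in for a Parquet predicate, or None when the
  // filter is not pushed down to the Parquet reader.
  def createFilter(filter: Filter): Option[String] = filter match {
    case IsNotNull(name) if canMakeFilterOn(name)      => Some(s"notEq($name, null)")
    case EqualTo(name, value) if canMakeFilterOn(name) => Some(s"eq($name, $value)")
    case And(lhs, rhs) =>
      // Convert the conjunction only if both sides can be converted.
      for {
        l <- createFilter(lhs)
        r <- createFilter(rhs)
      } yield s"and($l, $r)"
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    println(createFilter(IsNotNull("col.dots"))) // None
    println(createFilter(IsNotNull("coldots")))  // Some(notEq(coldots, null))
  }
}
```

Returning `None` only means the predicate is not handed to Parquet. Spark still evaluates `` `col.dots` IS NOT NULL `` on the rows it reads after the scan. The query is therefore correct, at the cost of reading row groups that could otherwise have been skipped.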