Repository: spark Updated Branches: refs/heads/branch-2.4 77156f8c8 -> 144cb949d
[SPARK-25579][SQL] Use quoted attribute names if needed in pushed ORC predicates ## What changes were proposed in this pull request? This PR aims to fix an ORC performance regression introduced in the Spark 2.4.0 RCs relative to Spark 2.3.2. Currently, for column names containing `.`, pushed-down predicates are ignored. **Test Data** ```scala scala> val df = spark.range(Int.MaxValue).sample(0.2).toDF("col.with.dot") scala> df.write.mode("overwrite").orc("/tmp/orc") ``` **Spark 2.3.2** ```scala scala> spark.sql("set spark.sql.orc.impl=native") scala> spark.sql("set spark.sql.orc.filterPushdown=true") scala> spark.time(spark.read.orc("/tmp/orc").where("`col.with.dot` < 10").show) +------------+ |col.with.dot| +------------+ | 5| | 7| | 8| +------------+ Time taken: 1542 ms scala> spark.time(spark.read.orc("/tmp/orc").where("`col.with.dot` < 10").show) +------------+ |col.with.dot| +------------+ | 5| | 7| | 8| +------------+ Time taken: 152 ms ``` **Spark 2.4.0 RC3** ```scala scala> spark.time(spark.read.orc("/tmp/orc").where("`col.with.dot` < 10").show) +------------+ |col.with.dot| +------------+ | 5| | 7| | 8| +------------+ Time taken: 4074 ms scala> spark.time(spark.read.orc("/tmp/orc").where("`col.with.dot` < 10").show) +------------+ |col.with.dot| +------------+ | 5| | 7| | 8| +------------+ Time taken: 1771 ms ``` ## How was this patch tested? Passes Jenkins with a newly added test case. Closes #22597 from dongjoon-hyun/SPARK-25579. 
Authored-by: Dongjoon Hyun <dongj...@apache.org> Signed-off-by: hyukjinkwon <gurwls...@apache.org> (cherry picked from commit 2c664edc060a41340eb374fd44b5d32c3c06a15c) Signed-off-by: hyukjinkwon <gurwls...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/144cb949 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/144cb949 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/144cb949 Branch: refs/heads/branch-2.4 Commit: 144cb949d597e6cd0e662f2320e983cb6903ecfb Parents: 77156f8 Author: Dongjoon Hyun <dongj...@apache.org> Authored: Tue Oct 16 20:30:23 2018 +0800 Committer: hyukjinkwon <gurwls...@apache.org> Committed: Tue Oct 16 20:30:40 2018 +0800 ---------------------------------------------------------------------- .../execution/datasources/orc/OrcFilters.scala | 37 +++++++++++++++----- .../datasources/orc/OrcQuerySuite.scala | 28 +++++---------- .../sql/execution/datasources/orc/OrcTest.scala | 10 ++++++ 3 files changed, 46 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/144cb949/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index dbafc46..5b93a60 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -67,6 +67,16 @@ private[sql] object OrcFilters { } } + // Since ORC 1.5.0 (ORC-323), we need to quote for column names with `.` characters + // in order to distinguish predicate pushdown for nested columns. 
+ private def quoteAttributeNameIfNeeded(name: String) : String = { + if (!name.contains("`") && name.contains(".")) { + s"`$name`" + } else { + name + } + } + /** * Create ORC filter as a SearchArgument instance. */ @@ -178,38 +188,47 @@ private[sql] object OrcFilters { // wrapped by a "parent" predicate (`And`, `Or`, or `Not`). case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) => + val quotedName = quoteAttributeNameIfNeeded(attribute) val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - Some(builder.startAnd().equals(attribute, getType(attribute), castedValue).end()) + Some(builder.startAnd().equals(quotedName, getType(attribute), castedValue).end()) case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) => + val quotedName = quoteAttributeNameIfNeeded(attribute) val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - Some(builder.startAnd().nullSafeEquals(attribute, getType(attribute), castedValue).end()) + Some(builder.startAnd().nullSafeEquals(quotedName, getType(attribute), castedValue).end()) case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => + val quotedName = quoteAttributeNameIfNeeded(attribute) val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - Some(builder.startAnd().lessThan(attribute, getType(attribute), castedValue).end()) + Some(builder.startAnd().lessThan(quotedName, getType(attribute), castedValue).end()) case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) => + val quotedName = quoteAttributeNameIfNeeded(attribute) val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - Some(builder.startAnd().lessThanEquals(attribute, getType(attribute), castedValue).end()) + Some(builder.startAnd().lessThanEquals(quotedName, getType(attribute), castedValue).end()) case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => + val quotedName = 
quoteAttributeNameIfNeeded(attribute) val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - Some(builder.startNot().lessThanEquals(attribute, getType(attribute), castedValue).end()) + Some(builder.startNot().lessThanEquals(quotedName, getType(attribute), castedValue).end()) case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) => + val quotedName = quoteAttributeNameIfNeeded(attribute) val castedValue = castLiteralValue(value, dataTypeMap(attribute)) - Some(builder.startNot().lessThan(attribute, getType(attribute), castedValue).end()) + Some(builder.startNot().lessThan(quotedName, getType(attribute), castedValue).end()) case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) => - Some(builder.startAnd().isNull(attribute, getType(attribute)).end()) + val quotedName = quoteAttributeNameIfNeeded(attribute) + Some(builder.startAnd().isNull(quotedName, getType(attribute)).end()) case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) => - Some(builder.startNot().isNull(attribute, getType(attribute)).end()) + val quotedName = quoteAttributeNameIfNeeded(attribute) + Some(builder.startNot().isNull(quotedName, getType(attribute)).end()) case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) => + val quotedName = quoteAttributeNameIfNeeded(attribute) val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(attribute))) - Some(builder.startAnd().in(attribute, getType(attribute), + Some(builder.startAnd().in(quotedName, getType(attribute), castedValues.map(_.asInstanceOf[AnyRef]): _*).end()) case _ => None http://git-wip-us.apache.org/repos/asf/spark/blob/144cb949/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala index e9dccbf..998b7b3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala @@ -445,16 +445,7 @@ abstract class OrcQueryTest extends OrcTest { test("Support for pushing down filters for decimal types") { withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") { val data = (0 until 10).map(i => Tuple1(BigDecimal.valueOf(i))) - withTempPath { file => - // It needs to repartition data so that we can have several ORC files - // in order to skip stripes in ORC. - spark.createDataFrame(data).toDF("a").repartition(10) - .write.orc(file.getCanonicalPath) - val df = spark.read.orc(file.getCanonicalPath).where("a == 2") - val actual = stripSparkFilter(df).count() - - assert(actual < 10) - } + checkPredicatePushDown(spark.createDataFrame(data).toDF("a"), 10, "a == 2") } } @@ -465,16 +456,7 @@ abstract class OrcQueryTest extends OrcTest { val milliseconds = Timestamp.valueOf(timeString).getTime + i * 3600 Tuple1(new Timestamp(milliseconds)) } - withTempPath { file => - // It needs to repartition data so that we can have several ORC files - // in order to skip stripes in ORC. 
- spark.createDataFrame(data).toDF("a").repartition(10) - .write.orc(file.getCanonicalPath) - val df = spark.read.orc(file.getCanonicalPath).where(s"a == '$timeString'") - val actual = stripSparkFilter(df).count() - - assert(actual < 10) - } + checkPredicatePushDown(spark.createDataFrame(data).toDF("a"), 10, s"a == '$timeString'") } } @@ -674,6 +656,12 @@ class OrcQuerySuite extends OrcQueryTest with SharedSQLContext { } } + test("SPARK-25579 ORC PPD should support column names with dot") { + withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") { + checkPredicatePushDown(spark.range(10).toDF("col.dot"), 10, "`col.dot` == 2") + } + } + test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and sql/core") { withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "hive") { val e = intercept[AnalysisException] { http://git-wip-us.apache.org/repos/asf/spark/blob/144cb949/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala index 38b34a0..a35c536 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala @@ -106,4 +106,14 @@ abstract class OrcTest extends QueryTest with SQLTestUtils with BeforeAndAfterAl df: DataFrame, path: File): Unit = { df.write.mode(SaveMode.Overwrite).orc(path.getCanonicalPath) } + + protected def checkPredicatePushDown(df: DataFrame, numRows: Int, predicate: String): Unit = { + withTempPath { file => + // It needs to repartition data so that we can have several ORC files + // in order to skip stripes in ORC. 
+ df.repartition(numRows).write.orc(file.getCanonicalPath) + val actual = stripSparkFilter(spark.read.orc(file.getCanonicalPath).where(predicate)).count() + assert(actual < numRows) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org