Repository: spark
Updated Branches:
  refs/heads/branch-2.4 77156f8c8 -> 144cb949d


[SPARK-25579][SQL] Use quoted attribute names if needed in pushed ORC predicates

## What changes were proposed in this pull request?

This PR aims to fix an ORC performance regression in the Spark 2.4.0 RCs relative to
Spark 2.3.2. Currently, pushed predicates on column names containing `.` are ignored.
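
The fix quotes such attribute names before they are handed to ORC's `SearchArgument` builder. Below is a minimal sketch of the quoting rule, mirroring the `quoteAttributeNameIfNeeded` helper added by this patch (shown standalone here for illustration only):

```scala
// Quote a top-level attribute name that contains a dot so that ORC 1.5.0+
// (ORC-323) does not interpret it as a nested-column path. Names that
// already contain a backtick are passed through unchanged.
def quoteAttributeNameIfNeeded(name: String): String = {
  if (!name.contains("`") && name.contains(".")) s"`$name`" else name
}

quoteAttributeNameIfNeeded("col.with.dot")  // returns "`col.with.dot`"
quoteAttributeNameIfNeeded("plain_col")     // returns "plain_col"
```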

**Test Data**
```scala
scala> val df = spark.range(Int.MaxValue).sample(0.2).toDF("col.with.dot")
scala> df.write.mode("overwrite").orc("/tmp/orc")
```

**Spark 2.3.2**
```scala
scala> spark.sql("set spark.sql.orc.impl=native")
scala> spark.sql("set spark.sql.orc.filterPushdown=true")
scala> spark.time(spark.read.orc("/tmp/orc").where("`col.with.dot` < 10").show)
+------------+
|col.with.dot|
+------------+
|           5|
|           7|
|           8|
+------------+

Time taken: 1542 ms

scala> spark.time(spark.read.orc("/tmp/orc").where("`col.with.dot` < 10").show)
+------------+
|col.with.dot|
+------------+
|           5|
|           7|
|           8|
+------------+

Time taken: 152 ms
```

**Spark 2.4.0 RC3**
```scala
scala> spark.time(spark.read.orc("/tmp/orc").where("`col.with.dot` < 10").show)
+------------+
|col.with.dot|
+------------+
|           5|
|           7|
|           8|
+------------+

Time taken: 4074 ms

scala> spark.time(spark.read.orc("/tmp/orc").where("`col.with.dot` < 10").show)
+------------+
|col.with.dot|
+------------+
|           5|
|           7|
|           8|
+------------+

Time taken: 1771 ms
```

## How was this patch tested?

Passes Jenkins with a newly added test case.
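
The new test goes through a `checkPredicatePushDown` helper added to `OrcTest` (see the diff below). A sketch of how the test invokes it, assuming a suite that mixes in `OrcTest` so `withSQLConf`, `spark`, and the helper are in scope:

```scala
// With ORC filter pushdown enabled, a predicate on a dotted column name must
// still reach the reader. The helper writes the data as several ORC files,
// reads it back with the predicate, strips Spark's own Filter operator, and
// asserts that fewer than numRows rows survive the scan (stripes were skipped).
withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
  checkPredicatePushDown(spark.range(10).toDF("col.dot"), 10, "`col.dot` == 2")
}
```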

Closes #22597 from dongjoon-hyun/SPARK-25579.

Authored-by: Dongjoon Hyun <dongj...@apache.org>
Signed-off-by: hyukjinkwon <gurwls...@apache.org>
(cherry picked from commit 2c664edc060a41340eb374fd44b5d32c3c06a15c)
Signed-off-by: hyukjinkwon <gurwls...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/144cb949
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/144cb949
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/144cb949

Branch: refs/heads/branch-2.4
Commit: 144cb949d597e6cd0e662f2320e983cb6903ecfb
Parents: 77156f8
Author: Dongjoon Hyun <dongj...@apache.org>
Authored: Tue Oct 16 20:30:23 2018 +0800
Committer: hyukjinkwon <gurwls...@apache.org>
Committed: Tue Oct 16 20:30:40 2018 +0800

----------------------------------------------------------------------
 .../execution/datasources/orc/OrcFilters.scala  | 37 +++++++++++++++-----
 .../datasources/orc/OrcQuerySuite.scala         | 28 +++++----------
 .../sql/execution/datasources/orc/OrcTest.scala | 10 ++++++
 3 files changed, 46 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/144cb949/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index dbafc46..5b93a60 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -67,6 +67,16 @@ private[sql] object OrcFilters {
     }
   }
 
+  // Since ORC 1.5.0 (ORC-323), we need to quote for column names with `.` characters
+  // in order to distinguish predicate pushdown for nested columns.
+  private def quoteAttributeNameIfNeeded(name: String) : String = {
+    if (!name.contains("`") && name.contains(".")) {
+      s"`$name`"
+    } else {
+      name
+    }
+  }
+
   /**
    * Create ORC filter as a SearchArgument instance.
    */
@@ -178,38 +188,47 @@ private[sql] object OrcFilters {
       // wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
 
      case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
+        val quotedName = quoteAttributeNameIfNeeded(attribute)
         val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startAnd().equals(attribute, getType(attribute), castedValue).end())
+        Some(builder.startAnd().equals(quotedName, getType(attribute), castedValue).end())
 
      case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
+        val quotedName = quoteAttributeNameIfNeeded(attribute)
         val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startAnd().nullSafeEquals(attribute, getType(attribute), castedValue).end())
+        Some(builder.startAnd().nullSafeEquals(quotedName, getType(attribute), castedValue).end())
 
      case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
+        val quotedName = quoteAttributeNameIfNeeded(attribute)
         val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startAnd().lessThan(attribute, getType(attribute), castedValue).end())
+        Some(builder.startAnd().lessThan(quotedName, getType(attribute), castedValue).end())
 
      case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
+        val quotedName = quoteAttributeNameIfNeeded(attribute)
         val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startAnd().lessThanEquals(attribute, getType(attribute), castedValue).end())
+        Some(builder.startAnd().lessThanEquals(quotedName, getType(attribute), castedValue).end())
 
      case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
+        val quotedName = quoteAttributeNameIfNeeded(attribute)
         val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startNot().lessThanEquals(attribute, getType(attribute), castedValue).end())
+        Some(builder.startNot().lessThanEquals(quotedName, getType(attribute), castedValue).end())
 
      case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
+        val quotedName = quoteAttributeNameIfNeeded(attribute)
         val castedValue = castLiteralValue(value, dataTypeMap(attribute))
-        Some(builder.startNot().lessThan(attribute, getType(attribute), castedValue).end())
+        Some(builder.startNot().lessThan(quotedName, getType(attribute), castedValue).end())
 
       case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
-        Some(builder.startAnd().isNull(attribute, getType(attribute)).end())
+        val quotedName = quoteAttributeNameIfNeeded(attribute)
+        Some(builder.startAnd().isNull(quotedName, getType(attribute)).end())
 
       case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
-        Some(builder.startNot().isNull(attribute, getType(attribute)).end())
+        val quotedName = quoteAttributeNameIfNeeded(attribute)
+        Some(builder.startNot().isNull(quotedName, getType(attribute)).end())
 
       case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) =>
+        val quotedName = quoteAttributeNameIfNeeded(attribute)
         val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(attribute)))
-        Some(builder.startAnd().in(attribute, getType(attribute),
+        Some(builder.startAnd().in(quotedName, getType(attribute),
           castedValues.map(_.asInstanceOf[AnyRef]): _*).end())
 
       case _ => None

http://git-wip-us.apache.org/repos/asf/spark/blob/144cb949/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index e9dccbf..998b7b3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -445,16 +445,7 @@ abstract class OrcQueryTest extends OrcTest {
   test("Support for pushing down filters for decimal types") {
     withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
       val data = (0 until 10).map(i => Tuple1(BigDecimal.valueOf(i)))
-      withTempPath { file =>
-        // It needs to repartition data so that we can have several ORC files
-        // in order to skip stripes in ORC.
-        spark.createDataFrame(data).toDF("a").repartition(10)
-          .write.orc(file.getCanonicalPath)
-        val df = spark.read.orc(file.getCanonicalPath).where("a == 2")
-        val actual = stripSparkFilter(df).count()
-
-        assert(actual < 10)
-      }
+      checkPredicatePushDown(spark.createDataFrame(data).toDF("a"), 10, "a == 2")
     }
   }
 
@@ -465,16 +456,7 @@ abstract class OrcQueryTest extends OrcTest {
         val milliseconds = Timestamp.valueOf(timeString).getTime + i * 3600
         Tuple1(new Timestamp(milliseconds))
       }
-      withTempPath { file =>
-        // It needs to repartition data so that we can have several ORC files
-        // in order to skip stripes in ORC.
-        spark.createDataFrame(data).toDF("a").repartition(10)
-          .write.orc(file.getCanonicalPath)
-        val df = spark.read.orc(file.getCanonicalPath).where(s"a == '$timeString'")
-        val actual = stripSparkFilter(df).count()
-
-        assert(actual < 10)
-      }
+      checkPredicatePushDown(spark.createDataFrame(data).toDF("a"), 10, s"a == '$timeString'")
     }
   }
 
@@ -674,6 +656,12 @@ class OrcQuerySuite extends OrcQueryTest with SharedSQLContext {
     }
   }
 
+  test("SPARK-25579 ORC PPD should support column names with dot") {
+    withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      checkPredicatePushDown(spark.range(10).toDF("col.dot"), 10, "`col.dot` == 2")
+    }
+  }
+
   test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and 
sql/core") {
     withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "hive") {
       val e = intercept[AnalysisException] {

http://git-wip-us.apache.org/repos/asf/spark/blob/144cb949/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
index 38b34a0..a35c536 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
@@ -106,4 +106,14 @@ abstract class OrcTest extends QueryTest with SQLTestUtils with BeforeAndAfterAl
       df: DataFrame, path: File): Unit = {
     df.write.mode(SaveMode.Overwrite).orc(path.getCanonicalPath)
   }
+
+  protected def checkPredicatePushDown(df: DataFrame, numRows: Int, predicate: String): Unit = {
+    withTempPath { file =>
+      // It needs to repartition data so that we can have several ORC files
+      // in order to skip stripes in ORC.
+      df.repartition(numRows).write.orc(file.getCanonicalPath)
+      val actual = stripSparkFilter(spark.read.orc(file.getCanonicalPath).where(predicate)).count()
+      assert(actual < numRows)
+    }
+  }
 }

