This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new f3ba73a  [SPARK-27160][SQL] Fix DecimalType when building orc filters
f3ba73a is described below

commit f3ba73a5f54cc233424cee4fdfd3a61674b2b48e
Author: Darcy Shen <sad...@zoho.com>
AuthorDate: Tue Mar 19 20:28:46 2019 -0700

    [SPARK-27160][SQL] Fix DecimalType when building orc filters
    
    A DecimalType literal should not be cast to Long.
    
    E.g., for `df.filter("x < 3.14")`, assuming `df` (with column `x` of
    DecimalType) reads from an ORC table and uses the native ORC reader
    with predicate pushdown enabled, the `x < 3.14` predicate is pushed
    down to the ORC reader via a SearchArgument.
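    A minimal repro sketch of this setup, e.g. in spark-shell (the path
    and values are illustrative):

    ```
    spark.conf.set("spark.sql.orc.filterPushdown", "true")
    Seq(BigDecimal(3.13), BigDecimal(3.15)).toDF("x").write.orc("/tmp/dec")
    val df = spark.read.orc("/tmp/dec")
    // the predicate below is pushed to the ORC reader as a SearchArgument leaf
    df.filter("x < 3.14").show()
    ```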
    
    OrcFilters constructs the SearchArgument but does not handle
    DecimalType correctly.
    
    The previous implementation built `x < 3` from `x < 3.14`, silently
    truncating the decimal literal to a Long.
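    The root cause, shown in isolation (a sketch; the value is
    illustrative):

    ```
    val decimal = new java.math.BigDecimal("3.14")
    // java.math.BigDecimal#longValue drops the fractional part, so the
    // old conversion handed 3, not 3.14, to HiveDecimalWritable.
    decimal.longValue  // 3
    ```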
    
    ```
    $ sbt
    > sql/testOnly *OrcFilterSuite
    > sql/testOnly *OrcQuerySuite -- -z "27160"
    ```
    
    Closes #24092 from sadhen/spark27160.
    
    Authored-by: Darcy Shen <sad...@zoho.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../sql/execution/datasources/orc/OrcFilters.scala     |  6 ++----
 .../sql/execution/datasources/orc/OrcFilterSuite.scala | 13 +++++++++++++
 .../sql/execution/datasources/orc/OrcQuerySuite.scala  | 18 +++++++++++++++++-
 3 files changed, 32 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index 5b93a60..6bc0d4f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.orc
 
+import org.apache.orc.storage.common.`type`.HiveDecimal
 import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
 import org.apache.orc.storage.ql.io.sarg.SearchArgument.Builder
 import org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder
@@ -134,10 +135,7 @@ private[sql] object OrcFilters {
     case FloatType | DoubleType =>
       value.asInstanceOf[Number].doubleValue()
     case _: DecimalType =>
-      val decimal = value.asInstanceOf[java.math.BigDecimal]
-      val decimalWritable = new HiveDecimalWritable(decimal.longValue)
-      decimalWritable.mutateEnforcePrecisionScale(decimal.precision, decimal.scale)
-      decimalWritable
+      new HiveDecimalWritable(HiveDecimal.create(value.asInstanceOf[java.math.BigDecimal]))
     case _ => value
   }
 
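For readers skimming the diff: the corrected conversion in isolation. This is
a sketch assuming the shaded hive-storage-api classes that OrcFilters.scala
already imports; the sample value is illustrative.

```
import org.apache.orc.storage.common.`type`.HiveDecimal
import org.apache.orc.storage.serde2.io.HiveDecimalWritable

val value: Any = new java.math.BigDecimal("3.14")

// The old path went through BigDecimal.longValue, truncating 3.14 to 3
// before building the writable. HiveDecimal.create consumes the whole
// BigDecimal, so precision and scale survive into the SearchArgument.
val writable = new HiveDecimalWritable(
  HiveDecimal.create(value.asInstanceOf[java.math.BigDecimal]))
```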
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
index 8680b86..1c6d07d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.orc
 
+import java.math.MathContext
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 
@@ -383,4 +384,16 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext {
       )).get.toString
     }
   }
+
+  test("SPARK-27160: Fix casting of the DecimalType literal") {
+    import org.apache.spark.sql.sources._
+    val schema = StructType(Array(StructField("a", DecimalType(3, 2))))
+    assertResult("leaf-0 = (LESS_THAN a 3.14), expr = leaf-0") {
+      OrcFilters.createFilter(schema, Array(
+        LessThan(
+          "a",
+          new java.math.BigDecimal(3.14, MathContext.DECIMAL64).setScale(2)))
+      ).get.toString
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index 998b7b3..9b35ad8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, RecordReaderIterator}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
 case class AllDataTypesWithNonPrimitiveType(
@@ -597,6 +597,22 @@ abstract class OrcQueryTest extends OrcTest {
       assert(m4.contains("Malformed ORC file"))
     }
   }
+
+  test("SPARK-27160 Predicate pushdown correctness on DecimalType for ORC") {
+    withTempPath { dir =>
+      withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+        val path = dir.getCanonicalPath
+        Seq(BigDecimal(0.1), BigDecimal(0.2), BigDecimal(-0.3))
+          .toDF("x").write.orc(path)
+        val df = spark.read.orc(path)
+        checkAnswer(df.filter("x >= 0.1"), Seq(Row(0.1), Row(0.2)))
+        checkAnswer(df.filter("x > 0.1"), Seq(Row(0.2)))
+        checkAnswer(df.filter("x <= 0.15"), Seq(Row(0.1), Row(-0.3)))
+        checkAnswer(df.filter("x < 0.1"), Seq(Row(-0.3)))
+        checkAnswer(df.filter("x == 0.2"), Seq(Row(0.2)))
+      }
+    }
+  }
 }
 
 class OrcQuerySuite extends OrcQueryTest with SharedSQLContext {

