This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 6bc088f  [SPARK-34212][SQL][FOLLOWUP] Refine the behavior of reading parquet non-decimal fields as decimal
6bc088f is described below

commit 6bc088fd0499a28201dc6c2a25836d02d769e14d
Author: Wenchen Fan <[email protected]>
AuthorDate: Wed Jan 27 09:34:31 2021 -0800

    [SPARK-34212][SQL][FOLLOWUP] Refine the behavior of reading parquet non-decimal fields as decimal
    
    This is a followup of https://github.com/apache/spark/pull/31319 .
    
    When reading parquet int/long as decimal, the behavior should be the same as
    reading int/long and then casting to the decimal type. This PR changes the
    code to match the expected behavior.
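
    A minimal sketch of the intended equivalence, assuming a running
    SparkSession `spark` (the path and column name are illustrative, not
    from this patch):

        // Write a plain INT32 column, then read it back as a decimal.
        spark.sql("SELECT 123456 AS i").write.parquet("/tmp/ints")
        val direct = spark.read.schema("i DECIMAL(11, 1)").parquet("/tmp/ints")
        val casted = spark.read.schema("i INT").parquet("/tmp/ints")
          .selectExpr("CAST(i AS DECIMAL(11, 1)) AS i")
        // After this change both return 123456.0; values that overflow the
        // requested precision come back as null, matching cast semantics.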
    
    When reading parquet binary as decimal, we don't really know how to interpret
    the binary (it may come from a string), and should fail. This PR changes the
    code to match the expected behavior.
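
    For the binary case, a minimal sketch of the new failure mode (the path is
    illustrative; the error message is the one added by this patch):

        // Write a plain BINARY column, i.e. no decimal metadata in the footer.
        spark.sql("SELECT CAST('1.2' AS BINARY) AS b").write.parquet("/tmp/bin")
        // Now fails with "Unable to create Parquet converter ... Please read
        // this column/field as Spark BINARY type."
        spark.read.schema("b DECIMAL(3, 2)").parquet("/tmp/bin").collect()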
    
    The changes are needed to make the behavior more sane.
    
    This introduces a user-facing change, but only as a followup to the PR above.
    
    Tested by updating the existing test.
    
    Closes #31357 from cloud-fan/bug.
    
    Authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit 2dbb7d5af8f498e49488cd8876bd3d0b083723b7)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../datasources/parquet/ParquetRowConverter.scala  | 48 +++++++++++-------
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 58 ++++++++++++----------
 2 files changed, 60 insertions(+), 46 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 0d22fe5..5878bb0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -210,19 +210,6 @@ private[parquet] class ParquetRowConverter(
   }
 
   /**
-   * Get a precision and a scale to interpret parquet decimal values.
-   * 1. If there is a decimal metadata, we read decimal values with the given precision and scale.
-   * 2. If there is no metadata, we read decimal values with scale `0` because it's plain integers
-   *    when it is written into INT32/INT64/BINARY/FIXED_LEN_BYTE_ARRAY types.
-   */
-  private def getPrecisionAndScale(parquetType: Type, t: DecimalType): (Int, Int) = {
-    val metadata = parquetType.asPrimitiveType().getDecimalMetadata
-    val precision = if (metadata == null) t.precision else metadata.getPrecision()
-    val scale = if (metadata == null) 0 else metadata.getScale()
-    (precision, scale)
-  }
-
-  /**
    * Creates a converter for the given Parquet type `parquetType` and Spark SQL data type
    * `catalystType`. Converted values are handled by `updater`.
    */
@@ -249,20 +236,43 @@ private[parquet] class ParquetRowConverter(
 
       // For INT32 backed decimals
       case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 =>
-        val (precision, scale) = getPrecisionAndScale(parquetType, t)
-        new ParquetIntDictionaryAwareDecimalConverter(precision, scale, updater)
+        val metadata = parquetType.asPrimitiveType().getDecimalMetadata
+        if (metadata == null) {
+          // If the column is a plain INT32, we should pick the precision that can host the largest
+          // INT32 value.
+          new ParquetIntDictionaryAwareDecimalConverter(
+            DecimalType.IntDecimal.precision, 0, updater)
+        } else {
+          new ParquetIntDictionaryAwareDecimalConverter(
+            metadata.getPrecision, metadata.getScale, updater)
+        }
 
       // For INT64 backed decimals
       case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 =>
-        val (precision, scale) = getPrecisionAndScale(parquetType, t)
-        new ParquetLongDictionaryAwareDecimalConverter(precision, scale, updater)
+        val metadata = parquetType.asPrimitiveType().getDecimalMetadata
+        if (metadata == null) {
+          // If the column is a plain INT64, we should pick the precision that can host the largest
+          // INT64 value.
+          new ParquetLongDictionaryAwareDecimalConverter(
+            DecimalType.LongDecimal.precision, 0, updater)
+        } else {
+          new ParquetLongDictionaryAwareDecimalConverter(
+            metadata.getPrecision, metadata.getScale, updater)
+        }
 
       // For BINARY and FIXED_LEN_BYTE_ARRAY backed decimals
       case t: DecimalType
         if parquetType.asPrimitiveType().getPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY ||
            parquetType.asPrimitiveType().getPrimitiveTypeName == BINARY =>
-        val (precision, scale) = getPrecisionAndScale(parquetType, t)
-        new ParquetBinaryDictionaryAwareDecimalConverter(precision, scale, updater)
+        val metadata = parquetType.asPrimitiveType().getDecimalMetadata
+        if (metadata == null) {
+          throw new RuntimeException(s"Unable to create Parquet converter for ${t.typeName} " +
+            s"whose Parquet type is $parquetType without decimal metadata. Please read this " +
+            "column/field as Spark BINARY type." )
+        } else {
+          new ParquetBinaryDictionaryAwareDecimalConverter(
+            metadata.getPrecision, metadata.getScale, updater)
+        }
 
       case t: DecimalType =>
         throw new RuntimeException(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 1af50bf..a2efed6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3143,52 +3143,56 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-34212 Parquet should read decimals correctly") {
-    // a is int-decimal (4 bytes), b is long-decimal (8 bytes), c is binary-decimal (16 bytes)
-    val df = sql("SELECT 1.0 a, CAST(1.23 AS DECIMAL(17, 2)) b, CAST(1.23 AS DECIMAL(36, 2)) c")
+    def readParquet(schema: String, path: File): DataFrame = {
+      spark.read.schema(schema).parquet(path.toString)
+    }
 
     withTempPath { path =>
+      // a is int-decimal (4 bytes), b is long-decimal (8 bytes), c is binary-decimal (16 bytes)
+      val df = sql("SELECT 1.0 a, CAST(1.23 AS DECIMAL(17, 2)) b, CAST(1.23 AS DECIMAL(36, 2)) c")
       df.write.parquet(path.toString)
 
       withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
         val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
-        checkAnswer(spark.read.schema(schema1).parquet(path.toString), df)
+        checkAnswer(readParquet(schema1, path), df)
         val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
-        checkAnswer(spark.read.schema(schema2).parquet(path.toString), Row(1, 1.2, 1.2))
+        checkAnswer(readParquet(schema2, path), Row(1, 1.2, 1.2))
       }
 
       withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
-        val e1 = intercept[SparkException] {
-          spark.read.schema("a DECIMAL(3, 2)").parquet(path.toString).collect()
-        }.getCause.getCause
-        assert(e1.isInstanceOf[SchemaColumnConvertNotSupportedException])
-
-        val e2 = intercept[SparkException] {
-          spark.read.schema("b DECIMAL(18, 
1)").parquet(path.toString).collect()
-        }.getCause.getCause
-        assert(e2.isInstanceOf[SchemaColumnConvertNotSupportedException])
-
-        val e3 = intercept[SparkException] {
-          spark.read.schema("c DECIMAL(37, 
1)").parquet(path.toString).collect()
-        }.getCause.getCause
-        assert(e3.isInstanceOf[SchemaColumnConvertNotSupportedException])
+        Seq("a DECIMAL(3, 2)", "b DECIMAL(18, 1)", "c DECIMAL(37, 1)").foreach 
{ schema =>
+          val e = intercept[SparkException] {
+            readParquet(schema, path).collect()
+          }.getCause.getCause
+          assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException])
+        }
       }
     }
 
+    // tests for parquet types without decimal metadata.
     withTempPath { path =>
-      val df2 = sql(s"SELECT 1 a, ${Int.MaxValue + 1L} b")
-      df2.write.parquet(path.toString)
+      val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, 
CAST('1.2' AS BINARY) d")
+      df.write.parquet(path.toString)
 
       withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
-        val schema = "a DECIMAL(3, 2), b DECIMAL(17, 2)"
-        checkAnswer(spark.read.schema(schema).parquet(path.toString),
-          Row(BigDecimal(100, 2), BigDecimal((Int.MaxValue + 1L) * 100, 2)))
+        checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00"))
+        checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null))
+        checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT 
123456.0"))
+        checkAnswer(readParquet("c DECIMAL(11, 1)", path), Row(null))
+        checkAnswer(readParquet("c DECIMAL(13, 0)", path), df.select("c"))
+        val e = intercept[SparkException] {
+          readParquet("d DECIMAL(3, 2)", path).collect()
+        }.getCause
+        assert(e.getMessage.contains("Please read this column/field as Spark BINARY type"))
       }
 
       withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
-        val e = intercept[SparkException] {
-          spark.read.schema("a DECIMAL(3, 2)").parquet(path.toString).collect()
-        }.getCause.getCause
-        assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException])
+        Seq("a DECIMAL(3, 2)", "c DECIMAL(18, 1)", "d DECIMAL(37, 1)").foreach 
{ schema =>
+          val e = intercept[SparkException] {
+            readParquet(schema, path).collect()
+          }.getCause.getCause
+          assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException])
+        }
       }
     }
   }
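
For reference, the precisions picked in ParquetRowConverter above come from
Spark's DecimalType.IntDecimal (precision 10) and DecimalType.LongDecimal
(precision 20), decimals wide enough to host any Int or Long value. A quick
plain-Scala sanity check (a sketch, not part of this patch):

    // Int.MaxValue  = 2147483647          -> 10 digits -> DECIMAL(10, 0)
    // Long.MaxValue = 9223372036854775807 -> 19 digits -> DECIMAL(20, 0)
    assert(Int.MaxValue.toString.length == 10)
    assert(Long.MaxValue.toString.length == 19)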


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
