Repository: spark
Updated Branches:
  refs/heads/branch-1.4 011b07e23 -> 9d6475b93


[SPARK-6917] [SQL] DecimalType is not read back when non-native type exists

cc yhuai

Author: Davies Liu <dav...@databricks.com>

Closes #6558 from davies/decimalType and squashes the following commits:

c877ca8 [Davies Liu] Update ParquetConverter.scala
48cc57c [Davies Liu] Update ParquetConverter.scala
b43845c [Davies Liu] add test
3b4a94f [Davies Liu] DecimalType is not read back when non-native type exists

(cherry picked from commit bcb47ad7718b843fbd25cd1e228a7b7e6e5b8686)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9d6475b9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9d6475b9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9d6475b9

Branch: refs/heads/branch-1.4
Commit: 9d6475b93d1e35e2cd5d7ffd1ef28faf6ce0a425
Parents: 011b07e
Author: Davies Liu <dav...@databricks.com>
Authored: Mon Jun 1 23:12:29 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Mon Jun 1 23:12:37 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/parquet/ParquetConverter.scala    |  4 +++-
 .../apache/spark/sql/parquet/ParquetQuerySuite.scala   | 13 +++++++++++++
 2 files changed, 16 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9d6475b9/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index 1b4196a..caa9f04 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -243,8 +243,10 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
   /**
    * Read a decimal value from a Parquet Binary into "dest". Only supports decimals that fit in
    * a long (i.e. precision <= 18)
+   *
+   * Returned value is needed by CatalystConverter, which doesn't reuse the Decimal object.
    */
-  protected[parquet] def readDecimal(dest: Decimal, value: Binary, ctype: DecimalType): Unit = {
+  protected[parquet] def readDecimal(dest: Decimal, value: Binary, ctype: DecimalType): Decimal = {
     val precision = ctype.precisionInfo.get.precision
     val scale = ctype.precisionInfo.get.scale
     val bytes = value.getBytes

http://git-wip-us.apache.org/repos/asf/spark/blob/9d6475b9/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index b98ba09..304936f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.parquet
 
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.{SQLConf, QueryTest}
 import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.test.TestSQLContext
@@ -111,6 +112,18 @@ class ParquetQuerySuiteBase extends QueryTest with ParquetTest {
         List(Row("same", "run_5", 100)))
     }
   }
+
+  test("SPARK-6917 DecimalType should work with non-native types") {
+    val data = (1 to 10).map(i => Row(Decimal(i, 18, 0), new java.sql.Timestamp(i)))
+    val schema = StructType(List(StructField("d", DecimalType(18, 0), false),
+      StructField("time", TimestampType, false)).toArray)
+    withTempPath { file =>
+      val df = sqlContext.createDataFrame(sparkContext.parallelize(data), schema)
+      df.write.parquet(file.getCanonicalPath)
+      val df2 = sqlContext.read.parquet(file.getCanonicalPath)
+      checkAnswer(df2, df.collect().toSeq)
+    }
+  }
 }
 
 class ParquetDataSourceOnQuerySuite extends ParquetQuerySuiteBase with BeforeAndAfterAll {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to