Repository: spark
Updated Branches:
  refs/heads/master 2d868d939 -> 6d7ebf2f9


[SPARK-22165][SQL] Fixes type conflicts between double, long, decimals, dates 
and timestamps in partition column

## What changes were proposed in this pull request?

This PR proposes to add a rule that re-uses `TypeCoercion.findWiderCommonType` 
when resolving type conflicts in partition values.

Currently, this uses numeric precedence-like comparison; therefore, it looks 
introducing failures for type conflicts between timestamps, dates and decimals, 
please see:

```scala
private val upCastingOrder: Seq[DataType] =
  Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType)
...
literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_))
```

The codes below:

```scala
val df = Seq((1, "2015-01-01"), (2, "2016-01-01 00:00:00")).toDF("i", "ts")
df.write.format("parquet").partitionBy("ts").save("/tmp/foo")
spark.read.load("/tmp/foo").printSchema()

val df = Seq((1, "1"), (2, "1" * 30)).toDF("i", "decimal")
df.write.format("parquet").partitionBy("decimal").save("/tmp/bar")
spark.read.load("/tmp/bar").printSchema()
```

produces output as below:

**Before**

```
root
 |-- i: integer (nullable = true)
 |-- ts: date (nullable = true)

root
 |-- i: integer (nullable = true)
 |-- decimal: integer (nullable = true)
```

**After**

```
root
 |-- i: integer (nullable = true)
 |-- ts: timestamp (nullable = true)

root
 |-- i: integer (nullable = true)
 |-- decimal: decimal(30,0) (nullable = true)
```

### Type coercion table:

This PR proposes the type conflict resolusion as below:

**Before**

|InputA \ 
InputB|`NullType`|`IntegerType`|`LongType`|`DecimalType(38,0)`|`DoubleType`|`DateType`|`TimestampType`|`StringType`|
|------------------------|----------|----------|----------|----------|----------|----------|----------|----------|
|**`NullType`**|`StringType`|`IntegerType`|`LongType`|`StringType`|`DoubleType`|`StringType`|`StringType`|`StringType`|
|**`IntegerType`**|`IntegerType`|`IntegerType`|`LongType`|`IntegerType`|`DoubleType`|`IntegerType`|`IntegerType`|`StringType`|
|**`LongType`**|`LongType`|`LongType`|`LongType`|`LongType`|`DoubleType`|`LongType`|`LongType`|`StringType`|
|**`DecimalType(38,0)`**|`StringType`|`IntegerType`|`LongType`|`DecimalType(38,0)`|`DoubleType`|`DecimalType(38,0)`|`DecimalType(38,0)`|`StringType`|
|**`DoubleType`**|`DoubleType`|`DoubleType`|`DoubleType`|`DoubleType`|`DoubleType`|`DoubleType`|`DoubleType`|`StringType`|
|**`DateType`**|`StringType`|`IntegerType`|`LongType`|`DateType`|`DoubleType`|`DateType`|`DateType`|`StringType`|
|**`TimestampType`**|`StringType`|`IntegerType`|`LongType`|`TimestampType`|`DoubleType`|`TimestampType`|`TimestampType`|`StringType`|
|**`StringType`**|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|

**After**

|InputA \ 
InputB|`NullType`|`IntegerType`|`LongType`|`DecimalType(38,0)`|`DoubleType`|`DateType`|`TimestampType`|`StringType`|
|------------------------|----------|----------|----------|----------|----------|----------|----------|----------|
|**`NullType`**|`NullType`|`IntegerType`|`LongType`|`DecimalType(38,0)`|`DoubleType`|`DateType`|`TimestampType`|`StringType`|
|**`IntegerType`**|`IntegerType`|`IntegerType`|`LongType`|`DecimalType(38,0)`|`DoubleType`|`StringType`|`StringType`|`StringType`|
|**`LongType`**|`LongType`|`LongType`|`LongType`|`DecimalType(38,0)`|`StringType`|`StringType`|`StringType`|`StringType`|
|**`DecimalType(38,0)`**|`DecimalType(38,0)`|`DecimalType(38,0)`|`DecimalType(38,0)`|`DecimalType(38,0)`|`StringType`|`StringType`|`StringType`|`StringType`|
|**`DoubleType`**|`DoubleType`|`DoubleType`|`StringType`|`StringType`|`DoubleType`|`StringType`|`StringType`|`StringType`|
|**`DateType`**|`DateType`|`StringType`|`StringType`|`StringType`|`StringType`|`DateType`|`TimestampType`|`StringType`|
|**`TimestampType`**|`TimestampType`|`StringType`|`StringType`|`StringType`|`StringType`|`TimestampType`|`TimestampType`|`StringType`|
|**`StringType`**|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|

This was produced by:

```scala
  test("Print out chart") {
    val supportedTypes: Seq[DataType] = Seq(
      NullType, IntegerType, LongType, DecimalType(38, 0), DoubleType,
      DateType, TimestampType, StringType)

    // Old type conflict resolution:
    val upCastingOrder: Seq[DataType] =
      Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType)
    def oldResolveTypeConflicts(dataTypes: Seq[DataType]): DataType = {
      val topType = dataTypes.maxBy(upCastingOrder.indexOf(_))
      if (topType == NullType) StringType else topType
    }
    println(s"|InputA \\ InputB|${supportedTypes.map(dt => 
s"`${dt.toString}`").mkString("|")}|")
    println(s"|------------------------|${supportedTypes.map(_ => 
"----------").mkString("|")}|")
    supportedTypes.foreach { inputA =>
      val types = supportedTypes.map(inputB => 
oldResolveTypeConflicts(Seq(inputA, inputB)))
      println(s"|**`$inputA`**|${types.map(dt => 
s"`${dt.toString}`").mkString("|")}|")
    }

    // New type conflict resolution:
    def newResolveTypeConflicts(dataTypes: Seq[DataType]): DataType = {
      dataTypes.fold[DataType](NullType)(findWiderTypeForPartitionColumn)
    }
    println(s"|InputA \\ InputB|${supportedTypes.map(dt => 
s"`${dt.toString}`").mkString("|")}|")
    println(s"|------------------------|${supportedTypes.map(_ => 
"----------").mkString("|")}|")
    supportedTypes.foreach { inputA =>
      val types = supportedTypes.map(inputB => 
newResolveTypeConflicts(Seq(inputA, inputB)))
      println(s"|**`$inputA`**|${types.map(dt => 
s"`${dt.toString}`").mkString("|")}|")
    }
  }
```

## How was this patch tested?

Unit tests added in `ParquetPartitionDiscoverySuite`.

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #19389 from HyukjinKwon/partition-type-coercion.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6d7ebf2f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6d7ebf2f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6d7ebf2f

Branch: refs/heads/master
Commit: 6d7ebf2f9fbd043813738005a23c57a77eba6f47
Parents: 2d868d9
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Tue Nov 21 20:53:38 2017 +0100
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Tue Nov 21 20:53:38 2017 +0100

----------------------------------------------------------------------
 docs/sql-programming-guide.md                   | 139 +++++++++++++++++++
 .../sql/catalyst/analysis/TypeCoercion.scala    |   2 +-
 .../datasources/PartitioningUtils.scala         |  60 +++++---
 .../ParquetPartitionDiscoverySuite.scala        |  57 +++++++-
 4 files changed, 235 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6d7ebf2f/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 686fcb1..5f98213 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1577,6 +1577,145 @@ options.
 
   - Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when 
the referenced columns only include the internal corrupt record column (named 
`_corrupt_record` by default). For example, 
`spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()`
 and `spark.read.schema(schema).json(file).select("_corrupt_record").show()`. 
Instead, you can cache or save the parsed results and then send the same query. 
For example, `val df = spark.read.schema(schema).json(file).cache()` and then 
`df.filter($"_corrupt_record".isNotNull).count()`.
   - The `percentile_approx` function previously accepted numeric type input 
and output double type results. Now it supports date type, timestamp type and 
numeric types as input types. The result type is also changed to be the same as 
the input type, which is more reasonable for percentiles.
+  - Partition column inference previously found incorrect common type for 
different inferred types, for example, previously it ended up with double type 
as the common type for double type and date type. Now it finds the correct 
common type for such conflicts. The conflict resolution follows the table below:
+
+    <table class="table">
+      <tr>
+        <th>
+          <b>InputA \ InputB</b>
+        </th>
+        <th>
+          <b>NullType</b>
+        </th>
+        <th>
+          <b>IntegerType</b>
+        </th>
+        <th>
+          <b>LongType</b>
+        </th>
+        <th>
+          <b>DecimalType(38,0)*</b>
+        </th>
+        <th>
+          <b>DoubleType</b>
+        </th>
+        <th>
+          <b>DateType</b>
+        </th>
+        <th>
+          <b>TimestampType</b>
+        </th>
+        <th>
+          <b>StringType</b>
+        </th>
+      </tr>
+      <tr>
+        <td>
+          <b>NullType</b>
+        </td>
+        <td>NullType</td>
+        <td>IntegerType</td>
+        <td>LongType</td>
+        <td>DecimalType(38,0)</td>
+        <td>DoubleType</td>
+        <td>DateType</td>
+        <td>TimestampType</td>
+        <td>StringType</td>
+      </tr>
+      <tr>
+        <td>
+          <b>IntegerType</b>
+        </td>
+        <td>IntegerType</td>
+        <td>IntegerType</td>
+        <td>LongType</td>
+        <td>DecimalType(38,0)</td>
+        <td>DoubleType</td>
+        <td>StringType</td>
+        <td>StringType</td>
+        <td>StringType</td>
+      </tr>
+      <tr>
+        <td>
+          <b>LongType</b>
+        </td>
+        <td>LongType</td>
+        <td>LongType</td>
+        <td>LongType</td>
+        <td>DecimalType(38,0)</td>
+        <td>StringType</td>
+        <td>StringType</td>
+        <td>StringType</td>
+        <td>StringType</td>
+      </tr>
+      <tr>
+        <td>
+          <b>DecimalType(38,0)*</b>
+        </td>
+        <td>DecimalType(38,0)</td>
+        <td>DecimalType(38,0)</td>
+        <td>DecimalType(38,0)</td>
+        <td>DecimalType(38,0)</td>
+        <td>StringType</td>
+        <td>StringType</td>
+        <td>StringType</td>
+        <td>StringType</td>
+      </tr>
+      <tr>
+        <td>
+          <b>DoubleType</b>
+        </td>
+        <td>DoubleType</td>
+        <td>DoubleType</td>
+        <td>StringType</td>
+        <td>StringType</td>
+        <td>DoubleType</td>
+        <td>StringType</td>
+        <td>StringType</td>
+        <td>StringType</td>
+      </tr>
+      <tr>
+        <td>
+          <b>DateType</b>
+        </td>
+        <td>DateType</td>
+        <td>StringType</td>
+        <td>StringType</td>
+        <td>StringType</td>
+        <td>StringType</td>
+        <td>DateType</td>
+        <td>TimestampType</td>
+        <td>StringType</td>
+      </tr>
+      <tr>
+        <td>
+          <b>TimestampType</b>
+        </td>
+        <td>TimestampType</td>
+        <td>StringType</td>
+        <td>StringType</td>
+        <td>StringType</td>
+        <td>StringType</td>
+        <td>TimestampType</td>
+        <td>TimestampType</td>
+        <td>StringType</td>
+      </tr>
+      <tr>
+        <td>
+          <b>StringType</b>
+        </td>
+        <td>StringType</td>
+        <td>StringType</td>
+        <td>StringType</td>
+        <td>StringType</td>
+        <td>StringType</td>
+        <td>StringType</td>
+        <td>StringType</td>
+        <td>StringType</td>
+      </tr>
+    </table>
+
+    Note that, for <b>DecimalType(38,0)*</b>, the table above intentionally 
does not cover all other combinations of scales and precisions because 
currently we only infer decimal type like `BigInteger`/`BigInt`. For example, 
1.1 is inferred as double type.
 
 ## Upgrading From Spark SQL 2.1 to 2.2
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6d7ebf2f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index 074eda5..28be955 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -155,7 +155,7 @@ object TypeCoercion {
    * i.e. the main difference with [[findTightestCommonType]] is that here we 
allow some
    * loss of precision when widening decimal and double, and promotion to 
string.
    */
-  private[analysis] def findWiderTypeForTwo(t1: DataType, t2: DataType): 
Option[DataType] = {
+  def findWiderTypeForTwo(t1: DataType, t2: DataType): Option[DataType] = {
     findTightestCommonType(t1, t2)
       .orElse(findWiderTypeForDecimal(t1, t2))
       .orElse(stringPromotion(t1, t2))

http://git-wip-us.apache.org/repos/asf/spark/blob/6d7ebf2f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 1c00c9e..472bf82 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCoercion}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -309,13 +309,8 @@ object PartitioningUtils {
   }
 
   /**
-   * Resolves possible type conflicts between partitions by up-casting "lower" 
types.  The up-
-   * casting order is:
-   * {{{
-   *   NullType ->
-   *   IntegerType -> LongType ->
-   *   DoubleType -> StringType
-   * }}}
+   * Resolves possible type conflicts between partitions by up-casting "lower" 
types using
+   * [[findWiderTypeForPartitionColumn]].
    */
   def resolvePartitions(
       pathsWithPartitionValues: Seq[(Path, PartitionValues)],
@@ -372,11 +367,31 @@ object PartitioningUtils {
       suspiciousPaths.map("\t" + _).mkString("\n", "\n", "")
   }
 
+  // scalastyle:off line.size.limit
   /**
-   * Converts a string to a [[Literal]] with automatic type inference.  
Currently only supports
-   * [[IntegerType]], [[LongType]], [[DoubleType]], [[DecimalType]], 
[[DateType]]
+   * Converts a string to a [[Literal]] with automatic type inference. 
Currently only supports
+   * [[NullType]], [[IntegerType]], [[LongType]], [[DoubleType]], 
[[DecimalType]], [[DateType]]
    * [[TimestampType]], and [[StringType]].
+   *
+   * When resolving conflicts, it follows the table below:
+   *
+   * 
+--------------------+-------------------+-------------------+-------------------+--------------------+------------+---------------+---------------+------------+
+   * | InputA \ InputB    | NullType          | IntegerType       | LongType   
       | DecimalType(38,0)* | DoubleType | DateType      | TimestampType | 
StringType |
+   * 
+--------------------+-------------------+-------------------+-------------------+--------------------+------------+---------------+---------------+------------+
+   * | NullType           | NullType          | IntegerType       | LongType   
       | DecimalType(38,0)  | DoubleType | DateType      | TimestampType | 
StringType |
+   * | IntegerType        | IntegerType       | IntegerType       | LongType   
       | DecimalType(38,0)  | DoubleType | StringType    | StringType    | 
StringType |
+   * | LongType           | LongType          | LongType          | LongType   
       | DecimalType(38,0)  | StringType | StringType    | StringType    | 
StringType |
+   * | DecimalType(38,0)* | DecimalType(38,0) | DecimalType(38,0) | 
DecimalType(38,0) | DecimalType(38,0)  | StringType | StringType    | 
StringType    | StringType |
+   * | DoubleType         | DoubleType        | DoubleType        | StringType 
       | StringType         | DoubleType | StringType    | StringType    | 
StringType |
+   * | DateType           | DateType          | StringType        | StringType 
       | StringType         | StringType | DateType      | TimestampType | 
StringType |
+   * | TimestampType      | TimestampType     | StringType        | StringType 
       | StringType         | StringType | TimestampType | TimestampType | 
StringType |
+   * | StringType         | StringType        | StringType        | StringType 
       | StringType         | StringType | StringType    | StringType    | 
StringType |
+   * 
+--------------------+-------------------+-------------------+-------------------+--------------------+------------+---------------+---------------+------------+
+   * Note that, for DecimalType(38,0)*, the table above intentionally does not 
cover all other
+   * combinations of scales and precisions because currently we only infer 
decimal type like
+   * `BigInteger`/`BigInt`. For example, 1.1 is inferred as double type.
    */
+  // scalastyle:on line.size.limit
   private[datasources] def inferPartitionColumnValue(
       raw: String,
       typeInference: Boolean,
@@ -427,9 +442,6 @@ object PartitioningUtils {
     }
   }
 
-  private val upCastingOrder: Seq[DataType] =
-    Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType)
-
   def validatePartitionColumn(
       schema: StructType,
       partitionColumns: Seq[String],
@@ -468,18 +480,26 @@ object PartitioningUtils {
   }
 
   /**
-   * Given a collection of [[Literal]]s, resolves possible type conflicts by 
up-casting "lower"
-   * types.
+   * Given a collection of [[Literal]]s, resolves possible type conflicts by
+   * [[findWiderTypeForPartitionColumn]].
    */
   private def resolveTypeConflicts(literals: Seq[Literal], timeZone: 
TimeZone): Seq[Literal] = {
-    val desiredType = {
-      val topType = literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_))
-      // Falls back to string if all values of this column are null or empty 
string
-      if (topType == NullType) StringType else topType
-    }
+    val litTypes = literals.map(_.dataType)
+    val desiredType = litTypes.reduce(findWiderTypeForPartitionColumn)
 
     literals.map { case l @ Literal(_, dataType) =>
       Literal.create(Cast(l, desiredType, Some(timeZone.getID)).eval(), 
desiredType)
     }
   }
+
+  /**
+   * Type widening rule for partition column types. It is similar to
+   * [[TypeCoercion.findWiderTypeForTwo]] but the main difference is that here 
we disallow
+   * precision loss when widening double/long and decimal, and fall back to 
string.
+   */
+  private val findWiderTypeForPartitionColumn: (DataType, DataType) => 
DataType = {
+    case (DoubleType, _: DecimalType) | (_: DecimalType, DoubleType) => 
StringType
+    case (DoubleType, LongType) | (LongType, DoubleType) => StringType
+    case (t1, t2) => TypeCoercion.findWiderTypeForTwo(t1, 
t2).getOrElse(StringType)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6d7ebf2f/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index f79b92b..d490264 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -249,6 +249,11 @@ class ParquetPartitionDiscoverySuite extends QueryTest 
with ParquetTest with Sha
           true,
           rootPaths,
           timeZoneId)
+      assert(actualSpec.partitionColumns === spec.partitionColumns)
+      assert(actualSpec.partitions.length === spec.partitions.length)
+      actualSpec.partitions.zip(spec.partitions).foreach { case (actual, 
expected) =>
+        assert(actual === expected)
+      }
       assert(actualSpec === spec)
     }
 
@@ -314,7 +319,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with 
ParquetTest with Sha
       PartitionSpec(
         StructType(Seq(
           StructField("a", DoubleType),
-          StructField("b", StringType))),
+          StructField("b", NullType))),
         Seq(
           Partition(InternalRow(10, null), 
s"hdfs://host:9000/path/a=10/b=$defaultPartitionName"),
           Partition(InternalRow(10.5, null),
@@ -324,6 +329,32 @@ class ParquetPartitionDiscoverySuite extends QueryTest 
with ParquetTest with Sha
       s"hdfs://host:9000/path1",
       s"hdfs://host:9000/path2"),
       PartitionSpec.emptySpec)
+
+    // The cases below check the resolution for type conflicts.
+    val t1 = Timestamp.valueOf("2014-01-01 00:00:00.0").getTime * 1000
+    val t2 = Timestamp.valueOf("2014-01-01 00:01:00.0").getTime * 1000
+    // Values in column 'a' are inferred as null, date and timestamp each, and 
timestamp is set
+    // as a common type.
+    // Values in column 'b' are inferred as integer, decimal(22, 0) and null, 
and decimal(22, 0)
+    // is set as a common type.
+    check(Seq(
+      s"hdfs://host:9000/path/a=$defaultPartitionName/b=0",
+      s"hdfs://host:9000/path/a=2014-01-01/b=${Long.MaxValue}111",
+      s"hdfs://host:9000/path/a=2014-01-01 
00%3A01%3A00.0/b=$defaultPartitionName"),
+      PartitionSpec(
+        StructType(Seq(
+          StructField("a", TimestampType),
+          StructField("b", DecimalType(22, 0)))),
+        Seq(
+          Partition(
+            InternalRow(null, Decimal(0)),
+            s"hdfs://host:9000/path/a=$defaultPartitionName/b=0"),
+          Partition(
+            InternalRow(t1, Decimal(s"${Long.MaxValue}111")),
+            s"hdfs://host:9000/path/a=2014-01-01/b=${Long.MaxValue}111"),
+          Partition(
+            InternalRow(t2, null),
+            s"hdfs://host:9000/path/a=2014-01-01 
00%3A01%3A00.0/b=$defaultPartitionName"))))
   }
 
   test("parse partitions with type inference disabled") {
@@ -395,7 +426,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with 
ParquetTest with Sha
       PartitionSpec(
         StructType(Seq(
           StructField("a", StringType),
-          StructField("b", StringType))),
+          StructField("b", NullType))),
         Seq(
           Partition(InternalRow(UTF8String.fromString("10"), null),
             s"hdfs://host:9000/path/a=10/b=$defaultPartitionName"),
@@ -1067,4 +1098,26 @@ class ParquetPartitionDiscoverySuite extends QueryTest 
with ParquetTest with Sha
       checkAnswer(spark.read.load(path.getAbsolutePath), df)
     }
   }
+
+  test("Resolve type conflicts - decimals, dates and timestamps in partition 
column") {
+    withTempPath { path =>
+      val df = Seq((1, "2014-01-01"), (2, "2016-01-01"), (3, "2015-01-01 
00:01:00")).toDF("i", "ts")
+      df.write.format("parquet").partitionBy("ts").save(path.getAbsolutePath)
+      checkAnswer(
+        spark.read.load(path.getAbsolutePath),
+        Row(1, Timestamp.valueOf("2014-01-01 00:00:00")) ::
+          Row(2, Timestamp.valueOf("2016-01-01 00:00:00")) ::
+          Row(3, Timestamp.valueOf("2015-01-01 00:01:00")) :: Nil)
+    }
+
+    withTempPath { path =>
+      val df = Seq((1, "1"), (2, "3"), (3, "2" * 30)).toDF("i", "decimal")
+      
df.write.format("parquet").partitionBy("decimal").save(path.getAbsolutePath)
+      checkAnswer(
+        spark.read.load(path.getAbsolutePath),
+        Row(1, BigDecimal("1")) ::
+          Row(2, BigDecimal("3")) ::
+          Row(3, BigDecimal("2" * 30)) :: Nil)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to