Repository: spark
Updated Branches:
  refs/heads/master 311fab6f1 -> 30f3f556f


[SPARK-7763] [SPARK-7616] [SQL] Persists partition columns into metastore

Author: Yin Huai <yh...@databricks.com>
Author: Cheng Lian <l...@databricks.com>

Closes #6285 from liancheng/spark-7763 and squashes the following commits:

bb2829d [Yin Huai] Fix hashCode.
d677f7d [Cheng Lian] Fixes Scala style issue
44b283f [Cheng Lian] Adds test case for SPARK-7616
6733276 [Yin Huai] Fix a bug that potentially causes 
https://issues.apache.org/jira/browse/SPARK-7616.
6cabf3c [Yin Huai] Update unit test.
7e02910 [Yin Huai] Use metastore partition columns and do not hijack 
maybePartitionSpec.
e9a03ec [Cheng Lian] Persists partition columns into metastore


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/30f3f556
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/30f3f556
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/30f3f556

Branch: refs/heads/master
Commit: 30f3f556f7161a49baf145c0cbba8c088b512a6a
Parents: 311fab6
Author: Yin Huai <yh...@databricks.com>
Authored: Thu May 21 13:51:40 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Thu May 21 13:51:40 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/parquet/newParquet.scala   | 26 ++++++++---
 .../org/apache/spark/sql/sources/commands.scala |  2 +
 .../org/apache/spark/sql/sources/ddl.scala      | 19 ++++++--
 .../apache/spark/sql/sources/interfaces.scala   | 31 +++++++++++--
 .../apache/spark/sql/test/SQLTestUtils.scala    |  7 +++
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 49 ++++++++++++++++----
 .../spark/sql/hive/execution/commands.scala     |  2 +
 .../apache/spark/sql/hive/orc/OrcRelation.scala | 35 +++++++++-----
 .../sql/hive/MetastoreDataSourcesSuite.scala    | 30 ++++++++++++
 .../apache/spark/sql/hive/parquetSuites.scala   | 28 +++++------
 .../spark/sql/sources/SimpleTextRelation.scala  |  2 +-
 .../sql/sources/hadoopFsRelationSuites.scala    | 36 ++++++++++++--
 12 files changed, 211 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/30f3f556/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index c35b7ef..32986aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -49,8 +49,7 @@ private[sql] class DefaultSource extends 
HadoopFsRelationProvider {
       schema: Option[StructType],
       partitionColumns: Option[StructType],
       parameters: Map[String, String]): HadoopFsRelation = {
-    val partitionSpec = partitionColumns.map(PartitionSpec(_, Seq.empty))
-    new ParquetRelation2(paths, schema, partitionSpec, parameters)(sqlContext)
+    new ParquetRelation2(paths, schema, None, partitionColumns, 
parameters)(sqlContext)
   }
 }
 
@@ -118,12 +117,28 @@ private[sql] class ParquetOutputWriter(path: String, 
context: TaskAttemptContext
 private[sql] class ParquetRelation2(
     override val paths: Array[String],
     private val maybeDataSchema: Option[StructType],
+    // This is for metastore conversion.
     private val maybePartitionSpec: Option[PartitionSpec],
+    override val userDefinedPartitionColumns: Option[StructType],
     parameters: Map[String, String])(
     val sqlContext: SQLContext)
   extends HadoopFsRelation(maybePartitionSpec)
   with Logging {
 
+  private[sql] def this(
+      paths: Array[String],
+      maybeDataSchema: Option[StructType],
+      maybePartitionSpec: Option[PartitionSpec],
+      parameters: Map[String, String])(
+      sqlContext: SQLContext) = {
+    this(
+      paths,
+      maybeDataSchema,
+      maybePartitionSpec,
+      maybePartitionSpec.map(_.partitionColumns),
+      parameters)(sqlContext)
+  }
+
   // Should we merge schemas from all Parquet part-files?
   private val shouldMergeSchemas =
     parameters.getOrElse(ParquetRelation2.MERGE_SCHEMA, "true").toBoolean
@@ -161,7 +176,7 @@ private[sql] class ParquetRelation2(
         Boolean.box(shouldMergeSchemas),
         paths.toSet,
         maybeDataSchema,
-        maybePartitionSpec)
+        partitionColumns)
     } else {
       Objects.hashCode(
         Boolean.box(shouldMergeSchemas),
@@ -169,7 +184,7 @@ private[sql] class ParquetRelation2(
         dataSchema,
         schema,
         maybeDataSchema,
-        maybePartitionSpec)
+        partitionColumns)
     }
   }
 
@@ -185,9 +200,6 @@ private[sql] class ParquetRelation2(
 
   override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum
 
-  override def userDefinedPartitionColumns: Option[StructType] =
-    maybePartitionSpec.map(_.partitionColumns)
-
   override def prepareJobForWrite(job: Job): OutputWriterFactory = {
     val conf = ContextUtil.getConfiguration(job)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/30f3f556/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index d54dbb0..498f753 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -93,6 +93,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
       job.setOutputValueClass(classOf[Row])
       FileOutputFormat.setOutputPath(job, qualifiedOutputPath)
 
+      // We create a DataFrame by applying the schema of relation to the data 
to make sure
+      // we are writing data based on the expected schema.
       val df = sqlContext.createDataFrame(
         DataFrame(sqlContext, query).queryExecution.toRdd,
         relation.schema,

http://git-wip-us.apache.org/repos/asf/spark/blob/30f3f556/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index a13ab74..5e72312 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.Logging
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, 
UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Row}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.RunnableCommand
@@ -245,12 +245,13 @@ private[sql] object ResolvedDataSource {
             SparkHadoopUtil.get.globPath(patternPath).map(_.toString).toArray
           }
 
-          val dataSchema = StructType(schema.filterNot(f => 
partitionColumns.contains(f.name)))
+          val dataSchema =
+            StructType(schema.filterNot(f => 
partitionColumns.contains(f.name))).asNullable
 
           dataSource.createRelation(
             sqlContext,
             paths,
-            Some(schema),
+            Some(dataSchema),
             maybePartitionsSchema,
             caseInsensitiveOptions)
         case dataSource: org.apache.spark.sql.sources.RelationProvider =>
@@ -320,10 +321,20 @@ private[sql] object ResolvedDataSource {
           Some(dataSchema.asNullable),
           Some(partitionColumnsSchema(data.schema, partitionColumns)),
           caseInsensitiveOptions)
+
+        // For partitioned relation r, r.schema's column ordering is different 
from the column
+        // ordering of data.logicalPlan. We need a Project to adjust the 
ordering.
+        // So, inside InsertIntoHadoopFsRelation, we can safely apply 
r.schema to
+        // the data.
+        val project =
+          Project(
+            r.schema.map(field => new UnresolvedAttribute(Seq(field.name))),
+            data.logicalPlan)
+
         sqlContext.executePlan(
           InsertIntoHadoopFsRelation(
             r,
-            data.logicalPlan,
+            project,
             partitionColumns.toArray,
             mode)).toRdd
         r

http://git-wip-us.apache.org/repos/asf/spark/blob/30f3f556/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index fcbac0d..61fc4e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -28,7 +28,7 @@ import org.apache.spark.annotation.{DeveloperApi, 
Experimental}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.SerializableWritable
-import org.apache.spark.sql._
+import org.apache.spark.sql.{Row, _}
 import org.apache.spark.sql.catalyst.expressions._
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
 import org.apache.spark.sql.types.{StructField, StructType}
@@ -120,11 +120,13 @@ trait HadoopFsRelationProvider {
    * Returns a new base relation with the given parameters, a user defined 
schema, and a list of
    * partition columns. Note: the parameters' keywords are case insensitive 
and this insensitivity
    * is enforced by the Map that is passed to the function.
+   *
+   * @param dataSchema Schema of data columns (i.e., columns that are not 
partition columns).
    */
   def createRelation(
       sqlContext: SQLContext,
       paths: Array[String],
-      schema: Option[StructType],
+      dataSchema: Option[StructType],
       partitionColumns: Option[StructType],
       parameters: Map[String, String]): HadoopFsRelation
 }
@@ -416,8 +418,29 @@ abstract class HadoopFsRelation 
private[sql](maybePartitionSpec: Option[Partitio
   final private[sql] def partitionSpec: PartitionSpec = {
     if (_partitionSpec == null) {
       _partitionSpec = maybePartitionSpec
-        .map(spec => spec.copy(partitionColumns = 
spec.partitionColumns.asNullable))
-        .orElse(userDefinedPartitionColumns.map(PartitionSpec(_, 
Array.empty[Partition])))
+        .flatMap {
+          case spec if spec.partitions.nonEmpty =>
+            Some(spec.copy(partitionColumns = 
spec.partitionColumns.asNullable))
+          case _ =>
+            None
+        }
+        .orElse {
+          // We only know the partition columns and their data types. We need 
to discover
+          // partition values.
+          userDefinedPartitionColumns.map { partitionSchema =>
+            val spec = discoverPartitions()
+            val castedPartitions = spec.partitions.map { case p @ 
Partition(values, path) =>
+              val literals = 
values.toSeq.zip(spec.partitionColumns.map(_.dataType)).map {
+                case (value, dataType) => Literal.create(value, dataType)
+              }
+              val castedValues = partitionSchema.zip(literals).map { case 
(field, literal) =>
+                Cast(literal, field.dataType).eval()
+              }
+              p.copy(values = Row.fromSeq(castedValues))
+            }
+            PartitionSpec(partitionSchema, castedPartitions)
+          }
+        }
         .getOrElse {
           if (sqlContext.conf.partitionDiscoveryEnabled()) {
             discoverPartitions()

http://git-wip-us.apache.org/repos/asf/spark/blob/30f3f556/sql/core/src/main/scala/org/apache/spark/sql/test/SQLTestUtils.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/test/SQLTestUtils.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 75d2906..ca66cdc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -78,4 +78,11 @@ trait SQLTestUtils {
   protected def withTempTable(tableName: String)(f: => Unit): Unit = {
     try f finally sqlContext.dropTempTable(tableName)
   }
+
+  /**
+   * Drops table `tableName` after calling `f`.
+   */
+  protected def withTable(tableName: String)(f: => Unit): Unit = {
+    try f finally sqlContext.sql(s"DROP TABLE IF EXISTS $tableName")
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/30f3f556/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 2aa80b4..5b68400 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -66,11 +66,11 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
         def schemaStringFromParts: Option[String] = {
           table.properties.get("spark.sql.sources.schema.numParts").map { 
numParts =>
             val parts = (0 until numParts.toInt).map { index =>
-              val part = 
table.properties.get(s"spark.sql.sources.schema.part.${index}").orNull
+              val part = 
table.properties.get(s"spark.sql.sources.schema.part.$index").orNull
               if (part == null) {
                 throw new AnalysisException(
-                  s"Could not read schema from the metastore because it is 
corrupted " +
-                  s"(missing part ${index} of the schema).")
+                  "Could not read schema from the metastore because it is 
corrupted " +
+                    s"(missing part $index of the schema, $numParts parts are 
expected).")
               }
 
               part
@@ -89,6 +89,11 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
         val userSpecifiedSchema =
           schemaString.map(s => DataType.fromJson(s).asInstanceOf[StructType])
 
+        // We only need names here since the userSpecifiedSchema we loaded from 
the metastore
+        // contains partition columns. We can always get data types of 
partitioning columns
+        // from userSpecifiedSchema.
+        val partitionColumns = table.partitionColumns.map(_.name)
+
         // It does not appear that the ql client for the metastore has a way 
to enumerate all the
         // SerDe properties directly...
         val options = table.serdeProperties
@@ -97,7 +102,7 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
           ResolvedDataSource(
             hive,
             userSpecifiedSchema,
-            Array.empty[String],
+            partitionColumns.toArray,
             table.properties("spark.sql.sources.provider"),
             options)
 
@@ -111,8 +116,8 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
   override def refreshTable(databaseName: String, tableName: String): Unit = {
     // refreshTable does not eagerly reload the cache. It just invalidate the 
cache.
     // Next time when we use the table, it will be populated in the cache.
-    // Since we also cache ParquetRealtions converted from Hive Parquet tables 
and
-    // adding converted ParquetRealtions into the cache is not defined in the 
load function
+    // Since we also cache ParquetRelations converted from Hive Parquet tables 
and
+    // adding converted ParquetRelations into the cache is not defined in the 
load function
     // of the cache (instead, we add the cache entry in 
convertToParquetRelation),
     // it is better at here to invalidate the cache to avoid confusing waring 
logs from the
     // cache loader (e.g. cannot find data source provider, which is only 
defined for
@@ -133,12 +138,17 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
   def createDataSourceTable(
       tableName: String,
       userSpecifiedSchema: Option[StructType],
+      partitionColumns: Array[String],
       provider: String,
       options: Map[String, String],
       isExternal: Boolean): Unit = {
     val (dbName, tblName) = processDatabaseAndTableName("default", tableName)
     val tableProperties = new scala.collection.mutable.HashMap[String, String]
     tableProperties.put("spark.sql.sources.provider", provider)
+
+    // Saves optional user specified schema.  Serialized JSON schema string 
may be too long to be
+    // stored into a single metastore SerDe property.  In this case, we split 
the JSON string and
+    // store each part as a separate SerDe property.
     if (userSpecifiedSchema.isDefined) {
       val threshold = conf.schemaStringLengthThreshold
       val schemaJsonString = userSpecifiedSchema.get.json
@@ -146,8 +156,29 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
       val parts = schemaJsonString.grouped(threshold).toSeq
       tableProperties.put("spark.sql.sources.schema.numParts", 
parts.size.toString)
       parts.zipWithIndex.foreach { case (part, index) =>
-        tableProperties.put(s"spark.sql.sources.schema.part.${index}", part)
+        tableProperties.put(s"spark.sql.sources.schema.part.$index", part)
+      }
+    }
+
+    val metastorePartitionColumns = userSpecifiedSchema.map { schema =>
+      val fields = partitionColumns.map(col => schema(col))
+      fields.map { field =>
+        HiveColumn(
+          name = field.name,
+          hiveType = HiveMetastoreTypes.toMetastoreType(field.dataType),
+          comment = "")
+      }.toSeq
+    }.getOrElse {
+      if (partitionColumns.length > 0) {
+        // The table does not have a specified schema, which means that the 
schema will be inferred
+        // when we load the table. So, we are not expecting partition columns 
and we will discover
+        // partitions when we load the table. However, if there are specified 
partition columns,
+        // we simply ignore them and provide a warning message.
+        logWarning(
+          s"The schema and partitions of table $tableName will be inferred 
when it is loaded. " +
+            s"Specified partition columns (${partitionColumns.mkString(",")}) 
will be ignored.")
       }
+      Seq.empty[HiveColumn]
     }
 
     val tableType = if (isExternal) {
@@ -163,7 +194,7 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
         specifiedDatabase = Option(dbName),
         name = tblName,
         schema = Seq.empty,
-        partitionColumns = Seq.empty,
+        partitionColumns = metastorePartitionColumns,
         tableType = tableType,
         properties = tableProperties.toMap,
         serdeProperties = options))
@@ -199,7 +230,7 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
       val dataSourceTable =
         cachedDataSourceTables(QualifiedTableName(databaseName, 
tblName).toLowerCase)
       // Then, if alias is specified, wrap the table with a Subquery using the 
alias.
-      // Othersie, wrap the table with a Subquery using the table name.
+      // Otherwise, wrap the table with a Subquery using the table name.
       val withAlias =
         alias.map(a => Subquery(a, dataSourceTable)).getOrElse(
           Subquery(tableIdent.last, dataSourceTable))

http://git-wip-us.apache.org/repos/asf/spark/blob/30f3f556/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 6609763..0ba94d7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -146,6 +146,7 @@ case class CreateMetastoreDataSource(
     hiveContext.catalog.createDataSourceTable(
       tableName,
       userSpecifiedSchema,
+      Array.empty[String],
       provider,
       optionsWithPath,
       isExternal)
@@ -244,6 +245,7 @@ case class CreateMetastoreDataSourceAsSelect(
       hiveContext.catalog.createDataSourceTable(
         tableName,
         Some(resolved.relation.schema),
+        partitionColumns,
         provider,
         optionsWithPath,
         isExternal)

http://git-wip-us.apache.org/repos/asf/spark/blob/30f3f556/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index b69e14a..f03c4cd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -48,15 +48,14 @@ private[sql] class DefaultSource extends 
HadoopFsRelationProvider {
   def createRelation(
       sqlContext: SQLContext,
       paths: Array[String],
-      schema: Option[StructType],
+      dataSchema: Option[StructType],
       partitionColumns: Option[StructType],
       parameters: Map[String, String]): HadoopFsRelation = {
     assert(
       sqlContext.isInstanceOf[HiveContext],
       "The ORC data source can only be used with HiveContext.")
 
-    val partitionSpec = partitionColumns.map(PartitionSpec(_, 
Seq.empty[Partition]))
-    OrcRelation(paths, parameters, schema, partitionSpec)(sqlContext)
+    new OrcRelation(paths, dataSchema, None, partitionColumns, 
parameters)(sqlContext)
   }
 }
 
@@ -136,23 +135,35 @@ private[orc] class OrcOutputWriter(
 }
 
 @DeveloperApi
-private[sql] case class OrcRelation(
+private[sql] class OrcRelation(
     override val paths: Array[String],
-    parameters: Map[String, String],
-    maybeSchema: Option[StructType] = None,
-    maybePartitionSpec: Option[PartitionSpec] = None)(
+    maybeDataSchema: Option[StructType],
+    maybePartitionSpec: Option[PartitionSpec],
+    override val userDefinedPartitionColumns: Option[StructType],
+    parameters: Map[String, String])(
     @transient val sqlContext: SQLContext)
   extends HadoopFsRelation(maybePartitionSpec)
   with Logging {
 
-  override val dataSchema: StructType = maybeSchema.getOrElse {
+  private[sql] def this(
+      paths: Array[String],
+      maybeDataSchema: Option[StructType],
+      maybePartitionSpec: Option[PartitionSpec],
+      parameters: Map[String, String])(
+      sqlContext: SQLContext) = {
+    this(
+      paths,
+      maybeDataSchema,
+      maybePartitionSpec,
+      maybePartitionSpec.map(_.partitionColumns),
+      parameters)(sqlContext)
+  }
+
+  override val dataSchema: StructType = maybeDataSchema.getOrElse {
     OrcFileOperator.readSchema(
       paths.head, Some(sqlContext.sparkContext.hadoopConfiguration))
   }
 
-  override def userDefinedPartitionColumns: Option[StructType] =
-    maybePartitionSpec.map(_.partitionColumns)
-
   override def needConversion: Boolean = false
 
   override def equals(other: Any): Boolean = other match {
@@ -169,7 +180,7 @@ private[sql] case class OrcRelation(
       paths.toSet,
       dataSchema,
       schema,
-      maybePartitionSpec)
+      partitionColumns)
   }
 
   override def buildScan(

http://git-wip-us.apache.org/repos/asf/spark/blob/30f3f556/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 30db976..c4c7b63 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -670,6 +670,7 @@ class MetastoreDataSourcesSuite extends QueryTest with 
BeforeAndAfterEach {
     catalog.createDataSourceTable(
       tableName = "wide_schema",
       userSpecifiedSchema = Some(schema),
+      partitionColumns = Array.empty[String],
       provider = "json",
       options = Map("path" -> "just a dummy path"),
       isExternal = false)
@@ -705,6 +706,35 @@ class MetastoreDataSourcesSuite extends QueryTest with 
BeforeAndAfterEach {
     sql(s"drop table $tableName")
   }
 
+  test("Saving partition columns information") {
+    val df =
+      sparkContext.parallelize(1 to 10, 4).map { i =>
+        Tuple4(i, i + 1, s"str$i", s"str${i + 1}")
+      }.toDF("a", "b", "c", "d")
+
+    val tableName = s"partitionInfo_${System.currentTimeMillis()}"
+    df.write.format("parquet").partitionBy("d", "b").saveAsTable(tableName)
+    invalidateTable(tableName)
+    val metastoreTable = catalog.client.getTable("default", tableName)
+    val expectedPartitionColumns =
+      StructType(df.schema("d") :: df.schema("b") :: Nil)
+    val actualPartitionColumns =
+      StructType(
+        metastoreTable.partitionColumns.map(c =>
+          StructField(c.name, HiveMetastoreTypes.toDataType(c.hiveType))))
+    // Make sure partition columns are correctly stored in metastore.
+    assert(
+      expectedPartitionColumns.sameType(actualPartitionColumns),
+      s"Partitions columns stored in metastore $actualPartitionColumns is not 
the " +
+        s"partition columns defined by the saveAsTable operation 
$expectedPartitionColumns.")
+
+    // Check the content of the saved table.
+    checkAnswer(
+      table(tableName).selectExpr("c", "b", "d", "a"),
+      df.selectExpr("c", "b", "d", "a").collect())
+
+    sql(s"drop table $tableName")
+  }
 
   test("insert into a table") {
     def createDF(from: Int, to: Int): DataFrame =

http://git-wip-us.apache.org/repos/asf/spark/blob/30f3f556/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 1da990b..223ba65 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -435,9 +435,9 @@ class ParquetDataSourceOnMetastoreSuite extends 
ParquetMetastoreSuiteBase {
   }
 
   test("Caching converted data source Parquet Relations") {
-    def checkCached(tableIdentifer: catalog.QualifiedTableName): Unit = {
+    def checkCached(tableIdentifier: catalog.QualifiedTableName): Unit = {
       // Converted test_parquet should be cached.
-      catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) match {
+      catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match {
         case null => fail("Converted test_parquet should be cached in the 
cache.")
         case logical @ LogicalRelation(parquetRelation: ParquetRelation2) => 
// OK
         case other =>
@@ -463,30 +463,30 @@ class ParquetDataSourceOnMetastoreSuite extends 
ParquetMetastoreSuiteBase {
         |  OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
       """.stripMargin)
 
-    var tableIdentifer = catalog.QualifiedTableName("default", 
"test_insert_parquet")
+    var tableIdentifier = catalog.QualifiedTableName("default", 
"test_insert_parquet")
 
     // First, make sure the converted test_parquet is not cached.
-    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === 
null)
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === 
null)
     // Table lookup will make the table cached.
     table("test_insert_parquet")
-    checkCached(tableIdentifer)
+    checkCached(tableIdentifier)
     // For insert into non-partitioned table, we will do the conversion,
     // so the converted test_insert_parquet should be cached.
     invalidateTable("test_insert_parquet")
-    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === 
null)
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === 
null)
     sql(
       """
         |INSERT INTO TABLE test_insert_parquet
         |select a, b from jt
       """.stripMargin)
-    checkCached(tableIdentifer)
+    checkCached(tableIdentifier)
     // Make sure we can read the data.
     checkAnswer(
       sql("select * from test_insert_parquet"),
       sql("select a, b from jt").collect())
     // Invalidate the cache.
     invalidateTable("test_insert_parquet")
-    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === 
null)
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === 
null)
 
     // Create a partitioned table.
     sql(
@@ -503,8 +503,8 @@ class ParquetDataSourceOnMetastoreSuite extends 
ParquetMetastoreSuiteBase {
         |  OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
       """.stripMargin)
 
-    tableIdentifer = catalog.QualifiedTableName("default", 
"test_parquet_partitioned_cache_test")
-    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === 
null)
+    tableIdentifier = catalog.QualifiedTableName("default", 
"test_parquet_partitioned_cache_test")
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === 
null)
     sql(
       """
         |INSERT INTO TABLE test_parquet_partitioned_cache_test
@@ -513,18 +513,18 @@ class ParquetDataSourceOnMetastoreSuite extends 
ParquetMetastoreSuiteBase {
       """.stripMargin)
     // Right now, insert into a partitioned Parquet is not supported in data 
source Parquet.
     // So, we expect it is not cached.
-    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === 
null)
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === 
null)
     sql(
       """
         |INSERT INTO TABLE test_parquet_partitioned_cache_test
         |PARTITION (date='2015-04-02')
         |select a, b from jt
       """.stripMargin)
-    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === 
null)
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === 
null)
 
     // Make sure we can cache the partitioned table.
     table("test_parquet_partitioned_cache_test")
-    checkCached(tableIdentifer)
+    checkCached(tableIdentifier)
     // Make sure we can read the data.
     checkAnswer(
       sql("select STRINGField, date, intField from 
test_parquet_partitioned_cache_test"),
@@ -536,7 +536,7 @@ class ParquetDataSourceOnMetastoreSuite extends 
ParquetMetastoreSuiteBase {
         """.stripMargin).collect())
 
     invalidateTable("test_parquet_partitioned_cache_test")
-    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === 
null)
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === 
null)
 
     sql("DROP TABLE test_insert_parquet")
     sql("DROP TABLE test_parquet_partitioned_cache_test")

http://git-wip-us.apache.org/repos/asf/spark/blob/30f3f556/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 09eed66..2d69b89 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -99,7 +99,7 @@ class SimpleTextRelation(
   }
 
   override def hashCode(): Int =
-    Objects.hashCode(paths, maybeDataSchema, dataSchema)
+    Objects.hashCode(paths, maybeDataSchema, dataSchema, partitionColumns)
 
   override def buildScan(inputStatuses: Array[FileStatus]): RDD[Row] = {
     val fields = dataSchema.map(_.dataType)

http://git-wip-us.apache.org/repos/asf/spark/blob/30f3f556/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index ad4a482..c7c8bcd 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -22,7 +22,6 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
 import org.apache.spark.sql.hive.test.TestHive
-import org.apache.spark.sql.parquet.ParquetTest
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
 
@@ -237,10 +236,6 @@ abstract class HadoopFsRelationTest extends QueryTest with 
SQLTestUtils {
     }
   }
 
-  def withTable(tableName: String)(f: => Unit): Unit = {
-    try f finally sql(s"DROP TABLE $tableName")
-  }
-
   test("saveAsTable()/load() - non-partitioned table - Overwrite") {
     testDF.write.format(dataSourceName).mode(SaveMode.Overwrite)
       .option("dataSchema", dataSchema.json)
@@ -444,6 +439,23 @@ abstract class HadoopFsRelationTest extends QueryTest with 
SQLTestUtils {
       checkAnswer(df, partitionedTestDF.collect())
     }
   }
+
+  test("Partition column type casting") {
+    withTempPath { file =>
+      val input = partitionedTestDF.select('a, 'b, 
'p1.cast(StringType).as('ps), 'p2)
+
+      input
+        .write
+        .format(dataSourceName)
+        .mode(SaveMode.Overwrite)
+        .partitionBy("ps", "p2")
+        .saveAsTable("t")
+
+      withTempTable("t") {
+        checkAnswer(table("t"), input.collect())
+      }
+    }
+  }
 }
 
 class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
@@ -504,4 +516,18 @@ class ParquetHadoopFsRelationSuite extends 
HadoopFsRelationTest {
           .load(file.getCanonicalPath))
     }
   }
+
+  test("SPARK-7616: adjust column name order accordingly when saving 
partitioned table") {
+    val df = (1 to 3).map(i => (i, s"val_$i", i * 2)).toDF("a", "b", "c")
+
+    df.write
+      .format("parquet")
+      .mode(SaveMode.Overwrite)
+      .partitionBy("c", "a")
+      .saveAsTable("t")
+
+    withTable("t") {
+      checkAnswer(table("t"), df.select('b, 'c, 'a).collect())
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to