Repository: spark
Updated Branches:
  refs/heads/master d24e25157 -> 4538443e2


[SPARK-15584][SQL] Abstract duplicate code: `spark.sql.sources.` properties

## What changes were proposed in this pull request?

This PR replaces hard-coded `spark.sql.sources.*` property-key string literals with the corresponding `CreateDataSourceTableUtils.*` constants, as sketched below.
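
A minimal sketch of the pattern (illustrative only, assuming the `spark-sql` module is on the classpath; the constants are those introduced in `CreateDataSourceTableUtils` in this patch):

```scala
import scala.collection.mutable

import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._

// Before this change, call sites spelled the keys out as raw string literals:
//   tableProperties.put("spark.sql.sources.provider", provider)
// After it, they reference the shared constants instead:
val tableProperties = new mutable.HashMap[String, String]
tableProperties.put(DATASOURCE_PROVIDER, "parquet")      // "spark.sql.sources.provider"
tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, "1")     // "spark.sql.sources.schema.numParts"
```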

## How was this patch tested?

Passes the existing Jenkins tests.

Author: Dongjoon Hyun <dongj...@apache.org>

Closes #13349 from dongjoon-hyun/SPARK-15584.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4538443e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4538443e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4538443e

Branch: refs/heads/master
Commit: 4538443e276597530a27c6922e48503677b13956
Parents: d24e251
Author: Dongjoon Hyun <dongj...@apache.org>
Authored: Fri May 27 11:10:31 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Fri May 27 11:10:31 2016 -0700

----------------------------------------------------------------------
 .../spark/ml/source/libsvm/LibSVMRelation.scala |  3 +-
 .../command/createDataSourceTables.scala        | 28 +++++-----
 .../spark/sql/execution/command/ddl.scala       | 19 +++----
 .../spark/sql/execution/command/tables.scala    |  4 +-
 .../datasources/DataSourceStrategy.scala        |  2 +-
 .../execution/datasources/WriterContainer.scala | 10 ++--
 .../execution/datasources/csv/CSVRelation.scala |  3 +-
 .../datasources/json/JsonFileFormat.scala       |  5 +-
 .../datasources/parquet/ParquetFileFormat.scala |  4 +-
 .../datasources/text/TextFileFormat.scala       |  3 +-
 .../spark/sql/execution/command/DDLSuite.scala  | 10 ++--
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 18 +++---
 .../spark/sql/hive/orc/OrcFileFormat.scala      |  3 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    | 58 ++++++++++----------
 .../sql/hive/execution/HiveCommandSuite.scala   | 16 +++---
 .../spark/sql/sources/SimpleTextRelation.scala  |  3 +-
 16 files changed, 95 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4538443e/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index 64ebf0c..7629369 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -51,7 +52,7 @@ private[libsvm] class LibSVMOutputWriter(
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: 
String): Path = {
         val configuration = context.getConfiguration
-        val uniqueWriteJobId = 
configuration.get("spark.sql.sources.writeJobUUID")
+        val uniqueWriteJobId = 
configuration.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID)
         val taskAttemptId = context.getTaskAttemptID
         val split = taskAttemptId.getTaskID.getId
         new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")

http://git-wip-us.apache.org/repos/asf/spark/blob/4538443e/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index deedb68..4b9aab6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -256,15 +256,15 @@ case class CreateDataSourceTableAsSelectCommand(
 
 object CreateDataSourceTableUtils extends Logging {
 
-  // TODO: Actually replace usages with these variables (SPARK-15584)
-
   val DATASOURCE_PREFIX = "spark.sql.sources."
   val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider"
   val DATASOURCE_WRITEJOBUUID = DATASOURCE_PREFIX + "writeJobUUID"
   val DATASOURCE_OUTPUTPATH = DATASOURCE_PREFIX + "output.path"
-  val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_PREFIX + "schema."
+  val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema"
+  val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "."
   val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts"
   val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols"
+  val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols"
   val DATASOURCE_SCHEMA_NUMBUCKETS = DATASOURCE_SCHEMA_PREFIX + "numBuckets"
   val DATASOURCE_SCHEMA_NUMBUCKETCOLS = DATASOURCE_SCHEMA_PREFIX + 
"numBucketCols"
   val DATASOURCE_SCHEMA_PART_PREFIX = DATASOURCE_SCHEMA_PREFIX + "part."
@@ -296,7 +296,7 @@ object CreateDataSourceTableUtils extends Logging {
       options: Map[String, String],
       isExternal: Boolean): Unit = {
     val tableProperties = new mutable.HashMap[String, String]
-    tableProperties.put("spark.sql.sources.provider", provider)
+    tableProperties.put(DATASOURCE_PROVIDER, provider)
 
     // Saves optional user specified schema.  Serialized JSON schema string 
may be too long to be
     // stored into a single metastore SerDe property.  In this case, we split 
the JSON string and
@@ -306,34 +306,32 @@ object CreateDataSourceTableUtils extends Logging {
       val schemaJsonString = schema.json
       // Split the JSON string.
       val parts = schemaJsonString.grouped(threshold).toSeq
-      tableProperties.put("spark.sql.sources.schema.numParts", 
parts.size.toString)
+      tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
       parts.zipWithIndex.foreach { case (part, index) =>
-        tableProperties.put(s"spark.sql.sources.schema.part.$index", part)
+        tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
       }
     }
 
     if (userSpecifiedSchema.isDefined && partitionColumns.length > 0) {
-      tableProperties.put("spark.sql.sources.schema.numPartCols", 
partitionColumns.length.toString)
+      tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, 
partitionColumns.length.toString)
       partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
-        tableProperties.put(s"spark.sql.sources.schema.partCol.$index", 
partCol)
+        tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", 
partCol)
       }
     }
 
     if (userSpecifiedSchema.isDefined && bucketSpec.isDefined) {
       val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = 
bucketSpec.get
 
-      tableProperties.put("spark.sql.sources.schema.numBuckets", 
numBuckets.toString)
-      tableProperties.put("spark.sql.sources.schema.numBucketCols",
-        bucketColumnNames.length.toString)
+      tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString)
+      tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETCOLS, 
bucketColumnNames.length.toString)
       bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) =>
-        tableProperties.put(s"spark.sql.sources.schema.bucketCol.$index", 
bucketCol)
+        tableProperties.put(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$index", 
bucketCol)
       }
 
       if (sortColumnNames.nonEmpty) {
-        tableProperties.put("spark.sql.sources.schema.numSortCols",
-          sortColumnNames.length.toString)
+        tableProperties.put(DATASOURCE_SCHEMA_NUMSORTCOLS, 
sortColumnNames.length.toString)
         sortColumnNames.zipWithIndex.foreach { case (sortCol, index) =>
-          tableProperties.put(s"spark.sql.sources.schema.sortCol.$index", 
sortCol)
+          tableProperties.put(s"$DATASOURCE_SCHEMA_SORTCOL_PREFIX$index", 
sortCol)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/4538443e/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 15eba3b..95bac94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -25,7 +25,7 @@ import 
org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, 
CatalogTableType, SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
-import 
org.apache.spark.sql.execution.command.CreateDataSourceTableUtils.DATASOURCE_PREFIX
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.types._
 
@@ -464,7 +464,7 @@ case class AlterTableSetLocationCommand(
 object DDLUtils {
 
   def isDatasourceTable(props: Map[String, String]): Boolean = {
-    props.contains("spark.sql.sources.provider")
+    props.contains(DATASOURCE_PROVIDER)
   }
 
   def isDatasourceTable(table: CatalogTable): Boolean = {
@@ -503,8 +503,7 @@ object DDLUtils {
   }
 
   def isTablePartitioned(table: CatalogTable): Boolean = {
-    table.partitionColumns.nonEmpty ||
-      table.properties.contains("spark.sql.sources.schema.numPartCols")
+    table.partitionColumns.nonEmpty || 
table.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS)
   }
 
   // A persisted data source table may not store its schema in the catalog. In 
this case, its schema
@@ -512,15 +511,15 @@ object DDLUtils {
   def getSchemaFromTableProperties(metadata: CatalogTable): Option[StructType] 
= {
     require(isDatasourceTable(metadata))
     val props = metadata.properties
-    if (props.isDefinedAt("spark.sql.sources.schema")) {
+    if (props.isDefinedAt(DATASOURCE_SCHEMA)) {
       // Originally, we used spark.sql.sources.schema to store the schema of a 
data source table.
       // After SPARK-6024, we removed this flag.
       // Although we are not using spark.sql.sources.schema any more, we need 
to still support.
-      
props.get("spark.sql.sources.schema").map(DataType.fromJson(_).asInstanceOf[StructType])
+      
props.get(DATASOURCE_SCHEMA).map(DataType.fromJson(_).asInstanceOf[StructType])
     } else {
-      metadata.properties.get("spark.sql.sources.schema.numParts").map { 
numParts =>
+      metadata.properties.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
         val parts = (0 until numParts.toInt).map { index =>
-          val part = 
metadata.properties.get(s"spark.sql.sources.schema.part.$index").orNull
+          val part = 
metadata.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull
           if (part == null) {
             throw new AnalysisException(
               "Could not read schema from the metastore because it is 
corrupted " +
@@ -543,7 +542,7 @@ object DDLUtils {
       numCols <- 
props.get(s"spark.sql.sources.schema.num${colType.capitalize}Cols").toSeq
       index <- 0 until numCols.toInt
     } yield props.getOrElse(
-      s"spark.sql.sources.schema.${colType}Col.$index",
+      s"$DATASOURCE_SCHEMA_PREFIX${colType}Col.$index",
       throw new AnalysisException(
         s"Corrupted $typeName in catalog: $numCols parts expected, but part 
$index is missing."
       )
@@ -556,7 +555,7 @@ object DDLUtils {
 
   def getBucketSpecFromTableProperties(metadata: CatalogTable): 
Option[BucketSpec] = {
     if (isDatasourceTable(metadata)) {
-      metadata.properties.get("spark.sql.sources.schema.numBuckets").map { 
numBuckets =>
+      metadata.properties.get(DATASOURCE_SCHEMA_NUMBUCKETS).map { numBuckets =>
         BucketSpec(
           numBuckets.toInt,
           getColumnNamesByType(metadata.properties, "bucket", "bucketing 
columns"),

http://git-wip-us.apache.org/repos/asf/spark/blob/4538443e/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index d102409..2d6a3b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -443,7 +443,7 @@ case class DescribeTableCommand(table: TableIdentifier, 
isExtended: Boolean, isF
     table.properties.filterNot {
       // Hides schema properties that hold user-defined schema, partition 
columns, and bucketing
       // information since they are already extracted and shown in other parts.
-      case (key, _) => key.startsWith("spark.sql.sources.schema")
+      case (key, _) => 
key.startsWith(CreateDataSourceTableUtils.DATASOURCE_SCHEMA)
     }.foreach { case (key, value) =>
       append(buffer, s"  $key", value, "")
     }
@@ -860,7 +860,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) 
extends RunnableComman
   private def showDataSourceTableOptions(metadata: CatalogTable, builder: 
StringBuilder): Unit = {
     val props = metadata.properties
 
-    builder ++= s"USING ${props("spark.sql.sources.provider")}\n"
+    builder ++= s"USING 
${props(CreateDataSourceTableUtils.DATASOURCE_PROVIDER)}\n"
 
     val dataSourceOptions = metadata.storage.serdeProperties.filterNot {
       case (key, value) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/4538443e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index a3d87cd..2b47865 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -101,7 +101,7 @@ private[sql] class FindDataSourceTable(sparkSession: 
SparkSession) extends Rule[
         userSpecifiedSchema = userSpecifiedSchema,
         partitionColumns = partitionColumns,
         bucketSpec = bucketSpec,
-        className = table.properties("spark.sql.sources.provider"),
+        className = 
table.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER),
         options = options)
 
     LogicalRelation(

http://git-wip-us.apache.org/repos/asf/spark/blob/4538443e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 61dcbeb..f56b50a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.UnsafeKVExternalSorter
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, 
StructType}
 import org.apache.spark.util.{SerializableConfiguration, Utils}
@@ -91,7 +92,7 @@ private[sql] abstract class BaseWriterContainer(
     // This UUID is sent to executor side together with the serialized 
`Configuration` object within
     // the `Job` instance.  `OutputWriters` on the executor side should use 
this UUID to generate
     // unique task output files.
-    job.getConfiguration.set("spark.sql.sources.writeJobUUID", 
uniqueWriteJobId.toString)
+    job.getConfiguration.set(DATASOURCE_WRITEJOBUUID, 
uniqueWriteJobId.toString)
 
     // Order of the following two lines is important.  For Hadoop 1, 
TaskAttemptContext constructor
     // clones the Configuration object passed in.  If we initialize the 
TaskAttemptContext first,
@@ -241,7 +242,7 @@ private[sql] class DefaultWriterContainer(
   def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): 
Unit = {
     executorSideSetup(taskContext)
     val configuration = taskAttemptContext.getConfiguration
-    configuration.set("spark.sql.sources.output.path", outputPath)
+    configuration.set(DATASOURCE_OUTPUTPATH, outputPath)
     var writer = newOutputWriter(getWorkPath)
     writer.initConverter(dataSchema)
 
@@ -349,11 +350,10 @@ private[sql] class DynamicPartitionWriterContainer(
     val configuration = taskAttemptContext.getConfiguration
     val path = if (partitionColumns.nonEmpty) {
       val partitionPath = getPartitionString(key).getString(0)
-      configuration.set(
-        "spark.sql.sources.output.path", new Path(outputPath, 
partitionPath).toString)
+      configuration.set(DATASOURCE_OUTPUTPATH, new Path(outputPath, 
partitionPath).toString)
       new Path(getWorkPath, partitionPath).toString
     } else {
-      configuration.set("spark.sql.sources.output.path", outputPath)
+      configuration.set(DATASOURCE_OUTPUTPATH, outputPath)
       getWorkPath
     }
     val bucketId = getBucketIdFromKey(key)

http://git-wip-us.apache.org/repos/asf/spark/blob/4538443e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index 9849484..d72c8b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -30,6 +30,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
 import org.apache.spark.sql.execution.datasources.{OutputWriter, 
OutputWriterFactory, PartitionedFile}
 import org.apache.spark.sql.types._
 
@@ -168,7 +169,7 @@ private[sql] class CsvOutputWriter(
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: 
String): Path = {
         val configuration = context.getConfiguration
-        val uniqueWriteJobId = 
configuration.get("spark.sql.sources.writeJobUUID")
+        val uniqueWriteJobId = 
configuration.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID)
         val taskAttemptId = context.getTaskAttemptID
         val split = taskAttemptId.getTaskID.getId
         new Path(path, f"part-r-$split%05d-$uniqueWriteJobId.csv$extension")

http://git-wip-us.apache.org/repos/asf/spark/blob/4538443e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 35f2476..c7c5281 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -32,8 +32,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.JoinedRow
-import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
@@ -170,7 +169,7 @@ private[json] class JsonOutputWriter(
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: 
String): Path = {
         val configuration = context.getConfiguration
-        val uniqueWriteJobId = 
configuration.get("spark.sql.sources.writeJobUUID")
+        val uniqueWriteJobId = 
configuration.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID)
         val taskAttemptId = context.getTaskAttemptID
         val split = taskAttemptId.getTaskID.getId
         val bucketString = 
bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")

http://git-wip-us.apache.org/repos/asf/spark/blob/4538443e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index b47d41e..ff7962d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
@@ -521,7 +522,8 @@ private[sql] class ParquetOutputWriter(
         //     partitions in the case of dynamic partitioning.
         override def getDefaultWorkFile(context: TaskAttemptContext, 
extension: String): Path = {
           val configuration = context.getConfiguration
-          val uniqueWriteJobId = 
configuration.get("spark.sql.sources.writeJobUUID")
+          val uniqueWriteJobId = configuration.get(
+            CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID)
           val taskAttemptId = context.getTaskAttemptID
           val split = taskAttemptId.getTaskID.getId
           val bucketString = 
bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")

http://git-wip-us.apache.org/repos/asf/spark/blob/4538443e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index d9525ef..1e5bce4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.{AnalysisException, Row, 
SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeRowWriter}
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructType}
@@ -119,7 +120,7 @@ class TextOutputWriter(path: String, dataSchema: 
StructType, context: TaskAttemp
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: 
String): Path = {
         val configuration = context.getConfiguration
-        val uniqueWriteJobId = 
configuration.get("spark.sql.sources.writeJobUUID")
+        val uniqueWriteJobId = 
configuration.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID)
         val taskAttemptId = context.getTaskAttemptID
         val split = taskAttemptId.getTaskID.getId
         new Path(path, f"part-r-$split%05d-$uniqueWriteJobId.txt$extension")

http://git-wip-us.apache.org/repos/asf/spark/blob/4538443e/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index e975756..ccb4006 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -30,7 +30,7 @@ import 
org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFor
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, 
CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, 
SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import 
org.apache.spark.sql.execution.command.CreateDataSourceTableUtils.DATASOURCE_PREFIX
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -387,7 +387,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with 
BeforeAndAfterEach {
       val table = catalog.getTableMetadata(TableIdentifier("tbl"))
       assert(table.tableType == CatalogTableType.MANAGED)
       assert(table.schema == Seq(CatalogColumn("a", "int"), CatalogColumn("b", 
"int")))
-      assert(table.properties("spark.sql.sources.provider") == "parquet")
+      assert(table.properties(DATASOURCE_PROVIDER) == "parquet")
     }
   }
 
@@ -398,7 +398,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with 
BeforeAndAfterEach {
       val table = catalog.getTableMetadata(TableIdentifier("tbl"))
       assert(table.tableType == CatalogTableType.MANAGED)
       assert(table.schema.isEmpty) // partitioned datasource table is not 
hive-compatible
-      assert(table.properties("spark.sql.sources.provider") == "parquet")
+      assert(table.properties(DATASOURCE_PROVIDER) == "parquet")
       assert(DDLUtils.getSchemaFromTableProperties(table) ==
         Some(new StructType().add("a", IntegerType).add("b", IntegerType)))
       assert(DDLUtils.getPartitionColumnsFromTableProperties(table) ==
@@ -414,7 +414,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with 
BeforeAndAfterEach {
       val table = catalog.getTableMetadata(TableIdentifier("tbl"))
       assert(table.tableType == CatalogTableType.MANAGED)
       assert(table.schema.isEmpty) // partitioned datasource table is not 
hive-compatible
-      assert(table.properties("spark.sql.sources.provider") == "parquet")
+      assert(table.properties(DATASOURCE_PROVIDER) == "parquet")
       assert(DDLUtils.getSchemaFromTableProperties(table) ==
         Some(new StructType().add("a", IntegerType).add("b", IntegerType)))
       assert(DDLUtils.getBucketSpecFromTableProperties(table) ==
@@ -747,7 +747,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with 
BeforeAndAfterEach {
       catalog: SessionCatalog,
       tableIdent: TableIdentifier): Unit = {
     catalog.alterTable(catalog.getTableMetadata(tableIdent).copy(
-      properties = Map("spark.sql.sources.provider" -> "csv")))
+      properties = Map(DATASOURCE_PROVIDER -> "csv")))
   }
 
   private def testSetProperties(isDatasourceTable: Boolean): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/4538443e/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index ea721e4..ff395f3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.execution.command.CreateTableAsSelectLogicalPlan
 import org.apache.spark.sql.execution.datasources.{Partition => _, _}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -74,9 +75,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
         // TODO: the following code is duplicated with 
FindDataSourceTable.readDataSourceTable
 
         def schemaStringFromParts: Option[String] = {
-          table.properties.get("spark.sql.sources.schema.numParts").map { 
numParts =>
+          table.properties.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
             val parts = (0 until numParts.toInt).map { index =>
-              val part = 
table.properties.get(s"spark.sql.sources.schema.part.$index").orNull
+              val part = 
table.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull
               if (part == null) {
                 throw new AnalysisException(
                   "Could not read schema from the metastore because it is 
corrupted " +
@@ -91,9 +92,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
         }
 
         def getColumnNames(colType: String): Seq[String] = {
-          
table.properties.get(s"spark.sql.sources.schema.num${colType.capitalize}Cols").map
 {
+          
table.properties.get(s"$DATASOURCE_SCHEMA.num${colType.capitalize}Cols").map {
             numCols => (0 until numCols.toInt).map { index =>
-              
table.properties.getOrElse(s"spark.sql.sources.schema.${colType}Col.$index",
+              
table.properties.getOrElse(s"$DATASOURCE_SCHEMA_PREFIX${colType}Col.$index",
                 throw new AnalysisException(
                   s"Could not read $colType columns from the metastore because 
it is corrupted " +
                     s"(missing part $index of it, $numCols parts are 
expected)."))
@@ -104,8 +105,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
         // Originally, we used spark.sql.sources.schema to store the schema of 
a data source table.
         // After SPARK-6024, we removed this flag.
         // Although we are not using spark.sql.sources.schema any more, we 
need to still support.
-        val schemaString =
-          
table.properties.get("spark.sql.sources.schema").orElse(schemaStringFromParts)
+        val schemaString = 
table.properties.get(DATASOURCE_SCHEMA).orElse(schemaStringFromParts)
 
         val userSpecifiedSchema =
           schemaString.map(s => DataType.fromJson(s).asInstanceOf[StructType])
@@ -115,7 +115,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
         // from userSpecifiedSchema.
         val partitionColumns = getColumnNames("part")
 
-        val bucketSpec = 
table.properties.get("spark.sql.sources.schema.numBuckets").map { n =>
+        val bucketSpec = 
table.properties.get(DATASOURCE_SCHEMA_NUMBUCKETS).map { n =>
           BucketSpec(n.toInt, getColumnNames("bucket"), getColumnNames("sort"))
         }
 
@@ -126,7 +126,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
             userSpecifiedSchema = userSpecifiedSchema,
             partitionColumns = partitionColumns,
             bucketSpec = bucketSpec,
-            className = table.properties("spark.sql.sources.provider"),
+            className = table.properties(DATASOURCE_PROVIDER),
             options = options)
 
         LogicalRelation(
@@ -166,7 +166,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
     val qualifiedTableName = getQualifiedTableName(tableIdent)
     val table = client.getTable(qualifiedTableName.database, 
qualifiedTableName.name)
 
-    if (table.properties.get("spark.sql.sources.provider").isDefined) {
+    if (table.properties.get(DATASOURCE_PROVIDER).isDefined) {
       val dataSourceTable = cachedDataSourceTables(qualifiedTableName)
       val qualifiedTable = SubqueryAlias(qualifiedTableName.name, 
dataSourceTable)
       // Then, if alias is specified, wrap the table with a Subquery using the 
alias.

http://git-wip-us.apache.org/repos/asf/spark/blob/4538443e/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index f119817..0e8c37d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -36,6 +36,7 @@ import org.apache.spark.rdd.{HadoopRDD, RDD}
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.{HiveInspectors, HiveShim}
 import org.apache.spark.sql.sources.{Filter, _}
@@ -217,7 +218,7 @@ private[orc] class OrcOutputWriter(
 
   private lazy val recordWriter: RecordWriter[NullWritable, Writable] = {
     recordWriterInstantiated = true
-    val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
+    val uniqueWriteJobId = 
conf.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID)
     val taskAttemptId = context.getTaskAttemptID
     val partition = taskAttemptId.getTaskID.getId
     val bucketString = 
bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")

http://git-wip-us.apache.org/repos/asf/spark/blob/4538443e/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 1e6de46..2c50cc8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
@@ -700,7 +700,7 @@ class MetastoreDataSourcesSuite extends QueryTest with 
SQLTestUtils with TestHiv
           val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", 
StringType, true)))
 
           // Manually create a metastore data source table.
-          CreateDataSourceTableUtils.createDataSourceTable(
+          createDataSourceTable(
             sparkSession = spark,
             tableIdent = TableIdentifier("wide_schema"),
             userSpecifiedSchema = Some(schema),
@@ -737,8 +737,8 @@ class MetastoreDataSourcesSuite extends QueryTest with 
SQLTestUtils with TestHiv
             "path" -> 
sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier(tableName)))
         ),
         properties = Map(
-          "spark.sql.sources.provider" -> "json",
-          "spark.sql.sources.schema" -> schema.json,
+          DATASOURCE_PROVIDER -> "json",
+          DATASOURCE_SCHEMA -> schema.json,
           "EXTERNAL" -> "FALSE"))
 
       sharedState.externalCatalog.createTable("default", hiveTable, 
ignoreIfExists = false)
@@ -762,13 +762,13 @@ class MetastoreDataSourcesSuite extends QueryTest with 
SQLTestUtils with TestHiv
       val metastoreTable = sharedState.externalCatalog.getTable("default", 
tableName)
       val expectedPartitionColumns = StructType(df.schema("d") :: 
df.schema("b") :: Nil)
 
-      val numPartCols = 
metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt
+      val numPartCols = 
metastoreTable.properties(DATASOURCE_SCHEMA_NUMPARTCOLS).toInt
       assert(numPartCols == 2)
 
       val actualPartitionColumns =
         StructType(
           (0 until numPartCols).map { index =>
-            
df.schema(metastoreTable.properties(s"spark.sql.sources.schema.partCol.$index"))
+            
df.schema(metastoreTable.properties(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index"))
           })
       // Make sure partition columns are correctly stored in metastore.
       assert(
@@ -798,19 +798,19 @@ class MetastoreDataSourcesSuite extends QueryTest with 
SQLTestUtils with TestHiv
       val expectedBucketByColumns = StructType(df.schema("d") :: 
df.schema("b") :: Nil)
       val expectedSortByColumns = StructType(df.schema("c") :: Nil)
 
-      val numBuckets = 
metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt
+      val numBuckets = 
metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETS).toInt
       assert(numBuckets == 8)
 
-      val numBucketCols = 
metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt
+      val numBucketCols = 
metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETCOLS).toInt
       assert(numBucketCols == 2)
 
-      val numSortCols = 
metastoreTable.properties("spark.sql.sources.schema.numSortCols").toInt
+      val numSortCols = 
metastoreTable.properties(DATASOURCE_SCHEMA_NUMSORTCOLS).toInt
       assert(numSortCols == 1)
 
       val actualBucketByColumns =
         StructType(
           (0 until numBucketCols).map { index =>
-            
df.schema(metastoreTable.properties(s"spark.sql.sources.schema.bucketCol.$index"))
+            
df.schema(metastoreTable.properties(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$index"))
           })
       // Make sure bucketBy columns are correctly stored in metastore.
       assert(
@@ -821,7 +821,7 @@ class MetastoreDataSourcesSuite extends QueryTest with 
SQLTestUtils with TestHiv
       val actualSortByColumns =
         StructType(
           (0 until numSortCols).map { index =>
-            
df.schema(metastoreTable.properties(s"spark.sql.sources.schema.sortCol.$index"))
+            
df.schema(metastoreTable.properties(s"$DATASOURCE_SCHEMA_SORTCOL_PREFIX$index"))
           })
       // Make sure sortBy columns are correctly stored in metastore.
       assert(
@@ -913,7 +913,7 @@ class MetastoreDataSourcesSuite extends QueryTest with 
SQLTestUtils with TestHiv
     withTempDir { tempPath =>
       val schema = StructType((1 to 5).map(i => StructField(s"c_$i", 
StringType)))
 
-      CreateDataSourceTableUtils.createDataSourceTable(
+      createDataSourceTable(
         sparkSession = spark,
         tableIdent = TableIdentifier("not_skip_hive_metadata"),
         userSpecifiedSchema = Some(schema),
@@ -928,7 +928,7 @@ class MetastoreDataSourcesSuite extends QueryTest with 
SQLTestUtils with TestHiv
       assert(sharedState.externalCatalog.getTable("default", 
"not_skip_hive_metadata").schema
         .forall(column => CatalystSqlParser.parseDataType(column.dataType) == 
StringType))
 
-      CreateDataSourceTableUtils.createDataSourceTable(
+      createDataSourceTable(
         sparkSession = spark,
         tableIdent = TableIdentifier("skip_hive_metadata"),
         userSpecifiedSchema = Some(schema),
@@ -960,10 +960,10 @@ class MetastoreDataSourcesSuite extends QueryTest with 
SQLTestUtils with TestHiv
         )
 
         val metastoreTable = sharedState.externalCatalog.getTable("default", 
"t")
-        
assert(metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt 
=== 1)
-        
assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numBuckets"))
-        
assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numBucketCols"))
-        
assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numSortCols"))
+        assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMPARTCOLS).toInt 
=== 1)
+        
assert(!metastoreTable.properties.contains(DATASOURCE_SCHEMA_NUMBUCKETS))
+        
assert(!metastoreTable.properties.contains(DATASOURCE_SCHEMA_NUMBUCKETCOLS))
+        
assert(!metastoreTable.properties.contains(DATASOURCE_SCHEMA_NUMSORTCOLS))
 
         checkAnswer(table("t"), Row(2, 1))
       }
@@ -984,10 +984,10 @@ class MetastoreDataSourcesSuite extends QueryTest with 
SQLTestUtils with TestHiv
         )
 
         val metastoreTable = sharedState.externalCatalog.getTable("default", 
"t")
-        
assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numPartCols"))
-        
assert(metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt 
=== 2)
-        
assert(metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt
 === 1)
-        
assert(metastoreTable.properties("spark.sql.sources.schema.numSortCols").toInt 
=== 1)
+        
assert(!metastoreTable.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS))
+        assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETS).toInt 
=== 2)
+        
assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETCOLS).toInt === 1)
+        assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMSORTCOLS).toInt 
=== 1)
 
         checkAnswer(table("t"), Row(1, 2))
       }
@@ -1006,10 +1006,10 @@ class MetastoreDataSourcesSuite extends QueryTest with 
SQLTestUtils with TestHiv
         )
 
         val metastoreTable = sharedState.externalCatalog.getTable("default", 
"t")
-        
assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numPartCols"))
-        
assert(metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt 
=== 2)
-        
assert(metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt
 === 1)
-        
assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numSortCols"))
+        
assert(!metastoreTable.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS))
+        assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETS).toInt 
=== 2)
+        
assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETCOLS).toInt === 1)
+        
assert(!metastoreTable.properties.contains(DATASOURCE_SCHEMA_NUMSORTCOLS))
 
         checkAnswer(table("t"), Row(1, 2))
       }
@@ -1031,10 +1031,10 @@ class MetastoreDataSourcesSuite extends QueryTest with 
SQLTestUtils with TestHiv
         )
 
         val metastoreTable = sharedState.externalCatalog.getTable("default", 
"t")
-        
assert(metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt 
=== 1)
-        
assert(metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt 
=== 2)
-        
assert(metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt
 === 1)
-        
assert(metastoreTable.properties("spark.sql.sources.schema.numSortCols").toInt 
=== 1)
+        assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMPARTCOLS).toInt 
=== 1)
+        assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETS).toInt 
=== 2)
+        
assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETCOLS).toInt === 1)
+        assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMSORTCOLS).toInt 
=== 1)
 
         checkAnswer(table("t"), Row(2, 3, 1))
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/4538443e/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
index 6f374d7..741abcb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
 
@@ -101,24 +101,22 @@ class HiveCommandSuite extends QueryTest with 
SQLTestUtils with TestHiveSingleto
 
   test("show tblproperties of data source tables - basic") {
     checkAnswer(
-      sql("SHOW TBLPROPERTIES parquet_tab1")
-        .filter(s"key = 'spark.sql.sources.provider'"),
-      Row("spark.sql.sources.provider", 
"org.apache.spark.sql.parquet.DefaultSource") :: Nil
+      sql("SHOW TBLPROPERTIES parquet_tab1").filter(s"key = 
'$DATASOURCE_PROVIDER'"),
+      Row(DATASOURCE_PROVIDER, "org.apache.spark.sql.parquet.DefaultSource") 
:: Nil
     )
 
     checkAnswer(
-      sql("SHOW TBLPROPERTIES parquet_tab1(spark.sql.sources.provider)"),
+      sql(s"SHOW TBLPROPERTIES parquet_tab1($DATASOURCE_PROVIDER)"),
       Row("org.apache.spark.sql.parquet.DefaultSource") :: Nil
     )
 
     checkAnswer(
-      sql("SHOW TBLPROPERTIES parquet_tab1")
-        .filter(s"key = 'spark.sql.sources.schema.numParts'"),
-      Row("spark.sql.sources.schema.numParts", "1") :: Nil
+      sql("SHOW TBLPROPERTIES parquet_tab1").filter(s"key = 
'$DATASOURCE_SCHEMA_NUMPARTS'"),
+      Row(DATASOURCE_SCHEMA_NUMPARTS, "1") :: Nil
     )
 
     checkAnswer(
-      sql("SHOW TBLPROPERTIES 
parquet_tab1('spark.sql.sources.schema.numParts')"),
+      sql(s"SHOW TBLPROPERTIES parquet_tab1('$DATASOURCE_SCHEMA_NUMPARTS')"),
       Row("1"))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4538443e/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 0fa1841..1fb777a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.{sources, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{expressions, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, 
GenericInternalRow, InterpretedPredicate, InterpretedProjection, JoinedRow, 
Literal}
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.util.SerializableConfiguration
@@ -144,7 +145,7 @@ class AppendingTextOutputFormat(outputFile: Path) extends 
TextOutputFormat[NullW
 
   override def getDefaultWorkFile(context: TaskAttemptContext, extension: 
String): Path = {
     val configuration = context.getConfiguration
-    val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
+    val uniqueWriteJobId = configuration.get(DATASOURCE_WRITEJOBUUID)
     val taskAttemptId = context.getTaskAttemptID
     val split = taskAttemptId.getTaskID.getId
     val name = FileOutputFormat.getOutputName(context)

