[SPARK-7961][SQL] Refactor SQLConf to display better error messages

1. Add `SQLConfEntry` to store the metadata of a configuration: its key, default 
value, converters, doc, and visibility. For configurations that are not documented 
in `sql-programming-guide.md`, the doc is left as `<TODO>`.
2. Validate the value when setting a configuration whose key is registered in 
`SQLConf`, so that an invalid value fails immediately with a clear error message.
3. Use `SET -v` to display all public configurations along with their default 
values and docs.
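
For reference, a minimal sketch of how the new typed entries behave, drawn from the
`SQLConf.scala` changes in this diff. It assumes code running inside the
`org.apache.spark.sql` package (since `SQLConf` and its entries are package-private)
and is illustrative rather than a compilable test:

```scala
import org.apache.spark.sql.SQLConf

val conf = new SQLConf

// Typed get: returns an Int and falls back to the registered default (200)
// when the key has not been set.
val partitions: Int = conf.getConf(SQLConf.SHUFFLE_PARTITIONS)

// Typed set: the value is converted to its string form via the entry's
// stringConverter before being stored.
conf.setConf(SQLConf.SHUFFLE_PARTITIONS, 10)

// String-based set still works, but the value is now validated against the
// registered entry, so an invalid value fails with a descriptive message:
//   IllegalArgumentException: spark.sql.shuffle.partitions should be int, but was abc
conf.setConfString(SQLConf.SHUFFLE_PARTITIONS.key, "abc")
```

`SET -v` now goes through `getAllDefinedConfs`, so it lists only the public entries
together with their default values and docs (key, default, meaning) instead of
echoing every raw key=value pair.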

Author: zsxwing <zsxw...@gmail.com>

Closes #6747 from zsxwing/sqlconf and squashes the following commits:

7d09bad [zsxwing] Use SQLConfEntry in HiveContext
49f6213 [zsxwing] Add getConf, setConf to SQLContext and HiveContext
e014f53 [zsxwing] Merge branch 'master' into sqlconf
93dad8e [zsxwing] Fix the unit tests
cf950c1 [zsxwing] Fix the code style and tests
3c5f03e [zsxwing] Add unsetConf(SQLConfEntry) and fix the code style
a2f4add [zsxwing] getConf will return the default value if a config is not set
037b1db [zsxwing] Add schema to SetCommand
0520c3c [zsxwing] Merge branch 'master' into sqlconf
7afb0ec [zsxwing] Fix the configurations about HiveThriftServer
7e728e3 [zsxwing] Add doc for SQLConfEntry and fix 'toString'
5e95b10 [zsxwing] Add enumConf
c6ba76d [zsxwing] setRawString => setConfString, getRawString => getConfString
4abd807 [zsxwing] Fix the test for 'set -v'
6e47e56 [zsxwing] Fix the compilation error
8973ced [zsxwing] Remove floatConf
1fc3a8b [zsxwing] Remove the 'conf' command and use 'set -v' instead
99c9c16 [zsxwing] Fix tests that use SQLConfEntry as a string
88a03cc [zsxwing] Add new lines between confs and return types
ce7c6c8 [zsxwing] Remove seqConf
f3c1b33 [zsxwing] Refactor SQLConf to display better error message


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/78a430ea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/78a430ea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/78a430ea

Branch: refs/heads/master
Commit: 78a430ea4d2aef58a8bf38ce488553ca6acea428
Parents: 9db73ec
Author: zsxwing <zsxw...@gmail.com>
Authored: Wed Jun 17 23:22:54 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Jun 17 23:22:54 2015 -0700

----------------------------------------------------------------------
 docs/sql-programming-guide.md                   |   4 +-
 .../scala/org/apache/spark/sql/SQLConf.scala    | 493 +++++++++++++++----
 .../scala/org/apache/spark/sql/SQLContext.scala |  25 +-
 .../org/apache/spark/sql/SparkSQLParser.scala   |   4 +-
 .../apache/spark/sql/execution/commands.scala   |  98 +++-
 .../spark/sql/execution/debug/package.scala     |   2 +-
 .../sql/parquet/ParquetTableOperations.scala    |   8 +-
 .../apache/spark/sql/parquet/newParquet.scala   |   6 +-
 .../org/apache/spark/sql/sources/commands.scala |   2 +-
 .../apache/spark/sql/test/TestSQLContext.scala  |   2 +-
 .../spark/sql/DataFrameAggregateSuite.scala     |   4 +-
 .../org/apache/spark/sql/DataFrameSuite.scala   |  14 +-
 .../scala/org/apache/spark/sql/JoinSuite.scala  |  14 +-
 .../apache/spark/sql/SQLConfEntrySuite.scala    | 150 ++++++
 .../org/apache/spark/sql/SQLConfSuite.scala     |  10 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala    |  42 +-
 .../columnar/PartitionBatchPruningSuite.scala   |   8 +-
 .../spark/sql/execution/PlannerSuite.scala      |   8 +-
 .../org/apache/spark/sql/json/JsonSuite.scala   |   4 +-
 .../spark/sql/parquet/ParquetFilterSuite.scala  |  14 +-
 .../spark/sql/parquet/ParquetIOSuite.scala      |  16 +-
 .../spark/sql/parquet/ParquetQuerySuite.scala   |   8 +-
 .../spark/sql/sources/DataSourceTest.scala      |   2 +-
 .../apache/spark/sql/test/SQLTestUtils.scala    |   6 +-
 .../hive/thriftserver/HiveThriftServer2.scala   |   4 +-
 .../SparkExecuteStatementOperation.scala        |   2 +-
 .../thriftserver/HiveThriftServer2Suites.scala  |  22 +-
 .../hive/execution/HiveCompatibilitySuite.scala |   8 +-
 .../execution/SortMergeCompatibilitySuite.scala |   4 +-
 .../org/apache/spark/sql/hive/HiveContext.scala |  88 +++-
 .../apache/spark/sql/hive/test/TestHive.scala   |   5 +-
 .../spark/sql/hive/HiveParquetSuite.scala       |   4 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  16 +-
 .../apache/spark/sql/hive/StatisticsSuite.scala |   8 +-
 .../sql/hive/execution/HiveQuerySuite.scala     |  12 +-
 .../sql/hive/execution/SQLQuerySuite.scala      |  20 +-
 .../apache/spark/sql/hive/parquetSuites.scala   |  20 +-
 37 files changed, 861 insertions(+), 296 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 61f9c5f..c6e6ec8 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1220,7 +1220,7 @@ Configuration of Parquet can be done using the `setConf` 
method on `SQLContext`
   <td>false</td>
   <td>
     Some other Parquet-producing systems, in particular Impala and older 
versions of Spark SQL, do
-    not differentiate between binary data and strings when writing out the 
Parquet schema.  This
+    not differentiate between binary data and strings when writing out the 
Parquet schema. This
     flag tells Spark SQL to interpret binary data as a string to provide 
compatibility with these systems.
   </td>
 </tr>
@@ -1237,7 +1237,7 @@ Configuration of Parquet can be done using the `setConf` 
method on `SQLContext`
   <td><code>spark.sql.parquet.cacheMetadata</code></td>
   <td>true</td>
   <td>
-    Turns on caching of Parquet schema metadata.  Can speed up querying of 
static data.
+    Turns on caching of Parquet schema metadata. Can speed up querying of 
static data.
   </td>
 </tr>
 <tr>

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 55ab6b3..16493c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -25,74 +25,333 @@ import scala.collection.JavaConversions._
 import org.apache.spark.sql.catalyst.CatalystConf
 
 private[spark] object SQLConf {
-  val COMPRESS_CACHED = "spark.sql.inMemoryColumnarStorage.compressed"
-  val COLUMN_BATCH_SIZE = "spark.sql.inMemoryColumnarStorage.batchSize"
-  val IN_MEMORY_PARTITION_PRUNING = 
"spark.sql.inMemoryColumnarStorage.partitionPruning"
-  val AUTO_BROADCASTJOIN_THRESHOLD = "spark.sql.autoBroadcastJoinThreshold"
-  val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes"
-  val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
-  val CODEGEN_ENABLED = "spark.sql.codegen"
-  val UNSAFE_ENABLED = "spark.sql.unsafe.enabled"
-  val DIALECT = "spark.sql.dialect"
-  val CASE_SENSITIVE = "spark.sql.caseSensitive"
-
-  val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
-  val PARQUET_INT96_AS_TIMESTAMP = "spark.sql.parquet.int96AsTimestamp"
-  val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
-  val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec"
-  val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
-  val PARQUET_USE_DATA_SOURCE_API = "spark.sql.parquet.useDataSourceApi"
-
-  val ORC_FILTER_PUSHDOWN_ENABLED = "spark.sql.orc.filterPushdown"
-
-  val HIVE_VERIFY_PARTITIONPATH = "spark.sql.hive.verifyPartitionPath"
-
-  val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
-  val BROADCAST_TIMEOUT = "spark.sql.broadcastTimeout"
+
+  private val sqlConfEntries = java.util.Collections.synchronizedMap(
+    new java.util.HashMap[String, SQLConfEntry[_]]())
+
+  /**
+   * An entry contains all meta information for a configuration.
+   *
+   * @param key the key for the configuration
+   * @param defaultValue the default value for the configuration
+   * @param valueConverter how to convert a string to the value. It should 
throw an exception if the
+   *                       string does not have the required format.
+   * @param stringConverter how to convert a value to a string that the user 
can use it as a valid
+   *                        string value. It's usually `toString`. But 
sometimes, a custom converter
+   *                        is necessary. E.g., if T is List[String], `a, b, 
c` is better than
+   *                        `List(a, b, c)`.
+   * @param doc the document for the configuration
+   * @param isPublic if this configuration is public to the user. If it's 
`false`, this
+   *                 configuration is only used internally and we should not 
expose it to the user.
+   * @tparam T the value type
+   */
+  private[sql] class SQLConfEntry[T] private(
+      val key: String,
+      val defaultValue: Option[T],
+      val valueConverter: String => T,
+      val stringConverter: T => String,
+      val doc: String,
+      val isPublic: Boolean) {
+
+    def defaultValueString: String = 
defaultValue.map(stringConverter).getOrElse("<undefined>")
+
+    override def toString: String = {
+      s"SQLConfEntry(key = $key, defaultValue=$defaultValueString, doc=$doc, 
isPublic = $isPublic)"
+    }
+  }
+
+  private[sql] object SQLConfEntry {
+
+    private def apply[T](
+          key: String,
+          defaultValue: Option[T],
+          valueConverter: String => T,
+          stringConverter: T => String,
+          doc: String,
+          isPublic: Boolean): SQLConfEntry[T] =
+      sqlConfEntries.synchronized {
+        if (sqlConfEntries.containsKey(key)) {
+          throw new IllegalArgumentException(s"Duplicate SQLConfEntry. $key 
has been registered")
+        }
+        val entry =
+          new SQLConfEntry[T](key, defaultValue, valueConverter, 
stringConverter, doc, isPublic)
+        sqlConfEntries.put(key, entry)
+        entry
+      }
+
+    def intConf(
+          key: String,
+          defaultValue: Option[Int] = None,
+          doc: String = "",
+          isPublic: Boolean = true): SQLConfEntry[Int] =
+      SQLConfEntry(key, defaultValue, { v =>
+        try {
+          v.toInt
+        } catch {
+          case _: NumberFormatException =>
+            throw new IllegalArgumentException(s"$key should be int, but was 
$v")
+        }
+      }, _.toString, doc, isPublic)
+
+    def longConf(
+        key: String,
+        defaultValue: Option[Long] = None,
+        doc: String = "",
+        isPublic: Boolean = true): SQLConfEntry[Long] =
+      SQLConfEntry(key, defaultValue, { v =>
+        try {
+          v.toLong
+        } catch {
+          case _: NumberFormatException =>
+            throw new IllegalArgumentException(s"$key should be long, but was 
$v")
+        }
+      }, _.toString, doc, isPublic)
+
+    def doubleConf(
+        key: String,
+        defaultValue: Option[Double] = None,
+        doc: String = "",
+        isPublic: Boolean = true): SQLConfEntry[Double] =
+      SQLConfEntry(key, defaultValue, { v =>
+        try {
+          v.toDouble
+        } catch {
+          case _: NumberFormatException =>
+            throw new IllegalArgumentException(s"$key should be double, but 
was $v")
+        }
+      }, _.toString, doc, isPublic)
+
+    def booleanConf(
+        key: String,
+        defaultValue: Option[Boolean] = None,
+        doc: String = "",
+        isPublic: Boolean = true): SQLConfEntry[Boolean] =
+      SQLConfEntry(key, defaultValue, { v =>
+        try {
+          v.toBoolean
+        } catch {
+          case _: IllegalArgumentException =>
+            throw new IllegalArgumentException(s"$key should be boolean, but 
was $v")
+        }
+      }, _.toString, doc, isPublic)
+
+    def stringConf(
+        key: String,
+        defaultValue: Option[String] = None,
+        doc: String = "",
+        isPublic: Boolean = true): SQLConfEntry[String] =
+      SQLConfEntry(key, defaultValue, v => v, v => v, doc, isPublic)
+
+    def enumConf[T](
+        key: String,
+        valueConverter: String => T,
+        validValues: Set[T],
+        defaultValue: Option[T] = None,
+        doc: String = "",
+        isPublic: Boolean = true): SQLConfEntry[T] =
+      SQLConfEntry(key, defaultValue, v => {
+        val _v = valueConverter(v)
+        if (!validValues.contains(_v)) {
+          throw new IllegalArgumentException(
+            s"The value of $key should be one of ${validValues.mkString(", 
")}, but was $v")
+        }
+        _v
+      }, _.toString, doc, isPublic)
+
+    def seqConf[T](
+        key: String,
+        valueConverter: String => T,
+        defaultValue: Option[Seq[T]] = None,
+        doc: String = "",
+        isPublic: Boolean = true): SQLConfEntry[Seq[T]] = {
+      SQLConfEntry(
+        key, defaultValue, _.split(",").map(valueConverter), _.mkString(","), 
doc, isPublic)
+    }
+
+    def stringSeqConf(
+        key: String,
+        defaultValue: Option[Seq[String]] = None,
+        doc: String = "",
+        isPublic: Boolean = true): SQLConfEntry[Seq[String]] = {
+      seqConf(key, s => s, defaultValue, doc, isPublic)
+    }
+  }
+
+  import SQLConfEntry._
+
+  val COMPRESS_CACHED = 
booleanConf("spark.sql.inMemoryColumnarStorage.compressed",
+    defaultValue = Some(true),
+    doc = "When set to true Spark SQL will automatically select a compression 
codec for each " +
+      "column based on statistics of the data.")
+
+  val COLUMN_BATCH_SIZE = 
intConf("spark.sql.inMemoryColumnarStorage.batchSize",
+    defaultValue = Some(10000),
+    doc = "Controls the size of batches for columnar caching.  Larger batch 
sizes can improve " +
+      "memory utilization and compression, but risk OOMs when caching data.")
+
+  val IN_MEMORY_PARTITION_PRUNING =
+    booleanConf("spark.sql.inMemoryColumnarStorage.partitionPruning",
+      defaultValue = Some(false),
+      doc = "<TODO>")
+
+  val AUTO_BROADCASTJOIN_THRESHOLD = 
intConf("spark.sql.autoBroadcastJoinThreshold",
+    defaultValue = Some(10 * 1024 * 1024),
+    doc = "Configures the maximum size in bytes for a table that will be 
broadcast to all worker " +
+      "nodes when performing a join.  By setting this value to -1 broadcasting 
can be disabled. " +
+      "Note that currently statistics are only supported for Hive Metastore 
tables where the " +
+      "command<code>ANALYZE TABLE &lt;tableName&gt; COMPUTE STATISTICS 
noscan</code> has been run.")
+
+  val DEFAULT_SIZE_IN_BYTES = longConf("spark.sql.defaultSizeInBytes", 
isPublic = false)
+
+  val SHUFFLE_PARTITIONS = intConf("spark.sql.shuffle.partitions",
+    defaultValue = Some(200),
+    doc = "Configures the number of partitions to use when shuffling data for 
joins or " +
+      "aggregations.")
+
+  val CODEGEN_ENABLED = booleanConf("spark.sql.codegen",
+    defaultValue = Some(true),
+    doc = "When true, code will be dynamically generated at runtime for 
expression evaluation in" +
+      " a specific query. For some queries with complicated expression this 
option can lead to " +
+      "significant speed-ups. However, for simple queries this can actually 
slow down query " +
+      "execution.")
+
+  val UNSAFE_ENABLED = booleanConf("spark.sql.unsafe.enabled",
+    defaultValue = Some(false),
+    doc = "<TDDO>")
+
+  val DIALECT = stringConf("spark.sql.dialect", defaultValue = Some("sql"), 
doc = "<TODO>")
+
+  val CASE_SENSITIVE = booleanConf("spark.sql.caseSensitive",
+    defaultValue = Some(true),
+    doc = "<TODO>")
+
+  val PARQUET_BINARY_AS_STRING = 
booleanConf("spark.sql.parquet.binaryAsString",
+    defaultValue = Some(false),
+    doc = "Some other Parquet-producing systems, in particular Impala and 
older versions of " +
+      "Spark SQL, do not differentiate between binary data and strings when 
writing out the " +
+      "Parquet schema. This flag tells Spark SQL to interpret binary data as a 
string to provide " +
+      "compatibility with these systems.")
+
+  val PARQUET_INT96_AS_TIMESTAMP = 
booleanConf("spark.sql.parquet.int96AsTimestamp",
+    defaultValue = Some(true),
+    doc = "Some Parquet-producing systems, in particular Impala, store 
Timestamp into INT96. " +
+      "Spark would also store Timestamp as INT96 because we need to avoid 
precision lost of the " +
+      "nanoseconds field. This flag tells Spark SQL to interpret INT96 data as 
a timestamp to " +
+      "provide compatibility with these systems.")
+
+  val PARQUET_CACHE_METADATA = booleanConf("spark.sql.parquet.cacheMetadata",
+    defaultValue = Some(true),
+    doc = "Turns on caching of Parquet schema metadata. Can speed up querying 
of static data.")
+
+  val PARQUET_COMPRESSION = enumConf("spark.sql.parquet.compression.codec",
+    valueConverter = v => v.toLowerCase,
+    validValues = Set("uncompressed", "snappy", "gzip", "lzo"),
+    defaultValue = Some("gzip"),
+    doc = "Sets the compression codec use when writing Parquet files. 
Acceptable values include: " +
+      "uncompressed, snappy, gzip, lzo.")
+
+  val PARQUET_FILTER_PUSHDOWN_ENABLED = 
booleanConf("spark.sql.parquet.filterPushdown",
+    defaultValue = Some(false),
+    doc = "Turn on Parquet filter pushdown optimization. This feature is 
turned off by default" +
+      " because of a known bug in Paruet 1.6.0rc3 " +
+      "(<a 
href=\"https://issues.apache.org/jira/browse/PARQUET-136\";>PARQUET-136</a>). 
However, " +
+      "if your table doesn't contain any nullable string or binary columns, 
it's still safe to " +
+      "turn this feature on.")
+
+  val PARQUET_USE_DATA_SOURCE_API = 
booleanConf("spark.sql.parquet.useDataSourceApi",
+    defaultValue = Some(true),
+    doc = "<TODO>")
+
+  val ORC_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.orc.filterPushdown",
+    defaultValue = Some(false),
+    doc = "<TODO>")
+
+  val HIVE_VERIFY_PARTITIONPATH = 
booleanConf("spark.sql.hive.verifyPartitionPath",
+    defaultValue = Some(true),
+    doc = "<TODO>")
+
+  val COLUMN_NAME_OF_CORRUPT_RECORD = 
stringConf("spark.sql.columnNameOfCorruptRecord",
+    defaultValue = Some("_corrupt_record"),
+    doc = "<TODO>")
+
+  val BROADCAST_TIMEOUT = intConf("spark.sql.broadcastTimeout",
+    defaultValue = Some(5 * 60),
+    doc = "<TODO>")
 
   // Options that control which operators can be chosen by the query planner.  
These should be
   // considered hints and may be ignored by future versions of Spark SQL.
-  val EXTERNAL_SORT = "spark.sql.planner.externalSort"
-  val SORTMERGE_JOIN = "spark.sql.planner.sortMergeJoin"
+  val EXTERNAL_SORT = booleanConf("spark.sql.planner.externalSort",
+    defaultValue = Some(true),
+    doc = "When true, performs sorts spilling to disk as needed otherwise sort 
each partition in" +
+      " memory.")
+
+  val SORTMERGE_JOIN = booleanConf("spark.sql.planner.sortMergeJoin",
+    defaultValue = Some(false),
+    doc = "<TODO>")
 
   // This is only used for the thriftserver
-  val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
-  val THRIFTSERVER_UI_STATEMENT_LIMIT = 
"spark.sql.thriftserver.ui.retainedStatements"
-  val THRIFTSERVER_UI_SESSION_LIMIT = 
"spark.sql.thriftserver.ui.retainedSessions"
+  val THRIFTSERVER_POOL = stringConf("spark.sql.thriftserver.scheduler.pool",
+    doc = "Set a Fair Scheduler pool for a JDBC client session")
+
+  val THRIFTSERVER_UI_STATEMENT_LIMIT = 
intConf("spark.sql.thriftserver.ui.retainedStatements",
+    defaultValue = Some(200),
+    doc = "<TODO>")
+
+  val THRIFTSERVER_UI_SESSION_LIMIT = 
intConf("spark.sql.thriftserver.ui.retainedSessions",
+    defaultValue = Some(200),
+    doc = "<TODO>")
 
   // This is used to set the default data source
-  val DEFAULT_DATA_SOURCE_NAME = "spark.sql.sources.default"
+  val DEFAULT_DATA_SOURCE_NAME = stringConf("spark.sql.sources.default",
+    defaultValue = Some("org.apache.spark.sql.parquet"),
+    doc = "<TODO>")
+
   // This is used to control the when we will split a schema's JSON string to 
multiple pieces
   // in order to fit the JSON string in metastore's table property (by 
default, the value has
   // a length restriction of 4000 characters). We will split the JSON string 
of a schema
   // to its length exceeds the threshold.
-  val SCHEMA_STRING_LENGTH_THRESHOLD = 
"spark.sql.sources.schemaStringLengthThreshold"
+  val SCHEMA_STRING_LENGTH_THRESHOLD = 
intConf("spark.sql.sources.schemaStringLengthThreshold",
+    defaultValue = Some(4000),
+    doc = "<TODO>")
 
   // Whether to perform partition discovery when loading external data 
sources.  Default to true.
-  val PARTITION_DISCOVERY_ENABLED = 
"spark.sql.sources.partitionDiscovery.enabled"
+  val PARTITION_DISCOVERY_ENABLED = 
booleanConf("spark.sql.sources.partitionDiscovery.enabled",
+    defaultValue = Some(true),
+    doc = "<TODO>")
 
   // Whether to perform partition column type inference. Default to true.
-  val PARTITION_COLUMN_TYPE_INFERENCE = 
"spark.sql.sources.partitionColumnTypeInference.enabled"
+  val PARTITION_COLUMN_TYPE_INFERENCE =
+    booleanConf("spark.sql.sources.partitionColumnTypeInference.enabled",
+      defaultValue = Some(true),
+      doc = "<TODO>")
 
   // The output committer class used by FSBasedRelation. The specified class 
needs to be a
   // subclass of org.apache.hadoop.mapreduce.OutputCommitter.
   // NOTE: This property should be set in Hadoop `Configuration` rather than 
Spark `SQLConf`
-  val OUTPUT_COMMITTER_CLASS = "spark.sql.sources.outputCommitterClass"
+  val OUTPUT_COMMITTER_CLASS =
+    stringConf("spark.sql.sources.outputCommitterClass", isPublic = false)
 
   // Whether to perform eager analysis when constructing a dataframe.
   // Set to false when debugging requires the ability to look at invalid query 
plans.
-  val DATAFRAME_EAGER_ANALYSIS = "spark.sql.eagerAnalysis"
+  val DATAFRAME_EAGER_ANALYSIS = booleanConf("spark.sql.eagerAnalysis",
+    defaultValue = Some(true),
+    doc = "<TODO>")
 
   // Whether to automatically resolve ambiguity in join conditions for 
self-joins.
   // See SPARK-6231.
-  val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY = 
"spark.sql.selfJoinAutoResolveAmbiguity"
+  val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY =
+    booleanConf("spark.sql.selfJoinAutoResolveAmbiguity", defaultValue = 
Some(true), doc = "<TODO>")
 
   // Whether to retain group by columns or not in GroupedData.agg.
-  val DATAFRAME_RETAIN_GROUP_COLUMNS = "spark.sql.retainGroupColumns"
+  val DATAFRAME_RETAIN_GROUP_COLUMNS = 
booleanConf("spark.sql.retainGroupColumns",
+    defaultValue = Some(true),
+    doc = "<TODO>")
 
-  val USE_SQL_SERIALIZER2 = "spark.sql.useSerializer2"
+  val USE_SQL_SERIALIZER2 = booleanConf("spark.sql.useSerializer2",
+    defaultValue = Some(true), doc = "<TODO>")
 
-  val USE_JACKSON_STREAMING_API = "spark.sql.json.useJacksonStreamingAPI"
+  val USE_JACKSON_STREAMING_API = 
booleanConf("spark.sql.json.useJacksonStreamingAPI",
+    defaultValue = Some(true), doc = "<TODO>")
 
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
@@ -131,56 +390,54 @@ private[sql] class SQLConf extends Serializable with 
CatalystConf {
    * Note that the choice of dialect does not affect things like what tables 
are available or
    * how query execution is performed.
    */
-  private[spark] def dialect: String = getConf(DIALECT, "sql")
+  private[spark] def dialect: String = getConf(DIALECT)
 
   /** When true tables cached using the in-memory columnar caching will be 
compressed. */
-  private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED, 
"true").toBoolean
+  private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED)
 
   /** The compression codec for writing to a Parquetfile */
-  private[spark] def parquetCompressionCodec: String = 
getConf(PARQUET_COMPRESSION, "gzip")
+  private[spark] def parquetCompressionCodec: String = 
getConf(PARQUET_COMPRESSION)
+
+  private[spark] def parquetCacheMetadata: Boolean = 
getConf(PARQUET_CACHE_METADATA)
 
   /** The number of rows that will be  */
-  private[spark] def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE, 
"10000").toInt
+  private[spark] def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
 
   /** Number of partitions to use for shuffle operators. */
-  private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS, 
"200").toInt
+  private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
 
   /** When true predicates will be passed to the parquet record reader when 
possible. */
-  private[spark] def parquetFilterPushDown =
-    getConf(PARQUET_FILTER_PUSHDOWN_ENABLED, "false").toBoolean
+  private[spark] def parquetFilterPushDown: Boolean = 
getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)
 
   /** When true uses Parquet implementation based on data source API */
-  private[spark] def parquetUseDataSourceApi =
-    getConf(PARQUET_USE_DATA_SOURCE_API, "true").toBoolean
+  private[spark] def parquetUseDataSourceApi: Boolean = 
getConf(PARQUET_USE_DATA_SOURCE_API)
 
-  private[spark] def orcFilterPushDown =
-    getConf(ORC_FILTER_PUSHDOWN_ENABLED, "false").toBoolean
+  private[spark] def orcFilterPushDown: Boolean = 
getConf(ORC_FILTER_PUSHDOWN_ENABLED)
 
   /** When true uses verifyPartitionPath to prune the path which is not 
exists. */
-  private[spark] def verifyPartitionPath =
-    getConf(HIVE_VERIFY_PARTITIONPATH, "true").toBoolean
+  private[spark] def verifyPartitionPath: Boolean = 
getConf(HIVE_VERIFY_PARTITIONPATH)
 
   /** When true the planner will use the external sort, which may spill to 
disk. */
-  private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT, 
"true").toBoolean
+  private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT)
 
   /**
    * Sort merge join would sort the two side of join first, and then iterate 
both sides together
    * only once to get all matches. Using sort merge join can save a lot of 
memory usage compared
    * to HashJoin.
    */
-  private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN, 
"false").toBoolean
+  private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN)
 
   /**
    * When set to true, Spark SQL will use the Janino at runtime to generate 
custom bytecode
    * that evaluates expressions found in queries.  In general this custom code 
runs much faster
    * than interpreted evaluation, but there are some start-up costs (5-10ms) 
due to compilation.
    */
-  private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, 
"true").toBoolean
+  private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED)
 
   /**
    * caseSensitive analysis true by default
    */
-  def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, 
"true").toBoolean
+  def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
 
   /**
    * When set to true, Spark SQL will use managed memory for certain 
operations.  This option only
@@ -188,15 +445,14 @@ private[sql] class SQLConf extends Serializable with 
CatalystConf {
    *
    * Defaults to false as this feature is currently experimental.
    */
-  private[spark] def unsafeEnabled: Boolean = getConf(UNSAFE_ENABLED, 
"false").toBoolean
+  private[spark] def unsafeEnabled: Boolean = getConf(UNSAFE_ENABLED)
 
-  private[spark] def useSqlSerializer2: Boolean = getConf(USE_SQL_SERIALIZER2, 
"true").toBoolean
+  private[spark] def useSqlSerializer2: Boolean = getConf(USE_SQL_SERIALIZER2)
 
   /**
    * Selects between the new (true) and old (false) JSON handlers, to be 
removed in Spark 1.5.0
    */
-  private[spark] def useJacksonStreamingAPI: Boolean =
-    getConf(USE_JACKSON_STREAMING_API, "true").toBoolean
+  private[spark] def useJacksonStreamingAPI: Boolean = 
getConf(USE_JACKSON_STREAMING_API)
 
   /**
    * Upper bound on the sizes (in bytes) of the tables qualified for the auto 
conversion to
@@ -205,8 +461,7 @@ private[sql] class SQLConf extends Serializable with 
CatalystConf {
    *
    * Hive setting: hive.auto.convert.join.noconditionaltask.size, whose 
default value is 10000.
    */
-  private[spark] def autoBroadcastJoinThreshold: Int =
-    getConf(AUTO_BROADCASTJOIN_THRESHOLD, (10 * 1024 * 1024).toString).toInt
+  private[spark] def autoBroadcastJoinThreshold: Int = 
getConf(AUTO_BROADCASTJOIN_THRESHOLD)
 
   /**
    * The default size in bytes to assign to a logical operator's estimation 
statistics.  By default,
@@ -215,82 +470,116 @@ private[sql] class SQLConf extends Serializable with 
CatalystConf {
    * in joins.
    */
   private[spark] def defaultSizeInBytes: Long =
-    getConf(DEFAULT_SIZE_IN_BYTES, (autoBroadcastJoinThreshold + 
1).toString).toLong
+    getConf(DEFAULT_SIZE_IN_BYTES, autoBroadcastJoinThreshold + 1L)
 
   /**
    * When set to true, we always treat byte arrays in Parquet files as strings.
    */
-  private[spark] def isParquetBinaryAsString: Boolean =
-    getConf(PARQUET_BINARY_AS_STRING, "false").toBoolean
+  private[spark] def isParquetBinaryAsString: Boolean = 
getConf(PARQUET_BINARY_AS_STRING)
 
   /**
    * When set to true, we always treat INT96Values in Parquet files as 
timestamp.
    */
-  private[spark] def isParquetINT96AsTimestamp: Boolean =
-    getConf(PARQUET_INT96_AS_TIMESTAMP, "true").toBoolean
+  private[spark] def isParquetINT96AsTimestamp: Boolean = 
getConf(PARQUET_INT96_AS_TIMESTAMP)
 
   /**
    * When set to true, partition pruning for in-memory columnar tables is 
enabled.
    */
-  private[spark] def inMemoryPartitionPruning: Boolean =
-    getConf(IN_MEMORY_PARTITION_PRUNING, "false").toBoolean
+  private[spark] def inMemoryPartitionPruning: Boolean = 
getConf(IN_MEMORY_PARTITION_PRUNING)
 
-  private[spark] def columnNameOfCorruptRecord: String =
-    getConf(COLUMN_NAME_OF_CORRUPT_RECORD, "_corrupt_record")
+  private[spark] def columnNameOfCorruptRecord: String = 
getConf(COLUMN_NAME_OF_CORRUPT_RECORD)
 
   /**
    * Timeout in seconds for the broadcast wait time in hash join
    */
-  private[spark] def broadcastTimeout: Int =
-    getConf(BROADCAST_TIMEOUT, (5 * 60).toString).toInt
+  private[spark] def broadcastTimeout: Int = getConf(BROADCAST_TIMEOUT)
 
-  private[spark] def defaultDataSourceName: String =
-    getConf(DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.parquet")
+  private[spark] def defaultDataSourceName: String = 
getConf(DEFAULT_DATA_SOURCE_NAME)
 
-  private[spark] def partitionDiscoveryEnabled() =
-    getConf(SQLConf.PARTITION_DISCOVERY_ENABLED, "true").toBoolean
+  private[spark] def partitionDiscoveryEnabled(): Boolean =
+    getConf(SQLConf.PARTITION_DISCOVERY_ENABLED)
 
-  private[spark] def partitionColumnTypeInferenceEnabled() =
-    getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE, "true").toBoolean
+  private[spark] def partitionColumnTypeInferenceEnabled(): Boolean =
+    getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE)
 
   // Do not use a value larger than 4000 as the default value of this property.
   // See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more 
information.
-  private[spark] def schemaStringLengthThreshold: Int =
-    getConf(SCHEMA_STRING_LENGTH_THRESHOLD, "4000").toInt
+  private[spark] def schemaStringLengthThreshold: Int = 
getConf(SCHEMA_STRING_LENGTH_THRESHOLD)
 
-  private[spark] def dataFrameEagerAnalysis: Boolean =
-    getConf(DATAFRAME_EAGER_ANALYSIS, "true").toBoolean
+  private[spark] def dataFrameEagerAnalysis: Boolean = 
getConf(DATAFRAME_EAGER_ANALYSIS)
 
   private[spark] def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =
-    getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY, "true").toBoolean
+    getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY)
 
-  private[spark] def dataFrameRetainGroupColumns: Boolean =
-    getConf(DATAFRAME_RETAIN_GROUP_COLUMNS, "true").toBoolean
+  private[spark] def dataFrameRetainGroupColumns: Boolean = 
getConf(DATAFRAME_RETAIN_GROUP_COLUMNS)
 
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
   def setConf(props: Properties): Unit = settings.synchronized {
-    props.foreach { case (k, v) => settings.put(k, v) }
+    props.foreach { case (k, v) => setConfString(k, v) }
   }
 
-  /** Set the given Spark SQL configuration property. */
-  def setConf(key: String, value: String): Unit = {
+  /** Set the given Spark SQL configuration property using a `string` value. */
+  def setConfString(key: String, value: String): Unit = {
     require(key != null, "key cannot be null")
     require(value != null, s"value cannot be null for key: $key")
+    val entry = sqlConfEntries.get(key)
+    if (entry != null) {
+      // Only verify configs in the SQLConf object
+      entry.valueConverter(value)
+    }
     settings.put(key, value)
   }
 
+  /** Set the given Spark SQL configuration property. */
+  def setConf[T](entry: SQLConfEntry[T], value: T): Unit = {
+    require(entry != null, "entry cannot be null")
+    require(value != null, s"value cannot be null for key: ${entry.key}")
+    require(sqlConfEntries.get(entry.key) == entry, s"$entry is not 
registered")
+    settings.put(entry.key, entry.stringConverter(value))
+  }
+
   /** Return the value of Spark SQL configuration property for the given key. 
*/
-  def getConf(key: String): String = {
-    Option(settings.get(key)).getOrElse(throw new NoSuchElementException(key))
+  def getConfString(key: String): String = {
+    Option(settings.get(key)).
+      orElse {
+        // Try to use the default value
+        Option(sqlConfEntries.get(key)).map(_.defaultValueString)
+      }.
+      getOrElse(throw new NoSuchElementException(key))
+  }
+
+  /**
+   * Return the value of Spark SQL configuration property for the given key. 
If the key is not set
+   * yet, return `defaultValue`. This is useful when `defaultValue` in 
SQLConfEntry is not the
+   * desired one.
+   */
+  def getConf[T](entry: SQLConfEntry[T], defaultValue: T): T = {
+    require(sqlConfEntries.get(entry.key) == entry, s"$entry is not 
registered")
+    
Option(settings.get(entry.key)).map(entry.valueConverter).getOrElse(defaultValue)
   }
 
   /**
    * Return the value of Spark SQL configuration property for the given key. 
If the key is not set
-   * yet, return `defaultValue`.
+   * yet, return `defaultValue` in [[SQLConfEntry]].
+   */
+  def getConf[T](entry: SQLConfEntry[T]): T = {
+    require(sqlConfEntries.get(entry.key) == entry, s"$entry is not 
registered")
+    
Option(settings.get(entry.key)).map(entry.valueConverter).orElse(entry.defaultValue).
+      getOrElse(throw new NoSuchElementException(entry.key))
+  }
+
+  /**
+   * Return the `string` value of Spark SQL configuration property for the 
given key. If the key is
+   * not set yet, return `defaultValue`.
    */
-  def getConf(key: String, defaultValue: String): String = {
+  def getConfString(key: String, defaultValue: String): String = {
+    val entry = sqlConfEntries.get(key)
+    if (entry != null && defaultValue != "<undefined>") {
+      // Only verify configs in the SQLConf object
+      entry.valueConverter(defaultValue)
+    }
     Option(settings.get(key)).getOrElse(defaultValue)
   }
 
@@ -300,11 +589,25 @@ private[sql] class SQLConf extends Serializable with 
CatalystConf {
    */
   def getAllConfs: immutable.Map[String, String] = settings.synchronized { 
settings.toMap }
 
-  private[spark] def unsetConf(key: String) {
+  /**
+   * Return all the configuration definitions that have been defined in 
[[SQLConf]]. Each
+   * definition contains key, defaultValue and doc.
+   */
+  def getAllDefinedConfs: Seq[(String, String, String)] = 
sqlConfEntries.synchronized {
+    sqlConfEntries.values.filter(_.isPublic).map { entry =>
+      (entry.key, entry.defaultValueString, entry.doc)
+    }.toSeq
+  }
+
+  private[spark] def unsetConf(key: String): Unit = {
     settings -= key
   }
 
-  private[spark] def clear() {
+  private[spark] def unsetConf(entry: SQLConfEntry[_]): Unit = {
+    settings -= entry.key
+  }
+
+  private[spark] def clear(): Unit = {
     settings.clear()
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 6b605f7..04fc798 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -31,6 +31,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLConf.SQLConfEntry
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.errors.DialectException
 import org.apache.spark.sql.catalyst.expressions._
@@ -79,13 +80,16 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   def setConf(props: Properties): Unit = conf.setConf(props)
 
+  /** Set the given Spark SQL configuration property. */
+  private[sql] def setConf[T](entry: SQLConfEntry[T], value: T): Unit = 
conf.setConf(entry, value)
+
   /**
    * Set the given Spark SQL configuration property.
    *
    * @group config
    * @since 1.0.0
    */
-  def setConf(key: String, value: String): Unit = conf.setConf(key, value)
+  def setConf(key: String, value: String): Unit = conf.setConfString(key, 
value)
 
   /**
    * Return the value of Spark SQL configuration property for the given key.
@@ -93,7 +97,22 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group config
    * @since 1.0.0
    */
-  def getConf(key: String): String = conf.getConf(key)
+  def getConf(key: String): String = conf.getConfString(key)
+
+  /**
+   * Return the value of Spark SQL configuration property for the given key. 
If the key is not set
+   * yet, return `defaultValue` in [[SQLConfEntry]].
+   */
+  private[sql] def getConf[T](entry: SQLConfEntry[T]): T = conf.getConf(entry)
+
+  /**
+   * Return the value of Spark SQL configuration property for the given key. 
If the key is not set
+   * yet, return `defaultValue`. This is useful when `defaultValue` in 
SQLConfEntry is not the
+   * desired one.
+   */
+  private[sql] def getConf[T](entry: SQLConfEntry[T], defaultValue: T): T = {
+    conf.getConf(entry, defaultValue)
+  }
 
   /**
    * Return the value of Spark SQL configuration property for the given key. 
If the key is not set
@@ -102,7 +121,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group config
    * @since 1.0.0
    */
-  def getConf(key: String, defaultValue: String): String = conf.getConf(key, 
defaultValue)
+  def getConf(key: String, defaultValue: String): String = 
conf.getConfString(key, defaultValue)
 
   /**
    * Return all the configuration properties that have been set (i.e. not the 
default).

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
index 305b306..e59fa6e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
@@ -44,8 +44,8 @@ private[sql] class SparkSQLParser(fallback: String => 
LogicalPlan) extends Abstr
 
     private val pair: Parser[LogicalPlan] =
       (key ~ ("=".r ~> value).?).? ^^ {
-        case None => SetCommand(None, output)
-        case Some(k ~ v) => SetCommand(Some(k.trim -> v.map(_.trim)), output)
+        case None => SetCommand(None)
+        case Some(k ~ v) => SetCommand(Some(k.trim -> v.map(_.trim)))
       }
 
     def apply(input: String): LogicalPlan = parseAll(pair, input) match {

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index c9dfcea..5e9951f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution
 
+import java.util.NoSuchElementException
+
 import org.apache.spark.Logging
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
@@ -75,48 +77,92 @@ private[sql] case class ExecutedCommand(cmd: 
RunnableCommand) extends SparkPlan
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class SetCommand(
-    kv: Option[(String, Option[String])],
-    override val output: Seq[Attribute])
-  extends RunnableCommand with Logging {
+case class SetCommand(kv: Option[(String, Option[String])]) extends 
RunnableCommand with Logging {
+
+  private def keyValueOutput: Seq[Attribute] = {
+    val schema = StructType(
+      StructField("key", StringType, false) ::
+        StructField("value", StringType, false) :: Nil)
+    schema.toAttributes
+  }
 
-  override def run(sqlContext: SQLContext): Seq[Row] = kv match {
+  private val (_output, runFunc): (Seq[Attribute], SQLContext => Seq[Row]) = 
kv match {
     // Configures the deprecated "mapred.reduce.tasks" property.
     case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, Some(value))) =>
-      logWarning(
-        s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
-          s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
-      if (value.toInt < 1) {
-        val msg = s"Setting negative ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} 
for automatically " +
-          "determining the number of reducers is not supported."
-        throw new IllegalArgumentException(msg)
-      } else {
-        sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS, value)
-        Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$value"))
+      val runFunc = (sqlContext: SQLContext) => {
+        logWarning(
+          s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, 
" +
+            s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS.key} 
instead.")
+        if (value.toInt < 1) {
+          val msg =
+            s"Setting negative ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} for 
automatically " +
+              "determining the number of reducers is not supported."
+          throw new IllegalArgumentException(msg)
+        } else {
+          sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS.key, value)
+          Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, value))
+        }
       }
+      (keyValueOutput, runFunc)
 
     // Configures a single property.
     case Some((key, Some(value))) =>
-      sqlContext.setConf(key, value)
-      Seq(Row(s"$key=$value"))
+      val runFunc = (sqlContext: SQLContext) => {
+        sqlContext.setConf(key, value)
+        Seq(Row(key, value))
+      }
+      (keyValueOutput, runFunc)
 
-    // Queries all key-value pairs that are set in the SQLConf of the 
sqlContext.
-    // Notice that different from Hive, here "SET -v" is an alias of "SET".
     // (In Hive, "SET" returns all changed properties while "SET -v" returns 
all properties.)
-    case Some(("-v", None)) | None =>
-      sqlContext.getAllConfs.map { case (k, v) => Row(s"$k=$v") }.toSeq
+    // Queries all key-value pairs that are set in the SQLConf of the 
sqlContext.
+    case None =>
+      val runFunc = (sqlContext: SQLContext) => {
+        sqlContext.getAllConfs.map { case (k, v) => Row(k, v) }.toSeq
+      }
+      (keyValueOutput, runFunc)
+
+    // Queries all properties along with their default values and docs that 
are defined in the
+    // SQLConf of the sqlContext.
+    case Some(("-v", None)) =>
+      val runFunc = (sqlContext: SQLContext) => {
+        sqlContext.conf.getAllDefinedConfs.map { case (key, defaultValue, doc) 
=>
+          Row(key, defaultValue, doc)
+        }
+      }
+      val schema = StructType(
+        StructField("key", StringType, false) ::
+          StructField("default", StringType, false) ::
+          StructField("meaning", StringType, false) :: Nil)
+      (schema.toAttributes, runFunc)
 
     // Queries the deprecated "mapred.reduce.tasks" property.
     case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, None)) =>
-      logWarning(
-        s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
-          s"showing ${SQLConf.SHUFFLE_PARTITIONS} instead.")
-      
Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${sqlContext.conf.numShufflePartitions}"))
+      val runFunc = (sqlContext: SQLContext) => {
+        logWarning(
+          s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, 
" +
+            s"showing ${SQLConf.SHUFFLE_PARTITIONS.key} instead.")
+        Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, 
sqlContext.conf.numShufflePartitions.toString))
+      }
+      (keyValueOutput, runFunc)
 
     // Queries a single property.
     case Some((key, None)) =>
-      Seq(Row(s"$key=${sqlContext.getConf(key, "<undefined>")}"))
+      val runFunc = (sqlContext: SQLContext) => {
+        val value =
+          try {
+            sqlContext.getConf(key)
+          } catch {
+            case _: NoSuchElementException => "<undefined>"
+          }
+        Seq(Row(key, value))
+      }
+      (keyValueOutput, runFunc)
   }
+
+  override val output: Seq[Attribute] = _output
+
+  override def run(sqlContext: SQLContext): Seq[Row] = runFunc(sqlContext)
+
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index 3ee4033..2964eda 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -48,7 +48,7 @@ package object debug {
    */
   implicit class DebugSQLContext(sqlContext: SQLContext) {
     def debug(): Unit = {
-      sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "false")
+      sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, false)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 39360e1..65ecad9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -113,12 +113,12 @@ private[sql] case class ParquetTableScan(
       .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
 
     // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet 
and FS metadata
-    conf.set(
-      SQLConf.PARQUET_CACHE_METADATA,
-      sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true"))
+    conf.setBoolean(
+      SQLConf.PARQUET_CACHE_METADATA.key,
+      sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, true))
 
     // Use task side metadata in parquet
-    conf.setBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true);
+    conf.setBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
 
     val baseRDD =
       new org.apache.spark.rdd.NewHadoopRDD(

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index bba6f1e..4c702c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -220,7 +220,7 @@ private[sql] class ParquetRelation2(
     }
 
     conf.setClass(
-      SQLConf.OUTPUT_COMMITTER_CLASS,
+      SQLConf.OUTPUT_COMMITTER_CLASS.key,
       committerClass,
       classOf[ParquetOutputCommitter])
 
@@ -259,7 +259,7 @@ private[sql] class ParquetRelation2(
       filters: Array[Filter],
       inputFiles: Array[FileStatus],
       broadcastedConf: Broadcast[SerializableWritable[Configuration]]): 
RDD[Row] = {
-    val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, 
"true").toBoolean
+    val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA)
     val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
     // Create the function to set variable Parquet confs at both driver and 
executor side.
     val initLocalJobFuncOpt =
@@ -498,7 +498,7 @@ private[sql] object ParquetRelation2 extends Logging {
       ParquetTypesConverter.convertToString(dataSchema.toAttributes))
 
     // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet 
and FS metadata
-    conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)
+    conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)
   }
 
   /** This closure sets input paths at the driver side. */

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 3dbe6fa..d39a20b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -323,7 +323,7 @@ private[sql] abstract class BaseWriterContainer(
 
   private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter 
= {
     val committerClass = context.getConfiguration.getClass(
-      SQLConf.OUTPUT_COMMITTER_CLASS, null, classOf[OutputCommitter])
+      SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
 
     Option(committerClass).map { clazz =>
       logInfo(s"Using user defined output committer class 
${clazz.getCanonicalName}")

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 356a610..9fa3945 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -38,7 +38,7 @@ class LocalSQLContext
   protected[sql] class SQLSession extends super.SQLSession {
     protected[sql] override lazy val conf: SQLConf = new SQLConf {
       /** Fewer partitions to speed up testing. */
-      override def numShufflePartitions: Int = 
this.getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
+      override def numShufflePartitions: Int = 
this.getConf(SQLConf.SHUFFLE_PARTITIONS, 5)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 790b405..b26d3ab 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -68,12 +68,12 @@ class DataFrameAggregateSuite extends QueryTest {
       Seq(Row(1, 3), Row(2, 3), Row(3, 3))
     )
 
-    ctx.conf.setConf("spark.sql.retainGroupColumns", "false")
+    ctx.conf.setConf(SQLConf.DATAFRAME_RETAIN_GROUP_COLUMNS, false)
     checkAnswer(
       testData2.groupBy("a").agg(sum($"b")),
       Seq(Row(3), Row(3), Row(3))
     )
-    ctx.conf.setConf("spark.sql.retainGroupColumns", "true")
+    ctx.conf.setConf(SQLConf.DATAFRAME_RETAIN_GROUP_COLUMNS, true)
   }
 
   test("agg without groups") {

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index fa98e23..ba1d020 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -33,7 +33,7 @@ class DataFrameSuite extends QueryTest {
   test("analysis error should be eagerly reported") {
     val oldSetting = ctx.conf.dataFrameEagerAnalysis
     // Eager analysis.
-    ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "true")
+    ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, true)
 
     intercept[Exception] { testData.select('nonExistentName) }
     intercept[Exception] {
@@ -47,11 +47,11 @@ class DataFrameSuite extends QueryTest {
     }
 
     // No more eager analysis once the flag is turned off
-    ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "false")
+    ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, false)
     testData.select('nonExistentName)
 
     // Set the flag back to original value before this test.
-    ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting.toString)
+    ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting)
   }
 
   test("dataframe toString") {
@@ -70,7 +70,7 @@ class DataFrameSuite extends QueryTest {
 
   test("invalid plan toString, debug mode") {
     val oldSetting = ctx.conf.dataFrameEagerAnalysis
-    ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "true")
+    ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, true)
 
     // Turn on debug mode so we can see invalid query plans.
     import org.apache.spark.sql.execution.debug._
@@ -83,7 +83,7 @@ class DataFrameSuite extends QueryTest {
         badPlan.toString)
 
     // Set the flag back to original value before this test.
-    ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting.toString)
+    ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting)
   }
 
   test("access complex data") {
@@ -556,13 +556,13 @@ class DataFrameSuite extends QueryTest {
 
   test("SPARK-6899") {
     val originalValue = ctx.conf.codegenEnabled
-    ctx.setConf(SQLConf.CODEGEN_ENABLED, "true")
+    ctx.setConf(SQLConf.CODEGEN_ENABLED, true)
     try{
       checkAnswer(
         decimalData.agg(avg('a)),
         Row(new java.math.BigDecimal(2.0)))
     } finally {
-      ctx.setConf(SQLConf.CODEGEN_ENABLED, originalValue.toString)
+      ctx.setConf(SQLConf.CODEGEN_ENABLED, originalValue)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index ffd26c4..20390a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -95,14 +95,14 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
         classOf[BroadcastNestedLoopJoin])
     ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
     try {
-      ctx.conf.setConf("spark.sql.planner.sortMergeJoin", "true")
+      ctx.conf.setConf(SQLConf.SORTMERGE_JOIN, true)
       Seq(
         ("SELECT * FROM testData JOIN testData2 ON key = a", 
classOf[SortMergeJoin]),
         ("SELECT * FROM testData JOIN testData2 ON key = a and key = 2", 
classOf[SortMergeJoin]),
         ("SELECT * FROM testData JOIN testData2 ON key = a where key = 2", 
classOf[SortMergeJoin])
       ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
     } finally {
-      ctx.conf.setConf("spark.sql.planner.sortMergeJoin", SORTMERGEJOIN_ENABLED.toString)
+      ctx.conf.setConf(SQLConf.SORTMERGE_JOIN, SORTMERGEJOIN_ENABLED)
     }
   }
 
@@ -118,7 +118,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
         classOf[BroadcastHashJoin])
     ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
     try {
-      ctx.conf.setConf("spark.sql.planner.sortMergeJoin", "true")
+      ctx.conf.setConf(SQLConf.SORTMERGE_JOIN, true)
       Seq(
         ("SELECT * FROM testData join testData2 ON key = a", 
classOf[BroadcastHashJoin]),
         ("SELECT * FROM testData join testData2 ON key = a and key = 2",
@@ -127,7 +127,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
           classOf[BroadcastHashJoin])
       ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
     } finally {
-      ctx.conf.setConf("spark.sql.planner.sortMergeJoin", SORTMERGEJOIN_ENABLED.toString)
+      ctx.conf.setConf(SQLConf.SORTMERGE_JOIN, SORTMERGEJOIN_ENABLED)
     }
 
     ctx.sql("UNCACHE TABLE testData")
@@ -416,7 +416,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
     ctx.sql("CACHE TABLE testData")
     val tmp = ctx.conf.autoBroadcastJoinThreshold
 
-    ctx.sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=1000000000")
+    ctx.sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=1000000000")
     Seq(
       ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a",
         classOf[BroadcastLeftSemiJoinHash])
@@ -424,7 +424,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
       case (query, joinClass) => assertJoin(query, joinClass)
     }
 
-    ctx.sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1")
+    ctx.sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=-1")
 
     Seq(
       ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", 
classOf[LeftSemiJoinHash])
@@ -432,7 +432,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
       case (query, joinClass) => assertJoin(query, joinClass)
     }
 
-    ctx.setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, tmp.toString)
+    ctx.setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, tmp)
     ctx.sql("UNCACHE TABLE testData")
   }
 

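Note the two access paths the JoinSuite hunks illustrate: the entry object is used for typed programmatic access, while entry.key supplies the raw key string wherever SQL text is involved. A short sketch (ctx is an assumed SQLContext):

    // Typed path: the value is validated and converted by the entry itself.
    ctx.conf.setConf(SQLConf.SORTMERGE_JOIN, true)

    // String path: SQL statements still deal in raw keys, hence the .key accessor.
    ctx.sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=1000000000")
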
http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/SQLConfEntrySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfEntrySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfEntrySuite.scala
new file mode 100644
index 0000000..2e33777
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfEntrySuite.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.SQLConf._
+
+class SQLConfEntrySuite extends SparkFunSuite {
+
+  val conf = new SQLConf
+
+  test("intConf") {
+    val key = "spark.sql.SQLConfEntrySuite.int"
+    val confEntry = SQLConfEntry.intConf(key)
+    assert(conf.getConf(confEntry, 5) === 5)
+
+    conf.setConf(confEntry, 10)
+    assert(conf.getConf(confEntry, 5) === 10)
+
+    conf.setConfString(key, "20")
+    assert(conf.getConfString(key, "5") === "20")
+    assert(conf.getConfString(key) === "20")
+    assert(conf.getConf(confEntry, 5) === 20)
+
+    val e = intercept[IllegalArgumentException] {
+      conf.setConfString(key, "abc")
+    }
+    assert(e.getMessage === s"$key should be int, but was abc")
+  }
+
+  test("longConf") {
+    val key = "spark.sql.SQLConfEntrySuite.long"
+    val confEntry = SQLConfEntry.longConf(key)
+    assert(conf.getConf(confEntry, 5L) === 5L)
+
+    conf.setConf(confEntry, 10L)
+    assert(conf.getConf(confEntry, 5L) === 10L)
+
+    conf.setConfString(key, "20")
+    assert(conf.getConfString(key, "5") === "20")
+    assert(conf.getConfString(key) === "20")
+    assert(conf.getConf(confEntry, 5L) === 20L)
+
+    val e = intercept[IllegalArgumentException] {
+      conf.setConfString(key, "abc")
+    }
+    assert(e.getMessage === s"$key should be long, but was abc")
+  }
+
+  test("booleanConf") {
+    val key = "spark.sql.SQLConfEntrySuite.boolean"
+    val confEntry = SQLConfEntry.booleanConf(key)
+    assert(conf.getConf(confEntry, false) === false)
+
+    conf.setConf(confEntry, true)
+    assert(conf.getConf(confEntry, false) === true)
+
+    conf.setConfString(key, "true")
+    assert(conf.getConfString(key, "false") === "true")
+    assert(conf.getConfString(key) === "true")
+    assert(conf.getConf(confEntry, false) === true)
+
+    val e = intercept[IllegalArgumentException] {
+      conf.setConfString(key, "abc")
+    }
+    assert(e.getMessage === s"$key should be boolean, but was abc")
+  }
+
+  test("doubleConf") {
+    val key = "spark.sql.SQLConfEntrySuite.double"
+    val confEntry = SQLConfEntry.doubleConf(key)
+    assert(conf.getConf(confEntry, 5.0) === 5.0)
+
+    conf.setConf(confEntry, 10.0)
+    assert(conf.getConf(confEntry, 5.0) === 10.0)
+
+    conf.setConfString(key, "20.0")
+    assert(conf.getConfString(key, "5.0") === "20.0")
+    assert(conf.getConfString(key) === "20.0")
+    assert(conf.getConf(confEntry, 5.0) === 20.0)
+
+    val e = intercept[IllegalArgumentException] {
+      conf.setConfString(key, "abc")
+    }
+    assert(e.getMessage === s"$key should be double, but was abc")
+  }
+
+  test("stringConf") {
+    val key = "spark.sql.SQLConfEntrySuite.string"
+    val confEntry = SQLConfEntry.stringConf(key)
+    assert(conf.getConf(confEntry, "abc") === "abc")
+
+    conf.setConf(confEntry, "abcd")
+    assert(conf.getConf(confEntry, "abc") === "abcd")
+
+    conf.setConfString(key, "abcde")
+    assert(conf.getConfString(key, "abc") === "abcde")
+    assert(conf.getConfString(key) === "abcde")
+    assert(conf.getConf(confEntry, "abc") === "abcde")
+  }
+
+  test("enumConf") {
+    val key = "spark.sql.SQLConfEntrySuite.enum"
+    val confEntry = SQLConfEntry.enumConf(key, v => v, Set("a", "b", "c"), defaultValue = Some("a"))
+    assert(conf.getConf(confEntry) === "a")
+
+    conf.setConf(confEntry, "b")
+    assert(conf.getConf(confEntry) === "b")
+
+    conf.setConfString(key, "c")
+    assert(conf.getConfString(key, "a") === "c")
+    assert(conf.getConfString(key) === "c")
+    assert(conf.getConf(confEntry) === "c")
+
+    val e = intercept[IllegalArgumentException] {
+      conf.setConfString(key, "d")
+    }
+    assert(e.getMessage === s"The value of $key should be one of a, b, c, but 
was d")
+  }
+
+  test("stringSeqConf") {
+    val key = "spark.sql.SQLConfEntrySuite.stringSeq"
+    val confEntry = SQLConfEntry.stringSeqConf("spark.sql.SQLConfEntrySuite.stringSeq",
+      defaultValue = Some(Nil))
+    assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c"))
+
+    conf.setConf(confEntry, Seq("a", "b", "c", "d"))
+    assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c", "d"))
+
+    conf.setConfString(key, "a,b,c,d,e")
+    assert(conf.getConfString(key, "a,b,c") === "a,b,c,d,e")
+    assert(conf.getConfString(key) === "a,b,c,d,e")
+    assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c", "d", "e"))
+  }
+}

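SQLConf.scala itself is not quoted in this mail, but the behaviour the suite above pins down is easy to model: each factory pairs a key with a converter that throws IllegalArgumentException on malformed input, which is where the "$key should be int, but was abc" message comes from. A simplified stand-alone sketch (SimpleEntry and intEntry are illustrative names, not the real API, which also carries a default value, a doc string and a public flag):

    // Hypothetical, minimal model of a typed configuration entry.
    class SimpleEntry[T](val key: String, val valueConverter: String => T)

    def intEntry(key: String): SimpleEntry[Int] =
      new SimpleEntry[Int](key, s =>
        try s.toInt
        catch {
          case _: NumberFormatException =>
            // Mirrors the error message asserted in the "intConf" test above.
            throw new IllegalArgumentException(s"$key should be int, but was $s")
        })
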
http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
index 76d0dd1..75791e9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
@@ -75,6 +75,14 @@ class SQLConfSuite extends QueryTest {
   test("deprecated property") {
     ctx.conf.clear()
     ctx.sql(s"set ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS}=10")
-    assert(ctx.getConf(SQLConf.SHUFFLE_PARTITIONS) === "10")
+    assert(ctx.conf.numShufflePartitions === 10)
+  }
+
+  test("invalid conf value") {
+    ctx.conf.clear()
+    val e = intercept[IllegalArgumentException] {
+      ctx.sql(s"set ${SQLConf.CASE_SENSITIVE.key}=10")
+    }
+    assert(e.getMessage === s"${SQLConf.CASE_SENSITIVE.key} should be boolean, 
but was 10")
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 30db840..82f3fdb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -190,7 +190,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
 
   test("aggregation with codegen") {
     val originalValue = sqlContext.conf.codegenEnabled
-    sqlContext.setConf(SQLConf.CODEGEN_ENABLED, "true")
+    sqlContext.setConf(SQLConf.CODEGEN_ENABLED, true)
     // Prepare a table that we can group some rows.
     sqlContext.table("testData")
       .unionAll(sqlContext.table("testData"))
@@ -287,7 +287,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
         Row(0, null, 0) :: Nil)
     } finally {
       sqlContext.dropTempTable("testData3x")
-      sqlContext.setConf(SQLConf.CODEGEN_ENABLED, originalValue.toString)
+      sqlContext.setConf(SQLConf.CODEGEN_ENABLED, originalValue)
     }
   }
 
@@ -480,41 +480,41 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
 
   test("sorting") {
     val before = sqlContext.conf.externalSortEnabled
-    sqlContext.setConf(SQLConf.EXTERNAL_SORT, "false")
+    sqlContext.setConf(SQLConf.EXTERNAL_SORT, false)
     sortTest()
-    sqlContext.setConf(SQLConf.EXTERNAL_SORT, before.toString)
+    sqlContext.setConf(SQLConf.EXTERNAL_SORT, before)
   }
 
   test("external sorting") {
     val before = sqlContext.conf.externalSortEnabled
-    sqlContext.setConf(SQLConf.EXTERNAL_SORT, "true")
+    sqlContext.setConf(SQLConf.EXTERNAL_SORT, true)
     sortTest()
-    sqlContext.setConf(SQLConf.EXTERNAL_SORT, before.toString)
+    sqlContext.setConf(SQLConf.EXTERNAL_SORT, before)
   }
 
   test("SPARK-6927 sorting with codegen on") {
     val externalbefore = sqlContext.conf.externalSortEnabled
     val codegenbefore = sqlContext.conf.codegenEnabled
-    sqlContext.setConf(SQLConf.EXTERNAL_SORT, "false")
-    sqlContext.setConf(SQLConf.CODEGEN_ENABLED, "true")
+    sqlContext.setConf(SQLConf.EXTERNAL_SORT, false)
+    sqlContext.setConf(SQLConf.CODEGEN_ENABLED, true)
     try{
       sortTest()
     } finally {
-      sqlContext.setConf(SQLConf.EXTERNAL_SORT, externalbefore.toString)
-      sqlContext.setConf(SQLConf.CODEGEN_ENABLED, codegenbefore.toString)
+      sqlContext.setConf(SQLConf.EXTERNAL_SORT, externalbefore)
+      sqlContext.setConf(SQLConf.CODEGEN_ENABLED, codegenbefore)
     }
   }
 
   test("SPARK-6927 external sorting with codegen on") {
     val externalbefore = sqlContext.conf.externalSortEnabled
     val codegenbefore = sqlContext.conf.codegenEnabled
-    sqlContext.setConf(SQLConf.CODEGEN_ENABLED, "true")
-    sqlContext.setConf(SQLConf.EXTERNAL_SORT, "true")
+    sqlContext.setConf(SQLConf.CODEGEN_ENABLED, true)
+    sqlContext.setConf(SQLConf.EXTERNAL_SORT, true)
     try {
       sortTest()
     } finally {
-      sqlContext.setConf(SQLConf.EXTERNAL_SORT, externalbefore.toString)
-      sqlContext.setConf(SQLConf.CODEGEN_ENABLED, codegenbefore.toString)
+      sqlContext.setConf(SQLConf.EXTERNAL_SORT, externalbefore)
+      sqlContext.setConf(SQLConf.CODEGEN_ENABLED, codegenbefore)
     }
   }
 
@@ -908,25 +908,25 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
     sql(s"SET $testKey=$testVal")
     checkAnswer(
       sql("SET"),
-      Row(s"$testKey=$testVal")
+      Row(testKey, testVal)
     )
 
     sql(s"SET ${testKey + testKey}=${testVal + testVal}")
     checkAnswer(
       sql("set"),
       Seq(
-        Row(s"$testKey=$testVal"),
-        Row(s"${testKey + testKey}=${testVal + testVal}"))
+        Row(testKey, testVal),
+        Row(testKey + testKey, testVal + testVal))
     )
 
     // "set key"
     checkAnswer(
       sql(s"SET $testKey"),
-      Row(s"$testKey=$testVal")
+      Row(testKey, testVal)
     )
     checkAnswer(
       sql(s"SET $nonexistentKey"),
-      Row(s"$nonexistentKey=<undefined>")
+      Row(nonexistentKey, "<undefined>")
     )
     sqlContext.conf.clear()
   }
@@ -1340,12 +1340,12 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
   }
 
   test("SPARK-4699 case sensitivity SQL query") {
-    sqlContext.setConf(SQLConf.CASE_SENSITIVE, "false")
+    sqlContext.setConf(SQLConf.CASE_SENSITIVE, false)
     val data = TestData(1, "val_1") :: TestData(2, "val_2") :: Nil
     val rdd = sqlContext.sparkContext.parallelize((0 to 1).map(i => data(i)))
     rdd.toDF().registerTempTable("testTable1")
     checkAnswer(sql("SELECT VALUE FROM TESTTABLE1 where KEY = 1"), 
Row("val_1"))
-    sqlContext.setConf(SQLConf.CASE_SENSITIVE, "true")
+    sqlContext.setConf(SQLConf.CASE_SENSITIVE, true)
   }
 
   test("SPARK-6145: ORDER BY test for nested fields") {

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
index 6545c6b..2c08799 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
@@ -32,7 +32,7 @@ class PartitionBatchPruningSuite extends SparkFunSuite with BeforeAndAfterAll wi
 
   override protected def beforeAll(): Unit = {
     // Make a table with 5 partitions, 2 batches per partition, 10 elements per batch
-    ctx.setConf(SQLConf.COLUMN_BATCH_SIZE, "10")
+    ctx.setConf(SQLConf.COLUMN_BATCH_SIZE, 10)
 
     val pruningData = ctx.sparkContext.makeRDD((1 to 100).map { key =>
       val string = if (((key - 1) / 10) % 2 == 0) null else key.toString
@@ -41,14 +41,14 @@ class PartitionBatchPruningSuite extends SparkFunSuite with BeforeAndAfterAll wi
     pruningData.registerTempTable("pruningData")
 
     // Enable in-memory partition pruning
-    ctx.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, "true")
+    ctx.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true)
     // Enable in-memory table scan accumulators
     ctx.setConf("spark.sql.inMemoryTableScanStatistics.enable", "true")
   }
 
   override protected def afterAll(): Unit = {
-    ctx.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize.toString)
-    ctx.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning.toString)
+    ctx.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize)
+    ctx.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
   }
 
   before {

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 3e27f58..5854ab4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -63,7 +63,7 @@ class PlannerSuite extends SparkFunSuite {
 
   test("sizeInBytes estimation of limit operator for broadcast hash join 
optimization") {
     def checkPlan(fieldTypes: Seq[DataType], newThreshold: Int): Unit = {
-      setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, newThreshold.toString)
+      setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, newThreshold)
       val fields = fieldTypes.zipWithIndex.map {
         case (dataType, index) => StructField(s"c${index}", dataType, true)
       } :+ StructField("key", IntegerType, true)
@@ -119,12 +119,12 @@ class PlannerSuite extends SparkFunSuite {
 
     checkPlan(complexTypes, newThreshold = 901617)
 
-    setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, origThreshold.toString)
+    setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, origThreshold)
   }
 
   test("InMemoryRelation statistics propagation") {
     val origThreshold = conf.autoBroadcastJoinThreshold
-    setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, 81920.toString)
+    setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, 81920)
 
     testData.limit(3).registerTempTable("tiny")
     sql("CACHE TABLE tiny")
@@ -139,6 +139,6 @@ class PlannerSuite extends SparkFunSuite {
     assert(broadcastHashJoins.size === 1, "Should use broadcast hash join")
     assert(shuffledHashJoins.isEmpty, "Should not use shuffled hash join")
 
-    setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, origThreshold.toString)
+    setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, origThreshold)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index fca2436..945d437 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -1077,14 +1077,14 @@ class JsonSuite extends QueryTest with TestJsonData {
   }
 
   test("SPARK-7565 MapType in JsonRDD") {
-    val useStreaming = ctx.getConf(SQLConf.USE_JACKSON_STREAMING_API, "true")
+    val useStreaming = ctx.conf.useJacksonStreamingAPI
     val oldColumnNameOfCorruptRecord = ctx.conf.columnNameOfCorruptRecord
     ctx.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, "_unparsed")
 
     val schemaWithSimpleMap = StructType(
       StructField("map", MapType(StringType, IntegerType, true), false) :: Nil)
     try{
-      for (useStreaming <- List("true", "false")) {
+      for (useStreaming <- List(true, false)) {
         ctx.setConf(SQLConf.USE_JACKSON_STREAMING_API, useStreaming)
         val temp = Utils.createTempDir().getPath
 

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
index fa5d4ec..a2763c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -51,7 +51,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest {
       expected: Seq[Row]): Unit = {
     val output = predicate.collect { case a: Attribute => a }.distinct
 
-    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
       val query = df
         .select(output.map(e => Column(e)): _*)
         .where(Column(predicate))
@@ -314,17 +314,17 @@ class ParquetDataSourceOnFilterSuite extends ParquetFilterSuiteBase with BeforeA
   lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
 
   override protected def beforeAll(): Unit = {
-    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, true)
   }
 
   override protected def afterAll(): Unit = {
-    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
   }
 
   test("SPARK-6554: don't push down predicates which reference partition 
columns") {
     import sqlContext.implicits._
 
-    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
       withTempPath { dir =>
         val path = s"${dir.getCanonicalPath}/part=1"
         (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
@@ -343,17 +343,17 @@ class ParquetDataSourceOffFilterSuite extends ParquetFilterSuiteBase with Before
   lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
 
   override protected def beforeAll(): Unit = {
-    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
+    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, false)
   }
 
   override protected def afterAll(): Unit = {
-    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
   }
 
   test("SPARK-6742: don't push down predicates which reference partition 
columns") {
     import sqlContext.implicits._
 
-    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
       withTempPath { dir =>
         val path = s"${dir.getCanonicalPath}/part=1"
         (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index fc827bc..284d99d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -94,8 +94,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
     val data = (1 to 4).map(i => Tuple1(i.toString))
     // Property spark.sql.parquet.binaryAsString shouldn't affect Parquet files written by Spark SQL
     // as we store Spark SQL schema in the extra metadata.
-    withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING -> "false")(checkParquetFile(data))
-    withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING -> "true")(checkParquetFile(data))
+    withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "false")(checkParquetFile(data))
+    withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true")(checkParquetFile(data))
   }
 
   test("fixed-length decimals") {
@@ -231,7 +231,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
     val data = (0 until 10).map(i => (i, i.toString))
 
     def checkCompressionCodec(codec: CompressionCodecName): Unit = {
-      withSQLConf(SQLConf.PARQUET_COMPRESSION -> codec.name()) {
+      withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) {
         withParquetFile(data) { path =>
           assertResult(sqlContext.conf.parquetCompressionCodec.toUpperCase) {
             compressionCodecFor(path)
@@ -408,7 +408,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
       val clonedConf = new Configuration(configuration)
 
       configuration.set(
-        SQLConf.OUTPUT_COMMITTER_CLASS, classOf[ParquetOutputCommitter].getCanonicalName)
+        SQLConf.OUTPUT_COMMITTER_CLASS.key, classOf[ParquetOutputCommitter].getCanonicalName)
 
       configuration.set(
         "spark.sql.parquet.output.committer.class",
@@ -440,11 +440,11 @@ class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterA
   private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
 
   override protected def beforeAll(): Unit = {
-    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, true)
   }
 
   override protected def afterAll(): Unit = {
-    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API.key, originalConf.toString)
   }
 
   test("SPARK-6330 regression test") {
@@ -464,10 +464,10 @@ class ParquetDataSourceOffIOSuite extends ParquetIOSuiteBase with BeforeAndAfter
   private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
 
   override protected def beforeAll(): Unit = {
-    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
+    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, false)
   }
 
   override protected def afterAll(): Unit = {
-    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index be3b34d..fafad67 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -128,11 +128,11 @@ class ParquetDataSourceOnQuerySuite extends ParquetQuerySuiteBase with BeforeAnd
   private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
 
   override protected def beforeAll(): Unit = {
-    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, true)
   }
 
   override protected def afterAll(): Unit = {
-    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
   }
 }
 
@@ -140,10 +140,10 @@ class ParquetDataSourceOffQuerySuite extends ParquetQuerySuiteBase with BeforeAn
   private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
 
   override protected def beforeAll(): Unit = {
-    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
+    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, false)
   }
 
   override protected def afterAll(): Unit = {
-    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
index 3f77960..00cc7d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
@@ -27,7 +27,7 @@ abstract class DataSourceTest extends QueryTest with BeforeAndAfter {
   // We want to test some edge cases.
   protected implicit lazy val caseInsensitiveContext = {
     val ctx = new SQLContext(TestSQLContext.sparkContext)
-    ctx.setConf(SQLConf.CASE_SENSITIVE, "false")
+    ctx.setConf(SQLConf.CASE_SENSITIVE, false)
     ctx
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index ac4a00a..fa01823 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -37,11 +37,11 @@ trait SQLTestUtils {
    */
   protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
     val (keys, values) = pairs.unzip
-    val currentValues = keys.map(key => Try(sqlContext.conf.getConf(key)).toOption)
-    (keys, values).zipped.foreach(sqlContext.conf.setConf)
+    val currentValues = keys.map(key => Try(sqlContext.conf.getConfString(key)).toOption)
+    (keys, values).zipped.foreach(sqlContext.conf.setConfString)
     try f finally {
       keys.zip(currentValues).foreach {
-        case (key, Some(value)) => sqlContext.conf.setConf(key, value)
+        case (key, Some(value)) => sqlContext.conf.setConfString(key, value)
         case (key, None) => sqlContext.conf.unsetConf(key)
       }
     }

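The withSQLConf helper shown above now goes through the raw-string accessors (getConfString/setConfString) because its callers pass plain key/value pairs; typed entries are addressed through .key at the call site, as the Parquet suites in this commit do. Sketch of typical usage (the block body is elided):

    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
      // assertions run with filter push-down enabled; the previous value
      // (or unset state) is restored by withSQLConf's finally clause.
    }
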
http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index c9da252..700d994 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -153,9 +153,9 @@ object HiveThriftServer2 extends Logging {
     val sessionList = new mutable.LinkedHashMap[String, SessionInfo]
     val executionList = new mutable.LinkedHashMap[String, ExecutionInfo]
     val retainedStatements =
-      conf.getConf(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT, "200").toInt
+      conf.getConf(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT)
     val retainedSessions =
-      conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT, "200").toInt
+      conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT)
     var totalRunning = 0
 
     override def onJobStart(jobStart: SparkListenerJobStart): Unit = {

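Because the thrift-server UI limits are now int entries with their defaults attached, the listener no longer hard-codes "200" and converts by hand. A rough before/after of the read (conf here is the server's SQLConf, as in the hunk above; the first line is the pre-patch shape, shown only for contrast):

    // Pre-patch: default and conversion repeated at the call site.
    val retainedStatements = conf.getConf(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT, "200").toInt

    // Post-patch: the entry supplies both the Int type and the default value.
    val retainedStatements = conf.getConf(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT)
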
http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index e071103..e875888 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -219,7 +219,7 @@ private[hive] class SparkExecuteStatementOperation(
       result = hiveContext.sql(statement)
       logDebug(result.queryExecution.toString())
       result.queryExecution.logical match {
-        case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value))), _) =>
+        case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value)))) =>
           sessionToActivePool(parentSession.getSessionHandle) = value
           logInfo(s"Setting spark.scheduler.pool=$value for future statements 
in this session.")
         case _ =>

