[SPARK-7961][SQL]Refactor SQLConf to display better error message 1. Add `SQLConfEntry` to store the information about a configuration. For those configurations that cannot be found in `sql-programming-guide.md`, I left the doc as `<TODO>`. 2. Verify the value when setting a configuration if this is in SQLConf. 3. Use `SET -v` to display all public configurations.
Author: zsxwing <zsxw...@gmail.com> Closes #6747 from zsxwing/sqlconf and squashes the following commits: 7d09bad [zsxwing] Use SQLConfEntry in HiveContext 49f6213 [zsxwing] Add getConf, setConf to SQLContext and HiveContext e014f53 [zsxwing] Merge branch 'master' into sqlconf 93dad8e [zsxwing] Fix the unit tests cf950c1 [zsxwing] Fix the code style and tests 3c5f03e [zsxwing] Add unsetConf(SQLConfEntry) and fix the code style a2f4add [zsxwing] getConf will return the default value if a config is not set 037b1db [zsxwing] Add schema to SetCommand 0520c3c [zsxwing] Merge branch 'master' into sqlconf 7afb0ec [zsxwing] Fix the configurations about HiveThriftServer 7e728e3 [zsxwing] Add doc for SQLConfEntry and fix 'toString' 5e95b10 [zsxwing] Add enumConf c6ba76d [zsxwing] setRawString => setConfString, getRawString => getConfString 4abd807 [zsxwing] Fix the test for 'set -v' 6e47e56 [zsxwing] Fix the compilation error 8973ced [zsxwing] Remove floatConf 1fc3a8b [zsxwing] Remove the 'conf' command and use 'set -v' instead 99c9c16 [zsxwing] Fix tests that use SQLConfEntry as a string 88a03cc [zsxwing] Add new lines between confs and return types ce7c6c8 [zsxwing] Remove seqConf f3c1b33 [zsxwing] Refactor SQLConf to display better error message Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/78a430ea Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/78a430ea Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/78a430ea Branch: refs/heads/master Commit: 78a430ea4d2aef58a8bf38ce488553ca6acea428 Parents: 9db73ec Author: zsxwing <zsxw...@gmail.com> Authored: Wed Jun 17 23:22:54 2015 -0700 Committer: Reynold Xin <r...@databricks.com> Committed: Wed Jun 17 23:22:54 2015 -0700 ---------------------------------------------------------------------- docs/sql-programming-guide.md | 4 +- .../scala/org/apache/spark/sql/SQLConf.scala | 493 +++++++++++++++---- .../scala/org/apache/spark/sql/SQLContext.scala | 25 +- .../org/apache/spark/sql/SparkSQLParser.scala | 4 +- .../apache/spark/sql/execution/commands.scala | 98 +++- .../spark/sql/execution/debug/package.scala | 2 +- .../sql/parquet/ParquetTableOperations.scala | 8 +- .../apache/spark/sql/parquet/newParquet.scala | 6 +- .../org/apache/spark/sql/sources/commands.scala | 2 +- .../apache/spark/sql/test/TestSQLContext.scala | 2 +- .../spark/sql/DataFrameAggregateSuite.scala | 4 +- .../org/apache/spark/sql/DataFrameSuite.scala | 14 +- .../scala/org/apache/spark/sql/JoinSuite.scala | 14 +- .../apache/spark/sql/SQLConfEntrySuite.scala | 150 ++++++ .../org/apache/spark/sql/SQLConfSuite.scala | 10 +- .../org/apache/spark/sql/SQLQuerySuite.scala | 42 +- .../columnar/PartitionBatchPruningSuite.scala | 8 +- .../spark/sql/execution/PlannerSuite.scala | 8 +- .../org/apache/spark/sql/json/JsonSuite.scala | 4 +- .../spark/sql/parquet/ParquetFilterSuite.scala | 14 +- .../spark/sql/parquet/ParquetIOSuite.scala | 16 +- .../spark/sql/parquet/ParquetQuerySuite.scala | 8 +- .../spark/sql/sources/DataSourceTest.scala | 2 +- .../apache/spark/sql/test/SQLTestUtils.scala | 6 +- .../hive/thriftserver/HiveThriftServer2.scala | 4 +- .../SparkExecuteStatementOperation.scala | 2 +- .../thriftserver/HiveThriftServer2Suites.scala | 22 +- .../hive/execution/HiveCompatibilitySuite.scala | 8 +- .../execution/SortMergeCompatibilitySuite.scala | 4 +- .../org/apache/spark/sql/hive/HiveContext.scala | 88 +++- .../apache/spark/sql/hive/test/TestHive.scala | 5 +- .../spark/sql/hive/HiveParquetSuite.scala | 4 +- .../sql/hive/MetastoreDataSourcesSuite.scala | 16 +- .../apache/spark/sql/hive/StatisticsSuite.scala | 8 +- .../sql/hive/execution/HiveQuerySuite.scala | 12 +- .../sql/hive/execution/SQLQuerySuite.scala | 20 +- .../apache/spark/sql/hive/parquetSuites.scala | 20 +- 37 files changed, 861 insertions(+), 296 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/docs/sql-programming-guide.md ---------------------------------------------------------------------- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 61f9c5f..c6e6ec8 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1220,7 +1220,7 @@ Configuration of Parquet can be done using the `setConf` method on `SQLContext` <td>false</td> <td> Some other Parquet-producing systems, in particular Impala and older versions of Spark SQL, do - not differentiate between binary data and strings when writing out the Parquet schema. This + not differentiate between binary data and strings when writing out the Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems. </td> </tr> @@ -1237,7 +1237,7 @@ Configuration of Parquet can be done using the `setConf` method on `SQLContext` <td><code>spark.sql.parquet.cacheMetadata</code></td> <td>true</td> <td> - Turns on caching of Parquet schema metadata. Can speed up querying of static data. + Turns on caching of Parquet schema metadata. Can speed up querying of static data. </td> </tr> <tr> http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 55ab6b3..16493c3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -25,74 +25,333 @@ import scala.collection.JavaConversions._ import org.apache.spark.sql.catalyst.CatalystConf private[spark] object SQLConf { - val COMPRESS_CACHED = "spark.sql.inMemoryColumnarStorage.compressed" - val COLUMN_BATCH_SIZE = "spark.sql.inMemoryColumnarStorage.batchSize" - val IN_MEMORY_PARTITION_PRUNING = "spark.sql.inMemoryColumnarStorage.partitionPruning" - val AUTO_BROADCASTJOIN_THRESHOLD = "spark.sql.autoBroadcastJoinThreshold" - val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes" - val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions" - val CODEGEN_ENABLED = "spark.sql.codegen" - val UNSAFE_ENABLED = "spark.sql.unsafe.enabled" - val DIALECT = "spark.sql.dialect" - val CASE_SENSITIVE = "spark.sql.caseSensitive" - - val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString" - val PARQUET_INT96_AS_TIMESTAMP = "spark.sql.parquet.int96AsTimestamp" - val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata" - val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec" - val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown" - val PARQUET_USE_DATA_SOURCE_API = "spark.sql.parquet.useDataSourceApi" - - val ORC_FILTER_PUSHDOWN_ENABLED = "spark.sql.orc.filterPushdown" - - val HIVE_VERIFY_PARTITIONPATH = "spark.sql.hive.verifyPartitionPath" - - val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord" - val BROADCAST_TIMEOUT = "spark.sql.broadcastTimeout" + + private val sqlConfEntries = java.util.Collections.synchronizedMap( + new java.util.HashMap[String, SQLConfEntry[_]]()) + + /** + * An entry contains all meta information for a configuration. + * + * @param key the key for the configuration + * @param defaultValue the default value for the configuration + * @param valueConverter how to convert a string to the value. It should throw an exception if the + * string does not have the required format. + * @param stringConverter how to convert a value to a string that the user can use it as a valid + * string value. It's usually `toString`. But sometimes, a custom converter + * is necessary. E.g., if T is List[String], `a, b, c` is better than + * `List(a, b, c)`. + * @param doc the document for the configuration + * @param isPublic if this configuration is public to the user. If it's `false`, this + * configuration is only used internally and we should not expose it to the user. + * @tparam T the value type + */ + private[sql] class SQLConfEntry[T] private( + val key: String, + val defaultValue: Option[T], + val valueConverter: String => T, + val stringConverter: T => String, + val doc: String, + val isPublic: Boolean) { + + def defaultValueString: String = defaultValue.map(stringConverter).getOrElse("<undefined>") + + override def toString: String = { + s"SQLConfEntry(key = $key, defaultValue=$defaultValueString, doc=$doc, isPublic = $isPublic)" + } + } + + private[sql] object SQLConfEntry { + + private def apply[T]( + key: String, + defaultValue: Option[T], + valueConverter: String => T, + stringConverter: T => String, + doc: String, + isPublic: Boolean): SQLConfEntry[T] = + sqlConfEntries.synchronized { + if (sqlConfEntries.containsKey(key)) { + throw new IllegalArgumentException(s"Duplicate SQLConfEntry. $key has been registered") + } + val entry = + new SQLConfEntry[T](key, defaultValue, valueConverter, stringConverter, doc, isPublic) + sqlConfEntries.put(key, entry) + entry + } + + def intConf( + key: String, + defaultValue: Option[Int] = None, + doc: String = "", + isPublic: Boolean = true): SQLConfEntry[Int] = + SQLConfEntry(key, defaultValue, { v => + try { + v.toInt + } catch { + case _: NumberFormatException => + throw new IllegalArgumentException(s"$key should be int, but was $v") + } + }, _.toString, doc, isPublic) + + def longConf( + key: String, + defaultValue: Option[Long] = None, + doc: String = "", + isPublic: Boolean = true): SQLConfEntry[Long] = + SQLConfEntry(key, defaultValue, { v => + try { + v.toLong + } catch { + case _: NumberFormatException => + throw new IllegalArgumentException(s"$key should be long, but was $v") + } + }, _.toString, doc, isPublic) + + def doubleConf( + key: String, + defaultValue: Option[Double] = None, + doc: String = "", + isPublic: Boolean = true): SQLConfEntry[Double] = + SQLConfEntry(key, defaultValue, { v => + try { + v.toDouble + } catch { + case _: NumberFormatException => + throw new IllegalArgumentException(s"$key should be double, but was $v") + } + }, _.toString, doc, isPublic) + + def booleanConf( + key: String, + defaultValue: Option[Boolean] = None, + doc: String = "", + isPublic: Boolean = true): SQLConfEntry[Boolean] = + SQLConfEntry(key, defaultValue, { v => + try { + v.toBoolean + } catch { + case _: IllegalArgumentException => + throw new IllegalArgumentException(s"$key should be boolean, but was $v") + } + }, _.toString, doc, isPublic) + + def stringConf( + key: String, + defaultValue: Option[String] = None, + doc: String = "", + isPublic: Boolean = true): SQLConfEntry[String] = + SQLConfEntry(key, defaultValue, v => v, v => v, doc, isPublic) + + def enumConf[T]( + key: String, + valueConverter: String => T, + validValues: Set[T], + defaultValue: Option[T] = None, + doc: String = "", + isPublic: Boolean = true): SQLConfEntry[T] = + SQLConfEntry(key, defaultValue, v => { + val _v = valueConverter(v) + if (!validValues.contains(_v)) { + throw new IllegalArgumentException( + s"The value of $key should be one of ${validValues.mkString(", ")}, but was $v") + } + _v + }, _.toString, doc, isPublic) + + def seqConf[T]( + key: String, + valueConverter: String => T, + defaultValue: Option[Seq[T]] = None, + doc: String = "", + isPublic: Boolean = true): SQLConfEntry[Seq[T]] = { + SQLConfEntry( + key, defaultValue, _.split(",").map(valueConverter), _.mkString(","), doc, isPublic) + } + + def stringSeqConf( + key: String, + defaultValue: Option[Seq[String]] = None, + doc: String = "", + isPublic: Boolean = true): SQLConfEntry[Seq[String]] = { + seqConf(key, s => s, defaultValue, doc, isPublic) + } + } + + import SQLConfEntry._ + + val COMPRESS_CACHED = booleanConf("spark.sql.inMemoryColumnarStorage.compressed", + defaultValue = Some(true), + doc = "When set to true Spark SQL will automatically select a compression codec for each " + + "column based on statistics of the data.") + + val COLUMN_BATCH_SIZE = intConf("spark.sql.inMemoryColumnarStorage.batchSize", + defaultValue = Some(10000), + doc = "Controls the size of batches for columnar caching. Larger batch sizes can improve " + + "memory utilization and compression, but risk OOMs when caching data.") + + val IN_MEMORY_PARTITION_PRUNING = + booleanConf("spark.sql.inMemoryColumnarStorage.partitionPruning", + defaultValue = Some(false), + doc = "<TODO>") + + val AUTO_BROADCASTJOIN_THRESHOLD = intConf("spark.sql.autoBroadcastJoinThreshold", + defaultValue = Some(10 * 1024 * 1024), + doc = "Configures the maximum size in bytes for a table that will be broadcast to all worker " + + "nodes when performing a join. By setting this value to -1 broadcasting can be disabled. " + + "Note that currently statistics are only supported for Hive Metastore tables where the " + + "command<code>ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan</code> has been run.") + + val DEFAULT_SIZE_IN_BYTES = longConf("spark.sql.defaultSizeInBytes", isPublic = false) + + val SHUFFLE_PARTITIONS = intConf("spark.sql.shuffle.partitions", + defaultValue = Some(200), + doc = "Configures the number of partitions to use when shuffling data for joins or " + + "aggregations.") + + val CODEGEN_ENABLED = booleanConf("spark.sql.codegen", + defaultValue = Some(true), + doc = "When true, code will be dynamically generated at runtime for expression evaluation in" + + " a specific query. For some queries with complicated expression this option can lead to " + + "significant speed-ups. However, for simple queries this can actually slow down query " + + "execution.") + + val UNSAFE_ENABLED = booleanConf("spark.sql.unsafe.enabled", + defaultValue = Some(false), + doc = "<TDDO>") + + val DIALECT = stringConf("spark.sql.dialect", defaultValue = Some("sql"), doc = "<TODO>") + + val CASE_SENSITIVE = booleanConf("spark.sql.caseSensitive", + defaultValue = Some(true), + doc = "<TODO>") + + val PARQUET_BINARY_AS_STRING = booleanConf("spark.sql.parquet.binaryAsString", + defaultValue = Some(false), + doc = "Some other Parquet-producing systems, in particular Impala and older versions of " + + "Spark SQL, do not differentiate between binary data and strings when writing out the " + + "Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide " + + "compatibility with these systems.") + + val PARQUET_INT96_AS_TIMESTAMP = booleanConf("spark.sql.parquet.int96AsTimestamp", + defaultValue = Some(true), + doc = "Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. " + + "Spark would also store Timestamp as INT96 because we need to avoid precision lost of the " + + "nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to " + + "provide compatibility with these systems.") + + val PARQUET_CACHE_METADATA = booleanConf("spark.sql.parquet.cacheMetadata", + defaultValue = Some(true), + doc = "Turns on caching of Parquet schema metadata. Can speed up querying of static data.") + + val PARQUET_COMPRESSION = enumConf("spark.sql.parquet.compression.codec", + valueConverter = v => v.toLowerCase, + validValues = Set("uncompressed", "snappy", "gzip", "lzo"), + defaultValue = Some("gzip"), + doc = "Sets the compression codec use when writing Parquet files. Acceptable values include: " + + "uncompressed, snappy, gzip, lzo.") + + val PARQUET_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.parquet.filterPushdown", + defaultValue = Some(false), + doc = "Turn on Parquet filter pushdown optimization. This feature is turned off by default" + + " because of a known bug in Paruet 1.6.0rc3 " + + "(<a href=\"https://issues.apache.org/jira/browse/PARQUET-136\">PARQUET-136</a>). However, " + + "if your table doesn't contain any nullable string or binary columns, it's still safe to " + + "turn this feature on.") + + val PARQUET_USE_DATA_SOURCE_API = booleanConf("spark.sql.parquet.useDataSourceApi", + defaultValue = Some(true), + doc = "<TODO>") + + val ORC_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.orc.filterPushdown", + defaultValue = Some(false), + doc = "<TODO>") + + val HIVE_VERIFY_PARTITIONPATH = booleanConf("spark.sql.hive.verifyPartitionPath", + defaultValue = Some(true), + doc = "<TODO>") + + val COLUMN_NAME_OF_CORRUPT_RECORD = stringConf("spark.sql.columnNameOfCorruptRecord", + defaultValue = Some("_corrupt_record"), + doc = "<TODO>") + + val BROADCAST_TIMEOUT = intConf("spark.sql.broadcastTimeout", + defaultValue = Some(5 * 60), + doc = "<TODO>") // Options that control which operators can be chosen by the query planner. These should be // considered hints and may be ignored by future versions of Spark SQL. - val EXTERNAL_SORT = "spark.sql.planner.externalSort" - val SORTMERGE_JOIN = "spark.sql.planner.sortMergeJoin" + val EXTERNAL_SORT = booleanConf("spark.sql.planner.externalSort", + defaultValue = Some(true), + doc = "When true, performs sorts spilling to disk as needed otherwise sort each partition in" + + " memory.") + + val SORTMERGE_JOIN = booleanConf("spark.sql.planner.sortMergeJoin", + defaultValue = Some(false), + doc = "<TODO>") // This is only used for the thriftserver - val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool" - val THRIFTSERVER_UI_STATEMENT_LIMIT = "spark.sql.thriftserver.ui.retainedStatements" - val THRIFTSERVER_UI_SESSION_LIMIT = "spark.sql.thriftserver.ui.retainedSessions" + val THRIFTSERVER_POOL = stringConf("spark.sql.thriftserver.scheduler.pool", + doc = "Set a Fair Scheduler pool for a JDBC client session") + + val THRIFTSERVER_UI_STATEMENT_LIMIT = intConf("spark.sql.thriftserver.ui.retainedStatements", + defaultValue = Some(200), + doc = "<TODO>") + + val THRIFTSERVER_UI_SESSION_LIMIT = intConf("spark.sql.thriftserver.ui.retainedSessions", + defaultValue = Some(200), + doc = "<TODO>") // This is used to set the default data source - val DEFAULT_DATA_SOURCE_NAME = "spark.sql.sources.default" + val DEFAULT_DATA_SOURCE_NAME = stringConf("spark.sql.sources.default", + defaultValue = Some("org.apache.spark.sql.parquet"), + doc = "<TODO>") + // This is used to control the when we will split a schema's JSON string to multiple pieces // in order to fit the JSON string in metastore's table property (by default, the value has // a length restriction of 4000 characters). We will split the JSON string of a schema // to its length exceeds the threshold. - val SCHEMA_STRING_LENGTH_THRESHOLD = "spark.sql.sources.schemaStringLengthThreshold" + val SCHEMA_STRING_LENGTH_THRESHOLD = intConf("spark.sql.sources.schemaStringLengthThreshold", + defaultValue = Some(4000), + doc = "<TODO>") // Whether to perform partition discovery when loading external data sources. Default to true. - val PARTITION_DISCOVERY_ENABLED = "spark.sql.sources.partitionDiscovery.enabled" + val PARTITION_DISCOVERY_ENABLED = booleanConf("spark.sql.sources.partitionDiscovery.enabled", + defaultValue = Some(true), + doc = "<TODO>") // Whether to perform partition column type inference. Default to true. - val PARTITION_COLUMN_TYPE_INFERENCE = "spark.sql.sources.partitionColumnTypeInference.enabled" + val PARTITION_COLUMN_TYPE_INFERENCE = + booleanConf("spark.sql.sources.partitionColumnTypeInference.enabled", + defaultValue = Some(true), + doc = "<TODO>") // The output committer class used by FSBasedRelation. The specified class needs to be a // subclass of org.apache.hadoop.mapreduce.OutputCommitter. // NOTE: This property should be set in Hadoop `Configuration` rather than Spark `SQLConf` - val OUTPUT_COMMITTER_CLASS = "spark.sql.sources.outputCommitterClass" + val OUTPUT_COMMITTER_CLASS = + stringConf("spark.sql.sources.outputCommitterClass", isPublic = false) // Whether to perform eager analysis when constructing a dataframe. // Set to false when debugging requires the ability to look at invalid query plans. - val DATAFRAME_EAGER_ANALYSIS = "spark.sql.eagerAnalysis" + val DATAFRAME_EAGER_ANALYSIS = booleanConf("spark.sql.eagerAnalysis", + defaultValue = Some(true), + doc = "<TODO>") // Whether to automatically resolve ambiguity in join conditions for self-joins. // See SPARK-6231. - val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY = "spark.sql.selfJoinAutoResolveAmbiguity" + val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY = + booleanConf("spark.sql.selfJoinAutoResolveAmbiguity", defaultValue = Some(true), doc = "<TODO>") // Whether to retain group by columns or not in GroupedData.agg. - val DATAFRAME_RETAIN_GROUP_COLUMNS = "spark.sql.retainGroupColumns" + val DATAFRAME_RETAIN_GROUP_COLUMNS = booleanConf("spark.sql.retainGroupColumns", + defaultValue = Some(true), + doc = "<TODO>") - val USE_SQL_SERIALIZER2 = "spark.sql.useSerializer2" + val USE_SQL_SERIALIZER2 = booleanConf("spark.sql.useSerializer2", + defaultValue = Some(true), doc = "<TODO>") - val USE_JACKSON_STREAMING_API = "spark.sql.json.useJacksonStreamingAPI" + val USE_JACKSON_STREAMING_API = booleanConf("spark.sql.json.useJacksonStreamingAPI", + defaultValue = Some(true), doc = "<TODO>") object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" @@ -131,56 +390,54 @@ private[sql] class SQLConf extends Serializable with CatalystConf { * Note that the choice of dialect does not affect things like what tables are available or * how query execution is performed. */ - private[spark] def dialect: String = getConf(DIALECT, "sql") + private[spark] def dialect: String = getConf(DIALECT) /** When true tables cached using the in-memory columnar caching will be compressed. */ - private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED, "true").toBoolean + private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED) /** The compression codec for writing to a Parquetfile */ - private[spark] def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION, "gzip") + private[spark] def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION) + + private[spark] def parquetCacheMetadata: Boolean = getConf(PARQUET_CACHE_METADATA) /** The number of rows that will be */ - private[spark] def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE, "10000").toInt + private[spark] def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE) /** Number of partitions to use for shuffle operators. */ - private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS, "200").toInt + private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS) /** When true predicates will be passed to the parquet record reader when possible. */ - private[spark] def parquetFilterPushDown = - getConf(PARQUET_FILTER_PUSHDOWN_ENABLED, "false").toBoolean + private[spark] def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED) /** When true uses Parquet implementation based on data source API */ - private[spark] def parquetUseDataSourceApi = - getConf(PARQUET_USE_DATA_SOURCE_API, "true").toBoolean + private[spark] def parquetUseDataSourceApi: Boolean = getConf(PARQUET_USE_DATA_SOURCE_API) - private[spark] def orcFilterPushDown = - getConf(ORC_FILTER_PUSHDOWN_ENABLED, "false").toBoolean + private[spark] def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED) /** When true uses verifyPartitionPath to prune the path which is not exists. */ - private[spark] def verifyPartitionPath = - getConf(HIVE_VERIFY_PARTITIONPATH, "true").toBoolean + private[spark] def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITIONPATH) /** When true the planner will use the external sort, which may spill to disk. */ - private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT, "true").toBoolean + private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT) /** * Sort merge join would sort the two side of join first, and then iterate both sides together * only once to get all matches. Using sort merge join can save a lot of memory usage compared * to HashJoin. */ - private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN, "false").toBoolean + private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN) /** * When set to true, Spark SQL will use the Janino at runtime to generate custom bytecode * that evaluates expressions found in queries. In general this custom code runs much faster * than interpreted evaluation, but there are some start-up costs (5-10ms) due to compilation. */ - private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, "true").toBoolean + private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED) /** * caseSensitive analysis true by default */ - def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, "true").toBoolean + def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE) /** * When set to true, Spark SQL will use managed memory for certain operations. This option only @@ -188,15 +445,14 @@ private[sql] class SQLConf extends Serializable with CatalystConf { * * Defaults to false as this feature is currently experimental. */ - private[spark] def unsafeEnabled: Boolean = getConf(UNSAFE_ENABLED, "false").toBoolean + private[spark] def unsafeEnabled: Boolean = getConf(UNSAFE_ENABLED) - private[spark] def useSqlSerializer2: Boolean = getConf(USE_SQL_SERIALIZER2, "true").toBoolean + private[spark] def useSqlSerializer2: Boolean = getConf(USE_SQL_SERIALIZER2) /** * Selects between the new (true) and old (false) JSON handlers, to be removed in Spark 1.5.0 */ - private[spark] def useJacksonStreamingAPI: Boolean = - getConf(USE_JACKSON_STREAMING_API, "true").toBoolean + private[spark] def useJacksonStreamingAPI: Boolean = getConf(USE_JACKSON_STREAMING_API) /** * Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to @@ -205,8 +461,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf { * * Hive setting: hive.auto.convert.join.noconditionaltask.size, whose default value is 10000. */ - private[spark] def autoBroadcastJoinThreshold: Int = - getConf(AUTO_BROADCASTJOIN_THRESHOLD, (10 * 1024 * 1024).toString).toInt + private[spark] def autoBroadcastJoinThreshold: Int = getConf(AUTO_BROADCASTJOIN_THRESHOLD) /** * The default size in bytes to assign to a logical operator's estimation statistics. By default, @@ -215,82 +470,116 @@ private[sql] class SQLConf extends Serializable with CatalystConf { * in joins. */ private[spark] def defaultSizeInBytes: Long = - getConf(DEFAULT_SIZE_IN_BYTES, (autoBroadcastJoinThreshold + 1).toString).toLong + getConf(DEFAULT_SIZE_IN_BYTES, autoBroadcastJoinThreshold + 1L) /** * When set to true, we always treat byte arrays in Parquet files as strings. */ - private[spark] def isParquetBinaryAsString: Boolean = - getConf(PARQUET_BINARY_AS_STRING, "false").toBoolean + private[spark] def isParquetBinaryAsString: Boolean = getConf(PARQUET_BINARY_AS_STRING) /** * When set to true, we always treat INT96Values in Parquet files as timestamp. */ - private[spark] def isParquetINT96AsTimestamp: Boolean = - getConf(PARQUET_INT96_AS_TIMESTAMP, "true").toBoolean + private[spark] def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP) /** * When set to true, partition pruning for in-memory columnar tables is enabled. */ - private[spark] def inMemoryPartitionPruning: Boolean = - getConf(IN_MEMORY_PARTITION_PRUNING, "false").toBoolean + private[spark] def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING) - private[spark] def columnNameOfCorruptRecord: String = - getConf(COLUMN_NAME_OF_CORRUPT_RECORD, "_corrupt_record") + private[spark] def columnNameOfCorruptRecord: String = getConf(COLUMN_NAME_OF_CORRUPT_RECORD) /** * Timeout in seconds for the broadcast wait time in hash join */ - private[spark] def broadcastTimeout: Int = - getConf(BROADCAST_TIMEOUT, (5 * 60).toString).toInt + private[spark] def broadcastTimeout: Int = getConf(BROADCAST_TIMEOUT) - private[spark] def defaultDataSourceName: String = - getConf(DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.parquet") + private[spark] def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME) - private[spark] def partitionDiscoveryEnabled() = - getConf(SQLConf.PARTITION_DISCOVERY_ENABLED, "true").toBoolean + private[spark] def partitionDiscoveryEnabled(): Boolean = + getConf(SQLConf.PARTITION_DISCOVERY_ENABLED) - private[spark] def partitionColumnTypeInferenceEnabled() = - getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE, "true").toBoolean + private[spark] def partitionColumnTypeInferenceEnabled(): Boolean = + getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE) // Do not use a value larger than 4000 as the default value of this property. // See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information. - private[spark] def schemaStringLengthThreshold: Int = - getConf(SCHEMA_STRING_LENGTH_THRESHOLD, "4000").toInt + private[spark] def schemaStringLengthThreshold: Int = getConf(SCHEMA_STRING_LENGTH_THRESHOLD) - private[spark] def dataFrameEagerAnalysis: Boolean = - getConf(DATAFRAME_EAGER_ANALYSIS, "true").toBoolean + private[spark] def dataFrameEagerAnalysis: Boolean = getConf(DATAFRAME_EAGER_ANALYSIS) private[spark] def dataFrameSelfJoinAutoResolveAmbiguity: Boolean = - getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY, "true").toBoolean + getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY) - private[spark] def dataFrameRetainGroupColumns: Boolean = - getConf(DATAFRAME_RETAIN_GROUP_COLUMNS, "true").toBoolean + private[spark] def dataFrameRetainGroupColumns: Boolean = getConf(DATAFRAME_RETAIN_GROUP_COLUMNS) /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ def setConf(props: Properties): Unit = settings.synchronized { - props.foreach { case (k, v) => settings.put(k, v) } + props.foreach { case (k, v) => setConfString(k, v) } } - /** Set the given Spark SQL configuration property. */ - def setConf(key: String, value: String): Unit = { + /** Set the given Spark SQL configuration property using a `string` value. */ + def setConfString(key: String, value: String): Unit = { require(key != null, "key cannot be null") require(value != null, s"value cannot be null for key: $key") + val entry = sqlConfEntries.get(key) + if (entry != null) { + // Only verify configs in the SQLConf object + entry.valueConverter(value) + } settings.put(key, value) } + /** Set the given Spark SQL configuration property. */ + def setConf[T](entry: SQLConfEntry[T], value: T): Unit = { + require(entry != null, "entry cannot be null") + require(value != null, s"value cannot be null for key: ${entry.key}") + require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered") + settings.put(entry.key, entry.stringConverter(value)) + } + /** Return the value of Spark SQL configuration property for the given key. */ - def getConf(key: String): String = { - Option(settings.get(key)).getOrElse(throw new NoSuchElementException(key)) + def getConfString(key: String): String = { + Option(settings.get(key)). + orElse { + // Try to use the default value + Option(sqlConfEntries.get(key)).map(_.defaultValueString) + }. + getOrElse(throw new NoSuchElementException(key)) + } + + /** + * Return the value of Spark SQL configuration property for the given key. If the key is not set + * yet, return `defaultValue`. This is useful when `defaultValue` in SQLConfEntry is not the + * desired one. + */ + def getConf[T](entry: SQLConfEntry[T], defaultValue: T): T = { + require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered") + Option(settings.get(entry.key)).map(entry.valueConverter).getOrElse(defaultValue) } /** * Return the value of Spark SQL configuration property for the given key. If the key is not set - * yet, return `defaultValue`. + * yet, return `defaultValue` in [[SQLConfEntry]]. + */ + def getConf[T](entry: SQLConfEntry[T]): T = { + require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered") + Option(settings.get(entry.key)).map(entry.valueConverter).orElse(entry.defaultValue). + getOrElse(throw new NoSuchElementException(entry.key)) + } + + /** + * Return the `string` value of Spark SQL configuration property for the given key. If the key is + * not set yet, return `defaultValue`. */ - def getConf(key: String, defaultValue: String): String = { + def getConfString(key: String, defaultValue: String): String = { + val entry = sqlConfEntries.get(key) + if (entry != null && defaultValue != "<undefined>") { + // Only verify configs in the SQLConf object + entry.valueConverter(defaultValue) + } Option(settings.get(key)).getOrElse(defaultValue) } @@ -300,11 +589,25 @@ private[sql] class SQLConf extends Serializable with CatalystConf { */ def getAllConfs: immutable.Map[String, String] = settings.synchronized { settings.toMap } - private[spark] def unsetConf(key: String) { + /** + * Return all the configuration definitions that have been defined in [[SQLConf]]. Each + * definition contains key, defaultValue and doc. + */ + def getAllDefinedConfs: Seq[(String, String, String)] = sqlConfEntries.synchronized { + sqlConfEntries.values.filter(_.isPublic).map { entry => + (entry.key, entry.defaultValueString, entry.doc) + }.toSeq + } + + private[spark] def unsetConf(key: String): Unit = { settings -= key } - private[spark] def clear() { + private[spark] def unsetConf(entry: SQLConfEntry[_]): Unit = { + settings -= entry.key + } + + private[spark] def clear(): Unit = { settings.clear() } } http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 6b605f7..04fc798 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -31,6 +31,7 @@ import org.apache.spark.SparkContext import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SQLConf.SQLConfEntry import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.errors.DialectException import org.apache.spark.sql.catalyst.expressions._ @@ -79,13 +80,16 @@ class SQLContext(@transient val sparkContext: SparkContext) */ def setConf(props: Properties): Unit = conf.setConf(props) + /** Set the given Spark SQL configuration property. */ + private[sql] def setConf[T](entry: SQLConfEntry[T], value: T): Unit = conf.setConf(entry, value) + /** * Set the given Spark SQL configuration property. * * @group config * @since 1.0.0 */ - def setConf(key: String, value: String): Unit = conf.setConf(key, value) + def setConf(key: String, value: String): Unit = conf.setConfString(key, value) /** * Return the value of Spark SQL configuration property for the given key. @@ -93,7 +97,22 @@ class SQLContext(@transient val sparkContext: SparkContext) * @group config * @since 1.0.0 */ - def getConf(key: String): String = conf.getConf(key) + def getConf(key: String): String = conf.getConfString(key) + + /** + * Return the value of Spark SQL configuration property for the given key. If the key is not set + * yet, return `defaultValue` in [[SQLConfEntry]]. + */ + private[sql] def getConf[T](entry: SQLConfEntry[T]): T = conf.getConf(entry) + + /** + * Return the value of Spark SQL configuration property for the given key. If the key is not set + * yet, return `defaultValue`. This is useful when `defaultValue` in SQLConfEntry is not the + * desired one. + */ + private[sql] def getConf[T](entry: SQLConfEntry[T], defaultValue: T): T = { + conf.getConf(entry, defaultValue) + } /** * Return the value of Spark SQL configuration property for the given key. If the key is not set @@ -102,7 +121,7 @@ class SQLContext(@transient val sparkContext: SparkContext) * @group config * @since 1.0.0 */ - def getConf(key: String, defaultValue: String): String = conf.getConf(key, defaultValue) + def getConf(key: String, defaultValue: String): String = conf.getConfString(key, defaultValue) /** * Return all the configuration properties that have been set (i.e. not the default). http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala index 305b306..e59fa6e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala @@ -44,8 +44,8 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr private val pair: Parser[LogicalPlan] = (key ~ ("=".r ~> value).?).? ^^ { - case None => SetCommand(None, output) - case Some(k ~ v) => SetCommand(Some(k.trim -> v.map(_.trim)), output) + case None => SetCommand(None) + case Some(k ~ v) => SetCommand(Some(k.trim -> v.map(_.trim))) } def apply(input: String): LogicalPlan = parseAll(pair, input) match { http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index c9dfcea..5e9951f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution +import java.util.NoSuchElementException + import org.apache.spark.Logging import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD @@ -75,48 +77,92 @@ private[sql] case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan * :: DeveloperApi :: */ @DeveloperApi -case class SetCommand( - kv: Option[(String, Option[String])], - override val output: Seq[Attribute]) - extends RunnableCommand with Logging { +case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableCommand with Logging { + + private def keyValueOutput: Seq[Attribute] = { + val schema = StructType( + StructField("key", StringType, false) :: + StructField("value", StringType, false) :: Nil) + schema.toAttributes + } - override def run(sqlContext: SQLContext): Seq[Row] = kv match { + private val (_output, runFunc): (Seq[Attribute], SQLContext => Seq[Row]) = kv match { // Configures the deprecated "mapred.reduce.tasks" property. case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, Some(value))) => - logWarning( - s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " + - s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.") - if (value.toInt < 1) { - val msg = s"Setting negative ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} for automatically " + - "determining the number of reducers is not supported." - throw new IllegalArgumentException(msg) - } else { - sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS, value) - Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$value")) + val runFunc = (sqlContext: SQLContext) => { + logWarning( + s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " + + s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS.key} instead.") + if (value.toInt < 1) { + val msg = + s"Setting negative ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} for automatically " + + "determining the number of reducers is not supported." + throw new IllegalArgumentException(msg) + } else { + sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS.key, value) + Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, value)) + } } + (keyValueOutput, runFunc) // Configures a single property. case Some((key, Some(value))) => - sqlContext.setConf(key, value) - Seq(Row(s"$key=$value")) + val runFunc = (sqlContext: SQLContext) => { + sqlContext.setConf(key, value) + Seq(Row(key, value)) + } + (keyValueOutput, runFunc) - // Queries all key-value pairs that are set in the SQLConf of the sqlContext. - // Notice that different from Hive, here "SET -v" is an alias of "SET". // (In Hive, "SET" returns all changed properties while "SET -v" returns all properties.) - case Some(("-v", None)) | None => - sqlContext.getAllConfs.map { case (k, v) => Row(s"$k=$v") }.toSeq + // Queries all key-value pairs that are set in the SQLConf of the sqlContext. + case None => + val runFunc = (sqlContext: SQLContext) => { + sqlContext.getAllConfs.map { case (k, v) => Row(k, v) }.toSeq + } + (keyValueOutput, runFunc) + + // Queries all properties along with their default values and docs that are defined in the + // SQLConf of the sqlContext. + case Some(("-v", None)) => + val runFunc = (sqlContext: SQLContext) => { + sqlContext.conf.getAllDefinedConfs.map { case (key, defaultValue, doc) => + Row(key, defaultValue, doc) + } + } + val schema = StructType( + StructField("key", StringType, false) :: + StructField("default", StringType, false) :: + StructField("meaning", StringType, false) :: Nil) + (schema.toAttributes, runFunc) // Queries the deprecated "mapred.reduce.tasks" property. case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, None)) => - logWarning( - s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " + - s"showing ${SQLConf.SHUFFLE_PARTITIONS} instead.") - Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${sqlContext.conf.numShufflePartitions}")) + val runFunc = (sqlContext: SQLContext) => { + logWarning( + s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " + + s"showing ${SQLConf.SHUFFLE_PARTITIONS.key} instead.") + Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, sqlContext.conf.numShufflePartitions.toString)) + } + (keyValueOutput, runFunc) // Queries a single property. case Some((key, None)) => - Seq(Row(s"$key=${sqlContext.getConf(key, "<undefined>")}")) + val runFunc = (sqlContext: SQLContext) => { + val value = + try { + sqlContext.getConf(key) + } catch { + case _: NoSuchElementException => "<undefined>" + } + Seq(Row(key, value)) + } + (keyValueOutput, runFunc) } + + override val output: Seq[Attribute] = _output + + override def run(sqlContext: SQLContext): Seq[Row] = runFunc(sqlContext) + } /** http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala index 3ee4033..2964eda 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala @@ -48,7 +48,7 @@ package object debug { */ implicit class DebugSQLContext(sqlContext: SQLContext) { def debug(): Unit = { - sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "false") + sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, false) } } http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index 39360e1..65ecad9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -113,12 +113,12 @@ private[sql] case class ParquetTableScan( .foreach(ParquetInputFormat.setFilterPredicate(conf, _)) // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata - conf.set( - SQLConf.PARQUET_CACHE_METADATA, - sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true")) + conf.setBoolean( + SQLConf.PARQUET_CACHE_METADATA.key, + sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, true)) // Use task side metadata in parquet - conf.setBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true); + conf.setBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true) val baseRDD = new org.apache.spark.rdd.NewHadoopRDD( http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index bba6f1e..4c702c3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -220,7 +220,7 @@ private[sql] class ParquetRelation2( } conf.setClass( - SQLConf.OUTPUT_COMMITTER_CLASS, + SQLConf.OUTPUT_COMMITTER_CLASS.key, committerClass, classOf[ParquetOutputCommitter]) @@ -259,7 +259,7 @@ private[sql] class ParquetRelation2( filters: Array[Filter], inputFiles: Array[FileStatus], broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = { - val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean + val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA) val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown // Create the function to set variable Parquet confs at both driver and executor side. val initLocalJobFuncOpt = @@ -498,7 +498,7 @@ private[sql] object ParquetRelation2 extends Logging { ParquetTypesConverter.convertToString(dataSchema.toAttributes)) // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata - conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString) + conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache) } /** This closure sets input paths at the driver side. */ http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index 3dbe6fa..d39a20b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -323,7 +323,7 @@ private[sql] abstract class BaseWriterContainer( private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = { val committerClass = context.getConfiguration.getClass( - SQLConf.OUTPUT_COMMITTER_CLASS, null, classOf[OutputCommitter]) + SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter]) Option(committerClass).map { clazz => logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}") http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala index 356a610..9fa3945 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala @@ -38,7 +38,7 @@ class LocalSQLContext protected[sql] class SQLSession extends super.SQLSession { protected[sql] override lazy val conf: SQLConf = new SQLConf { /** Fewer partitions to speed up testing. */ - override def numShufflePartitions: Int = this.getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt + override def numShufflePartitions: Int = this.getConf(SQLConf.SHUFFLE_PARTITIONS, 5) } } http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala index 790b405..b26d3ab 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -68,12 +68,12 @@ class DataFrameAggregateSuite extends QueryTest { Seq(Row(1, 3), Row(2, 3), Row(3, 3)) ) - ctx.conf.setConf("spark.sql.retainGroupColumns", "false") + ctx.conf.setConf(SQLConf.DATAFRAME_RETAIN_GROUP_COLUMNS, false) checkAnswer( testData2.groupBy("a").agg(sum($"b")), Seq(Row(3), Row(3), Row(3)) ) - ctx.conf.setConf("spark.sql.retainGroupColumns", "true") + ctx.conf.setConf(SQLConf.DATAFRAME_RETAIN_GROUP_COLUMNS, true) } test("agg without groups") { http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index fa98e23..ba1d020 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -33,7 +33,7 @@ class DataFrameSuite extends QueryTest { test("analysis error should be eagerly reported") { val oldSetting = ctx.conf.dataFrameEagerAnalysis // Eager analysis. - ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "true") + ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, true) intercept[Exception] { testData.select('nonExistentName) } intercept[Exception] { @@ -47,11 +47,11 @@ class DataFrameSuite extends QueryTest { } // No more eager analysis once the flag is turned off - ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "false") + ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, false) testData.select('nonExistentName) // Set the flag back to original value before this test. - ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting.toString) + ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting) } test("dataframe toString") { @@ -70,7 +70,7 @@ class DataFrameSuite extends QueryTest { test("invalid plan toString, debug mode") { val oldSetting = ctx.conf.dataFrameEagerAnalysis - ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "true") + ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, true) // Turn on debug mode so we can see invalid query plans. import org.apache.spark.sql.execution.debug._ @@ -83,7 +83,7 @@ class DataFrameSuite extends QueryTest { badPlan.toString) // Set the flag back to original value before this test. - ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting.toString) + ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting) } test("access complex data") { @@ -556,13 +556,13 @@ class DataFrameSuite extends QueryTest { test("SPARK-6899") { val originalValue = ctx.conf.codegenEnabled - ctx.setConf(SQLConf.CODEGEN_ENABLED, "true") + ctx.setConf(SQLConf.CODEGEN_ENABLED, true) try{ checkAnswer( decimalData.agg(avg('a)), Row(new java.math.BigDecimal(2.0))) } finally { - ctx.setConf(SQLConf.CODEGEN_ENABLED, originalValue.toString) + ctx.setConf(SQLConf.CODEGEN_ENABLED, originalValue) } } http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index ffd26c4..20390a5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -95,14 +95,14 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach { classOf[BroadcastNestedLoopJoin]) ).foreach { case (query, joinClass) => assertJoin(query, joinClass) } try { - ctx.conf.setConf("spark.sql.planner.sortMergeJoin", "true") + ctx.conf.setConf(SQLConf.SORTMERGE_JOIN, true) Seq( ("SELECT * FROM testData JOIN testData2 ON key = a", classOf[SortMergeJoin]), ("SELECT * FROM testData JOIN testData2 ON key = a and key = 2", classOf[SortMergeJoin]), ("SELECT * FROM testData JOIN testData2 ON key = a where key = 2", classOf[SortMergeJoin]) ).foreach { case (query, joinClass) => assertJoin(query, joinClass) } } finally { - ctx.conf.setConf("spark.sql.planner.sortMergeJoin", SORTMERGEJOIN_ENABLED.toString) + ctx.conf.setConf(SQLConf.SORTMERGE_JOIN, SORTMERGEJOIN_ENABLED) } } @@ -118,7 +118,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach { classOf[BroadcastHashJoin]) ).foreach { case (query, joinClass) => assertJoin(query, joinClass) } try { - ctx.conf.setConf("spark.sql.planner.sortMergeJoin", "true") + ctx.conf.setConf(SQLConf.SORTMERGE_JOIN, true) Seq( ("SELECT * FROM testData join testData2 ON key = a", classOf[BroadcastHashJoin]), ("SELECT * FROM testData join testData2 ON key = a and key = 2", @@ -127,7 +127,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach { classOf[BroadcastHashJoin]) ).foreach { case (query, joinClass) => assertJoin(query, joinClass) } } finally { - ctx.conf.setConf("spark.sql.planner.sortMergeJoin", SORTMERGEJOIN_ENABLED.toString) + ctx.conf.setConf(SQLConf.SORTMERGE_JOIN, SORTMERGEJOIN_ENABLED) } ctx.sql("UNCACHE TABLE testData") @@ -416,7 +416,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach { ctx.sql("CACHE TABLE testData") val tmp = ctx.conf.autoBroadcastJoinThreshold - ctx.sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=1000000000") + ctx.sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=1000000000") Seq( ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", classOf[BroadcastLeftSemiJoinHash]) @@ -424,7 +424,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach { case (query, joinClass) => assertJoin(query, joinClass) } - ctx.sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1") + ctx.sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=-1") Seq( ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", classOf[LeftSemiJoinHash]) @@ -432,7 +432,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach { case (query, joinClass) => assertJoin(query, joinClass) } - ctx.setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, tmp.toString) + ctx.setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, tmp) ctx.sql("UNCACHE TABLE testData") } http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/SQLConfEntrySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfEntrySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfEntrySuite.scala new file mode 100644 index 0000000..2e33777 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfEntrySuite.scala @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.SQLConf._ + +class SQLConfEntrySuite extends SparkFunSuite { + + val conf = new SQLConf + + test("intConf") { + val key = "spark.sql.SQLConfEntrySuite.int" + val confEntry = SQLConfEntry.intConf(key) + assert(conf.getConf(confEntry, 5) === 5) + + conf.setConf(confEntry, 10) + assert(conf.getConf(confEntry, 5) === 10) + + conf.setConfString(key, "20") + assert(conf.getConfString(key, "5") === "20") + assert(conf.getConfString(key) === "20") + assert(conf.getConf(confEntry, 5) === 20) + + val e = intercept[IllegalArgumentException] { + conf.setConfString(key, "abc") + } + assert(e.getMessage === s"$key should be int, but was abc") + } + + test("longConf") { + val key = "spark.sql.SQLConfEntrySuite.long" + val confEntry = SQLConfEntry.longConf(key) + assert(conf.getConf(confEntry, 5L) === 5L) + + conf.setConf(confEntry, 10L) + assert(conf.getConf(confEntry, 5L) === 10L) + + conf.setConfString(key, "20") + assert(conf.getConfString(key, "5") === "20") + assert(conf.getConfString(key) === "20") + assert(conf.getConf(confEntry, 5L) === 20L) + + val e = intercept[IllegalArgumentException] { + conf.setConfString(key, "abc") + } + assert(e.getMessage === s"$key should be long, but was abc") + } + + test("booleanConf") { + val key = "spark.sql.SQLConfEntrySuite.boolean" + val confEntry = SQLConfEntry.booleanConf(key) + assert(conf.getConf(confEntry, false) === false) + + conf.setConf(confEntry, true) + assert(conf.getConf(confEntry, false) === true) + + conf.setConfString(key, "true") + assert(conf.getConfString(key, "false") === "true") + assert(conf.getConfString(key) === "true") + assert(conf.getConf(confEntry, false) === true) + + val e = intercept[IllegalArgumentException] { + conf.setConfString(key, "abc") + } + assert(e.getMessage === s"$key should be boolean, but was abc") + } + + test("doubleConf") { + val key = "spark.sql.SQLConfEntrySuite.double" + val confEntry = SQLConfEntry.doubleConf(key) + assert(conf.getConf(confEntry, 5.0) === 5.0) + + conf.setConf(confEntry, 10.0) + assert(conf.getConf(confEntry, 5.0) === 10.0) + + conf.setConfString(key, "20.0") + assert(conf.getConfString(key, "5.0") === "20.0") + assert(conf.getConfString(key) === "20.0") + assert(conf.getConf(confEntry, 5.0) === 20.0) + + val e = intercept[IllegalArgumentException] { + conf.setConfString(key, "abc") + } + assert(e.getMessage === s"$key should be double, but was abc") + } + + test("stringConf") { + val key = "spark.sql.SQLConfEntrySuite.string" + val confEntry = SQLConfEntry.stringConf(key) + assert(conf.getConf(confEntry, "abc") === "abc") + + conf.setConf(confEntry, "abcd") + assert(conf.getConf(confEntry, "abc") === "abcd") + + conf.setConfString(key, "abcde") + assert(conf.getConfString(key, "abc") === "abcde") + assert(conf.getConfString(key) === "abcde") + assert(conf.getConf(confEntry, "abc") === "abcde") + } + + test("enumConf") { + val key = "spark.sql.SQLConfEntrySuite.enum" + val confEntry = SQLConfEntry.enumConf(key, v => v, Set("a", "b", "c"), defaultValue = Some("a")) + assert(conf.getConf(confEntry) === "a") + + conf.setConf(confEntry, "b") + assert(conf.getConf(confEntry) === "b") + + conf.setConfString(key, "c") + assert(conf.getConfString(key, "a") === "c") + assert(conf.getConfString(key) === "c") + assert(conf.getConf(confEntry) === "c") + + val e = intercept[IllegalArgumentException] { + conf.setConfString(key, "d") + } + assert(e.getMessage === s"The value of $key should be one of a, b, c, but was d") + } + + test("stringSeqConf") { + val key = "spark.sql.SQLConfEntrySuite.stringSeq" + val confEntry = SQLConfEntry.stringSeqConf("spark.sql.SQLConfEntrySuite.stringSeq", + defaultValue = Some(Nil)) + assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c")) + + conf.setConf(confEntry, Seq("a", "b", "c", "d")) + assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c", "d")) + + conf.setConfString(key, "a,b,c,d,e") + assert(conf.getConfString(key, "a,b,c") === "a,b,c,d,e") + assert(conf.getConfString(key) === "a,b,c,d,e") + assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c", "d", "e")) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala index 76d0dd1..75791e9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala @@ -75,6 +75,14 @@ class SQLConfSuite extends QueryTest { test("deprecated property") { ctx.conf.clear() ctx.sql(s"set ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS}=10") - assert(ctx.getConf(SQLConf.SHUFFLE_PARTITIONS) === "10") + assert(ctx.conf.numShufflePartitions === 10) + } + + test("invalid conf value") { + ctx.conf.clear() + val e = intercept[IllegalArgumentException] { + ctx.sql(s"set ${SQLConf.CASE_SENSITIVE.key}=10") + } + assert(e.getMessage === s"${SQLConf.CASE_SENSITIVE.key} should be boolean, but was 10") } } http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 30db840..82f3fdb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -190,7 +190,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils { test("aggregation with codegen") { val originalValue = sqlContext.conf.codegenEnabled - sqlContext.setConf(SQLConf.CODEGEN_ENABLED, "true") + sqlContext.setConf(SQLConf.CODEGEN_ENABLED, true) // Prepare a table that we can group some rows. sqlContext.table("testData") .unionAll(sqlContext.table("testData")) @@ -287,7 +287,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils { Row(0, null, 0) :: Nil) } finally { sqlContext.dropTempTable("testData3x") - sqlContext.setConf(SQLConf.CODEGEN_ENABLED, originalValue.toString) + sqlContext.setConf(SQLConf.CODEGEN_ENABLED, originalValue) } } @@ -480,41 +480,41 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils { test("sorting") { val before = sqlContext.conf.externalSortEnabled - sqlContext.setConf(SQLConf.EXTERNAL_SORT, "false") + sqlContext.setConf(SQLConf.EXTERNAL_SORT, false) sortTest() - sqlContext.setConf(SQLConf.EXTERNAL_SORT, before.toString) + sqlContext.setConf(SQLConf.EXTERNAL_SORT, before) } test("external sorting") { val before = sqlContext.conf.externalSortEnabled - sqlContext.setConf(SQLConf.EXTERNAL_SORT, "true") + sqlContext.setConf(SQLConf.EXTERNAL_SORT, true) sortTest() - sqlContext.setConf(SQLConf.EXTERNAL_SORT, before.toString) + sqlContext.setConf(SQLConf.EXTERNAL_SORT, before) } test("SPARK-6927 sorting with codegen on") { val externalbefore = sqlContext.conf.externalSortEnabled val codegenbefore = sqlContext.conf.codegenEnabled - sqlContext.setConf(SQLConf.EXTERNAL_SORT, "false") - sqlContext.setConf(SQLConf.CODEGEN_ENABLED, "true") + sqlContext.setConf(SQLConf.EXTERNAL_SORT, false) + sqlContext.setConf(SQLConf.CODEGEN_ENABLED, true) try{ sortTest() } finally { - sqlContext.setConf(SQLConf.EXTERNAL_SORT, externalbefore.toString) - sqlContext.setConf(SQLConf.CODEGEN_ENABLED, codegenbefore.toString) + sqlContext.setConf(SQLConf.EXTERNAL_SORT, externalbefore) + sqlContext.setConf(SQLConf.CODEGEN_ENABLED, codegenbefore) } } test("SPARK-6927 external sorting with codegen on") { val externalbefore = sqlContext.conf.externalSortEnabled val codegenbefore = sqlContext.conf.codegenEnabled - sqlContext.setConf(SQLConf.CODEGEN_ENABLED, "true") - sqlContext.setConf(SQLConf.EXTERNAL_SORT, "true") + sqlContext.setConf(SQLConf.CODEGEN_ENABLED, true) + sqlContext.setConf(SQLConf.EXTERNAL_SORT, true) try { sortTest() } finally { - sqlContext.setConf(SQLConf.EXTERNAL_SORT, externalbefore.toString) - sqlContext.setConf(SQLConf.CODEGEN_ENABLED, codegenbefore.toString) + sqlContext.setConf(SQLConf.EXTERNAL_SORT, externalbefore) + sqlContext.setConf(SQLConf.CODEGEN_ENABLED, codegenbefore) } } @@ -908,25 +908,25 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils { sql(s"SET $testKey=$testVal") checkAnswer( sql("SET"), - Row(s"$testKey=$testVal") + Row(testKey, testVal) ) sql(s"SET ${testKey + testKey}=${testVal + testVal}") checkAnswer( sql("set"), Seq( - Row(s"$testKey=$testVal"), - Row(s"${testKey + testKey}=${testVal + testVal}")) + Row(testKey, testVal), + Row(testKey + testKey, testVal + testVal)) ) // "set key" checkAnswer( sql(s"SET $testKey"), - Row(s"$testKey=$testVal") + Row(testKey, testVal) ) checkAnswer( sql(s"SET $nonexistentKey"), - Row(s"$nonexistentKey=<undefined>") + Row(nonexistentKey, "<undefined>") ) sqlContext.conf.clear() } @@ -1340,12 +1340,12 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils { } test("SPARK-4699 case sensitivity SQL query") { - sqlContext.setConf(SQLConf.CASE_SENSITIVE, "false") + sqlContext.setConf(SQLConf.CASE_SENSITIVE, false) val data = TestData(1, "val_1") :: TestData(2, "val_2") :: Nil val rdd = sqlContext.sparkContext.parallelize((0 to 1).map(i => data(i))) rdd.toDF().registerTempTable("testTable1") checkAnswer(sql("SELECT VALUE FROM TESTTABLE1 where KEY = 1"), Row("val_1")) - sqlContext.setConf(SQLConf.CASE_SENSITIVE, "true") + sqlContext.setConf(SQLConf.CASE_SENSITIVE, true) } test("SPARK-6145: ORDER BY test for nested fields") { http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala index 6545c6b..2c08799 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala @@ -32,7 +32,7 @@ class PartitionBatchPruningSuite extends SparkFunSuite with BeforeAndAfterAll wi override protected def beforeAll(): Unit = { // Make a table with 5 partitions, 2 batches per partition, 10 elements per batch - ctx.setConf(SQLConf.COLUMN_BATCH_SIZE, "10") + ctx.setConf(SQLConf.COLUMN_BATCH_SIZE, 10) val pruningData = ctx.sparkContext.makeRDD((1 to 100).map { key => val string = if (((key - 1) / 10) % 2 == 0) null else key.toString @@ -41,14 +41,14 @@ class PartitionBatchPruningSuite extends SparkFunSuite with BeforeAndAfterAll wi pruningData.registerTempTable("pruningData") // Enable in-memory partition pruning - ctx.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, "true") + ctx.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true) // Enable in-memory table scan accumulators ctx.setConf("spark.sql.inMemoryTableScanStatistics.enable", "true") } override protected def afterAll(): Unit = { - ctx.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize.toString) - ctx.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning.toString) + ctx.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize) + ctx.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning) } before { http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 3e27f58..5854ab4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -63,7 +63,7 @@ class PlannerSuite extends SparkFunSuite { test("sizeInBytes estimation of limit operator for broadcast hash join optimization") { def checkPlan(fieldTypes: Seq[DataType], newThreshold: Int): Unit = { - setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, newThreshold.toString) + setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, newThreshold) val fields = fieldTypes.zipWithIndex.map { case (dataType, index) => StructField(s"c${index}", dataType, true) } :+ StructField("key", IntegerType, true) @@ -119,12 +119,12 @@ class PlannerSuite extends SparkFunSuite { checkPlan(complexTypes, newThreshold = 901617) - setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, origThreshold.toString) + setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, origThreshold) } test("InMemoryRelation statistics propagation") { val origThreshold = conf.autoBroadcastJoinThreshold - setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, 81920.toString) + setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, 81920) testData.limit(3).registerTempTable("tiny") sql("CACHE TABLE tiny") @@ -139,6 +139,6 @@ class PlannerSuite extends SparkFunSuite { assert(broadcastHashJoins.size === 1, "Should use broadcast hash join") assert(shuffledHashJoins.isEmpty, "Should not use shuffled hash join") - setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, origThreshold.toString) + setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, origThreshold) } } http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala index fca2436..945d437 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala @@ -1077,14 +1077,14 @@ class JsonSuite extends QueryTest with TestJsonData { } test("SPARK-7565 MapType in JsonRDD") { - val useStreaming = ctx.getConf(SQLConf.USE_JACKSON_STREAMING_API, "true") + val useStreaming = ctx.conf.useJacksonStreamingAPI val oldColumnNameOfCorruptRecord = ctx.conf.columnNameOfCorruptRecord ctx.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, "_unparsed") val schemaWithSimpleMap = StructType( StructField("map", MapType(StringType, IntegerType, true), false) :: Nil) try{ - for (useStreaming <- List("true", "false")) { + for (useStreaming <- List(true, false)) { ctx.setConf(SQLConf.USE_JACKSON_STREAMING_API, useStreaming) val temp = Utils.createTempDir().getPath http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala index fa5d4ec..a2763c7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala @@ -51,7 +51,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest { expected: Seq[Row]): Unit = { val output = predicate.collect { case a: Attribute => a }.distinct - withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") { + withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") { val query = df .select(output.map(e => Column(e)): _*) .where(Column(predicate)) @@ -314,17 +314,17 @@ class ParquetDataSourceOnFilterSuite extends ParquetFilterSuiteBase with BeforeA lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi override protected def beforeAll(): Unit = { - sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true") + sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, true) } override protected def afterAll(): Unit = { - sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString) + sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf) } test("SPARK-6554: don't push down predicates which reference partition columns") { import sqlContext.implicits._ - withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") { + withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") { withTempPath { dir => val path = s"${dir.getCanonicalPath}/part=1" (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path) @@ -343,17 +343,17 @@ class ParquetDataSourceOffFilterSuite extends ParquetFilterSuiteBase with Before lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi override protected def beforeAll(): Unit = { - sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false") + sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, false) } override protected def afterAll(): Unit = { - sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString) + sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf) } test("SPARK-6742: don't push down predicates which reference partition columns") { import sqlContext.implicits._ - withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") { + withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") { withTempPath { dir => val path = s"${dir.getCanonicalPath}/part=1" (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path) http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala index fc827bc..284d99d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala @@ -94,8 +94,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { val data = (1 to 4).map(i => Tuple1(i.toString)) // Property spark.sql.parquet.binaryAsString shouldn't affect Parquet files written by Spark SQL // as we store Spark SQL schema in the extra metadata. - withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING -> "false")(checkParquetFile(data)) - withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING -> "true")(checkParquetFile(data)) + withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "false")(checkParquetFile(data)) + withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true")(checkParquetFile(data)) } test("fixed-length decimals") { @@ -231,7 +231,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { val data = (0 until 10).map(i => (i, i.toString)) def checkCompressionCodec(codec: CompressionCodecName): Unit = { - withSQLConf(SQLConf.PARQUET_COMPRESSION -> codec.name()) { + withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) { withParquetFile(data) { path => assertResult(sqlContext.conf.parquetCompressionCodec.toUpperCase) { compressionCodecFor(path) @@ -408,7 +408,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { val clonedConf = new Configuration(configuration) configuration.set( - SQLConf.OUTPUT_COMMITTER_CLASS, classOf[ParquetOutputCommitter].getCanonicalName) + SQLConf.OUTPUT_COMMITTER_CLASS.key, classOf[ParquetOutputCommitter].getCanonicalName) configuration.set( "spark.sql.parquet.output.committer.class", @@ -440,11 +440,11 @@ class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterA private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi override protected def beforeAll(): Unit = { - sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true") + sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, true) } override protected def afterAll(): Unit = { - sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString) + sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API.key, originalConf.toString) } test("SPARK-6330 regression test") { @@ -464,10 +464,10 @@ class ParquetDataSourceOffIOSuite extends ParquetIOSuiteBase with BeforeAndAfter private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi override protected def beforeAll(): Unit = { - sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false") + sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, false) } override protected def afterAll(): Unit = { - sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString) + sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf) } } http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index be3b34d..fafad67 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -128,11 +128,11 @@ class ParquetDataSourceOnQuerySuite extends ParquetQuerySuiteBase with BeforeAnd private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi override protected def beforeAll(): Unit = { - sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true") + sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, true) } override protected def afterAll(): Unit = { - sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString) + sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf) } } @@ -140,10 +140,10 @@ class ParquetDataSourceOffQuerySuite extends ParquetQuerySuiteBase with BeforeAn private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi override protected def beforeAll(): Unit = { - sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false") + sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, false) } override protected def afterAll(): Unit = { - sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString) + sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf) } } http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala index 3f77960..00cc7d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala @@ -27,7 +27,7 @@ abstract class DataSourceTest extends QueryTest with BeforeAndAfter { // We want to test some edge cases. protected implicit lazy val caseInsensitiveContext = { val ctx = new SQLContext(TestSQLContext.sparkContext) - ctx.setConf(SQLConf.CASE_SENSITIVE, "false") + ctx.setConf(SQLConf.CASE_SENSITIVE, false) ctx } http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index ac4a00a..fa01823 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -37,11 +37,11 @@ trait SQLTestUtils { */ protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { val (keys, values) = pairs.unzip - val currentValues = keys.map(key => Try(sqlContext.conf.getConf(key)).toOption) - (keys, values).zipped.foreach(sqlContext.conf.setConf) + val currentValues = keys.map(key => Try(sqlContext.conf.getConfString(key)).toOption) + (keys, values).zipped.foreach(sqlContext.conf.setConfString) try f finally { keys.zip(currentValues).foreach { - case (key, Some(value)) => sqlContext.conf.setConf(key, value) + case (key, Some(value)) => sqlContext.conf.setConfString(key, value) case (key, None) => sqlContext.conf.unsetConf(key) } } http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index c9da252..700d994 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -153,9 +153,9 @@ object HiveThriftServer2 extends Logging { val sessionList = new mutable.LinkedHashMap[String, SessionInfo] val executionList = new mutable.LinkedHashMap[String, ExecutionInfo] val retainedStatements = - conf.getConf(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT, "200").toInt + conf.getConf(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT) val retainedSessions = - conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT, "200").toInt + conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT) var totalRunning = 0 override def onJobStart(jobStart: SparkListenerJobStart): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index e071103..e875888 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -219,7 +219,7 @@ private[hive] class SparkExecuteStatementOperation( result = hiveContext.sql(statement) logDebug(result.queryExecution.toString()) result.queryExecution.logical match { - case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value))), _) => + case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value)))) => sessionToActivePool(parentSession.getSessionHandle) = value logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.") case _ => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org