http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
new file mode 100644
index 0000000..c1e3f38
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -0,0 +1,730 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+import scala.collection.immutable
+
+import org.apache.parquet.hadoop.ParquetOutputCommitter
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.parser.ParserConf
+import org.apache.spark.util.Utils
+
+////////////////////////////////////////////////////////////////////////////////////////////////////
+// This file defines the configuration options for Spark SQL.
+////////////////////////////////////////////////////////////////////////////////////////////////////
+
+
+object SQLConf {
+
+  private val sqlConfEntries = java.util.Collections.synchronizedMap(
+    new java.util.HashMap[String, SQLConfEntry[_]]())
+
+  /**
+   * An entry contains all meta information for a configuration.
+   *
+   * @param key the key for the configuration
+   * @param defaultValue the default value for the configuration
+   * @param valueConverter how to convert a string to the value. It should throw an exception if
+   *                       the string does not have the required format.
+   * @param stringConverter how to convert a value to a string that the user can use as a valid
+   *                        string value. It's usually `toString`. But sometimes, a custom converter
+   *                        is necessary. E.g., if T is List[String], `a, b, c` is better than
+   *                        `List(a, b, c)`.
+   * @param doc the documentation for the configuration
+   * @param isPublic whether this configuration is public to the user. If it's `false`, this
+   *                 configuration is only used internally and we should not expose it to the user.
+   * @tparam T the value type
+   */
+  class SQLConfEntry[T] private(
+      val key: String,
+      val defaultValue: Option[T],
+      val valueConverter: String => T,
+      val stringConverter: T => String,
+      val doc: String,
+      val isPublic: Boolean) {
+
+    def defaultValueString: String = defaultValue.map(stringConverter).getOrElse("<undefined>")
+
+    override def toString: String = {
+      s"SQLConfEntry(key = $key, defaultValue=$defaultValueString, doc=$doc, isPublic = $isPublic)"
+    }
+  }
+
+  object SQLConfEntry {
+
+    private def apply[T](
+        key: String,
+        defaultValue: Option[T],
+        valueConverter: String => T,
+        stringConverter: T => String,
+        doc: String,
+        isPublic: Boolean): SQLConfEntry[T] =
+      sqlConfEntries.synchronized {
+        if (sqlConfEntries.containsKey(key)) {
+          throw new IllegalArgumentException(s"Duplicate SQLConfEntry. $key has been registered")
+        }
+        val entry =
+          new SQLConfEntry[T](key, defaultValue, valueConverter, stringConverter, doc, isPublic)
+        sqlConfEntries.put(key, entry)
+        entry
+      }
+
+    def intConf(
+        key: String,
+        defaultValue: Option[Int] = None,
+        doc: String = "",
+        isPublic: Boolean = true): SQLConfEntry[Int] =
+      SQLConfEntry(key, defaultValue, { v =>
+        try {
+          v.toInt
+        } catch {
+          case _: NumberFormatException =>
+            throw new IllegalArgumentException(s"$key should be int, but was $v")
+        }
+      }, _.toString, doc, isPublic)
+
+    def longConf(
+        key: String,
+        defaultValue: Option[Long] = None,
+        doc: String = "",
+        isPublic: Boolean = true): SQLConfEntry[Long] =
+      SQLConfEntry(key, defaultValue, { v =>
+        try {
+          v.toLong
+        } catch {
+          case _: NumberFormatException =>
+            throw new IllegalArgumentException(s"$key should be long, but was $v")
+        }
+      }, _.toString, doc, isPublic)
+
+    def longMemConf(
+        key: String,
+        defaultValue: Option[Long] = None,
+        doc: String = "",
+        isPublic: Boolean = true): SQLConfEntry[Long] =
+      SQLConfEntry(key, defaultValue, { v =>
+        try {
+          v.toLong
+        } catch {
+          case _: NumberFormatException =>
+            try {
+              Utils.byteStringAsBytes(v)
+            } catch {
+              case _: NumberFormatException =>
+                throw new IllegalArgumentException(s"$key should be long, but was $v")
+            }
+        }
+      }, _.toString, doc, isPublic)
+
+    def doubleConf(
+        key: String,
+        defaultValue: Option[Double] = None,
+        doc: String = "",
+        isPublic: Boolean = true): SQLConfEntry[Double] =
+      SQLConfEntry(key, defaultValue, { v =>
+        try {
+          v.toDouble
+        } catch {
+          case _: NumberFormatException =>
+            throw new IllegalArgumentException(s"$key should be double, but was $v")
+        }
+      }, _.toString, doc, isPublic)
+
+    def booleanConf(
+        key: String,
+        defaultValue: Option[Boolean] = None,
+        doc: String = "",
+        isPublic: Boolean = true): SQLConfEntry[Boolean] =
+      SQLConfEntry(key, defaultValue, { v =>
+        try {
+          v.toBoolean
+        } catch {
+          case _: IllegalArgumentException =>
+            throw new IllegalArgumentException(s"$key should be boolean, but was $v")
+        }
+      }, _.toString, doc, isPublic)
+
+    def stringConf(
+        key: String,
+        defaultValue: Option[String] = None,
+        doc: String = "",
+        isPublic: Boolean = true): SQLConfEntry[String] =
+      SQLConfEntry(key, defaultValue, v => v, v => v, doc, isPublic)
+
+    def enumConf[T](
+        key: String,
+        valueConverter: String => T,
+        validValues: Set[T],
+        defaultValue: Option[T] = None,
+        doc: String = "",
+        isPublic: Boolean = true): SQLConfEntry[T] =
+      SQLConfEntry(key, defaultValue, v => {
+        val _v = valueConverter(v)
+        if (!validValues.contains(_v)) {
+          throw new IllegalArgumentException(
+            s"The value of $key should be one of ${validValues.mkString(", ")}, but was $v")
+        }
+        _v
+      }, _.toString, doc,
+      isPublic)
+
+    def seqConf[T](
+        key: String,
+        valueConverter: String => T,
+        defaultValue: Option[Seq[T]] = None,
+        doc: String = "",
+        isPublic: Boolean = true): SQLConfEntry[Seq[T]] = {
+      SQLConfEntry(
+        key, defaultValue, _.split(",").map(valueConverter), _.mkString(","), doc, isPublic)
+    }
+
+    def stringSeqConf(
+        key: String,
+        defaultValue: Option[Seq[String]] = None,
+        doc: String = "",
+        isPublic: Boolean = true): SQLConfEntry[Seq[String]] = {
+      seqConf(key, s => s, defaultValue, doc, isPublic)
+    }
+  }
+
+  import SQLConfEntry._
+
+  val ALLOW_MULTIPLE_CONTEXTS = booleanConf("spark.sql.allowMultipleContexts",
+    defaultValue = Some(true),
+    doc = "When set to true, creating multiple SQLContexts/HiveContexts is allowed. " +
+      "When set to false, only one SQLContext/HiveContext is allowed to be created " +
+      "through the constructor (new SQLContexts/HiveContexts created through the newSession " +
+      "method are allowed). Please note that this conf needs to be set in Spark Conf. Once " +
+      "a SQLContext/HiveContext has been created, changing the value of this conf will not " +
+      "have any effect.",
+    isPublic = true)
+
+  val COMPRESS_CACHED = booleanConf("spark.sql.inMemoryColumnarStorage.compressed",
+    defaultValue = Some(true),
+    doc = "When set to true Spark SQL will automatically select a compression codec for each " +
+      "column based on statistics of the data.",
+    isPublic = false)
+
+  val COLUMN_BATCH_SIZE = intConf("spark.sql.inMemoryColumnarStorage.batchSize",
+    defaultValue = Some(10000),
+    doc = "Controls the size of batches for columnar caching. Larger batch sizes can improve " +
+      "memory utilization and compression, but risk OOMs when caching data.",
+    isPublic = false)
+
+  val IN_MEMORY_PARTITION_PRUNING =
+    booleanConf("spark.sql.inMemoryColumnarStorage.partitionPruning",
+      defaultValue = Some(true),
+      doc = "When true, enable partition pruning for in-memory columnar tables.",
+      isPublic = false)
+
+  val AUTO_BROADCASTJOIN_THRESHOLD = intConf("spark.sql.autoBroadcastJoinThreshold",
+    defaultValue = Some(10 * 1024 * 1024),
+    doc = "Configures the maximum size in bytes for a table that will be broadcast to all worker " +
+      "nodes when performing a join. By setting this value to -1 broadcasting can be disabled. " +
+      "Note that currently statistics are only supported for Hive Metastore tables where the " +
+      "command <code>ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan</code> has been run.")
+
+  val DEFAULT_SIZE_IN_BYTES = longConf(
+    "spark.sql.defaultSizeInBytes",
+    doc = "The default table size used in query planning. By default, it is set to a larger " +
+      "value than `spark.sql.autoBroadcastJoinThreshold` to be more conservative. That is to say " +
+      "by default the optimizer will not choose to broadcast a table unless it knows for sure " +
+      "its size is small enough.",
+    isPublic = false)
+
+  val SHUFFLE_PARTITIONS = intConf("spark.sql.shuffle.partitions",
+    defaultValue = Some(200),
+    doc = "The default number of partitions to use when shuffling data for joins or aggregations.")
+
+  val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE =
+    longMemConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize",
+      defaultValue = Some(64 * 1024 * 1024),
+      doc = "The target post-shuffle input size in bytes of a task.")
+
+  val ADAPTIVE_EXECUTION_ENABLED = booleanConf("spark.sql.adaptive.enabled",
+    defaultValue = Some(false),
+    doc = "When true, enable adaptive query execution.")
+
+  val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
+    intConf("spark.sql.adaptive.minNumPostShufflePartitions",
+      defaultValue = Some(-1),
+      doc = "The advisory minimum number of post-shuffle partitions provided to " +
+        "ExchangeCoordinator. This setting is used in our tests to make sure we " +
+        "have enough parallelism to expose issues that will not be exposed with a " +
+        "single partition. When the value is non-positive, this setting will " +
+        "not be provided to ExchangeCoordinator.",
+      isPublic = false)
+
+  val SUBEXPRESSION_ELIMINATION_ENABLED = booleanConf("spark.sql.subexpressionElimination.enabled",
+    defaultValue = Some(true),
+    doc = "When true, common subexpressions will be eliminated.",
+    isPublic = false)
+
+  val CASE_SENSITIVE = booleanConf("spark.sql.caseSensitive",
+    defaultValue = Some(true),
+    doc = "Whether the query analyzer should be case sensitive or not.")
+
+  val PARQUET_SCHEMA_MERGING_ENABLED = booleanConf("spark.sql.parquet.mergeSchema",
+    defaultValue = Some(false),
+    doc = "When true, the Parquet data source merges schemas collected from all data files, " +
+      "otherwise the schema is picked from the summary file or a random data file " +
+      "if no summary file is available.")
+
+  val PARQUET_SCHEMA_RESPECT_SUMMARIES = booleanConf("spark.sql.parquet.respectSummaryFiles",
+    defaultValue = Some(false),
+    doc = "When true, we make the assumption that all part-files of Parquet are consistent with " +
+      "summary files and we will ignore them when merging schema. Otherwise, if this is " +
+      "false, which is the default, we will merge all part-files. This should be considered " +
+      "an expert-only option, and shouldn't be enabled before knowing what it means exactly.")
+
+  val PARQUET_BINARY_AS_STRING = booleanConf("spark.sql.parquet.binaryAsString",
+    defaultValue = Some(false),
+    doc = "Some other Parquet-producing systems, in particular Impala and older versions of " +
+      "Spark SQL, do not differentiate between binary data and strings when writing out the " +
+      "Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide " +
+      "compatibility with these systems.")
+
+  val PARQUET_INT96_AS_TIMESTAMP = booleanConf("spark.sql.parquet.int96AsTimestamp",
+    defaultValue = Some(true),
+    doc = "Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. " +
+      "Spark would also store Timestamp as INT96 because we need to avoid precision loss of the " +
+      "nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to " +
+      "provide compatibility with these systems.")
+
+  val PARQUET_CACHE_METADATA = booleanConf("spark.sql.parquet.cacheMetadata",
+    defaultValue = Some(true),
+    doc = "Turns on caching of Parquet schema metadata. Can speed up querying of static data.")
+
+  val PARQUET_COMPRESSION = enumConf("spark.sql.parquet.compression.codec",
+    valueConverter = v => v.toLowerCase,
+    validValues = Set("uncompressed", "snappy", "gzip", "lzo"),
+    defaultValue = Some("gzip"),
+    doc = "Sets the compression codec used when writing Parquet files. Acceptable values " +
+      "include: uncompressed, snappy, gzip, lzo.")
+
+  val PARQUET_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.parquet.filterPushdown",
+    defaultValue = Some(true),
+    doc = "Enables Parquet filter push-down optimization when set to true.")
+
+  val PARQUET_WRITE_LEGACY_FORMAT = booleanConf(
+    key = "spark.sql.parquet.writeLegacyFormat",
+    defaultValue = Some(false),
+    doc = "Whether to follow Parquet's format specification when converting Parquet schema to " +
+      "Spark SQL schema and vice versa.")
+
+  val PARQUET_OUTPUT_COMMITTER_CLASS = stringConf(
+    key = "spark.sql.parquet.output.committer.class",
+    defaultValue = Some(classOf[ParquetOutputCommitter].getName),
+    doc = "The output committer class used by Parquet. The specified class needs to be a " +
+      "subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass " +
+      "of org.apache.parquet.hadoop.ParquetOutputCommitter. NOTE: 1. Instead of SQLConf, this " +
+      "option must be set in Hadoop Configuration. 2. This option overrides " +
+      "\"spark.sql.sources.outputCommitterClass\".")
+
+  val PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED = booleanConf(
+    key = "spark.sql.parquet.enableUnsafeRowRecordReader",
+    defaultValue = Some(true),
+    doc = "Enables using the custom ParquetUnsafeRowRecordReader.")
+
+  // Note: this cannot be enabled all the time because the reader will not be returning
+  // UnsafeRows. Doing so is very expensive and we should remove this requirement instead of
+  // fixing it here. Initial testing seems to indicate only sort requires this.
+  val PARQUET_VECTORIZED_READER_ENABLED = booleanConf(
+    key = "spark.sql.parquet.enableVectorizedReader",
+    defaultValue = Some(false),
+    doc = "Enables vectorized Parquet decoding.")
+
+  val ORC_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.orc.filterPushdown",
+    defaultValue = Some(false),
+    doc = "When true, enable filter pushdown for ORC files.")
+
+  val HIVE_VERIFY_PARTITION_PATH = booleanConf("spark.sql.hive.verifyPartitionPath",
+    defaultValue = Some(false),
+    doc = "When true, check all the partition paths under the table\'s root directory " +
+      "when reading data stored in HDFS.")
+
+  val HIVE_METASTORE_PARTITION_PRUNING = booleanConf("spark.sql.hive.metastorePartitionPruning",
+    defaultValue = Some(false),
+    doc = "When true, some predicates will be pushed down into the Hive metastore so that " +
+      "non-matching partitions can be eliminated earlier.")
+
+  val NATIVE_VIEW = booleanConf("spark.sql.nativeView",
+    defaultValue = Some(false),
+    doc = "When true, CREATE VIEW will be handled by Spark SQL instead of Hive native commands. " +
+      "Note that this function is experimental and should only be used when you are using " +
+      "non-hive-compatible tables written by Spark SQL. The SQL string used to create the " +
+      "view should be fully qualified, i.e. use `tbl1`.`col1` instead of `*` whenever " +
+      "possible, or you may get a wrong result.",
+    isPublic = false)
+
+  val CANONICAL_NATIVE_VIEW = booleanConf("spark.sql.nativeView.canonical",
+    defaultValue = Some(true),
+    doc = "When this option and spark.sql.nativeView are both true, Spark SQL tries to handle " +
+      "the CREATE VIEW statement using a SQL query string generated from the view definition " +
+      "logical plan. If the logical plan doesn't have a SQL representation, we fall back to " +
+      "the original native view implementation.",
+    isPublic = false)
+
+  val COLUMN_NAME_OF_CORRUPT_RECORD = stringConf("spark.sql.columnNameOfCorruptRecord",
+    defaultValue = Some("_corrupt_record"),
+    doc = "The name of the internal column for storing raw/un-parsed JSON records that fail " +
+      "to parse.")
+
+  val BROADCAST_TIMEOUT = intConf("spark.sql.broadcastTimeout",
+    defaultValue = Some(5 * 60),
+    doc = "Timeout in seconds for the broadcast wait time in broadcast joins.")
+
+  // This is only used for the thriftserver
+  val THRIFTSERVER_POOL = stringConf("spark.sql.thriftserver.scheduler.pool",
+    doc = "Set a Fair Scheduler pool for a JDBC client session.")
+
+  val THRIFTSERVER_UI_STATEMENT_LIMIT = intConf("spark.sql.thriftserver.ui.retainedStatements",
+    defaultValue = Some(200),
+    doc = "The number of SQL statements kept in the JDBC/ODBC web UI history.")
+
+  val THRIFTSERVER_UI_SESSION_LIMIT = intConf("spark.sql.thriftserver.ui.retainedSessions",
+    defaultValue = Some(200),
+    doc = "The number of SQL client sessions kept in the JDBC/ODBC web UI history.")
+
+  // This is used to set the default data source
+  val DEFAULT_DATA_SOURCE_NAME = stringConf("spark.sql.sources.default",
+    defaultValue = Some("org.apache.spark.sql.parquet"),
+    doc = "The default data source to use in input/output.")
+
+  // This is used to control when we split a schema's JSON string into multiple pieces
+  // in order to fit the JSON string in the metastore's table property (by default, the value
+  // has a length restriction of 4000 characters). We will split the JSON string of a schema
+  // when its length exceeds the threshold.
+  val SCHEMA_STRING_LENGTH_THRESHOLD = intConf("spark.sql.sources.schemaStringLengthThreshold",
+    defaultValue = Some(4000),
+    doc = "The maximum length allowed in a single cell when " +
+      "storing additional schema information in Hive's metastore.",
+    isPublic = false)
+
+  val PARTITION_DISCOVERY_ENABLED = booleanConf("spark.sql.sources.partitionDiscovery.enabled",
+    defaultValue = Some(true),
+    doc = "When true, automatically discover data partitions.")
+
+  val PARTITION_COLUMN_TYPE_INFERENCE =
+    booleanConf("spark.sql.sources.partitionColumnTypeInference.enabled",
+      defaultValue = Some(true),
+      doc = "When true, automatically infer the data types for partitioned columns.")
+
+  val PARTITION_MAX_FILES =
+    intConf("spark.sql.sources.maxConcurrentWrites",
+      defaultValue = Some(1),
+      doc = "The maximum number of concurrent files to open before falling back on sorting when " +
+        "writing out files using dynamic partitioning.")
+
+  val BUCKETING_ENABLED = booleanConf("spark.sql.sources.bucketing.enabled",
+    defaultValue = Some(true),
+    doc = "When false, we will treat bucketed tables as normal tables.")
+
+  // The output committer class used by HadoopFsRelation. The specified class needs to be a
+  // subclass of org.apache.hadoop.mapreduce.OutputCommitter.
+  //
+  // NOTE:
+  //
+  // 1. Instead of SQLConf, this option *must be set in Hadoop Configuration*.
+  // 2. This option can be overridden by "spark.sql.parquet.output.committer.class".
+  val OUTPUT_COMMITTER_CLASS =
+    stringConf("spark.sql.sources.outputCommitterClass", isPublic = false)
+
+  val PARALLEL_PARTITION_DISCOVERY_THRESHOLD = intConf(
+    key = "spark.sql.sources.parallelPartitionDiscovery.threshold",
+    defaultValue = Some(32),
+    doc = "The degree of parallelism for schema merging and partition discovery of " +
+      "Parquet data sources.")
+
+  // Whether to perform eager analysis when constructing a dataframe.
+  // Set to false when debugging requires the ability to look at invalid query plans.
+  val DATAFRAME_EAGER_ANALYSIS = booleanConf(
+    "spark.sql.eagerAnalysis",
+    defaultValue = Some(true),
+    doc = "When true, eagerly applies query analysis on DataFrame operations.",
+    isPublic = false)
+
+  // Whether to automatically resolve ambiguity in join conditions for self-joins.
+  // See SPARK-6231.
+  val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY = booleanConf(
+    "spark.sql.selfJoinAutoResolveAmbiguity",
+    defaultValue = Some(true),
+    isPublic = false)
+
+  // Whether to retain group by columns or not in GroupedData.agg.
+  val DATAFRAME_RETAIN_GROUP_COLUMNS = booleanConf(
+    "spark.sql.retainGroupColumns",
+    defaultValue = Some(true),
+    isPublic = false)
+
+  val DATAFRAME_PIVOT_MAX_VALUES = intConf(
+    "spark.sql.pivotMaxValues",
+    defaultValue = Some(10000),
+    doc = "When doing a pivot without specifying values for the pivot column, this is the " +
+      "maximum number of (distinct) values that will be collected without error."
+  )
+
+  val RUN_SQL_ON_FILES = booleanConf("spark.sql.runSQLOnFiles",
+    defaultValue = Some(true),
+    isPublic = false,
+    doc = "When true, we could use `datasource`.`path` as a table in SQL queries."
+  )
+
+  val PARSER_SUPPORT_QUOTEDID = booleanConf("spark.sql.parser.supportQuotedIdentifiers",
+    defaultValue = Some(true),
+    isPublic = false,
+    doc = "Whether to use quoted identifiers.\n false: default (past) behavior; only " +
+      "alphanumeric and underscore characters are valid in identifiers.\n" +
+      " true: column names can contain any character.")
+
+  val PARSER_SUPPORT_SQL11_RESERVED_KEYWORDS = booleanConf(
+    "spark.sql.parser.supportSQL11ReservedKeywords",
+    defaultValue = Some(false),
+    isPublic = false,
+    doc = "This flag should be set to true to enable support for SQL2011 reserved keywords.")
+
+  val WHOLESTAGE_CODEGEN_ENABLED = booleanConf("spark.sql.codegen.wholeStage",
+    defaultValue = Some(true),
+    doc = "When true, the whole stage (of multiple operators) will be compiled into a single " +
+      "Java method.",
+    isPublic = false)
+
+
+  object Deprecated {
+    val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
+    val EXTERNAL_SORT = "spark.sql.planner.externalSort"
+    val USE_SQL_AGGREGATE2 = "spark.sql.useAggregate2"
+    val TUNGSTEN_ENABLED = "spark.sql.tungsten.enabled"
+    val CODEGEN_ENABLED = "spark.sql.codegen"
+    val UNSAFE_ENABLED = "spark.sql.unsafe.enabled"
+    val SORTMERGE_JOIN = "spark.sql.planner.sortMergeJoin"
+  }
+}
+
+/**
+ * A class that enables the setting and getting of mutable config parameters/hints.
+ *
+ * In the presence of a SQLContext, these can be set and queried by passing SET commands
+ * into Spark SQL's query functions (i.e. sql()). Otherwise, users of this class can
+ * modify the hints by programmatically calling the setters and getters of this class.
+ *
+ * SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
+ */
+class SQLConf extends Serializable with CatalystConf with ParserConf with Logging {
+  import SQLConf._
+
+  /** Only a low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */
+  @transient protected[spark] val settings = java.util.Collections.synchronizedMap(
+    new java.util.HashMap[String, String]())
+
+  /** ************************ Spark SQL Params/Hints ******************* */
+
+  def useCompression: Boolean = getConf(COMPRESS_CACHED)
+
+  def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION)
+
+  def parquetCacheMetadata: Boolean = getConf(PARQUET_CACHE_METADATA)
+
+  def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
+
+  def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
+
+  def targetPostShuffleInputSize: Long =
+    getConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
+
+  def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
+
+  def minNumPostShufflePartitions: Int =
+    getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
+
+  def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)
+
+  def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)
+
+  def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
+
+  def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
+
+  def nativeView: Boolean = getConf(NATIVE_VIEW)
+
+  def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED)
+
+  def canonicalView: Boolean = getConf(CANONICAL_NATIVE_VIEW)
+
+  def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
+
+  def subexpressionEliminationEnabled: Boolean =
+    getConf(SUBEXPRESSION_ELIMINATION_ENABLED)
+
+  def autoBroadcastJoinThreshold: Int = getConf(AUTO_BROADCASTJOIN_THRESHOLD)
+
+  def defaultSizeInBytes: Long =
+    getConf(DEFAULT_SIZE_IN_BYTES, autoBroadcastJoinThreshold + 1L)
+
+  def isParquetBinaryAsString: Boolean = getConf(PARQUET_BINARY_AS_STRING)
+
+  def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP)
+
+  def writeLegacyParquetFormat: Boolean = getConf(PARQUET_WRITE_LEGACY_FORMAT)
+
+  def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)
+
+  def columnNameOfCorruptRecord: String = getConf(COLUMN_NAME_OF_CORRUPT_RECORD)
+
+  def broadcastTimeout: Int = getConf(BROADCAST_TIMEOUT)
+
+  def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)
+
+  def partitionDiscoveryEnabled(): Boolean =
+    getConf(SQLConf.PARTITION_DISCOVERY_ENABLED)
+
+  def partitionColumnTypeInferenceEnabled(): Boolean =
+    getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE)
+
+  def parallelPartitionDiscoveryThreshold: Int =
+    getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)
+
+  def bucketingEnabled(): Boolean = getConf(SQLConf.BUCKETING_ENABLED)
+
+  // Do not use a value larger than 4000 as the default value of this property.
+  // See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information.
+  def schemaStringLengthThreshold: Int = getConf(SCHEMA_STRING_LENGTH_THRESHOLD)
+
+  def dataFrameEagerAnalysis: Boolean = getConf(DATAFRAME_EAGER_ANALYSIS)
+
+  def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =
+    getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY)
+
+  def dataFrameRetainGroupColumns: Boolean = getConf(DATAFRAME_RETAIN_GROUP_COLUMNS)
+
+  def runSQLOnFile: Boolean = getConf(RUN_SQL_ON_FILES)
+
+  def supportQuotedId: Boolean = getConf(PARSER_SUPPORT_QUOTEDID)
+
+  def supportSQL11ReservedKeywords: Boolean = getConf(PARSER_SUPPORT_SQL11_RESERVED_KEYWORDS)
+
+  /** ********************** SQLConf functionality methods ************ */
+
+  /** Set Spark SQL configuration properties. */
+  def setConf(props: Properties): Unit = settings.synchronized {
+    props.asScala.foreach { case (k, v) => setConfString(k, v) }
+  }
+
+  /** Set the given Spark SQL configuration property using a `string` value. */
+  def setConfString(key: String, value: String): Unit = {
+    require(key != null, "key cannot be null")
+    require(value != null, s"value cannot be null for key: $key")
+    val entry = sqlConfEntries.get(key)
+    if (entry != null) {
+      // Only verify configs in the SQLConf object
+      entry.valueConverter(value)
+    }
+    setConfWithCheck(key, value)
+  }
+
+  /** Set the given Spark SQL configuration property. */
+  def setConf[T](entry: SQLConfEntry[T], value: T): Unit = {
+    require(entry != null, "entry cannot be null")
+    require(value != null, s"value cannot be null for key: ${entry.key}")
+    require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
+    setConfWithCheck(entry.key, entry.stringConverter(value))
+  }
+
+  /** Return the value of Spark SQL configuration property for the given key. */
+  def getConfString(key: String): String = {
+    Option(settings.get(key)).
+      orElse {
+        // Try to use the default value
+        Option(sqlConfEntries.get(key)).map(_.defaultValueString)
+      }.
+      getOrElse(throw new NoSuchElementException(key))
+  }
+
+  /**
+   * Return the value of Spark SQL configuration property for the given key. If the key is not set
+   * yet, return `defaultValue`. This is useful when `defaultValue` in SQLConfEntry is not the
+   * desired one.
+   */
+  def getConf[T](entry: SQLConfEntry[T], defaultValue: T): T = {
+    require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
+    Option(settings.get(entry.key)).map(entry.valueConverter).getOrElse(defaultValue)
+  }
+
+  /**
+   * Return the value of Spark SQL configuration property for the given key. If the key is not set
+   * yet, return `defaultValue` in [[SQLConfEntry]].
+   */
+  def getConf[T](entry: SQLConfEntry[T]): T = {
+    require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
+    Option(settings.get(entry.key)).map(entry.valueConverter).orElse(entry.defaultValue).
+      getOrElse(throw new NoSuchElementException(entry.key))
+  }
+
+  /**
+   * Return the `string` value of Spark SQL configuration property for the given key. If the key is
+   * not set yet, return `defaultValue`.
+   */
+  def getConfString(key: String, defaultValue: String): String = {
+    val entry = sqlConfEntries.get(key)
+    if (entry != null && defaultValue != "<undefined>") {
+      // Only verify configs in the SQLConf object
+      entry.valueConverter(defaultValue)
+    }
+    Option(settings.get(key)).getOrElse(defaultValue)
+  }
+
+  /**
+   * Return all the configuration properties that have been set (i.e. not the default).
+   * This creates a new copy of the config properties in the form of a Map.
+   */
+  def getAllConfs: immutable.Map[String, String] =
+    settings.synchronized { settings.asScala.toMap }
+
+  /**
+   * Return all the configuration definitions that have been defined in [[SQLConf]]. Each
+   * definition contains key, defaultValue and doc.
+   */
+  def getAllDefinedConfs: Seq[(String, String, String)] = sqlConfEntries.synchronized {
+    sqlConfEntries.values.asScala.filter(_.isPublic).map { entry =>
+      (entry.key, entry.defaultValueString, entry.doc)
+    }.toSeq
+  }
+
+  private def setConfWithCheck(key: String, value: String): Unit = {
+    if (key.startsWith("spark.") && !key.startsWith("spark.sql.")) {
+      logWarning(s"Attempt to set non-Spark SQL config in SQLConf: key = $key, value = $value")
+    }
+    settings.put(key, value)
+  }
+
+  def unsetConf(key: String): Unit = {
+    settings.remove(key)
+  }
+
+  def unsetConf(entry: SQLConfEntry[_]): Unit = {
+    settings.remove(entry.key)
+  }
+
+  def clear(): Unit = {
+    settings.clear()
+  }
+}
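To make the registration and lookup flow above concrete, here is a minimal sketch of defining and using a typed entry. The key name "spark.sql.example.maxWidgets" is hypothetical; the calls are the ones added in this file:

  import org.apache.spark.sql.internal.SQLConf
  import org.apache.spark.sql.internal.SQLConf.SQLConfEntry

  // Hypothetical entry for illustration. Each key may be registered only once;
  // a duplicate registration throws IllegalArgumentException.
  val MAX_WIDGETS = SQLConfEntry.intConf(
    "spark.sql.example.maxWidgets",
    defaultValue = Some(100),
    doc = "Maximum number of widgets to plan for.")

  val conf = new SQLConf
  conf.getConf(MAX_WIDGETS)                    // 100, the registered default
  conf.setConf(MAX_WIDGETS, 42)                // typed setter, checked against the entry
  conf.setConfString(MAX_WIDGETS.key, "7")     // string setter, validated by valueConverter
  conf.getConf(MAX_WIDGETS)                    // 7
  conf.setConfString(MAX_WIDGETS.key, "abc")   // IllegalArgumentException: ... should be int, but was abc
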
http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/main/scala/org/apache/spark/sql/internal/package-info.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/package-info.java b/sql/core/src/main/scala/org/apache/spark/sql/internal/package-info.java new file mode 100644 index 0000000..1e801cb --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * All classes in this package are considered an internal API to Spark and + * are subject to change between minor releases. + */ +package org.apache.spark.sql.internal; http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/main/scala/org/apache/spark/sql/internal/package.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/package.scala new file mode 100644 index 0000000..c2394f4 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/package.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +/** + * All classes in this package are considered an internal API to Spark and + * are subject to change between minor releases. 
+ */ +package object internal http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala index 78bf6c1..f54bff9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.DecimalType http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala index bc1a336..368aa5c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext class DataFramePivotSuite extends QueryTest with SharedSQLContext{ http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index b8d1b5a..84f30c0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Union} import org.apache.spark.sql.execution.aggregate.TungstenAggregate import org.apache.spark.sql.execution.exchange.ShuffleExchange import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSQLContext} import org.apache.spark.sql.test.SQLTestData.TestData2 import org.apache.spark.sql.types._ http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index a1211e4..41e27ec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.joins._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext 
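The suite changes below are import-only; call sites keep referencing the same entries after the package move. A representative sketch, assuming a suite with a SharedSQLContext-provided sqlContext:

  import org.apache.spark.sql.internal.SQLConf  // previously org.apache.spark.sql.SQLConf

  // Entries and their keys are unchanged by the move; only the package differs.
  sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS.key, "5")
  assert(sqlContext.conf.numShufflePartitions === 5)
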
http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala index 6a375a3..0b5a92c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql import org.scalatest.BeforeAndAfterAll import org.apache.spark._ +import org.apache.spark.sql.internal.SQLConf class MultiSQLContextsSuite extends SparkFunSuite with BeforeAndAfterAll { http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 5401212..c05aa54 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.{LogicalRDD, Queryable} import org.apache.spark.sql.execution.columnar.InMemoryRelation import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.internal.SQLConf abstract class QueryTest extends PlanTest { http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/SQLConfEntrySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfEntrySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfEntrySuite.scala deleted file mode 100644 index 2e33777..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfEntrySuite.scala +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.SQLConf._ - -class SQLConfEntrySuite extends SparkFunSuite { - - val conf = new SQLConf - - test("intConf") { - val key = "spark.sql.SQLConfEntrySuite.int" - val confEntry = SQLConfEntry.intConf(key) - assert(conf.getConf(confEntry, 5) === 5) - - conf.setConf(confEntry, 10) - assert(conf.getConf(confEntry, 5) === 10) - - conf.setConfString(key, "20") - assert(conf.getConfString(key, "5") === "20") - assert(conf.getConfString(key) === "20") - assert(conf.getConf(confEntry, 5) === 20) - - val e = intercept[IllegalArgumentException] { - conf.setConfString(key, "abc") - } - assert(e.getMessage === s"$key should be int, but was abc") - } - - test("longConf") { - val key = "spark.sql.SQLConfEntrySuite.long" - val confEntry = SQLConfEntry.longConf(key) - assert(conf.getConf(confEntry, 5L) === 5L) - - conf.setConf(confEntry, 10L) - assert(conf.getConf(confEntry, 5L) === 10L) - - conf.setConfString(key, "20") - assert(conf.getConfString(key, "5") === "20") - assert(conf.getConfString(key) === "20") - assert(conf.getConf(confEntry, 5L) === 20L) - - val e = intercept[IllegalArgumentException] { - conf.setConfString(key, "abc") - } - assert(e.getMessage === s"$key should be long, but was abc") - } - - test("booleanConf") { - val key = "spark.sql.SQLConfEntrySuite.boolean" - val confEntry = SQLConfEntry.booleanConf(key) - assert(conf.getConf(confEntry, false) === false) - - conf.setConf(confEntry, true) - assert(conf.getConf(confEntry, false) === true) - - conf.setConfString(key, "true") - assert(conf.getConfString(key, "false") === "true") - assert(conf.getConfString(key) === "true") - assert(conf.getConf(confEntry, false) === true) - - val e = intercept[IllegalArgumentException] { - conf.setConfString(key, "abc") - } - assert(e.getMessage === s"$key should be boolean, but was abc") - } - - test("doubleConf") { - val key = "spark.sql.SQLConfEntrySuite.double" - val confEntry = SQLConfEntry.doubleConf(key) - assert(conf.getConf(confEntry, 5.0) === 5.0) - - conf.setConf(confEntry, 10.0) - assert(conf.getConf(confEntry, 5.0) === 10.0) - - conf.setConfString(key, "20.0") - assert(conf.getConfString(key, "5.0") === "20.0") - assert(conf.getConfString(key) === "20.0") - assert(conf.getConf(confEntry, 5.0) === 20.0) - - val e = intercept[IllegalArgumentException] { - conf.setConfString(key, "abc") - } - assert(e.getMessage === s"$key should be double, but was abc") - } - - test("stringConf") { - val key = "spark.sql.SQLConfEntrySuite.string" - val confEntry = SQLConfEntry.stringConf(key) - assert(conf.getConf(confEntry, "abc") === "abc") - - conf.setConf(confEntry, "abcd") - assert(conf.getConf(confEntry, "abc") === "abcd") - - conf.setConfString(key, "abcde") - assert(conf.getConfString(key, "abc") === "abcde") - assert(conf.getConfString(key) === "abcde") - assert(conf.getConf(confEntry, "abc") === "abcde") - } - - test("enumConf") { - val key = "spark.sql.SQLConfEntrySuite.enum" - val confEntry = SQLConfEntry.enumConf(key, v => v, Set("a", "b", "c"), defaultValue = Some("a")) - assert(conf.getConf(confEntry) === "a") - - conf.setConf(confEntry, "b") - assert(conf.getConf(confEntry) === "b") - - conf.setConfString(key, "c") - assert(conf.getConfString(key, "a") === "c") - assert(conf.getConfString(key) === "c") - assert(conf.getConf(confEntry) === "c") - - val e = intercept[IllegalArgumentException] { - conf.setConfString(key, "d") - } - assert(e.getMessage === s"The value of $key should be one of a, 
b, c, but was d") - } - - test("stringSeqConf") { - val key = "spark.sql.SQLConfEntrySuite.stringSeq" - val confEntry = SQLConfEntry.stringSeqConf("spark.sql.SQLConfEntrySuite.stringSeq", - defaultValue = Some(Nil)) - assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c")) - - conf.setConf(confEntry, Seq("a", "b", "c", "d")) - assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c", "d")) - - conf.setConfString(key, "a,b,c,d,e") - assert(conf.getConfString(key, "a,b,c") === "a,b,c,d,e") - assert(conf.getConfString(key) === "a,b,c,d,e") - assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c", "d", "e")) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala deleted file mode 100644 index cf0701e..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala +++ /dev/null @@ -1,132 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.spark.sql - -import org.apache.spark.sql.test.{SharedSQLContext, TestSQLContext} - -class SQLConfSuite extends QueryTest with SharedSQLContext { - private val testKey = "test.key.0" - private val testVal = "test.val.0" - - test("propagate from spark conf") { - // We create a new context here to avoid order dependence with other tests that might call - // clear(). - val newContext = new SQLContext(sparkContext) - assert(newContext.getConf("spark.sql.testkey", "false") === "true") - } - - test("programmatic ways of basic setting and getting") { - // Set a conf first. - sqlContext.setConf(testKey, testVal) - // Clear the conf. - sqlContext.conf.clear() - // After clear, only overrideConfs used by unit test should be in the SQLConf. - assert(sqlContext.getAllConfs === TestSQLContext.overrideConfs) - - sqlContext.setConf(testKey, testVal) - assert(sqlContext.getConf(testKey) === testVal) - assert(sqlContext.getConf(testKey, testVal + "_") === testVal) - assert(sqlContext.getAllConfs.contains(testKey)) - - // Tests SQLConf as accessed from a SQLContext is mutable after - // the latter is initialized, unlike SparkConf inside a SparkContext. 
- assert(sqlContext.getConf(testKey) === testVal) - assert(sqlContext.getConf(testKey, testVal + "_") === testVal) - assert(sqlContext.getAllConfs.contains(testKey)) - - sqlContext.conf.clear() - } - - test("parse SQL set commands") { - sqlContext.conf.clear() - sql(s"set $testKey=$testVal") - assert(sqlContext.getConf(testKey, testVal + "_") === testVal) - assert(sqlContext.getConf(testKey, testVal + "_") === testVal) - - sql("set some.property=20") - assert(sqlContext.getConf("some.property", "0") === "20") - sql("set some.property = 40") - assert(sqlContext.getConf("some.property", "0") === "40") - - val key = "spark.sql.key" - val vs = "val0,val_1,val2.3,my_table" - sql(s"set $key=$vs") - assert(sqlContext.getConf(key, "0") === vs) - - sql(s"set $key=") - assert(sqlContext.getConf(key, "0") === "") - - sqlContext.conf.clear() - } - - test("deprecated property") { - sqlContext.conf.clear() - val original = sqlContext.conf.numShufflePartitions - try{ - sql(s"set ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS}=10") - assert(sqlContext.conf.numShufflePartitions === 10) - } finally { - sql(s"set ${SQLConf.SHUFFLE_PARTITIONS}=$original") - } - } - - test("invalid conf value") { - sqlContext.conf.clear() - val e = intercept[IllegalArgumentException] { - sql(s"set ${SQLConf.CASE_SENSITIVE.key}=10") - } - assert(e.getMessage === s"${SQLConf.CASE_SENSITIVE.key} should be boolean, but was 10") - } - - test("Test SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE's method") { - sqlContext.conf.clear() - - sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "100") - assert(sqlContext.conf.targetPostShuffleInputSize === 100) - - sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1k") - assert(sqlContext.conf.targetPostShuffleInputSize === 1024) - - sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1M") - assert(sqlContext.conf.targetPostShuffleInputSize === 1048576) - - sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1g") - assert(sqlContext.conf.targetPostShuffleInputSize === 1073741824) - - sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-1") - assert(sqlContext.conf.targetPostShuffleInputSize === -1) - - // Test overflow exception - intercept[IllegalArgumentException] { - // This value exceeds Long.MaxValue - sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "90000000000g") - } - - intercept[IllegalArgumentException] { - // This value less than Int.MinValue - sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-90000000000g") - } - - // Test invalid input - intercept[IllegalArgumentException] { - // This value exceeds Long.MaxValue - sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-1g") - } - sqlContext.conf.clear() - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala index 14b9448..ec19d97 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql import org.apache.spark.{SharedSparkContext, SparkFunSuite} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule 
+import org.apache.spark.sql.internal.SQLConf class SQLContextSuite extends SparkFunSuite with SharedSparkContext{ http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 13ff4a2..16e769f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -24,6 +24,7 @@ import org.apache.spark.AccumulatorSuite import org.apache.spark.sql.execution.aggregate import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, CartesianProduct, SortMergeJoin} import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.{SharedSQLContext, TestSQLContext} import org.apache.spark.sql.test.SQLTestData._ import org.apache.spark.sql.types._ http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala index b1c588a..4f01e46 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala @@ -23,6 +23,7 @@ import org.apache.spark.{MapOutputStatistics, SparkConf, SparkContext, SparkFunS import org.apache.spark.sql._ import org.apache.spark.sql.execution.exchange.{ExchangeCoordinator, ShuffleExchange} import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.TestSQLContext class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 4de5678..f66e08e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{execution, Row, SQLConf} +import org.apache.spark.sql.{execution, Row} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Literal, SortOrder} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition} @@ -27,6 +27,7 @@ import org.apache.spark.sql.execution.columnar.{InMemoryColumnarTableScan, InMem import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchange} import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, SortMergeJoin} import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ 
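The deleted SQLConfSuite above exercised longMemConf's byte-string fallback. For reference, a sketch of the equivalent checks (the numeric values follow Utils.byteStringAsBytes, which longMemConf falls back to when plain toLong parsing fails):

  import org.apache.spark.sql.internal.SQLConf
  import org.apache.spark.util.Utils

  // Byte-string suffixes parse to byte counts.
  assert(Utils.byteStringAsBytes("1k") === 1024L)
  assert(Utils.byteStringAsBytes("1M") === 1048576L)
  assert(Utils.byteStringAsBytes("1g") === 1073741824L)

  // The same parsing applies when setting a longMemConf entry as a string.
  val conf = new SQLConf
  conf.setConfString(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1M")
  assert(conf.getConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE) === 1048576L)
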
http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
index 86c2c25..d19fec6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
@@ -21,6 +21,7 @@ import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.test.SQLTestData._

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index dd83a0e..c7f33e1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
 import org.apache.spark.sql.execution.datasources.json.InferSchema.compatibleType
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 3ded32c..fbffe86 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, LogicalRelation}
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 41a9404..3c74464 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 3d1677b..8bc5c89 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.execution.datasources.{LogicalRelation, Partition, PartitioningUtils, PartitionSpec}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index b123d2b..acfc1a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
 import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
index e889307..dafc589 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
@@ -22,7 +22,8 @@ import scala.collection.JavaConverters._
 import scala.util.Try
 
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.sql.{SQLConf, SQLContext}
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.{Benchmark, Utils}
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
index 5cbcccb..e8c524e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
@@ -30,7 +30,8 @@ import org.apache.parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter}
 import org.apache.parquet.hadoop.metadata.{BlockMetaData, FileMetaData, ParquetMetadata}
 import org.apache.parquet.schema.MessageType
 
-import org.apache.spark.sql.{DataFrame, SaveMode, SQLConf}
+import org.apache.spark.sql.{DataFrame, SaveMode}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types.StructType

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
index 6dfff37..b748229 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
@@ -17,13 +17,14 @@ package org.apache.spark.sql.execution.joins
 
-import org.apache.spark.sql.{DataFrame, Row, SQLConf}
+import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.logical.Join
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.exchange.EnsureRequirements
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
index cd6b6fc..22fe8ca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
@@ -17,13 +17,14 @@ package org.apache.spark.sql.execution.joins
 
-import org.apache.spark.sql.{DataFrame, Row, SQLConf}
+import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.catalyst.expressions.{And, Expression, LessThan}
 import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.Join
 import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest}
 import org.apache.spark.sql.execution.exchange.EnsureRequirements
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/SemiJoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/SemiJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/SemiJoinSuite.scala
index f3ad840..5c98288 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/SemiJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/SemiJoinSuite.scala
@@ -17,13 +17,14 @@ package org.apache.spark.sql.execution.joins
 
-import org.apache.spark.sql.{DataFrame, Row, SQLConf}
+import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.catalyst.expressions.{And, Expression, LessThan}
 import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.logical.Join
 import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest}
 import org.apache.spark.sql.execution.exchange.EnsureRequirements
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/execution/local/DummyNode.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/DummyNode.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/DummyNode.scala
index efc3227..cd9277d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/DummyNode.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/DummyNode.scala
@@ -17,10 +17,10 @@ package org.apache.spark.sql.execution.local
 
-import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * A dummy [[LocalNode]] that just returns rows from a [[LocalRelation]].

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala
index eb70747..268f2aa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala
@@ -20,10 +20,10 @@ package org.apache.spark.sql.execution.local
 import org.mockito.Mockito.{mock, when}
 
 import org.apache.spark.broadcast.TorrentBroadcast
-import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedMutableProjection, UnsafeProjection}
 import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide, HashedRelation}
+import org.apache.spark.sql.internal.SQLConf
 
 class HashJoinNodeSuite extends LocalNodeTest {

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala
index 1a485f9..cd67a66 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala
@@ -18,9 +18,9 @@ package org.apache.spark.sql.execution.local
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegerType, StringType}
 
 class LocalNodeTest extends SparkFunSuite {

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala
index 45df2ea..bcc87a9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala
@@ -17,10 +17,10 @@ package org.apache.spark.sql.execution.local
 
-import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
 import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
+import org.apache.spark.sql.internal.SQLConf
 
 class NestedLoopJoinNodeSuite extends LocalNodeTest {
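All of the hunks above are the same mechanical change: SQLConf moved from org.apache.spark.sql to org.apache.spark.sql.internal, so only the import line moves. A minimal sketch of what calling code looks like after the relocation (the object name, app name, and the setting chosen here are illustrative, not part of this commit; SQLConf.SHUFFLE_PARTITIONS is an entry the new SQLConf object defines):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.internal.SQLConf  // previously org.apache.spark.sql.SQLConf

    object SQLConfImportSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("sketch"))
        val sqlContext = new SQLContext(sc)
        // Entries defined on the SQLConf object are referenced exactly as before;
        // only the import path changed.
        sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS.key, "10")
        assert(sqlContext.getConf(SQLConf.SHUFFLE_PARTITIONS.key) == "10")
        sc.stop()
      }
    }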
http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 46bb699..c49f243 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.execution.SparkPlanInfo
 import org.apache.spark.sql.execution.ui.SparkPlanGraph
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.{JsonProtocol, Utils}

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
new file mode 100644
index 0000000..2b89fa9
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.internal.SQLConf._
+
+class SQLConfEntrySuite extends SparkFunSuite {
+
+  val conf = new SQLConf
+
+  test("intConf") {
+    val key = "spark.sql.SQLConfEntrySuite.int"
+    val confEntry = SQLConfEntry.intConf(key)
+    assert(conf.getConf(confEntry, 5) === 5)
+
+    conf.setConf(confEntry, 10)
+    assert(conf.getConf(confEntry, 5) === 10)
+
+    conf.setConfString(key, "20")
+    assert(conf.getConfString(key, "5") === "20")
+    assert(conf.getConfString(key) === "20")
+    assert(conf.getConf(confEntry, 5) === 20)
+
+    val e = intercept[IllegalArgumentException] {
+      conf.setConfString(key, "abc")
+    }
+    assert(e.getMessage === s"$key should be int, but was abc")
+  }
+
+  test("longConf") {
+    val key = "spark.sql.SQLConfEntrySuite.long"
+    val confEntry = SQLConfEntry.longConf(key)
+    assert(conf.getConf(confEntry, 5L) === 5L)
+
+    conf.setConf(confEntry, 10L)
+    assert(conf.getConf(confEntry, 5L) === 10L)
+
+    conf.setConfString(key, "20")
+    assert(conf.getConfString(key, "5") === "20")
+    assert(conf.getConfString(key) === "20")
+    assert(conf.getConf(confEntry, 5L) === 20L)
+
+    val e = intercept[IllegalArgumentException] {
+      conf.setConfString(key, "abc")
+    }
+    assert(e.getMessage === s"$key should be long, but was abc")
+  }
+
+  test("booleanConf") {
+    val key = "spark.sql.SQLConfEntrySuite.boolean"
+    val confEntry = SQLConfEntry.booleanConf(key)
+    assert(conf.getConf(confEntry, false) === false)
+
+    conf.setConf(confEntry, true)
+    assert(conf.getConf(confEntry, false) === true)
+
+    conf.setConfString(key, "true")
+    assert(conf.getConfString(key, "false") === "true")
+    assert(conf.getConfString(key) === "true")
+    assert(conf.getConf(confEntry, false) === true)
+
+    val e = intercept[IllegalArgumentException] {
+      conf.setConfString(key, "abc")
+    }
+    assert(e.getMessage === s"$key should be boolean, but was abc")
+  }
+
+  test("doubleConf") {
+    val key = "spark.sql.SQLConfEntrySuite.double"
+    val confEntry = SQLConfEntry.doubleConf(key)
+    assert(conf.getConf(confEntry, 5.0) === 5.0)
+
+    conf.setConf(confEntry, 10.0)
+    assert(conf.getConf(confEntry, 5.0) === 10.0)
+
+    conf.setConfString(key, "20.0")
+    assert(conf.getConfString(key, "5.0") === "20.0")
+    assert(conf.getConfString(key) === "20.0")
+    assert(conf.getConf(confEntry, 5.0) === 20.0)
+
+    val e = intercept[IllegalArgumentException] {
+      conf.setConfString(key, "abc")
+    }
+    assert(e.getMessage === s"$key should be double, but was abc")
+  }
+
+  test("stringConf") {
+    val key = "spark.sql.SQLConfEntrySuite.string"
+    val confEntry = SQLConfEntry.stringConf(key)
+    assert(conf.getConf(confEntry, "abc") === "abc")
+
+    conf.setConf(confEntry, "abcd")
+    assert(conf.getConf(confEntry, "abc") === "abcd")
+
+    conf.setConfString(key, "abcde")
+    assert(conf.getConfString(key, "abc") === "abcde")
+    assert(conf.getConfString(key) === "abcde")
+    assert(conf.getConf(confEntry, "abc") === "abcde")
+  }
+
+  test("enumConf") {
+    val key = "spark.sql.SQLConfEntrySuite.enum"
+    val confEntry = SQLConfEntry.enumConf(key, v => v, Set("a", "b", "c"), defaultValue = Some("a"))
+    assert(conf.getConf(confEntry) === "a")
+
+    conf.setConf(confEntry, "b")
+    assert(conf.getConf(confEntry) === "b")
+
+    conf.setConfString(key, "c")
+    assert(conf.getConfString(key, "a") === "c")
+    assert(conf.getConfString(key) === "c")
+    assert(conf.getConf(confEntry) === "c")
+
+    val e = intercept[IllegalArgumentException] {
+      conf.setConfString(key, "d")
+    }
+    assert(e.getMessage === s"The value of $key should be one of a, b, c, but was d")
+  }
+
+  test("stringSeqConf") {
+    val key = "spark.sql.SQLConfEntrySuite.stringSeq"
+    val confEntry = SQLConfEntry.stringSeqConf(key, defaultValue = Some(Nil))
+    assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c"))
+
+    conf.setConf(confEntry, Seq("a", "b", "c", "d"))
+    assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c", "d"))
+
+    conf.setConfString(key, "a,b,c,d,e")
+    assert(conf.getConfString(key, "a,b,c") === "a,b,c,d,e")
+    assert(conf.getConfString(key) === "a,b,c,d,e")
+    assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c", "d", "e"))
+  }
+}
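The suite above pins down the contract of the typed entry factories: values set as strings are validated eagerly by the entry's converter and read back with the entry's type. A minimal standalone sketch of that contract (the key, default, and object name below are hypothetical, chosen for illustration; the sketch sits in the internal package, mirroring the suite, so that new SQLConf resolves):

    package org.apache.spark.sql.internal

    import org.apache.spark.sql.internal.SQLConf._

    object ConfEntrySketch {
      def main(args: Array[String]): Unit = {
        val conf = new SQLConf
        // Hypothetical entry; registering the same key twice throws IllegalArgumentException.
        val maxRows = SQLConfEntry.intConf(
          "spark.sql.example.maxRows",
          defaultValue = Some(100),
          doc = "Illustrative entry, not defined by this commit")

        assert(conf.getConf(maxRows) == 100)    // unset: falls back to the default
        conf.setConfString(maxRows.key, "250")  // the string form is validated by the converter...
        assert(conf.getConf(maxRows) == 250)    // ...and read back as an Int
        // conf.setConfString(maxRows.key, "abc") would throw
        // IllegalArgumentException("spark.sql.example.maxRows should be int, but was abc")
      }
    }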
http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
new file mode 100644
index 0000000..e944d32
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -0,0 +1,133 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.internal
+
+import org.apache.spark.sql.{QueryTest, SQLContext}
+import org.apache.spark.sql.test.{SharedSQLContext, TestSQLContext}
+
+class SQLConfSuite extends QueryTest with SharedSQLContext {
+  private val testKey = "test.key.0"
+  private val testVal = "test.val.0"
+
+  test("propagate from spark conf") {
+    // We create a new context here to avoid order dependence with other tests that might call
+    // clear().
+    val newContext = new SQLContext(sparkContext)
+    assert(newContext.getConf("spark.sql.testkey", "false") === "true")
+  }
+
+  test("programmatic ways of basic setting and getting") {
+    // Set a conf first.
+    sqlContext.setConf(testKey, testVal)
+    // Clear the conf.
+    sqlContext.conf.clear()
+    // After clear, only overrideConfs used by unit test should be in the SQLConf.
+    assert(sqlContext.getAllConfs === TestSQLContext.overrideConfs)
+
+    sqlContext.setConf(testKey, testVal)
+    assert(sqlContext.getConf(testKey) === testVal)
+    assert(sqlContext.getConf(testKey, testVal + "_") === testVal)
+    assert(sqlContext.getAllConfs.contains(testKey))
+
+    // Tests that the SQLConf accessed from a SQLContext stays mutable after
+    // the context is initialized, unlike the SparkConf inside a SparkContext.
+    assert(sqlContext.getConf(testKey) === testVal)
+    assert(sqlContext.getConf(testKey, testVal + "_") === testVal)
+    assert(sqlContext.getAllConfs.contains(testKey))
+
+    sqlContext.conf.clear()
+  }
+
+  test("parse SQL set commands") {
+    sqlContext.conf.clear()
+    sql(s"set $testKey=$testVal")
+    assert(sqlContext.getConf(testKey, testVal + "_") === testVal)
+
+    sql("set some.property=20")
+    assert(sqlContext.getConf("some.property", "0") === "20")
+    sql("set some.property = 40")
+    assert(sqlContext.getConf("some.property", "0") === "40")
+
+    val key = "spark.sql.key"
+    val vs = "val0,val_1,val2.3,my_table"
+    sql(s"set $key=$vs")
+    assert(sqlContext.getConf(key, "0") === vs)
+
+    sql(s"set $key=")
+    assert(sqlContext.getConf(key, "0") === "")
+
+    sqlContext.conf.clear()
+  }
+
+  test("deprecated property") {
+    sqlContext.conf.clear()
+    val original = sqlContext.conf.numShufflePartitions
+    try {
+      sql(s"set ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS}=10")
+      assert(sqlContext.conf.numShufflePartitions === 10)
+    } finally {
+      sql(s"set ${SQLConf.SHUFFLE_PARTITIONS.key}=$original")
+    }
+  }
+
+  test("invalid conf value") {
+    sqlContext.conf.clear()
+    val e = intercept[IllegalArgumentException] {
+      sql(s"set ${SQLConf.CASE_SENSITIVE.key}=10")
+    }
+    assert(e.getMessage === s"${SQLConf.CASE_SENSITIVE.key} should be boolean, but was 10")
+  }
+
+  test("Test SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE's method") {
+    sqlContext.conf.clear()
+
+    sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "100")
+    assert(sqlContext.conf.targetPostShuffleInputSize === 100)
+
+    sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1k")
+    assert(sqlContext.conf.targetPostShuffleInputSize === 1024)
+
+    sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1M")
+    assert(sqlContext.conf.targetPostShuffleInputSize === 1048576)
+
+    sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1g")
+    assert(sqlContext.conf.targetPostShuffleInputSize === 1073741824)
+
+    sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-1")
+    assert(sqlContext.conf.targetPostShuffleInputSize === -1)
+
+    // Test overflow exception
+    intercept[IllegalArgumentException] {
+      // This value exceeds Long.MaxValue
+      sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "90000000000g")
+    }
+
+    intercept[IllegalArgumentException] {
+      // This value is less than Long.MinValue
+      sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-90000000000g")
+    }
+
+    // Test invalid input
+    intercept[IllegalArgumentException] {
+      // A negative value with a unit suffix is not a valid size string
+      sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-1g")
+    }
+    sqlContext.conf.clear()
+  }
+}
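The last test above leans on longMemConf's two-step parsing: a plain number is parsed with toLong, and anything else is handed to Utils.byteStringAsBytes, which understands binary unit suffixes. A small sketch of the accepted forms (the object name is illustrative; it is placed under the org.apache.spark namespace because Utils is private[spark]):

    package org.apache.spark.sql.internal

    import org.apache.spark.util.Utils

    object ByteStringSketch {
      def main(args: Array[String]): Unit = {
        // Suffixes scale by powers of 1024, matching the assertions in the suite:
        assert(Utils.byteStringAsBytes("1k") == 1024L)
        assert(Utils.byteStringAsBytes("1M") == 1048576L)
        assert(Utils.byteStringAsBytes("1g") == 1073741824L)
        // Plain numbers such as "100" or "-1" never reach this parser: longMemConf
        // tries toLong first, so they are accepted as raw byte counts.
        // Oversized ("90000000000g") or malformed ("-1g") strings are rejected, and
        // longMemConf surfaces the failure as an IllegalArgumentException.
      }
    }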
http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
index af04079..9206113 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.sources
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.internal.SQLConf
 
 private[sql] abstract class DataSourceTest extends QueryTest {

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index 4b578a6..2ff79a2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -23,6 +23,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
 import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
index 02e8ab1..db72297 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
@@ -21,6 +21,7 @@ import scala.language.existentials
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c8c33/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
index e055da9..588f6e2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
@@ -21,7 +21,8 @@ import java.io.File
 
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLConf}
+import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org