Repository: spark
Updated Branches:
  refs/heads/master f8c7c1f21 -> c3dd2a26d
[SPARK-22779][SQL] Resolve default values for fallback configs.

SQLConf allows some callers to define a custom default value for configs,
which complicates the handling of fallback config entries a bit, since
most of the default value resolution is hidden by the config code.

This change peeks into the internals of these fallback configs to figure
out the correct default value, and also returns the current human-readable
default when showing the default value (e.g. through "set -v").

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #19974 from vanzin/SPARK-22779.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c3dd2a26
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c3dd2a26
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c3dd2a26

Branch: refs/heads/master
Commit: c3dd2a26deaadf508b4e163eab2c0544cd922540
Parents: f8c7c1f
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Wed Dec 13 22:46:20 2017 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Wed Dec 13 22:46:20 2017 -0800

----------------------------------------------------------------------
 .../spark/internal/config/ConfigEntry.scala     |  8 ++++--
 .../org/apache/spark/sql/internal/SQLConf.scala | 16 ++++++++---
 .../spark/sql/internal/SQLConfSuite.scala       | 30 ++++++++++++++++++++
 3 files changed, 47 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
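To make the resolution order concrete, here is a minimal, self-contained
Scala sketch of the idea. The ConfigEntry/FallbackConfigEntry names mirror
Spark's, but these are simplified stand-ins (FallbackDemo and the resolve
helper are made up for illustration), not the actual implementation:

// Simplified stand-in: a config entry with a key and an optional default.
class ConfigEntry[T](val key: String, val defaultValue: Option[T]) {
  // The human-readable default, as shown by e.g. "set -v".
  def defaultValueString: String = defaultValue.map(_.toString).getOrElse("<undefined>")
}

// A fallback entry has no default of its own: it defers to its parent entry.
class FallbackConfigEntry[T](key: String, val fallback: ConfigEntry[T])
    extends ConfigEntry[T](key, None) {
  override def defaultValueString: String = s"<value of ${fallback.key}>"
}

object FallbackDemo {
  // Resolution order for a key: explicit setting first, then (for fallback
  // entries) the parent's resolved value, then the entry's own default.
  def resolve[T](settings: Map[String, String], entry: ConfigEntry[T]): String =
    settings.getOrElse(entry.key, entry match {
      case f: FallbackConfigEntry[T] => resolve(settings, f.fallback)
      case e => e.defaultValueString
    })

  def main(args: Array[String]): Unit = {
    val parent = new ConfigEntry[String]("spark.sql.parquet.compression.codec", Some("snappy"))
    val child = new FallbackConfigEntry[String]("spark.sql.demo.fallback", parent)

    println(resolve(Map.empty, child))                  // snappy (parent's default)
    println(resolve(Map(parent.key -> "gzip"), child))  // gzip (parent's current value)
    println(resolve(Map(child.key -> "lzo"), child))    // lzo (child's own setting)
  }
}

The patch applies this same precedence inside SQLConf.getConfString, as the
diff below shows.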
http://git-wip-us.apache.org/repos/asf/spark/blob/c3dd2a26/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
index f119028..ede3ace 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
@@ -139,7 +139,7 @@ private[spark] class OptionalConfigEntry[T](
     s => Some(rawValueConverter(s)),
     v => v.map(rawStringConverter).orNull,
     doc, isPublic) {

-  override def defaultValueString: String = "<undefined>"
+  override def defaultValueString: String = ConfigEntry.UNDEFINED

   override def readFrom(reader: ConfigReader): Option[T] = {
     readString(reader).map(rawValueConverter)
@@ -149,12 +149,12 @@ private[spark] class OptionalConfigEntry[T](

 /**
  * A config entry whose default value is defined by another config entry.
  */
-private class FallbackConfigEntry[T] (
+private[spark] class FallbackConfigEntry[T] (
     key: String,
     alternatives: List[String],
     doc: String,
     isPublic: Boolean,
-    private[config] val fallback: ConfigEntry[T])
+    val fallback: ConfigEntry[T])
   extends ConfigEntry[T](key, alternatives, fallback.valueConverter, fallback.stringConverter,
     doc, isPublic) {

@@ -167,6 +167,8 @@ private class FallbackConfigEntry[T] (

 private[spark] object ConfigEntry {

+  val UNDEFINED = "<undefined>"
+
   private val knownConfigs = new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()

   def registerEntry(entry: ConfigEntry[_]): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/c3dd2a26/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1121444..cf7e3eb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1379,7 +1379,7 @@ class SQLConf extends Serializable with Logging {
     Option(settings.get(key)).
       orElse {
         // Try to use the default value
-        Option(sqlConfEntries.get(key)).map(_.defaultValueString)
+        Option(sqlConfEntries.get(key)).map { e => e.stringConverter(e.readFrom(reader)) }
       }.
       getOrElse(throw new NoSuchElementException(key))
   }
@@ -1417,14 +1417,21 @@ class SQLConf extends Serializable with Logging {
    * not set yet, return `defaultValue`.
    */
   def getConfString(key: String, defaultValue: String): String = {
-    if (defaultValue != null && defaultValue != "<undefined>") {
+    if (defaultValue != null && defaultValue != ConfigEntry.UNDEFINED) {
       val entry = sqlConfEntries.get(key)
       if (entry != null) {
         // Only verify configs in the SQLConf object
         entry.valueConverter(defaultValue)
       }
     }
-    Option(settings.get(key)).getOrElse(defaultValue)
+    Option(settings.get(key)).getOrElse {
+      // If the key is not set, need to check whether the config entry is registered and is
+      // a fallback conf, so that we can check its parent.
+      sqlConfEntries.get(key) match {
+        case e: FallbackConfigEntry[_] => getConfString(e.fallback.key, defaultValue)
+        case _ => defaultValue
+      }
+    }
   }

   /**
@@ -1440,7 +1447,8 @@
    */
   def getAllDefinedConfs: Seq[(String, String, String)] = sqlConfEntries.synchronized {
     sqlConfEntries.values.asScala.filter(_.isPublic).map { entry =>
-      (entry.key, getConfString(entry.key, entry.defaultValueString), entry.doc)
+      val displayValue = Option(getConfString(entry.key, null)).getOrElse(entry.defaultValueString)
+      (entry.key, displayValue, entry.doc)
     }.toSeq
   }
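As an illustration of the precedence the patched getConfString implements,
here is a hedged usage sketch. It assumes code running with access to Spark
SQL internals (as the test suite below has), a SparkSession named spark,
and a made-up config key:

import org.apache.spark.sql.internal.SQLConf

val fallback = SQLConf.buildConf("spark.sql.demo.fallback")  // hypothetical key
  .fallbackConf(SQLConf.PARQUET_COMPRESSION)
val conf = spark.sessionState.conf

// Nothing set: the fallback resolves through its parent's default ("snappy"),
// but an explicit caller-supplied default still wins.
conf.getConfString(fallback.key)           // "snappy"
conf.getConfString(fallback.key, "lzo")    // "lzo"

// Once the parent is set, its value takes precedence over the caller default.
conf.setConf(SQLConf.PARQUET_COMPRESSION, "gzip")
conf.getConfString(fallback.key)           // "gzip"
conf.getConfString(fallback.key, "lzo")    // "gzip"

SQLConf.unregister(fallback)               // clean up the registered entry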
http://git-wip-us.apache.org/repos/asf/spark/blob/c3dd2a26/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index 8b1521b..c9a6975 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -280,4 +280,34 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {

     spark.sessionState.conf.clear()
   }
+
+  test("SPARK-22779: correctly compute default value for fallback configs") {
+    val fallback = SQLConf.buildConf("spark.sql.__test__.spark_22779")
+      .fallbackConf(SQLConf.PARQUET_COMPRESSION)
+
+    assert(spark.sessionState.conf.getConfString(fallback.key) ===
+      SQLConf.PARQUET_COMPRESSION.defaultValue.get)
+    assert(spark.sessionState.conf.getConfString(fallback.key, "lzo") === "lzo")
+
+    val displayValue = spark.sessionState.conf.getAllDefinedConfs
+      .find { case (key, _, _) => key == fallback.key }
+      .map { case (_, v, _) => v }
+      .get
+    assert(displayValue === fallback.defaultValueString)
+
+    spark.sessionState.conf.setConf(SQLConf.PARQUET_COMPRESSION, "gzip")
+    assert(spark.sessionState.conf.getConfString(fallback.key) === "gzip")
+
+    spark.sessionState.conf.setConf(fallback, "lzo")
+    assert(spark.sessionState.conf.getConfString(fallback.key) === "lzo")
+
+    val newDisplayValue = spark.sessionState.conf.getAllDefinedConfs
+      .find { case (key, _, _) => key == fallback.key }
+      .map { case (_, v, _) => v }
+      .get
+    assert(newDisplayValue === "lzo")
+
+    SQLConf.unregister(fallback)
+  }
 }
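And since the commit message calls out "set -v": an illustrative spark-shell
check (hypothetical session, output elided) of the now human-readable
defaults surfaced by getAllDefinedConfs:

import spark.implicits._

// After this patch, the "value" column of SET -v shows a fallback entry's
// value resolved through its parent instead of an unresolved placeholder.
spark.sql("SET -v")
  .filter($"key".startsWith("spark.sql.parquet"))
  .show(truncate = false)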