Repository: spark Updated Branches: refs/heads/branch-2.0 0d16b7f3a -> 5625b037a
[SPARK-14422][SQL] Improve handling of optional configs in SQLConf ## What changes were proposed in this pull request? Create a new API for handling Optional Configs in SQLConf. Right now `getConf` for `OptionalConfigEntry[T]` returns a value of type `T`; if the config doesn't exist, it throws an exception. Add a new method `getOptionalConf` (suggestions on naming) which will now return a value of type `Option[T]` (so if the config doesn't exist, it returns `None`). ## How was this patch tested? Added a test and ran tests locally. Author: Sandeep Singh <sand...@techaddict.me> Closes #12846 from techaddict/SPARK-14422. (cherry picked from commit a8d56f538878443da6eae69449858ad4e2274151) Signed-off-by: Andrew Or <and...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5625b037 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5625b037 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5625b037 Branch: refs/heads/branch-2.0 Commit: 5625b037a0c952b97e1afa6a44443113c0847ade Parents: 0d16b7f Author: Sandeep Singh <sand...@techaddict.me> Authored: Tue May 3 18:02:57 2016 -0700 Committer: Andrew Or <and...@databricks.com> Committed: Tue May 3 18:03:05 2016 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/sql/DataFrameWriter.scala | 9 +++++---- .../main/scala/org/apache/spark/sql/RuntimeConfig.scala | 6 +++++- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 9 ++++----- .../apache/spark/sql/internal/SQLConfEntrySuite.scala | 11 +++++++++++ 4 files changed, 25 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/5625b037/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index a8f96a9..0793b62 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -296,7 +296,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { new Path(userSpecified).toUri.toString }.orElse { val checkpointConfig: Option[String] = - df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION, None) + df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION) checkpointConfig.map { location => new Path(location, queryName).toUri.toString @@ -334,9 +334,10 @@ final class DataFrameWriter private[sql](df: DataFrame) { partitionColumns = normalizedParCols.getOrElse(Nil)) val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName) - val checkpointLocation = extraOptions.getOrElse("checkpointLocation", { - new Path(df.sparkSession.sessionState.conf.checkpointLocation, queryName).toUri.toString - }) + val checkpointLocation = extraOptions.getOrElse("checkpointLocation", + new Path(df.sparkSession.sessionState.conf.checkpointLocation.get, queryName).toUri.toString + ) + df.sparkSession.sessionState.continuousQueryManager.startQuery( queryName, checkpointLocation, http://git-wip-us.apache.org/repos/asf/spark/blob/5625b037/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala index 670288b..4fd6e42 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql -import org.apache.spark.internal.config.ConfigEntry +import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry} import org.apache.spark.sql.internal.SQLConf @@ -86,6 
+86,10 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) { sqlConf.getConf(entry) } + protected[sql] def get[T](entry: OptionalConfigEntry[T]): Option[T] = { + sqlConf.getConf(entry) + } + /** * Returns the value of Spark runtime configuration property for the given key. */ http://git-wip-us.apache.org/repos/asf/spark/blob/5625b037/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 0bcf0f8..5e19984 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -546,7 +546,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD) - def checkpointLocation: String = getConf(CHECKPOINT_LOCATION) + def checkpointLocation: Option[String] = getConf(CHECKPOINT_LOCATION) def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES) @@ -717,12 +717,11 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { /** * Return the value of an optional Spark SQL configuration property for the given key. If the key - * is not set yet, throw an exception. + * is not set yet, returns None. */ - def getConf[T](entry: OptionalConfigEntry[T]): T = { + def getConf[T](entry: OptionalConfigEntry[T]): Option[T] = { require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered") - Option(settings.get(entry.key)).map(entry.rawValueConverter). 
- getOrElse(throw new NoSuchElementException(entry.key)) + Option(settings.get(entry.key)).map(entry.rawValueConverter) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/5625b037/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala index cc69199..95bfd05 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala @@ -153,6 +153,17 @@ class SQLConfEntrySuite extends SparkFunSuite { assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c", "d", "e")) } + test("optionalConf") { + val key = "spark.sql.SQLConfEntrySuite.optional" + val confEntry = SQLConfigBuilder(key) + .stringConf + .createOptional + + assert(conf.getConf(confEntry) === None) + conf.setConfString(key, "a") + assert(conf.getConf(confEntry) === Some("a")) + } + test("duplicate entry") { val key = "spark.sql.SQLConfEntrySuite.duplicate" SQLConfigBuilder(key).stringConf.createOptional --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org