Repository: spark
Updated Branches:
  refs/heads/branch-2.0 0d16b7f3a -> 5625b037a


[SPARK-14422][SQL] Improve handling of optional configs in SQLConf

## What changes were proposed in this pull request?
Create a new API for handling optional configs in SQLConf.
Right now, `getConf` for an `OptionalConfigEntry[T]` returns a value of type `T`
and throws an exception if the key is not set. This adds a new method,
`getOptionalConf` (suggestions on naming welcome), which returns a value of type
`Option[T]`, so an unset key yields `None` instead of an exception.
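
A minimal sketch of the behavior change (the key `spark.sql.example.opt` is illustrative, not part of this patch):

```scala
// Hypothetical entry: `createOptional` produces an OptionalConfigEntry[String].
val entry = SQLConfigBuilder("spark.sql.example.opt").stringConf.createOptional

val conf = new SQLConf
// Before this patch: getConf threw NoSuchElementException for an unset key.
// After: it returns Option[T], so an unset key yields None.
assert(conf.getConf(entry) === None)
conf.setConfString("spark.sql.example.opt", "value")
assert(conf.getConf(entry) === Some("value"))
```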

## How was this patch tested?
Added a test and ran the tests locally.
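
Condensed from the suite diff below, the new test exercises the `Option[T]` behavior roughly like this:

```scala
val key = "spark.sql.SQLConfEntrySuite.optional"
val confEntry = SQLConfigBuilder(key).stringConf.createOptional

assert(conf.getConf(confEntry) === None)       // unset key now yields None
conf.setConfString(key, "a")
assert(conf.getConf(confEntry) === Some("a"))  // set key yields Some(value)
```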

Author: Sandeep Singh <sand...@techaddict.me>

Closes #12846 from techaddict/SPARK-14422.

(cherry picked from commit a8d56f538878443da6eae69449858ad4e2274151)
Signed-off-by: Andrew Or <and...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5625b037
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5625b037
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5625b037

Branch: refs/heads/branch-2.0
Commit: 5625b037a0c952b97e1afa6a44443113c0847ade
Parents: 0d16b7f
Author: Sandeep Singh <sand...@techaddict.me>
Authored: Tue May 3 18:02:57 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Tue May 3 18:03:05 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/DataFrameWriter.scala     |  9 +++++----
 .../main/scala/org/apache/spark/sql/RuntimeConfig.scala  |  6 +++++-
 .../scala/org/apache/spark/sql/internal/SQLConf.scala    |  9 ++++-----
 .../apache/spark/sql/internal/SQLConfEntrySuite.scala    | 11 +++++++++++
 4 files changed, 25 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5625b037/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index a8f96a9..0793b62 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -296,7 +296,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
         new Path(userSpecified).toUri.toString
       }.orElse {
         val checkpointConfig: Option[String] =
-          df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION, None)
+          df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION)
 
         checkpointConfig.map { location =>
           new Path(location, queryName).toUri.toString
@@ -334,9 +334,10 @@ final class DataFrameWriter private[sql](df: DataFrame) {
           partitionColumns = normalizedParCols.getOrElse(Nil))
 
       val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
-      val checkpointLocation = extraOptions.getOrElse("checkpointLocation", {
-        new Path(df.sparkSession.sessionState.conf.checkpointLocation, queryName).toUri.toString
-      })
+      val checkpointLocation = extraOptions.getOrElse("checkpointLocation",
+        new Path(df.sparkSession.sessionState.conf.checkpointLocation.get, queryName).toUri.toString
+      )
+
       df.sparkSession.sessionState.continuousQueryManager.startQuery(
         queryName,
         checkpointLocation,

http://git-wip-us.apache.org/repos/asf/spark/blob/5625b037/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
index 670288b..4fd6e42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.internal.config.ConfigEntry
+import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
 import org.apache.spark.sql.internal.SQLConf
 
 
@@ -86,6 +86,10 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
     sqlConf.getConf(entry)
   }
 
+  protected[sql] def get[T](entry: OptionalConfigEntry[T]): Option[T] = {
+    sqlConf.getConf(entry)
+  }
+
   /**
    * Returns the value of Spark runtime configuration property for the given key.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/5625b037/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 0bcf0f8..5e19984 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -546,7 +546,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD)
 
-  def checkpointLocation: String = getConf(CHECKPOINT_LOCATION)
+  def checkpointLocation: Option[String] = getConf(CHECKPOINT_LOCATION)
 
   def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
 
@@ -717,12 +717,11 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   /**
    * Return the value of an optional Spark SQL configuration property for the given key. If the key
-   * is not set yet, throw an exception.
+   * is not set yet, returns None.
    */
-  def getConf[T](entry: OptionalConfigEntry[T]): T = {
+  def getConf[T](entry: OptionalConfigEntry[T]): Option[T] = {
     require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
-    Option(settings.get(entry.key)).map(entry.rawValueConverter).
-      getOrElse(throw new NoSuchElementException(entry.key))
+    Option(settings.get(entry.key)).map(entry.rawValueConverter)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/5625b037/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
index cc69199..95bfd05 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
@@ -153,6 +153,17 @@ class SQLConfEntrySuite extends SparkFunSuite {
     assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c", "d", "e"))
   }
 
+  test("optionalConf") {
+    val key = "spark.sql.SQLConfEntrySuite.optional"
+    val confEntry = SQLConfigBuilder(key)
+      .stringConf
+      .createOptional
+
+    assert(conf.getConf(confEntry) === None)
+    conf.setConfString(key, "a")
+    assert(conf.getConf(confEntry) === Some("a"))
+  }
+
   test("duplicate entry") {
     val key = "spark.sql.SQLConfEntrySuite.duplicate"
     SQLConfigBuilder(key).stringConf.createOptional

