Repository: spark Updated Branches: refs/heads/master b120fba6a -> 01e7b9c85
[SPARK-15345][SQL][PYSPARK] SparkSession's conf doesn't take effect when this already an existing SparkContext ## What changes were proposed in this pull request? Override the existing SparkContext is the provided SparkConf is different. PySpark part hasn't been fixed yet, will do that after the first round of review to ensure this is the correct approach. ## How was this patch tested? Manually verify it in spark-shell. rxin Please help review it, I think this is a very critical issue for spark 2.0 Author: Jeff Zhang <zjf...@apache.org> Closes #13160 from zjffdu/SPARK-15345. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/01e7b9c8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/01e7b9c8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/01e7b9c8 Branch: refs/heads/master Commit: 01e7b9c85bb84924e279021f9748774dce9702c8 Parents: b120fba Author: Jeff Zhang <zjf...@apache.org> Authored: Wed May 25 10:46:51 2016 -0700 Committer: Andrew Or <and...@databricks.com> Committed: Wed May 25 10:46:51 2016 -0700 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/SparkContext.scala | 3 +++ .../scala/org/apache/spark/sql/SparkSession.scala | 14 ++++++++++++-- .../apache/spark/sql/SparkSessionBuilderSuite.scala | 14 +++++++++++++- 3 files changed, 28 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/01e7b9c8/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 36aa3be..5018eb3 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2254,6 +2254,9 @@ object SparkContext extends Logging { if (activeContext.get() == null) { setActiveContext(new SparkContext(config), allowMultipleContexts = false) } + if (config.getAll.nonEmpty) { + logWarning("Use an existing SparkContext, some configuration may not take effect.") + } activeContext.get() } } http://git-wip-us.apache.org/repos/asf/spark/blob/01e7b9c8/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 5c87c84..86c97b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -636,7 +636,7 @@ object SparkSession { /** * Builder for [[SparkSession]]. */ - class Builder { + class Builder extends Logging { private[this] val options = new scala.collection.mutable.HashMap[String, String] @@ -753,6 +753,9 @@ object SparkSession { var session = activeThreadSession.get() if ((session ne null) && !session.sparkContext.isStopped) { options.foreach { case (k, v) => session.conf.set(k, v) } + if (options.nonEmpty) { + logWarning("Use an existing SparkSession, some configuration may not take effect.") + } return session } @@ -762,6 +765,9 @@ object SparkSession { session = defaultSession.get() if ((session ne null) && !session.sparkContext.isStopped) { options.foreach { case (k, v) => session.conf.set(k, v) } + if (options.nonEmpty) { + logWarning("Use an existing SparkSession, some configuration may not take effect.") + } return session } @@ -774,7 +780,11 @@ object SparkSession { val sparkConf = new SparkConf() options.foreach { case (k, v) => sparkConf.set(k, v) } - SparkContext.getOrCreate(sparkConf) + val sc = SparkContext.getOrCreate(sparkConf) + // maybe this is an existing SparkContext, update its SparkConf which maybe used + // by SparkSession + options.foreach { case (k, v) => sc.conf.set(k, v) } + sc } session = new SparkSession(sparkContext) options.foreach { case (k, v) => session.conf.set(k, v) } http://git-wip-us.apache.org/repos/asf/spark/blob/01e7b9c8/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala index ec6a2b3..786956d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql -import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} /** * Test cases for the builder pattern of [[SparkSession]]. @@ -90,4 +90,16 @@ class SparkSessionBuilderSuite extends SparkFunSuite { assert(newSession != activeSession) newSession.stop() } + + test("create SparkContext first then SparkSession") { + sparkContext.stop() + val conf = new SparkConf().setAppName("test").setMaster("local").set("key1", "value1") + val sparkContext2 = new SparkContext(conf) + val session = SparkSession.builder().config("key2", "value2").getOrCreate() + assert(session.conf.get("key1") == "value1") + assert(session.conf.get("key2") == "value2") + assert(session.sparkContext.conf.get("key1") == "value1") + assert(session.sparkContext.conf.get("key2") == "value2") + session.stop() + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org