Repository: spark
Updated Branches:
  refs/heads/branch-2.0 dc85bd0a0 -> 2c1c337ba


[SPARK-15942][REPL] Unblock `:reset` command in REPL.

## What changes were proposed in this pull request?
(Pasted from the JIRA issue.)
As a follow-up to SPARK-15697, I propose the following semantics for the `:reset`
command: on `:reset`, everything the user has done is forgotten, but the
initialization of Spark is not. To avoid confusion, we show a message making it
clear that `spark` and `sc` are not erased; in fact, they remain in the same
state that the user's previous operations left them in.
While working on this, I felt that this is not what reset usually means.
But an accidental shutdown of a cluster can be very costly, so in that
sense this behavior is less surprising and still useful.
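
For illustration, here is a sketch of what a shell session could look like with this change (interpreter output abbreviated; exact messages may vary by Scala version, but the closing note is the one added by this patch):

```
scala> val x = 42
x: Int = 42

scala> :reset
...
Note that after :reset, state of SparkSession and SparkContext is unchanged.

scala> x
<console>: error: not found: value x

scala> sc.parallelize(1 to 5).count()
res0: Long = 5
```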

## How was this patch tested?

Manually, by running the `:reset` command after both altering the state of the
SparkContext and creating some local variables.
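
An automated check could follow the same pattern. Below is a hypothetical sketch in the style of ReplSuite, assuming its existing `runInterpreter` and `assertContains` helpers; this test is not part of the patch:

```scala
// Hypothetical ReplSuite-style test: :reset erases user definitions,
// while the SparkContext created by the shell keeps working.
test(":reset forgets user state but keeps spark and sc") {
  val output = runInterpreter("local",
    """
      |val x = 42
      |:reset
      |x
      |sc.parallelize(1 to 5).count()
    """.stripMargin)
  assertContains("not found: value x", output) // x was erased by :reset
  assertContains("Long = 5", output)           // sc is still usable afterwards
}
```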

Author: Prashant Sharma <prash...@apache.org>
Author: Prashant Sharma <prash...@in.ibm.com>

Closes #13661 from ScrapCodes/repl-reset-command.

(cherry picked from commit 1b3a9b966a7813e2406dfb020e83605af22f9ef3)
Signed-off-by: Sean Owen <so...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c1c337b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c1c337b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c1c337b

Branch: refs/heads/branch-2.0
Commit: 2c1c337ba5984b9e495b4d02bf865e56fd83ab03
Parents: dc85bd0
Author: Prashant Sharma <prash...@apache.org>
Authored: Sun Jun 19 20:12:00 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Sun Jun 19 20:12:08 2016 +0100

----------------------------------------------------------------------
 .../scala/org/apache/spark/repl/SparkILoop.scala    | 16 ++++++++++++++--
 .../scala/org/apache/spark/repl/ReplSuite.scala     |  3 ++-
 2 files changed, 16 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2c1c337b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index dcf3209..2707b08 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -36,7 +36,11 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
   def initializeSpark() {
     intp.beQuietDuring {
       processLine("""
-        @transient val spark = org.apache.spark.repl.Main.createSparkSession()
+        @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
+            org.apache.spark.repl.Main.sparkSession
+          } else {
+            org.apache.spark.repl.Main.createSparkSession()
+          }
         @transient val sc = {
           val _sc = spark.sparkContext
           _sc.uiWebUrl.foreach(webUrl => println(s"Spark context Web UI available at ${webUrl}"))
@@ -50,6 +54,7 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
       processLine("import spark.implicits._")
       processLine("import spark.sql")
       processLine("import org.apache.spark.sql.functions._")
+      replayCommandStack = Nil // remove above commands from session history.
     }
   }
 
@@ -70,7 +75,8 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
     echo("Type :help for more information.")
   }
 
-  private val blockedCommands = Set[String]("reset")
+  /** Add repl commands that need to be blocked, e.g. reset */
+  private val blockedCommands = Set[String]()
 
   /** Standard commands */
   lazy val sparkStandardCommands: List[SparkILoop.this.LoopCommand] =
@@ -88,6 +94,12 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
     initializeSpark()
     super.loadFiles(settings)
   }
+
+  override def resetCommand(line: String): Unit = {
+    super.resetCommand(line)
+    initializeSpark()
+    echo("Note that after :reset, state of SparkSession and SparkContext is unchanged.")
+  }
 }
 
 object SparkILoop {

http://git-wip-us.apache.org/repos/asf/spark/blob/2c1c337b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 2444e93..c10db94 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -49,7 +49,8 @@ class ReplSuite extends SparkFunSuite {
 
     val oldExecutorClasspath = System.getProperty(CONF_EXECUTOR_CLASSPATH)
     System.setProperty(CONF_EXECUTOR_CLASSPATH, classpath)
-
+    Main.sparkContext = null
+    Main.sparkSession = null // causes recreation of SparkContext for each test.
     Main.conf.set("spark.master", master)
     Main.doMain(Array("-classpath", classpath), new SparkILoop(in, new PrintWriter(out)))
 

