Repository: spark Updated Branches: refs/heads/master 572af5027 -> 327d25fe1
[SPARK-22572][SPARK SHELL] spark-shell does not re-initialize on :replay ## What changes were proposed in this pull request? Ticket: [SPARK-22572](https://issues.apache.org/jira/browse/SPARK-22572) ## How was this patch tested? Added a new test case to `org.apache.spark.repl.ReplSuite` Author: Mark Petruska <petruska.m...@gmail.com> Closes #19791 from mpetruska/SPARK-22572. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/327d25fe Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/327d25fe Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/327d25fe Branch: refs/heads/master Commit: 327d25fe1741f62cd84097e94739f82ecb05383a Parents: 572af50 Author: Mark Petruska <petruska.m...@gmail.com> Authored: Wed Nov 22 21:35:47 2017 +0900 Committer: hyukjinkwon <gurwls...@gmail.com> Committed: Wed Nov 22 21:35:47 2017 +0900 ---------------------------------------------------------------------- .../org/apache/spark/repl/SparkILoop.scala | 75 +++++++++++--------- .../org/apache/spark/repl/SparkILoop.scala | 74 +++++++++++-------- .../scala/org/apache/spark/repl/ReplSuite.scala | 10 +++ 3 files changed, 96 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/327d25fe/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala ---------------------------------------------------------------------- diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala index ea279e4..3ce7cc7 100644 --- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala +++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -35,40 +35,45 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out) def this() = this(None, new JPrintWriter(Console.out, true)) + val initializationCommands: Seq[String] = Seq( + """ + @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) { + org.apache.spark.repl.Main.sparkSession + } else { + org.apache.spark.repl.Main.createSparkSession() + } + @transient val sc = { + val _sc = spark.sparkContext + if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) { + val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null) + if (proxyUrl != null) { + println( + s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}") + } else { + println(s"Spark Context Web UI is available at Spark Master Public URL") + } + } else { + _sc.uiWebUrl.foreach { + webUrl => println(s"Spark context Web UI available at ${webUrl}") + } + } + println("Spark context available as 'sc' " + + s"(master = ${_sc.master}, app id = ${_sc.applicationId}).") + println("Spark session available as 'spark'.") + _sc + } + """, + "import org.apache.spark.SparkContext._", + "import spark.implicits._", + "import spark.sql", + "import org.apache.spark.sql.functions._" + ) + def initializeSpark() { intp.beQuietDuring { - processLine(""" - @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) { - org.apache.spark.repl.Main.sparkSession - } else { - org.apache.spark.repl.Main.createSparkSession() - } - @transient val sc = { - val _sc = spark.sparkContext - if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) { - val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null) - if (proxyUrl != null) { - println( - s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}") - } else { - println(s"Spark Context Web UI is available at Spark Master Public URL") - } - } else { - _sc.uiWebUrl.foreach { - webUrl => println(s"Spark context Web UI available at ${webUrl}") - } - } - println("Spark context available as 'sc' " + - s"(master = ${_sc.master}, app id = ${_sc.applicationId}).") - println("Spark session available as 'spark'.") - _sc - } - """) - processLine("import org.apache.spark.SparkContext._") - processLine("import spark.implicits._") - processLine("import spark.sql") - processLine("import org.apache.spark.sql.functions._") - replayCommandStack = Nil // remove above commands from session history. + savingReplayStack { // remove the commands from session history. + initializationCommands.foreach(processLine) + } } } @@ -107,6 +112,12 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) initializeSpark() echo("Note that after :reset, state of SparkSession and SparkContext is unchanged.") } + + override def replay(): Unit = { + initializeSpark() + super.replay() + } + } object SparkILoop { http://git-wip-us.apache.org/repos/asf/spark/blob/327d25fe/repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala ---------------------------------------------------------------------- diff --git a/repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala index 900edd6..ffb2e5f 100644 --- a/repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala +++ b/repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -32,39 +32,45 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out) def this() = this(None, new JPrintWriter(Console.out, true)) + val initializationCommands: Seq[String] = Seq( + """ + @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) { + org.apache.spark.repl.Main.sparkSession + } else { + org.apache.spark.repl.Main.createSparkSession() + } + @transient val sc = { + val _sc = spark.sparkContext + if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) { + val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null) + if (proxyUrl != null) { + println( + s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}") + } else { + println(s"Spark Context Web UI is available at Spark Master Public URL") + } + } else { + _sc.uiWebUrl.foreach { + webUrl => println(s"Spark context Web UI available at ${webUrl}") + } + } + println("Spark context available as 'sc' " + + s"(master = ${_sc.master}, app id = ${_sc.applicationId}).") + println("Spark session available as 'spark'.") + _sc + } + """, + "import org.apache.spark.SparkContext._", + "import spark.implicits._", + "import spark.sql", + "import org.apache.spark.sql.functions._" + ) + def initializeSpark() { intp.beQuietDuring { - command(""" - @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) { - org.apache.spark.repl.Main.sparkSession - } else { - org.apache.spark.repl.Main.createSparkSession() - } - @transient val sc = { - val _sc = spark.sparkContext - if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) { - val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null) - if (proxyUrl != null) { - println( - s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}") - } else { - println(s"Spark Context Web UI is available at Spark Master Public URL") - } - } else { - _sc.uiWebUrl.foreach { - webUrl => println(s"Spark context Web UI available at ${webUrl}") - } - } - println("Spark context available as 'sc' " + - s"(master = ${_sc.master}, app id = ${_sc.applicationId}).") - println("Spark session available as 'spark'.") - _sc - } - """) - command("import org.apache.spark.SparkContext._") - command("import spark.implicits._") - command("import spark.sql") - command("import org.apache.spark.sql.functions._") + savingReplayStack { // remove the commands from session history. + initializationCommands.foreach(command) + } } } @@ -103,6 +109,12 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) initializeSpark() echo("Note that after :reset, state of SparkSession and SparkContext is unchanged.") } + + override def replay(): Unit = { + initializeSpark() + super.replay() + } + } object SparkILoop { http://git-wip-us.apache.org/repos/asf/spark/blob/327d25fe/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala ---------------------------------------------------------------------- diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala index c7ae194..905b41c 100644 --- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -217,4 +217,14 @@ class ReplSuite extends SparkFunSuite { assertDoesNotContain("error:", output) assertDoesNotContain("Exception", output) } + + test(":replay should work correctly") { + val output = runInterpreter("local", + """ + |sc + |:replay + """.stripMargin) + assertDoesNotContain("error: not found: value sc", output) + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org