http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala ---------------------------------------------------------------------- diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala new file mode 100644 index 0000000..f966f25 --- /dev/null +++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.repl + +import java.io._ +import java.net.URLClassLoader + +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.tools.nsc.interpreter.SparkILoop + +import com.google.common.io.Files +import org.scalatest.FunSuite +import org.apache.commons.lang3.StringEscapeUtils +import org.apache.spark.SparkContext +import org.apache.spark.util.Utils + + + +class ReplSuite extends FunSuite { + + def runInterpreter(master: String, input: String): String = { + val CONF_EXECUTOR_CLASSPATH = "spark.executor.extraClassPath" + + val in = new BufferedReader(new StringReader(input + "\n")) + val out = new StringWriter() + val cl = getClass.getClassLoader + var paths = new ArrayBuffer[String] + if (cl.isInstanceOf[URLClassLoader]) { + val urlLoader = cl.asInstanceOf[URLClassLoader] + for (url <- urlLoader.getURLs) { + if (url.getProtocol == "file") { + paths += url.getFile + } + } + } + val classpath = paths.mkString(File.pathSeparator) + + val oldExecutorClasspath = System.getProperty(CONF_EXECUTOR_CLASSPATH) + System.setProperty(CONF_EXECUTOR_CLASSPATH, classpath) + + System.setProperty("spark.master", master) + val interp = { + new SparkILoop(in, new PrintWriter(out)) + } + org.apache.spark.repl.Main.interp = interp + Main.s.processArguments(List("-classpath", classpath), true) + Main.main(Array()) // call main + org.apache.spark.repl.Main.interp = null + + if (oldExecutorClasspath != null) { + System.setProperty(CONF_EXECUTOR_CLASSPATH, oldExecutorClasspath) + } else { + System.clearProperty(CONF_EXECUTOR_CLASSPATH) + } + return out.toString + } + + def assertContains(message: String, output: String) { + val isContain = output.contains(message) + assert(isContain, + "Interpreter output did not contain '" + message + "':\n" + output) + } + + def assertDoesNotContain(message: String, output: String) { + val isContain = output.contains(message) + assert(!isContain, + "Interpreter output contained '" + message + "':\n" + output) + } + + test("propagation of local properties") { + // A mock ILoop that doesn't install the SIGINT handler. + class ILoop(out: PrintWriter) extends SparkILoop(None, out) { + settings = new scala.tools.nsc.Settings + settings.usejavacp.value = true + org.apache.spark.repl.Main.interp = this + override def createInterpreter() { + intp = new SparkILoopInterpreter + intp.setContextClassLoader() + } + } + + val out = new StringWriter() + Main.interp = new ILoop(new PrintWriter(out)) + Main.sparkContext = new SparkContext("local", "repl-test") + Main.interp.createInterpreter() + + Main.sparkContext.setLocalProperty("someKey", "someValue") + + // Make sure the value we set in the caller to interpret is propagated in the thread that + // interprets the command. + Main.interp.interpret("org.apache.spark.repl.Main.sparkContext.getLocalProperty(\"someKey\")") + assert(out.toString.contains("someValue")) + + Main.sparkContext.stop() + System.clearProperty("spark.driver.port") + } + + test("simple foreach with accumulator") { + val output = runInterpreter("local", + """ + |val accum = sc.accumulator(0) + |sc.parallelize(1 to 10).foreach(x => accum += x) + |accum.value + """.stripMargin) + assertDoesNotContain("error:", output) + assertDoesNotContain("Exception", output) + assertContains("res1: Int = 55", output) + } + + test("external vars") { + val output = runInterpreter("local", + """ + |var v = 7 + |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_) + |v = 10 + |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_) + """.stripMargin) + assertDoesNotContain("error:", output) + assertDoesNotContain("Exception", output) + assertContains("res0: Int = 70", output) + assertContains("res1: Int = 100", output) + } + + test("external classes") { + val output = runInterpreter("local", + """ + |class C { + |def foo = 5 + |} + |sc.parallelize(1 to 10).map(x => (new C).foo).collect.reduceLeft(_+_) + """.stripMargin) + assertDoesNotContain("error:", output) + assertDoesNotContain("Exception", output) + assertContains("res0: Int = 50", output) + } + + test("external functions") { + val output = runInterpreter("local", + """ + |def double(x: Int) = x + x + |sc.parallelize(1 to 10).map(x => double(x)).collect.reduceLeft(_+_) + """.stripMargin) + assertDoesNotContain("error:", output) + assertDoesNotContain("Exception", output) + assertContains("res0: Int = 110", output) + } + + test("external functions that access vars") { + val output = runInterpreter("local", + """ + |var v = 7 + |def getV() = v + |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_) + |v = 10 + |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_) + """.stripMargin) + assertDoesNotContain("error:", output) + assertDoesNotContain("Exception", output) + assertContains("res0: Int = 70", output) + assertContains("res1: Int = 100", output) + } + + test("broadcast vars") { + // Test that the value that a broadcast var had when it was created is used, + // even if that variable is then modified in the driver program + // TODO: This doesn't actually work for arrays when we run in local mode! + val output = runInterpreter("local", + """ + |var array = new Array[Int](5) + |val broadcastArray = sc.broadcast(array) + |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect + |array(0) = 5 + |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect + """.stripMargin) + assertDoesNotContain("error:", output) + assertDoesNotContain("Exception", output) + assertContains("res0: Array[Int] = Array(0, 0, 0, 0, 0)", output) + assertContains("res2: Array[Int] = Array(5, 0, 0, 0, 0)", output) + } + + test("interacting with files") { + val tempDir = Files.createTempDir() + tempDir.deleteOnExit() + val out = new FileWriter(tempDir + "/input") + out.write("Hello world!\n") + out.write("What's up?\n") + out.write("Goodbye\n") + out.close() + val output = runInterpreter("local", + """ + |var file = sc.textFile("%s").cache() + |file.count() + |file.count() + |file.count() + """.stripMargin.format(StringEscapeUtils.escapeJava( + tempDir.getAbsolutePath + File.separator + "input"))) + assertDoesNotContain("error:", output) + assertDoesNotContain("Exception", output) + assertContains("res0: Long = 3", output) + assertContains("res1: Long = 3", output) + assertContains("res2: Long = 3", output) + Utils.deleteRecursively(tempDir) + } + + test("local-cluster mode") { + val output = runInterpreter("local-cluster[1,1,512]", + """ + |var v = 7 + |def getV() = v + |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_) + |v = 10 + |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_) + |var array = new Array[Int](5) + |val broadcastArray = sc.broadcast(array) + |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect + |array(0) = 5 + |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect + """.stripMargin) + assertDoesNotContain("error:", output) + assertDoesNotContain("Exception", output) + assertContains("res0: Int = 70", output) + assertContains("res1: Int = 100", output) + assertContains("res2: Array[Int] = Array(0, 0, 0, 0, 0)", output) + assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output) + } + + test("SPARK-1199 two instances of same class don't type check.") { + val output = runInterpreter("local-cluster[1,1,512]", + """ + |case class Sum(exp: String, exp2: String) + |val a = Sum("A", "B") + |def b(a: Sum): String = a match { case Sum(_, _) => "Found Sum" } + |b(a) + """.stripMargin) + assertDoesNotContain("error:", output) + assertDoesNotContain("Exception", output) + } + + test("SPARK-2452 compound statements.") { + val output = runInterpreter("local", + """ + |val x = 4 ; def f() = x + |f() + """.stripMargin) + assertDoesNotContain("error:", output) + assertDoesNotContain("Exception", output) + } + + test("SPARK-2576 importing SQLContext.createSchemaRDD.") { + // We need to use local-cluster to test this case. + val output = runInterpreter("local-cluster[1,1,512]", + """ + |val sqlContext = new org.apache.spark.sql.SQLContext(sc) + |import sqlContext.createSchemaRDD + |case class TestCaseClass(value: Int) + |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toSchemaRDD.collect + """.stripMargin) + assertDoesNotContain("error:", output) + assertDoesNotContain("Exception", output) + } + + test("SPARK-2632 importing a method from non serializable class and not using it.") { + val output = runInterpreter("local", + """ + |class TestClass() { def testMethod = 3 } + |val t = new TestClass + |import t.testMethod + |case class TestCaseClass(value: Int) + |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect + """.stripMargin) + assertDoesNotContain("error:", output) + assertDoesNotContain("Exception", output) + } + + if (System.getenv("MESOS_NATIVE_LIBRARY") != null) { + test("running on Mesos") { + val output = runInterpreter("localquiet", + """ + |var v = 7 + |def getV() = v + |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_) + |v = 10 + |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_) + |var array = new Array[Int](5) + |val broadcastArray = sc.broadcast(array) + |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect + |array(0) = 5 + |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect + """.stripMargin) + assertDoesNotContain("error:", output) + assertDoesNotContain("Exception", output) + assertContains("res0: Int = 70", output) + assertContains("res1: Int = 100", output) + assertContains("res2: Array[Int] = Array(0, 0, 0, 0, 0)", output) + assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output) + } + } + + test("collecting objects of class defined in repl") { + val output = runInterpreter("local[2]", + """ + |case class Foo(i: Int) + |val ret = sc.parallelize((1 to 100).map(Foo), 10).collect + """.stripMargin) + assertDoesNotContain("error:", output) + assertDoesNotContain("Exception", output) + assertContains("ret: Array[Foo] = Array(Foo(1),", output) + } +}
http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/repl/src/main/scala/org/apache/spark/repl/Main.scala ---------------------------------------------------------------------- diff --git a/repl/src/main/scala/org/apache/spark/repl/Main.scala b/repl/src/main/scala/org/apache/spark/repl/Main.scala deleted file mode 100644 index 14b448d..0000000 --- a/repl/src/main/scala/org/apache/spark/repl/Main.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.repl - -import scala.collection.mutable.Set - -object Main { - private var _interp: SparkILoop = _ - - def interp = _interp - - def interp_=(i: SparkILoop) { _interp = i } - - def main(args: Array[String]) { - _interp = new SparkILoop - _interp.process(args) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/repl/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala ---------------------------------------------------------------------- diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala b/repl/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala deleted file mode 100644 index 0581694..0000000 --- a/repl/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.repl - -import scala.tools.nsc.{Settings, CompilerCommand} -import scala.Predef._ - -/** - * Command class enabling Spark-specific command line options (provided by - * <i>org.apache.spark.repl.SparkRunnerSettings</i>). - */ -class SparkCommandLine(args: List[String], override val settings: Settings) - extends CompilerCommand(args, settings) { - - def this(args: List[String], error: String => Unit) { - this(args, new SparkRunnerSettings(error)) - } - - def this(args: List[String]) { - this(args, str => Console.println("Error: " + str)) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala ---------------------------------------------------------------------- diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala b/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala deleted file mode 100644 index f8432c8..0000000 --- a/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala +++ /dev/null @@ -1,114 +0,0 @@ -// scalastyle:off - -/* NSC -- new Scala compiler - * Copyright 2005-2013 LAMP/EPFL - * @author Paul Phillips - */ - -package org.apache.spark.repl - -import scala.tools.nsc._ -import scala.tools.nsc.interpreter._ - -import scala.reflect.internal.util.BatchSourceFile -import scala.tools.nsc.ast.parser.Tokens.EOF - -import org.apache.spark.Logging - -trait SparkExprTyper extends Logging { - val repl: SparkIMain - - import repl._ - import global.{ reporter => _, Import => _, _ } - import definitions._ - import syntaxAnalyzer.{ UnitParser, UnitScanner, token2name } - import naming.freshInternalVarName - - object codeParser extends { val global: repl.global.type = repl.global } with CodeHandlers[Tree] { - def applyRule[T](code: String, rule: UnitParser => T): T = { - reporter.reset() - val scanner = newUnitParser(code) - val result = rule(scanner) - - if (!reporter.hasErrors) - scanner.accept(EOF) - - result - } - - def defns(code: String) = stmts(code) collect { case x: DefTree => x } - def expr(code: String) = applyRule(code, _.expr()) - def stmts(code: String) = applyRule(code, _.templateStats()) - def stmt(code: String) = stmts(code).last // guaranteed nonempty - } - - /** Parse a line into a sequence of trees. Returns None if the input is incomplete. */ - def parse(line: String): Option[List[Tree]] = debugging(s"""parse("$line")""") { - var isIncomplete = false - reporter.withIncompleteHandler((_, _) => isIncomplete = true) { - val trees = codeParser.stmts(line) - if (reporter.hasErrors) { - Some(Nil) - } else if (isIncomplete) { - None - } else { - Some(trees) - } - } - } - // def parsesAsExpr(line: String) = { - // import codeParser._ - // (opt expr line).isDefined - // } - - def symbolOfLine(code: String): Symbol = { - def asExpr(): Symbol = { - val name = freshInternalVarName() - // Typing it with a lazy val would give us the right type, but runs - // into compiler bugs with things like existentials, so we compile it - // behind a def and strip the NullaryMethodType which wraps the expr. - val line = "def " + name + " = {\n" + code + "\n}" - - interpretSynthetic(line) match { - case IR.Success => - val sym0 = symbolOfTerm(name) - // drop NullaryMethodType - val sym = sym0.cloneSymbol setInfo afterTyper(sym0.info.finalResultType) - if (sym.info.typeSymbol eq UnitClass) NoSymbol else sym - case _ => NoSymbol - } - } - def asDefn(): Symbol = { - val old = repl.definedSymbolList.toSet - - interpretSynthetic(code) match { - case IR.Success => - repl.definedSymbolList filterNot old match { - case Nil => NoSymbol - case sym :: Nil => sym - case syms => NoSymbol.newOverloaded(NoPrefix, syms) - } - case _ => NoSymbol - } - } - beQuietDuring(asExpr()) orElse beQuietDuring(asDefn()) - } - - private var typeOfExpressionDepth = 0 - def typeOfExpression(expr: String, silent: Boolean = true): Type = { - if (typeOfExpressionDepth > 2) { - logDebug("Terminating typeOfExpression recursion for expression: " + expr) - return NoType - } - typeOfExpressionDepth += 1 - // Don't presently have a good way to suppress undesirable success output - // while letting errors through, so it is first trying it silently: if there - // is an error, and errors are desired, then it re-evaluates non-silently - // to induce the error message. - try beSilentDuring(symbolOfLine(expr).tpe) match { - case NoType if !silent => symbolOfLine(expr).tpe // generate error - case tpe => tpe - } - finally typeOfExpressionDepth -= 1 - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/repl/src/main/scala/org/apache/spark/repl/SparkHelper.scala ---------------------------------------------------------------------- diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkHelper.scala b/repl/src/main/scala/org/apache/spark/repl/SparkHelper.scala deleted file mode 100644 index 5340951..0000000 --- a/repl/src/main/scala/org/apache/spark/repl/SparkHelper.scala +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package scala.tools.nsc - -object SparkHelper { - def explicitParentLoader(settings: Settings) = settings.explicitParentLoader -} http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala ---------------------------------------------------------------------- diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala deleted file mode 100644 index e56b74e..0000000 --- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala +++ /dev/null @@ -1,1091 +0,0 @@ -// scalastyle:off - -/* NSC -- new Scala compiler - * Copyright 2005-2013 LAMP/EPFL - * @author Alexander Spoon - */ - -package org.apache.spark.repl - - -import java.net.URL - -import scala.reflect.io.AbstractFile -import scala.tools.nsc._ -import scala.tools.nsc.backend.JavaPlatform -import scala.tools.nsc.interpreter._ - -import scala.tools.nsc.interpreter.{Results => IR} -import Predef.{println => _, _} -import java.io.{BufferedReader, FileReader} -import java.net.URI -import java.util.concurrent.locks.ReentrantLock -import scala.sys.process.Process -import scala.tools.nsc.interpreter.session._ -import scala.util.Properties.{jdkHome, javaVersion} -import scala.tools.util.{Javap} -import scala.annotation.tailrec -import scala.collection.mutable.ListBuffer -import scala.concurrent.ops -import scala.tools.nsc.util._ -import scala.tools.nsc.interpreter._ -import scala.tools.nsc.io.{File, Directory} -import scala.reflect.NameTransformer._ -import scala.tools.nsc.util.ScalaClassLoader._ -import scala.tools.util._ -import scala.language.{implicitConversions, existentials, postfixOps} -import scala.reflect.{ClassTag, classTag} -import scala.tools.reflect.StdRuntimeTags._ - -import java.lang.{Class => jClass} -import scala.reflect.api.{Mirror, TypeCreator, Universe => ApiUniverse} - -import org.apache.spark.Logging -import org.apache.spark.SparkConf -import org.apache.spark.SparkContext -import org.apache.spark.util.Utils - -/** The Scala interactive shell. It provides a read-eval-print loop - * around the Interpreter class. - * After instantiation, clients should call the main() method. - * - * If no in0 is specified, then input will come from the console, and - * the class will attempt to provide input editing feature such as - * input history. - * - * @author Moez A. Abdel-Gawad - * @author Lex Spoon - * @version 1.2 - */ -class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter, - val master: Option[String]) - extends AnyRef - with LoopCommands - with SparkILoopInit - with Logging -{ - def this(in0: BufferedReader, out: JPrintWriter, master: String) = this(Some(in0), out, Some(master)) - def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out, None) - def this() = this(None, new JPrintWriter(Console.out, true), None) - - var in: InteractiveReader = _ // the input stream from which commands come - var settings: Settings = _ - var intp: SparkIMain = _ - - @deprecated("Use `intp` instead.", "2.9.0") def interpreter = intp - @deprecated("Use `intp` instead.", "2.9.0") def interpreter_= (i: SparkIMain): Unit = intp = i - - /** Having inherited the difficult "var-ness" of the repl instance, - * I'm trying to work around it by moving operations into a class from - * which it will appear a stable prefix. - */ - private def onIntp[T](f: SparkIMain => T): T = f(intp) - - class IMainOps[T <: SparkIMain](val intp: T) { - import intp._ - import global._ - - def printAfterTyper(msg: => String) = - intp.reporter printMessage afterTyper(msg) - - /** Strip NullaryMethodType artifacts. */ - private def replInfo(sym: Symbol) = { - sym.info match { - case NullaryMethodType(restpe) if sym.isAccessor => restpe - case info => info - } - } - def echoTypeStructure(sym: Symbol) = - printAfterTyper("" + deconstruct.show(replInfo(sym))) - - def echoTypeSignature(sym: Symbol, verbose: Boolean) = { - if (verbose) SparkILoop.this.echo("// Type signature") - printAfterTyper("" + replInfo(sym)) - - if (verbose) { - SparkILoop.this.echo("\n// Internal Type structure") - echoTypeStructure(sym) - } - } - } - implicit def stabilizeIMain(intp: SparkIMain) = new IMainOps[intp.type](intp) - - /** TODO - - * -n normalize - * -l label with case class parameter names - * -c complete - leave nothing out - */ - private def typeCommandInternal(expr: String, verbose: Boolean): Result = { - onIntp { intp => - val sym = intp.symbolOfLine(expr) - if (sym.exists) intp.echoTypeSignature(sym, verbose) - else "" - } - } - - var sparkContext: SparkContext = _ - - override def echoCommandMessage(msg: String) { - intp.reporter printMessage msg - } - - // def isAsync = !settings.Yreplsync.value - def isAsync = false - // lazy val power = new Power(intp, new StdReplVals(this))(tagOfStdReplVals, classTag[StdReplVals]) - def history = in.history - - /** The context class loader at the time this object was created */ - protected val originalClassLoader = Utils.getContextOrSparkClassLoader - - // classpath entries added via :cp - var addedClasspath: String = "" - - /** A reverse list of commands to replay if the user requests a :replay */ - var replayCommandStack: List[String] = Nil - - /** A list of commands to replay if the user requests a :replay */ - def replayCommands = replayCommandStack.reverse - - /** Record a command for replay should the user request a :replay */ - def addReplay(cmd: String) = replayCommandStack ::= cmd - - def savingReplayStack[T](body: => T): T = { - val saved = replayCommandStack - try body - finally replayCommandStack = saved - } - def savingReader[T](body: => T): T = { - val saved = in - try body - finally in = saved - } - - - def sparkCleanUp(){ - echo("Stopping spark context.") - intp.beQuietDuring { - command("sc.stop()") - } - } - /** Close the interpreter and set the var to null. */ - def closeInterpreter() { - if (intp ne null) { - sparkCleanUp() - intp.close() - intp = null - } - } - - class SparkILoopInterpreter extends SparkIMain(settings, out) { - outer => - - override lazy val formatting = new Formatting { - def prompt = SparkILoop.this.prompt - } - override protected def parentClassLoader = SparkHelper.explicitParentLoader(settings).getOrElse(classOf[SparkILoop].getClassLoader) - } - - /** Create a new interpreter. */ - def createInterpreter() { - require(settings != null) - - if (addedClasspath != "") settings.classpath.append(addedClasspath) - val addedJars = - if (Utils.isWindows) { - // Strip any URI scheme prefix so we can add the correct path to the classpath - // e.g. file:/C:/my/path.jar -> C:/my/path.jar - SparkILoop.getAddedJars.map { jar => new URI(jar).getPath.stripPrefix("/") } - } else { - SparkILoop.getAddedJars - } - // work around for Scala bug - val totalClassPath = addedJars.foldLeft( - settings.classpath.value)((l, r) => ClassPath.join(l, r)) - this.settings.classpath.value = totalClassPath - - intp = new SparkILoopInterpreter - } - - /** print a friendly help message */ - def helpCommand(line: String): Result = { - if (line == "") helpSummary() - else uniqueCommand(line) match { - case Some(lc) => echo("\n" + lc.longHelp) - case _ => ambiguousError(line) - } - } - private def helpSummary() = { - val usageWidth = commands map (_.usageMsg.length) max - val formatStr = "%-" + usageWidth + "s %s %s" - - echo("All commands can be abbreviated, e.g. :he instead of :help.") - echo("Those marked with a * have more detailed help, e.g. :help imports.\n") - - commands foreach { cmd => - val star = if (cmd.hasLongHelp) "*" else " " - echo(formatStr.format(cmd.usageMsg, star, cmd.help)) - } - } - private def ambiguousError(cmd: String): Result = { - matchingCommands(cmd) match { - case Nil => echo(cmd + ": no such command. Type :help for help.") - case xs => echo(cmd + " is ambiguous: did you mean " + xs.map(":" + _.name).mkString(" or ") + "?") - } - Result(true, None) - } - private def matchingCommands(cmd: String) = commands filter (_.name startsWith cmd) - private def uniqueCommand(cmd: String): Option[LoopCommand] = { - // this lets us add commands willy-nilly and only requires enough command to disambiguate - matchingCommands(cmd) match { - case List(x) => Some(x) - // exact match OK even if otherwise appears ambiguous - case xs => xs find (_.name == cmd) - } - } - private var fallbackMode = false - - private def toggleFallbackMode() { - val old = fallbackMode - fallbackMode = !old - System.setProperty("spark.repl.fallback", fallbackMode.toString) - echo(s""" - |Switched ${if (old) "off" else "on"} fallback mode without restarting. - | If you have defined classes in the repl, it would - |be good to redefine them incase you plan to use them. If you still run - |into issues it would be good to restart the repl and turn on `:fallback` - |mode as first command. - """.stripMargin) - } - - /** Show the history */ - lazy val historyCommand = new LoopCommand("history", "show the history (optional num is commands to show)") { - override def usage = "[num]" - def defaultLines = 20 - - def apply(line: String): Result = { - if (history eq NoHistory) - return "No history available." - - val xs = words(line) - val current = history.index - val count = try xs.head.toInt catch { case _: Exception => defaultLines } - val lines = history.asStrings takeRight count - val offset = current - lines.size + 1 - - for ((line, index) <- lines.zipWithIndex) - echo("%3d %s".format(index + offset, line)) - } - } - - // When you know you are most likely breaking into the middle - // of a line being typed. This softens the blow. - protected def echoAndRefresh(msg: String) = { - echo("\n" + msg) - in.redrawLine() - } - protected def echo(msg: String) = { - out println msg - out.flush() - } - protected def echoNoNL(msg: String) = { - out print msg - out.flush() - } - - /** Search the history */ - def searchHistory(_cmdline: String) { - val cmdline = _cmdline.toLowerCase - val offset = history.index - history.size + 1 - - for ((line, index) <- history.asStrings.zipWithIndex ; if line.toLowerCase contains cmdline) - echo("%d %s".format(index + offset, line)) - } - - private var currentPrompt = Properties.shellPromptString - def setPrompt(prompt: String) = currentPrompt = prompt - /** Prompt to print when awaiting input */ - def prompt = currentPrompt - - import LoopCommand.{ cmd, nullary } - - /** Standard commands */ - lazy val standardCommands = List( - cmd("cp", "<path>", "add a jar or directory to the classpath", addClasspath), - cmd("help", "[command]", "print this summary or command-specific help", helpCommand), - historyCommand, - cmd("h?", "<string>", "search the history", searchHistory), - cmd("imports", "[name name ...]", "show import history, identifying sources of names", importsCommand), - cmd("implicits", "[-v]", "show the implicits in scope", implicitsCommand), - cmd("javap", "<path|class>", "disassemble a file or class name", javapCommand), - cmd("load", "<path>", "load and interpret a Scala file", loadCommand), - nullary("paste", "enter paste mode: all input up to ctrl-D compiled together", pasteCommand), -// nullary("power", "enable power user mode", powerCmd), - nullary("quit", "exit the repl", () => Result(false, None)), - nullary("replay", "reset execution and replay all previous commands", replay), - nullary("reset", "reset the repl to its initial state, forgetting all session entries", resetCommand), - shCommand, - nullary("silent", "disable/enable automatic printing of results", verbosity), - nullary("fallback", """ - |disable/enable advanced repl changes, these fix some issues but may introduce others. - |This mode will be removed once these fixes stablize""".stripMargin, toggleFallbackMode), - cmd("type", "[-v] <expr>", "display the type of an expression without evaluating it", typeCommand), - nullary("warnings", "show the suppressed warnings from the most recent line which had any", warningsCommand) - ) - - /** Power user commands */ - lazy val powerCommands: List[LoopCommand] = List( - // cmd("phase", "<phase>", "set the implicit phase for power commands", phaseCommand) - ) - - // private def dumpCommand(): Result = { - // echo("" + power) - // history.asStrings takeRight 30 foreach echo - // in.redrawLine() - // } - // private def valsCommand(): Result = power.valsDescription - - private val typeTransforms = List( - "scala.collection.immutable." -> "immutable.", - "scala.collection.mutable." -> "mutable.", - "scala.collection.generic." -> "generic.", - "java.lang." -> "jl.", - "scala.runtime." -> "runtime." - ) - - private def importsCommand(line: String): Result = { - val tokens = words(line) - val handlers = intp.languageWildcardHandlers ++ intp.importHandlers - val isVerbose = tokens contains "-v" - - handlers.filterNot(_.importedSymbols.isEmpty).zipWithIndex foreach { - case (handler, idx) => - val (types, terms) = handler.importedSymbols partition (_.name.isTypeName) - val imps = handler.implicitSymbols - val found = tokens filter (handler importsSymbolNamed _) - val typeMsg = if (types.isEmpty) "" else types.size + " types" - val termMsg = if (terms.isEmpty) "" else terms.size + " terms" - val implicitMsg = if (imps.isEmpty) "" else imps.size + " are implicit" - val foundMsg = if (found.isEmpty) "" else found.mkString(" // imports: ", ", ", "") - val statsMsg = List(typeMsg, termMsg, implicitMsg) filterNot (_ == "") mkString ("(", ", ", ")") - - intp.reporter.printMessage("%2d) %-30s %s%s".format( - idx + 1, - handler.importString, - statsMsg, - foundMsg - )) - } - } - - private def implicitsCommand(line: String): Result = onIntp { intp => - import intp._ - import global._ - - def p(x: Any) = intp.reporter.printMessage("" + x) - - // If an argument is given, only show a source with that - // in its name somewhere. - val args = line split "\\s+" - val filtered = intp.implicitSymbolsBySource filter { - case (source, syms) => - (args contains "-v") || { - if (line == "") (source.fullName.toString != "scala.Predef") - else (args exists (source.name.toString contains _)) - } - } - - if (filtered.isEmpty) - return "No implicits have been imported other than those in Predef." - - filtered foreach { - case (source, syms) => - p("/* " + syms.size + " implicit members imported from " + source.fullName + " */") - - // This groups the members by where the symbol is defined - val byOwner = syms groupBy (_.owner) - val sortedOwners = byOwner.toList sortBy { case (owner, _) => afterTyper(source.info.baseClasses indexOf owner) } - - sortedOwners foreach { - case (owner, members) => - // Within each owner, we cluster results based on the final result type - // if there are more than a couple, and sort each cluster based on name. - // This is really just trying to make the 100 or so implicits imported - // by default into something readable. - val memberGroups: List[List[Symbol]] = { - val groups = members groupBy (_.tpe.finalResultType) toList - val (big, small) = groups partition (_._2.size > 3) - val xss = ( - (big sortBy (_._1.toString) map (_._2)) :+ - (small flatMap (_._2)) - ) - - xss map (xs => xs sortBy (_.name.toString)) - } - - val ownerMessage = if (owner == source) " defined in " else " inherited from " - p(" /* " + members.size + ownerMessage + owner.fullName + " */") - - memberGroups foreach { group => - group foreach (s => p(" " + intp.symbolDefString(s))) - p("") - } - } - p("") - } - } - - private def findToolsJar() = { - val jdkPath = Directory(jdkHome) - val jar = jdkPath / "lib" / "tools.jar" toFile; - - if (jar isFile) - Some(jar) - else if (jdkPath.isDirectory) - jdkPath.deepFiles find (_.name == "tools.jar") - else None - } - private def addToolsJarToLoader() = { - val cl = findToolsJar match { - case Some(tools) => ScalaClassLoader.fromURLs(Seq(tools.toURL), intp.classLoader) - case _ => intp.classLoader - } - if (Javap.isAvailable(cl)) { - logDebug(":javap available.") - cl - } - else { - logDebug(":javap unavailable: no tools.jar at " + jdkHome) - intp.classLoader - } - } - - protected def newJavap() = new JavapClass(addToolsJarToLoader(), new SparkIMain.ReplStrippingWriter(intp)) { - override def tryClass(path: String): Array[Byte] = { - val hd :: rest = path split '.' toList; - // If there are dots in the name, the first segment is the - // key to finding it. - if (rest.nonEmpty) { - intp optFlatName hd match { - case Some(flat) => - val clazz = flat :: rest mkString NAME_JOIN_STRING - val bytes = super.tryClass(clazz) - if (bytes.nonEmpty) bytes - else super.tryClass(clazz + MODULE_SUFFIX_STRING) - case _ => super.tryClass(path) - } - } - else { - // Look for Foo first, then Foo$, but if Foo$ is given explicitly, - // we have to drop the $ to find object Foo, then tack it back onto - // the end of the flattened name. - def className = intp flatName path - def moduleName = (intp flatName path.stripSuffix(MODULE_SUFFIX_STRING)) + MODULE_SUFFIX_STRING - - val bytes = super.tryClass(className) - if (bytes.nonEmpty) bytes - else super.tryClass(moduleName) - } - } - } - // private lazy val javap = substituteAndLog[Javap]("javap", NoJavap)(newJavap()) - private lazy val javap = - try newJavap() - catch { case _: Exception => null } - - // Still todo: modules. - private def typeCommand(line0: String): Result = { - line0.trim match { - case "" => ":type [-v] <expression>" - case s if s startsWith "-v " => typeCommandInternal(s stripPrefix "-v " trim, true) - case s => typeCommandInternal(s, false) - } - } - - private def warningsCommand(): Result = { - if (intp.lastWarnings.isEmpty) - "Can't find any cached warnings." - else - intp.lastWarnings foreach { case (pos, msg) => intp.reporter.warning(pos, msg) } - } - - private def javapCommand(line: String): Result = { - if (javap == null) - ":javap unavailable, no tools.jar at %s. Set JDK_HOME.".format(jdkHome) - else if (javaVersion startsWith "1.7") - ":javap not yet working with java 1.7" - else if (line == "") - ":javap [-lcsvp] [path1 path2 ...]" - else - javap(words(line)) foreach { res => - if (res.isError) return "Failed: " + res.value - else res.show() - } - } - - private def wrapCommand(line: String): Result = { - def failMsg = "Argument to :wrap must be the name of a method with signature [T](=> T): T" - onIntp { intp => - import intp._ - import global._ - - words(line) match { - case Nil => - intp.executionWrapper match { - case "" => "No execution wrapper is set." - case s => "Current execution wrapper: " + s - } - case "clear" :: Nil => - intp.executionWrapper match { - case "" => "No execution wrapper is set." - case s => intp.clearExecutionWrapper() ; "Cleared execution wrapper." - } - case wrapper :: Nil => - intp.typeOfExpression(wrapper) match { - case PolyType(List(targ), MethodType(List(arg), restpe)) => - intp setExecutionWrapper intp.pathToTerm(wrapper) - "Set wrapper to '" + wrapper + "'" - case tp => - failMsg + "\nFound: <unknown>" - } - case _ => failMsg - } - } - } - - private def pathToPhaseWrapper = intp.pathToTerm("$r") + ".phased.atCurrent" - // private def phaseCommand(name: String): Result = { - // val phased: Phased = power.phased - // import phased.NoPhaseName - - // if (name == "clear") { - // phased.set(NoPhaseName) - // intp.clearExecutionWrapper() - // "Cleared active phase." - // } - // else if (name == "") phased.get match { - // case NoPhaseName => "Usage: :phase <expr> (e.g. typer, erasure.next, erasure+3)" - // case ph => "Active phase is '%s'. (To clear, :phase clear)".format(phased.get) - // } - // else { - // val what = phased.parse(name) - // if (what.isEmpty || !phased.set(what)) - // "'" + name + "' does not appear to represent a valid phase." - // else { - // intp.setExecutionWrapper(pathToPhaseWrapper) - // val activeMessage = - // if (what.toString.length == name.length) "" + what - // else "%s (%s)".format(what, name) - - // "Active phase is now: " + activeMessage - // } - // } - // } - - /** Available commands */ - def commands: List[LoopCommand] = standardCommands /*++ ( - if (isReplPower) powerCommands else Nil - )*/ - - private val replayQuestionMessage = - """|That entry seems to have slain the compiler. Shall I replay - |your session? I can re-run each line except the last one. - |[y/n] - """.trim.stripMargin - - private def crashRecovery(ex: Throwable): Boolean = { - echo(ex.toString) - ex match { - case _: NoSuchMethodError | _: NoClassDefFoundError => - echo("\nUnrecoverable error.") - throw ex - case _ => - def fn(): Boolean = - try in.readYesOrNo(replayQuestionMessage, { echo("\nYou must enter y or n.") ; fn() }) - catch { case _: RuntimeException => false } - - if (fn()) replay() - else echo("\nAbandoning crashed session.") - } - true - } - - /** The main read-eval-print loop for the repl. It calls - * command() for each line of input, and stops when - * command() returns false. - */ - def loop() { - def readOneLine() = { - out.flush() - in readLine prompt - } - // return false if repl should exit - def processLine(line: String): Boolean = { - if (isAsync) { - if (!awaitInitialized()) return false - runThunks() - } - if (line eq null) false // assume null means EOF - else command(line) match { - case Result(false, _) => false - case Result(_, Some(finalLine)) => addReplay(finalLine) ; true - case _ => true - } - } - def innerLoop() { - val shouldContinue = try { - processLine(readOneLine()) - } catch {case t: Throwable => crashRecovery(t)} - if (shouldContinue) - innerLoop() - } - innerLoop() - } - - /** interpret all lines from a specified file */ - def interpretAllFrom(file: File) { - savingReader { - savingReplayStack { - file applyReader { reader => - in = SimpleReader(reader, out, false) - echo("Loading " + file + "...") - loop() - } - } - } - } - - /** create a new interpreter and replay the given commands */ - def replay() { - reset() - if (replayCommandStack.isEmpty) - echo("Nothing to replay.") - else for (cmd <- replayCommands) { - echo("Replaying: " + cmd) // flush because maybe cmd will have its own output - command(cmd) - echo("") - } - } - def resetCommand() { - echo("Resetting repl state.") - if (replayCommandStack.nonEmpty) { - echo("Forgetting this session history:\n") - replayCommands foreach echo - echo("") - replayCommandStack = Nil - } - if (intp.namedDefinedTerms.nonEmpty) - echo("Forgetting all expression results and named terms: " + intp.namedDefinedTerms.mkString(", ")) - if (intp.definedTypes.nonEmpty) - echo("Forgetting defined types: " + intp.definedTypes.mkString(", ")) - - reset() - } - - def reset() { - intp.reset() - // unleashAndSetPhase() - } - - /** fork a shell and run a command */ - lazy val shCommand = new LoopCommand("sh", "run a shell command (result is implicitly => List[String])") { - override def usage = "<command line>" - def apply(line: String): Result = line match { - case "" => showUsage() - case _ => - val toRun = classOf[ProcessResult].getName + "(" + string2codeQuoted(line) + ")" - intp interpret toRun - () - } - } - - def withFile(filename: String)(action: File => Unit) { - val f = File(filename) - - if (f.exists) action(f) - else echo("That file does not exist") - } - - def loadCommand(arg: String) = { - var shouldReplay: Option[String] = None - withFile(arg)(f => { - interpretAllFrom(f) - shouldReplay = Some(":load " + arg) - }) - Result(true, shouldReplay) - } - - def addAllClasspath(args: Seq[String]): Unit = { - var added = false - var totalClasspath = "" - for (arg <- args) { - val f = File(arg).normalize - if (f.exists) { - added = true - addedClasspath = ClassPath.join(addedClasspath, f.path) - totalClasspath = ClassPath.join(settings.classpath.value, addedClasspath) - intp.addUrlsToClassPath(f.toURI.toURL) - sparkContext.addJar(f.toURI.toURL.getPath) - } - } - } - - def addClasspath(arg: String): Unit = { - val f = File(arg).normalize - if (f.exists) { - addedClasspath = ClassPath.join(addedClasspath, f.path) - intp.addUrlsToClassPath(f.toURI.toURL) - sparkContext.addJar(f.toURI.toURL.getPath) - echo("Added '%s'. Your new classpath is:\n\"%s\"".format(f.path, intp.global.classPath.asClasspathString)) - } - else echo("The path '" + f + "' doesn't seem to exist.") - } - - - def powerCmd(): Result = { - if (isReplPower) "Already in power mode." - else enablePowerMode(false) - } - - def enablePowerMode(isDuringInit: Boolean) = { - // replProps.power setValue true - // unleashAndSetPhase() - // asyncEcho(isDuringInit, power.banner) - } - // private def unleashAndSetPhase() { -// if (isReplPower) { -// // power.unleash() -// // Set the phase to "typer" -// intp beSilentDuring phaseCommand("typer") -// } -// } - - def asyncEcho(async: Boolean, msg: => String) { - if (async) asyncMessage(msg) - else echo(msg) - } - - def verbosity() = { - // val old = intp.printResults - // intp.printResults = !old - // echo("Switched " + (if (old) "off" else "on") + " result printing.") - } - - /** Run one command submitted by the user. Two values are returned: - * (1) whether to keep running, (2) the line to record for replay, - * if any. */ - def command(line: String): Result = { - if (line startsWith ":") { - val cmd = line.tail takeWhile (x => !x.isWhitespace) - uniqueCommand(cmd) match { - case Some(lc) => lc(line.tail stripPrefix cmd dropWhile (_.isWhitespace)) - case _ => ambiguousError(cmd) - } - } - else if (intp.global == null) Result(false, None) // Notice failure to create compiler - else Result(true, interpretStartingWith(line)) - } - - private def readWhile(cond: String => Boolean) = { - Iterator continually in.readLine("") takeWhile (x => x != null && cond(x)) - } - - def pasteCommand(): Result = { - echo("// Entering paste mode (ctrl-D to finish)\n") - val code = readWhile(_ => true) mkString "\n" - echo("\n// Exiting paste mode, now interpreting.\n") - intp interpret code - () - } - - private object paste extends Pasted { - val ContinueString = " | " - val PromptString = "scala> " - - def interpret(line: String): Unit = { - echo(line.trim) - intp interpret line - echo("") - } - - def transcript(start: String) = { - echo("\n// Detected repl transcript paste: ctrl-D to finish.\n") - apply(Iterator(start) ++ readWhile(_.trim != PromptString.trim)) - } - } - import paste.{ ContinueString, PromptString } - - /** Interpret expressions starting with the first line. - * Read lines until a complete compilation unit is available - * or until a syntax error has been seen. If a full unit is - * read, go ahead and interpret it. Return the full string - * to be recorded for replay, if any. - */ - def interpretStartingWith(code: String): Option[String] = { - // signal completion non-completion input has been received - in.completion.resetVerbosity() - - def reallyInterpret = { - val reallyResult = intp.interpret(code) - (reallyResult, reallyResult match { - case IR.Error => None - case IR.Success => Some(code) - case IR.Incomplete => - if (in.interactive && code.endsWith("\n\n")) { - echo("You typed two blank lines. Starting a new command.") - None - } - else in.readLine(ContinueString) match { - case null => - // we know compilation is going to fail since we're at EOF and the - // parser thinks the input is still incomplete, but since this is - // a file being read non-interactively we want to fail. So we send - // it straight to the compiler for the nice error message. - intp.compileString(code) - None - - case line => interpretStartingWith(code + "\n" + line) - } - }) - } - - /** Here we place ourselves between the user and the interpreter and examine - * the input they are ostensibly submitting. We intervene in several cases: - * - * 1) If the line starts with "scala> " it is assumed to be an interpreter paste. - * 2) If the line starts with "." (but not ".." or "./") it is treated as an invocation - * on the previous result. - * 3) If the Completion object's execute returns Some(_), we inject that value - * and avoid the interpreter, as it's likely not valid scala code. - */ - if (code == "") None - else if (!paste.running && code.trim.startsWith(PromptString)) { - paste.transcript(code) - None - } - else if (Completion.looksLikeInvocation(code) && intp.mostRecentVar != "") { - interpretStartingWith(intp.mostRecentVar + code) - } - else if (code.trim startsWith "//") { - // line comment, do nothing - None - } - else - reallyInterpret._2 - } - - // runs :load `file` on any files passed via -i - def loadFiles(settings: Settings) = settings match { - case settings: SparkRunnerSettings => - for (filename <- settings.loadfiles.value) { - val cmd = ":load " + filename - command(cmd) - addReplay(cmd) - echo("") - } - case _ => - } - - /** Tries to create a JLineReader, falling back to SimpleReader: - * unless settings or properties are such that it should start - * with SimpleReader. - */ - def chooseReader(settings: Settings): InteractiveReader = { - if (settings.Xnojline.value || Properties.isEmacsShell) - SimpleReader() - else try new SparkJLineReader( - if (settings.noCompletion.value) NoCompletion - else new SparkJLineCompletion(intp) - ) - catch { - case ex @ (_: Exception | _: NoClassDefFoundError) => - echo("Failed to created SparkJLineReader: " + ex + "\nFalling back to SimpleReader.") - SimpleReader() - } - } - - val u: scala.reflect.runtime.universe.type = scala.reflect.runtime.universe - val m = u.runtimeMirror(Utils.getSparkClassLoader) - private def tagOfStaticClass[T: ClassTag]: u.TypeTag[T] = - u.TypeTag[T]( - m, - new TypeCreator { - def apply[U <: ApiUniverse with Singleton](m: Mirror[U]): U # Type = - m.staticClass(classTag[T].runtimeClass.getName).toTypeConstructor.asInstanceOf[U # Type] - }) - - def process(settings: Settings): Boolean = savingContextLoader { - if (getMaster() == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true") - - this.settings = settings - createInterpreter() - - // sets in to some kind of reader depending on environmental cues - in = in0 match { - case Some(reader) => SimpleReader(reader, out, true) - case None => - // some post-initialization - chooseReader(settings) match { - case x: SparkJLineReader => addThunk(x.consoleReader.postInit) ; x - case x => x - } - } - lazy val tagOfSparkIMain = tagOfStaticClass[org.apache.spark.repl.SparkIMain] - // Bind intp somewhere out of the regular namespace where - // we can get at it in generated code. - addThunk(intp.quietBind(NamedParam[SparkIMain]("$intp", intp)(tagOfSparkIMain, classTag[SparkIMain]))) - addThunk({ - import scala.tools.nsc.io._ - import Properties.userHome - import scala.compat.Platform.EOL - val autorun = replProps.replAutorunCode.option flatMap (f => io.File(f).safeSlurp()) - if (autorun.isDefined) intp.quietRun(autorun.get) - }) - - addThunk(printWelcome()) - addThunk(initializeSpark()) - - // it is broken on startup; go ahead and exit - if (intp.reporter.hasErrors) - return false - - // This is about the illusion of snappiness. We call initialize() - // which spins off a separate thread, then print the prompt and try - // our best to look ready. The interlocking lazy vals tend to - // inter-deadlock, so we break the cycle with a single asynchronous - // message to an actor. - if (isAsync) { - intp initialize initializedCallback() - createAsyncListener() // listens for signal to run postInitialization - } - else { - intp.initializeSynchronous() - postInitialization() - } - // printWelcome() - - loadFiles(settings) - - try loop() - catch AbstractOrMissingHandler() - finally closeInterpreter() - - true - } - - def createSparkContext(): SparkContext = { - val execUri = System.getenv("SPARK_EXECUTOR_URI") - val jars = SparkILoop.getAddedJars - val conf = new SparkConf() - .setMaster(getMaster()) - .setAppName("Spark shell") - .setJars(jars) - .set("spark.repl.class.uri", intp.classServer.uri) - if (execUri != null) { - conf.set("spark.executor.uri", execUri) - } - sparkContext = new SparkContext(conf) - logInfo("Created spark context..") - sparkContext - } - - private def getMaster(): String = { - val master = this.master match { - case Some(m) => m - case None => - val envMaster = sys.env.get("MASTER") - val propMaster = sys.props.get("spark.master") - propMaster.orElse(envMaster).getOrElse("local[*]") - } - master - } - - /** process command-line arguments and do as they request */ - def process(args: Array[String]): Boolean = { - val command = new SparkCommandLine(args.toList, msg => echo(msg)) - def neededHelp(): String = - (if (command.settings.help.value) command.usageMsg + "\n" else "") + - (if (command.settings.Xhelp.value) command.xusageMsg + "\n" else "") - - // if they asked for no help and command is valid, we call the real main - neededHelp() match { - case "" => command.ok && process(command.settings) - case help => echoNoNL(help) ; true - } - } - - @deprecated("Use `process` instead", "2.9.0") - def main(settings: Settings): Unit = process(settings) -} - -object SparkILoop { - implicit def loopToInterpreter(repl: SparkILoop): SparkIMain = repl.intp - private def echo(msg: String) = Console println msg - - def getAddedJars: Array[String] = { - val envJars = sys.env.get("ADD_JARS") - val propJars = sys.props.get("spark.jars").flatMap { p => - if (p == "") None else Some(p) - } - val jars = propJars.orElse(envJars).getOrElse("") - Utils.resolveURIs(jars).split(",").filter(_.nonEmpty) - } - - // Designed primarily for use by test code: take a String with a - // bunch of code, and prints out a transcript of what it would look - // like if you'd just typed it into the repl. - def runForTranscript(code: String, settings: Settings): String = { - import java.io.{ BufferedReader, StringReader, OutputStreamWriter } - - stringFromStream { ostream => - Console.withOut(ostream) { - val output = new JPrintWriter(new OutputStreamWriter(ostream), true) { - override def write(str: String) = { - // completely skip continuation lines - if (str forall (ch => ch.isWhitespace || ch == '|')) () - // print a newline on empty scala prompts - else if ((str contains '\n') && (str.trim == "scala> ")) super.write("\n") - else super.write(str) - } - } - val input = new BufferedReader(new StringReader(code)) { - override def readLine(): String = { - val s = super.readLine() - // helping out by printing the line being interpreted. - if (s != null) - output.println(s) - s - } - } - val repl = new SparkILoop(input, output) - - if (settings.classpath.isDefault) - settings.classpath.value = sys.props("java.class.path") - - getAddedJars.foreach(settings.classpath.append(_)) - - repl process settings - } - } - } - - /** Creates an interpreter loop with default settings and feeds - * the given code to it as input. - */ - def run(code: String, sets: Settings = new Settings): String = { - import java.io.{ BufferedReader, StringReader, OutputStreamWriter } - - stringFromStream { ostream => - Console.withOut(ostream) { - val input = new BufferedReader(new StringReader(code)) - val output = new JPrintWriter(new OutputStreamWriter(ostream), true) - val repl = new ILoop(input, output) - - if (sets.classpath.isDefault) - sets.classpath.value = sys.props("java.class.path") - - repl process sets - } - } - } - def run(lines: List[String]): String = run(lines map (_ + "\n") mkString) -} http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala ---------------------------------------------------------------------- diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala deleted file mode 100644 index 7667a9c..0000000 --- a/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala +++ /dev/null @@ -1,147 +0,0 @@ -// scalastyle:off - -/* NSC -- new Scala compiler - * Copyright 2005-2013 LAMP/EPFL - * @author Paul Phillips - */ - -package org.apache.spark.repl - -import scala.tools.nsc._ -import scala.tools.nsc.interpreter._ - -import scala.reflect.internal.util.Position -import scala.util.control.Exception.ignoring -import scala.tools.nsc.util.stackTraceString - -import org.apache.spark.SPARK_VERSION - -/** - * Machinery for the asynchronous initialization of the repl. - */ -trait SparkILoopInit { - self: SparkILoop => - - /** Print a welcome message */ - def printWelcome() { - echo("""Welcome to - ____ __ - / __/__ ___ _____/ /__ - _\ \/ _ \/ _ `/ __/ '_/ - /___/ .__/\_,_/_/ /_/\_\ version %s - /_/ -""".format(SPARK_VERSION)) - import Properties._ - val welcomeMsg = "Using Scala %s (%s, Java %s)".format( - versionString, javaVmName, javaVersion) - echo(welcomeMsg) - echo("Type in expressions to have them evaluated.") - echo("Type :help for more information.") - } - - protected def asyncMessage(msg: String) { - if (isReplInfo || isReplPower) - echoAndRefresh(msg) - } - - private val initLock = new java.util.concurrent.locks.ReentrantLock() - private val initCompilerCondition = initLock.newCondition() // signal the compiler is initialized - private val initLoopCondition = initLock.newCondition() // signal the whole repl is initialized - private val initStart = System.nanoTime - - private def withLock[T](body: => T): T = { - initLock.lock() - try body - finally initLock.unlock() - } - // a condition used to ensure serial access to the compiler. - @volatile private var initIsComplete = false - @volatile private var initError: String = null - private def elapsed() = "%.3f".format((System.nanoTime - initStart).toDouble / 1000000000L) - - // the method to be called when the interpreter is initialized. - // Very important this method does nothing synchronous (i.e. do - // not try to use the interpreter) because until it returns, the - // repl's lazy val `global` is still locked. - protected def initializedCallback() = withLock(initCompilerCondition.signal()) - - // Spins off a thread which awaits a single message once the interpreter - // has been initialized. - protected def createAsyncListener() = { - io.spawn { - withLock(initCompilerCondition.await()) - asyncMessage("[info] compiler init time: " + elapsed() + " s.") - postInitialization() - } - } - - // called from main repl loop - protected def awaitInitialized(): Boolean = { - if (!initIsComplete) - withLock { while (!initIsComplete) initLoopCondition.await() } - if (initError != null) { - println(""" - |Failed to initialize the REPL due to an unexpected error. - |This is a bug, please, report it along with the error diagnostics printed below. - |%s.""".stripMargin.format(initError) - ) - false - } else true - } - // private def warningsThunks = List( - // () => intp.bind("lastWarnings", "" + typeTag[List[(Position, String)]], intp.lastWarnings _), - // ) - - protected def postInitThunks = List[Option[() => Unit]]( - Some(intp.setContextClassLoader _), - if (isReplPower) Some(() => enablePowerMode(true)) else None - ).flatten - // ++ ( - // warningsThunks - // ) - // called once after init condition is signalled - protected def postInitialization() { - try { - postInitThunks foreach (f => addThunk(f())) - runThunks() - } catch { - case ex: Throwable => - initError = stackTraceString(ex) - throw ex - } finally { - initIsComplete = true - - if (isAsync) { - asyncMessage("[info] total init time: " + elapsed() + " s.") - withLock(initLoopCondition.signal()) - } - } - } - - def initializeSpark() { - intp.beQuietDuring { - command(""" - @transient val sc = org.apache.spark.repl.Main.interp.createSparkContext(); - """) - command("import org.apache.spark.SparkContext._") - } - echo("Spark context available as sc.") - } - - // code to be executed only after the interpreter is initialized - // and the lazy val `global` can be accessed without risk of deadlock. - private var pendingThunks: List[() => Unit] = Nil - protected def addThunk(body: => Unit) = synchronized { - pendingThunks :+= (() => body) - } - protected def runThunks(): Unit = synchronized { - if (pendingThunks.nonEmpty) - logDebug("Clearing " + pendingThunks.size + " thunks.") - - while (pendingThunks.nonEmpty) { - val thunk = pendingThunks.head - pendingThunks = pendingThunks.tail - thunk() - } - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org