http://git-wip-us.apache.org/repos/asf/spark/blob/12f56334/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 4e0cd6c..7bbde31 100644 --- a/pom.xml +++ b/pom.xml @@ -97,30 +97,26 @@ <module>sql/catalyst</module> <module>sql/core</module> <module>sql/hive</module> - <module>repl</module> <module>assembly</module> <module>external/twitter</module> - <module>external/kafka</module> <module>external/flume</module> <module>external/flume-sink</module> - <module>external/zeromq</module> <module>external/mqtt</module> + <module>external/zeromq</module> <module>examples</module> + <module>repl</module> </modules> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> - + <akka.group>org.spark-project.akka</akka.group> + <akka.version>2.3.4-spark</akka.version> <java.version>1.6</java.version> <sbt.project.name>spark</sbt.project.name> - <scala.version>2.10.4</scala.version> - <scala.binary.version>2.10</scala.binary.version> <scala.macros.version>2.0.1</scala.macros.version> <mesos.version>0.18.1</mesos.version> <mesos.classifier>shaded-protobuf</mesos.classifier> - <akka.group>org.spark-project.akka</akka.group> - <akka.version>2.3.4-spark</akka.version> <slf4j.version>1.7.5</slf4j.version> <log4j.version>1.2.17</log4j.version> <hadoop.version>1.0.4</hadoop.version> @@ -137,7 +133,7 @@ <parquet.version>1.6.0rc3</parquet.version> <jblas.version>1.2.3</jblas.version> <jetty.version>8.1.14.v20131031</jetty.version> - <chill.version>0.3.6</chill.version> + <chill.version>0.5.0</chill.version> <codahale.metrics.version>3.0.0</codahale.metrics.version> <avro.version>1.7.6</avro.version> <avro.mapred.classifier></avro.mapred.classifier> @@ -146,9 +142,13 @@ <aws.kinesis.client.version>1.1.0</aws.kinesis.client.version> <commons.httpclient.version>4.2.6</commons.httpclient.version> <commons.math3.version>3.1.1</commons.math3.version> - + <test_classpath_file>${project.build.directory}/spark-test-classpath.txt</test_classpath_file> <PermGen>64m</PermGen> <MaxPermGen>512m</MaxPermGen> + <scala.version>2.10.4</scala.version> + <scala.binary.version>2.10</scala.binary.version> + <jline.version>${scala.version}</jline.version> + <jline.groupid>org.scala-lang</jline.groupid> </properties> <repositories> @@ -267,20 +267,67 @@ </snapshots> </pluginRepository> </pluginRepositories> + + <dependencies> <!-- This is a dummy dependency that is used along with the shading plug-in to create effective poms on publishing (see SPARK-3812). --> - <dependencies> <dependency> <groupId>org.spark-project.spark</groupId> <artifactId>unused</artifactId> <version>1.0.0</version> </dependency> + <!-- + This depndency has been added to provided scope as it is needed for excuting build + specific groovy scripts using gmaven+ and not required for downstream project building + with spark. + --> + <dependency> + <groupId>org.codehaus.groovy</groupId> + <artifactId>groovy-all</artifactId> + <version>2.3.7</version> + <scope>provided</scope> + </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> + <groupId>${jline.groupid}</groupId> + <artifactId>jline</artifactId> + <version>${jline.version}</version> + </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>chill_${scala.binary.version}</artifactId> + <version>${chill.version}</version> + <exclusions> + <exclusion> + <groupId>org.ow2.asm</groupId> + <artifactId>asm</artifactId> + </exclusion> + <exclusion> + <groupId>org.ow2.asm</groupId> + <artifactId>asm-commons</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>chill-java</artifactId> + <version>${chill.version}</version> + <exclusions> + <exclusion> + <groupId>org.ow2.asm</groupId> + <artifactId>asm</artifactId> + </exclusion> + <exclusion> + <groupId>org.ow2.asm</groupId> + <artifactId>asm-commons</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-util</artifactId> <version>${jetty.version}</version> @@ -396,36 +443,6 @@ <version>${protobuf.version}</version> </dependency> <dependency> - <groupId>com.twitter</groupId> - <artifactId>chill_${scala.binary.version}</artifactId> - <version>${chill.version}</version> - <exclusions> - <exclusion> - <groupId>org.ow2.asm</groupId> - <artifactId>asm</artifactId> - </exclusion> - <exclusion> - <groupId>org.ow2.asm</groupId> - <artifactId>asm-commons</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>com.twitter</groupId> - <artifactId>chill-java</artifactId> - <version>${chill.version}</version> - <exclusions> - <exclusion> - <groupId>org.ow2.asm</groupId> - <artifactId>asm</artifactId> - </exclusion> - <exclusion> - <groupId>org.ow2.asm</groupId> - <artifactId>asm-commons</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> <groupId>${akka.group}</groupId> <artifactId>akka-actor_${scala.binary.version}</artifactId> <version>${akka.version}</version> @@ -514,11 +531,6 @@ </dependency> <dependency> <groupId>org.scala-lang</groupId> - <artifactId>jline</artifactId> - <version>${scala.version}</version> - </dependency> - <dependency> - <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> @@ -965,6 +977,7 @@ <spark.test.home>${session.executionRootDirectory}</spark.test.home> <spark.testing>1</spark.testing> <spark.ui.enabled>false</spark.ui.enabled> + <spark.executor.extraClassPath>${test_classpath}</spark.executor.extraClassPath> </systemProperties> </configuration> <executions> @@ -1026,6 +1039,47 @@ </pluginManagement> <plugins> + <!-- This plugin dumps the test classpath into a file --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <version>2.9</version> + <executions> + <execution> + <phase>test-compile</phase> + <goals> + <goal>build-classpath</goal> + </goals> + <configuration> + <includeScope>test</includeScope> + <outputFile>${test_classpath_file}</outputFile> + </configuration> + </execution> + </executions> + </plugin> + + <!-- This plugin reads a file into maven property. And it lets us write groovy !! --> + <plugin> + <groupId>org.codehaus.gmavenplus</groupId> + <artifactId>gmavenplus-plugin</artifactId> + <version>1.2</version> + <executions> + <execution> + <phase>process-test-classes</phase> + <goals> + <goal>execute</goal> + </goals> + <configuration> + <scripts> + <script><![CDATA[ + def file = new File(project.properties.test_classpath_file) + project.properties.test_classpath = file.getText().split().join(":") + ]]></script> + </scripts> + </configuration> + </execution> + </executions> + </plugin> <!-- The shade plug-in is used here to create effective pom's (see SPARK-3812). --> <plugin> <groupId>org.apache.maven.plugins</groupId> @@ -1335,7 +1389,7 @@ </dependencies> </profile> <profile> - <id>hive</id> + <id>hive-thriftserver</id> <activation> <activeByDefault>false</activeByDefault> </activation> @@ -1365,5 +1419,35 @@ <derby.version>10.10.1.1</derby.version> </properties> </profile> + + <profile> + <id>scala-2.10</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <properties> + <scala.version>2.10.4</scala.version> + <scala.binary.version>2.10</scala.binary.version> + <jline.version>${scala.version}</jline.version> + <jline.groupid>org.scala-lang</jline.groupid> + </properties> + <modules> + <module>external/kafka</module> + </modules> + </profile> + + <profile> + <id>scala-2.11</id> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <properties> + <scala.version>2.11.2</scala.version> + <scala.binary.version>2.11</scala.binary.version> + <jline.version>2.12</jline.version> + <jline.groupid>jline</jline.groupid> + </properties> + </profile> + </profiles> </project>
http://git-wip-us.apache.org/repos/asf/spark/blob/12f56334/project/SparkBuild.scala ---------------------------------------------------------------------- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 351e57a..492607d 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -31,8 +31,8 @@ object BuildCommons { private val buildLocation = file(".").getAbsoluteFile.getParentFile val allProjects@Seq(bagel, catalyst, core, graphx, hive, hiveThriftServer, mllib, repl, - sql, networkCommon, networkShuffle, streaming, streamingFlumeSink, streamingFlume, streamingKafka, - streamingMqtt, streamingTwitter, streamingZeromq) = + sql, networkCommon, networkShuffle, streaming, streamingFlumeSink, streamingFlume, streamingKafka, + streamingMqtt, streamingTwitter, streamingZeromq) = Seq("bagel", "catalyst", "core", "graphx", "hive", "hive-thriftserver", "mllib", "repl", "sql", "network-common", "network-shuffle", "streaming", "streaming-flume-sink", "streaming-flume", "streaming-kafka", "streaming-mqtt", "streaming-twitter", @@ -68,8 +68,8 @@ object SparkBuild extends PomBuild { profiles ++= Seq("spark-ganglia-lgpl") } if (Properties.envOrNone("SPARK_HIVE").isDefined) { - println("NOTE: SPARK_HIVE is deprecated, please use -Phive flag.") - profiles ++= Seq("hive") + println("NOTE: SPARK_HIVE is deprecated, please use -Phive and -Phive-thriftserver flags.") + profiles ++= Seq("hive", "hive-thriftserver") } Properties.envOrNone("SPARK_HADOOP_VERSION") match { case Some(v) => @@ -91,13 +91,21 @@ object SparkBuild extends PomBuild { profiles } - override val profiles = Properties.envOrNone("SBT_MAVEN_PROFILES") match { + override val profiles = { + val profiles = Properties.envOrNone("SBT_MAVEN_PROFILES") match { case None => backwardCompatibility case Some(v) => if (backwardCompatibility.nonEmpty) println("Note: We ignore environment variables, when use of profile is detected in " + "conjunction with environment variable.") v.split("(\\s+|,)").filterNot(_.isEmpty).map(_.trim.replaceAll("-P", "")).toSeq + } + if (profiles.exists(_.contains("scala-"))) { + profiles + } else { + println("Enabled default scala profile") + profiles ++ Seq("scala-2.10") + } } Properties.envOrNone("SBT_MAVEN_PROPERTIES") match { @@ -136,7 +144,8 @@ object SparkBuild extends PomBuild { // Note ordering of these settings matter. /* Enable shared settings on all projects */ - (allProjects ++ optionallyEnabledProjects ++ assemblyProjects).foreach(enable(sharedSettings)) + (allProjects ++ optionallyEnabledProjects ++ assemblyProjects ++ Seq(spark, tools)) + .foreach(enable(sharedSettings ++ ExludedDependencies.settings)) /* Enable tests settings for all projects except examples, assembly and tools */ (allProjects ++ optionallyEnabledProjects).foreach(enable(TestSettings.settings)) @@ -179,6 +188,16 @@ object Flume { } /** + This excludes library dependencies in sbt, which are specified in maven but are + not needed by sbt build. + */ +object ExludedDependencies { + lazy val settings = Seq( + libraryDependencies ~= { libs => libs.filterNot(_.name == "groovy-all") } + ) +} + +/** * Following project only exists to pull previous artifacts of Spark for generating * Mima ignores. For more information see: SPARK 2071 */ @@ -353,8 +372,11 @@ object TestSettings { .map { case (k,v) => s"-D$k=$v" }.toSeq, javaOptions in Test ++= "-Xmx3g -XX:PermSize=128M -XX:MaxNewSize=256m -XX:MaxPermSize=1g" .split(" ").toSeq, + // This places test scope jars on the classpath of executors during tests. + javaOptions in Test += + "-Dspark.executor.extraClassPath=" + (fullClasspath in Test).value.files. + map(_.getAbsolutePath).mkString(":").stripSuffix(":"), javaOptions += "-Xmx3g", - // Show full stack trace and duration in test cases. testOptions in Test += Tests.Argument("-oDF"), testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"), http://git-wip-us.apache.org/repos/asf/spark/blob/12f56334/project/project/SparkPluginBuild.scala ---------------------------------------------------------------------- diff --git a/project/project/SparkPluginBuild.scala b/project/project/SparkPluginBuild.scala index 3ef2d54..8863f27 100644 --- a/project/project/SparkPluginBuild.scala +++ b/project/project/SparkPluginBuild.scala @@ -26,7 +26,7 @@ import sbt.Keys._ object SparkPluginDef extends Build { lazy val root = Project("plugins", file(".")) dependsOn(sparkStyle, sbtPomReader) lazy val sparkStyle = Project("spark-style", file("spark-style"), settings = styleSettings) - lazy val sbtPomReader = uri("https://github.com/ScrapCodes/sbt-pom-reader.git") + lazy val sbtPomReader = uri("https://github.com/ScrapCodes/sbt-pom-reader.git#ignore_artifact_id") // There is actually no need to publish this artifact. def styleSettings = Defaults.defaultSettings ++ Seq ( http://git-wip-us.apache.org/repos/asf/spark/blob/12f56334/repl/pom.xml ---------------------------------------------------------------------- diff --git a/repl/pom.xml b/repl/pom.xml index af528c8..bd688c8 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -39,6 +39,11 @@ <dependencies> <dependency> + <groupId>${jline.groupid}</groupId> + <artifactId>jline</artifactId> + <version>${jline.version}</version> + </dependency> + <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.binary.version}</artifactId> <version>${project.version}</version> @@ -76,11 +81,6 @@ <version>${scala.version}</version> </dependency> <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>jline</artifactId> - <version>${scala.version}</version> - </dependency> - <dependency> <groupId>org.slf4j</groupId> <artifactId>jul-to-slf4j</artifactId> </dependency> @@ -124,4 +124,84 @@ </plugin> </plugins> </build> + <profiles> + <profile> + <id>scala-2.10</id> + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <executions> + <execution> + <id>add-scala-sources</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/main/scala</source> + <source>scala-2.10/src/main/scala</source> + </sources> + </configuration> + </execution> + <execution> + <id>add-scala-test-sources</id> + <phase>generate-test-sources</phase> + <goals> + <goal>add-test-source</goal> + </goals> + <configuration> + <sources> + <source>src/test/scala</source> + <source>scala-2.10/src/test/scala</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + <profile> + <id>scala-2.11</id> + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <executions> + <execution> + <id>add-scala-sources</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/main/scala</source> + <source>scala-2.11/src/main/scala</source> + </sources> + </configuration> + </execution> + <execution> + <id>add-scala-test-sources</id> + <phase>generate-test-sources</phase> + <goals> + <goal>add-test-source</goal> + </goals> + <configuration> + <sources> + <source>src/test/scala</source> + <source>scala-2.11/src/test/scala</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> </project> http://git-wip-us.apache.org/repos/asf/spark/blob/12f56334/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala ---------------------------------------------------------------------- diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala new file mode 100644 index 0000000..14b448d --- /dev/null +++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.repl + +import scala.collection.mutable.Set + +object Main { + private var _interp: SparkILoop = _ + + def interp = _interp + + def interp_=(i: SparkILoop) { _interp = i } + + def main(args: Array[String]) { + _interp = new SparkILoop + _interp.process(args) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/12f56334/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala ---------------------------------------------------------------------- diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala new file mode 100644 index 0000000..0581694 --- /dev/null +++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.repl + +import scala.tools.nsc.{Settings, CompilerCommand} +import scala.Predef._ + +/** + * Command class enabling Spark-specific command line options (provided by + * <i>org.apache.spark.repl.SparkRunnerSettings</i>). + */ +class SparkCommandLine(args: List[String], override val settings: Settings) + extends CompilerCommand(args, settings) { + + def this(args: List[String], error: String => Unit) { + this(args, new SparkRunnerSettings(error)) + } + + def this(args: List[String]) { + this(args, str => Console.println("Error: " + str)) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/12f56334/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala ---------------------------------------------------------------------- diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala new file mode 100644 index 0000000..f8432c8 --- /dev/null +++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala @@ -0,0 +1,114 @@ +// scalastyle:off + +/* NSC -- new Scala compiler + * Copyright 2005-2013 LAMP/EPFL + * @author Paul Phillips + */ + +package org.apache.spark.repl + +import scala.tools.nsc._ +import scala.tools.nsc.interpreter._ + +import scala.reflect.internal.util.BatchSourceFile +import scala.tools.nsc.ast.parser.Tokens.EOF + +import org.apache.spark.Logging + +trait SparkExprTyper extends Logging { + val repl: SparkIMain + + import repl._ + import global.{ reporter => _, Import => _, _ } + import definitions._ + import syntaxAnalyzer.{ UnitParser, UnitScanner, token2name } + import naming.freshInternalVarName + + object codeParser extends { val global: repl.global.type = repl.global } with CodeHandlers[Tree] { + def applyRule[T](code: String, rule: UnitParser => T): T = { + reporter.reset() + val scanner = newUnitParser(code) + val result = rule(scanner) + + if (!reporter.hasErrors) + scanner.accept(EOF) + + result + } + + def defns(code: String) = stmts(code) collect { case x: DefTree => x } + def expr(code: String) = applyRule(code, _.expr()) + def stmts(code: String) = applyRule(code, _.templateStats()) + def stmt(code: String) = stmts(code).last // guaranteed nonempty + } + + /** Parse a line into a sequence of trees. Returns None if the input is incomplete. */ + def parse(line: String): Option[List[Tree]] = debugging(s"""parse("$line")""") { + var isIncomplete = false + reporter.withIncompleteHandler((_, _) => isIncomplete = true) { + val trees = codeParser.stmts(line) + if (reporter.hasErrors) { + Some(Nil) + } else if (isIncomplete) { + None + } else { + Some(trees) + } + } + } + // def parsesAsExpr(line: String) = { + // import codeParser._ + // (opt expr line).isDefined + // } + + def symbolOfLine(code: String): Symbol = { + def asExpr(): Symbol = { + val name = freshInternalVarName() + // Typing it with a lazy val would give us the right type, but runs + // into compiler bugs with things like existentials, so we compile it + // behind a def and strip the NullaryMethodType which wraps the expr. + val line = "def " + name + " = {\n" + code + "\n}" + + interpretSynthetic(line) match { + case IR.Success => + val sym0 = symbolOfTerm(name) + // drop NullaryMethodType + val sym = sym0.cloneSymbol setInfo afterTyper(sym0.info.finalResultType) + if (sym.info.typeSymbol eq UnitClass) NoSymbol else sym + case _ => NoSymbol + } + } + def asDefn(): Symbol = { + val old = repl.definedSymbolList.toSet + + interpretSynthetic(code) match { + case IR.Success => + repl.definedSymbolList filterNot old match { + case Nil => NoSymbol + case sym :: Nil => sym + case syms => NoSymbol.newOverloaded(NoPrefix, syms) + } + case _ => NoSymbol + } + } + beQuietDuring(asExpr()) orElse beQuietDuring(asDefn()) + } + + private var typeOfExpressionDepth = 0 + def typeOfExpression(expr: String, silent: Boolean = true): Type = { + if (typeOfExpressionDepth > 2) { + logDebug("Terminating typeOfExpression recursion for expression: " + expr) + return NoType + } + typeOfExpressionDepth += 1 + // Don't presently have a good way to suppress undesirable success output + // while letting errors through, so it is first trying it silently: if there + // is an error, and errors are desired, then it re-evaluates non-silently + // to induce the error message. + try beSilentDuring(symbolOfLine(expr).tpe) match { + case NoType if !silent => symbolOfLine(expr).tpe // generate error + case tpe => tpe + } + finally typeOfExpressionDepth -= 1 + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/12f56334/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkHelper.scala ---------------------------------------------------------------------- diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkHelper.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkHelper.scala new file mode 100644 index 0000000..5340951 --- /dev/null +++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkHelper.scala @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package scala.tools.nsc + +object SparkHelper { + def explicitParentLoader(settings: Settings) = settings.explicitParentLoader +} http://git-wip-us.apache.org/repos/asf/spark/blob/12f56334/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala ---------------------------------------------------------------------- diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala new file mode 100644 index 0000000..e56b74e --- /dev/null +++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -0,0 +1,1091 @@ +// scalastyle:off + +/* NSC -- new Scala compiler + * Copyright 2005-2013 LAMP/EPFL + * @author Alexander Spoon + */ + +package org.apache.spark.repl + + +import java.net.URL + +import scala.reflect.io.AbstractFile +import scala.tools.nsc._ +import scala.tools.nsc.backend.JavaPlatform +import scala.tools.nsc.interpreter._ + +import scala.tools.nsc.interpreter.{Results => IR} +import Predef.{println => _, _} +import java.io.{BufferedReader, FileReader} +import java.net.URI +import java.util.concurrent.locks.ReentrantLock +import scala.sys.process.Process +import scala.tools.nsc.interpreter.session._ +import scala.util.Properties.{jdkHome, javaVersion} +import scala.tools.util.{Javap} +import scala.annotation.tailrec +import scala.collection.mutable.ListBuffer +import scala.concurrent.ops +import scala.tools.nsc.util._ +import scala.tools.nsc.interpreter._ +import scala.tools.nsc.io.{File, Directory} +import scala.reflect.NameTransformer._ +import scala.tools.nsc.util.ScalaClassLoader._ +import scala.tools.util._ +import scala.language.{implicitConversions, existentials, postfixOps} +import scala.reflect.{ClassTag, classTag} +import scala.tools.reflect.StdRuntimeTags._ + +import java.lang.{Class => jClass} +import scala.reflect.api.{Mirror, TypeCreator, Universe => ApiUniverse} + +import org.apache.spark.Logging +import org.apache.spark.SparkConf +import org.apache.spark.SparkContext +import org.apache.spark.util.Utils + +/** The Scala interactive shell. It provides a read-eval-print loop + * around the Interpreter class. + * After instantiation, clients should call the main() method. + * + * If no in0 is specified, then input will come from the console, and + * the class will attempt to provide input editing feature such as + * input history. + * + * @author Moez A. Abdel-Gawad + * @author Lex Spoon + * @version 1.2 + */ +class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter, + val master: Option[String]) + extends AnyRef + with LoopCommands + with SparkILoopInit + with Logging +{ + def this(in0: BufferedReader, out: JPrintWriter, master: String) = this(Some(in0), out, Some(master)) + def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out, None) + def this() = this(None, new JPrintWriter(Console.out, true), None) + + var in: InteractiveReader = _ // the input stream from which commands come + var settings: Settings = _ + var intp: SparkIMain = _ + + @deprecated("Use `intp` instead.", "2.9.0") def interpreter = intp + @deprecated("Use `intp` instead.", "2.9.0") def interpreter_= (i: SparkIMain): Unit = intp = i + + /** Having inherited the difficult "var-ness" of the repl instance, + * I'm trying to work around it by moving operations into a class from + * which it will appear a stable prefix. + */ + private def onIntp[T](f: SparkIMain => T): T = f(intp) + + class IMainOps[T <: SparkIMain](val intp: T) { + import intp._ + import global._ + + def printAfterTyper(msg: => String) = + intp.reporter printMessage afterTyper(msg) + + /** Strip NullaryMethodType artifacts. */ + private def replInfo(sym: Symbol) = { + sym.info match { + case NullaryMethodType(restpe) if sym.isAccessor => restpe + case info => info + } + } + def echoTypeStructure(sym: Symbol) = + printAfterTyper("" + deconstruct.show(replInfo(sym))) + + def echoTypeSignature(sym: Symbol, verbose: Boolean) = { + if (verbose) SparkILoop.this.echo("// Type signature") + printAfterTyper("" + replInfo(sym)) + + if (verbose) { + SparkILoop.this.echo("\n// Internal Type structure") + echoTypeStructure(sym) + } + } + } + implicit def stabilizeIMain(intp: SparkIMain) = new IMainOps[intp.type](intp) + + /** TODO - + * -n normalize + * -l label with case class parameter names + * -c complete - leave nothing out + */ + private def typeCommandInternal(expr: String, verbose: Boolean): Result = { + onIntp { intp => + val sym = intp.symbolOfLine(expr) + if (sym.exists) intp.echoTypeSignature(sym, verbose) + else "" + } + } + + var sparkContext: SparkContext = _ + + override def echoCommandMessage(msg: String) { + intp.reporter printMessage msg + } + + // def isAsync = !settings.Yreplsync.value + def isAsync = false + // lazy val power = new Power(intp, new StdReplVals(this))(tagOfStdReplVals, classTag[StdReplVals]) + def history = in.history + + /** The context class loader at the time this object was created */ + protected val originalClassLoader = Utils.getContextOrSparkClassLoader + + // classpath entries added via :cp + var addedClasspath: String = "" + + /** A reverse list of commands to replay if the user requests a :replay */ + var replayCommandStack: List[String] = Nil + + /** A list of commands to replay if the user requests a :replay */ + def replayCommands = replayCommandStack.reverse + + /** Record a command for replay should the user request a :replay */ + def addReplay(cmd: String) = replayCommandStack ::= cmd + + def savingReplayStack[T](body: => T): T = { + val saved = replayCommandStack + try body + finally replayCommandStack = saved + } + def savingReader[T](body: => T): T = { + val saved = in + try body + finally in = saved + } + + + def sparkCleanUp(){ + echo("Stopping spark context.") + intp.beQuietDuring { + command("sc.stop()") + } + } + /** Close the interpreter and set the var to null. */ + def closeInterpreter() { + if (intp ne null) { + sparkCleanUp() + intp.close() + intp = null + } + } + + class SparkILoopInterpreter extends SparkIMain(settings, out) { + outer => + + override lazy val formatting = new Formatting { + def prompt = SparkILoop.this.prompt + } + override protected def parentClassLoader = SparkHelper.explicitParentLoader(settings).getOrElse(classOf[SparkILoop].getClassLoader) + } + + /** Create a new interpreter. */ + def createInterpreter() { + require(settings != null) + + if (addedClasspath != "") settings.classpath.append(addedClasspath) + val addedJars = + if (Utils.isWindows) { + // Strip any URI scheme prefix so we can add the correct path to the classpath + // e.g. file:/C:/my/path.jar -> C:/my/path.jar + SparkILoop.getAddedJars.map { jar => new URI(jar).getPath.stripPrefix("/") } + } else { + SparkILoop.getAddedJars + } + // work around for Scala bug + val totalClassPath = addedJars.foldLeft( + settings.classpath.value)((l, r) => ClassPath.join(l, r)) + this.settings.classpath.value = totalClassPath + + intp = new SparkILoopInterpreter + } + + /** print a friendly help message */ + def helpCommand(line: String): Result = { + if (line == "") helpSummary() + else uniqueCommand(line) match { + case Some(lc) => echo("\n" + lc.longHelp) + case _ => ambiguousError(line) + } + } + private def helpSummary() = { + val usageWidth = commands map (_.usageMsg.length) max + val formatStr = "%-" + usageWidth + "s %s %s" + + echo("All commands can be abbreviated, e.g. :he instead of :help.") + echo("Those marked with a * have more detailed help, e.g. :help imports.\n") + + commands foreach { cmd => + val star = if (cmd.hasLongHelp) "*" else " " + echo(formatStr.format(cmd.usageMsg, star, cmd.help)) + } + } + private def ambiguousError(cmd: String): Result = { + matchingCommands(cmd) match { + case Nil => echo(cmd + ": no such command. Type :help for help.") + case xs => echo(cmd + " is ambiguous: did you mean " + xs.map(":" + _.name).mkString(" or ") + "?") + } + Result(true, None) + } + private def matchingCommands(cmd: String) = commands filter (_.name startsWith cmd) + private def uniqueCommand(cmd: String): Option[LoopCommand] = { + // this lets us add commands willy-nilly and only requires enough command to disambiguate + matchingCommands(cmd) match { + case List(x) => Some(x) + // exact match OK even if otherwise appears ambiguous + case xs => xs find (_.name == cmd) + } + } + private var fallbackMode = false + + private def toggleFallbackMode() { + val old = fallbackMode + fallbackMode = !old + System.setProperty("spark.repl.fallback", fallbackMode.toString) + echo(s""" + |Switched ${if (old) "off" else "on"} fallback mode without restarting. + | If you have defined classes in the repl, it would + |be good to redefine them incase you plan to use them. If you still run + |into issues it would be good to restart the repl and turn on `:fallback` + |mode as first command. + """.stripMargin) + } + + /** Show the history */ + lazy val historyCommand = new LoopCommand("history", "show the history (optional num is commands to show)") { + override def usage = "[num]" + def defaultLines = 20 + + def apply(line: String): Result = { + if (history eq NoHistory) + return "No history available." + + val xs = words(line) + val current = history.index + val count = try xs.head.toInt catch { case _: Exception => defaultLines } + val lines = history.asStrings takeRight count + val offset = current - lines.size + 1 + + for ((line, index) <- lines.zipWithIndex) + echo("%3d %s".format(index + offset, line)) + } + } + + // When you know you are most likely breaking into the middle + // of a line being typed. This softens the blow. + protected def echoAndRefresh(msg: String) = { + echo("\n" + msg) + in.redrawLine() + } + protected def echo(msg: String) = { + out println msg + out.flush() + } + protected def echoNoNL(msg: String) = { + out print msg + out.flush() + } + + /** Search the history */ + def searchHistory(_cmdline: String) { + val cmdline = _cmdline.toLowerCase + val offset = history.index - history.size + 1 + + for ((line, index) <- history.asStrings.zipWithIndex ; if line.toLowerCase contains cmdline) + echo("%d %s".format(index + offset, line)) + } + + private var currentPrompt = Properties.shellPromptString + def setPrompt(prompt: String) = currentPrompt = prompt + /** Prompt to print when awaiting input */ + def prompt = currentPrompt + + import LoopCommand.{ cmd, nullary } + + /** Standard commands */ + lazy val standardCommands = List( + cmd("cp", "<path>", "add a jar or directory to the classpath", addClasspath), + cmd("help", "[command]", "print this summary or command-specific help", helpCommand), + historyCommand, + cmd("h?", "<string>", "search the history", searchHistory), + cmd("imports", "[name name ...]", "show import history, identifying sources of names", importsCommand), + cmd("implicits", "[-v]", "show the implicits in scope", implicitsCommand), + cmd("javap", "<path|class>", "disassemble a file or class name", javapCommand), + cmd("load", "<path>", "load and interpret a Scala file", loadCommand), + nullary("paste", "enter paste mode: all input up to ctrl-D compiled together", pasteCommand), +// nullary("power", "enable power user mode", powerCmd), + nullary("quit", "exit the repl", () => Result(false, None)), + nullary("replay", "reset execution and replay all previous commands", replay), + nullary("reset", "reset the repl to its initial state, forgetting all session entries", resetCommand), + shCommand, + nullary("silent", "disable/enable automatic printing of results", verbosity), + nullary("fallback", """ + |disable/enable advanced repl changes, these fix some issues but may introduce others. + |This mode will be removed once these fixes stablize""".stripMargin, toggleFallbackMode), + cmd("type", "[-v] <expr>", "display the type of an expression without evaluating it", typeCommand), + nullary("warnings", "show the suppressed warnings from the most recent line which had any", warningsCommand) + ) + + /** Power user commands */ + lazy val powerCommands: List[LoopCommand] = List( + // cmd("phase", "<phase>", "set the implicit phase for power commands", phaseCommand) + ) + + // private def dumpCommand(): Result = { + // echo("" + power) + // history.asStrings takeRight 30 foreach echo + // in.redrawLine() + // } + // private def valsCommand(): Result = power.valsDescription + + private val typeTransforms = List( + "scala.collection.immutable." -> "immutable.", + "scala.collection.mutable." -> "mutable.", + "scala.collection.generic." -> "generic.", + "java.lang." -> "jl.", + "scala.runtime." -> "runtime." + ) + + private def importsCommand(line: String): Result = { + val tokens = words(line) + val handlers = intp.languageWildcardHandlers ++ intp.importHandlers + val isVerbose = tokens contains "-v" + + handlers.filterNot(_.importedSymbols.isEmpty).zipWithIndex foreach { + case (handler, idx) => + val (types, terms) = handler.importedSymbols partition (_.name.isTypeName) + val imps = handler.implicitSymbols + val found = tokens filter (handler importsSymbolNamed _) + val typeMsg = if (types.isEmpty) "" else types.size + " types" + val termMsg = if (terms.isEmpty) "" else terms.size + " terms" + val implicitMsg = if (imps.isEmpty) "" else imps.size + " are implicit" + val foundMsg = if (found.isEmpty) "" else found.mkString(" // imports: ", ", ", "") + val statsMsg = List(typeMsg, termMsg, implicitMsg) filterNot (_ == "") mkString ("(", ", ", ")") + + intp.reporter.printMessage("%2d) %-30s %s%s".format( + idx + 1, + handler.importString, + statsMsg, + foundMsg + )) + } + } + + private def implicitsCommand(line: String): Result = onIntp { intp => + import intp._ + import global._ + + def p(x: Any) = intp.reporter.printMessage("" + x) + + // If an argument is given, only show a source with that + // in its name somewhere. + val args = line split "\\s+" + val filtered = intp.implicitSymbolsBySource filter { + case (source, syms) => + (args contains "-v") || { + if (line == "") (source.fullName.toString != "scala.Predef") + else (args exists (source.name.toString contains _)) + } + } + + if (filtered.isEmpty) + return "No implicits have been imported other than those in Predef." + + filtered foreach { + case (source, syms) => + p("/* " + syms.size + " implicit members imported from " + source.fullName + " */") + + // This groups the members by where the symbol is defined + val byOwner = syms groupBy (_.owner) + val sortedOwners = byOwner.toList sortBy { case (owner, _) => afterTyper(source.info.baseClasses indexOf owner) } + + sortedOwners foreach { + case (owner, members) => + // Within each owner, we cluster results based on the final result type + // if there are more than a couple, and sort each cluster based on name. + // This is really just trying to make the 100 or so implicits imported + // by default into something readable. + val memberGroups: List[List[Symbol]] = { + val groups = members groupBy (_.tpe.finalResultType) toList + val (big, small) = groups partition (_._2.size > 3) + val xss = ( + (big sortBy (_._1.toString) map (_._2)) :+ + (small flatMap (_._2)) + ) + + xss map (xs => xs sortBy (_.name.toString)) + } + + val ownerMessage = if (owner == source) " defined in " else " inherited from " + p(" /* " + members.size + ownerMessage + owner.fullName + " */") + + memberGroups foreach { group => + group foreach (s => p(" " + intp.symbolDefString(s))) + p("") + } + } + p("") + } + } + + private def findToolsJar() = { + val jdkPath = Directory(jdkHome) + val jar = jdkPath / "lib" / "tools.jar" toFile; + + if (jar isFile) + Some(jar) + else if (jdkPath.isDirectory) + jdkPath.deepFiles find (_.name == "tools.jar") + else None + } + private def addToolsJarToLoader() = { + val cl = findToolsJar match { + case Some(tools) => ScalaClassLoader.fromURLs(Seq(tools.toURL), intp.classLoader) + case _ => intp.classLoader + } + if (Javap.isAvailable(cl)) { + logDebug(":javap available.") + cl + } + else { + logDebug(":javap unavailable: no tools.jar at " + jdkHome) + intp.classLoader + } + } + + protected def newJavap() = new JavapClass(addToolsJarToLoader(), new SparkIMain.ReplStrippingWriter(intp)) { + override def tryClass(path: String): Array[Byte] = { + val hd :: rest = path split '.' toList; + // If there are dots in the name, the first segment is the + // key to finding it. + if (rest.nonEmpty) { + intp optFlatName hd match { + case Some(flat) => + val clazz = flat :: rest mkString NAME_JOIN_STRING + val bytes = super.tryClass(clazz) + if (bytes.nonEmpty) bytes + else super.tryClass(clazz + MODULE_SUFFIX_STRING) + case _ => super.tryClass(path) + } + } + else { + // Look for Foo first, then Foo$, but if Foo$ is given explicitly, + // we have to drop the $ to find object Foo, then tack it back onto + // the end of the flattened name. + def className = intp flatName path + def moduleName = (intp flatName path.stripSuffix(MODULE_SUFFIX_STRING)) + MODULE_SUFFIX_STRING + + val bytes = super.tryClass(className) + if (bytes.nonEmpty) bytes + else super.tryClass(moduleName) + } + } + } + // private lazy val javap = substituteAndLog[Javap]("javap", NoJavap)(newJavap()) + private lazy val javap = + try newJavap() + catch { case _: Exception => null } + + // Still todo: modules. + private def typeCommand(line0: String): Result = { + line0.trim match { + case "" => ":type [-v] <expression>" + case s if s startsWith "-v " => typeCommandInternal(s stripPrefix "-v " trim, true) + case s => typeCommandInternal(s, false) + } + } + + private def warningsCommand(): Result = { + if (intp.lastWarnings.isEmpty) + "Can't find any cached warnings." + else + intp.lastWarnings foreach { case (pos, msg) => intp.reporter.warning(pos, msg) } + } + + private def javapCommand(line: String): Result = { + if (javap == null) + ":javap unavailable, no tools.jar at %s. Set JDK_HOME.".format(jdkHome) + else if (javaVersion startsWith "1.7") + ":javap not yet working with java 1.7" + else if (line == "") + ":javap [-lcsvp] [path1 path2 ...]" + else + javap(words(line)) foreach { res => + if (res.isError) return "Failed: " + res.value + else res.show() + } + } + + private def wrapCommand(line: String): Result = { + def failMsg = "Argument to :wrap must be the name of a method with signature [T](=> T): T" + onIntp { intp => + import intp._ + import global._ + + words(line) match { + case Nil => + intp.executionWrapper match { + case "" => "No execution wrapper is set." + case s => "Current execution wrapper: " + s + } + case "clear" :: Nil => + intp.executionWrapper match { + case "" => "No execution wrapper is set." + case s => intp.clearExecutionWrapper() ; "Cleared execution wrapper." + } + case wrapper :: Nil => + intp.typeOfExpression(wrapper) match { + case PolyType(List(targ), MethodType(List(arg), restpe)) => + intp setExecutionWrapper intp.pathToTerm(wrapper) + "Set wrapper to '" + wrapper + "'" + case tp => + failMsg + "\nFound: <unknown>" + } + case _ => failMsg + } + } + } + + private def pathToPhaseWrapper = intp.pathToTerm("$r") + ".phased.atCurrent" + // private def phaseCommand(name: String): Result = { + // val phased: Phased = power.phased + // import phased.NoPhaseName + + // if (name == "clear") { + // phased.set(NoPhaseName) + // intp.clearExecutionWrapper() + // "Cleared active phase." + // } + // else if (name == "") phased.get match { + // case NoPhaseName => "Usage: :phase <expr> (e.g. typer, erasure.next, erasure+3)" + // case ph => "Active phase is '%s'. (To clear, :phase clear)".format(phased.get) + // } + // else { + // val what = phased.parse(name) + // if (what.isEmpty || !phased.set(what)) + // "'" + name + "' does not appear to represent a valid phase." + // else { + // intp.setExecutionWrapper(pathToPhaseWrapper) + // val activeMessage = + // if (what.toString.length == name.length) "" + what + // else "%s (%s)".format(what, name) + + // "Active phase is now: " + activeMessage + // } + // } + // } + + /** Available commands */ + def commands: List[LoopCommand] = standardCommands /*++ ( + if (isReplPower) powerCommands else Nil + )*/ + + private val replayQuestionMessage = + """|That entry seems to have slain the compiler. Shall I replay + |your session? I can re-run each line except the last one. + |[y/n] + """.trim.stripMargin + + private def crashRecovery(ex: Throwable): Boolean = { + echo(ex.toString) + ex match { + case _: NoSuchMethodError | _: NoClassDefFoundError => + echo("\nUnrecoverable error.") + throw ex + case _ => + def fn(): Boolean = + try in.readYesOrNo(replayQuestionMessage, { echo("\nYou must enter y or n.") ; fn() }) + catch { case _: RuntimeException => false } + + if (fn()) replay() + else echo("\nAbandoning crashed session.") + } + true + } + + /** The main read-eval-print loop for the repl. It calls + * command() for each line of input, and stops when + * command() returns false. + */ + def loop() { + def readOneLine() = { + out.flush() + in readLine prompt + } + // return false if repl should exit + def processLine(line: String): Boolean = { + if (isAsync) { + if (!awaitInitialized()) return false + runThunks() + } + if (line eq null) false // assume null means EOF + else command(line) match { + case Result(false, _) => false + case Result(_, Some(finalLine)) => addReplay(finalLine) ; true + case _ => true + } + } + def innerLoop() { + val shouldContinue = try { + processLine(readOneLine()) + } catch {case t: Throwable => crashRecovery(t)} + if (shouldContinue) + innerLoop() + } + innerLoop() + } + + /** interpret all lines from a specified file */ + def interpretAllFrom(file: File) { + savingReader { + savingReplayStack { + file applyReader { reader => + in = SimpleReader(reader, out, false) + echo("Loading " + file + "...") + loop() + } + } + } + } + + /** create a new interpreter and replay the given commands */ + def replay() { + reset() + if (replayCommandStack.isEmpty) + echo("Nothing to replay.") + else for (cmd <- replayCommands) { + echo("Replaying: " + cmd) // flush because maybe cmd will have its own output + command(cmd) + echo("") + } + } + def resetCommand() { + echo("Resetting repl state.") + if (replayCommandStack.nonEmpty) { + echo("Forgetting this session history:\n") + replayCommands foreach echo + echo("") + replayCommandStack = Nil + } + if (intp.namedDefinedTerms.nonEmpty) + echo("Forgetting all expression results and named terms: " + intp.namedDefinedTerms.mkString(", ")) + if (intp.definedTypes.nonEmpty) + echo("Forgetting defined types: " + intp.definedTypes.mkString(", ")) + + reset() + } + + def reset() { + intp.reset() + // unleashAndSetPhase() + } + + /** fork a shell and run a command */ + lazy val shCommand = new LoopCommand("sh", "run a shell command (result is implicitly => List[String])") { + override def usage = "<command line>" + def apply(line: String): Result = line match { + case "" => showUsage() + case _ => + val toRun = classOf[ProcessResult].getName + "(" + string2codeQuoted(line) + ")" + intp interpret toRun + () + } + } + + def withFile(filename: String)(action: File => Unit) { + val f = File(filename) + + if (f.exists) action(f) + else echo("That file does not exist") + } + + def loadCommand(arg: String) = { + var shouldReplay: Option[String] = None + withFile(arg)(f => { + interpretAllFrom(f) + shouldReplay = Some(":load " + arg) + }) + Result(true, shouldReplay) + } + + def addAllClasspath(args: Seq[String]): Unit = { + var added = false + var totalClasspath = "" + for (arg <- args) { + val f = File(arg).normalize + if (f.exists) { + added = true + addedClasspath = ClassPath.join(addedClasspath, f.path) + totalClasspath = ClassPath.join(settings.classpath.value, addedClasspath) + intp.addUrlsToClassPath(f.toURI.toURL) + sparkContext.addJar(f.toURI.toURL.getPath) + } + } + } + + def addClasspath(arg: String): Unit = { + val f = File(arg).normalize + if (f.exists) { + addedClasspath = ClassPath.join(addedClasspath, f.path) + intp.addUrlsToClassPath(f.toURI.toURL) + sparkContext.addJar(f.toURI.toURL.getPath) + echo("Added '%s'. Your new classpath is:\n\"%s\"".format(f.path, intp.global.classPath.asClasspathString)) + } + else echo("The path '" + f + "' doesn't seem to exist.") + } + + + def powerCmd(): Result = { + if (isReplPower) "Already in power mode." + else enablePowerMode(false) + } + + def enablePowerMode(isDuringInit: Boolean) = { + // replProps.power setValue true + // unleashAndSetPhase() + // asyncEcho(isDuringInit, power.banner) + } + // private def unleashAndSetPhase() { +// if (isReplPower) { +// // power.unleash() +// // Set the phase to "typer" +// intp beSilentDuring phaseCommand("typer") +// } +// } + + def asyncEcho(async: Boolean, msg: => String) { + if (async) asyncMessage(msg) + else echo(msg) + } + + def verbosity() = { + // val old = intp.printResults + // intp.printResults = !old + // echo("Switched " + (if (old) "off" else "on") + " result printing.") + } + + /** Run one command submitted by the user. Two values are returned: + * (1) whether to keep running, (2) the line to record for replay, + * if any. */ + def command(line: String): Result = { + if (line startsWith ":") { + val cmd = line.tail takeWhile (x => !x.isWhitespace) + uniqueCommand(cmd) match { + case Some(lc) => lc(line.tail stripPrefix cmd dropWhile (_.isWhitespace)) + case _ => ambiguousError(cmd) + } + } + else if (intp.global == null) Result(false, None) // Notice failure to create compiler + else Result(true, interpretStartingWith(line)) + } + + private def readWhile(cond: String => Boolean) = { + Iterator continually in.readLine("") takeWhile (x => x != null && cond(x)) + } + + def pasteCommand(): Result = { + echo("// Entering paste mode (ctrl-D to finish)\n") + val code = readWhile(_ => true) mkString "\n" + echo("\n// Exiting paste mode, now interpreting.\n") + intp interpret code + () + } + + private object paste extends Pasted { + val ContinueString = " | " + val PromptString = "scala> " + + def interpret(line: String): Unit = { + echo(line.trim) + intp interpret line + echo("") + } + + def transcript(start: String) = { + echo("\n// Detected repl transcript paste: ctrl-D to finish.\n") + apply(Iterator(start) ++ readWhile(_.trim != PromptString.trim)) + } + } + import paste.{ ContinueString, PromptString } + + /** Interpret expressions starting with the first line. + * Read lines until a complete compilation unit is available + * or until a syntax error has been seen. If a full unit is + * read, go ahead and interpret it. Return the full string + * to be recorded for replay, if any. + */ + def interpretStartingWith(code: String): Option[String] = { + // signal completion non-completion input has been received + in.completion.resetVerbosity() + + def reallyInterpret = { + val reallyResult = intp.interpret(code) + (reallyResult, reallyResult match { + case IR.Error => None + case IR.Success => Some(code) + case IR.Incomplete => + if (in.interactive && code.endsWith("\n\n")) { + echo("You typed two blank lines. Starting a new command.") + None + } + else in.readLine(ContinueString) match { + case null => + // we know compilation is going to fail since we're at EOF and the + // parser thinks the input is still incomplete, but since this is + // a file being read non-interactively we want to fail. So we send + // it straight to the compiler for the nice error message. + intp.compileString(code) + None + + case line => interpretStartingWith(code + "\n" + line) + } + }) + } + + /** Here we place ourselves between the user and the interpreter and examine + * the input they are ostensibly submitting. We intervene in several cases: + * + * 1) If the line starts with "scala> " it is assumed to be an interpreter paste. + * 2) If the line starts with "." (but not ".." or "./") it is treated as an invocation + * on the previous result. + * 3) If the Completion object's execute returns Some(_), we inject that value + * and avoid the interpreter, as it's likely not valid scala code. + */ + if (code == "") None + else if (!paste.running && code.trim.startsWith(PromptString)) { + paste.transcript(code) + None + } + else if (Completion.looksLikeInvocation(code) && intp.mostRecentVar != "") { + interpretStartingWith(intp.mostRecentVar + code) + } + else if (code.trim startsWith "//") { + // line comment, do nothing + None + } + else + reallyInterpret._2 + } + + // runs :load `file` on any files passed via -i + def loadFiles(settings: Settings) = settings match { + case settings: SparkRunnerSettings => + for (filename <- settings.loadfiles.value) { + val cmd = ":load " + filename + command(cmd) + addReplay(cmd) + echo("") + } + case _ => + } + + /** Tries to create a JLineReader, falling back to SimpleReader: + * unless settings or properties are such that it should start + * with SimpleReader. + */ + def chooseReader(settings: Settings): InteractiveReader = { + if (settings.Xnojline.value || Properties.isEmacsShell) + SimpleReader() + else try new SparkJLineReader( + if (settings.noCompletion.value) NoCompletion + else new SparkJLineCompletion(intp) + ) + catch { + case ex @ (_: Exception | _: NoClassDefFoundError) => + echo("Failed to created SparkJLineReader: " + ex + "\nFalling back to SimpleReader.") + SimpleReader() + } + } + + val u: scala.reflect.runtime.universe.type = scala.reflect.runtime.universe + val m = u.runtimeMirror(Utils.getSparkClassLoader) + private def tagOfStaticClass[T: ClassTag]: u.TypeTag[T] = + u.TypeTag[T]( + m, + new TypeCreator { + def apply[U <: ApiUniverse with Singleton](m: Mirror[U]): U # Type = + m.staticClass(classTag[T].runtimeClass.getName).toTypeConstructor.asInstanceOf[U # Type] + }) + + def process(settings: Settings): Boolean = savingContextLoader { + if (getMaster() == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true") + + this.settings = settings + createInterpreter() + + // sets in to some kind of reader depending on environmental cues + in = in0 match { + case Some(reader) => SimpleReader(reader, out, true) + case None => + // some post-initialization + chooseReader(settings) match { + case x: SparkJLineReader => addThunk(x.consoleReader.postInit) ; x + case x => x + } + } + lazy val tagOfSparkIMain = tagOfStaticClass[org.apache.spark.repl.SparkIMain] + // Bind intp somewhere out of the regular namespace where + // we can get at it in generated code. + addThunk(intp.quietBind(NamedParam[SparkIMain]("$intp", intp)(tagOfSparkIMain, classTag[SparkIMain]))) + addThunk({ + import scala.tools.nsc.io._ + import Properties.userHome + import scala.compat.Platform.EOL + val autorun = replProps.replAutorunCode.option flatMap (f => io.File(f).safeSlurp()) + if (autorun.isDefined) intp.quietRun(autorun.get) + }) + + addThunk(printWelcome()) + addThunk(initializeSpark()) + + // it is broken on startup; go ahead and exit + if (intp.reporter.hasErrors) + return false + + // This is about the illusion of snappiness. We call initialize() + // which spins off a separate thread, then print the prompt and try + // our best to look ready. The interlocking lazy vals tend to + // inter-deadlock, so we break the cycle with a single asynchronous + // message to an actor. + if (isAsync) { + intp initialize initializedCallback() + createAsyncListener() // listens for signal to run postInitialization + } + else { + intp.initializeSynchronous() + postInitialization() + } + // printWelcome() + + loadFiles(settings) + + try loop() + catch AbstractOrMissingHandler() + finally closeInterpreter() + + true + } + + def createSparkContext(): SparkContext = { + val execUri = System.getenv("SPARK_EXECUTOR_URI") + val jars = SparkILoop.getAddedJars + val conf = new SparkConf() + .setMaster(getMaster()) + .setAppName("Spark shell") + .setJars(jars) + .set("spark.repl.class.uri", intp.classServer.uri) + if (execUri != null) { + conf.set("spark.executor.uri", execUri) + } + sparkContext = new SparkContext(conf) + logInfo("Created spark context..") + sparkContext + } + + private def getMaster(): String = { + val master = this.master match { + case Some(m) => m + case None => + val envMaster = sys.env.get("MASTER") + val propMaster = sys.props.get("spark.master") + propMaster.orElse(envMaster).getOrElse("local[*]") + } + master + } + + /** process command-line arguments and do as they request */ + def process(args: Array[String]): Boolean = { + val command = new SparkCommandLine(args.toList, msg => echo(msg)) + def neededHelp(): String = + (if (command.settings.help.value) command.usageMsg + "\n" else "") + + (if (command.settings.Xhelp.value) command.xusageMsg + "\n" else "") + + // if they asked for no help and command is valid, we call the real main + neededHelp() match { + case "" => command.ok && process(command.settings) + case help => echoNoNL(help) ; true + } + } + + @deprecated("Use `process` instead", "2.9.0") + def main(settings: Settings): Unit = process(settings) +} + +object SparkILoop { + implicit def loopToInterpreter(repl: SparkILoop): SparkIMain = repl.intp + private def echo(msg: String) = Console println msg + + def getAddedJars: Array[String] = { + val envJars = sys.env.get("ADD_JARS") + val propJars = sys.props.get("spark.jars").flatMap { p => + if (p == "") None else Some(p) + } + val jars = propJars.orElse(envJars).getOrElse("") + Utils.resolveURIs(jars).split(",").filter(_.nonEmpty) + } + + // Designed primarily for use by test code: take a String with a + // bunch of code, and prints out a transcript of what it would look + // like if you'd just typed it into the repl. + def runForTranscript(code: String, settings: Settings): String = { + import java.io.{ BufferedReader, StringReader, OutputStreamWriter } + + stringFromStream { ostream => + Console.withOut(ostream) { + val output = new JPrintWriter(new OutputStreamWriter(ostream), true) { + override def write(str: String) = { + // completely skip continuation lines + if (str forall (ch => ch.isWhitespace || ch == '|')) () + // print a newline on empty scala prompts + else if ((str contains '\n') && (str.trim == "scala> ")) super.write("\n") + else super.write(str) + } + } + val input = new BufferedReader(new StringReader(code)) { + override def readLine(): String = { + val s = super.readLine() + // helping out by printing the line being interpreted. + if (s != null) + output.println(s) + s + } + } + val repl = new SparkILoop(input, output) + + if (settings.classpath.isDefault) + settings.classpath.value = sys.props("java.class.path") + + getAddedJars.foreach(settings.classpath.append(_)) + + repl process settings + } + } + } + + /** Creates an interpreter loop with default settings and feeds + * the given code to it as input. + */ + def run(code: String, sets: Settings = new Settings): String = { + import java.io.{ BufferedReader, StringReader, OutputStreamWriter } + + stringFromStream { ostream => + Console.withOut(ostream) { + val input = new BufferedReader(new StringReader(code)) + val output = new JPrintWriter(new OutputStreamWriter(ostream), true) + val repl = new ILoop(input, output) + + if (sets.classpath.isDefault) + sets.classpath.value = sys.props("java.class.path") + + repl process sets + } + } + } + def run(lines: List[String]): String = run(lines map (_ + "\n") mkString) +} http://git-wip-us.apache.org/repos/asf/spark/blob/12f56334/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala ---------------------------------------------------------------------- diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala new file mode 100644 index 0000000..7667a9c --- /dev/null +++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala @@ -0,0 +1,147 @@ +// scalastyle:off + +/* NSC -- new Scala compiler + * Copyright 2005-2013 LAMP/EPFL + * @author Paul Phillips + */ + +package org.apache.spark.repl + +import scala.tools.nsc._ +import scala.tools.nsc.interpreter._ + +import scala.reflect.internal.util.Position +import scala.util.control.Exception.ignoring +import scala.tools.nsc.util.stackTraceString + +import org.apache.spark.SPARK_VERSION + +/** + * Machinery for the asynchronous initialization of the repl. + */ +trait SparkILoopInit { + self: SparkILoop => + + /** Print a welcome message */ + def printWelcome() { + echo("""Welcome to + ____ __ + / __/__ ___ _____/ /__ + _\ \/ _ \/ _ `/ __/ '_/ + /___/ .__/\_,_/_/ /_/\_\ version %s + /_/ +""".format(SPARK_VERSION)) + import Properties._ + val welcomeMsg = "Using Scala %s (%s, Java %s)".format( + versionString, javaVmName, javaVersion) + echo(welcomeMsg) + echo("Type in expressions to have them evaluated.") + echo("Type :help for more information.") + } + + protected def asyncMessage(msg: String) { + if (isReplInfo || isReplPower) + echoAndRefresh(msg) + } + + private val initLock = new java.util.concurrent.locks.ReentrantLock() + private val initCompilerCondition = initLock.newCondition() // signal the compiler is initialized + private val initLoopCondition = initLock.newCondition() // signal the whole repl is initialized + private val initStart = System.nanoTime + + private def withLock[T](body: => T): T = { + initLock.lock() + try body + finally initLock.unlock() + } + // a condition used to ensure serial access to the compiler. + @volatile private var initIsComplete = false + @volatile private var initError: String = null + private def elapsed() = "%.3f".format((System.nanoTime - initStart).toDouble / 1000000000L) + + // the method to be called when the interpreter is initialized. + // Very important this method does nothing synchronous (i.e. do + // not try to use the interpreter) because until it returns, the + // repl's lazy val `global` is still locked. + protected def initializedCallback() = withLock(initCompilerCondition.signal()) + + // Spins off a thread which awaits a single message once the interpreter + // has been initialized. + protected def createAsyncListener() = { + io.spawn { + withLock(initCompilerCondition.await()) + asyncMessage("[info] compiler init time: " + elapsed() + " s.") + postInitialization() + } + } + + // called from main repl loop + protected def awaitInitialized(): Boolean = { + if (!initIsComplete) + withLock { while (!initIsComplete) initLoopCondition.await() } + if (initError != null) { + println(""" + |Failed to initialize the REPL due to an unexpected error. + |This is a bug, please, report it along with the error diagnostics printed below. + |%s.""".stripMargin.format(initError) + ) + false + } else true + } + // private def warningsThunks = List( + // () => intp.bind("lastWarnings", "" + typeTag[List[(Position, String)]], intp.lastWarnings _), + // ) + + protected def postInitThunks = List[Option[() => Unit]]( + Some(intp.setContextClassLoader _), + if (isReplPower) Some(() => enablePowerMode(true)) else None + ).flatten + // ++ ( + // warningsThunks + // ) + // called once after init condition is signalled + protected def postInitialization() { + try { + postInitThunks foreach (f => addThunk(f())) + runThunks() + } catch { + case ex: Throwable => + initError = stackTraceString(ex) + throw ex + } finally { + initIsComplete = true + + if (isAsync) { + asyncMessage("[info] total init time: " + elapsed() + " s.") + withLock(initLoopCondition.signal()) + } + } + } + + def initializeSpark() { + intp.beQuietDuring { + command(""" + @transient val sc = org.apache.spark.repl.Main.interp.createSparkContext(); + """) + command("import org.apache.spark.SparkContext._") + } + echo("Spark context available as sc.") + } + + // code to be executed only after the interpreter is initialized + // and the lazy val `global` can be accessed without risk of deadlock. + private var pendingThunks: List[() => Unit] = Nil + protected def addThunk(body: => Unit) = synchronized { + pendingThunks :+= (() => body) + } + protected def runThunks(): Unit = synchronized { + if (pendingThunks.nonEmpty) + logDebug("Clearing " + pendingThunks.size + " thunks.") + + while (pendingThunks.nonEmpty) { + val thunk = pendingThunks.head + pendingThunks = pendingThunks.tail + thunk() + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org