http://git-wip-us.apache.org/repos/asf/spark/blob/12f56334/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4e0cd6c..7bbde31 100644
--- a/pom.xml
+++ b/pom.xml
@@ -97,30 +97,26 @@
     <module>sql/catalyst</module>
     <module>sql/core</module>
     <module>sql/hive</module>
-    <module>repl</module>
     <module>assembly</module>
     <module>external/twitter</module>
-    <module>external/kafka</module>
     <module>external/flume</module>
     <module>external/flume-sink</module>
-    <module>external/zeromq</module>
     <module>external/mqtt</module>
+    <module>external/zeromq</module>
     <module>examples</module>
+    <module>repl</module>
   </modules>
 
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-
+    <akka.group>org.spark-project.akka</akka.group>
+    <akka.version>2.3.4-spark</akka.version>
     <java.version>1.6</java.version>
     <sbt.project.name>spark</sbt.project.name>
-    <scala.version>2.10.4</scala.version>
-    <scala.binary.version>2.10</scala.binary.version>
     <scala.macros.version>2.0.1</scala.macros.version>
     <mesos.version>0.18.1</mesos.version>
     <mesos.classifier>shaded-protobuf</mesos.classifier>
-    <akka.group>org.spark-project.akka</akka.group>
-    <akka.version>2.3.4-spark</akka.version>
     <slf4j.version>1.7.5</slf4j.version>
     <log4j.version>1.2.17</log4j.version>
     <hadoop.version>1.0.4</hadoop.version>
@@ -137,7 +133,7 @@
     <parquet.version>1.6.0rc3</parquet.version>
     <jblas.version>1.2.3</jblas.version>
     <jetty.version>8.1.14.v20131031</jetty.version>
-    <chill.version>0.3.6</chill.version>
+    <chill.version>0.5.0</chill.version>
     <codahale.metrics.version>3.0.0</codahale.metrics.version>
     <avro.version>1.7.6</avro.version>
     <avro.mapred.classifier></avro.mapred.classifier>
@@ -146,9 +142,13 @@
     <aws.kinesis.client.version>1.1.0</aws.kinesis.client.version>
     <commons.httpclient.version>4.2.6</commons.httpclient.version>
     <commons.math3.version>3.1.1</commons.math3.version>
-
+    <test_classpath_file>${project.build.directory}/spark-test-classpath.txt</test_classpath_file>
     <PermGen>64m</PermGen>
     <MaxPermGen>512m</MaxPermGen>
+    <scala.version>2.10.4</scala.version>
+    <scala.binary.version>2.10</scala.binary.version>
+    <jline.version>${scala.version}</jline.version>
+    <jline.groupid>org.scala-lang</jline.groupid>
   </properties>
 
   <repositories>
@@ -267,20 +267,67 @@
       </snapshots>
     </pluginRepository>
   </pluginRepositories>
+
+  <dependencies>
   <!-- 
        This is a dummy dependency that is used along with the shading plug-in
        to create effective poms on publishing (see SPARK-3812).
   -->
-  <dependencies>
     <dependency>
       <groupId>org.spark-project.spark</groupId>
       <artifactId>unused</artifactId>
       <version>1.0.0</version>
     </dependency>
+    <!--
+         This dependency is added in provided scope, as it is needed only for
+         executing build-specific Groovy scripts via gmavenplus and is not
+         required by downstream projects building against Spark.
+    -->
+    <dependency>
+      <groupId>org.codehaus.groovy</groupId>
+      <artifactId>groovy-all</artifactId>
+      <version>2.3.7</version>
+      <scope>provided</scope>
+    </dependency>
   </dependencies>
   <dependencyManagement>
     <dependencies>
       <dependency>
+        <groupId>${jline.groupid}</groupId>
+        <artifactId>jline</artifactId>
+        <version>${jline.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>com.twitter</groupId>
+        <artifactId>chill_${scala.binary.version}</artifactId>
+        <version>${chill.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.ow2.asm</groupId>
+            <artifactId>asm</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.ow2.asm</groupId>
+            <artifactId>asm-commons</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>com.twitter</groupId>
+        <artifactId>chill-java</artifactId>
+        <version>${chill.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.ow2.asm</groupId>
+            <artifactId>asm</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.ow2.asm</groupId>
+            <artifactId>asm-commons</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
         <groupId>org.eclipse.jetty</groupId>
         <artifactId>jetty-util</artifactId>
         <version>${jetty.version}</version>
@@ -396,36 +443,6 @@
         <version>${protobuf.version}</version>
       </dependency>
       <dependency>
-        <groupId>com.twitter</groupId>
-        <artifactId>chill_${scala.binary.version}</artifactId>
-        <version>${chill.version}</version>
-        <exclusions>
-          <exclusion>
-            <groupId>org.ow2.asm</groupId>
-            <artifactId>asm</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>org.ow2.asm</groupId>
-            <artifactId>asm-commons</artifactId>
-          </exclusion>
-        </exclusions>
-      </dependency>
-      <dependency>
-        <groupId>com.twitter</groupId>
-        <artifactId>chill-java</artifactId>
-        <version>${chill.version}</version>
-        <exclusions>
-          <exclusion>
-            <groupId>org.ow2.asm</groupId>
-            <artifactId>asm</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>org.ow2.asm</groupId>
-            <artifactId>asm-commons</artifactId>
-          </exclusion>
-        </exclusions>
-      </dependency>
-      <dependency>
         <groupId>${akka.group}</groupId>
         <artifactId>akka-actor_${scala.binary.version}</artifactId>
         <version>${akka.version}</version>
@@ -514,11 +531,6 @@
       </dependency>
       <dependency>
         <groupId>org.scala-lang</groupId>
-        <artifactId>jline</artifactId>
-        <version>${scala.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.scala-lang</groupId>
         <artifactId>scala-library</artifactId>
         <version>${scala.version}</version>
       </dependency>
@@ -965,6 +977,7 @@
               <spark.test.home>${session.executionRootDirectory}</spark.test.home>
               <spark.testing>1</spark.testing>
               <spark.ui.enabled>false</spark.ui.enabled>
+              <spark.executor.extraClassPath>${test_classpath}</spark.executor.extraClassPath>
             </systemProperties>
           </configuration>
           <executions>
@@ -1026,6 +1039,47 @@
     </pluginManagement>
 
     <plugins>
+      <!-- This plugin dumps the test classpath into a file -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <version>2.9</version>
+        <executions>
+          <execution>
+            <phase>test-compile</phase>
+            <goals>
+              <goal>build-classpath</goal>
+            </goals>
+            <configuration>
+              <includeScope>test</includeScope>
+              <outputFile>${test_classpath_file}</outputFile>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- This plugin reads a file into a Maven property. And it lets us write Groovy! -->
+      <plugin>
+        <groupId>org.codehaus.gmavenplus</groupId>
+        <artifactId>gmavenplus-plugin</artifactId>
+        <version>1.2</version>
+        <executions>
+          <execution>
+            <phase>process-test-classes</phase>
+            <goals>
+              <goal>execute</goal>
+            </goals>
+            <configuration>
+              <scripts>
+                <script><![CDATA[
+                def file = new File(project.properties.test_classpath_file)
+                project.properties.test_classpath = file.getText().split().join(":")
+                ]]></script>
+              </scripts>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
      <!-- The shade plug-in is used here to create effective poms (see SPARK-3812). -->
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
@@ -1335,7 +1389,7 @@
       </dependencies>
     </profile>
     <profile>
-      <id>hive</id>
+      <id>hive-thriftserver</id>
       <activation>
         <activeByDefault>false</activeByDefault>
       </activation>
@@ -1365,5 +1419,35 @@
         <derby.version>10.10.1.1</derby.version>
       </properties>
     </profile>
+
+    <profile>
+      <id>scala-2.10</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
+      <properties>
+        <scala.version>2.10.4</scala.version>
+        <scala.binary.version>2.10</scala.binary.version>
+        <jline.version>${scala.version}</jline.version>
+        <jline.groupid>org.scala-lang</jline.groupid>
+      </properties>
+      <modules>
+        <module>external/kafka</module>
+      </modules>
+    </profile>
+
+    <profile>
+      <id>scala-2.11</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <properties>
+        <scala.version>2.11.2</scala.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <jline.version>2.12</jline.version>
+        <jline.groupid>jline</jline.groupid>
+      </properties>
+    </profile>
+
   </profiles>
 </project>
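
The three pieces above work together: maven-dependency-plugin dumps the test
classpath to ${test_classpath_file}, the gmavenplus Groovy script reads that file
back into the test_classpath property, and the scalatest configuration forwards it
to executors via spark.executor.extraClassPath. A minimal Scala sketch of the same
transformation the Groovy script performs (the file name is the one configured
above):

    import scala.io.Source

    // Split the dumped classpath on whitespace and re-join with ':',
    // mirroring file.getText().split().join(":") in the Groovy script.
    def testClasspath(dumpFile: String): String =
      Source.fromFile(dumpFile).mkString.split("\\s+").filter(_.nonEmpty).mkString(":")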

http://git-wip-us.apache.org/repos/asf/spark/blob/12f56334/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 351e57a..492607d 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -31,8 +31,8 @@ object BuildCommons {
   private val buildLocation = file(".").getAbsoluteFile.getParentFile
 
   val allProjects@Seq(bagel, catalyst, core, graphx, hive, hiveThriftServer, mllib, repl,
-  sql, networkCommon, networkShuffle, streaming, streamingFlumeSink, streamingFlume, streamingKafka,
-  streamingMqtt, streamingTwitter, streamingZeromq) =
+    sql, networkCommon, networkShuffle, streaming, streamingFlumeSink, streamingFlume, streamingKafka,
+    streamingMqtt, streamingTwitter, streamingZeromq) =
     Seq("bagel", "catalyst", "core", "graphx", "hive", "hive-thriftserver", "mllib", "repl",
       "sql", "network-common", "network-shuffle", "streaming", "streaming-flume-sink",
       "streaming-flume", "streaming-kafka", "streaming-mqtt", "streaming-twitter",
@@ -68,8 +68,8 @@ object SparkBuild extends PomBuild {
       profiles ++= Seq("spark-ganglia-lgpl")
     }
     if (Properties.envOrNone("SPARK_HIVE").isDefined) {
-      println("NOTE: SPARK_HIVE is deprecated, please use -Phive flag.")
-      profiles ++= Seq("hive")
+      println("NOTE: SPARK_HIVE is deprecated, please use -Phive and 
-Phive-thriftserver flags.")
+      profiles ++= Seq("hive", "hive-thriftserver")
     }
     Properties.envOrNone("SPARK_HADOOP_VERSION") match {
       case Some(v) =>
@@ -91,13 +91,21 @@ object SparkBuild extends PomBuild {
     profiles
   }
 
-  override val profiles = Properties.envOrNone("SBT_MAVEN_PROFILES") match {
+  override val profiles = {
+    val profiles = Properties.envOrNone("SBT_MAVEN_PROFILES") match {
     case None => backwardCompatibility
     case Some(v) =>
       if (backwardCompatibility.nonEmpty)
         println("Note: We ignore environment variables, when use of profile is 
detected in " +
           "conjunction with environment variable.")
       v.split("(\\s+|,)").filterNot(_.isEmpty).map(_.trim.replaceAll("-P", 
"")).toSeq
+    }
+    if (profiles.exists(_.contains("scala-"))) {
+      profiles
+    } else {
+      println("Enabled default scala profile")
+      profiles ++ Seq("scala-2.10")
+    }
   }
 
   Properties.envOrNone("SBT_MAVEN_PROPERTIES") match {
@@ -136,7 +144,8 @@ object SparkBuild extends PomBuild {
 
   // Note ordering of these settings matter.
   /* Enable shared settings on all projects */
-  (allProjects ++ optionallyEnabledProjects ++ assemblyProjects).foreach(enable(sharedSettings))
+  (allProjects ++ optionallyEnabledProjects ++ assemblyProjects ++ Seq(spark, tools))
+    .foreach(enable(sharedSettings ++ ExcludedDependencies.settings))
 
   /* Enable tests settings for all projects except examples, assembly and tools */
   (allProjects ++ optionallyEnabledProjects).foreach(enable(TestSettings.settings))
@@ -179,6 +188,16 @@ object Flume {
 }
 
 /**
+  This excludes library dependencies in sbt which are specified in Maven but are
+  not needed by the sbt build.
+  */
+object ExcludedDependencies {
+  lazy val settings = Seq(
+    libraryDependencies ~= { libs => libs.filterNot(_.name == "groovy-all") }
+  )
+}
+
+/**
  * Following project only exists to pull previous artifacts of Spark for generating
  * Mima ignores. For more information see: SPARK 2071
  */
@@ -353,8 +372,11 @@ object TestSettings {
       .map { case (k,v) => s"-D$k=$v" }.toSeq,
     javaOptions in Test ++= "-Xmx3g -XX:PermSize=128M -XX:MaxNewSize=256m -XX:MaxPermSize=1g"
       .split(" ").toSeq,
+    // This places test scope jars on the classpath of executors during tests.
+    javaOptions in Test += 
+      "-Dspark.executor.extraClassPath=" + (fullClasspath in Test).value.files.
+      map(_.getAbsolutePath).mkString(":").stripSuffix(":"),
     javaOptions += "-Xmx3g",
-
     // Show full stack trace and duration in test cases.
     testOptions in Test += Tests.Argument("-oDF"),
     testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
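
A minimal sketch of how the SBT_MAVEN_PROFILES parsing above behaves, pasted into a
plain Scala REPL:

    // "-P" prefixes and comma/whitespace separators are stripped;
    // the scala-2.10 profile is appended only when no scala-* profile is present.
    val v = "-Phive,-Phive-thriftserver -Pscala-2.11"
    val parsed = v.split("(\\s+|,)").filterNot(_.isEmpty).map(_.trim.replaceAll("-P", "")).toSeq
    // parsed == Seq("hive", "hive-thriftserver", "scala-2.11")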

http://git-wip-us.apache.org/repos/asf/spark/blob/12f56334/project/project/SparkPluginBuild.scala
----------------------------------------------------------------------
diff --git a/project/project/SparkPluginBuild.scala b/project/project/SparkPluginBuild.scala
index 3ef2d54..8863f27 100644
--- a/project/project/SparkPluginBuild.scala
+++ b/project/project/SparkPluginBuild.scala
@@ -26,7 +26,7 @@ import sbt.Keys._
 object SparkPluginDef extends Build {
   lazy val root = Project("plugins", file(".")) dependsOn(sparkStyle, sbtPomReader)
   lazy val sparkStyle = Project("spark-style", file("spark-style"), settings = styleSettings)
-  lazy val sbtPomReader = uri("https://github.com/ScrapCodes/sbt-pom-reader.git")
+  lazy val sbtPomReader = uri("https://github.com/ScrapCodes/sbt-pom-reader.git#ignore_artifact_id")
 
   // There is actually no need to publish this artifact.
   def styleSettings = Defaults.defaultSettings ++ Seq (
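
The #ignore_artifact_id suffix in the sbtPomReader change above uses sbt's git-URI
fragment syntax, which pins a project reference to a branch or tag. A hedged sketch
with a hypothetical repository and branch:

    // sbt clones the repository and checks out the fragment-named branch;
    // "example/some-plugin" and "some-branch" are placeholders, not real coordinates.
    lazy val somePlugin = RootProject(uri("https://github.com/example/some-plugin.git#some-branch"))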

http://git-wip-us.apache.org/repos/asf/spark/blob/12f56334/repl/pom.xml
----------------------------------------------------------------------
diff --git a/repl/pom.xml b/repl/pom.xml
index af528c8..bd688c8 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -39,6 +39,11 @@
 
   <dependencies>
     <dependency>
+      <groupId>${jline.groupid}</groupId>
+      <artifactId>jline</artifactId>
+      <version>${jline.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
@@ -76,11 +81,6 @@
       <version>${scala.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>jline</artifactId>
-      <version>${scala.version}</version>
-    </dependency>
-    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>jul-to-slf4j</artifactId>
     </dependency>
@@ -124,4 +124,84 @@
       </plugin>
     </plugins>
   </build>
+  <profiles>
+    <profile>
+      <id>scala-2.10</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>add-scala-sources</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/main/scala</source>
+                    <source>scala-2.10/src/main/scala</source>
+                  </sources>
+                </configuration>
+              </execution>
+              <execution>
+                <id>add-scala-test-sources</id>
+                <phase>generate-test-sources</phase>
+                <goals>
+                  <goal>add-test-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/test/scala</source>
+                    <source>scala-2.10/src/test/scala</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>scala-2.11</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>add-scala-sources</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/main/scala</source>
+                    <source>scala-2.11/src/main/scala</source>
+                  </sources>
+                </configuration>
+              </execution>
+              <execution>
+                <id>add-scala-test-sources</id>
+                <phase>generate-test-sources</phase>
+                <goals>
+                  <goal>add-test-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/test/scala</source>
+                    <source>scala-2.11/src/test/scala</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
 </project>
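
The build-helper-maven-plugin executions above give each profile an extra,
version-specific source root next to the shared src/ tree. A hedged sketch of the
equivalent wiring on the sbt side, assuming standard sbt 0.13 keys:

    // Derive the extra source root from the Scala binary version in use,
    // e.g. repl/scala-2.10/src/main/scala or repl/scala-2.11/src/main/scala.
    unmanagedSourceDirectories in Compile +=
      baseDirectory.value / s"scala-${scalaBinaryVersion.value}" / "src" / "main" / "scala"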

http://git-wip-us.apache.org/repos/asf/spark/blob/12f56334/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
new file mode 100644
index 0000000..14b448d
--- /dev/null
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.repl
+
+import scala.collection.mutable.Set
+
+object Main {
+  private var _interp: SparkILoop = _
+
+  def interp = _interp
+
+  def interp_=(i: SparkILoop) { _interp = i }
+
+  def main(args: Array[String]) {
+    _interp = new SparkILoop
+    _interp.process(args)
+  }
+}
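
A hedged sketch of driving the object above programmatically rather than through
main(); -usejavacp is only an illustrative compiler flag:

    // Equivalent to main(): create the loop, publish it via the interp setter,
    // then hand it the command-line arguments to process.
    Main.interp = new SparkILoop
    Main.interp.process(Array("-usejavacp"))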

http://git-wip-us.apache.org/repos/asf/spark/blob/12f56334/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala
new file mode 100644
index 0000000..0581694
--- /dev/null
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.repl
+
+import scala.tools.nsc.{Settings, CompilerCommand}
+import scala.Predef._
+
+/**
+ * Command class enabling Spark-specific command line options (provided by
+ * <i>org.apache.spark.repl.SparkRunnerSettings</i>).
+ */
+class SparkCommandLine(args: List[String], override val settings: Settings)
+    extends CompilerCommand(args, settings) {
+
+  def this(args: List[String], error: String => Unit) {
+    this(args, new SparkRunnerSettings(error))
+  }
+
+  def this(args: List[String]) {
+    this(args, str => Console.println("Error: " + str))
+  }
+}
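
A minimal usage sketch of the command class above; -deprecation is only an
illustrative compiler flag, and ok is inherited from CompilerCommand:

    // Parse REPL arguments into SparkRunnerSettings via the two-argument constructor.
    val command = new SparkCommandLine(List("-deprecation"), msg => Console.err.println(msg))
    if (command.ok) {
      val settings = command.settings  // now reflects the parsed options
    }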

http://git-wip-us.apache.org/repos/asf/spark/blob/12f56334/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
new file mode 100644
index 0000000..f8432c8
--- /dev/null
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
@@ -0,0 +1,114 @@
+// scalastyle:off
+
+/* NSC -- new Scala compiler
+ * Copyright 2005-2013 LAMP/EPFL
+ * @author  Paul Phillips
+ */
+
+package org.apache.spark.repl
+
+import scala.tools.nsc._
+import scala.tools.nsc.interpreter._
+
+import scala.reflect.internal.util.BatchSourceFile
+import scala.tools.nsc.ast.parser.Tokens.EOF
+
+import org.apache.spark.Logging
+
+trait SparkExprTyper extends Logging {
+  val repl: SparkIMain
+
+  import repl._
+  import global.{ reporter => _, Import => _, _ }
+  import definitions._
+  import syntaxAnalyzer.{ UnitParser, UnitScanner, token2name }
+  import naming.freshInternalVarName
+
+  object codeParser extends { val global: repl.global.type = repl.global } with CodeHandlers[Tree] {
+    def applyRule[T](code: String, rule: UnitParser => T): T = {
+      reporter.reset()
+      val scanner = newUnitParser(code)
+      val result  = rule(scanner)
+
+      if (!reporter.hasErrors)
+        scanner.accept(EOF)
+
+      result
+    }
+
+    def defns(code: String) = stmts(code) collect { case x: DefTree => x }
+    def expr(code: String)  = applyRule(code, _.expr())
+    def stmts(code: String) = applyRule(code, _.templateStats())
+    def stmt(code: String)  = stmts(code).last  // guaranteed nonempty
+  }
+
+  /** Parse a line into a sequence of trees. Returns None if the input is incomplete. */
+  def parse(line: String): Option[List[Tree]] = debugging(s"""parse("$line")""")  {
+    var isIncomplete = false
+    reporter.withIncompleteHandler((_, _) => isIncomplete = true) {
+      val trees = codeParser.stmts(line)
+      if (reporter.hasErrors) {
+        Some(Nil)
+      } else if (isIncomplete) {
+        None
+      } else {
+        Some(trees)
+      }
+    }
+  }
+  // def parsesAsExpr(line: String) = {
+  //   import codeParser._
+  //   (opt expr line).isDefined
+  // }
+
+  def symbolOfLine(code: String): Symbol = {
+    def asExpr(): Symbol = {
+      val name  = freshInternalVarName()
+      // Typing it with a lazy val would give us the right type, but runs
+      // into compiler bugs with things like existentials, so we compile it
+      // behind a def and strip the NullaryMethodType which wraps the expr.
+      val line = "def " + name + " = {\n" + code + "\n}"
+
+      interpretSynthetic(line) match {
+        case IR.Success =>
+          val sym0 = symbolOfTerm(name)
+          // drop NullaryMethodType
+          val sym = sym0.cloneSymbol setInfo afterTyper(sym0.info.finalResultType)
+          if (sym.info.typeSymbol eq UnitClass) NoSymbol else sym
+        case _          => NoSymbol
+      }
+    }
+    def asDefn(): Symbol = {
+      val old = repl.definedSymbolList.toSet
+
+      interpretSynthetic(code) match {
+        case IR.Success =>
+          repl.definedSymbolList filterNot old match {
+            case Nil        => NoSymbol
+            case sym :: Nil => sym
+            case syms       => NoSymbol.newOverloaded(NoPrefix, syms)
+          }
+        case _ => NoSymbol
+      }
+    }
+    beQuietDuring(asExpr()) orElse beQuietDuring(asDefn())
+  }
+
+  private var typeOfExpressionDepth = 0
+  def typeOfExpression(expr: String, silent: Boolean = true): Type = {
+    if (typeOfExpressionDepth > 2) {
+      logDebug("Terminating typeOfExpression recursion for expression: " + expr)
+      return NoType
+    }
+    typeOfExpressionDepth += 1
+    // Don't presently have a good way to suppress undesirable success output
+    // while letting errors through, so it is first trying it silently: if there
+    // is an error, and errors are desired, then it re-evaluates non-silently
+    // to induce the error message.
+    try beSilentDuring(symbolOfLine(expr).tpe) match {
+      case NoType if !silent => symbolOfLine(expr).tpe // generate error
+      case tpe               => tpe
+    }
+    finally typeOfExpressionDepth -= 1
+  }
+}
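
The parse method above distinguishes three outcomes; a hedged sketch of how a
caller can branch on them (the three handlers are hypothetical placeholders):

    // None        -> incomplete input: keep reading continuation lines
    // Some(Nil)   -> syntax error: already reported through the reporter
    // Some(trees) -> complete input: hand the trees on for interpretation
    parse(line) match {
      case None        => readMoreInput()
      case Some(Nil)   => reportedError()
      case Some(trees) => interpretTrees(trees)
    }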

http://git-wip-us.apache.org/repos/asf/spark/blob/12f56334/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkHelper.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkHelper.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkHelper.scala
new file mode 100644
index 0000000..5340951
--- /dev/null
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkHelper.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package scala.tools.nsc
+
+object SparkHelper {
+  def explicitParentLoader(settings: Settings) = settings.explicitParentLoader
+}
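
Placing the object above in package scala.tools.nsc is what grants it access to
the package-private explicitParentLoader setting. A hedged sketch of the consuming
side, mirroring its use in SparkILoop further below:

    // Fall back to the current class's own loader when no explicit parent is set.
    val parentLoader = SparkHelper.explicitParentLoader(settings)
      .getOrElse(getClass.getClassLoader)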

http://git-wip-us.apache.org/repos/asf/spark/blob/12f56334/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
new file mode 100644
index 0000000..e56b74e
--- /dev/null
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -0,0 +1,1091 @@
+// scalastyle:off
+
+/* NSC -- new Scala compiler
+ * Copyright 2005-2013 LAMP/EPFL
+ * @author Alexander Spoon
+ */
+
+package org.apache.spark.repl
+
+
+import java.net.URL
+
+import scala.reflect.io.AbstractFile
+import scala.tools.nsc._
+import scala.tools.nsc.backend.JavaPlatform
+import scala.tools.nsc.interpreter._
+
+import scala.tools.nsc.interpreter.{Results => IR}
+import Predef.{println => _, _}
+import java.io.{BufferedReader, FileReader}
+import java.net.URI
+import java.util.concurrent.locks.ReentrantLock
+import scala.sys.process.Process
+import scala.tools.nsc.interpreter.session._
+import scala.util.Properties.{jdkHome, javaVersion}
+import scala.tools.util.{Javap}
+import scala.annotation.tailrec
+import scala.collection.mutable.ListBuffer
+import scala.concurrent.ops
+import scala.tools.nsc.util._
+import scala.tools.nsc.interpreter._
+import scala.tools.nsc.io.{File, Directory}
+import scala.reflect.NameTransformer._
+import scala.tools.nsc.util.ScalaClassLoader._
+import scala.tools.util._
+import scala.language.{implicitConversions, existentials, postfixOps}
+import scala.reflect.{ClassTag, classTag}
+import scala.tools.reflect.StdRuntimeTags._
+
+import java.lang.{Class => jClass}
+import scala.reflect.api.{Mirror, TypeCreator, Universe => ApiUniverse}
+
+import org.apache.spark.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext
+import org.apache.spark.util.Utils
+
+/** The Scala interactive shell.  It provides a read-eval-print loop
+ *  around the Interpreter class.
+ *  After instantiation, clients should call the main() method.
+ *
+ *  If no in0 is specified, then input will come from the console, and
+ *  the class will attempt to provide input editing features such as
+ *  input history.
+ *
+ *  @author Moez A. Abdel-Gawad
+ *  @author  Lex Spoon
+ *  @version 1.2
+ */
+class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
+               val master: Option[String])
+                extends AnyRef
+                   with LoopCommands
+                   with SparkILoopInit
+                   with Logging
+{
+  def this(in0: BufferedReader, out: JPrintWriter, master: String) = this(Some(in0), out, Some(master))
+  def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out, None)
+  def this() = this(None, new JPrintWriter(Console.out, true), None)
+
+  var in: InteractiveReader = _   // the input stream from which commands come
+  var settings: Settings = _
+  var intp: SparkIMain = _
+
+  @deprecated("Use `intp` instead.", "2.9.0") def interpreter = intp
+  @deprecated("Use `intp` instead.", "2.9.0") def interpreter_= (i: 
SparkIMain): Unit = intp = i
+
+  /** Having inherited the difficult "var-ness" of the repl instance,
+   *  I'm trying to work around it by moving operations into a class from
+   *  which it will appear a stable prefix.
+   */
+  private def onIntp[T](f: SparkIMain => T): T = f(intp)
+
+  class IMainOps[T <: SparkIMain](val intp: T) {
+    import intp._
+    import global._
+
+    def printAfterTyper(msg: => String) =
+      intp.reporter printMessage afterTyper(msg)
+
+    /** Strip NullaryMethodType artifacts. */
+    private def replInfo(sym: Symbol) = {
+      sym.info match {
+        case NullaryMethodType(restpe) if sym.isAccessor  => restpe
+        case info                                         => info
+      }
+    }
+    def echoTypeStructure(sym: Symbol) =
+      printAfterTyper("" + deconstruct.show(replInfo(sym)))
+
+    def echoTypeSignature(sym: Symbol, verbose: Boolean) = {
+      if (verbose) SparkILoop.this.echo("// Type signature")
+      printAfterTyper("" + replInfo(sym))
+
+      if (verbose) {
+        SparkILoop.this.echo("\n// Internal Type structure")
+        echoTypeStructure(sym)
+      }
+    }
+  }
+  implicit def stabilizeIMain(intp: SparkIMain) = new IMainOps[intp.type](intp)
+
+  /** TODO -
+   *  -n normalize
+   *  -l label with case class parameter names
+   *  -c complete - leave nothing out
+   */
+  private def typeCommandInternal(expr: String, verbose: Boolean): Result = {
+    onIntp { intp =>
+      val sym = intp.symbolOfLine(expr)
+      if (sym.exists) intp.echoTypeSignature(sym, verbose)
+      else ""
+    }
+  }
+
+  var sparkContext: SparkContext = _
+
+  override def echoCommandMessage(msg: String) {
+    intp.reporter printMessage msg
+  }
+
+  // def isAsync = !settings.Yreplsync.value
+  def isAsync = false
+  // lazy val power = new Power(intp, new StdReplVals(this))(tagOfStdReplVals, classTag[StdReplVals])
+  def history = in.history
+
+  /** The context class loader at the time this object was created */
+  protected val originalClassLoader = Utils.getContextOrSparkClassLoader
+
+  // classpath entries added via :cp
+  var addedClasspath: String = ""
+
+  /** A reverse list of commands to replay if the user requests a :replay */
+  var replayCommandStack: List[String] = Nil
+
+  /** A list of commands to replay if the user requests a :replay */
+  def replayCommands = replayCommandStack.reverse
+
+  /** Record a command for replay should the user request a :replay */
+  def addReplay(cmd: String) = replayCommandStack ::= cmd
+
+  def savingReplayStack[T](body: => T): T = {
+    val saved = replayCommandStack
+    try body
+    finally replayCommandStack = saved
+  }
+  def savingReader[T](body: => T): T = {
+    val saved = in
+    try body
+    finally in = saved
+  }
+
+
+  def sparkCleanUp(){
+    echo("Stopping spark context.")
+    intp.beQuietDuring {
+      command("sc.stop()")
+    }
+  }
+  /** Close the interpreter and set the var to null. */
+  def closeInterpreter() {
+    if (intp ne null) {
+      sparkCleanUp()
+      intp.close()
+      intp = null
+    }
+  }
+
+  class SparkILoopInterpreter extends SparkIMain(settings, out) {
+    outer =>
+
+    override lazy val formatting = new Formatting {
+      def prompt = SparkILoop.this.prompt
+    }
+    override protected def parentClassLoader = SparkHelper.explicitParentLoader(settings).getOrElse(classOf[SparkILoop].getClassLoader)
+  }
+
+  /** Create a new interpreter. */
+  def createInterpreter() {
+    require(settings != null)
+
+    if (addedClasspath != "") settings.classpath.append(addedClasspath)
+    val addedJars =
+      if (Utils.isWindows) {
+        // Strip any URI scheme prefix so we can add the correct path to the classpath
+        // e.g. file:/C:/my/path.jar -> C:/my/path.jar
+        SparkILoop.getAddedJars.map { jar => new URI(jar).getPath.stripPrefix("/") }
+      } else {
+        SparkILoop.getAddedJars
+      }
+    // work around for Scala bug
+    val totalClassPath = addedJars.foldLeft(
+      settings.classpath.value)((l, r) => ClassPath.join(l, r))
+    this.settings.classpath.value = totalClassPath
+
+    intp = new SparkILoopInterpreter
+  }
+
+  /** print a friendly help message */
+  def helpCommand(line: String): Result = {
+    if (line == "") helpSummary()
+    else uniqueCommand(line) match {
+      case Some(lc) => echo("\n" + lc.longHelp)
+      case _        => ambiguousError(line)
+    }
+  }
+  private def helpSummary() = {
+    val usageWidth  = commands map (_.usageMsg.length) max
+    val formatStr   = "%-" + usageWidth + "s %s %s"
+
+    echo("All commands can be abbreviated, e.g. :he instead of :help.")
+    echo("Those marked with a * have more detailed help, e.g. :help 
imports.\n")
+
+    commands foreach { cmd =>
+      val star = if (cmd.hasLongHelp) "*" else " "
+      echo(formatStr.format(cmd.usageMsg, star, cmd.help))
+    }
+  }
+  private def ambiguousError(cmd: String): Result = {
+    matchingCommands(cmd) match {
+      case Nil  => echo(cmd + ": no such command.  Type :help for help.")
+      case xs   => echo(cmd + " is ambiguous: did you mean " + xs.map(":" + _.name).mkString(" or ") + "?")
+    }
+    Result(true, None)
+  }
+  private def matchingCommands(cmd: String) = commands filter (_.name startsWith cmd)
+  private def uniqueCommand(cmd: String): Option[LoopCommand] = {
+    // this lets us add commands willy-nilly and only requires enough command to disambiguate
+    matchingCommands(cmd) match {
+      case List(x)  => Some(x)
+      // exact match OK even if otherwise appears ambiguous
+      case xs       => xs find (_.name == cmd)
+    }
+  }
+  private var fallbackMode = false 
+
+  private def toggleFallbackMode() {
+    val old = fallbackMode
+    fallbackMode = !old
+    System.setProperty("spark.repl.fallback", fallbackMode.toString)
+    echo(s"""
+      |Switched ${if (old) "off" else "on"} fallback mode without restarting.
+      |       If you have defined classes in the repl, it would 
+      |be good to redefine them in case you plan to use them. If you still run
+      |into issues it would be good to restart the repl and turn on 
`:fallback` 
+      |mode as first command.
+      """.stripMargin)
+  }
+
+  /** Show the history */
+  lazy val historyCommand = new LoopCommand("history", "show the history (optional num is commands to show)") {
+    override def usage = "[num]"
+    def defaultLines = 20
+
+    def apply(line: String): Result = {
+      if (history eq NoHistory)
+        return "No history available."
+
+      val xs      = words(line)
+      val current = history.index
+      val count   = try xs.head.toInt catch { case _: Exception => defaultLines }
+      val lines   = history.asStrings takeRight count
+      val offset  = current - lines.size + 1
+
+      for ((line, index) <- lines.zipWithIndex)
+        echo("%3d  %s".format(index + offset, line))
+    }
+  }
+
+  // When you know you are most likely breaking into the middle
+  // of a line being typed.  This softens the blow.
+  protected def echoAndRefresh(msg: String) = {
+    echo("\n" + msg)
+    in.redrawLine()
+  }
+  protected def echo(msg: String) = {
+    out println msg
+    out.flush()
+  }
+  protected def echoNoNL(msg: String) = {
+    out print msg
+    out.flush()
+  }
+
+  /** Search the history */
+  def searchHistory(_cmdline: String) {
+    val cmdline = _cmdline.toLowerCase
+    val offset  = history.index - history.size + 1
+
+    for ((line, index) <- history.asStrings.zipWithIndex ; if line.toLowerCase contains cmdline)
+      echo("%d %s".format(index + offset, line))
+  }
+
+  private var currentPrompt = Properties.shellPromptString
+  def setPrompt(prompt: String) = currentPrompt = prompt
+  /** Prompt to print when awaiting input */
+  def prompt = currentPrompt
+
+  import LoopCommand.{ cmd, nullary }
+
+  /** Standard commands */
+  lazy val standardCommands = List(
+    cmd("cp", "<path>", "add a jar or directory to the classpath", 
addClasspath),
+    cmd("help", "[command]", "print this summary or command-specific help", 
helpCommand),
+    historyCommand,
+    cmd("h?", "<string>", "search the history", searchHistory),
+    cmd("imports", "[name name ...]", "show import history, identifying 
sources of names", importsCommand),
+    cmd("implicits", "[-v]", "show the implicits in scope", implicitsCommand),
+    cmd("javap", "<path|class>", "disassemble a file or class name", 
javapCommand),
+    cmd("load", "<path>", "load and interpret a Scala file", loadCommand),
+    nullary("paste", "enter paste mode: all input up to ctrl-D compiled 
together", pasteCommand),
+//    nullary("power", "enable power user mode", powerCmd),
+    nullary("quit", "exit the repl", () => Result(false, None)),
+    nullary("replay", "reset execution and replay all previous commands", 
replay),
+    nullary("reset", "reset the repl to its initial state, forgetting all 
session entries", resetCommand),
+    shCommand,
+    nullary("silent", "disable/enable automatic printing of results", 
verbosity),
+    nullary("fallback", """
+                           |disable/enable advanced repl changes, these fix 
some issues but may introduce others. 
+                           |This mode will be removed once these fixes 
stablize""".stripMargin, toggleFallbackMode),
+    cmd("type", "[-v] <expr>", "display the type of an expression without 
evaluating it", typeCommand),
+    nullary("warnings", "show the suppressed warnings from the most recent 
line which had any", warningsCommand)
+  )
+
+  /** Power user commands */
+  lazy val powerCommands: List[LoopCommand] = List(
+    // cmd("phase", "<phase>", "set the implicit phase for power commands", 
phaseCommand)
+  )
+
+  // private def dumpCommand(): Result = {
+  //   echo("" + power)
+  //   history.asStrings takeRight 30 foreach echo
+  //   in.redrawLine()
+  // }
+  // private def valsCommand(): Result = power.valsDescription
+
+  private val typeTransforms = List(
+    "scala.collection.immutable." -> "immutable.",
+    "scala.collection.mutable."   -> "mutable.",
+    "scala.collection.generic."   -> "generic.",
+    "java.lang."                  -> "jl.",
+    "scala.runtime."              -> "runtime."
+  )
+
+  private def importsCommand(line: String): Result = {
+    val tokens    = words(line)
+    val handlers  = intp.languageWildcardHandlers ++ intp.importHandlers
+    val isVerbose = tokens contains "-v"
+
+    handlers.filterNot(_.importedSymbols.isEmpty).zipWithIndex foreach {
+      case (handler, idx) =>
+        val (types, terms) = handler.importedSymbols partition (_.name.isTypeName)
+        val imps           = handler.implicitSymbols
+        val found          = tokens filter (handler importsSymbolNamed _)
+        val typeMsg        = if (types.isEmpty) "" else types.size + " types"
+        val termMsg        = if (terms.isEmpty) "" else terms.size + " terms"
+        val implicitMsg    = if (imps.isEmpty) "" else imps.size + " are implicit"
+        val foundMsg       = if (found.isEmpty) "" else found.mkString(" // imports: ", ", ", "")
+        val statsMsg       = List(typeMsg, termMsg, implicitMsg) filterNot (_ == "") mkString ("(", ", ", ")")
+
+        intp.reporter.printMessage("%2d) %-30s %s%s".format(
+          idx + 1,
+          handler.importString,
+          statsMsg,
+          foundMsg
+        ))
+    }
+  }
+
+  private def implicitsCommand(line: String): Result = onIntp { intp =>
+    import intp._
+    import global._
+
+    def p(x: Any) = intp.reporter.printMessage("" + x)
+
+    // If an argument is given, only show a source with that
+    // in its name somewhere.
+    val args     = line split "\\s+"
+    val filtered = intp.implicitSymbolsBySource filter {
+      case (source, syms) =>
+        (args contains "-v") || {
+          if (line == "") (source.fullName.toString != "scala.Predef")
+          else (args exists (source.name.toString contains _))
+        }
+    }
+
+    if (filtered.isEmpty)
+      return "No implicits have been imported other than those in Predef."
+
+    filtered foreach {
+      case (source, syms) =>
+        p("/* " + syms.size + " implicit members imported from " + 
source.fullName + " */")
+
+        // This groups the members by where the symbol is defined
+        val byOwner = syms groupBy (_.owner)
+        val sortedOwners = byOwner.toList sortBy { case (owner, _) => afterTyper(source.info.baseClasses indexOf owner) }
+
+        sortedOwners foreach {
+          case (owner, members) =>
+            // Within each owner, we cluster results based on the final result type
+            // if there are more than a couple, and sort each cluster based on name.
+            // This is really just trying to make the 100 or so implicits imported
+            // by default into something readable.
+            val memberGroups: List[List[Symbol]] = {
+              val groups = members groupBy (_.tpe.finalResultType) toList
+              val (big, small) = groups partition (_._2.size > 3)
+              val xss = (
+                (big sortBy (_._1.toString) map (_._2)) :+
+                (small flatMap (_._2))
+              )
+
+              xss map (xs => xs sortBy (_.name.toString))
+            }
+
+            val ownerMessage = if (owner == source) " defined in " else " inherited from "
+            p("  /* " + members.size + ownerMessage + owner.fullName + " */")
+
+            memberGroups foreach { group =>
+              group foreach (s => p("  " + intp.symbolDefString(s)))
+              p("")
+            }
+        }
+        p("")
+    }
+  }
+
+  private def findToolsJar() = {
+    val jdkPath = Directory(jdkHome)
+    val jar     = jdkPath / "lib" / "tools.jar" toFile;
+
+    if (jar isFile)
+      Some(jar)
+    else if (jdkPath.isDirectory)
+      jdkPath.deepFiles find (_.name == "tools.jar")
+    else None
+  }
+  private def addToolsJarToLoader() = {
+    val cl = findToolsJar match {
+      case Some(tools) => ScalaClassLoader.fromURLs(Seq(tools.toURL), intp.classLoader)
+      case _           => intp.classLoader
+    }
+    if (Javap.isAvailable(cl)) {
+      logDebug(":javap available.")
+      cl
+    }
+    else {
+      logDebug(":javap unavailable: no tools.jar at " + jdkHome)
+      intp.classLoader
+    }
+  }
+
+  protected def newJavap() = new JavapClass(addToolsJarToLoader(), new SparkIMain.ReplStrippingWriter(intp)) {
+    override def tryClass(path: String): Array[Byte] = {
+      val hd :: rest = path split '.' toList;
+      // If there are dots in the name, the first segment is the
+      // key to finding it.
+      if (rest.nonEmpty) {
+        intp optFlatName hd match {
+          case Some(flat) =>
+            val clazz = flat :: rest mkString NAME_JOIN_STRING
+            val bytes = super.tryClass(clazz)
+            if (bytes.nonEmpty) bytes
+            else super.tryClass(clazz + MODULE_SUFFIX_STRING)
+          case _          => super.tryClass(path)
+        }
+      }
+      else {
+        // Look for Foo first, then Foo$, but if Foo$ is given explicitly,
+        // we have to drop the $ to find object Foo, then tack it back onto
+        // the end of the flattened name.
+        def className  = intp flatName path
+        def moduleName = (intp flatName path.stripSuffix(MODULE_SUFFIX_STRING)) + MODULE_SUFFIX_STRING
+
+        val bytes = super.tryClass(className)
+        if (bytes.nonEmpty) bytes
+        else super.tryClass(moduleName)
+      }
+    }
+  }
+  // private lazy val javap = substituteAndLog[Javap]("javap", NoJavap)(newJavap())
+  private lazy val javap =
+    try newJavap()
+    catch { case _: Exception => null }
+
+  // Still todo: modules.
+  private def typeCommand(line0: String): Result = {
+    line0.trim match {
+      case ""                      => ":type [-v] <expression>"
+      case s if s startsWith "-v " => typeCommandInternal(s stripPrefix "-v " trim, true)
+      case s                       => typeCommandInternal(s, false)
+    }
+  }
+
+  private def warningsCommand(): Result = {
+    if (intp.lastWarnings.isEmpty)
+      "Can't find any cached warnings."
+    else
+      intp.lastWarnings foreach { case (pos, msg) => intp.reporter.warning(pos, msg) }
+  }
+
+  private def javapCommand(line: String): Result = {
+    if (javap == null)
+      ":javap unavailable, no tools.jar at %s.  Set JDK_HOME.".format(jdkHome)
+    else if (javaVersion startsWith "1.7")
+      ":javap not yet working with java 1.7"
+    else if (line == "")
+      ":javap [-lcsvp] [path1 path2 ...]"
+    else
+      javap(words(line)) foreach { res =>
+        if (res.isError) return "Failed: " + res.value
+        else res.show()
+      }
+  }
+
+  private def wrapCommand(line: String): Result = {
+    def failMsg = "Argument to :wrap must be the name of a method with 
signature [T](=> T): T"
+    onIntp { intp =>
+      import intp._
+      import global._
+
+      words(line) match {
+        case Nil            =>
+          intp.executionWrapper match {
+            case ""   => "No execution wrapper is set."
+            case s    => "Current execution wrapper: " + s
+          }
+        case "clear" :: Nil =>
+          intp.executionWrapper match {
+            case ""   => "No execution wrapper is set."
+            case s    => intp.clearExecutionWrapper() ; "Cleared execution wrapper."
+          }
+        case wrapper :: Nil =>
+          intp.typeOfExpression(wrapper) match {
+            case PolyType(List(targ), MethodType(List(arg), restpe)) =>
+              intp setExecutionWrapper intp.pathToTerm(wrapper)
+              "Set wrapper to '" + wrapper + "'"
+            case tp =>
+              failMsg + "\nFound: <unknown>"
+          }
+        case _ => failMsg
+      }
+    }
+  }
+
+  private def pathToPhaseWrapper = intp.pathToTerm("$r") + ".phased.atCurrent"
+  // private def phaseCommand(name: String): Result = {
+  //   val phased: Phased = power.phased
+  //   import phased.NoPhaseName
+
+  //   if (name == "clear") {
+  //     phased.set(NoPhaseName)
+  //     intp.clearExecutionWrapper()
+  //     "Cleared active phase."
+  //   }
+  //   else if (name == "") phased.get match {
+  //     case NoPhaseName => "Usage: :phase <expr> (e.g. typer, erasure.next, erasure+3)"
+  //     case ph          => "Active phase is '%s'.  (To clear, :phase clear)".format(phased.get)
+  //   }
+  //   else {
+  //     val what = phased.parse(name)
+  //     if (what.isEmpty || !phased.set(what))
+  //       "'" + name + "' does not appear to represent a valid phase."
+  //     else {
+  //       intp.setExecutionWrapper(pathToPhaseWrapper)
+  //       val activeMessage =
+  //         if (what.toString.length == name.length) "" + what
+  //         else "%s (%s)".format(what, name)
+
+  //       "Active phase is now: " + activeMessage
+  //     }
+  //   }
+  // }
+
+  /** Available commands */
+  def commands: List[LoopCommand] = standardCommands /*++ (
+    if (isReplPower) powerCommands else Nil
+  )*/
+
+  private val replayQuestionMessage =
+    """|That entry seems to have slain the compiler.  Shall I replay
+       |your session? I can re-run each line except the last one.
+       |[y/n]
+    """.trim.stripMargin
+
+  private def crashRecovery(ex: Throwable): Boolean = {
+    echo(ex.toString)
+    ex match {
+      case _: NoSuchMethodError | _: NoClassDefFoundError =>
+        echo("\nUnrecoverable error.")
+        throw ex
+      case _  =>
+        def fn(): Boolean =
+          try in.readYesOrNo(replayQuestionMessage, { echo("\nYou must enter y or n.") ; fn() })
+          catch { case _: RuntimeException => false }
+
+        if (fn()) replay()
+        else echo("\nAbandoning crashed session.")
+    }
+    true
+  }
+
+  /** The main read-eval-print loop for the repl.  It calls
+   *  command() for each line of input, and stops when
+   *  command() returns false.
+   */
+  def loop() {
+    def readOneLine() = {
+      out.flush()
+      in readLine prompt
+    }
+    // return false if repl should exit
+    def processLine(line: String): Boolean = {
+      if (isAsync) {
+        if (!awaitInitialized()) return false
+        runThunks()
+      }
+      if (line eq null) false               // assume null means EOF
+      else command(line) match {
+        case Result(false, _)           => false
+        case Result(_, Some(finalLine)) => addReplay(finalLine) ; true
+        case _                          => true
+      }
+    }
+    def innerLoop() {
+      val shouldContinue = try {
+        processLine(readOneLine())
+      } catch {case t: Throwable => crashRecovery(t)}
+      if (shouldContinue)
+        innerLoop()
+    }
+    innerLoop()
+  }
+
+  /** interpret all lines from a specified file */
+  def interpretAllFrom(file: File) {
+    savingReader {
+      savingReplayStack {
+        file applyReader { reader =>
+          in = SimpleReader(reader, out, false)
+          echo("Loading " + file + "...")
+          loop()
+        }
+      }
+    }
+  }
+
+  /** create a new interpreter and replay the given commands */
+  def replay() {
+    reset()
+    if (replayCommandStack.isEmpty)
+      echo("Nothing to replay.")
+    else for (cmd <- replayCommands) {
+      echo("Replaying: " + cmd)  // flush because maybe cmd will have its own 
output
+      command(cmd)
+      echo("")
+    }
+  }
+  def resetCommand() {
+    echo("Resetting repl state.")
+    if (replayCommandStack.nonEmpty) {
+      echo("Forgetting this session history:\n")
+      replayCommands foreach echo
+      echo("")
+      replayCommandStack = Nil
+    }
+    if (intp.namedDefinedTerms.nonEmpty)
+      echo("Forgetting all expression results and named terms: " + 
intp.namedDefinedTerms.mkString(", "))
+    if (intp.definedTypes.nonEmpty)
+      echo("Forgetting defined types: " + intp.definedTypes.mkString(", "))
+
+    reset()
+  }
+
+  def reset() {
+    intp.reset()
+    // unleashAndSetPhase()
+  }
+
+  /** fork a shell and run a command */
+  lazy val shCommand = new LoopCommand("sh", "run a shell command (result is implicitly => List[String])") {
+    override def usage = "<command line>"
+    def apply(line: String): Result = line match {
+      case ""   => showUsage()
+      case _    =>
+        val toRun = classOf[ProcessResult].getName + "(" + string2codeQuoted(line) + ")"
+        intp interpret toRun
+        ()
+    }
+  }
+
+  def withFile(filename: String)(action: File => Unit) {
+    val f = File(filename)
+
+    if (f.exists) action(f)
+    else echo("That file does not exist")
+  }
+
+  def loadCommand(arg: String) = {
+    var shouldReplay: Option[String] = None
+    withFile(arg)(f => {
+      interpretAllFrom(f)
+      shouldReplay = Some(":load " + arg)
+    })
+    Result(true, shouldReplay)
+  }
+
+  def addAllClasspath(args: Seq[String]): Unit = {
+    var added = false
+    var totalClasspath = ""
+    for (arg <- args) {
+      val f = File(arg).normalize
+      if (f.exists) {
+        added = true
+        addedClasspath = ClassPath.join(addedClasspath, f.path)
+        totalClasspath = ClassPath.join(settings.classpath.value, addedClasspath)
+        intp.addUrlsToClassPath(f.toURI.toURL)
+        sparkContext.addJar(f.toURI.toURL.getPath)
+      }
+    }
+  }
+
+  def addClasspath(arg: String): Unit = {
+    val f = File(arg).normalize
+    if (f.exists) {
+      addedClasspath = ClassPath.join(addedClasspath, f.path)
+      intp.addUrlsToClassPath(f.toURI.toURL)
+      sparkContext.addJar(f.toURI.toURL.getPath)
+      echo("Added '%s'.  Your new classpath is:\n\"%s\"".format(f.path, 
intp.global.classPath.asClasspathString))
+    }
+    else echo("The path '" + f + "' doesn't seem to exist.")
+  }
+
+
+  def powerCmd(): Result = {
+    if (isReplPower) "Already in power mode."
+    else enablePowerMode(false)
+  }
+
+  def enablePowerMode(isDuringInit: Boolean) = {
+    // replProps.power setValue true
+    // unleashAndSetPhase()
+    // asyncEcho(isDuringInit, power.banner)
+  }
+  // private def unleashAndSetPhase() {
+//     if (isReplPower) {
+// //      power.unleash()
+//       // Set the phase to "typer"
+//       intp beSilentDuring phaseCommand("typer")
+//     }
+//   }
+
+  def asyncEcho(async: Boolean, msg: => String) {
+    if (async) asyncMessage(msg)
+    else echo(msg)
+  }
+
+  def verbosity() = {
+    // val old = intp.printResults
+    // intp.printResults = !old
+    // echo("Switched " + (if (old) "off" else "on") + " result printing.")
+  }
+
+  /** Run one command submitted by the user.  Two values are returned:
+    * (1) whether to keep running, (2) the line to record for replay,
+    * if any. */
+  def command(line: String): Result = {
+    if (line startsWith ":") {
+      val cmd = line.tail takeWhile (x => !x.isWhitespace)
+      uniqueCommand(cmd) match {
+        case Some(lc) => lc(line.tail stripPrefix cmd dropWhile (_.isWhitespace))
+        case _        => ambiguousError(cmd)
+      }
+    }
+    else if (intp.global == null) Result(false, None)  // Notice failure to create compiler
+    else Result(true, interpretStartingWith(line))
+  }
+
+  private def readWhile(cond: String => Boolean) = {
+    Iterator continually in.readLine("") takeWhile (x => x != null && cond(x))
+  }
+
+  def pasteCommand(): Result = {
+    echo("// Entering paste mode (ctrl-D to finish)\n")
+    val code = readWhile(_ => true) mkString "\n"
+    echo("\n// Exiting paste mode, now interpreting.\n")
+    intp interpret code
+    ()
+  }
+
+  private object paste extends Pasted {
+    val ContinueString = "     | "
+    val PromptString   = "scala> "
+
+    def interpret(line: String): Unit = {
+      echo(line.trim)
+      intp interpret line
+      echo("")
+    }
+
+    def transcript(start: String) = {
+      echo("\n// Detected repl transcript paste: ctrl-D to finish.\n")
+      apply(Iterator(start) ++ readWhile(_.trim != PromptString.trim))
+    }
+  }
+  import paste.{ ContinueString, PromptString }
+
+  /** Interpret expressions starting with the first line.
+    * Read lines until a complete compilation unit is available
+    * or until a syntax error has been seen.  If a full unit is
+    * read, go ahead and interpret it.  Return the full string
+    * to be recorded for replay, if any.
+    */
+  def interpretStartingWith(code: String): Option[String] = {
+    // signal to the completion object that non-completion input has been received
+    in.completion.resetVerbosity()
+
+    def reallyInterpret = {
+      val reallyResult = intp.interpret(code)
+      (reallyResult, reallyResult match {
+        case IR.Error       => None
+        case IR.Success     => Some(code)
+        case IR.Incomplete  =>
+          if (in.interactive && code.endsWith("\n\n")) {
+            echo("You typed two blank lines.  Starting a new command.")
+            None
+          }
+          else in.readLine(ContinueString) match {
+            case null =>
+              // we know compilation is going to fail since we're at EOF and the
+              // parser thinks the input is still incomplete, but since this is
+              // a file being read non-interactively we want to fail.  So we send
+              // it straight to the compiler for the nice error message.
+              intp.compileString(code)
+              None
+
+            case line => interpretStartingWith(code + "\n" + line)
+          }
+      })
+    }
+
+    /** Here we place ourselves between the user and the interpreter and examine
+     *  the input they are ostensibly submitting.  We intervene in several cases:
+     *
+     *  1) If the line starts with "scala> " it is assumed to be an interpreter paste.
+     *  2) If the line starts with "." (but not ".." or "./") it is treated as an invocation
+     *     on the previous result.
+     *  3) If the Completion object's execute returns Some(_), we inject that value
+     *     and avoid the interpreter, as it's likely not valid scala code.
+     */
+    if (code == "") None
+    else if (!paste.running && code.trim.startsWith(PromptString)) {
+      paste.transcript(code)
+      None
+    }
+    else if (Completion.looksLikeInvocation(code) && intp.mostRecentVar != "") {
+      interpretStartingWith(intp.mostRecentVar + code)
+    }
+    else if (code.trim startsWith "//") {
+      // line comment, do nothing
+      None
+    }
+    else
+      reallyInterpret._2
+  }
+
+  // runs :load `file` on any files passed via -i
+  def loadFiles(settings: Settings) = settings match {
+    case settings: SparkRunnerSettings =>
+      for (filename <- settings.loadfiles.value) {
+        val cmd = ":load " + filename
+        command(cmd)
+        addReplay(cmd)
+        echo("")
+      }
+    case _ =>
+  }
+
+  /** Tries to create a JLineReader, falling back to SimpleReader
+   *  unless settings or properties dictate that it should start
+   *  with SimpleReader.
+   */
+  def chooseReader(settings: Settings): InteractiveReader = {
+    if (settings.Xnojline.value || Properties.isEmacsShell)
+      SimpleReader()
+    else try new SparkJLineReader(
+      if (settings.noCompletion.value) NoCompletion
+      else new SparkJLineCompletion(intp)
+    )
+    catch {
+      case ex @ (_: Exception | _: NoClassDefFoundError) =>
+        echo("Failed to created SparkJLineReader: " + ex + "\nFalling back to 
SimpleReader.")
+        SimpleReader()
+    }
+  }
+
+  val u: scala.reflect.runtime.universe.type = scala.reflect.runtime.universe
+  val m = u.runtimeMirror(Utils.getSparkClassLoader)
+  private def tagOfStaticClass[T: ClassTag]: u.TypeTag[T] =
+    u.TypeTag[T](
+      m,
+      new TypeCreator {
+        def apply[U <: ApiUniverse with Singleton](m: Mirror[U]): U # Type =
+          m.staticClass(classTag[T].runtimeClass.getName).toTypeConstructor.asInstanceOf[U # Type]
+      })
+
+  def process(settings: Settings): Boolean = savingContextLoader {
+    if (getMaster() == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
+
+    this.settings = settings
+    createInterpreter()
+
+    // sets `in` to some kind of reader depending on environmental cues
+    in = in0 match {
+      case Some(reader) => SimpleReader(reader, out, true)
+      case None         =>
+        // some post-initialization
+        chooseReader(settings) match {
+          case x: SparkJLineReader => addThunk(x.consoleReader.postInit) ; x
+          case x                   => x
+        }
+    }
+    lazy val tagOfSparkIMain = tagOfStaticClass[org.apache.spark.repl.SparkIMain]
+    // Bind intp somewhere out of the regular namespace where
+    // we can get at it in generated code.
+    addThunk(intp.quietBind(NamedParam[SparkIMain]("$intp", intp)(tagOfSparkIMain, classTag[SparkIMain])))
+    addThunk({
+      import scala.tools.nsc.io._
+      import Properties.userHome
+      import scala.compat.Platform.EOL
+      val autorun = replProps.replAutorunCode.option flatMap (f => io.File(f).safeSlurp())
+      if (autorun.isDefined) intp.quietRun(autorun.get)
+    })
+
+    addThunk(printWelcome())
+    addThunk(initializeSpark())
+
+    // if the interpreter is broken on startup, go ahead and exit
+    if (intp.reporter.hasErrors)
+      return false
+
+    // This is about the illusion of snappiness.  We call initialize()
+    // which spins off a separate thread, then print the prompt and try
+    // our best to look ready.  The interlocking lazy vals tend to
+    // inter-deadlock, so we break the cycle with a single asynchronous
+    // message to an actor.
+    if (isAsync) {
+      intp initialize initializedCallback()
+      createAsyncListener() // listens for signal to run postInitialization
+    }
+    else {
+      intp.initializeSynchronous()
+      postInitialization()
+    }
+    // printWelcome()
+
+    loadFiles(settings)
+
+    try loop()
+    catch AbstractOrMissingHandler()
+    finally closeInterpreter()
+
+    true
+  }
+
+  def createSparkContext(): SparkContext = {
+    val execUri = System.getenv("SPARK_EXECUTOR_URI")
+    val jars = SparkILoop.getAddedJars
+    val conf = new SparkConf()
+      .setMaster(getMaster())
+      .setAppName("Spark shell")
+      .setJars(jars)
+      .set("spark.repl.class.uri", intp.classServer.uri)
+    if (execUri != null) {
+      conf.set("spark.executor.uri", execUri)
+    }
+    sparkContext = new SparkContext(conf)
+    logInfo("Created spark context..")
+    sparkContext
+  }
+
+  private def getMaster(): String = {
+    val master = this.master match {
+      case Some(m) => m
+      case None =>
+        val envMaster = sys.env.get("MASTER")
+        val propMaster = sys.props.get("spark.master")
+        propMaster.orElse(envMaster).getOrElse("local[*]")
+    }
+    master
+  }
+
+  /** process command-line arguments and do as they request */
+  def process(args: Array[String]): Boolean = {
+    val command = new SparkCommandLine(args.toList, msg => echo(msg))
+    def neededHelp(): String =
+      (if (command.settings.help.value) command.usageMsg + "\n" else "") +
+      (if (command.settings.Xhelp.value) command.xusageMsg + "\n" else "")
+
+    // if they asked for no help and command is valid, we call the real main
+    neededHelp() match {
+      case ""     => command.ok && process(command.settings)
+      case help   => echoNoNL(help) ; true
+    }
+  }
+
+  @deprecated("Use `process` instead", "2.9.0")
+  def main(settings: Settings): Unit = process(settings)
+}
+
+object SparkILoop {
+  implicit def loopToInterpreter(repl: SparkILoop): SparkIMain = repl.intp
+  private def echo(msg: String) = Console println msg
+
+  def getAddedJars: Array[String] = {
+    val envJars = sys.env.get("ADD_JARS")
+    val propJars = sys.props.get("spark.jars").flatMap { p =>
+      if (p == "") None else Some(p)
+    }
+    val jars = propJars.orElse(envJars).getOrElse("")
+    Utils.resolveURIs(jars).split(",").filter(_.nonEmpty)
+  }
+
+  // Designed primarily for use by test code: takes a String with a
+  // bunch of code and prints out a transcript of what it would look
+  // like if you'd just typed it into the repl.
+  def runForTranscript(code: String, settings: Settings): String = {
+    import java.io.{ BufferedReader, StringReader, OutputStreamWriter }
+
+    stringFromStream { ostream =>
+      Console.withOut(ostream) {
+        val output = new JPrintWriter(new OutputStreamWriter(ostream), true) {
+          override def write(str: String) = {
+            // completely skip continuation lines
+            if (str forall (ch => ch.isWhitespace || ch == '|')) ()
+            // print a newline on empty scala prompts
+            else if ((str contains '\n') && (str.trim == "scala> ")) super.write("\n")
+            else super.write(str)
+          }
+        }
+        val input = new BufferedReader(new StringReader(code)) {
+          override def readLine(): String = {
+            val s = super.readLine()
+            // helping out by printing the line being interpreted.
+            if (s != null)
+              output.println(s)
+            s
+          }
+        }
+        val repl = new SparkILoop(input, output)
+
+        if (settings.classpath.isDefault)
+          settings.classpath.value = sys.props("java.class.path")
+
+        getAddedJars.foreach(settings.classpath.append(_))
+
+        repl process settings
+      }
+    }
+  }
+
+  /** Creates an interpreter loop with default settings and feeds
+   *  the given code to it as input.
+   */
+  def run(code: String, sets: Settings = new Settings): String = {
+    import java.io.{ BufferedReader, StringReader, OutputStreamWriter }
+
+    stringFromStream { ostream =>
+      Console.withOut(ostream) {
+        val input    = new BufferedReader(new StringReader(code))
+        val output   = new JPrintWriter(new OutputStreamWriter(ostream), true)
+        val repl     = new SparkILoop(input, output)
+
+        if (sets.classpath.isDefault)
+          sets.classpath.value = sys.props("java.class.path")
+
+        repl process sets
+      }
+    }
+  }
+  def run(lines: List[String]): String = run(lines map (_ + "\n") mkString)
+}
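
Aside: getMaster() and getAddedJars above resolve their settings with the same
precedence rule, which is easy to miss in the diff: an explicitly supplied
value wins, then the system property, then the environment variable, then a
default. A minimal self-contained sketch of that rule follows; the names
MasterResolution and resolveMaster are illustrative only and are not part of
the patch:

    object MasterResolution {
      // Mirrors getMaster(): explicit value, then the spark.master system
      // property, then the MASTER environment variable, then local[*].
      def resolveMaster(explicit: Option[String]): String =
        explicit
          .orElse(sys.props.get("spark.master"))
          .orElse(sys.env.get("MASTER"))
          .getOrElse("local[*]")

      def main(args: Array[String]): Unit = {
        sys.props("spark.master") = "spark://example:7077"
        println(resolveMaster(None))                 // spark://example:7077
        println(resolveMaster(Some("yarn-client")))  // explicit value wins
      }
    }

getAddedJars follows the same shape with spark.jars over ADD_JARS, additionally
splitting the resolved value on commas and dropping empty entries.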

http://git-wip-us.apache.org/repos/asf/spark/blob/12f56334/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
new file mode 100644
index 0000000..7667a9c
--- /dev/null
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
@@ -0,0 +1,147 @@
+// scalastyle:off
+
+/* NSC -- new Scala compiler
+ * Copyright 2005-2013 LAMP/EPFL
+ * @author Paul Phillips
+ */
+
+package org.apache.spark.repl
+
+import scala.tools.nsc._
+import scala.tools.nsc.interpreter._
+
+import scala.reflect.internal.util.Position
+import scala.util.control.Exception.ignoring
+import scala.tools.nsc.util.stackTraceString
+
+import org.apache.spark.SPARK_VERSION
+
+/**
+ *  Machinery for the asynchronous initialization of the repl.
+ */
+trait SparkILoopInit {
+  self: SparkILoop =>
+
+  /** Print a welcome message */
+  def printWelcome() {
+    echo("""Welcome to
+      ____              __
+     / __/__  ___ _____/ /__
+    _\ \/ _ \/ _ `/ __/  '_/
+   /___/ .__/\_,_/_/ /_/\_\   version %s
+      /_/
+""".format(SPARK_VERSION))
+    import Properties._
+    val welcomeMsg = "Using Scala %s (%s, Java %s)".format(
+      versionString, javaVmName, javaVersion)
+    echo(welcomeMsg)
+    echo("Type in expressions to have them evaluated.")
+    echo("Type :help for more information.")
+  }
+
+  protected def asyncMessage(msg: String) {
+    if (isReplInfo || isReplPower)
+      echoAndRefresh(msg)
+  }
+
+  private val initLock = new java.util.concurrent.locks.ReentrantLock()
+  private val initCompilerCondition = initLock.newCondition() // signal the compiler is initialized
+  private val initLoopCondition = initLock.newCondition()     // signal the whole repl is initialized
+  private val initStart = System.nanoTime
+
+  private def withLock[T](body: => T): T = {
+    initLock.lock()
+    try body
+    finally initLock.unlock()
+  }
+  // a condition used to ensure serial access to the compiler.
+  @volatile private var initIsComplete = false
+  @volatile private var initError: String = null
+  private def elapsed() = "%.3f".format((System.nanoTime - initStart).toDouble / 1000000000L)
+
+  // the method to be called when the interpreter is initialized.
+  // Very important this method does nothing synchronous (i.e. do
+  // not try to use the interpreter) because until it returns, the
+  // repl's lazy val `global` is still locked.
+  protected def initializedCallback() = withLock(initCompilerCondition.signal())
+
+  // Spins off a thread which awaits a single message once the interpreter
+  // has been initialized.
+  protected def createAsyncListener() = {
+    io.spawn {
+      withLock(initCompilerCondition.await())
+      asyncMessage("[info] compiler init time: " + elapsed() + " s.")
+      postInitialization()
+    }
+  }
+
+  // called from main repl loop
+  protected def awaitInitialized(): Boolean = {
+    if (!initIsComplete)
+      withLock { while (!initIsComplete) initLoopCondition.await() }
+    if (initError != null) {
+      println("""
+        |Failed to initialize the REPL due to an unexpected error.
+        |This is a bug; please report it along with the error diagnostics printed below.
+        |%s.""".stripMargin.format(initError)
+      )
+      false
+    } else true
+  }
+  // private def warningsThunks = List(
+  //   () => intp.bind("lastWarnings", "" + typeTag[List[(Position, String)]], intp.lastWarnings _),
+  // )
+
+  protected def postInitThunks = List[Option[() => Unit]](
+    Some(intp.setContextClassLoader _),
+    if (isReplPower) Some(() => enablePowerMode(true)) else None
+  ).flatten
+  // ++ (
+  //   warningsThunks
+  // )
+  // called once after init condition is signalled
+  protected def postInitialization() {
+    try {
+      postInitThunks foreach (f => addThunk(f()))
+      runThunks()
+    } catch {
+      case ex: Throwable =>
+        initError = stackTraceString(ex)
+        throw ex
+    } finally {
+      initIsComplete = true
+
+      if (isAsync) {
+        asyncMessage("[info] total init time: " + elapsed() + " s.")
+        withLock(initLoopCondition.signal())
+      }
+    }
+  }
+
+  def initializeSpark() {
+    intp.beQuietDuring {
+      command("""
+         @transient val sc = org.apache.spark.repl.Main.interp.createSparkContext();
+        """)
+      command("import org.apache.spark.SparkContext._")
+    }
+    echo("Spark context available as sc.")
+  }
+
+  // code to be executed only after the interpreter is initialized
+  // and the lazy val `global` can be accessed without risk of deadlock.
+  private var pendingThunks: List[() => Unit] = Nil
+  protected def addThunk(body: => Unit) = synchronized {
+    pendingThunks :+= (() => body)
+  }
+  protected def runThunks(): Unit = synchronized {
+    if (pendingThunks.nonEmpty)
+      logDebug("Clearing " + pendingThunks.size + " thunks.")
+
+    while (pendingThunks.nonEmpty) {
+      val thunk = pendingThunks.head
+      pendingThunks = pendingThunks.tail
+      thunk()
+    }
+  }
+}
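
Aside: the addThunk/runThunks pair at the end of SparkILoopInit is a small
deferred-work queue. Callers enqueue by-name bodies while the compiler's lazy
`global` is still initializing, and postInitialization() drains the queue once
it is safe to touch the interpreter; because the drain loop re-reads the list
on each iteration, a running thunk may itself enqueue further work. A
self-contained sketch of the same pattern, assuming nothing from the patch
(the class name ThunkQueue is illustrative):

    final class ThunkQueue {
      private var pending: List[() => Unit] = Nil

      // Enqueue a by-name body; nothing runs until run() is called.
      def add(body: => Unit): Unit = synchronized { pending :+= (() => body) }

      // Drain in FIFO order; a thunk enqueued during the drain is
      // picked up because the loop re-checks the list each pass.
      def run(): Unit = synchronized {
        while (pending.nonEmpty) {
          val thunk = pending.head
          pending = pending.tail
          thunk()
        }
      }
    }

    // Usage sketch:
    //   val q = new ThunkQueue
    //   q.add(println("runs after initialization"))
    //   q.run()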

