[SPARK-22087][SPARK-14650][WIP][BUILD][REPL][CORE] Compile Spark REPL for Scala 2.12 + other 2.12 fixes
## What changes were proposed in this pull request?

Enable the Scala 2.12 REPL. Fix most remaining issues with 2.12 compilation and warnings, including:

- Selecting Kafka 0.10.1+ for Scala 2.12 and patching over a minor API difference
- Fixing many "eta expansion of zero arg method deprecated" warnings (see the illustrative sketch after the first few hunks below)
- Resolving the SparkContext.sequenceFile implicits compile problem
- Fixing an odd but valid jetty-server missing dependency in hive-thriftserver

## How was this patch tested?

Existing tests.

Author: Sean Owen <so...@cloudera.com>

Closes #19307 from srowen/Scala212.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/576c43fb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/576c43fb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/576c43fb

Branch: refs/heads/master
Commit: 576c43fb4226e4efa12189b41c3bc862019862c6
Parents: 4943ea5
Author: Sean Owen <so...@cloudera.com>
Authored: Sun Sep 24 09:40:13 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Sun Sep 24 09:40:13 2017 +0100

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala    |  36 ++
 .../deploy/history/FsHistoryProvider.scala       |   8 +-
 .../org/apache/spark/deploy/worker/Worker.scala  |  14 +-
 .../executor/CoarseGrainedExecutorBackend.scala  |   4 +-
 .../spark/memory/UnifiedMemoryManager.scala      |   2 +-
 .../apache/spark/rdd/DoubleRDDFunctions.scala    |   6 +-
 .../apache/spark/rpc/netty/NettyRpcEnv.scala     |   2 +-
 .../apache/spark/scheduler/DAGScheduler.scala    |  14 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala  |  10 +-
 .../spark/serializer/KryoSerializer.scala        |   4 +-
 .../spark/storage/BlockManagerMaster.scala       |  23 +-
 .../storage/BlockManagerSlaveEndpoint.scala      |  10 +-
 .../apache/spark/ui/UIWorkloadGenerator.scala    |  14 +-
 .../spark/deploy/SparkSubmitUtilsSuite.scala     |  11 +-
 .../deploy/history/ApplicationCacheSuite.scala   |   4 +-
 .../apache/spark/rdd/AsyncRDDActionsSuite.scala  |   8 +-
 .../scheduler/SchedulerIntegrationSuite.scala    |   2 +
 .../apache/spark/serializer/KryoBenchmark.scala  |  16 +-
 examples/pom.xml                                 |   2 +-
 .../flume/FlumePollingStreamSuite.scala          |   4 +-
 external/kafka-0-10-sql/pom.xml                  |  11 +
 .../spark/sql/kafka010/KafkaTestUtils.scala      |   5 +-
 external/kafka-0-10/pom.xml                      |  11 +
 .../scala/org/apache/spark/ml/Pipeline.scala     |   5 +-
 .../spark/ml/classification/LinearSVC.scala      |   2 +-
 .../ml/classification/LogisticRegression.scala   |   2 +-
 .../spark/ml/classification/OneVsRest.scala      |   2 +-
 .../spark/ml/regression/LinearRegression.scala   |   2 +-
 .../spark/ml/tree/impl/RandomForest.scala        |   6 +-
 .../spark/mllib/tree/impurity/Impurity.scala     |   2 +-
 .../DifferentiableLossAggregatorSuite.scala      |   4 +-
 .../spark/ml/tree/impl/RandomForestSuite.scala   |   6 +-
 repl/pom.xml                                     |   2 -
 .../main/scala/org/apache/spark/repl/Main.scala  | 122 ------
 .../scala/org/apache/spark/repl/ReplSuite.scala  | 220 ----------
 .../apache/spark/repl/SingletonReplSuite.scala   | 408 -------------------
 .../org/apache/spark/repl/SparkILoop.scala       | 134 ++++++
 .../main/scala/org/apache/spark/repl/Main.scala  | 122 ++++++
 .../scala/org/apache/spark/repl/ReplSuite.scala  | 220 ++++++++++
 .../apache/spark/repl/SingletonReplSuite.scala   | 408 +++++++++++++++++++
 .../expressions/aggregate/Percentile.scala       |   2 +-
 .../org/apache/spark/sql/types/Metadata.scala    |   2 +-
 .../spark/sql/execution/GenerateExec.scala       |   4 +-
 .../datasources/InMemoryFileIndex.scala          |   1 +
 .../datasources/csv/UnivocityParser.scala        |   2 +-
 .../sql/execution/joins/HashedRelation.scala     |   8 +-
 .../org/apache/spark/sql/JavaDatasetSuite.java   |   4 +
 .../apache/spark/sql/DataFrameStatSuite.scala    |   3 -
 .../streaming/StreamingQueryManagerSuite.scala   |   2 +-
 sql/hive-thriftserver/pom.xml                    |  10 +
 .../spark/streaming/StreamingContext.scala       |   2 +-
 51 files changed, 1066 insertions(+), 862 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 1821bc8..cec61d8 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2826,6 +2826,42 @@ object WritableConverter {
   // them automatically. However, we still keep the old functions in SparkContext for backward
   // compatibility and forward to the following functions directly.
 
+  // The following implicit declarations have been added on top of the very similar ones
+  // below in order to enable compatibility with Scala 2.12. Scala 2.12 deprecates eta
+  // expansion of zero-arg methods and thus won't match a no-arg method where it expects
+  // an implicit that is a function of no args.
+
+  implicit val intWritableConverterFn: () => WritableConverter[Int] =
+    () => simpleWritableConverter[Int, IntWritable](_.get)
+
+  implicit val longWritableConverterFn: () => WritableConverter[Long] =
+    () => simpleWritableConverter[Long, LongWritable](_.get)
+
+  implicit val doubleWritableConverterFn: () => WritableConverter[Double] =
+    () => simpleWritableConverter[Double, DoubleWritable](_.get)
+
+  implicit val floatWritableConverterFn: () => WritableConverter[Float] =
+    () => simpleWritableConverter[Float, FloatWritable](_.get)
+
+  implicit val booleanWritableConverterFn: () => WritableConverter[Boolean] =
+    () => simpleWritableConverter[Boolean, BooleanWritable](_.get)
+
+  implicit val bytesWritableConverterFn: () => WritableConverter[Array[Byte]] = {
+    () => simpleWritableConverter[Array[Byte], BytesWritable] { bw =>
+      // getBytes method returns array which is longer then data to be returned
+      Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
+    }
+  }
+
+  implicit val stringWritableConverterFn: () => WritableConverter[String] =
+    () => simpleWritableConverter[String, Text](_.toString)
+
+  implicit def writableWritableConverterFn[T <: Writable : ClassTag]: () => WritableConverter[T] =
+    () => new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
+
+  // These implicits remain included for backwards-compatibility. They fulfill the
+  // same role as those above.
+ implicit def intWritableConverter(): WritableConverter[Int] = simpleWritableConverter[Int, IntWritable](_.get) http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 20fe911..910121e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -218,11 +218,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) if (!conf.contains("spark.testing")) { // A task that periodically checks for event log updates on disk. logDebug(s"Scheduling update thread every $UPDATE_INTERVAL_S seconds") - pool.scheduleWithFixedDelay(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS) + pool.scheduleWithFixedDelay( + getRunner(() => checkForLogs()), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS) if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) { // A task that periodically cleans event logs on disk. - pool.scheduleWithFixedDelay(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS) + pool.scheduleWithFixedDelay( + getRunner(() => cleanLogs()), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS) } } else { logDebug("Background update thread disabled for testing") @@ -268,7 +270,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) appListener.adminAclsGroups.getOrElse("") ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups) ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse("")) - Some(LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize))) + Some(LoadedAppUI(ui, () => updateProbe(appId, attemptId, attempt.fileSize))) } else { None } http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 29a810f..ed5fa4b 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -450,10 +450,9 @@ private[deploy] class Worker( } }(cleanupThreadExecutor) - cleanupFuture.onFailure { - case e: Throwable => - logError("App dir cleanup failed: " + e.getMessage, e) - }(cleanupThreadExecutor) + cleanupFuture.failed.foreach(e => + logError("App dir cleanup failed: " + e.getMessage, e) + )(cleanupThreadExecutor) case MasterChanged(masterRef, masterWebUiUrl) => logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL) @@ -622,10 +621,9 @@ private[deploy] class Worker( dirList.foreach { dir => Utils.deleteRecursively(new File(dir)) } - }(cleanupThreadExecutor).onFailure { - case e: Throwable => - logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e) - }(cleanupThreadExecutor) + }(cleanupThreadExecutor).failed.foreach(e => + logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e) + )(cleanupThreadExecutor) } shuffleService.applicationRemoved(id) } http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala 
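
The SparkContext, FsHistoryProvider and Worker hunks above show the two patterns that recur throughout this commit: Scala 2.12 deprecates eta-expansion of zero-argument methods, so a bare method name no longer satisfies a () => T parameter (or a no-arg implicit function type), and it deprecates Future.onFailure/onSuccess in favour of failed.foreach/foreach. The following minimal Scala sketch illustrates both fixes; the names are illustrative only and are not code from this commit.

import scala.concurrent.{ExecutionContext, Future}

object Scala212MigrationSketch {

  // Pattern 1: pass an explicit function literal instead of relying on eta-expansion
  // of a zero-arg method (analogous to getRunner(() => checkForLogs()) above).
  def checkForLogs(): Unit = println("checking logs")

  def getRunner(operateFun: () => Unit): Runnable =
    new Runnable { override def run(): Unit = operateFun() }

  // Pattern 2: Future.onFailure is deprecated in Scala 2.12; use failed.foreach
  // (analogous to the Worker.scala cleanupFuture change above).
  def logOnFailure(f: Future[Unit])(implicit ec: ExecutionContext): Unit = {
    // 2.11 style, deprecated on 2.12:
    //   f.onFailure { case e: Throwable => println("cleanup failed: " + e.getMessage) }
    // 2.12-friendly style:
    f.failed.foreach(e => println("cleanup failed: " + e.getMessage))
  }

  def main(args: Array[String]): Unit = {
    // getRunner(checkForLogs) compiles on 2.11 via eta-expansion but draws a
    // deprecation warning on 2.12; the commit passes an explicit () => checkForLogs().
    getRunner(() => checkForLogs()).run()

    import ExecutionContext.Implicits.global
    logOnFailure(Future.failed(new RuntimeException("boom")))
    Thread.sleep(100) // let the asynchronous callback print before the JVM exits
  }
}

The same idea drives the new *ConverterFn implicit vals in SparkContext above: an implicit val that already has a function type is found where 2.12 would refuse to eta-expand the old zero-arg implicit defs.
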
---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index ed893cd..d27362a 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -163,9 +163,9 @@ private[spark] class CoarseGrainedExecutorBackend( if (notifyDriver && driver.nonEmpty) { driver.get.ask[Boolean]( RemoveExecutor(executorId, new ExecutorLossReason(reason)) - ).onFailure { case e => + ).failed.foreach(e => logWarning(s"Unable to notify the driver due to " + e.getMessage, e) - }(ThreadUtils.sameThread) + )(ThreadUtils.sameThread) } System.exit(code) http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala index df19355..78edd2c 100644 --- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala @@ -143,7 +143,7 @@ private[spark] class UnifiedMemoryManager private[memory] ( } executionPool.acquireMemory( - numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize) + numBytes, taskAttemptId, maybeGrowExecutionPool, () => computeMaxExecutionPoolSize) } override def acquireStorageMemory( http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala index 57782c0..943abae 100644 --- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala @@ -128,9 +128,9 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable { } // Compute the minimum and the maximum val (max: Double, min: Double) = self.mapPartitions { items => - Iterator(items.foldRight(Double.NegativeInfinity, - Double.PositiveInfinity)((e: Double, x: (Double, Double)) => - (x._1.max(e), x._2.min(e)))) + Iterator( + items.foldRight((Double.NegativeInfinity, Double.PositiveInfinity) + )((e: Double, x: (Double, Double)) => (x._1.max(e), x._2.min(e)))) }.reduce { (maxmin1, maxmin2) => (maxmin1._1.max(maxmin2._1), maxmin1._2.min(maxmin2._2)) } http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index 1777e7a..f951591 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -232,7 +232,7 @@ private[netty] class NettyRpcEnv( onFailure, (client, response) => onSuccess(deserialize[Any](client, response))) postToOutbox(message.receiver, rpcMessage) - promise.future.onFailure { + 
promise.future.failed.foreach { case _: TimeoutException => rpcMessage.onTimeout() case _ => }(ThreadUtils.sameThread) http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 562dd1d..9153751 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger import scala.annotation.tailrec import scala.collection.Map -import scala.collection.mutable.{HashMap, HashSet, Stack} +import scala.collection.mutable.{ArrayStack, HashMap, HashSet} import scala.concurrent.duration._ import scala.language.existentials import scala.language.postfixOps @@ -396,12 +396,12 @@ class DAGScheduler( /** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */ private def getMissingAncestorShuffleDependencies( - rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = { - val ancestors = new Stack[ShuffleDependency[_, _, _]] + rdd: RDD[_]): ArrayStack[ShuffleDependency[_, _, _]] = { + val ancestors = new ArrayStack[ShuffleDependency[_, _, _]] val visited = new HashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError // caused by recursively visiting - val waitingForVisit = new Stack[RDD[_]] + val waitingForVisit = new ArrayStack[RDD[_]] waitingForVisit.push(rdd) while (waitingForVisit.nonEmpty) { val toVisit = waitingForVisit.pop() @@ -434,7 +434,7 @@ class DAGScheduler( rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = { val parents = new HashSet[ShuffleDependency[_, _, _]] val visited = new HashSet[RDD[_]] - val waitingForVisit = new Stack[RDD[_]] + val waitingForVisit = new ArrayStack[RDD[_]] waitingForVisit.push(rdd) while (waitingForVisit.nonEmpty) { val toVisit = waitingForVisit.pop() @@ -456,7 +456,7 @@ class DAGScheduler( val visited = new HashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError // caused by recursively visiting - val waitingForVisit = new Stack[RDD[_]] + val waitingForVisit = new ArrayStack[RDD[_]] def visit(rdd: RDD[_]) { if (!visited(rdd)) { visited += rdd @@ -1633,7 +1633,7 @@ class DAGScheduler( val visitedRdds = new HashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError // caused by recursively visiting - val waitingForVisit = new Stack[RDD[_]] + val waitingForVisit = new ArrayStack[RDD[_]] def visit(rdd: RDD[_]) { if (!visitedRdds(rdd)) { visitedRdds += rdd http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index a0ef209..424e43b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -471,15 +471,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp */ protected def 
removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = { // Only log the failure since we don't care about the result. - driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason)).onFailure { - case t => logError(t.getMessage, t) - }(ThreadUtils.sameThread) + driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason)).failed.foreach(t => + logError(t.getMessage, t))(ThreadUtils.sameThread) } protected def removeWorker(workerId: String, host: String, message: String): Unit = { - driverEndpoint.ask[Boolean](RemoveWorker(workerId, host, message)).onFailure { - case t => logError(t.getMessage, t) - }(ThreadUtils.sameThread) + driverEndpoint.ask[Boolean](RemoveWorker(workerId, host, message)).failed.foreach(t => + logError(t.getMessage, t))(ThreadUtils.sameThread) } def sufficientResourcesRegistered(): Boolean = true http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 4f03e54..58483c9 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -501,8 +501,8 @@ private class JavaIterableWrapperSerializer private object JavaIterableWrapperSerializer extends Logging { // The class returned by JavaConverters.asJava // (scala.collection.convert.Wrappers$IterableWrapper). - val wrapperClass = - scala.collection.convert.WrapAsJava.asJavaIterable(Seq(1)).getClass + import scala.collection.JavaConverters._ + val wrapperClass = Seq(1).asJava.getClass // Get the underlying method so we can use it to get the Scala collection for serialization. private val underlyingMethodOpt = { http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index ea5d842..8b1dc0b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -118,10 +118,9 @@ class BlockManagerMaster( /** Remove all blocks belonging to the given RDD. */ def removeRdd(rddId: Int, blocking: Boolean) { val future = driverEndpoint.askSync[Future[Seq[Int]]](RemoveRdd(rddId)) - future.onFailure { - case e: Exception => - logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e) - }(ThreadUtils.sameThread) + future.failed.foreach(e => + logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e) + )(ThreadUtils.sameThread) if (blocking) { timeout.awaitResult(future) } @@ -130,10 +129,9 @@ class BlockManagerMaster( /** Remove all blocks belonging to the given shuffle. 
*/ def removeShuffle(shuffleId: Int, blocking: Boolean) { val future = driverEndpoint.askSync[Future[Seq[Boolean]]](RemoveShuffle(shuffleId)) - future.onFailure { - case e: Exception => - logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}", e) - }(ThreadUtils.sameThread) + future.failed.foreach(e => + logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}", e) + )(ThreadUtils.sameThread) if (blocking) { timeout.awaitResult(future) } @@ -143,11 +141,10 @@ class BlockManagerMaster( def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) { val future = driverEndpoint.askSync[Future[Seq[Int]]]( RemoveBroadcast(broadcastId, removeFromMaster)) - future.onFailure { - case e: Exception => - logWarning(s"Failed to remove broadcast $broadcastId" + - s" with removeFromMaster = $removeFromMaster - ${e.getMessage}", e) - }(ThreadUtils.sameThread) + future.failed.foreach(e => + logWarning(s"Failed to remove broadcast $broadcastId" + + s" with removeFromMaster = $removeFromMaster - ${e.getMessage}", e) + )(ThreadUtils.sameThread) if (blocking) { timeout.awaitResult(future) } http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala index 1aaa424..742cf4f 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala @@ -85,13 +85,13 @@ class BlockManagerSlaveEndpoint( logDebug(actionMessage) body } - future.onSuccess { case response => - logDebug("Done " + actionMessage + ", response is " + response) + future.foreach { response => + logDebug(s"Done $actionMessage, response is $response") context.reply(response) - logDebug("Sent response: " + response + " to " + context.senderAddress) + logDebug(s"Sent response: $response to ${context.senderAddress}") } - future.onFailure { case t: Throwable => - logError("Error in " + actionMessage, t) + future.failed.foreach { t => + logError(s"Error in $actionMessage", t) context.sendFailure(t) } } http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala index 094953f..6229e80 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala @@ -66,11 +66,11 @@ private[spark] object UIWorkloadGenerator { def nextFloat(): Float = new Random().nextFloat() val jobs = Seq[(String, () => Long)]( - ("Count", baseData.count), - ("Cache and Count", baseData.map(x => x).cache().count), - ("Single Shuffle", baseData.map(x => (x % 10, x)).reduceByKey(_ + _).count), - ("Entirely failed phase", baseData.map(x => throw new Exception).count), - ("Partially failed phase", { + ("Count", () => baseData.count), + ("Cache and Count", () => baseData.map(x => x).cache().count), + ("Single Shuffle", () => baseData.map(x => (x % 10, x)).reduceByKey(_ + _).count), + ("Entirely failed phase", () => baseData.map { x => throw new Exception(); 1 }.count), + 
("Partially failed phase", () => { baseData.map{x => val probFailure = (4.0 / NUM_PARTITIONS) if (nextFloat() < probFailure) { @@ -79,7 +79,7 @@ private[spark] object UIWorkloadGenerator { 1 }.count }), - ("Partially failed phase (longer tasks)", { + ("Partially failed phase (longer tasks)", () => { baseData.map{x => val probFailure = (4.0 / NUM_PARTITIONS) if (nextFloat() < probFailure) { @@ -89,7 +89,7 @@ private[spark] object UIWorkloadGenerator { 1 }.count }), - ("Job with delays", baseData.map(x => Thread.sleep(100)).count) + ("Job with delays", () => baseData.map(x => Thread.sleep(100)).count) ) val barrier = new Semaphore(-nJobSet * jobs.size + 1) http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala index 88b77e5..eb8c203 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala @@ -83,11 +83,12 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { val resolver = settings.getDefaultResolver.asInstanceOf[ChainResolver] assert(resolver.getResolvers.size() === 4) val expected = repos.split(",").map(r => s"$r/") - resolver.getResolvers.toArray.zipWithIndex.foreach { case (resolver: AbstractResolver, i) => - if (1 < i && i < 3) { - assert(resolver.getName === s"repo-$i") - assert(resolver.asInstanceOf[IBiblioResolver].getRoot === expected(i - 1)) - } + resolver.getResolvers.toArray.map(_.asInstanceOf[AbstractResolver]).zipWithIndex.foreach { + case (r, i) => + if (1 < i && i < 3) { + assert(r.getName === s"repo-$i") + assert(r.asInstanceOf[IBiblioResolver].getRoot === expected(i - 1)) + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala index c175ed3..6e50e84 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala @@ -78,7 +78,7 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar logDebug(s"getAppUI($appId, $attemptId)") getAppUICount += 1 instances.get(CacheKey(appId, attemptId)).map( e => - LoadedAppUI(e.ui, updateProbe(appId, attemptId, e.probeTime))) + LoadedAppUI(e.ui, () => updateProbe(appId, attemptId, e.probeTime))) } override def attachSparkUI( @@ -122,7 +122,7 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar completed: Boolean, timestamp: Long): Unit = { instances += (CacheKey(appId, attemptId) -> - new CacheEntry(ui, completed, updateProbe(appId, attemptId, timestamp), timestamp)) + new CacheEntry(ui, completed, () => updateProbe(appId, attemptId, timestamp), timestamp)) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala ---------------------------------------------------------------------- diff --git 
a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala index f4be8ea..de0e71a 100644 --- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala @@ -130,10 +130,10 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim info("Should not have reached this code path (onComplete matching Failure)") throw new Exception("Task should succeed") } - f.onSuccess { case a: Any => + f.foreach { a => sem.release() } - f.onFailure { case t => + f.failed.foreach { t => info("Should not have reached this code path (onFailure)") throw new Exception("Task should succeed") } @@ -164,11 +164,11 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim case scala.util.Failure(e) => sem.release() } - f.onSuccess { case a: Any => + f.foreach { a => info("Should not have reached this code path (onSuccess)") throw new Exception("Task should fail") } - f.onFailure { case t => + f.failed.foreach { t => sem.release() } intercept[SparkException] { http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index a8249e1..75ea409 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -625,6 +625,8 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor backend.taskFailed(taskDescription, fetchFailed) case (1, _, partition) => backend.taskSuccess(taskDescription, 42 + partition) + case unmatched => + fail(s"Unexpected shuffle output $unmatched") } } withBackend(runBackend _) { http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala index 64be966..a1cf357 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala @@ -78,10 +78,10 @@ class KryoBenchmark extends SparkFunSuite { sum } } - basicTypes("Int", Random.nextInt) - basicTypes("Long", Random.nextLong) - basicTypes("Float", Random.nextFloat) - basicTypes("Double", Random.nextDouble) + basicTypes("Int", () => Random.nextInt()) + basicTypes("Long", () => Random.nextLong()) + basicTypes("Float", () => Random.nextFloat()) + basicTypes("Double", () => Random.nextDouble()) // Benchmark Array of Primitives val arrayCount = 10000 @@ -101,10 +101,10 @@ class KryoBenchmark extends SparkFunSuite { sum } } - basicTypeArray("Int", Random.nextInt) - basicTypeArray("Long", Random.nextLong) - basicTypeArray("Float", Random.nextFloat) - basicTypeArray("Double", Random.nextDouble) + basicTypeArray("Int", () => Random.nextInt()) + basicTypeArray("Long", () => Random.nextLong()) + basicTypeArray("Float", () => Random.nextFloat()) + basicTypeArray("Double", () => Random.nextDouble()) // Benchmark Maps val mapsCount = 
1000 http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/pom.xml b/examples/pom.xml index 33eca48..52a6764 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -114,7 +114,7 @@ <dependency> <groupId>com.github.scopt</groupId> <artifactId>scopt_${scala.binary.version}</artifactId> - <version>3.3.0</version> + <version>3.7.0</version> </dependency> <dependency> <groupId>com.twitter</groupId> http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala ---------------------------------------------------------------------- diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala index 1c93079..4324cc6 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala @@ -61,11 +61,11 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfterAll with } test("flume polling test") { - testMultipleTimes(testFlumePolling) + testMultipleTimes(() => testFlumePolling()) } test("flume polling test multiple hosts") { - testMultipleTimes(testFlumePollingMultipleHost) + testMultipleTimes(() => testFlumePollingMultipleHost()) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/external/kafka-0-10-sql/pom.xml ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml index 0f61a10..0c9f0aa 100644 --- a/external/kafka-0-10-sql/pom.xml +++ b/external/kafka-0-10-sql/pom.xml @@ -102,8 +102,19 @@ </dependency> </dependencies> + <build> <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> </build> + + <profiles> + <profile> + <id>scala-2.12</id> + <properties> + <kafka.version>0.10.1.1</kafka.version> + </properties> + </profile> + </profiles> + </project> http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index 066a68a..2df8352 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -173,7 +173,10 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L AdminUtils.createTopic(zkUtils, topic, partitions, 1) created = true } catch { - case e: kafka.common.TopicExistsException if overwrite => deleteTopic(topic) + // Workaround fact that TopicExistsException is in kafka.common in 0.10.0 and + // org.apache.kafka.common.errors in 0.10.1 (!) 
+ case e: Exception if (e.getClass.getSimpleName == "TopicExistsException") && overwrite => + deleteTopic(topic) } } // wait until metadata is propagated http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/external/kafka-0-10/pom.xml ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml index 4d9861a..6eb7ba5 100644 --- a/external/kafka-0-10/pom.xml +++ b/external/kafka-0-10/pom.xml @@ -87,8 +87,19 @@ </dependency> </dependencies> + <build> <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> </build> + + <profiles> + <profile> + <id>scala-2.12</id> + <properties> + <kafka.version>0.10.1.1</kafka.version> + </properties> + </profile> + </profiles> + </project> http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala index b76dc5f..103082b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala @@ -250,8 +250,9 @@ object Pipeline extends MLReadable[Pipeline] { // Save stages val stagesDir = new Path(path, "stages").toString - stages.zipWithIndex.foreach { case (stage: MLWritable, idx: Int) => - stage.write.save(getStagePath(stage.uid, idx, stages.length, stagesDir)) + stages.zipWithIndex.foreach { case (stage, idx) => + stage.asInstanceOf[MLWritable].write.save( + getStagePath(stage.uid, idx, stages.length, stagesDir)) } } http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 1c97d77..ce400f4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -184,7 +184,7 @@ class LinearSVC @Since("2.2.0") ( (c1._1.merge(c2._1), c1._2.merge(c2._2)) instances.treeAggregate( - new MultivariateOnlineSummarizer, new MultiClassSummarizer + (new MultivariateOnlineSummarizer, new MultiClassSummarizer) )(seqOp, combOp, $(aggregationDepth)) } http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index cbc8f4a..fa19160 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -514,7 +514,7 @@ class LogisticRegression @Since("1.2.0") ( (c1._1.merge(c2._1), c1._2.merge(c2._2)) instances.treeAggregate( - new MultivariateOnlineSummarizer, new MultiClassSummarizer + (new MultivariateOnlineSummarizer, new MultiClassSummarizer) )(seqOp, combOp, $(aggregationDepth)) } 
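
The treeAggregate changes in the LinearSVC and LogisticRegression hunks above (and the similar foldLeft fix in DoubleRDDFunctions) stop relying on the compiler adapting two arguments into a tuple and pass the pair explicitly; presumably the stricter 2.12 build flags that implicit adaptation, and the explicit tuple is unambiguous on both versions. A tiny sketch of the difference, using hypothetical names rather than the real treeAggregate signature:

object AutoTuplingSketch {
  // Like treeAggregate here, this method's first parameter is a single pair used as the zero value.
  def aggregateWithZero(zero: (Long, Long))(
      combine: ((Long, Long), Long) => (Long, Long)): (Long, Long) =
    Seq(1L, 2L, 3L).foldLeft(zero)(combine)

  def main(args: Array[String]): Unit = {
    // Relies on auto-tupling: the compiler silently wraps the two arguments into a Tuple2.
    //   aggregateWithZero(0L, 0L)((acc, x) => (acc._1 + x, acc._2 + 1))
    // Explicit tuple, as the commit now writes it:
    val result = aggregateWithZero((0L, 0L))((acc, x) => (acc._1 + x, acc._2 + 1))
    println(result) // (6,3)
  }
}

The same explicit-tuple fix appears again below for the foldLeft call in Impurity.scala.
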
http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala index 92a7742..3ab99b3 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala @@ -235,7 +235,7 @@ object OneVsRestModel extends MLReadable[OneVsRestModel] { val extraJson = ("labelMetadata" -> instance.labelMetadata.json) ~ ("numClasses" -> instance.models.length) OneVsRestParams.saveImpl(path, instance, sc, Some(extraJson)) - instance.models.zipWithIndex.foreach { case (model: MLWritable, idx) => + instance.models.map(_.asInstanceOf[MLWritable]).zipWithIndex.foreach { case (model, idx) => val modelPath = new Path(path, s"model_$idx").toString model.save(modelPath) } http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index b2a9681..df1aa60 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -265,7 +265,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String (c1._1.merge(c2._1), c1._2.merge(c2._2)) instances.treeAggregate( - new MultivariateOnlineSummarizer, new MultivariateOnlineSummarizer + (new MultivariateOnlineSummarizer, new MultivariateOnlineSummarizer) )(seqOp, combOp, $(aggregationDepth)) } http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala index f7d969f..acfc639 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala @@ -169,7 +169,7 @@ private[spark] object RandomForest extends Logging { training the same tree in the next iteration. This focus allows us to send fewer trees to workers on each iteration; see topNodesForGroup below. */ - val nodeStack = new mutable.Stack[(Int, LearningNode)] + val nodeStack = new mutable.ArrayStack[(Int, LearningNode)] val rng = new Random() rng.setSeed(seed) @@ -367,7 +367,7 @@ private[spark] object RandomForest extends Logging { nodesForGroup: Map[Int, Array[LearningNode]], treeToNodeToIndexInfo: Map[Int, Map[Int, NodeIndexInfo]], splits: Array[Array[Split]], - nodeStack: mutable.Stack[(Int, LearningNode)], + nodeStack: mutable.ArrayStack[(Int, LearningNode)], timer: TimeTracker = new TimeTracker, nodeIdCache: Option[NodeIdCache] = None): Unit = { @@ -1076,7 +1076,7 @@ private[spark] object RandomForest extends Logging { * The feature indices are None if not subsampling features. 
*/ private[tree] def selectNodesToSplit( - nodeStack: mutable.Stack[(Int, LearningNode)], + nodeStack: mutable.ArrayStack[(Int, LearningNode)], maxMemoryUsage: Long, metadata: DecisionTreeMetadata, rng: Random): (Map[Int, Array[LearningNode]], Map[Int, Map[Int, NodeIndexInfo]]) = { http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Impurity.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Impurity.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Impurity.scala index 4c77468..f151a6a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Impurity.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Impurity.scala @@ -162,7 +162,7 @@ private[spark] abstract class ImpurityCalculator(val stats: Array[Double]) exten * Fails if the array is empty. */ protected def indexOfLargestArrayElement(array: Array[Double]): Int = { - val result = array.foldLeft(-1, Double.MinValue, 0) { + val result = array.foldLeft((-1, Double.MinValue, 0)) { case ((maxIndex, maxValue, currentIndex), currentValue) => if (currentValue > maxValue) { (currentIndex, currentValue, currentIndex + 1) http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregatorSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregatorSuite.scala index d7cdeae..9fddf09 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregatorSuite.scala @@ -174,7 +174,7 @@ object DifferentiableLossAggregatorSuite { (c1._1.merge(c2._1), c1._2.merge(c2._2)) instances.aggregate( - new MultivariateOnlineSummarizer, new MultivariateOnlineSummarizer + (new MultivariateOnlineSummarizer, new MultivariateOnlineSummarizer) )(seqOp, combOp) } @@ -191,7 +191,7 @@ object DifferentiableLossAggregatorSuite { (c1._1.merge(c2._1), c1._2.merge(c2._2)) instances.aggregate( - new MultivariateOnlineSummarizer, new MultiClassSummarizer + (new MultivariateOnlineSummarizer, new MultiClassSummarizer) )(seqOp, combOp) } } http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala index df155b4..dbe2ea9 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala @@ -324,7 +324,7 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext { val treeToNodeToIndexInfo = Map((0, Map( (topNode.id, new RandomForest.NodeIndexInfo(0, None)) ))) - val nodeStack = new mutable.Stack[(Int, LearningNode)] + val nodeStack = new mutable.ArrayStack[(Int, LearningNode)] RandomForest.findBestSplits(baggedInput, metadata, Map(0 -> topNode), nodesForGroup, treeToNodeToIndexInfo, splits, nodeStack) @@ -366,7 +366,7 @@ class 
RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext { val treeToNodeToIndexInfo = Map((0, Map( (topNode.id, new RandomForest.NodeIndexInfo(0, None)) ))) - val nodeStack = new mutable.Stack[(Int, LearningNode)] + val nodeStack = new mutable.ArrayStack[(Int, LearningNode)] RandomForest.findBestSplits(baggedInput, metadata, Map(0 -> topNode), nodesForGroup, treeToNodeToIndexInfo, splits, nodeStack) @@ -478,7 +478,7 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext { val failString = s"Failed on test with:" + s"numTrees=$numTrees, featureSubsetStrategy=$featureSubsetStrategy," + s" numFeaturesPerNode=$numFeaturesPerNode, seed=$seed" - val nodeStack = new mutable.Stack[(Int, LearningNode)] + val nodeStack = new mutable.ArrayStack[(Int, LearningNode)] val topNodes: Array[LearningNode] = new Array[LearningNode](numTrees) Range(0, numTrees).foreach { treeIndex => topNodes(treeIndex) = LearningNode.emptyNode(nodeIndex = 1) http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/repl/pom.xml ---------------------------------------------------------------------- diff --git a/repl/pom.xml b/repl/pom.xml index 51eb9b6..bd2cfc4 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -171,7 +171,6 @@ </plugins> </build> - <!-- <profiles> <profile> <id>scala-2.12</id> @@ -181,6 +180,5 @@ </properties> </profile> </profiles> - --> </project> http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala ---------------------------------------------------------------------- diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala deleted file mode 100644 index cc76a70..0000000 --- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.repl - -import java.io.File -import java.net.URI -import java.util.Locale - -import scala.tools.nsc.GenericRunnerSettings - -import org.apache.spark._ -import org.apache.spark.internal.Logging -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION -import org.apache.spark.util.Utils - -object Main extends Logging { - - initializeLogIfNecessary(true) - Signaling.cancelOnInterrupt() - - val conf = new SparkConf() - val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf)) - val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl") - - var sparkContext: SparkContext = _ - var sparkSession: SparkSession = _ - // this is a public var because tests reset it. 
- var interp: SparkILoop = _ - - private var hasErrors = false - - private def scalaOptionError(msg: String): Unit = { - hasErrors = true - // scalastyle:off println - Console.err.println(msg) - // scalastyle:on println - } - - def main(args: Array[String]) { - doMain(args, new SparkILoop) - } - - // Visible for testing - private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = { - interp = _interp - val jars = Utils.getLocalUserJarsForShell(conf) - // Remove file:///, file:// or file:/ scheme if exists for each jar - .map { x => if (x.startsWith("file:")) new File(new URI(x)).getPath else x } - .mkString(File.pathSeparator) - val interpArguments = List( - "-Yrepl-class-based", - "-Yrepl-outdir", s"${outputDir.getAbsolutePath}", - "-classpath", jars - ) ++ args.toList - - val settings = new GenericRunnerSettings(scalaOptionError) - settings.processArguments(interpArguments, true) - - if (!hasErrors) { - interp.process(settings) // Repl starts and goes in loop of R.E.P.L - Option(sparkContext).foreach(_.stop) - } - } - - def createSparkSession(): SparkSession = { - val execUri = System.getenv("SPARK_EXECUTOR_URI") - conf.setIfMissing("spark.app.name", "Spark shell") - // SparkContext will detect this configuration and register it with the RpcEnv's - // file server, setting spark.repl.class.uri to the actual URI for executors to - // use. This is sort of ugly but since executors are started as part of SparkContext - // initialization in certain cases, there's an initialization order issue that prevents - // this from being set after SparkContext is instantiated. - conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath()) - if (execUri != null) { - conf.set("spark.executor.uri", execUri) - } - if (System.getenv("SPARK_HOME") != null) { - conf.setSparkHome(System.getenv("SPARK_HOME")) - } - - val builder = SparkSession.builder.config(conf) - if (conf.get(CATALOG_IMPLEMENTATION.key, "hive").toLowerCase(Locale.ROOT) == "hive") { - if (SparkSession.hiveClassesArePresent) { - // In the case that the property is not set at all, builder's config - // does not have this value set to 'hive' yet. The original default - // behavior is that when there are hive classes, we use hive catalog. - sparkSession = builder.enableHiveSupport().getOrCreate() - logInfo("Created Spark session with Hive support") - } else { - // Need to change it back to 'in-memory' if no hive classes are found - // in the case that the property is set to hive in spark-defaults.conf - builder.config(CATALOG_IMPLEMENTATION.key, "in-memory") - sparkSession = builder.getOrCreate() - logInfo("Created Spark session") - } - } else { - // In the case that the property is set but not to 'hive', the internal - // default is 'in-memory'. So the sparkSession will use in-memory catalog. 
- sparkSession = builder.getOrCreate() - logInfo("Created Spark session") - } - sparkContext = sparkSession.sparkContext - sparkSession - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala ---------------------------------------------------------------------- diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala deleted file mode 100644 index c7ae194..0000000 --- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.repl - -import java.io._ -import java.net.URLClassLoader - -import scala.collection.mutable.ArrayBuffer - -import org.apache.log4j.{Level, LogManager} - -import org.apache.spark.{SparkContext, SparkFunSuite} -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION - -class ReplSuite extends SparkFunSuite { - - def runInterpreter(master: String, input: String): String = { - val CONF_EXECUTOR_CLASSPATH = "spark.executor.extraClassPath" - - val in = new BufferedReader(new StringReader(input + "\n")) - val out = new StringWriter() - val cl = getClass.getClassLoader - var paths = new ArrayBuffer[String] - if (cl.isInstanceOf[URLClassLoader]) { - val urlLoader = cl.asInstanceOf[URLClassLoader] - for (url <- urlLoader.getURLs) { - if (url.getProtocol == "file") { - paths += url.getFile - } - } - } - val classpath = paths.map(new File(_).getAbsolutePath).mkString(File.pathSeparator) - - val oldExecutorClasspath = System.getProperty(CONF_EXECUTOR_CLASSPATH) - System.setProperty(CONF_EXECUTOR_CLASSPATH, classpath) - Main.sparkContext = null - Main.sparkSession = null // causes recreation of SparkContext for each test. - Main.conf.set("spark.master", master) - Main.doMain(Array("-classpath", classpath), new SparkILoop(in, new PrintWriter(out))) - - if (oldExecutorClasspath != null) { - System.setProperty(CONF_EXECUTOR_CLASSPATH, oldExecutorClasspath) - } else { - System.clearProperty(CONF_EXECUTOR_CLASSPATH) - } - return out.toString - } - - // Simulate the paste mode in Scala REPL. 
- def runInterpreterInPasteMode(master: String, input: String): String = - runInterpreter(master, ":paste\n" + input + 4.toChar) // 4 is the ascii code of CTRL + D - - def assertContains(message: String, output: String) { - val isContain = output.contains(message) - assert(isContain, - "Interpreter output did not contain '" + message + "':\n" + output) - } - - def assertDoesNotContain(message: String, output: String) { - val isContain = output.contains(message) - assert(!isContain, - "Interpreter output contained '" + message + "':\n" + output) - } - - test("propagation of local properties") { - // A mock ILoop that doesn't install the SIGINT handler. - class ILoop(out: PrintWriter) extends SparkILoop(None, out) { - settings = new scala.tools.nsc.Settings - settings.usejavacp.value = true - org.apache.spark.repl.Main.interp = this - } - - val out = new StringWriter() - Main.interp = new ILoop(new PrintWriter(out)) - Main.sparkContext = new SparkContext("local", "repl-test") - Main.interp.createInterpreter() - - Main.sparkContext.setLocalProperty("someKey", "someValue") - - // Make sure the value we set in the caller to interpret is propagated in the thread that - // interprets the command. - Main.interp.interpret("org.apache.spark.repl.Main.sparkContext.getLocalProperty(\"someKey\")") - assert(out.toString.contains("someValue")) - - Main.sparkContext.stop() - System.clearProperty("spark.driver.port") - } - - test("SPARK-15236: use Hive catalog") { - // turn on the INFO log so that it is possible the code will dump INFO - // entry for using "HiveMetastore" - val rootLogger = LogManager.getRootLogger() - val logLevel = rootLogger.getLevel - rootLogger.setLevel(Level.INFO) - try { - Main.conf.set(CATALOG_IMPLEMENTATION.key, "hive") - val output = runInterpreter("local", - """ - |spark.sql("drop table if exists t_15236") - """.stripMargin) - assertDoesNotContain("error:", output) - assertDoesNotContain("Exception", output) - // only when the config is set to hive and - // hive classes are built, we will use hive catalog. - // Then log INFO entry will show things using HiveMetastore - if (SparkSession.hiveClassesArePresent) { - assertContains("HiveMetaStore", output) - } else { - // If hive classes are not built, in-memory catalog will be used - assertDoesNotContain("HiveMetaStore", output) - } - } finally { - rootLogger.setLevel(logLevel) - } - } - - test("SPARK-15236: use in-memory catalog") { - val rootLogger = LogManager.getRootLogger() - val logLevel = rootLogger.getLevel - rootLogger.setLevel(Level.INFO) - try { - Main.conf.set(CATALOG_IMPLEMENTATION.key, "in-memory") - val output = runInterpreter("local", - """ - |spark.sql("drop table if exists t_16236") - """.stripMargin) - assertDoesNotContain("error:", output) - assertDoesNotContain("Exception", output) - assertDoesNotContain("HiveMetaStore", output) - } finally { - rootLogger.setLevel(logLevel) - } - } - - test("broadcast vars") { - // Test that the value that a broadcast var had when it was created is used, - // even if that variable is then modified in the driver program - // TODO: This doesn't actually work for arrays when we run in local mode! 
- val output = runInterpreter("local", - """ - |var array = new Array[Int](5) - |val broadcastArray = sc.broadcast(array) - |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect() - |array(0) = 5 - |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect() - """.stripMargin) - assertDoesNotContain("error:", output) - assertDoesNotContain("Exception", output) - assertContains("res0: Array[Int] = Array(0, 0, 0, 0, 0)", output) - assertContains("res2: Array[Int] = Array(5, 0, 0, 0, 0)", output) - } - - if (System.getenv("MESOS_NATIVE_JAVA_LIBRARY") != null) { - test("running on Mesos") { - val output = runInterpreter("localquiet", - """ - |var v = 7 - |def getV() = v - |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_) - |v = 10 - |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_) - |var array = new Array[Int](5) - |val broadcastArray = sc.broadcast(array) - |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect() - |array(0) = 5 - |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect() - """.stripMargin) - assertDoesNotContain("error:", output) - assertDoesNotContain("Exception", output) - assertContains("res0: Int = 70", output) - assertContains("res1: Int = 100", output) - assertContains("res2: Array[Int] = Array(0, 0, 0, 0, 0)", output) - assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output) - } - } - - test("line wrapper only initialized once when used as encoder outer scope") { - val output = runInterpreter("local", - """ - |val fileName = "repl-test-" + System.currentTimeMillis - |val tmpDir = System.getProperty("java.io.tmpdir") - |val file = new java.io.File(tmpDir, fileName) - |def createFile(): Unit = file.createNewFile() - | - |createFile();case class TestCaseClass(value: Int) - |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect() - | - |file.delete() - """.stripMargin) - assertDoesNotContain("error:", output) - assertDoesNotContain("Exception", output) - } - - test("define case class and create Dataset together with paste mode") { - val output = runInterpreterInPasteMode("local-cluster[1,1,1024]", - """ - |import spark.implicits._ - |case class TestClass(value: Int) - |Seq(TestClass(1)).toDS() - """.stripMargin) - assertDoesNotContain("error:", output) - assertDoesNotContain("Exception", output) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/repl/scala-2.11/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala ---------------------------------------------------------------------- diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala deleted file mode 100644 index ec3d790..0000000 --- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala +++ /dev/null @@ -1,408 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.repl - -import java.io._ -import java.net.URLClassLoader - -import scala.collection.mutable.ArrayBuffer - -import org.apache.commons.lang3.StringEscapeUtils - -import org.apache.spark.SparkFunSuite -import org.apache.spark.util.Utils - -/** - * A special test suite for REPL that all test cases share one REPL instance. - */ -class SingletonReplSuite extends SparkFunSuite { - - private val out = new StringWriter() - private val in = new PipedOutputStream() - private var thread: Thread = _ - - private val CONF_EXECUTOR_CLASSPATH = "spark.executor.extraClassPath" - private val oldExecutorClasspath = System.getProperty(CONF_EXECUTOR_CLASSPATH) - - override def beforeAll(): Unit = { - super.beforeAll() - - val cl = getClass.getClassLoader - var paths = new ArrayBuffer[String] - if (cl.isInstanceOf[URLClassLoader]) { - val urlLoader = cl.asInstanceOf[URLClassLoader] - for (url <- urlLoader.getURLs) { - if (url.getProtocol == "file") { - paths += url.getFile - } - } - } - val classpath = paths.map(new File(_).getAbsolutePath).mkString(File.pathSeparator) - - System.setProperty(CONF_EXECUTOR_CLASSPATH, classpath) - Main.conf.set("spark.master", "local-cluster[2,1,1024]") - val interp = new SparkILoop( - new BufferedReader(new InputStreamReader(new PipedInputStream(in))), - new PrintWriter(out)) - - // Forces to create new SparkContext - Main.sparkContext = null - Main.sparkSession = null - - // Starts a new thread to run the REPL interpreter, so that we won't block. - thread = new Thread(new Runnable { - override def run(): Unit = Main.doMain(Array("-classpath", classpath), interp) - }) - thread.setDaemon(true) - thread.start() - - waitUntil(() => out.toString.contains("Type :help for more information")) - } - - override def afterAll(): Unit = { - in.close() - thread.join() - if (oldExecutorClasspath != null) { - System.setProperty(CONF_EXECUTOR_CLASSPATH, oldExecutorClasspath) - } else { - System.clearProperty(CONF_EXECUTOR_CLASSPATH) - } - super.afterAll() - } - - private def waitUntil(cond: () => Boolean): Unit = { - import scala.concurrent.duration._ - import org.scalatest.concurrent.Eventually._ - - eventually(timeout(50.seconds), interval(500.millis)) { - assert(cond(), "current output: " + out.toString) - } - } - - /** - * Run the given commands string in a globally shared interpreter instance. Note that the given - * commands should not crash the interpreter, to not affect other test cases. - */ - def runInterpreter(input: String): String = { - val currentOffset = out.getBuffer.length() - // append a special statement to the end of the given code, so that we can know what's - // the final output of this code snippet and rely on it to wait until the output is ready. 
- val timestamp = System.currentTimeMillis() - in.write((input + s"\nval _result_$timestamp = 1\n").getBytes) - in.flush() - val stopMessage = s"_result_$timestamp: Int = 1" - waitUntil(() => out.getBuffer.substring(currentOffset).contains(stopMessage)) - out.getBuffer.substring(currentOffset) - } - - def assertContains(message: String, output: String) { - val isContain = output.contains(message) - assert(isContain, - "Interpreter output did not contain '" + message + "':\n" + output) - } - - def assertDoesNotContain(message: String, output: String) { - val isContain = output.contains(message) - assert(!isContain, - "Interpreter output contained '" + message + "':\n" + output) - } - - test("simple foreach with accumulator") { - val output = runInterpreter( - """ - |val accum = sc.longAccumulator - |sc.parallelize(1 to 10).foreach(x => accum.add(x)) - |val res = accum.value - """.stripMargin) - assertDoesNotContain("error:", output) - assertDoesNotContain("Exception", output) - assertContains("res: Long = 55", output) - } - - test("external vars") { - val output = runInterpreter( - """ - |var v = 7 - |val res1 = sc.parallelize(1 to 10).map(x => v).collect().reduceLeft(_+_) - |v = 10 - |val res2 = sc.parallelize(1 to 10).map(x => v).collect().reduceLeft(_+_) - """.stripMargin) - assertDoesNotContain("error:", output) - assertDoesNotContain("Exception", output) - assertContains("res1: Int = 70", output) - assertContains("res2: Int = 100", output) - } - - test("external classes") { - val output = runInterpreter( - """ - |class C { - |def foo = 5 - |} - |val res = sc.parallelize(1 to 10).map(x => (new C).foo).collect().reduceLeft(_+_) - """.stripMargin) - assertDoesNotContain("error:", output) - assertDoesNotContain("Exception", output) - assertContains("res: Int = 50", output) - } - - test("external functions") { - val output = runInterpreter( - """ - |def double(x: Int) = x + x - |val res = sc.parallelize(1 to 10).map(x => double(x)).collect().reduceLeft(_+_) - """.stripMargin) - assertDoesNotContain("error:", output) - assertDoesNotContain("Exception", output) - assertContains("res: Int = 110", output) - } - - test("external functions that access vars") { - val output = runInterpreter( - """ - |var v = 7 - |def getV() = v - |val res1 = sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_) - |v = 10 - |val res2 = sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_) - """.stripMargin) - assertDoesNotContain("error:", output) - assertDoesNotContain("Exception", output) - assertContains("res1: Int = 70", output) - assertContains("res2: Int = 100", output) - } - - test("broadcast vars") { - // Test that the value that a broadcast var had when it was created is used, - // even if that variable is then modified in the driver program - val output = runInterpreter( - """ - |var array = new Array[Int](5) - |val broadcastArray = sc.broadcast(array) - |val res1 = sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect() - |array(0) = 5 - |val res2 = sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect() - """.stripMargin) - assertDoesNotContain("error:", output) - assertDoesNotContain("Exception", output) - assertContains("res1: Array[Int] = Array(0, 0, 0, 0, 0)", output) - assertContains("res2: Array[Int] = Array(0, 0, 0, 0, 0)", output) - } - - test("interacting with files") { - val tempDir = Utils.createTempDir() - val out = new FileWriter(tempDir + "/input") - out.write("Hello world!\n") - out.write("What's up?\n") - out.write("Goodbye\n") - 
out.close() - val output = runInterpreter( - """ - |var file = sc.textFile("%s").cache() - |val res1 = file.count() - |val res2 = file.count() - |val res3 = file.count() - """.stripMargin.format(StringEscapeUtils.escapeJava( - tempDir.getAbsolutePath + File.separator + "input"))) - assertDoesNotContain("error:", output) - assertDoesNotContain("Exception", output) - assertContains("res1: Long = 3", output) - assertContains("res2: Long = 3", output) - assertContains("res3: Long = 3", output) - Utils.deleteRecursively(tempDir) - } - - test("local-cluster mode") { - val output = runInterpreter( - """ - |var v = 7 - |def getV() = v - |val res1 = sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_) - |v = 10 - |val res2 = sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_) - |var array = new Array[Int](5) - |val broadcastArray = sc.broadcast(array) - |val res3 = sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect() - |array(0) = 5 - |val res4 = sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect() - """.stripMargin) - assertDoesNotContain("error:", output) - assertDoesNotContain("Exception", output) - assertContains("res1: Int = 70", output) - assertContains("res2: Int = 100", output) - assertContains("res3: Array[Int] = Array(0, 0, 0, 0, 0)", output) - assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output) - } - - test("SPARK-1199 two instances of same class don't type check.") { - val output = runInterpreter( - """ - |case class Sum(exp: String, exp2: String) - |val a = Sum("A", "B") - |def b(a: Sum): String = a match { case Sum(_, _) => "Found Sum" } - |b(a) - """.stripMargin) - assertDoesNotContain("error:", output) - assertDoesNotContain("Exception", output) - } - - test("SPARK-2452 compound statements.") { - val output = runInterpreter( - """ - |val x = 4 ; def f() = x - |f() - """.stripMargin) - assertDoesNotContain("error:", output) - assertDoesNotContain("Exception", output) - } - - test("SPARK-2576 importing implicits") { - // We need to use local-cluster to test this case. - val output = runInterpreter( - """ - |import spark.implicits._ - |case class TestCaseClass(value: Int) - |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF().collect() - | - |// Test Dataset Serialization in the REPL - |Seq(TestCaseClass(1)).toDS().collect() - """.stripMargin) - assertDoesNotContain("error:", output) - assertDoesNotContain("Exception", output) - } - - test("Datasets and encoders") { - val output = runInterpreter( - """ - |import org.apache.spark.sql.functions._ - |import org.apache.spark.sql.{Encoder, Encoders} - |import org.apache.spark.sql.expressions.Aggregator - |import org.apache.spark.sql.TypedColumn - |val simpleSum = new Aggregator[Int, Int, Int] { - | def zero: Int = 0 // The initial value. - | def reduce(b: Int, a: Int) = b + a // Add an element to the running total - | def merge(b1: Int, b2: Int) = b1 + b2 // Merge intermediate values. - | def finish(b: Int) = b // Return the final result. 
- | def bufferEncoder: Encoder[Int] = Encoders.scalaInt - | def outputEncoder: Encoder[Int] = Encoders.scalaInt - |}.toColumn - | - |val ds = Seq(1, 2, 3, 4).toDS() - |ds.select(simpleSum).collect - """.stripMargin) - assertDoesNotContain("error:", output) - assertDoesNotContain("Exception", output) - } - - test("SPARK-2632 importing a method from non serializable class and not using it.") { - val output = runInterpreter( - """ - |class TestClass() { def testMethod = 3 } - |val t = new TestClass - |import t.testMethod - |case class TestCaseClass(value: Int) - |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect() - """.stripMargin) - assertDoesNotContain("error:", output) - assertDoesNotContain("Exception", output) - } - - test("collecting objects of class defined in repl") { - val output = runInterpreter( - """ - |case class Foo(i: Int) - |val res = sc.parallelize((1 to 100).map(Foo), 10).collect() - """.stripMargin) - assertDoesNotContain("error:", output) - assertDoesNotContain("Exception", output) - assertContains("res: Array[Foo] = Array(Foo(1),", output) - } - - test("collecting objects of class defined in repl - shuffling") { - val output = runInterpreter( - """ - |case class Foo(i: Int) - |val list = List((1, Foo(1)), (1, Foo(2))) - |val res = sc.parallelize(list).groupByKey().collect() - """.stripMargin) - assertDoesNotContain("error:", output) - assertDoesNotContain("Exception", output) - assertContains("res: Array[(Int, Iterable[Foo])] = Array((1,", output) - } - - test("replicating blocks of object with class defined in repl") { - val output = runInterpreter( - """ - |val timeout = 60000 // 60 seconds - |val start = System.currentTimeMillis - |while(sc.getExecutorStorageStatus.size != 3 && - | (System.currentTimeMillis - start) < timeout) { - | Thread.sleep(10) - |} - |if (System.currentTimeMillis - start >= timeout) { - | throw new java.util.concurrent.TimeoutException("Executors were not up in 60 seconds") - |} - |import org.apache.spark.storage.StorageLevel._ - |case class Foo(i: Int) - |val ret = sc.parallelize((1 to 100).map(Foo), 10).persist(MEMORY_AND_DISK_2) - |ret.count() - |val res = sc.getExecutorStorageStatus.map(s => s.rddBlocksById(ret.id).size).sum - """.stripMargin) - assertDoesNotContain("error:", output) - assertDoesNotContain("Exception", output) - assertContains("res: Int = 20", output) - } - - test("should clone and clean line object in ClosureCleaner") { - val output = runInterpreter( - """ - |import org.apache.spark.rdd.RDD - | - |val lines = sc.textFile("pom.xml") - |case class Data(s: String) - |val dataRDD = lines.map(line => Data(line.take(3))) - |dataRDD.cache.count - |val repartitioned = dataRDD.repartition(dataRDD.partitions.size) - |repartitioned.cache.count - | - |def getCacheSize(rdd: RDD[_]) = { - | sc.getRDDStorageInfo.filter(_.id == rdd.id).map(_.memSize).sum - |} - |val cacheSize1 = getCacheSize(dataRDD) - |val cacheSize2 = getCacheSize(repartitioned) - | - |// The cache size of dataRDD and the repartitioned one should be similar. 
- |val deviation = math.abs(cacheSize2 - cacheSize1).toDouble / cacheSize1 - |assert(deviation < 0.2, - | s"deviation too large: $deviation, first size: $cacheSize1, second size: $cacheSize2") - """.stripMargin) - assertDoesNotContain("AssertionError", output) - assertDoesNotContain("Exception", output) - } - - test("newProductSeqEncoder with REPL defined class") { - val output = runInterpreter( - """ - |case class Click(id: Int) - |spark.implicits.newProductSeqEncoder[Click] - """.stripMargin) - - assertDoesNotContain("error:", output) - assertDoesNotContain("Exception", output) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala ---------------------------------------------------------------------- diff --git a/repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala new file mode 100644 index 0000000..4135940 --- /dev/null +++ b/repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.repl + +import java.io.BufferedReader + +// scalastyle:off println +import scala.Predef.{println => _, _} +// scalastyle:on println +import scala.tools.nsc.Settings +import scala.tools.nsc.interpreter.{ILoop, JPrintWriter} +import scala.tools.nsc.util.stringFromStream +import scala.util.Properties.{javaVersion, javaVmName, versionString} + +/** + * A Spark-specific interactive shell. 
+ */ +class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) + extends ILoop(in0, out) { + def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out) + def this() = this(None, new JPrintWriter(Console.out, true)) + + def initializeSpark() { + intp.beQuietDuring { + processLine(""" + @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) { + org.apache.spark.repl.Main.sparkSession + } else { + org.apache.spark.repl.Main.createSparkSession() + } + @transient val sc = { + val _sc = spark.sparkContext + if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) { + val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null) + if (proxyUrl != null) { + println( + s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}") + } else { + println(s"Spark Context Web UI is available at Spark Master Public URL") + } + } else { + _sc.uiWebUrl.foreach { + webUrl => println(s"Spark context Web UI available at ${webUrl}") + } + } + println("Spark context available as 'sc' " + + s"(master = ${_sc.master}, app id = ${_sc.applicationId}).") + println("Spark session available as 'spark'.") + _sc + } + """) + processLine("import org.apache.spark.SparkContext._") + processLine("import spark.implicits._") + processLine("import spark.sql") + processLine("import org.apache.spark.sql.functions._") + } + } + + /** Print a welcome message */ + override def printWelcome() { + import org.apache.spark.SPARK_VERSION + echo("""Welcome to + ____ __ + / __/__ ___ _____/ /__ + _\ \/ _ \/ _ `/ __/ '_/ + /___/ .__/\_,_/_/ /_/\_\ version %s + /_/ + """.format(SPARK_VERSION)) + val welcomeMsg = "Using Scala %s (%s, Java %s)".format( + versionString, javaVmName, javaVersion) + echo(welcomeMsg) + echo("Type in expressions to have them evaluated.") + echo("Type :help for more information.") + } + + /** Available commands */ + override def commands: List[LoopCommand] = standardCommands + + /** + * We override `createInterpreter` because we need to initialize Spark *before* the REPL + * sees any files, so that the Spark context is visible in those files. This is a bit of a + * hack, but there isn't another hook available to us at this point. + */ + override def createInterpreter(): Unit = { + super.createInterpreter() + initializeSpark() + } + + override def resetCommand(line: String): Unit = { + super.resetCommand(line) + initializeSpark() + echo("Note that after :reset, state of SparkSession and SparkContext is unchanged.") + } +} + +object SparkILoop { + + /** + * Creates an interpreter loop with default settings and feeds + * the given code to it as input. + */ + def run(code: String, sets: Settings = new Settings): String = { + import java.io.{ BufferedReader, StringReader, OutputStreamWriter } + + stringFromStream { ostream => + Console.withOut(ostream) { + val input = new BufferedReader(new StringReader(code)) + val output = new JPrintWriter(new OutputStreamWriter(ostream), true) + val repl = new SparkILoop(input, output) + + if (sets.classpath.isDefault) { + sets.classpath.value = sys.props("java.class.path") + } + repl process sets + } + } + } + def run(lines: List[String]): String = run(lines.map(_ + "\n").mkString) +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
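----------------------------------------------------------------------
For reference, a minimal sketch of exercising the new Scala 2.12 SparkILoop.run entry point outside the test suites. It assumes spark-repl_2.12 and its dependencies are on the JVM classpath; the object name Repl212Smoke, the local[2] master and the evaluated snippet are illustrative assumptions, not code from this change.

import scala.tools.nsc.Settings
import org.apache.spark.repl.{Main, SparkILoop}

object Repl212Smoke {
  def main(args: Array[String]): Unit = {
    // Point the REPL's createSparkSession() at a local master, the same way
    // ReplSuite does before driving the interpreter.
    Main.conf.set("spark.master", "local[2]")

    val settings = new Settings
    settings.usejavacp.value = true  // reuse the launching JVM's classpath

    // SparkILoop.run feeds the code to a fresh interpreter and returns
    // everything it printed, which is what the REPL suites assert against.
    val output = SparkILoop.run(
      """
        |val doubled = sc.parallelize(1 to 10).map(_ * 2).sum()
      """.stripMargin, settings)

    assert(!output.contains("error:"), s"REPL reported an error:\n$output")
    println(output)
    // Note: the SparkSession created inside the REPL is left running; the
    // sketch relies on JVM exit to tear it down.
  }
}

A second standalone illustration of the synchronization pattern SingletonReplSuite relies on to share one interpreter across tests: append a uniquely named val to each submitted snippet and poll the captured output until that marker is echoed back, which signals that everything before it has finished evaluating. To stay runnable without a SparkContext it drives the stock Scala 2.12 ILoop rather than SparkILoop; the object and helper names are assumptions.

import java.io._
import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.{ILoop, JPrintWriter}

object MarkerSyncExample {
  def main(args: Array[String]): Unit = {
    val out = new StringWriter()
    val in = new PipedOutputStream()
    val replIn = new BufferedReader(new InputStreamReader(new PipedInputStream(in)))

    val settings = new Settings
    settings.usejavacp.value = true

    // Run the interpreter on a daemon thread so writes to `in` never block us.
    val repl = new ILoop(Some(replIn), new JPrintWriter(out, true))
    val thread = new Thread(new Runnable {
      override def run(): Unit = repl.process(settings)
    })
    thread.setDaemon(true)
    thread.start()

    // Submit code plus a marker statement, then wait until the interpreter
    // echoes the marker back. The suites use ScalaTest's eventually() with a
    // timeout; a bounded polling loop stands in for it here.
    def runInterpreter(code: String): String = {
      val offset = out.getBuffer.length
      val marker = s"_result_${System.currentTimeMillis()}"
      in.write((code + s"\nval $marker = 1\n").getBytes)
      in.flush()
      val deadline = System.currentTimeMillis() + 60000
      while (!out.getBuffer.substring(offset).contains(s"$marker: Int = 1")) {
        require(System.currentTimeMillis() < deadline, "timed out:\n" + out)
        Thread.sleep(100)
      }
      out.getBuffer.substring(offset)
    }

    println(runInterpreter("val x = 21 * 2"))
    in.close()
  }
}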