Repository: spark Updated Branches: refs/heads/master ec4d40e48 -> 1faa1135a
Revert "[SPARK-2805] Upgrade to akka 2.3.4" This reverts commit b9df8af62e8d7b263a668dfb6e9668ab4294ea37. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1faa1135 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1faa1135 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1faa1135 Branch: refs/heads/master Commit: 1faa1135a3fc0acd89f934f01a4a2edefcb93d33 Parents: ec4d40e Author: Patrick Wendell <[email protected]> Authored: Thu Oct 9 14:50:36 2014 -0700 Committer: Patrick Wendell <[email protected]> Committed: Thu Oct 9 14:50:36 2014 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/deploy/Client.scala | 2 +- .../apache/spark/deploy/client/AppClient.scala | 2 +- .../spark/deploy/worker/WorkerWatcher.scala | 2 +- .../apache/spark/MapOutputTrackerSuite.scala | 4 +- pom.xml | 2 +- .../spark/streaming/InputStreamsSuite.scala | 71 ++++++++++++++++++++ 6 files changed, 77 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/1faa1135/core/src/main/scala/org/apache/spark/deploy/Client.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index f2687ce..065ddda 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -130,7 +130,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.") System.exit(-1) - case AssociationErrorEvent(cause, _, remoteAddress, _, _) => + case AssociationErrorEvent(cause, _, remoteAddress, _) => println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.") println(s"Cause was: $cause") System.exit(-1) http://git-wip-us.apache.org/repos/asf/spark/blob/1faa1135/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 98a93d1..3279005 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -154,7 +154,7 @@ private[spark] class AppClient( logWarning(s"Connection to $address failed; waiting for master to reconnect...") markDisconnected() - case AssociationErrorEvent(cause, _, address, _, _) if isPossibleMaster(address) => + case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) => logWarning(s"Could not connect to $address: $cause") case StopAppClient => http://git-wip-us.apache.org/repos/asf/spark/blob/1faa1135/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala index 63a8ac8..6d0d0bb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala @@ -54,7 +54,7 @@ private[spark] class WorkerWatcher(workerUrl: String) case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) => logInfo(s"Successfully connected to $workerUrl") - case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound, _) + case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound) if isWorker(remoteAddress) => // These logs may not be seen if the worker (and associated pipe) has died logError(s"Could not initialize connection to worker $workerUrl. Exiting.") http://git-wip-us.apache.org/repos/asf/spark/blob/1faa1135/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index cbc0bd1..1fef79a 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -146,7 +146,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { val masterTracker = new MapOutputTrackerMaster(conf) val actorSystem = ActorSystem("test") val actorRef = TestActorRef[MapOutputTrackerMasterActor]( - Props(new MapOutputTrackerMasterActor(masterTracker, newConf)))(actorSystem) + new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem) val masterActor = actorRef.underlyingActor // Frame size should be ~123B, and no exception should be thrown @@ -164,7 +164,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { val masterTracker = new MapOutputTrackerMaster(conf) val actorSystem = ActorSystem("test") val actorRef = TestActorRef[MapOutputTrackerMasterActor]( - Props(new MapOutputTrackerMasterActor(masterTracker, newConf)))(actorSystem) + new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem) val masterActor = actorRef.underlyingActor // Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception. http://git-wip-us.apache.org/repos/asf/spark/blob/1faa1135/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 3b6d4ec..7756c89 100644 --- a/pom.xml +++ b/pom.xml @@ -118,7 +118,7 @@ <mesos.version>0.18.1</mesos.version> <mesos.classifier>shaded-protobuf</mesos.classifier> <akka.group>org.spark-project.akka</akka.group> - <akka.version>2.3.4-spark</akka.version> + <akka.version>2.2.3-shaded-protobuf</akka.version> <slf4j.version>1.7.5</slf4j.version> <log4j.version>1.2.17</log4j.version> <hadoop.version>1.0.4</hadoop.version> http://git-wip-us.apache.org/repos/asf/spark/blob/1faa1135/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 6107fcd..952a74f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -18,6 +18,8 @@ package org.apache.spark.streaming import akka.actor.Actor +import akka.actor.IO +import akka.actor.IOManager import akka.actor.Props import akka.util.ByteString @@ -142,6 +144,59 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") } + // TODO: This test works in IntelliJ but not through SBT + ignore("actor input stream") { + // Start the server + val testServer = new TestServer() + val port = testServer.port + testServer.start() + + // Set up the streaming context and input streams + val ssc = new StreamingContext(conf, batchDuration) + val networkStream = ssc.actorStream[String](Props(new TestActor(port)), "TestActor", + // Had to pass the local value of port to prevent from closing over entire scope + StorageLevel.MEMORY_AND_DISK) + val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] + val outputStream = new TestOutputStream(networkStream, outputBuffer) + def output = outputBuffer.flatMap(x => x) + outputStream.register() + ssc.start() + + // Feed data to the server to send to the network receiver + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + val input = 1 to 9 + val expectedOutput = input.map(x => x.toString) + Thread.sleep(1000) + for (i <- 0 until input.size) { + testServer.send(input(i).toString) + Thread.sleep(500) + clock.addToTime(batchDuration.milliseconds) + } + Thread.sleep(1000) + logInfo("Stopping server") + testServer.stop() + logInfo("Stopping context") + ssc.stop() + + // Verify whether data received was as expected + logInfo("--------------------------------") + logInfo("output.size = " + outputBuffer.size) + logInfo("output") + outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("expected output.size = " + expectedOutput.size) + logInfo("expected output") + expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("--------------------------------") + + // Verify whether all the elements received are as expected + // (whether the elements were received one in each interval is not verified) + assert(output.size === expectedOutput.size) + for (i <- 0 until output.size) { + assert(output(i) === expectedOutput(i)) + } + } + + test("multi-thread receiver") { // set up the test receiver val numThreads = 10 @@ -323,6 +378,22 @@ class TestServer(portToBind: Int = 0) extends Logging { def port = serverSocket.getLocalPort } +/** This is an actor for testing actor input stream */ +class TestActor(port: Int) extends Actor with ActorHelper { + + def bytesToString(byteString: ByteString) = byteString.utf8String + + override def preStart(): Unit = { + @deprecated("suppress compile time deprecation warning", "1.0.0") + val unit = IOManager(context.system).connect(new InetSocketAddress(port)) + } + + def receive = { + case IO.Read(socket, bytes) => + store(bytesToString(bytes)) + } +} + /** This is a receiver to test multiple threads inserting data using block generator */ class MultiThreadTestReceiver(numThreads: Int, numRecordsPerThread: Int) extends Receiver[Int](StorageLevel.MEMORY_ONLY_SER) with Logging { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
