Repository: spark
Updated Branches:
  refs/heads/master ec4d40e48 -> 1faa1135a


Revert "[SPARK-2805] Upgrade to akka 2.3.4"

This reverts commit b9df8af62e8d7b263a668dfb6e9668ab4294ea37.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1faa1135
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1faa1135
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1faa1135

Branch: refs/heads/master
Commit: 1faa1135a3fc0acd89f934f01a4a2edefcb93d33
Parents: ec4d40e
Author: Patrick Wendell <[email protected]>
Authored: Thu Oct 9 14:50:36 2014 -0700
Committer: Patrick Wendell <[email protected]>
Committed: Thu Oct 9 14:50:36 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/deploy/Client.scala  |  2 +-
 .../apache/spark/deploy/client/AppClient.scala  |  2 +-
 .../spark/deploy/worker/WorkerWatcher.scala     |  2 +-
 .../apache/spark/MapOutputTrackerSuite.scala    |  4 +-
 pom.xml                                         |  2 +-
 .../spark/streaming/InputStreamsSuite.scala     | 71 ++++++++++++++++++++
 6 files changed, 77 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1faa1135/core/src/main/scala/org/apache/spark/deploy/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala 
b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index f2687ce..065ddda 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -130,7 +130,7 @@ private class ClientActor(driverArgs: ClientArguments, 
conf: SparkConf)
       println(s"Error connecting to master ${driverArgs.master} 
($remoteAddress), exiting.")
       System.exit(-1)
 
-    case AssociationErrorEvent(cause, _, remoteAddress, _, _) =>
+    case AssociationErrorEvent(cause, _, remoteAddress, _) =>
       println(s"Error connecting to master ${driverArgs.master} 
($remoteAddress), exiting.")
       println(s"Cause was: $cause")
       System.exit(-1)

http://git-wip-us.apache.org/repos/asf/spark/blob/1faa1135/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala 
b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 98a93d1..3279005 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -154,7 +154,7 @@ private[spark] class AppClient(
         logWarning(s"Connection to $address failed; waiting for master to 
reconnect...")
         markDisconnected()
 
-      case AssociationErrorEvent(cause, _, address, _, _) if 
isPossibleMaster(address) =>
+      case AssociationErrorEvent(cause, _, address, _) if 
isPossibleMaster(address) =>
         logWarning(s"Could not connect to $address: $cause")
 
       case StopAppClient =>

http://git-wip-us.apache.org/repos/asf/spark/blob/1faa1135/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
index 63a8ac8..6d0d0bb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
@@ -54,7 +54,7 @@ private[spark] class WorkerWatcher(workerUrl: String)
     case AssociatedEvent(localAddress, remoteAddress, inbound) if 
isWorker(remoteAddress) =>
       logInfo(s"Successfully connected to $workerUrl")
 
-    case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound, _)
+    case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound)
         if isWorker(remoteAddress) =>
       // These logs may not be seen if the worker (and associated pipe) has 
died
       logError(s"Could not initialize connection to worker $workerUrl. 
Exiting.")

http://git-wip-us.apache.org/repos/asf/spark/blob/1faa1135/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala 
b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index cbc0bd1..1fef79a 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -146,7 +146,7 @@ class MapOutputTrackerSuite extends FunSuite with 
LocalSparkContext {
     val masterTracker = new MapOutputTrackerMaster(conf)
     val actorSystem = ActorSystem("test")
     val actorRef = TestActorRef[MapOutputTrackerMasterActor](
-      Props(new MapOutputTrackerMasterActor(masterTracker, 
newConf)))(actorSystem)
+      new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
     val masterActor = actorRef.underlyingActor
 
     // Frame size should be ~123B, and no exception should be thrown
@@ -164,7 +164,7 @@ class MapOutputTrackerSuite extends FunSuite with 
LocalSparkContext {
     val masterTracker = new MapOutputTrackerMaster(conf)
     val actorSystem = ActorSystem("test")
     val actorRef = TestActorRef[MapOutputTrackerMasterActor](
-      Props(new MapOutputTrackerMasterActor(masterTracker, 
newConf)))(actorSystem)
+      new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
     val masterActor = actorRef.underlyingActor
 
     // Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should 
throw exception.

http://git-wip-us.apache.org/repos/asf/spark/blob/1faa1135/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3b6d4ec..7756c89 100644
--- a/pom.xml
+++ b/pom.xml
@@ -118,7 +118,7 @@
     <mesos.version>0.18.1</mesos.version>
     <mesos.classifier>shaded-protobuf</mesos.classifier>
     <akka.group>org.spark-project.akka</akka.group>
-    <akka.version>2.3.4-spark</akka.version>
+    <akka.version>2.2.3-shaded-protobuf</akka.version>
     <slf4j.version>1.7.5</slf4j.version>
     <log4j.version>1.2.17</log4j.version>
     <hadoop.version>1.0.4</hadoop.version>

http://git-wip-us.apache.org/repos/asf/spark/blob/1faa1135/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 6107fcd..952a74f 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.streaming
 
 import akka.actor.Actor
+import akka.actor.IO
+import akka.actor.IOManager
 import akka.actor.Props
 import akka.util.ByteString
 
@@ -142,6 +144,59 @@ class InputStreamsSuite extends TestSuiteBase with 
BeforeAndAfter {
     conf.set("spark.streaming.clock", 
"org.apache.spark.streaming.util.ManualClock")
   }
 
+  // TODO: This test works in IntelliJ but not through SBT
+  ignore("actor input stream") {
+    // Start the server
+    val testServer = new TestServer()
+    val port = testServer.port
+    testServer.start()
+
+    // Set up the streaming context and input streams
+    val ssc = new StreamingContext(conf, batchDuration)
+    val networkStream = ssc.actorStream[String](Props(new TestActor(port)), 
"TestActor",
+      // Had to pass the local value of port to prevent from closing over 
entire scope
+      StorageLevel.MEMORY_AND_DISK)
+    val outputBuffer = new ArrayBuffer[Seq[String]] with 
SynchronizedBuffer[Seq[String]]
+    val outputStream = new TestOutputStream(networkStream, outputBuffer)
+    def output = outputBuffer.flatMap(x => x)
+    outputStream.register()
+    ssc.start()
+
+    // Feed data to the server to send to the network receiver
+    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+    val input = 1 to 9
+    val expectedOutput = input.map(x => x.toString)
+    Thread.sleep(1000)
+    for (i <- 0 until input.size) {
+      testServer.send(input(i).toString)
+      Thread.sleep(500)
+      clock.addToTime(batchDuration.milliseconds)
+    }
+    Thread.sleep(1000)
+    logInfo("Stopping server")
+    testServer.stop()
+    logInfo("Stopping context")
+    ssc.stop()
+
+    // Verify whether data received was as expected
+    logInfo("--------------------------------")
+    logInfo("output.size = " + outputBuffer.size)
+    logInfo("output")
+    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    logInfo("expected output.size = " + expectedOutput.size)
+    logInfo("expected output")
+    expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    logInfo("--------------------------------")
+
+    // Verify whether all the elements received are as expected
+    // (whether the elements were received one in each interval is not 
verified)
+    assert(output.size === expectedOutput.size)
+    for (i <- 0 until output.size) {
+      assert(output(i) === expectedOutput(i))
+    }
+  }
+
+
   test("multi-thread receiver") {
     // set up the test receiver
     val numThreads = 10
@@ -323,6 +378,22 @@ class TestServer(portToBind: Int = 0) extends Logging {
   def port = serverSocket.getLocalPort
 }
 
+/** This is an actor for testing actor input stream */
+class TestActor(port: Int) extends Actor with ActorHelper {
+
+  def bytesToString(byteString: ByteString) = byteString.utf8String
+
+  override def preStart(): Unit = {
+    @deprecated("suppress compile time deprecation warning", "1.0.0")
+    val unit = IOManager(context.system).connect(new InetSocketAddress(port))
+  }
+
+  def receive = {
+    case IO.Read(socket, bytes) =>
+      store(bytesToString(bytes))
+  }
+}
+
 /** This is a receiver to test multiple threads inserting data using block 
generator */
 class MultiThreadTestReceiver(numThreads: Int, numRecordsPerThread: Int)
   extends Receiver[Int](StorageLevel.MEMORY_ONLY_SER) with Logging {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to