Repository: spark
Updated Branches:
  refs/heads/master 995221774 -> 084e4e126


http://git-wip-us.apache.org/repos/asf/spark/blob/084e4e12/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcHandlerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcHandlerSuite.scala b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcHandlerSuite.scala
new file mode 100644
index 0000000..06ca035
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcHandlerSuite.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc.netty
+
+import java.net.InetSocketAddress
+
+import io.netty.channel.Channel
+import org.mockito.Mockito._
+import org.mockito.Matchers._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.network.client.{TransportResponseHandler, TransportClient}
+import org.apache.spark.rpc._
+
+class NettyRpcHandlerSuite extends SparkFunSuite {
+
+  val env = mock(classOf[NettyRpcEnv])
+  when(env.deserialize(any(classOf[Array[Byte]]))(any())).
+    thenReturn(RequestMessage(RpcAddress("localhost", 12345), null, null, false))
+
+  test("receive") {
+    val dispatcher = mock(classOf[Dispatcher])
+    val nettyRpcHandler = new NettyRpcHandler(dispatcher, env)
+
+    val channel = mock(classOf[Channel])
+    val client = new TransportClient(channel, mock(classOf[TransportResponseHandler]))
+    when(channel.remoteAddress()).thenReturn(new InetSocketAddress("localhost", 40000))
+    nettyRpcHandler.receive(client, null, null)
+
+    when(channel.remoteAddress()).thenReturn(new InetSocketAddress("localhost", 40001))
+    nettyRpcHandler.receive(client, null, null)
+
+    verify(dispatcher, times(1)).broadcastMessage(Associated(RpcAddress("localhost", 12345)))
+  }
+
+  test("connectionTerminated") {
+    val dispatcher = mock(classOf[Dispatcher])
+    val nettyRpcHandler = new NettyRpcHandler(dispatcher, env)
+
+    val channel = mock(classOf[Channel])
+    val client = new TransportClient(channel, mock(classOf[TransportResponseHandler]))
+    when(channel.remoteAddress()).thenReturn(new InetSocketAddress("localhost", 40000))
+    nettyRpcHandler.receive(client, null, null)
+
+    when(channel.remoteAddress()).thenReturn(new InetSocketAddress("localhost", 40000))
+    nettyRpcHandler.connectionTerminated(client)
+
+    verify(dispatcher, times(1)).broadcastMessage(Associated(RpcAddress("localhost", 12345)))
+    verify(dispatcher, times(1)).broadcastMessage(Disassociated(RpcAddress("localhost", 12345)))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/084e4e12/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
index df84128..fbb8bb6 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -78,6 +78,10 @@ public class TransportClient implements Closeable {
     this.handler = Preconditions.checkNotNull(handler);
   }
 
+  public Channel getChannel() {
+    return channel;
+  }
+
   public boolean isActive() {
     return channel.isOpen() || channel.isActive();
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/084e4e12/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java b/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java
index 2ba92a4..dbb7f95 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java
@@ -52,4 +52,6 @@ public abstract class RpcHandler {
    * No further requests will come from this client.
    */
   public void connectionTerminated(TransportClient client) { }
+
+  public void exceptionCaught(Throwable cause, TransportClient client) { }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/084e4e12/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index df60278..96941d2 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -71,6 +71,7 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
 
   @Override
   public void exceptionCaught(Throwable cause) {
+    rpcHandler.exceptionCaught(cause, reverseClient);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/084e4e12/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 204e614..d053e9e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -474,7 +474,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
       // Remote messages
       case RegisterReceiver(streamId, typ, hostPort, receiverEndpoint) =>
         val successful =
-          registerReceiver(streamId, typ, hostPort, receiverEndpoint, context.sender.address)
+          registerReceiver(streamId, typ, hostPort, receiverEndpoint, context.senderAddress)
         context.reply(successful)
       case AddBlock(receivedBlockInfo) =>
         context.reply(addBlock(receivedBlockInfo))

http://git-wip-us.apache.org/repos/asf/spark/blob/084e4e12/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 93621b4..32d2181 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -556,10 +556,7 @@ private[spark] class ApplicationMaster(
      override val rpcEnv: RpcEnv, driver: RpcEndpointRef, isClusterMode: Boolean)
     extends RpcEndpoint with Logging {
 
-    override def onStart(): Unit = {
-      driver.send(RegisterClusterManager(self))
-
-    }
+    driver.send(RegisterClusterManager(self))
 
     override def receive: PartialFunction[Any, Unit] = {
       case x: AddWebUIFilter =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to