Repository: spark Updated Branches: refs/heads/master 314bc6843 -> 107320c9b
http://git-wip-us.apache.org/repos/asf/spark/blob/107320c9/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcHandlerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcHandlerSuite.scala b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcHandlerSuite.scala new file mode 100644 index 0000000..06ca035 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcHandlerSuite.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rpc.netty + +import java.net.InetSocketAddress + +import io.netty.channel.Channel +import org.mockito.Mockito._ +import org.mockito.Matchers._ + +import org.apache.spark.SparkFunSuite +import org.apache.spark.network.client.{TransportResponseHandler, TransportClient} +import org.apache.spark.rpc._ + +class NettyRpcHandlerSuite extends SparkFunSuite { + + val env = mock(classOf[NettyRpcEnv]) + when(env.deserialize(any(classOf[Array[Byte]]))(any())). + thenReturn(RequestMessage(RpcAddress("localhost", 12345), null, null, false)) + + test("receive") { + val dispatcher = mock(classOf[Dispatcher]) + val nettyRpcHandler = new NettyRpcHandler(dispatcher, env) + + val channel = mock(classOf[Channel]) + val client = new TransportClient(channel, mock(classOf[TransportResponseHandler])) + when(channel.remoteAddress()).thenReturn(new InetSocketAddress("localhost", 40000)) + nettyRpcHandler.receive(client, null, null) + + when(channel.remoteAddress()).thenReturn(new InetSocketAddress("localhost", 40001)) + nettyRpcHandler.receive(client, null, null) + + verify(dispatcher, times(1)).broadcastMessage(Associated(RpcAddress("localhost", 12345))) + } + + test("connectionTerminated") { + val dispatcher = mock(classOf[Dispatcher]) + val nettyRpcHandler = new NettyRpcHandler(dispatcher, env) + + val channel = mock(classOf[Channel]) + val client = new TransportClient(channel, mock(classOf[TransportResponseHandler])) + when(channel.remoteAddress()).thenReturn(new InetSocketAddress("localhost", 40000)) + nettyRpcHandler.receive(client, null, null) + + when(channel.remoteAddress()).thenReturn(new InetSocketAddress("localhost", 40000)) + nettyRpcHandler.connectionTerminated(client) + + verify(dispatcher, times(1)).broadcastMessage(Associated(RpcAddress("localhost", 12345))) + verify(dispatcher, times(1)).broadcastMessage(Disassociated(RpcAddress("localhost", 12345))) + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/107320c9/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java ---------------------------------------------------------------------- diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java index df84128..fbb8bb6 100644 --- a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java +++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java @@ -78,6 +78,10 @@ public class TransportClient implements Closeable { this.handler = Preconditions.checkNotNull(handler); } + public Channel getChannel() { + return channel; + } + public boolean isActive() { return channel.isOpen() || channel.isActive(); } http://git-wip-us.apache.org/repos/asf/spark/blob/107320c9/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java ---------------------------------------------------------------------- diff --git a/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java b/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java index 2ba92a4..dbb7f95 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java +++ b/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java @@ -52,4 +52,6 @@ public abstract class RpcHandler { * No further requests will come from this client. */ public void connectionTerminated(TransportClient client) { } + + public void exceptionCaught(Throwable cause, TransportClient client) { } } http://git-wip-us.apache.org/repos/asf/spark/blob/107320c9/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java ---------------------------------------------------------------------- diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java index df60278..96941d2 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java +++ b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java @@ -71,6 +71,7 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> { @Override public void exceptionCaught(Throwable cause) { + rpcHandler.exceptionCaught(cause, reverseClient); } @Override http://git-wip-us.apache.org/repos/asf/spark/blob/107320c9/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 204e614..d053e9e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -474,7 +474,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false // Remote messages case RegisterReceiver(streamId, typ, hostPort, receiverEndpoint) => val successful = - registerReceiver(streamId, typ, hostPort, receiverEndpoint, context.sender.address) + registerReceiver(streamId, typ, hostPort, receiverEndpoint, context.senderAddress) context.reply(successful) case AddBlock(receivedBlockInfo) => context.reply(addBlock(receivedBlockInfo)) http://git-wip-us.apache.org/repos/asf/spark/blob/107320c9/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 07a0a45..0df3173 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -556,10 +556,7 @@ private[spark] class ApplicationMaster( override val rpcEnv: RpcEnv, driver: RpcEndpointRef, isClusterMode: Boolean) extends RpcEndpoint with Logging { - override def onStart(): Unit = { - driver.send(RegisterClusterManager(self)) - - } + driver.send(RegisterClusterManager(self)) override def receive: PartialFunction[Any, Unit] = { case x: AddWebUIFilter => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org