spark git commit: [SPARK-12267][CORE] Store the remote RpcEnv address to send the correct disconnection message
Repository: spark Updated Branches: refs/heads/master 98b212d36 -> 8af2f8c61 [SPARK-12267][CORE] Store the remote RpcEnv address to send the correct disconnetion message Author: Shixiong ZhuCloses #10261 from zsxwing/SPARK-12267. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8af2f8c6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8af2f8c6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8af2f8c6 Branch: refs/heads/master Commit: 8af2f8c61ae4a59d129fb3530d0f6e9317f4bff8 Parents: 98b212d Author: Shixiong Zhu Authored: Sat Dec 12 21:58:55 2015 -0800 Committer: Shixiong Zhu Committed: Sat Dec 12 21:58:55 2015 -0800 -- .../spark/deploy/master/ApplicationInfo.scala | 1 + .../org/apache/spark/deploy/worker/Worker.scala | 2 +- .../apache/spark/rpc/netty/NettyRpcEnv.scala| 21 ++ .../org/apache/spark/rpc/RpcEnvSuite.scala | 42 4 files changed, 65 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8af2f8c6/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index ac553b7..7e2cf95 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -66,6 +66,7 @@ private[spark] class ApplicationInfo( nextExecutorId = 0 removedExecutors = new ArrayBuffer[ExecutorDesc] executorLimit = Integer.MAX_VALUE +appUIUrlAtHistoryServer = None } private def newExecutorId(useID: Option[Int] = None): Int = { http://git-wip-us.apache.org/repos/asf/spark/blob/8af2f8c6/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 1afc1ff..f41efb0 
100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -690,7 +690,7 @@ private[deploy] object Worker extends Logging { val conf = new SparkConf val args = new WorkerArguments(argStrings, conf) val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores, - args.memory, args.masters, args.workDir) + args.memory, args.masters, args.workDir, conf = conf) rpcEnv.awaitTermination() } http://git-wip-us.apache.org/repos/asf/spark/blob/8af2f8c6/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala -- diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index 68c5f44..f82fd4e 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -553,6 +553,9 @@ private[netty] class NettyRpcHandler( // A variable to track whether we should dispatch the RemoteProcessConnected message. private val clients = new ConcurrentHashMap[TransportClient, JBoolean]() + // A variable to track the remote RpcEnv addresses of all clients + private val remoteAddresses = new ConcurrentHashMap[RpcAddress, RpcAddress]() + override def receive( client: TransportClient, message: ByteBuffer, @@ -580,6 +583,12 @@ private[netty] class NettyRpcHandler( // Create a new message with the socket address of the client as the sender. 
RequestMessage(clientAddr, requestMessage.receiver, requestMessage.content) } else { + // The remote RpcEnv listens to some port, we should also fire a RemoteProcessConnected for + // the listening address + val remoteEnvAddress = requestMessage.senderAddress + if (remoteAddresses.putIfAbsent(clientAddr, remoteEnvAddress) == null) { +dispatcher.postToAll(RemoteProcessConnected(remoteEnvAddress)) + } requestMessage } } @@ -591,6 +600,12 @@ private[netty] class NettyRpcHandler( if (addr != null) { val clientAddr = RpcAddress(addr.getHostName, addr.getPort) dispatcher.postToAll(RemoteProcessConnectionError(cause, clientAddr)) + // If the remove RpcEnv listens to some address, we should also fire a + // RemoteProcessConnectionError for the remote RpcEnv listening address + val remoteEnvAddress
spark git commit: [SPARK-12267][CORE] Store the remote RpcEnv address to send the correct disconnection message
Repository: spark Updated Branches: refs/heads/branch-1.6 e05364baa -> d7e3bfd7d [SPARK-12267][CORE] Store the remote RpcEnv address to send the correct disconnetion message Author: Shixiong ZhuCloses #10261 from zsxwing/SPARK-12267. (cherry picked from commit 8af2f8c61ae4a59d129fb3530d0f6e9317f4bff8) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d7e3bfd7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d7e3bfd7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d7e3bfd7 Branch: refs/heads/branch-1.6 Commit: d7e3bfd7d33b8fba44ef80932c0d40fb68075cb4 Parents: e05364b Author: Shixiong Zhu Authored: Sat Dec 12 21:58:55 2015 -0800 Committer: Shixiong Zhu Committed: Sat Dec 12 21:59:03 2015 -0800 -- .../spark/deploy/master/ApplicationInfo.scala | 1 + .../org/apache/spark/deploy/worker/Worker.scala | 2 +- .../apache/spark/rpc/netty/NettyRpcEnv.scala| 21 ++ .../org/apache/spark/rpc/RpcEnvSuite.scala | 42 4 files changed, 65 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d7e3bfd7/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index ac553b7..7e2cf95 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -66,6 +66,7 @@ private[spark] class ApplicationInfo( nextExecutorId = 0 removedExecutors = new ArrayBuffer[ExecutorDesc] executorLimit = Integer.MAX_VALUE +appUIUrlAtHistoryServer = None } private def newExecutorId(useID: Option[Int] = None): Int = { http://git-wip-us.apache.org/repos/asf/spark/blob/d7e3bfd7/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala -- diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 1afc1ff..f41efb0 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -690,7 +690,7 @@ private[deploy] object Worker extends Logging { val conf = new SparkConf val args = new WorkerArguments(argStrings, conf) val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores, - args.memory, args.masters, args.workDir) + args.memory, args.masters, args.workDir, conf = conf) rpcEnv.awaitTermination() } http://git-wip-us.apache.org/repos/asf/spark/blob/d7e3bfd7/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala -- diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index ed1f082..9d353bb 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -560,6 +560,9 @@ private[netty] class NettyRpcHandler( // A variable to track whether we should dispatch the RemoteProcessConnected message. private val clients = new ConcurrentHashMap[TransportClient, JBoolean]() + // A variable to track the remote RpcEnv addresses of all clients + private val remoteAddresses = new ConcurrentHashMap[RpcAddress, RpcAddress]() + override def receive( client: TransportClient, message: ByteBuffer, @@ -587,6 +590,12 @@ private[netty] class NettyRpcHandler( // Create a new message with the socket address of the client as the sender. 
RequestMessage(clientAddr, requestMessage.receiver, requestMessage.content) } else { + // The remote RpcEnv listens to some port, we should also fire a RemoteProcessConnected for + // the listening address + val remoteEnvAddress = requestMessage.senderAddress + if (remoteAddresses.putIfAbsent(clientAddr, remoteEnvAddress) == null) { +dispatcher.postToAll(RemoteProcessConnected(remoteEnvAddress)) + } requestMessage } } @@ -598,6 +607,12 @@ private[netty] class NettyRpcHandler( if (addr != null) { val clientAddr = RpcAddress(addr.getHostName, addr.getPort) dispatcher.postToAll(RemoteProcessConnectionError(cause, clientAddr)) + // If the remove RpcEnv listens to some