spark git commit: [SPARK-12267][CORE] Store the remote RpcEnv address to send the correct disconnection message

2015-12-12 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 98b212d36 -> 8af2f8c61


[SPARK-12267][CORE] Store the remote RpcEnv address to send the correct 
disconnection message

Author: Shixiong Zhu 

Closes #10261 from zsxwing/SPARK-12267.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8af2f8c6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8af2f8c6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8af2f8c6

Branch: refs/heads/master
Commit: 8af2f8c61ae4a59d129fb3530d0f6e9317f4bff8
Parents: 98b212d
Author: Shixiong Zhu 
Authored: Sat Dec 12 21:58:55 2015 -0800
Committer: Shixiong Zhu 
Committed: Sat Dec 12 21:58:55 2015 -0800

--
 .../spark/deploy/master/ApplicationInfo.scala   |  1 +
 .../org/apache/spark/deploy/worker/Worker.scala |  2 +-
 .../apache/spark/rpc/netty/NettyRpcEnv.scala| 21 ++
 .../org/apache/spark/rpc/RpcEnvSuite.scala  | 42 
 4 files changed, 65 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8af2f8c6/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index ac553b7..7e2cf95 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -66,6 +66,7 @@ private[spark] class ApplicationInfo(
 nextExecutorId = 0
 removedExecutors = new ArrayBuffer[ExecutorDesc]
 executorLimit = Integer.MAX_VALUE
+appUIUrlAtHistoryServer = None
   }
 
   private def newExecutorId(useID: Option[Int] = None): Int = {

http://git-wip-us.apache.org/repos/asf/spark/blob/8af2f8c6/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 1afc1ff..f41efb0 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -690,7 +690,7 @@ private[deploy] object Worker extends Logging {
 val conf = new SparkConf
 val args = new WorkerArguments(argStrings, conf)
 val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, 
args.cores,
-  args.memory, args.masters, args.workDir)
+  args.memory, args.masters, args.workDir, conf = conf)
 rpcEnv.awaitTermination()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8af2f8c6/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala 
b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 68c5f44..f82fd4e 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -553,6 +553,9 @@ private[netty] class NettyRpcHandler(
   // A variable to track whether we should dispatch the RemoteProcessConnected 
message.
   private val clients = new ConcurrentHashMap[TransportClient, JBoolean]()
 
+  // A variable to track the remote RpcEnv addresses of all clients
+  private val remoteAddresses = new ConcurrentHashMap[RpcAddress, RpcAddress]()
+
   override def receive(
   client: TransportClient,
   message: ByteBuffer,
@@ -580,6 +583,12 @@ private[netty] class NettyRpcHandler(
   // Create a new message with the socket address of the client as the 
sender.
   RequestMessage(clientAddr, requestMessage.receiver, 
requestMessage.content)
 } else {
+  // The remote RpcEnv listens to some port, we should also fire a 
RemoteProcessConnected for
+  // the listening address
+  val remoteEnvAddress = requestMessage.senderAddress
+  if (remoteAddresses.putIfAbsent(clientAddr, remoteEnvAddress) == null) {
+dispatcher.postToAll(RemoteProcessConnected(remoteEnvAddress))
+  }
   requestMessage
 }
   }
@@ -591,6 +600,12 @@ private[netty] class NettyRpcHandler(
 if (addr != null) {
   val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
   dispatcher.postToAll(RemoteProcessConnectionError(cause, clientAddr))
+  // If the remote RpcEnv listens to some address, we should also fire a
+  // RemoteProcessConnectionError for the remote RpcEnv listening address
+  val remoteEnvAddress 

spark git commit: [SPARK-12267][CORE] Store the remote RpcEnv address to send the correct disconnection message

2015-12-12 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 e05364baa -> d7e3bfd7d


[SPARK-12267][CORE] Store the remote RpcEnv address to send the correct 
disconnection message

Author: Shixiong Zhu 

Closes #10261 from zsxwing/SPARK-12267.

(cherry picked from commit 8af2f8c61ae4a59d129fb3530d0f6e9317f4bff8)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d7e3bfd7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d7e3bfd7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d7e3bfd7

Branch: refs/heads/branch-1.6
Commit: d7e3bfd7d33b8fba44ef80932c0d40fb68075cb4
Parents: e05364b
Author: Shixiong Zhu 
Authored: Sat Dec 12 21:58:55 2015 -0800
Committer: Shixiong Zhu 
Committed: Sat Dec 12 21:59:03 2015 -0800

--
 .../spark/deploy/master/ApplicationInfo.scala   |  1 +
 .../org/apache/spark/deploy/worker/Worker.scala |  2 +-
 .../apache/spark/rpc/netty/NettyRpcEnv.scala| 21 ++
 .../org/apache/spark/rpc/RpcEnvSuite.scala  | 42 
 4 files changed, 65 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d7e3bfd7/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index ac553b7..7e2cf95 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -66,6 +66,7 @@ private[spark] class ApplicationInfo(
 nextExecutorId = 0
 removedExecutors = new ArrayBuffer[ExecutorDesc]
 executorLimit = Integer.MAX_VALUE
+appUIUrlAtHistoryServer = None
   }
 
   private def newExecutorId(useID: Option[Int] = None): Int = {

http://git-wip-us.apache.org/repos/asf/spark/blob/d7e3bfd7/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 1afc1ff..f41efb0 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -690,7 +690,7 @@ private[deploy] object Worker extends Logging {
 val conf = new SparkConf
 val args = new WorkerArguments(argStrings, conf)
 val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, 
args.cores,
-  args.memory, args.masters, args.workDir)
+  args.memory, args.masters, args.workDir, conf = conf)
 rpcEnv.awaitTermination()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d7e3bfd7/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala 
b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index ed1f082..9d353bb 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -560,6 +560,9 @@ private[netty] class NettyRpcHandler(
   // A variable to track whether we should dispatch the RemoteProcessConnected 
message.
   private val clients = new ConcurrentHashMap[TransportClient, JBoolean]()
 
+  // A variable to track the remote RpcEnv addresses of all clients
+  private val remoteAddresses = new ConcurrentHashMap[RpcAddress, RpcAddress]()
+
   override def receive(
   client: TransportClient,
   message: ByteBuffer,
@@ -587,6 +590,12 @@ private[netty] class NettyRpcHandler(
   // Create a new message with the socket address of the client as the 
sender.
   RequestMessage(clientAddr, requestMessage.receiver, 
requestMessage.content)
 } else {
+  // The remote RpcEnv listens to some port, we should also fire a 
RemoteProcessConnected for
+  // the listening address
+  val remoteEnvAddress = requestMessage.senderAddress
+  if (remoteAddresses.putIfAbsent(clientAddr, remoteEnvAddress) == null) {
+dispatcher.postToAll(RemoteProcessConnected(remoteEnvAddress))
+  }
   requestMessage
 }
   }
@@ -598,6 +607,12 @@ private[netty] class NettyRpcHandler(
 if (addr != null) {
   val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
   dispatcher.postToAll(RemoteProcessConnectionError(cause, clientAddr))
+  // If the remote RpcEnv listens to some