[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-30 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33644095
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---
@@ -466,23 +531,13 @@ private[worker] class Worker(
 case _ =>
   logDebug(s"Driver $driverId changed state to $state")
   }
-  master ! DriverStateChanged(driverId, state, exception)
+  sendToMaster(driverStageChanged)
--- End diff --

Ah, it's just to avoid creating a new instance.
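For readers unfamiliar with the `@` binder, here is a minimal Scala sketch, assuming simplified stand-ins for the deploy messages (the `String` state, the `sent` buffer and `BinderDemo` are hypothetical, not Spark's types). Binding the whole pattern as `name @ Pattern(...)` lets the handler forward the very instance it matched instead of allocating a new `DriverStateChanged(driverId, state, exception)`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for the deploy messages; not Spark's actual definitions.
sealed trait DeployMessage
final case class DriverStateChanged(
    driverId: String,
    state: String,
    exception: Option[Exception])
  extends DeployMessage

object BinderDemo {
  // Records what would have been sent to the master, so the effect is observable.
  val sent = ArrayBuffer.empty[AnyRef]

  private def sendToMaster(message: AnyRef): Unit = sent += message

  def receive(msg: Any): Unit = msg match {
    // `driverStateChanged @ ...` binds the matched object itself, so it can be
    // forwarded unchanged instead of building a new DriverStateChanged(...).
    case driverStateChanged @ DriverStateChanged(driverId, state, _) =>
      println(s"Driver $driverId changed state to $state")
      sendToMaster(driverStateChanged)
    case other =>
      println(s"Ignoring unexpected message: $other")
  }
}
```

For an immutable case class, forwarding the matched instance is behaviorally identical to rebuilding it; the only gain is one fewer allocation.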


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/5392





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-30 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-117379182
  
Alright I'm going to merge this. @zsxwing please submit a separate PR to 
address the TODO.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-30 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-117354387
  
Just one comment, thanks to your proactive TODOifying :) LGTM; feel free to merge after addressing it.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-30 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33627528
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/master/Master.scala ---
@@ -504,6 +518,7 @@ private[master] class Master(
   }
 
   private def completeRecovery() {
+// TODO Why synchronized
--- End diff --

This was due to an earlier state in the code when this method could be 
invoked from a different thread. It can be safely removed now.
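A minimal sketch of why the lock becomes unnecessary, assuming the endpoint now processes its messages one at a time on a single thread (modelled here with a single-thread executor; `SerialEndpoint` and the `"CompleteRecovery"` string message are hypothetical, not Spark's `ThreadSafeRpcEndpoint`). State that only the message thread touches needs no `synchronized`, because concurrent senders merely enqueue work.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}

// Hypothetical sketch, not Spark's RpcEndpoint: every message runs on one inbox
// thread, so state mutated only inside the handler needs no `synchronized`.
final class SerialEndpoint {
  private val inbox = Executors.newSingleThreadExecutor()
  private var recovered = false // only read/written on the inbox thread
  private var completions = 0   // only read/written on the inbox thread

  // Any thread may call send(); the work is queued and executed in order.
  def send(msg: String): Unit = inbox.execute(new Runnable {
    override def run(): Unit = msg match {
      case "CompleteRecovery" if !recovered =>
        recovered = true
        completions += 1
      case _ => () // duplicates and unknown messages are ignored
    }
  })

  // The read is also queued, so it runs after every previously submitted message.
  def completionCount(): Int =
    inbox.submit(new Callable[Int] {
      override def call(): Int = completions
    }).get()

  def shutdown(): Unit = {
    inbox.shutdown()
    inbox.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```

Even with many threads sending the same message, recovery completes exactly once, without any explicit locking in the handler.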





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-117290439
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-117290218
  
[Test build #36148 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36148/console) for PR 5392 at commit [`2de7bed`](https://github.com/apache/spark/commit/2de7bed8c501eda1491fb585cbc3771431385892).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
   * `case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage`
   * `case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage`
   * `case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)`
   * `case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage`
   * `case class SubmitDriverResponse(`
   * `case class KillDriverResponse(`
   * `case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)`






[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-30 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33596669
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---
@@ -466,23 +531,13 @@ private[worker] class Worker(
 case _ =>
   logDebug(s"Driver $driverId changed state to $state")
   }
-  master ! DriverStateChanged(driverId, state, exception)
+  sendToMaster(driverStageChanged)
--- End diff --

Rather, I meant the change to bind the message as `driverStageChanged @` instead of recreating `DriverStateChanged(driverId, state, exception)`. It's not particularly important; I was just curious whether there was a reason.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-117240100
  
Merged build started.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-117240149
  
[Test build #36148 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36148/consoleFull) for PR 5392 at commit [`2de7bed`](https://github.com/apache/spark/commit/2de7bed8c501eda1491fb585cbc3771431385892).





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-117240070
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-30 Thread zsxwing
Github user zsxwing commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-117239937
  
retest this please





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-117239510
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-117239481
  
[Test build #36147 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36147/console) for PR 5392 at commit [`2de7bed`](https://github.com/apache/spark/commit/2de7bed8c501eda1491fb585cbc3771431385892).
 * This patch **fails MiMa tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
   * `case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage`
   * `case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage`
   * `case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)`
   * `case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage`
   * `case class SubmitDriverResponse(`
   * `case class KillDriverResponse(`
   * `case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)`






[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-117237079
  
[Test build #36147 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36147/consoleFull) for PR 5392 at commit [`2de7bed`](https://github.com/apache/spark/commit/2de7bed8c501eda1491fb585cbc3771431385892).





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-117236094
  
Merged build started.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-117236062
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-30 Thread zsxwing
Github user zsxwing commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-117234563
  
retest this please





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-30 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33582938
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---
@@ -466,23 +531,13 @@ private[worker] class Worker(
 case _ =>
   logDebug(s"Driver $driverId changed state to $state")
   }
-  master ! DriverStateChanged(driverId, state, exception)
+  sendToMaster(driverStageChanged)
--- End diff --

Could you clarify this point? This is the same as the previous code.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-30 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33581959
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala ---
@@ -40,98 +37,139 @@ import org.apache.spark.util.{ActorLogReceive, RpcUtils, Utils, AkkaUtils}
  * @param masterUrls Each url should look like spark://host:port.
  */
 private[spark] class AppClient(
-actorSystem: ActorSystem,
+rpcEnv: RpcEnv,
 masterUrls: Array[String],
 appDescription: ApplicationDescription,
 listener: AppClientListener,
 conf: SparkConf)
   extends Logging {
 
-  private val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
+  private val masterRpcAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
 
-  private val REGISTRATION_TIMEOUT = 20.seconds
+  private val REGISTRATION_TIMEOUT_SECONDS = 20
   private val REGISTRATION_RETRIES = 3
 
-  private var masterAddress: Address = null
-  private var actor: ActorRef = null
+  private var endpoint: RpcEndpointRef = null
   private var appId: String = null
-  private var registered = false
-  private var activeMasterUrl: String = null
-
-  private class ClientActor extends Actor with ActorLogReceive with Logging {
-var master: ActorSelection = null
-var alreadyDisconnected = false  // To avoid calling listener.disconnected() multiple times
-var alreadyDead = false  // To avoid calling listener.dead() multiple times
-var registrationRetryTimer: Option[Cancellable] = None
-
-override def preStart() {
-  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+  @volatile private var registered = false
+
+  private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint
+with Logging {
+
+private var master: Option[RpcEndpointRef] = None
+// To avoid calling listener.disconnected() multiple times
+private var alreadyDisconnected = false
+@volatile private var alreadyDead = false // To avoid calling listener.dead() multiple times
--- End diff --

It may be updated either from a separate thread pool or from the message loop of ClientEndpoint, so it's volatile.
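A minimal sketch of the visibility problem `@volatile` addresses, using hypothetical names: a timeout task on a separate scheduler thread sets the flag, and a polling loop standing in for the message loop reads it. Note that `@volatile` guarantees visibility, not an atomic check-then-set; if two threads could both test `alreadyDead` and then call `listener.dead()`, an `AtomicBoolean` with `compareAndSet` would be the stricter tool.

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Hypothetical names; models a flag written by one thread and read by another.
final class ClientState {
  // Without @volatile, the reading thread is not guaranteed to ever see `true`.
  @volatile private var alreadyDead = false

  def markDead(): Unit = alreadyDead = true
  def isDead: Boolean = alreadyDead
}

object VolatileDemo {
  /** Polls until a task on another thread marks the client dead; returns the poll count. */
  def runUntilDead(delayMillis: Long): Int = {
    val state = new ClientState
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    // Stands in for e.g. a registration-timeout task running in a separate pool.
    scheduler.schedule(new Runnable {
      override def run(): Unit = state.markDead()
    }, delayMillis, TimeUnit.MILLISECONDS)

    // Stands in for the endpoint's message loop checking the flag.
    var polls = 0
    while (!state.isDead) {
      polls += 1
      Thread.sleep(1)
    }
    scheduler.shutdown()
    polls
  }
}
```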





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-117128529
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-117128498
  
[Test build #36121 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36121/console) for PR 5392 at commit [`2de7bed`](https://github.com/apache/spark/commit/2de7bed8c501eda1491fb585cbc3771431385892).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
   * `case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage`
   * `case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage`
   * `case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)`
   * `case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage`
   * `case class SubmitDriverResponse(`
   * `case class KillDriverResponse(`
   * `case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)`






[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-117070709
  
[Test build #36121 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36121/consoleFull) for PR 5392 at commit [`2de7bed`](https://github.com/apache/spark/commit/2de7bed8c501eda1491fb585cbc3771431385892).





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-117069066
  
Merged build started.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-117068957
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-116860938
  
The core logic all looks good to me, just had some nits.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33519394
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---
@@ -510,13 +580,25 @@ private[worker] class Worker(
 }
   }
 
+  private def sendToMaster(message: Any): Unit = {
--- End diff --

also doc this guy about its behavior
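A sketch of how the helper's behavior might be documented, assuming it forwards to the current master when one is known and drops the message otherwise. The `MasterRef` trait, `WorkerSketch` and the `dropped` buffer are hypothetical stand-ins for `RpcEndpointRef`, the Worker and a warning log.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for RpcEndpointRef; only fire-and-forget `send` is modelled.
trait MasterRef {
  def send(message: Any): Unit
}

final class WorkerSketch {
  // None until the worker has successfully registered with a master.
  @volatile var master: Option[MasterRef] = None
  // Stands in for a warning log so dropped messages are observable.
  val dropped = ArrayBuffer.empty[Any]

  /**
   * Sends a message to the current master. If the worker has not yet registered
   * with any master, the message is dropped (not queued or retried).
   */
  def sendToMaster(message: Any): Unit = master match {
    case Some(masterRef) => masterRef.send(message)
    case None =>
      dropped += message
      println(s"Dropping $message because the connection to master has not yet been established")
  }
}
```

Spelling out the "dropped, not queued" case in the doc comment is the useful part: callers then know they must not rely on delivery before registration.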





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33519340
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---
@@ -466,23 +531,13 @@ private[worker] class Worker(
 case _ =>
   logDebug(s"Driver $driverId changed state to $state")
   }
-  master ! DriverStateChanged(driverId, state, exception)
+  sendToMaster(driverStageChanged)
--- End diff --

This change was not strictly necessary, right? Maybe it used to have a self 
reference.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33519250
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---
@@ -302,27 +367,27 @@ private[worker] class Worker(
   // the directory is used by an application - check that the application is not running
   // when cleaning up
   val appIdFromDir = dir.getName
-  val isAppStillRunning = executors.values.map(_.appId).contains(appIdFromDir)
+  val isAppStillRunning = appIds.contains(appIdFromDir)
   dir.isDirectory && !isAppStillRunning &&
-  !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECS)
+  !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS)
 }.foreach { dir =>
   logInfo(s"Removing directory: ${dir.getPath}")
   Utils.deleteRecursively(dir)
 }
-  }
+  } (cleanupThreadExecutor)
--- End diff --

nit: `}(`





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33519252
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---
@@ -302,27 +367,27 @@ private[worker] class Worker(
   // the directory is used by an application - check that the application is not running
   // when cleaning up
   val appIdFromDir = dir.getName
-  val isAppStillRunning = executors.values.map(_.appId).contains(appIdFromDir)
+  val isAppStillRunning = appIds.contains(appIdFromDir)
   dir.isDirectory && !isAppStillRunning &&
-  !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECS)
+  !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS)
 }.foreach { dir =>
   logInfo(s"Removing directory: ${dir.getPath}")
   Utils.deleteRecursively(dir)
 }
-  }
+  } (cleanupThreadExecutor)
 
-  cleanupFuture onFailure {
+  cleanupFuture.onFailure {
 case e: Throwable =>
   logError("App dir cleanup failed: " + e.getMessage, e)
-  }
+  } (cleanupThreadExecutor)
--- End diff --

nit: `}(`
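For reference, a minimal sketch of the style being suggested: passing an explicit `ExecutionContext` as the second argument list, written `}(ec)` with no space. `CleanupDemo`, the `"bad"` directory marker and the single-thread pool are hypothetical; `failed.foreach` is used because `onFailure` was deprecated in Scala 2.12 and removed in 2.13.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}

object CleanupDemo {
  // Dedicated pool, analogous to `cleanupThreadExecutor` in the diff.
  val cleanupThreadExecutor: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())

  @volatile var lastError: Option[String] = None

  /** Pretends to delete `dirs`; a directory named "bad" makes the cleanup fail. */
  def cleanup(dirs: Seq[String]): Future[Int] = {
    val cleanupFuture = Future {
      if (dirs.contains("bad")) throw new IllegalStateException("cannot delete bad")
      dirs.size
    }(cleanupThreadExecutor) // explicit ExecutionContext, written `}(` per the nit

    // Equivalent of `onFailure { case e: Throwable => ... }` on newer Scala versions.
    cleanupFuture.failed.foreach { e =>
      lastError = Some("App dir cleanup failed: " + e.getMessage)
    }(cleanupThreadExecutor)

    cleanupFuture
  }

  def shutdown(): Unit = cleanupThreadExecutor.shutdown()
}
```

Passing the context explicitly keeps blocking file I/O off whatever implicit pool happens to be in scope.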






[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33519086
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---
@@ -258,41 +299,65 @@ private[worker] class Worker(
 }
   }
 
+  /**
+   * Cancel last registeration retry, or do nothing if no retry
+   */
+  private def cancelLastRegistrationRetry(): Unit = {
+if (registerMasterFutures != null) {
+  registerMasterFutures.foreach(_.cancel(true))
+  registerMasterFutures = null
+}
+registrationRetryTimer.foreach(_.cancel(true))
+registrationRetryTimer = None
+  }
+
   private def registerWithMaster() {
-// DisassociatedEvent may be triggered multiple times, so don't attempt registration
+// onDisconnected may be triggered multiple times, so don't attempt registration
 // if there are outstanding registration attempts scheduled.
 registrationRetryTimer match {
   case None =>
 registered = false
-tryRegisterAllMasters()
+registerMasterFutures = tryRegisterAllMasters()
 connectionAttemptCount = 0
-registrationRetryTimer = Some {
-  context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,
-INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)
-}
+registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
--- End diff --

The indentation of this block is a little sketchy, maybe we can put the 
`new Runnable` on the next line?
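One way to apply this suggestion is to open the call, put `new Runnable` on its own line, and pass the delay arguments after the anonymous class. Below is a minimal sketch using `java.util.concurrent` directly. The scheduler and the countdown are hypothetical stand-ins for `forwordMessageScheduler` and the `ReregisterWithMaster` retry, and the 10 ms interval is arbitrary.

```scala
import java.util.concurrent.{CountDownLatch, Executors, ScheduledFuture, TimeUnit}

// Hypothetical stand-ins for the Worker's scheduler and its retry action.
val scheduler = Executors.newSingleThreadScheduledExecutor()
val retries = new CountDownLatch(3)

// `new Runnable` on its own line keeps the anonymous class body at one
// predictable indentation level.
val registrationRetryTimer: Option[ScheduledFuture[_]] = Some(
  scheduler.scheduleAtFixedRate(
    new Runnable {
      override def run(): Unit = {
        retries.countDown() // stand-in for sending ReregisterWithMaster
      }
    },
    10L, // initial delay
    10L, // period
    TimeUnit.MILLISECONDS))

val fired = retries.await(5, TimeUnit.SECONDS)
// Cancel the periodic task, as cancelLastRegistrationRetry() does.
registrationRetryTimer.foreach(_.cancel(true))
scheduler.shutdown()
```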





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33518507
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---
@@ -235,21 +250,47 @@ private[worker] class Worker(
  * still not safe if the old master recovers within this interval, but this is a much
  * less likely scenario.
  */
-if (master != null) {
-  master ! RegisterWorker(
-workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
-} else {
-  // We are retrying the initial registration
-  tryRegisterAllMasters()
+master match {
+  case Some(masterRef) =>
+// registered == false && master != None means we lost the connection to master, so
+// masterRef cannot be used and we need to recreate it again. Note: we must not set
+// master to None due to the above comments.
+if (registerMasterFutures != null) {
+  registerMasterFutures.foreach(_.cancel(true))
+}
+val masterAddress = masterRef.address
+registerMasterFutures = Array(registerMasterThreadPool.submit(new Runnable {
+  override def run(): Unit =
+try {
+  logInfo("Connecting to master " + masterAddress + "...")
+  val masterEndpoint =
+rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
+  masterEndpoint.send(RegisterWorker(
+workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
+} catch {
+  case ie: InterruptedException => // Cancelled
+  case NonFatal(e) => logError(e.getMessage, e)
--- End diff --

message should probably be like "Failed to connect to master $masterAddress"
(can we merge this code path with the above?)
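Here is a rough sketch of how the two code paths could be merged. A single helper submits the connection attempt and logs the suggested message on failure, so the initial registration and the reconnect path cannot drift apart. Everything in it is hypothetical:

- `registerAsync` is not an existing Worker method.
- The master address is a plain `String` rather than an `RpcAddress`.
- `connect` stands in for `rpcEnv.setupEndpointRef(...)` plus `send(RegisterWorker(...))`.
- An in-memory buffer stands in for `logError`.

```scala
import java.util.concurrent.{Executors, TimeUnit, Future => JFuture}
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

object Registration {
  // Stand-in for logError so the messages can be inspected.
  val errors = ArrayBuffer.empty[String]
  val registerMasterThreadPool = Executors.newCachedThreadPool()

  // Hypothetical shared helper used by both the initial and the reconnect path.
  def registerAsync(masterAddress: String)(connect: String => Unit): JFuture[_] =
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = {
        try {
          connect(masterAddress)
        } catch {
          case _: InterruptedException => // Cancelled: nothing to report
          case NonFatal(e) =>
            errors.synchronized {
              errors += s"Failed to connect to master $masterAddress: ${e.getMessage}"
            }
        }
      }
    })
}

// Simulate a master that refuses the connection.
val attempt = Registration.registerAsync("spark://host1:7077") { address =>
  throw new java.io.IOException(s"Connection refused: $address")
}
attempt.get(5, TimeUnit.SECONDS) // the failure is logged, not rethrown
Registration.registerMasterThreadPool.shutdown()
```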





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33518468
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---
@@ -235,21 +250,47 @@ private[worker] class Worker(
  * still not safe if the old master recovers within this interval, but this is a much
  * less likely scenario.
  */
-if (master != null) {
-  master ! RegisterWorker(
-workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
-} else {
-  // We are retrying the initial registration
-  tryRegisterAllMasters()
+master match {
+  case Some(masterRef) =>
+// registered == false && master != None means we lost the connection to master, so
+// masterRef cannot be used and we need to recreate it again. Note: we must not set
+// master to None due to the above comments.
+if (registerMasterFutures != null) {
+  registerMasterFutures.foreach(_.cancel(true))
+}
+val masterAddress = masterRef.address
+registerMasterFutures = Array(registerMasterThreadPool.submit(new Runnable {
+  override def run(): Unit =
--- End diff --

nit: braces





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33518043
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---
@@ -181,24 +190,31 @@ private[worker] class Worker(
 metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
   }
 
-  private def changeMaster(url: String, uiUrl: String) {
+  private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String) {
 // activeMasterUrl it's a valid Spark url since we receive it from master.
-activeMasterUrl = url
+activeMasterUrl = masterRef.address.toSparkURL
 activeMasterWebUiUrl = uiUrl
-master = context.actorSelection(
-  Master.toAkkaUrl(activeMasterUrl, AkkaUtils.protocol(context.system)))
-masterAddress = Master.toAkkaAddress(activeMasterUrl, AkkaUtils.protocol(context.system))
+master = Some(masterRef)
 connected = true
 // Cancel any outstanding re-registration attempts because we found a new master
-registrationRetryTimer.foreach(_.cancel())
-registrationRetryTimer = None
+cancelLastRegistrationRetry()
   }
 
-  private def tryRegisterAllMasters() {
-for (masterAkkaUrl <- masterAkkaUrls) {
-  logInfo("Connecting to master " + masterAkkaUrl + "...")
-  val actor = context.actorSelection(masterAkkaUrl)
-  actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
+  private def tryRegisterAllMasters(): Array[JFuture[_]] = {
+for (masterAddress <- masterRpcAddresses) yield {
+  registerMasterThreadPool.submit(new Runnable {
+override def run(): Unit =
+  try {
+logInfo("Connecting to master " + masterAddress + "...")
+val masterEndpoint =
+  rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
+masterEndpoint.send(RegisterWorker(
+  workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
+  } catch {
+case ie: InterruptedException => // Cancelled
+case NonFatal(e) => logError(e.getMessage, e)
--- End diff --

message should probably be like "Failed to connect to master $masterAddress"






[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33517979
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---
@@ -181,24 +190,31 @@ private[worker] class Worker(
 metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
   }
 
-  private def changeMaster(url: String, uiUrl: String) {
+  private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String) {
 // activeMasterUrl it's a valid Spark url since we receive it from master.
-activeMasterUrl = url
+activeMasterUrl = masterRef.address.toSparkURL
 activeMasterWebUiUrl = uiUrl
-master = context.actorSelection(
-  Master.toAkkaUrl(activeMasterUrl, AkkaUtils.protocol(context.system)))
-masterAddress = Master.toAkkaAddress(activeMasterUrl, AkkaUtils.protocol(context.system))
+master = Some(masterRef)
 connected = true
 // Cancel any outstanding re-registration attempts because we found a new master
-registrationRetryTimer.foreach(_.cancel())
-registrationRetryTimer = None
+cancelLastRegistrationRetry()
   }
 
-  private def tryRegisterAllMasters() {
-for (masterAkkaUrl <- masterAkkaUrls) {
-  logInfo("Connecting to master " + masterAkkaUrl + "...")
-  val actor = context.actorSelection(masterAkkaUrl)
-  actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
+  private def tryRegisterAllMasters(): Array[JFuture[_]] = {
+for (masterAddress <- masterRpcAddresses) yield {
--- End diff --

Maybe just `masterRpcAddresses.map { masterAddress =>` instead
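For reference, the compiler rewrites `for (x <- xs) yield f(x)` into `xs.map(x => f(x))`. The suggested form therefore returns the same `Array` of futures, one per master, which the caller can later cancel. Below is a minimal sketch. The addresses are hypothetical `String`s rather than `RpcAddress` values, and recording each address in a set stands in for the real connection attempt.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit, Future => JFuture}

// Hypothetical master addresses; the Worker would hold RpcAddress values here.
val masterRpcAddresses = Array("spark://host1:7077", "spark://host2:7077")
val registerMasterThreadPool = Executors.newFixedThreadPool(masterRpcAddresses.length)
val attempted = ConcurrentHashMap.newKeySet[String]()

// Same result as `for (masterAddress <- masterRpcAddresses) yield { ... }`.
def tryRegisterAllMasters(): Array[JFuture[_]] = {
  masterRpcAddresses.map { masterAddress =>
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = {
        attempted.add(masterAddress) // stand-in for connecting and sending RegisterWorker
      }
    })
  }
}

val registerMasterFutures = tryRegisterAllMasters()
registerMasterFutures.foreach(_.get(5, TimeUnit.SECONDS))
// Cancelling mirrors cancelLastRegistrationRetry(); finished futures are unaffected.
registerMasterFutures.foreach(_.cancel(true))
registerMasterThreadPool.shutdown()
```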





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33517925
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---
@@ -181,24 +190,31 @@ private[worker] class Worker(
 metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
   }
 
-  private def changeMaster(url: String, uiUrl: String) {
+  private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String) {
 // activeMasterUrl it's a valid Spark url since we receive it from master.
-activeMasterUrl = url
+activeMasterUrl = masterRef.address.toSparkURL
 activeMasterWebUiUrl = uiUrl
-master = context.actorSelection(
-  Master.toAkkaUrl(activeMasterUrl, AkkaUtils.protocol(context.system)))
-masterAddress = Master.toAkkaAddress(activeMasterUrl, AkkaUtils.protocol(context.system))
+master = Some(masterRef)
 connected = true
 // Cancel any outstanding re-registration attempts because we found a new master
-registrationRetryTimer.foreach(_.cancel())
-registrationRetryTimer = None
+cancelLastRegistrationRetry()
   }
 
-  private def tryRegisterAllMasters() {
-for (masterAkkaUrl <- masterAkkaUrls) {
-  logInfo("Connecting to master " + masterAkkaUrl + "...")
-  val actor = context.actorSelection(masterAkkaUrl)
-  actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
+  private def tryRegisterAllMasters(): Array[JFuture[_]] = {
+for (masterAddress <- masterRpcAddresses) yield {
+  registerMasterThreadPool.submit(new Runnable {
+override def run(): Unit =
--- End diff --

nit: add braces for this method





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33515502
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala ---
@@ -40,98 +37,139 @@ import org.apache.spark.util.{ActorLogReceive, RpcUtils, Utils, AkkaUtils}
  * @param masterUrls Each url should look like spark://host:port.
  */
 private[spark] class AppClient(
-actorSystem: ActorSystem,
+rpcEnv: RpcEnv,
 masterUrls: Array[String],
 appDescription: ApplicationDescription,
 listener: AppClientListener,
 conf: SparkConf)
   extends Logging {
 
-  private val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
+  private val masterRpcAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
 
-  private val REGISTRATION_TIMEOUT = 20.seconds
+  private val REGISTRATION_TIMEOUT_SECONDS = 20
   private val REGISTRATION_RETRIES = 3
 
-  private var masterAddress: Address = null
-  private var actor: ActorRef = null
+  private var endpoint: RpcEndpointRef = null
   private var appId: String = null
-  private var registered = false
-  private var activeMasterUrl: String = null
-
-  private class ClientActor extends Actor with ActorLogReceive with Logging {
-var master: ActorSelection = null
-var alreadyDisconnected = false  // To avoid calling listener.disconnected() multiple times
-var alreadyDead = false  // To avoid calling listener.dead() multiple times
-var registrationRetryTimer: Option[Cancellable] = None
-
-override def preStart() {
-  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+  @volatile private var registered = false
+
+  private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint
+with Logging {
+
+private var master: Option[RpcEndpointRef] = None
+// To avoid calling listener.disconnected() multiple times
+private var alreadyDisconnected = false
+@volatile private var alreadyDead = false // To avoid calling listener.dead() multiple times
+@volatile private var registerMasterFutures: Array[JFuture[_]] = null
+@volatile private var registrationRetryTimer: JScheduledFuture[_] = null
+
+// A thread pool for registering with masters. Because registering with a master is a blocking
+// action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
+// time so that we can register with all masters.
+private val registerMasterThreadPool = new ThreadPoolExecutor(
+  0,
+  masterRpcAddresses.size, // Make sure we can register with all masters at the same time
+  60L, TimeUnit.SECONDS,
+  new SynchronousQueue[Runnable](),
+  ThreadUtils.namedThreadFactory("appclient-register-master-threadpool"))
+
+// A scheduled executor for scheduling the registration actions
+private val registrationRetryThread =
+  ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread")
+
+override def onStart(): Unit = {
   try {
-registerWithMaster()
+registerWithMaster(1)
   } catch {
 case e: Exception =>
   logWarning("Failed to connect to master", e)
   markDisconnected()
-  context.stop(self)
+  stop()
   }
 }
 
-def tryRegisterAllMasters() {
-  for (masterAkkaUrl <- masterAkkaUrls) {
-logInfo("Connecting to master " + masterAkkaUrl + "...")
-val actor = context.actorSelection(masterAkkaUrl)
-actor ! RegisterApplication(appDescription)
+/**
+ *  Register with all masters asynchronously and returns an array `Future`s for cancellation.
+ */
+private def tryRegisterAllMasters(): Array[JFuture[_]] = {
+  for (masterAddress <- masterRpcAddresses) yield {
+registerMasterThreadPool.submit(new Runnable {
+  override def run(): Unit = try {
+if (registered) {
+  return
+}
+logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
+val masterRef =
+  rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
+masterRef.send(RegisterApplication(appDescription, self))
+  } catch {
+case ie: InterruptedException => // Cancelled
+case NonFatal(e) => logError(e.getMessage, e)
+  }
+})
   }
 }
 
-def registerWithMaster() {
-  tryRegisterAllMasters()
-  import context.dispatche

[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33515404
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala ---
@@ -40,98 +37,139 @@ import org.apache.spark.util.{ActorLogReceive, RpcUtils, Utils, AkkaUtils}
  * @param masterUrls Each url should look like spark://host:port.
  */
 private[spark] class AppClient(
-actorSystem: ActorSystem,
+rpcEnv: RpcEnv,
 masterUrls: Array[String],
 appDescription: ApplicationDescription,
 listener: AppClientListener,
 conf: SparkConf)
   extends Logging {
 
-  private val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
+  private val masterRpcAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
 
-  private val REGISTRATION_TIMEOUT = 20.seconds
+  private val REGISTRATION_TIMEOUT_SECONDS = 20
   private val REGISTRATION_RETRIES = 3
 
-  private var masterAddress: Address = null
-  private var actor: ActorRef = null
+  private var endpoint: RpcEndpointRef = null
   private var appId: String = null
-  private var registered = false
-  private var activeMasterUrl: String = null
-
-  private class ClientActor extends Actor with ActorLogReceive with Logging {
-var master: ActorSelection = null
-var alreadyDisconnected = false  // To avoid calling listener.disconnected() multiple times
-var alreadyDead = false  // To avoid calling listener.dead() multiple times
-var registrationRetryTimer: Option[Cancellable] = None
-
-override def preStart() {
-  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+  @volatile private var registered = false
+
+  private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint
+with Logging {
+
+private var master: Option[RpcEndpointRef] = None
+// To avoid calling listener.disconnected() multiple times
+private var alreadyDisconnected = false
+@volatile private var alreadyDead = false // To avoid calling listener.dead() multiple times
+@volatile private var registerMasterFutures: Array[JFuture[_]] = null
+@volatile private var registrationRetryTimer: JScheduledFuture[_] = null
+
+// A thread pool for registering with masters. Because registering with a master is a blocking
+// action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
+// time so that we can register with all masters.
+private val registerMasterThreadPool = new ThreadPoolExecutor(
+  0,
+  masterRpcAddresses.size, // Make sure we can register with all masters at the same time
+  60L, TimeUnit.SECONDS,
+  new SynchronousQueue[Runnable](),
+  ThreadUtils.namedThreadFactory("appclient-register-master-threadpool"))
+
+// A scheduled executor for scheduling the registration actions
+private val registrationRetryThread =
+  ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread")
+
+override def onStart(): Unit = {
   try {
-registerWithMaster()
+registerWithMaster(1)
   } catch {
 case e: Exception =>
   logWarning("Failed to connect to master", e)
   markDisconnected()
-  context.stop(self)
+  stop()
   }
 }
 
-def tryRegisterAllMasters() {
-  for (masterAkkaUrl <- masterAkkaUrls) {
-logInfo("Connecting to master " + masterAkkaUrl + "...")
-val actor = context.actorSelection(masterAkkaUrl)
-actor ! RegisterApplication(appDescription)
+/**
+ *  Register with all masters asynchronously and returns an array `Future`s for cancellation.
+ */
+private def tryRegisterAllMasters(): Array[JFuture[_]] = {
+  for (masterAddress <- masterRpcAddresses) yield {
+registerMasterThreadPool.submit(new Runnable {
+  override def run(): Unit = try {
+if (registered) {
+  return
+}
+logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
+val masterRef =
+  rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
+masterRef.send(RegisterApplication(appDescription, self))
+  } catch {
+case ie: InterruptedException => // Cancelled
+case NonFatal(e) => logError(e.getMessage, e)
+  }
+})
   }
 }
 
-def registerWithMaster() {
-  tryRegisterAllMasters()
-  import context.dispatche

[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33514995
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala ---
@@ -40,98 +37,139 @@ import org.apache.spark.util.{ActorLogReceive, RpcUtils, Utils, AkkaUtils}
  * @param masterUrls Each url should look like spark://host:port.
  */
 private[spark] class AppClient(
-actorSystem: ActorSystem,
+rpcEnv: RpcEnv,
 masterUrls: Array[String],
 appDescription: ApplicationDescription,
 listener: AppClientListener,
 conf: SparkConf)
   extends Logging {
 
-  private val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
+  private val masterRpcAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
 
-  private val REGISTRATION_TIMEOUT = 20.seconds
+  private val REGISTRATION_TIMEOUT_SECONDS = 20
   private val REGISTRATION_RETRIES = 3
 
-  private var masterAddress: Address = null
-  private var actor: ActorRef = null
+  private var endpoint: RpcEndpointRef = null
   private var appId: String = null
-  private var registered = false
-  private var activeMasterUrl: String = null
-
-  private class ClientActor extends Actor with ActorLogReceive with Logging {
-var master: ActorSelection = null
-var alreadyDisconnected = false  // To avoid calling listener.disconnected() multiple times
-var alreadyDead = false  // To avoid calling listener.dead() multiple times
-var registrationRetryTimer: Option[Cancellable] = None
-
-override def preStart() {
-  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+  @volatile private var registered = false
+
+  private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint
+with Logging {
+
+private var master: Option[RpcEndpointRef] = None
+// To avoid calling listener.disconnected() multiple times
+private var alreadyDisconnected = false
+@volatile private var alreadyDead = false // To avoid calling listener.dead() multiple times
+@volatile private var registerMasterFutures: Array[JFuture[_]] = null
+@volatile private var registrationRetryTimer: JScheduledFuture[_] = null
+
+// A thread pool for registering with masters. Because registering with a master is a blocking
+// action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
+// time so that we can register with all masters.
+private val registerMasterThreadPool = new ThreadPoolExecutor(
+  0,
+  masterRpcAddresses.size, // Make sure we can register with all masters at the same time
+  60L, TimeUnit.SECONDS,
+  new SynchronousQueue[Runnable](),
+  ThreadUtils.namedThreadFactory("appclient-register-master-threadpool"))
+
+// A scheduled executor for scheduling the registration actions
+private val registrationRetryThread =
+  ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread")
+
+override def onStart(): Unit = {
   try {
-registerWithMaster()
+registerWithMaster(1)
   } catch {
 case e: Exception =>
   logWarning("Failed to connect to master", e)
   markDisconnected()
-  context.stop(self)
+  stop()
   }
 }
 
-def tryRegisterAllMasters() {
-  for (masterAkkaUrl <- masterAkkaUrls) {
-logInfo("Connecting to master " + masterAkkaUrl + "...")
-val actor = context.actorSelection(masterAkkaUrl)
-actor ! RegisterApplication(appDescription)
+/**
+ *  Register with all masters asynchronously and returns an array `Future`s for cancellation.
+ */
+private def tryRegisterAllMasters(): Array[JFuture[_]] = {
+  for (masterAddress <- masterRpcAddresses) yield {
+registerMasterThreadPool.submit(new Runnable {
+  override def run(): Unit = try {
+if (registered) {
+  return
+}
+logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
+val masterRef =
+  rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
+masterRef.send(RegisterApplication(appDescription, self))
+  } catch {
+case ie: InterruptedException => // Cancelled
+case NonFatal(e) => logError(e.getMessage, e)
--- End diff --

message should probably be like "Failed to connect to $masterAddress"



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33514893
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala ---
@@ -40,98 +37,139 @@ import org.apache.spark.util.{ActorLogReceive, RpcUtils, Utils, AkkaUtils}
  * @param masterUrls Each url should look like spark://host:port.
  */
 private[spark] class AppClient(
-actorSystem: ActorSystem,
+rpcEnv: RpcEnv,
 masterUrls: Array[String],
 appDescription: ApplicationDescription,
 listener: AppClientListener,
 conf: SparkConf)
   extends Logging {
 
-  private val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
+  private val masterRpcAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
 
-  private val REGISTRATION_TIMEOUT = 20.seconds
+  private val REGISTRATION_TIMEOUT_SECONDS = 20
   private val REGISTRATION_RETRIES = 3
 
-  private var masterAddress: Address = null
-  private var actor: ActorRef = null
+  private var endpoint: RpcEndpointRef = null
   private var appId: String = null
-  private var registered = false
-  private var activeMasterUrl: String = null
-
-  private class ClientActor extends Actor with ActorLogReceive with Logging {
-var master: ActorSelection = null
-var alreadyDisconnected = false  // To avoid calling listener.disconnected() multiple times
-var alreadyDead = false  // To avoid calling listener.dead() multiple times
-var registrationRetryTimer: Option[Cancellable] = None
-
-override def preStart() {
-  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+  @volatile private var registered = false
+
+  private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint
+with Logging {
+
+private var master: Option[RpcEndpointRef] = None
+// To avoid calling listener.disconnected() multiple times
+private var alreadyDisconnected = false
+@volatile private var alreadyDead = false // To avoid calling listener.dead() multiple times
--- End diff --

It's not generally true that ThreadSafeRpcEndpoints require their mutable 
state to be volatile, right? Perhaps this is just being modified from a 
separate thread pool?
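The distinction the question draws can be sketched as follows. As we understand the `ThreadSafeRpcEndpoint` contract, processing of one message happens before processing of the next. Fields touched only from the endpoint's own message handlers therefore need no `@volatile`. A field that is also read from another thread does need it, as is the case for `registered`, which is read inside the `registerMasterThreadPool` tasks in the `tryRegisterAllMasters` diff. Below is a minimal sketch with plain JVM threads. `ClientState` is a hypothetical class, not Spark code.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

// Hypothetical holder for endpoint state that another thread pool also reads.
class ClientState {
  // Written by the endpoint's message loop, read by registration threads.
  @volatile var registered = false
}

val state = new ClientState
val registerMasterThreadPool = Executors.newSingleThreadExecutor()
val started = new CountDownLatch(1)

// A registration task on a separate pool polls the flag until it flips.
val poller = registerMasterThreadPool.submit(new Runnable {
  override def run(): Unit = {
    started.countDown()
    while (!state.registered) {
      // Without @volatile, the JMM would not guarantee this loop ever sees the write.
      Thread.sleep(1)
    }
  }
})

started.await(5, TimeUnit.SECONDS)
state.registered = true // e.g. set when the master's registration reply arrives
poller.get(5, TimeUnit.SECONDS) // completes once the write is visible to the poller
registerMasterThreadPool.shutdown()
```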





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33513905
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/Client.scala ---
@@ -82,29 +92,38 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
   driverArgs.cores,
   driverArgs.supervise,
   command)
-
-// This assumes only one Master is active at a time
-for (masterActor <- masterActors) {
-  masterActor ! RequestSubmitDriver(driverDescription)
-}
+ayncSendToMasterAndForwardReply[SubmitDriverResponse](
+  RequestSubmitDriver(driverDescription))
 
   case "kill" =>
 val driverId = driverArgs.driverId
-// This assumes only one Master is active at a time
-for (masterActor <- masterActors) {
-  masterActor ! RequestKillDriver(driverId)
-}
+ayncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
+}
+  }
+
+  /**
+   * Send the message to master and forward the reply to self asynchronously.
+   */
+  private def ayncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = {
+for (masterEndpoint <- masterEndpoints) {
+  masterEndpoint.ask[T](message).onComplete {
+case Success(v) => self.send(v)
+case Failure(e) =>
+  println(s"Error sending messages to master $masterEndpoint, exiting.")
--- End diff --

also let's make this the error message (should we be using println?)





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33513862
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/Client.scala ---
@@ -82,29 +92,38 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
   driverArgs.cores,
   driverArgs.supervise,
   command)
-
-// This assumes only one Master is active at a time
-for (masterActor <- masterActors) {
-  masterActor ! RequestSubmitDriver(driverDescription)
-}
+ayncSendToMasterAndForwardReply[SubmitDriverResponse](
+  RequestSubmitDriver(driverDescription))
 
   case "kill" =>
 val driverId = driverArgs.driverId
-// This assumes only one Master is active at a time
-for (masterActor <- masterActors) {
-  masterActor ! RequestKillDriver(driverId)
-}
+ayncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
+}
+  }
+
+  /**
+   * Send the message to master and forward the reply to self asynchronously.
+   */
+  private def ayncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = {
+for (masterEndpoint <- masterEndpoints) {
+  masterEndpoint.ask[T](message).onComplete {
+case Success(v) => self.send(v)
+case Failure(e) =>
+  println(s"Error sending messages to master $masterEndpoint, exiting.")
--- End diff --

exiting? I wouldn't expect this to be a failure condition given some 
masters may be dead, but maybe the text is just out of date.
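If the intent is that a single unreachable master should not be fatal, one option is to log each failure and give up only once every master has failed. Below is a minimal sketch with `scala.concurrent`. The following parts are hypothetical:

- `askAllMasters` is a hypothetical helper.
- Masters are modelled as strings, with a caller-supplied `ask` function in place of `RpcEndpointRef.ask`.
- `System.err` stands in for proper logging.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

implicit val ec: ExecutionContext = ExecutionContext.global

// Right(reply) for the first master that answers; Left(reason) only if all fail.
def askAllMasters(masters: Seq[String], ask: String => Future[String])
    : Future[Either[String, String]] = {
  val result = Promise[Either[String, String]]()
  val failures = new AtomicInteger(0)
  masters.foreach { master =>
    ask(master).onComplete {
      case Success(reply) =>
        result.trySuccess(Right(reply)) // in the PR the reply is forwarded via self.send
      case Failure(e) =>
        System.err.println(s"Error sending messages to master $master: ${e.getMessage}")
        if (failures.incrementAndGet() == masters.size) {
          result.trySuccess(Left("No master is reachable, exiting."))
        }
    }
  }
  result.future
}

val masters = Seq("spark://dead-master:7077", "spark://live-master:7077")
val oneDead = Await.result(askAllMasters(masters, master =>
  if (master.contains("dead")) Future.failed(new RuntimeException("connection refused"))
  else Future.successful("SubmitDriverResponse")), 5.seconds)
val allDead = Await.result(
  askAllMasters(masters, _ => Future.failed(new RuntimeException("connection refused"))),
  5.seconds)
```

With one dead master the client still gets a reply. The "exiting" path is reached only when every master has failed.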





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-29 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r33513145
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/Client.scala ---
@@ -36,20 +36,30 @@ import org.apache.spark.util.{ActorLogReceive, AkkaUtils, RpcUtils, Utils}
  * We currently don't support retry if submission fails. In HA mode, client will submit request to
  * all masters and see which one could handle it.
  */
-private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
-  extends Actor with ActorLogReceive with Logging {
-
-  private val masterActors = driverArgs.masters.map { m =>
-context.actorSelection(Master.toAkkaUrl(m, AkkaUtils.protocol(context.system)))
-  }
-  private val lostMasters = new HashSet[Address]
-  private var activeMasterActor: ActorSelection = null
-
-  val timeout = RpcUtils.askTimeout(conf)
-
-  override def preStart(): Unit = {
-context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
-
+private class ClientEndpoint(
+override val rpcEnv: RpcEnv,
+driverArgs: ClientArguments,
+masterEndpoints: Seq[RpcEndpointRef],
+conf: SparkConf)
+  extends ThreadSafeRpcEndpoint with Logging {
+
+  // A scheduled executor used to send messages at the specified time.
+  private val forwardMessageThread =
+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("client-forward-message")
+  // Used to provide the implicit parameter of `Future` methods.
+  private val forwardMessageExecutionContext =
+ExecutionContext.fromExecutor(forwardMessageThread,
+  t => t match {
+case ie: InterruptedException => // Exit normally
+case e =>
--- End diff --

nit: declare `e: Throwable` explicitly
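For the nit, here is a sketch of building a forwarding `ExecutionContext` whose catch-all case is typed as `e: Throwable`. It uses only the JDK and the Scala standard library. `ForwardMessageContext`, `newDaemonSingleThreadScheduledExecutor` and `fromScheduler` are hypothetical stand-ins: the diff's `ThreadUtils.newDaemonSingleThreadScheduledExecutor` is Spark's own helper and may differ. `onError` is an assumed hook where the real endpoint would log. The reporter is what the context calls through `reportFailure`, for example when a `Future` callback such as `onComplete` throws.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory}
import scala.concurrent.ExecutionContext

object ForwardMessageContext {
  // Hypothetical JDK-only helper: a single named daemon thread, so the executor
  // never keeps the JVM alive on shutdown.
  def newDaemonSingleThreadScheduledExecutor(threadName: String): ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, threadName)
        t.setDaemon(true)
        t
      }
    })

  // Wraps the executor in an ExecutionContext whose reporter ignores interrupts
  // (the thread is being shut down) and passes every other error to `onError`.
  // The catch-all names its type explicitly, as the review nit asks.
  def fromScheduler(
      executor: ScheduledExecutorService,
      onError: Throwable => Unit): ExecutionContext =
    ExecutionContext.fromExecutor(executor, {
      case _: InterruptedException => // Exit normally
      case e: Throwable => onError(e)
    })
}
```

`InterruptedException` is matched first, so shutting the forwarding thread down is not reported as an error.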



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-109947603
  
  [Test build #34423 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/34423/console) for PR 5392 at commit [`9137b82`](https://github.com/apache/spark/commit/9137b82af1222424756500a455bd31d50dade6c7).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `  case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage`
  * `  case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage`
  * `  case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)`
  * `  case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage`
  * `  case class SubmitDriverResponse(`
  * `  case class KillDriverResponse(`
  * `  case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)`




[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-109947635
  
Merged build finished. Test PASSed.



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-109916760
  
  [Test build #34423 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/34423/consoleFull) for PR 5392 at commit [`9137b82`](https://github.com/apache/spark/commit/9137b82af1222424756500a455bd31d50dade6c7).



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-109915245
  
 Merged build triggered.



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-06-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-109915318
  
Merged build started.



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-05-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-99267587
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31916/



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-05-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-99267573
  
  [Test build #31916 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31916/consoleFull) for PR 5392 at commit [`2d24fb5`](https://github.com/apache/spark/commit/2d24fb5ef87930228a5b8486ffc8b4b76b328290).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-05-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-99267584
  
Merged build finished. Test PASSed.



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-05-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-99250480
  
  [Test build #31916 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31916/consoleFull) for PR 5392 at commit [`2d24fb5`](https://github.com/apache/spark/commit/2d24fb5ef87930228a5b8486ffc8b4b76b328290).



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-05-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-99250452
  
Merged build started.



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-05-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-99250432
  
 Merged build triggered.



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-05-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-99232814
  
Merged build finished. Test FAILed.



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-05-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-99232816
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31907/



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-05-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-99232810
  
  [Test build #31907 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31907/consoleFull) for PR 5392 at commit [`5a82374`](https://github.com/apache/spark/commit/5a8237473257f6f17862b79bb2358d6119215e27).
 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `  case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage`
  * `  case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage`
  * `  case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)`
  * `  case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage`
  * `  case class SubmitDriverResponse(`
  * `  case class KillDriverResponse(`
  * `  case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)`




[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-05-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-99232525
  
  [Test build #31907 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31907/consoleFull) for PR 5392 at commit [`5a82374`](https://github.com/apache/spark/commit/5a8237473257f6f17862b79bb2358d6119215e27).



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-05-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-99232116
  
Merged build started.



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-05-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-99232078
  
 Merged build triggered.



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-97695946
  
  [Test build #31388 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31388/consoleFull) for PR 5392 at commit [`72304f0`](https://github.com/apache/spark/commit/72304f0150e74eb6432fc3141d3d5bc71bb93d61).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `  case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage`
  * `  case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage`
  * `  case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)`
  * `  case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage`
  * `  case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)`

 * This patch does not change any dependencies.



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-97695958
  
Merged build finished. Test PASSed.



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-97695959
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31388/



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-97670499
  
  [Test build #31388 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31388/consoleFull) for PR 5392 at commit [`72304f0`](https://github.com/apache/spark/commit/72304f0150e74eb6432fc3141d3d5bc71bb93d61).



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-97670459
  
Merged build started.



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-97670448
  
 Merged build triggered.



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-97631699
  
  [Test build #31361 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31361/consoleFull) for PR 5392 at commit [`e56cb16`](https://github.com/apache/spark/commit/e56cb16614cf6fd997931e7cd574eef3d0ccba5d).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `  case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage`
  * `  case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage`
  * `  case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)`
  * `  case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage`
  * `  case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)`

 * This patch does not change any dependencies.



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-97631748
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31361/



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-97631746
  
Merged build finished. Test FAILed.



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-97621950
  
  [Test build #31361 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31361/consoleFull) for PR 5392 at commit [`e56cb16`](https://github.com/apache/spark/commit/e56cb16614cf6fd997931e7cd574eef3d0ccba5d).



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-97621860
  
Merged build started.



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-97621846
  
 Merged build triggered.



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-29 Thread zsxwing
Github user zsxwing commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-97585686
  
Always send failure to the sender



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-29 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r29378265
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---
@@ -181,24 +189,31 @@ private[worker] class Worker(
     metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
   }
 
-  private def changeMaster(url: String, uiUrl: String) {
+  private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String) {
     // activeMasterUrl it's a valid Spark url since we receive it from master.
-    activeMasterUrl = url
+    activeMasterUrl = masterRef.address.toSparkURL
     activeMasterWebUiUrl = uiUrl
-    master = context.actorSelection(
-      Master.toAkkaUrl(activeMasterUrl, AkkaUtils.protocol(context.system)))
-    masterAddress = Master.toAkkaAddress(activeMasterUrl, AkkaUtils.protocol(context.system))
+    master = Some(masterRef)
     connected = true
     // Cancel any outstanding re-registration attempts because we found a new master
-    registrationRetryTimer.foreach(_.cancel())
-    registrationRetryTimer = None
+    cancelLastRegistrationRetry()
   }
 
-  private def tryRegisterAllMasters() {
-    for (masterAkkaUrl <- masterAkkaUrls) {
-      logInfo("Connecting to master " + masterAkkaUrl + "...")
-      val actor = context.actorSelection(masterAkkaUrl)
-      actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
+  private def tryRegisterAllMasters(): Array[Future[_]] = {
--- End diff --

Future => JFuture



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-97546034
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31300/



[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-97546003
  
  [Test build #31300 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31300/consoleFull) for PR 5392 at commit [`aa34b9b`](https://github.com/apache/spark/commit/aa34b9bd3b41b6c4cd834d7b714ca061e31eba6d).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `  case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage`
  * `  case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage`
  * `  case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)`
  * `  case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage`
  * `  case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)`

 * This patch does not change any dependencies.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-97546032
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-97523566
  
  [Test build #31300 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31300/consoleFull) for PR 5392 at commit [`aa34b9b`](https://github.com/apache/spark/commit/aa34b9bd3b41b6c4cd834d7b714ca061e31eba6d).





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-97522810
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-97522899
  
Merged build started.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-97517073
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-97517075
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31298/
Test FAILed.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-97517071
  
  [Test build #31298 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31298/consoleFull) for PR 5392 at commit [`bd541e7`](https://github.com/apache/spark/commit/bd541e78b138fadc7680f29ebb540d24e4d862a7).
 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage`
  * `case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage`
  * `case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)`
  * `case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage`
  * `case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)`

 * This patch does not change any dependencies.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-97516752
  
  [Test build #31298 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31298/consoleFull) for PR 5392 at commit [`bd541e7`](https://github.com/apache/spark/commit/bd541e78b138fadc7680f29ebb540d24e4d862a7).





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-97516449
  
Merged build started.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-97516420
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-29 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r29312354
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---
@@ -78,32 +82,26 @@ private[worker] class Worker(
 val randomNumberGenerator = new Random(UUID.randomUUID.getMostSignificantBits)
 randomNumberGenerator.nextDouble + FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND
   }
-  private val INITIAL_REGISTRATION_RETRY_INTERVAL = (math.round(10 *
-REGISTRATION_RETRY_FUZZ_MULTIPLIER)).seconds
-  private val PROLONGED_REGISTRATION_RETRY_INTERVAL = (math.round(60
-* REGISTRATION_RETRY_FUZZ_MULTIPLIER)).seconds
+  private val INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS = (math.round(10 *
+REGISTRATION_RETRY_FUZZ_MULTIPLIER))
+  private val PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS = (math.round(60
+* REGISTRATION_RETRY_FUZZ_MULTIPLIER))
 
   private val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false)
   // How often worker will clean up old app folders
   private val CLEANUP_INTERVAL_MILLIS = 
 conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
   // TTL for app folders/data;  after TTL expires it will be cleaned up
-  private val APP_DATA_RETENTION_SECS = 
+  private val APP_DATA_RETENTION_SECS =
--- End diff --

should use consistent naming. change this to SECONDS
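For context on the naming: the hunk above computes both worker registration retry intervals as whole seconds, `round(10 * fuzz)` and `round(60 * fuzz)`, where the fuzz multiplier is `nextDouble + FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND` drawn from a UUID-seeded `Random`. A minimal sketch of that arithmetic, assuming a lower bound of 0.5 (the constant's value is not shown in this hunk); `RetryIntervals` is a hypothetical name:

```scala
import java.util.UUID
import scala.util.Random

// Hypothetical helper mirroring the arithmetic in the hunk; not Spark's Worker code.
object RetryIntervals {
  // Assumption: the hunk does not show this constant's value.
  val FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND = 0.500

  // Multiplier in [0.5, 1.5), so workers started together spread out their retries.
  def fuzzMultiplier(rng: Random): Double =
    rng.nextDouble() + FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND

  // Both results are whole seconds, which is what a *_SECONDS suffix records.
  def intervalsSeconds(fuzz: Double): (Long, Long) =
    (math.round(10 * fuzz), math.round(60 * fuzz))
}

val fuzz = RetryIntervals.fuzzMultiplier(new Random(UUID.randomUUID().getMostSignificantBits()))
val (initialSeconds, prolongedSeconds) = RetryIntervals.intervalsSeconds(fuzz)
println(s"initial retry after $initialSeconds s, prolonged retry after $prolongedSeconds s")
```

With this lower bound the initial interval lands between 5 and 15 seconds and the prolonged one between 30 and 90 seconds.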





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-95494121
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30819/
Test PASSed.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-95494104
  
  [Test build #30819 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30819/consoleFull) for PR 5392 at commit [`25a84d8`](https://github.com/apache/spark/commit/25a84d87a74ec843d2d0a0c9a1a6e8f62e3661f2).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage`
  * `case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage`
  * `case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)`
  * `case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage`
  * `case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)`

 * This patch does not change any dependencies.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-95472295
  
  [Test build #30819 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30819/consoleFull) for PR 5392 at commit [`25a84d8`](https://github.com/apache/spark/commit/25a84d87a74ec843d2d0a0c9a1a6e8f62e3661f2).





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-95257197
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30763/
Test FAILed.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-95257184
  
  [Test build #30763 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30763/consoleFull) for PR 5392 at commit [`dbfc916`](https://github.com/apache/spark/commit/dbfc91601d8ab85a437df1e387bfd466c209ccea).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage`
  * `case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage`
  * `case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)`
  * `case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage`
  * `case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)`

 * This patch does not change any dependencies.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/5392#issuecomment-95237267
  
  [Test build #30763 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30763/consoleFull) for PR 5392 at commit [`dbfc916`](https://github.com/apache/spark/commit/dbfc91601d8ab85a437df1e387bfd466c209ccea).





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-21 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r28758602
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala ---
@@ -40,98 +36,128 @@ import org.apache.spark.util.{ActorLogReceive, Utils, AkkaUtils}
  * @param masterUrls Each url should look like spark://host:port.
  */
 private[spark] class AppClient(
-actorSystem: ActorSystem,
+rpcEnv: RpcEnv,
 masterUrls: Array[String],
 appDescription: ApplicationDescription,
 listener: AppClientListener,
 conf: SparkConf)
   extends Logging {
 
-  private val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
+  private val masterRpcAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
 
-  private val REGISTRATION_TIMEOUT = 20.seconds
+  private val REGISTRATION_TIMEOUT_SECONDS = 20
   private val REGISTRATION_RETRIES = 3
 
-  private var masterAddress: Address = null
-  private var actor: ActorRef = null
+  private var endpoint: RpcEndpointRef = null
   private var appId: String = null
-  private var registered = false
-  private var activeMasterUrl: String = null
+  @volatile private var registered = false
+
+  private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint
+with Logging {
+
+private var master: Option[RpcEndpointRef] = None
+// To avoid calling listener.disconnected() multiple times
+private var alreadyDisconnected = false
+@volatile private var alreadyDead = false // To avoid calling listener.dead() multiple times
+@volatile private var registerMasterFutures: Array[Future[_]] = null
+@volatile private var registrationRetryTimer: ScheduledFuture[_] = null
 
-  private class ClientActor extends Actor with ActorLogReceive with Logging {
-var master: ActorSelection = null
-var alreadyDisconnected = false  // To avoid calling listener.disconnected() multiple times
-var alreadyDead = false  // To avoid calling listener.dead() multiple times
-var registrationRetryTimer: Option[Cancellable] = None
+private val registerMasterThreadPool = new ThreadPoolExecutor(
+  0,
+  masterRpcAddresses.size, // Make sure we can register with all masters at the same time
+  60L, TimeUnit.SECONDS,
+  new SynchronousQueue[Runnable](),
+  Utils.namedThreadFactory("appclient-register-master-threadpool"))
 
-override def preStart() {
-  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+private val registrationRetryThread =
+  Utils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread")
+
+override def onStart(): Unit = {
   try {
-registerWithMaster()
+registerWithMaster(1)
   } catch {
 case e: Exception =>
   logWarning("Failed to connect to master", e)
   markDisconnected()
-  context.stop(self)
+  stop()
   }
 }
 
-def tryRegisterAllMasters() {
-  for (masterAkkaUrl <- masterAkkaUrls) {
-logInfo("Connecting to master " + masterAkkaUrl + "...")
-val actor = context.actorSelection(masterAkkaUrl)
-actor ! RegisterApplication(appDescription)
+private def tryRegisterAllMasters(): Array[Future[_]] = {
--- End diff --

and actually - can you document the logic for registration so it is easier 
to understand for review and future changes?
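One way to read the registration logic in this hunk, sketched under assumptions: `onStart` calls `registerWithMaster(1)`, `tryRegisterAllMasters` fans one registration attempt out to every master and returns the futures, and a timer on `registrationRetryThread` checks back after `REGISTRATION_TIMEOUT_SECONDS`, either starting another round (up to `REGISTRATION_RETRIES`) or giving up. The retry and give-up rules below are inferred rather than taken from the diff; `RegistrationSketch`, `tryRegister`, and the millisecond timeout (used so the sketch runs quickly) are hypothetical, and a fixed pool stands in for `registerMasterThreadPool`:

```scala
import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}
import java.util.concurrent.{Future => JFuture}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical register-with-retry flow; not Spark's AppClient implementation.
class RegistrationSketch(
    masters: Seq[String],
    tryRegister: String => Boolean, // assumption: stands in for sending RegisterApplication
    timeoutMillis: Long,            // plays the role of REGISTRATION_TIMEOUT_SECONDS
    maxRetries: Int) {              // plays the role of REGISTRATION_RETRIES
  require(masters.nonEmpty, "need at least one master")

  val registered = new AtomicBoolean(false)
  val dead = new AtomicBoolean(false)
  val attempts = new AtomicInteger(0)
  val finished = new CountDownLatch(1) // released once, on success or on giving up

  private def daemon(name: String): ThreadFactory = new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, name)
      t.setDaemon(true)
      t
    }
  }

  // One thread per master so that every master is contacted at the same time.
  private val pool: ExecutorService =
    Executors.newFixedThreadPool(masters.size, daemon("register-master"))
  // A single thread decides, after each timeout, whether to retry or give up.
  private val retryTimer: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(daemon("registration-retry"))

  // Both flags change only under this lock, so success and giving up are exclusive.
  private def markRegistered(): Unit = synchronized {
    if (!registered.get && !dead.get) { registered.set(true); finished.countDown() }
  }
  private def markDead(): Unit = synchronized {
    if (!registered.get && !dead.get) { dead.set(true); finished.countDown() }
  }

  private def tryRegisterAllMasters(): Seq[JFuture[_]] =
    masters.map { m =>
      pool.submit(new Runnable {
        def run(): Unit =
          if (!registered.get && !dead.get && tryRegister(m)) markRegistered()
      })
    }

  def registerWithMaster(nthRetry: Int): Unit = {
    attempts.set(nthRetry)
    val futures = tryRegisterAllMasters()
    retryTimer.schedule(new Runnable {
      def run(): Unit =
        if (!registered.get) {
          futures.foreach(_.cancel(true)) // interrupt this round's outstanding attempts
          if (nthRetry >= maxRetries) markDead() else registerWithMaster(nthRetry + 1)
        }
    }, timeoutMillis, TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = {
    retryTimer.shutdownNow()
    pool.shutdownNow()
  }
}
```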





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-21 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r28757696
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala ---
@@ -40,98 +36,128 @@ import org.apache.spark.util.{ActorLogReceive, Utils, AkkaUtils}
  * @param masterUrls Each url should look like spark://host:port.
  */
 private[spark] class AppClient(
-actorSystem: ActorSystem,
+rpcEnv: RpcEnv,
 masterUrls: Array[String],
 appDescription: ApplicationDescription,
 listener: AppClientListener,
 conf: SparkConf)
   extends Logging {
 
-  private val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
+  private val masterRpcAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
 
-  private val REGISTRATION_TIMEOUT = 20.seconds
+  private val REGISTRATION_TIMEOUT_SECONDS = 20
   private val REGISTRATION_RETRIES = 3
 
-  private var masterAddress: Address = null
-  private var actor: ActorRef = null
+  private var endpoint: RpcEndpointRef = null
   private var appId: String = null
-  private var registered = false
-  private var activeMasterUrl: String = null
+  @volatile private var registered = false
+
+  private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint
+with Logging {
+
+private var master: Option[RpcEndpointRef] = None
+// To avoid calling listener.disconnected() multiple times
+private var alreadyDisconnected = false
+@volatile private var alreadyDead = false // To avoid calling listener.dead() multiple times
+@volatile private var registerMasterFutures: Array[Future[_]] = null
+@volatile private var registrationRetryTimer: ScheduledFuture[_] = null
 
-  private class ClientActor extends Actor with ActorLogReceive with Logging {
-var master: ActorSelection = null
-var alreadyDisconnected = false  // To avoid calling listener.disconnected() multiple times
-var alreadyDead = false  // To avoid calling listener.dead() multiple times
-var registrationRetryTimer: Option[Cancellable] = None
+private val registerMasterThreadPool = new ThreadPoolExecutor(
+  0,
+  masterRpcAddresses.size, // Make sure we can register with all masters at the same time
+  60L, TimeUnit.SECONDS,
+  new SynchronousQueue[Runnable](),
+  Utils.namedThreadFactory("appclient-register-master-threadpool"))
 
-override def preStart() {
-  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+private val registrationRetryThread =
--- End diff --

and this one





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-21 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r28757688
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala ---
@@ -40,98 +36,128 @@ import org.apache.spark.util.{ActorLogReceive, Utils, AkkaUtils}
  * @param masterUrls Each url should look like spark://host:port.
  */
 private[spark] class AppClient(
-actorSystem: ActorSystem,
+rpcEnv: RpcEnv,
 masterUrls: Array[String],
 appDescription: ApplicationDescription,
 listener: AppClientListener,
 conf: SparkConf)
   extends Logging {
 
-  private val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
+  private val masterRpcAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
 
-  private val REGISTRATION_TIMEOUT = 20.seconds
+  private val REGISTRATION_TIMEOUT_SECONDS = 20
   private val REGISTRATION_RETRIES = 3
 
-  private var masterAddress: Address = null
-  private var actor: ActorRef = null
+  private var endpoint: RpcEndpointRef = null
   private var appId: String = null
-  private var registered = false
-  private var activeMasterUrl: String = null
+  @volatile private var registered = false
+
+  private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint
+with Logging {
+
+private var master: Option[RpcEndpointRef] = None
+// To avoid calling listener.disconnected() multiple times
+private var alreadyDisconnected = false
+@volatile private var alreadyDead = false // To avoid calling listener.dead() multiple times
+@volatile private var registerMasterFutures: Array[Future[_]] = null
+@volatile private var registrationRetryTimer: ScheduledFuture[_] = null
 
-  private class ClientActor extends Actor with ActorLogReceive with Logging {
-var master: ActorSelection = null
-var alreadyDisconnected = false  // To avoid calling listener.disconnected() multiple times
-var alreadyDead = false  // To avoid calling listener.dead() multiple times
-var registrationRetryTimer: Option[Cancellable] = None
+private val registerMasterThreadPool = new ThreadPoolExecutor(
--- End diff --

document what this thread does





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-21 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r28757518
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---
@@ -512,13 +573,24 @@ private[worker] class Worker(
 }
   }
 
+  private def sendToMaster(message: Any): Unit = {
+master match {
+  case Some(masterRef) => masterRef.send(message)
+  case None =>
+logWarning(s"Dropping $message because the connection to master has not yet established")
--- End diff --

has not yet BEEN established?
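The helper in this hunk only forwards a message when `master` holds an endpoint reference; otherwise it logs the message and drops it. Below is a minimal sketch of that pattern with the wording fix applied. `EndpointRef`, `RecordingRef`, and `WorkerSketch` are hypothetical stand-ins for Spark's `RpcEndpointRef` and `Worker`, and warnings are collected in a buffer instead of going to a logger:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for RpcEndpointRef; only the one-way `send` is modelled.
trait EndpointRef {
  def send(message: Any): Unit
}

// Records delivered messages so the behaviour can be checked.
class RecordingRef extends EndpointRef {
  val received = ArrayBuffer.empty[Any]
  def send(message: Any): Unit = received += message
}

class WorkerSketch {
  // None until registration with a master succeeds.
  @volatile var master: Option[EndpointRef] = None
  val warnings = ArrayBuffer.empty[String]

  private def logWarning(msg: String): Unit = warnings += msg

  def sendToMaster(message: Any): Unit = master match {
    case Some(masterRef) => masterRef.send(message)
    case None =>
      logWarning(s"Dropping $message because the connection to master has not yet been established")
  }
}
```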





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-21 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r28757203
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -922,6 +922,24 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Wrapper over newSingleThreadExecutor. Thread names are formatted as prefix-ID, where ID is a
+   * unique, sequentially assigned integer.
+   */
+  def newDaemonSingleThreadExecutor(prefix: String): ExecutorService = {
+val threadFactory = namedThreadFactory(prefix)
+Executors.newSingleThreadExecutor(threadFactory)
+  }
+
+  /**
+   * Wrapper over newSingleThreadScheduledExecutor. Thread names are formatted as prefix-ID, where
+   * ID is a unique, sequentially assigned integer.
--- End diff --

since this is a single thread, the ID is always 1, right?
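On the ID question: in a sketch like the one below, a single-thread executor asks its factory for a new thread only when the previous thread has died. That can happen when a task passed to `execute` throws. The suffix therefore keeps its first value until such a failure occurs. `NamedThreads` is a hypothetical helper, not Spark's `Utils.namedThreadFactory`, and starting the IDs at 1 is an arbitrary choice here:

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper, not Spark's Utils.namedThreadFactory; IDs start at 1 by choice.
object NamedThreads {
  def namedDaemonFactory(prefix: String): ThreadFactory = new ThreadFactory {
    private val nextId = new AtomicInteger(1)
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${nextId.getAndIncrement()}")
      t.setDaemon(true)
      // Keep the demo quiet when a task kills its thread.
      t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
        def uncaughtException(th: Thread, e: Throwable): Unit = ()
      })
      t
    }
  }

  def newDaemonSingleThreadExecutor(prefix: String): ExecutorService =
    Executors.newSingleThreadExecutor(namedDaemonFactory(prefix))

  // Name of the thread that runs a task submitted now.
  def currentWorkerName(pool: ExecutorService): String =
    pool.submit(new Callable[String] {
      def call(): String = Thread.currentThread.getName
    }).get()
}
```

Tasks sent through `submit` never kill the thread, because their exceptions are captured in the returned `Future`. Only a task passed to `execute` that throws causes the executor to create a replacement thread with the next ID.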





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-21 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r28757176
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -922,6 +922,24 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Wrapper over newSingleThreadExecutor. Thread names are formatted as prefix-ID, where ID is a
+   * unique, sequentially assigned integer.
+   */
+  def newDaemonSingleThreadExecutor(prefix: String): ExecutorService = {
--- End diff --

probably as a separate PR - might be good to create a ThreadUtils.scala and 
move all the thread pool/executor stuff there.





[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...

2015-04-21 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/5392#discussion_r28756662
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---
@@ -37,32 +35,35 @@ import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}
+import org.apache.spark.rpc._
+import org.apache.spark.util.{SignalLogger, Utils}
 
-/**
-  * @param masterAkkaUrls Each url should be a valid akka url.
-  */
 private[worker] class Worker(
-host: String,
-port: Int,
+override val rpcEnv: RpcEnv,
 webUiPort: Int,
 cores: Int,
 memory: Int,
-masterAkkaUrls: Array[String],
-actorSystemName: String,
-actorName: String,
+masterRpcAddresses: Array[RpcAddress],
+systemName: String,
+endpointName: String,
 workDirPath: String = null,
 val conf: SparkConf,
 val securityMgr: SecurityManager)
-  extends Actor with ActorLogReceive with Logging {
-  import context.dispatcher
+  extends ThreadSafeRpcEndpoint with Logging {
+
+  private val host = rpcEnv.address.host
+  private val port = rpcEnv.address.port
 
   Utils.checkHost(host, "Expected hostname")
   assert (port > 0)
 
-  // For worker and executor IDs
-  private def createDateFormat = new SimpleDateFormat("MMddHHmmss")  
+  private val forwordMessageScheduler =
--- End diff --

would be great to document the following 2 too.
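As background for that documentation, consider one assumed role for a scheduler like `forwordMessageScheduler`: delivering a message back to the endpoint itself at a fixed rate, so that the periodic work still runs on the endpoint's own message loop. The sketch below assumes that usage. `SelfRef`, `SendHeartbeat`, `ForwardSketch`, and the 10 ms period are hypothetical, and a queue stands in for the endpoint's inbox:

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, ScheduledFuture, ThreadFactory, TimeUnit}

// Hypothetical message type; the real Worker defines its own messages.
case object SendHeartbeat

// Hypothetical endpoint whose inbox is drained by a single consumer.
class SelfRef {
  val inbox = new LinkedBlockingQueue[Any]()
  def send(message: Any): Unit = inbox.put(message)
}

object ForwardSketch {
  private val daemonFactory = new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "worker-forward-message-scheduler")
      t.setDaemon(true)
      t
    }
  }

  // Single daemon thread whose only job is to post messages at the right time.
  val forwardMessageScheduler = Executors.newSingleThreadScheduledExecutor(daemonFactory)

  def scheduleHeartbeats(endpoint: SelfRef, periodMillis: Long): ScheduledFuture[_] =
    forwardMessageScheduler.scheduleAtFixedRate(new Runnable {
      // Only enqueue; the endpoint handles the message on its own thread.
      def run(): Unit = endpoint.send(SendHeartbeat)
    }, 0L, periodMillis, TimeUnit.MILLISECONDS)
}
```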




