[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r33644095 --- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala --- @@ -466,23 +531,13 @@ private[worker] class Worker( case _ => logDebug(s"Driver $driverId changed state to $state") } - master ! DriverStateChanged(driverId, state, exception) + sendToMaster(driverStageChanged) --- End diff -- Ah, it's just to avoid creating a new instance. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
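The `@` binder the reviewers are discussing can be sketched as follows. This is a hypothetical, self-contained Scala example, not the actual Worker code: matching with `stateChanged @ DriverStateChanged(...)` binds the whole incoming message so it can be forwarded as-is, avoiding a second allocation.

```scala
// Hypothetical sketch of the `@` pattern binder being discussed
// (names mirror the PR, but this is not the actual Worker code).
case class DriverStateChanged(driverId: String, state: String, exception: Option[Exception])

object PatternBinderSketch {
  def handle(message: Any, sendToMaster: Any => Unit): Unit = message match {
    // `stateChanged @ ...` binds the whole matched message, so it can be
    // forwarded as-is instead of allocating DriverStateChanged(...) again.
    case stateChanged @ DriverStateChanged(driverId, state, _) =>
      println(s"Driver $driverId changed state to $state")
      sendToMaster(stateChanged)
    case _ => ()
  }
}
```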
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/5392
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-117379182 Alright I'm going to merge this. @zsxwing please submit a separate PR to address the TODO.
Github user aarondav commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-117354387 Just one comment thanks to your proactive TODOifying :) LGTM, feel free to merge after.
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r33627528 --- Diff: core/src/main/scala/org/apache/spark/deploy/master/Master.scala --- @@ -504,6 +518,7 @@ private[master] class Master( } private def completeRecovery() { +// TODO Why synchronized --- End diff -- This was due to an earlier state in the code when this method could be invoked from a different thread. It can be safely removed now.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-117290439 Merged build finished. Test PASSed.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-117290218 [Test build #36148 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36148/console) for PR 5392 at commit [`2de7bed`](https://github.com/apache/spark/commit/2de7bed8c501eda1491fb585cbc3771431385892).
* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage`
  * `case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage`
  * `case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)`
  * `case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage`
  * `case class SubmitDriverResponse(`
  * `case class KillDriverResponse(`
  * `case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)`
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r33596669 --- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala --- @@ -466,23 +531,13 @@ private[worker] class Worker( case _ => logDebug(s"Driver $driverId changed state to $state") } - master ! DriverStateChanged(driverId, state, exception) + sendToMaster(driverStageChanged) --- End diff -- Rather, I meant the `driverStageChanged @` binding instead of recreating `DriverStateChanged(driverId, state, exception)`. It's not particularly important, but I was just curious if there was a reason.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-117240100 Merged build started.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-117240149 [Test build #36148 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36148/consoleFull) for PR 5392 at commit [`2de7bed`](https://github.com/apache/spark/commit/2de7bed8c501eda1491fb585cbc3771431385892).
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-117240070 Merged build triggered.
Github user zsxwing commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-117239937 retest this please
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-117239510 Merged build finished. Test FAILed.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-117239481 [Test build #36147 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36147/console) for PR 5392 at commit [`2de7bed`](https://github.com/apache/spark/commit/2de7bed8c501eda1491fb585cbc3771431385892).
* This patch **fails MiMa tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage`
  * `case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage`
  * `case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)`
  * `case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage`
  * `case class SubmitDriverResponse(`
  * `case class KillDriverResponse(`
  * `case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)`
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-117237079 [Test build #36147 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36147/consoleFull) for PR 5392 at commit [`2de7bed`](https://github.com/apache/spark/commit/2de7bed8c501eda1491fb585cbc3771431385892).
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-117236094 Merged build started.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-117236062 Merged build triggered.
Github user zsxwing commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-117234563 retest this please
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r33582938 --- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala --- @@ -466,23 +531,13 @@ private[worker] class Worker( case _ => logDebug(s"Driver $driverId changed state to $state") } - master ! DriverStateChanged(driverId, state, exception) + sendToMaster(driverStageChanged) --- End diff -- Could you clarify this point? This is the same as the previous code.
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r33581959 --- Diff: core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala --- @@ -40,98 +37,139 @@ import org.apache.spark.util.{ActorLogReceive, RpcUtils, Utils, AkkaUtils} * @param masterUrls Each url should look like spark://host:port. */ private[spark] class AppClient( -actorSystem: ActorSystem, +rpcEnv: RpcEnv, masterUrls: Array[String], appDescription: ApplicationDescription, listener: AppClientListener, conf: SparkConf) extends Logging { - private val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem))) + private val masterRpcAddresses = masterUrls.map(RpcAddress.fromSparkURL(_)) - private val REGISTRATION_TIMEOUT = 20.seconds + private val REGISTRATION_TIMEOUT_SECONDS = 20 private val REGISTRATION_RETRIES = 3 - private var masterAddress: Address = null - private var actor: ActorRef = null + private var endpoint: RpcEndpointRef = null private var appId: String = null - private var registered = false - private var activeMasterUrl: String = null - - private class ClientActor extends Actor with ActorLogReceive with Logging { -var master: ActorSelection = null -var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times -var alreadyDead = false // To avoid calling listener.dead() multiple times -var registrationRetryTimer: Option[Cancellable] = None - -override def preStart() { - context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) + @volatile private var registered = false + + private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint +with Logging { + +private var master: Option[RpcEndpointRef] = None +// To avoid calling listener.disconnected() multiple times +private var alreadyDisconnected = false +@volatile private var alreadyDead = false // To avoid calling listener.dead() multiple times --- End diff -- It may be updated in a separate thread pool or the message loop of ClientEndpoint, so it's volatile.
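The cross-thread visibility argument above can be sketched as a minimal, self-contained Scala example (this is not the actual AppClient code; the class and method names are invented for illustration):

```scala
// Minimal sketch of why `alreadyDead` is marked @volatile: the flag is
// written from one thread (e.g. a scheduled timeout task) and read from
// another (the endpoint's message loop). @volatile guarantees the write
// is immediately visible to the reading thread.
class DeadFlag {
  @volatile private var alreadyDead = false
  def markDead(): Unit = { alreadyDead = true } // e.g. called from a thread pool
  def isDead: Boolean = alreadyDead             // e.g. read in the message loop
}
```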
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-117128529 Merged build finished. Test FAILed.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-117128498 [Test build #36121 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36121/console) for PR 5392 at commit [`2de7bed`](https://github.com/apache/spark/commit/2de7bed8c501eda1491fb585cbc3771431385892).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage`
  * `case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage`
  * `case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)`
  * `case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage`
  * `case class SubmitDriverResponse(`
  * `case class KillDriverResponse(`
  * `case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)`
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-117070709 [Test build #36121 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36121/consoleFull) for PR 5392 at commit [`2de7bed`](https://github.com/apache/spark/commit/2de7bed8c501eda1491fb585cbc3771431385892).
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-117069066 Merged build started.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-117068957 Merged build triggered.
Github user aarondav commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-116860938 The core logic all looks good to me, just had some nits.
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r33519394 --- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala --- @@ -510,13 +580,25 @@ private[worker] class Worker( } } + private def sendToMaster(message: Any): Unit = { --- End diff -- also doc this guy about its behavior
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r33519340 --- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala --- @@ -466,23 +531,13 @@ private[worker] class Worker( case _ => logDebug(s"Driver $driverId changed state to $state") } - master ! DriverStateChanged(driverId, state, exception) + sendToMaster(driverStageChanged) --- End diff -- This change was not strictly necessary, right? Maybe it used to have a self reference.
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r33519250 --- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala --- @@ -302,27 +367,27 @@ private[worker] class Worker( // the directory is used by an application - check that the application is not running // when cleaning up val appIdFromDir = dir.getName - val isAppStillRunning = executors.values.map(_.appId).contains(appIdFromDir) + val isAppStillRunning = appIds.contains(appIdFromDir) dir.isDirectory && !isAppStillRunning && - !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECS) + !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS) }.foreach { dir => logInfo(s"Removing directory: ${dir.getPath}") Utils.deleteRecursively(dir) } - } + } (cleanupThreadExecutor) --- End diff -- nit: `}(`
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r33519252 --- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala --- @@ -302,27 +367,27 @@ private[worker] class Worker( // the directory is used by an application - check that the application is not running // when cleaning up val appIdFromDir = dir.getName - val isAppStillRunning = executors.values.map(_.appId).contains(appIdFromDir) + val isAppStillRunning = appIds.contains(appIdFromDir) dir.isDirectory && !isAppStillRunning && - !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECS) + !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS) }.foreach { dir => logInfo(s"Removing directory: ${dir.getPath}") Utils.deleteRecursively(dir) } - } + } (cleanupThreadExecutor) - cleanupFuture onFailure { + cleanupFuture.onFailure { case e: Throwable => logError("App dir cleanup failed: " + e.getMessage, e) - } + } (cleanupThreadExecutor) --- End diff -- nit: `}(`
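The pattern being nitpicked here, passing an explicit `ExecutionContext` to a `Future` and its callbacks instead of importing the implicit global one, can be sketched as follows. This is a standalone illustration, not the actual Worker code; it uses the Scala 2.11-era `onFailure` callback, as in the PR:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

// Sketch of explicitly passing an ExecutionContext to Future and its
// callbacks. The reviewer's nit is only about layout: writing `}(ec)`
// with no space before the parenthesis.
object ExplicitEcSketch {
  private val cleanupThreadExecutor =
    ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())

  val cleanupFuture: Future[Unit] = Future {
    // ... scan for and delete expired application directories ...
  }(cleanupThreadExecutor)

  cleanupFuture.onFailure {
    case e: Throwable => println("App dir cleanup failed: " + e.getMessage)
  }(cleanupThreadExecutor)
}
```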
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r33519086 --- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala --- @@ -258,41 +299,65 @@ private[worker] class Worker( } } + /** + * Cancel last registeration retry, or do nothing if no retry + */ + private def cancelLastRegistrationRetry(): Unit = { +if (registerMasterFutures != null) { + registerMasterFutures.foreach(_.cancel(true)) + registerMasterFutures = null +} +registrationRetryTimer.foreach(_.cancel(true)) +registrationRetryTimer = None + } + private def registerWithMaster() { -// DisassociatedEvent may be triggered multiple times, so don't attempt registration +// onDisconnected may be triggered multiple times, so don't attempt registration // if there are outstanding registration attempts scheduled. registrationRetryTimer match { case None => registered = false -tryRegisterAllMasters() +registerMasterFutures = tryRegisterAllMasters() connectionAttemptCount = 0 -registrationRetryTimer = Some { - context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL, -INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster) -} +registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(new Runnable { --- End diff -- The indentation of this block is a little sketchy, maybe we can put the `new Runnable` on the next line?
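One possible layout for the reviewer's suggestion, with `new Runnable` moved to its own line, is sketched below. The scheduler, the message send, and the interval constant here are stand-ins for illustration, not the actual Worker members:

```scala
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}

// Hypothetical re-indentation of the scheduleAtFixedRate call: each argument,
// including the anonymous Runnable, starts on its own line.
object IndentationSketch {
  private val forwordMessageScheduler = Executors.newSingleThreadScheduledExecutor()
  private val intervalSeconds = 10L // stand-in for INITIAL_REGISTRATION_RETRY_INTERVAL

  val registrationRetryTimer: Option[ScheduledFuture[_]] = Some(
    forwordMessageScheduler.scheduleAtFixedRate(
      new Runnable {
        override def run(): Unit = println("ReregisterWithMaster") // stand-in for self.send(...)
      },
      intervalSeconds,
      intervalSeconds,
      TimeUnit.SECONDS))
}
```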
[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r33518507 --- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala --- @@ -235,21 +250,47 @@ private[worker] class Worker( * still not safe if the old master recovers within this interval, but this is a much * less likely scenario. */ -if (master != null) { - master ! RegisterWorker( -workerId, host, port, cores, memory, webUi.boundPort, publicAddress) -} else { - // We are retrying the initial registration - tryRegisterAllMasters() +master match { + case Some(masterRef) => +// registered == false && master != None means we lost the connection to master, so +// masterRef cannot be used and we need to recreate it again. Note: we must not set +// master to None due to the above comments. +if (registerMasterFutures != null) { + registerMasterFutures.foreach(_.cancel(true)) +} +val masterAddress = masterRef.address +registerMasterFutures = Array(registerMasterThreadPool.submit(new Runnable { + override def run(): Unit = +try { + logInfo("Connecting to master " + masterAddress + "...") + val masterEndpoint = +rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME) + masterEndpoint.send(RegisterWorker( +workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress)) +} catch { + case ie: InterruptedException => // Cancelled + case NonFatal(e) => logError(e.getMessage, e) --- End diff -- message should probably be like "Failed to connect to master $masterAddress" (can we merge this code path with the above?) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
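The reviewer asks whether the retry branch above can share code with `tryRegisterAllMasters()`, since both submit a task that connects to one master and logs a failure. One hypothetical shape for such a shared helper (all names here are illustrative, not from the PR):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Consumer;

public class RegisterOneMaster {
    // A single helper usable both for the initial fan-out over all master
    // addresses and for the retry against one known address.
    public static Future<?> submit(ExecutorService pool,
                                   String masterAddress,
                                   Consumer<String> connectAndRegister,
                                   Consumer<Throwable> onError) {
        return pool.submit(() -> {
            try {
                // e.g. set up the endpoint ref and send RegisterWorker
                connectAndRegister.accept(masterAddress);
            } catch (Throwable t) {
                // e.g. logError("Failed to connect to master " + masterAddress, t)
                onError.accept(t);
            }
        });
    }
}
```

With this shape, `tryRegisterAllMasters()` maps the helper over every address, and the retry path calls it once with the last known master, so the error message and cancellation handling stay in one place.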
[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r33518468 --- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala --- @@ -235,21 +250,47 @@ private[worker] class Worker( * still not safe if the old master recovers within this interval, but this is a much * less likely scenario. */ -if (master != null) { - master ! RegisterWorker( -workerId, host, port, cores, memory, webUi.boundPort, publicAddress) -} else { - // We are retrying the initial registration - tryRegisterAllMasters() +master match { + case Some(masterRef) => +// registered == false && master != None means we lost the connection to master, so +// masterRef cannot be used and we need to recreate it again. Note: we must not set +// master to None due to the above comments. +if (registerMasterFutures != null) { + registerMasterFutures.foreach(_.cancel(true)) +} +val masterAddress = masterRef.address +registerMasterFutures = Array(registerMasterThreadPool.submit(new Runnable { + override def run(): Unit = --- End diff -- nit: braces --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r33518043 --- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala --- @@ -181,24 +190,31 @@ private[worker] class Worker( metricsSystem.getServletHandlers.foreach(webUi.attachHandler) } - private def changeMaster(url: String, uiUrl: String) { + private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String) { // activeMasterUrl it's a valid Spark url since we receive it from master. -activeMasterUrl = url +activeMasterUrl = masterRef.address.toSparkURL activeMasterWebUiUrl = uiUrl -master = context.actorSelection( - Master.toAkkaUrl(activeMasterUrl, AkkaUtils.protocol(context.system))) -masterAddress = Master.toAkkaAddress(activeMasterUrl, AkkaUtils.protocol(context.system)) +master = Some(masterRef) connected = true // Cancel any outstanding re-registration attempts because we found a new master -registrationRetryTimer.foreach(_.cancel()) -registrationRetryTimer = None +cancelLastRegistrationRetry() } - private def tryRegisterAllMasters() { -for (masterAkkaUrl <- masterAkkaUrls) { - logInfo("Connecting to master " + masterAkkaUrl + "...") - val actor = context.actorSelection(masterAkkaUrl) - actor ! 
RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress) + private def tryRegisterAllMasters(): Array[JFuture[_]] = { +for (masterAddress <- masterRpcAddresses) yield { + registerMasterThreadPool.submit(new Runnable { +override def run(): Unit = + try { +logInfo("Connecting to master " + masterAddress + "...") +val masterEndpoint = + rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME) +masterEndpoint.send(RegisterWorker( + workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress)) + } catch { +case ie: InterruptedException => // Cancelled +case NonFatal(e) => logError(e.getMessage, e) --- End diff -- message should probably be like "Failed to connect to master $masterAddress" --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r33517979 --- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala --- @@ -181,24 +190,31 @@ private[worker] class Worker( metricsSystem.getServletHandlers.foreach(webUi.attachHandler) } - private def changeMaster(url: String, uiUrl: String) { + private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String) { // activeMasterUrl it's a valid Spark url since we receive it from master. -activeMasterUrl = url +activeMasterUrl = masterRef.address.toSparkURL activeMasterWebUiUrl = uiUrl -master = context.actorSelection( - Master.toAkkaUrl(activeMasterUrl, AkkaUtils.protocol(context.system))) -masterAddress = Master.toAkkaAddress(activeMasterUrl, AkkaUtils.protocol(context.system)) +master = Some(masterRef) connected = true // Cancel any outstanding re-registration attempts because we found a new master -registrationRetryTimer.foreach(_.cancel()) -registrationRetryTimer = None +cancelLastRegistrationRetry() } - private def tryRegisterAllMasters() { -for (masterAkkaUrl <- masterAkkaUrls) { - logInfo("Connecting to master " + masterAkkaUrl + "...") - val actor = context.actorSelection(masterAkkaUrl) - actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress) + private def tryRegisterAllMasters(): Array[JFuture[_]] = { +for (masterAddress <- masterRpcAddresses) yield { --- End diff -- Maybe just `masterRpcAddresses.map { masterAddress =>` instead --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r33517925 --- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala --- @@ -181,24 +190,31 @@ private[worker] class Worker( metricsSystem.getServletHandlers.foreach(webUi.attachHandler) } - private def changeMaster(url: String, uiUrl: String) { + private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String) { // activeMasterUrl it's a valid Spark url since we receive it from master. -activeMasterUrl = url +activeMasterUrl = masterRef.address.toSparkURL activeMasterWebUiUrl = uiUrl -master = context.actorSelection( - Master.toAkkaUrl(activeMasterUrl, AkkaUtils.protocol(context.system))) -masterAddress = Master.toAkkaAddress(activeMasterUrl, AkkaUtils.protocol(context.system)) +master = Some(masterRef) connected = true // Cancel any outstanding re-registration attempts because we found a new master -registrationRetryTimer.foreach(_.cancel()) -registrationRetryTimer = None +cancelLastRegistrationRetry() } - private def tryRegisterAllMasters() { -for (masterAkkaUrl <- masterAkkaUrls) { - logInfo("Connecting to master " + masterAkkaUrl + "...") - val actor = context.actorSelection(masterAkkaUrl) - actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress) + private def tryRegisterAllMasters(): Array[JFuture[_]] = { +for (masterAddress <- masterRpcAddresses) yield { + registerMasterThreadPool.submit(new Runnable { +override def run(): Unit = --- End diff -- nit: add braces for this method --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r33515502 --- Diff: core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala --- @@ -40,98 +37,139 @@ import org.apache.spark.util.{ActorLogReceive, RpcUtils, Utils, AkkaUtils} * @param masterUrls Each url should look like spark://host:port. */ private[spark] class AppClient( -actorSystem: ActorSystem, +rpcEnv: RpcEnv, masterUrls: Array[String], appDescription: ApplicationDescription, listener: AppClientListener, conf: SparkConf) extends Logging { - private val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem))) + private val masterRpcAddresses = masterUrls.map(RpcAddress.fromSparkURL(_)) - private val REGISTRATION_TIMEOUT = 20.seconds + private val REGISTRATION_TIMEOUT_SECONDS = 20 private val REGISTRATION_RETRIES = 3 - private var masterAddress: Address = null - private var actor: ActorRef = null + private var endpoint: RpcEndpointRef = null private var appId: String = null - private var registered = false - private var activeMasterUrl: String = null - - private class ClientActor extends Actor with ActorLogReceive with Logging { -var master: ActorSelection = null -var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times -var alreadyDead = false // To avoid calling listener.dead() multiple times -var registrationRetryTimer: Option[Cancellable] = None - -override def preStart() { - context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) + @volatile private var registered = false + + private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint +with Logging { + +private var master: Option[RpcEndpointRef] = None +// To avoid calling listener.disconnected() multiple times +private var alreadyDisconnected = false +@volatile private var alreadyDead = false // To avoid calling listener.dead() multiple times +@volatile private var 
registerMasterFutures: Array[JFuture[_]] = null +@volatile private var registrationRetryTimer: JScheduledFuture[_] = null + +// A thread pool for registering with masters. Because registering with a master is a blocking +// action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same +// time so that we can register with all masters. +private val registerMasterThreadPool = new ThreadPoolExecutor( + 0, + masterRpcAddresses.size, // Make sure we can register with all masters at the same time + 60L, TimeUnit.SECONDS, + new SynchronousQueue[Runnable](), + ThreadUtils.namedThreadFactory("appclient-register-master-threadpool")) + +// A scheduled executor for scheduling the registration actions +private val registrationRetryThread = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread") + +override def onStart(): Unit = { try { -registerWithMaster() +registerWithMaster(1) } catch { case e: Exception => logWarning("Failed to connect to master", e) markDisconnected() - context.stop(self) + stop() } } -def tryRegisterAllMasters() { - for (masterAkkaUrl <- masterAkkaUrls) { -logInfo("Connecting to master " + masterAkkaUrl + "...") -val actor = context.actorSelection(masterAkkaUrl) -actor ! RegisterApplication(appDescription) +/** + * Register with all masters asynchronously and returns an array `Future`s for cancellation. 
+ */ +private def tryRegisterAllMasters(): Array[JFuture[_]] = { + for (masterAddress <- masterRpcAddresses) yield { +registerMasterThreadPool.submit(new Runnable { + override def run(): Unit = try { +if (registered) { + return +} +logInfo("Connecting to master " + masterAddress.toSparkURL + "...") +val masterRef = + rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME) +masterRef.send(RegisterApplication(appDescription, self)) + } catch { +case ie: InterruptedException => // Cancelled +case NonFatal(e) => logError(e.getMessage, e) + } +}) } } -def registerWithMaster() { - tryRegisterAllMasters() - import context.dispatche
[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r33515404 --- Diff: core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala --- @@ -40,98 +37,139 @@ import org.apache.spark.util.{ActorLogReceive, RpcUtils, Utils, AkkaUtils} * @param masterUrls Each url should look like spark://host:port. */ private[spark] class AppClient( -actorSystem: ActorSystem, +rpcEnv: RpcEnv, masterUrls: Array[String], appDescription: ApplicationDescription, listener: AppClientListener, conf: SparkConf) extends Logging { - private val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem))) + private val masterRpcAddresses = masterUrls.map(RpcAddress.fromSparkURL(_)) - private val REGISTRATION_TIMEOUT = 20.seconds + private val REGISTRATION_TIMEOUT_SECONDS = 20 private val REGISTRATION_RETRIES = 3 - private var masterAddress: Address = null - private var actor: ActorRef = null + private var endpoint: RpcEndpointRef = null private var appId: String = null - private var registered = false - private var activeMasterUrl: String = null - - private class ClientActor extends Actor with ActorLogReceive with Logging { -var master: ActorSelection = null -var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times -var alreadyDead = false // To avoid calling listener.dead() multiple times -var registrationRetryTimer: Option[Cancellable] = None - -override def preStart() { - context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) + @volatile private var registered = false + + private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint +with Logging { + +private var master: Option[RpcEndpointRef] = None +// To avoid calling listener.disconnected() multiple times +private var alreadyDisconnected = false +@volatile private var alreadyDead = false // To avoid calling listener.dead() multiple times +@volatile private var 
registerMasterFutures: Array[JFuture[_]] = null +@volatile private var registrationRetryTimer: JScheduledFuture[_] = null + +// A thread pool for registering with masters. Because registering with a master is a blocking +// action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same +// time so that we can register with all masters. +private val registerMasterThreadPool = new ThreadPoolExecutor( + 0, + masterRpcAddresses.size, // Make sure we can register with all masters at the same time + 60L, TimeUnit.SECONDS, + new SynchronousQueue[Runnable](), + ThreadUtils.namedThreadFactory("appclient-register-master-threadpool")) + +// A scheduled executor for scheduling the registration actions +private val registrationRetryThread = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread") + +override def onStart(): Unit = { try { -registerWithMaster() +registerWithMaster(1) } catch { case e: Exception => logWarning("Failed to connect to master", e) markDisconnected() - context.stop(self) + stop() } } -def tryRegisterAllMasters() { - for (masterAkkaUrl <- masterAkkaUrls) { -logInfo("Connecting to master " + masterAkkaUrl + "...") -val actor = context.actorSelection(masterAkkaUrl) -actor ! RegisterApplication(appDescription) +/** + * Register with all masters asynchronously and returns an array `Future`s for cancellation. 
+ */ +private def tryRegisterAllMasters(): Array[JFuture[_]] = { + for (masterAddress <- masterRpcAddresses) yield { +registerMasterThreadPool.submit(new Runnable { + override def run(): Unit = try { +if (registered) { + return +} +logInfo("Connecting to master " + masterAddress.toSparkURL + "...") +val masterRef = + rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME) +masterRef.send(RegisterApplication(appDescription, self)) + } catch { +case ie: InterruptedException => // Cancelled +case NonFatal(e) => logError(e.getMessage, e) + } +}) } } -def registerWithMaster() { - tryRegisterAllMasters() - import context.dispatche
[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r33514995 --- Diff: core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala --- @@ -40,98 +37,139 @@ import org.apache.spark.util.{ActorLogReceive, RpcUtils, Utils, AkkaUtils} * @param masterUrls Each url should look like spark://host:port. */ private[spark] class AppClient( -actorSystem: ActorSystem, +rpcEnv: RpcEnv, masterUrls: Array[String], appDescription: ApplicationDescription, listener: AppClientListener, conf: SparkConf) extends Logging { - private val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem))) + private val masterRpcAddresses = masterUrls.map(RpcAddress.fromSparkURL(_)) - private val REGISTRATION_TIMEOUT = 20.seconds + private val REGISTRATION_TIMEOUT_SECONDS = 20 private val REGISTRATION_RETRIES = 3 - private var masterAddress: Address = null - private var actor: ActorRef = null + private var endpoint: RpcEndpointRef = null private var appId: String = null - private var registered = false - private var activeMasterUrl: String = null - - private class ClientActor extends Actor with ActorLogReceive with Logging { -var master: ActorSelection = null -var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times -var alreadyDead = false // To avoid calling listener.dead() multiple times -var registrationRetryTimer: Option[Cancellable] = None - -override def preStart() { - context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) + @volatile private var registered = false + + private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint +with Logging { + +private var master: Option[RpcEndpointRef] = None +// To avoid calling listener.disconnected() multiple times +private var alreadyDisconnected = false +@volatile private var alreadyDead = false // To avoid calling listener.dead() multiple times +@volatile private var 
registerMasterFutures: Array[JFuture[_]] = null +@volatile private var registrationRetryTimer: JScheduledFuture[_] = null + +// A thread pool for registering with masters. Because registering with a master is a blocking +// action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same +// time so that we can register with all masters. +private val registerMasterThreadPool = new ThreadPoolExecutor( + 0, + masterRpcAddresses.size, // Make sure we can register with all masters at the same time + 60L, TimeUnit.SECONDS, + new SynchronousQueue[Runnable](), + ThreadUtils.namedThreadFactory("appclient-register-master-threadpool")) + +// A scheduled executor for scheduling the registration actions +private val registrationRetryThread = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread") + +override def onStart(): Unit = { try { -registerWithMaster() +registerWithMaster(1) } catch { case e: Exception => logWarning("Failed to connect to master", e) markDisconnected() - context.stop(self) + stop() } } -def tryRegisterAllMasters() { - for (masterAkkaUrl <- masterAkkaUrls) { -logInfo("Connecting to master " + masterAkkaUrl + "...") -val actor = context.actorSelection(masterAkkaUrl) -actor ! RegisterApplication(appDescription) +/** + * Register with all masters asynchronously and returns an array `Future`s for cancellation. 
+ */ +private def tryRegisterAllMasters(): Array[JFuture[_]] = { + for (masterAddress <- masterRpcAddresses) yield { +registerMasterThreadPool.submit(new Runnable { + override def run(): Unit = try { +if (registered) { + return +} +logInfo("Connecting to master " + masterAddress.toSparkURL + "...") +val masterRef = + rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME) +masterRef.send(RegisterApplication(appDescription, self)) + } catch { +case ie: InterruptedException => // Cancelled +case NonFatal(e) => logError(e.getMessage, e) --- End diff -- message should probably be like "Failed to connect to $masterAddress" --- If your project is set up for it, you can reply to this email an
[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r33514893 --- Diff: core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala --- @@ -40,98 +37,139 @@ import org.apache.spark.util.{ActorLogReceive, RpcUtils, Utils, AkkaUtils} * @param masterUrls Each url should look like spark://host:port. */ private[spark] class AppClient( -actorSystem: ActorSystem, +rpcEnv: RpcEnv, masterUrls: Array[String], appDescription: ApplicationDescription, listener: AppClientListener, conf: SparkConf) extends Logging { - private val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem))) + private val masterRpcAddresses = masterUrls.map(RpcAddress.fromSparkURL(_)) - private val REGISTRATION_TIMEOUT = 20.seconds + private val REGISTRATION_TIMEOUT_SECONDS = 20 private val REGISTRATION_RETRIES = 3 - private var masterAddress: Address = null - private var actor: ActorRef = null + private var endpoint: RpcEndpointRef = null private var appId: String = null - private var registered = false - private var activeMasterUrl: String = null - - private class ClientActor extends Actor with ActorLogReceive with Logging { -var master: ActorSelection = null -var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times -var alreadyDead = false // To avoid calling listener.dead() multiple times -var registrationRetryTimer: Option[Cancellable] = None - -override def preStart() { - context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) + @volatile private var registered = false + + private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint +with Logging { + +private var master: Option[RpcEndpointRef] = None +// To avoid calling listener.disconnected() multiple times +private var alreadyDisconnected = false +@volatile private var alreadyDead = false // To avoid calling listener.dead() multiple times --- End diff -- It's not 
generally true that ThreadSafeRpcEndpoints require their mutable state to be volatile, right? Perhaps this is just being modified from a separate thread pool?
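The diff quoted in these messages sizes the registration pool so every master can be contacted concurrently: core size 0, maximum size equal to the number of masters, and a `SynchronousQueue` so a submitted task is handed directly to a thread instead of queueing behind a blocked connect. A minimal sketch of that construction (the 60-second keep-alive matches the quoted code; the class name is illustrative):

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RegisterMasterPool {
    public static ThreadPoolExecutor forMasters(int numMasters) {
        return new ThreadPoolExecutor(
            0,                      // no threads kept around when idle
            numMasters,             // register with all masters at the same time
            60L, TimeUnit.SECONDS,  // reclaim threads once registration finishes
            new SynchronousQueue<Runnable>());
    }
}
```

Because the `SynchronousQueue` never holds tasks, the pool grows to one thread per submitted registration up to `numMasters`; this is why the maximum must be `masterRpcAddresses.size` — a smaller cap would reject some registrations while earlier connects block.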
[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r33513905 --- Diff: core/src/main/scala/org/apache/spark/deploy/Client.scala --- @@ -82,29 +92,38 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) driverArgs.cores, driverArgs.supervise, command) - -// This assumes only one Master is active at a time -for (masterActor <- masterActors) { - masterActor ! RequestSubmitDriver(driverDescription) -} +ayncSendToMasterAndForwardReply[SubmitDriverResponse]( + RequestSubmitDriver(driverDescription)) case "kill" => val driverId = driverArgs.driverId -// This assumes only one Master is active at a time -for (masterActor <- masterActors) { - masterActor ! RequestKillDriver(driverId) -} + ayncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId)) +} + } + + /** + * Send the message to master and forward the reply to self asynchronously. + */ + private def ayncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = { +for (masterEndpoint <- masterEndpoints) { + masterEndpoint.ask[T](message).onComplete { +case Success(v) => self.send(v) +case Failure(e) => + println(s"Error sending messages to master $masterEndpoint, exiting.") --- End diff -- also let's make this the error message (should we be using println?) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r33513862 --- Diff: core/src/main/scala/org/apache/spark/deploy/Client.scala --- @@ -82,29 +92,38 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) driverArgs.cores, driverArgs.supervise, command) - -// This assumes only one Master is active at a time -for (masterActor <- masterActors) { - masterActor ! RequestSubmitDriver(driverDescription) -} +ayncSendToMasterAndForwardReply[SubmitDriverResponse]( + RequestSubmitDriver(driverDescription)) case "kill" => val driverId = driverArgs.driverId -// This assumes only one Master is active at a time -for (masterActor <- masterActors) { - masterActor ! RequestKillDriver(driverId) -} + ayncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId)) +} + } + + /** + * Send the message to master and forward the reply to self asynchronously. + */ + private def ayncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = { +for (masterEndpoint <- masterEndpoints) { + masterEndpoint.ask[T](message).onComplete { +case Success(v) => self.send(v) +case Failure(e) => + println(s"Error sending messages to master $masterEndpoint, exiting.") --- End diff -- exiting? I wouldn't expect this to be a failure condition given some masters may be dead, but maybe the text is just out of date. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
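The method under review asks every master asynchronously and forwards whichever reply or failure arrives back into the endpoint's own inbox, so the reply is processed on the endpoint's single-threaded message loop. A hedged Java sketch of that "ask and forward to self" pattern, modeling the asks as `CompletableFuture`s and the endpoint's inbox as a plain consumer (both are stand-ins, not the PR's RPC API):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class AskAndForward {
    public static void askAll(List<CompletableFuture<String>> asks,
                              Consumer<Object> self) {
        for (CompletableFuture<String> ask : asks) {
            ask.whenComplete((reply, err) -> {
                if (err == null) {
                    self.accept(reply);  // success: forward the master's reply to self
                } else {
                    self.accept(err);    // failure: surface the error to self instead
                }
            });
        }
    }
}
```

Forwarding through `self` rather than handling the reply on the callback thread is what keeps the endpoint's state single-threaded; the reviewer's point is that a failed ask against one master should not necessarily be fatal, since in HA mode some masters are expected to be down.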
[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r33513145 --- Diff: core/src/main/scala/org/apache/spark/deploy/Client.scala --- @@ -36,20 +36,30 @@ import org.apache.spark.util.{ActorLogReceive, AkkaUtils, RpcUtils, Utils} * We currently don't support retry if submission fails. In HA mode, client will submit request to * all masters and see which one could handle it. */ -private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) - extends Actor with ActorLogReceive with Logging { - - private val masterActors = driverArgs.masters.map { m => -context.actorSelection(Master.toAkkaUrl(m, AkkaUtils.protocol(context.system))) - } - private val lostMasters = new HashSet[Address] - private var activeMasterActor: ActorSelection = null - - val timeout = RpcUtils.askTimeout(conf) - - override def preStart(): Unit = { -context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) - +private class ClientEndpoint( +override val rpcEnv: RpcEnv, +driverArgs: ClientArguments, +masterEndpoints: Seq[RpcEndpointRef], +conf: SparkConf) + extends ThreadSafeRpcEndpoint with Logging { + + // A scheduled executor used to send messages at the specified time. + private val forwardMessageThread = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("client-forward-message") + // Used to provide the implicit parameter of `Future` methods. + private val forwardMessageExecutionContext = +ExecutionContext.fromExecutor(forwardMessageThread, + t => t match { +case ie: InterruptedException => // Exit normally +case e => --- End diff -- nit: `e: Throwable` explicitly --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-109947603 [Test build #34423 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/34423/console) for PR 5392 at commit [`9137b82`](https://github.com/apache/spark/commit/9137b82af1222424756500a455bd31d50dade6c7).
* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage`
  * `case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage`
  * `case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)`
  * `case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage`
  * `case class SubmitDriverResponse(`
  * `case class KillDriverResponse(`
  * `case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)`
[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-109947635 Merged build finished. Test PASSed.
[GitHub] spark pull request: [SPARK-6602][Core] Update Master, Worker, Clie...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-109916760 [Test build #34423 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/34423/consoleFull) for PR 5392 at commit [`9137b82`](https://github.com/apache/spark/commit/9137b82af1222424756500a455bd31d50dade6c7).
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-109915245 Merged build triggered.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-109915318 Merged build started.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-99267587 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31916/
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-99267573

[Test build #31916 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31916/consoleFull) for PR 5392 at commit [`2d24fb5`](https://github.com/apache/spark/commit/2d24fb5ef87930228a5b8486ffc8b4b76b328290).

* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds no public classes.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-99267584 Merged build finished. Test PASSed.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-99250480 [Test build #31916 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31916/consoleFull) for PR 5392 at commit [`2d24fb5`](https://github.com/apache/spark/commit/2d24fb5ef87930228a5b8486ffc8b4b76b328290).
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-99250452 Merged build started.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-99250432 Merged build triggered.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-99232814 Merged build finished. Test FAILed.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-99232816 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31907/
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-99232810

[Test build #31907 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31907/consoleFull) for PR 5392 at commit [`5a82374`](https://github.com/apache/spark/commit/5a8237473257f6f17862b79bb2358d6119215e27).

* This patch **fails Scala style tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * ` case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage`
  * ` case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage`
  * ` case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)`
  * ` case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage`
  * ` case class SubmitDriverResponse(`
  * ` case class KillDriverResponse(`
  * ` case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)`
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-99232525 [Test build #31907 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31907/consoleFull) for PR 5392 at commit [`5a82374`](https://github.com/apache/spark/commit/5a8237473257f6f17862b79bb2358d6119215e27).
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-99232116 Merged build started.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-99232078 Merged build triggered.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-97695946

[Test build #31388 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31388/consoleFull) for PR 5392 at commit [`72304f0`](https://github.com/apache/spark/commit/72304f0150e74eb6432fc3141d3d5bc71bb93d61).

* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * ` case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage`
  * ` case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage`
  * ` case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)`
  * ` case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage`
  * ` case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)`
* This patch does not change any dependencies.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-97695958 Merged build finished. Test PASSed.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-97695959 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31388/
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-97670499 [Test build #31388 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31388/consoleFull) for PR 5392 at commit [`72304f0`](https://github.com/apache/spark/commit/72304f0150e74eb6432fc3141d3d5bc71bb93d61).
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-97670459 Merged build started.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-97670448 Merged build triggered.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-97631699

[Test build #31361 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31361/consoleFull) for PR 5392 at commit [`e56cb16`](https://github.com/apache/spark/commit/e56cb16614cf6fd997931e7cd574eef3d0ccba5d).

* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * ` case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage`
  * ` case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage`
  * ` case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)`
  * ` case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage`
  * ` case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)`
* This patch does not change any dependencies.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-97631748 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31361/
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-97631746 Merged build finished. Test FAILed.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-97621950 [Test build #31361 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31361/consoleFull) for PR 5392 at commit [`e56cb16`](https://github.com/apache/spark/commit/e56cb16614cf6fd997931e7cd574eef3d0ccba5d).
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-97621860 Merged build started.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-97621846 Merged build triggered.
Github user zsxwing commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-97585686 Always send failure to the sender
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r29378265

--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---
@@ -181,24 +189,31 @@ private[worker] class Worker(
     metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
   }

-  private def changeMaster(url: String, uiUrl: String) {
+  private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String) {
     // activeMasterUrl it's a valid Spark url since we receive it from master.
-    activeMasterUrl = url
+    activeMasterUrl = masterRef.address.toSparkURL
     activeMasterWebUiUrl = uiUrl
-    master = context.actorSelection(
-      Master.toAkkaUrl(activeMasterUrl, AkkaUtils.protocol(context.system)))
-    masterAddress = Master.toAkkaAddress(activeMasterUrl, AkkaUtils.protocol(context.system))
+    master = Some(masterRef)
     connected = true
     // Cancel any outstanding re-registration attempts because we found a new master
-    registrationRetryTimer.foreach(_.cancel())
-    registrationRetryTimer = None
+    cancelLastRegistrationRetry()
   }

-  private def tryRegisterAllMasters() {
-    for (masterAkkaUrl <- masterAkkaUrls) {
-      logInfo("Connecting to master " + masterAkkaUrl + "...")
-      val actor = context.actorSelection(masterAkkaUrl)
-      actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
+  private def tryRegisterAllMasters(): Array[Future[_]] = {
--- End diff --

Future => JFuture
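The review note "Future => JFuture" refers to `java.util.concurrent.Future`: the new `tryRegisterAllMasters(): Array[Future[_]]` signature hands the in-flight registration attempts back to the caller instead of discarding them. A minimal sketch of that idea in plain Java — the class, method names, and master URLs here are illustrative assumptions, not the actual Worker code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: submit one asynchronous "registration" per master URL
// and return the Future handles so the caller can later cancel the attempts
// that are still pending (the role cancelLastRegistrationRetry() plays above).
public class RegisterAllMastersSketch {

    static List<Future<?>> tryRegisterAllMasters(ExecutorService pool, String[] masterUrls) {
        List<Future<?>> attempts = new ArrayList<>();
        for (String url : masterUrls) {
            attempts.add(pool.submit(() ->
                // In Spark this step would send a RegisterWorker RPC to the master.
                System.out.println("Connecting to master " + url + "...")));
        }
        return attempts;  // caller keeps the handles instead of fire-and-forget
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        String[] urls = {"spark://master-a:7077", "spark://master-b:7077"};  // assumed URLs
        List<Future<?>> attempts = tryRegisterAllMasters(pool, urls);
        // Once one master acknowledges, cancel every attempt still in flight.
        attempts.forEach(f -> f.cancel(true));
        pool.shutdown();
    }
}
```

The design point: with the fire-and-forget `actor ! RegisterWorker(...)` pattern there was nothing to cancel, whereas returning the futures lets the worker stop redundant attempts as soon as one master responds.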
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-97546034 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31300/
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-97546003

[Test build #31300 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31300/consoleFull) for PR 5392 at commit [`aa34b9b`](https://github.com/apache/spark/commit/aa34b9bd3b41b6c4cd834d7b714ca061e31eba6d).

* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * ` case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage`
  * ` case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage`
  * ` case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)`
  * ` case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage`
  * ` case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)`
* This patch does not change any dependencies.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-97546032 Merged build finished. Test FAILed.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-97523566 [Test build #31300 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31300/consoleFull) for PR 5392 at commit [`aa34b9b`](https://github.com/apache/spark/commit/aa34b9bd3b41b6c4cd834d7b714ca061e31eba6d).
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-97522810 Merged build triggered.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-97522899 Merged build started.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-97517073 Merged build finished. Test FAILed.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-97517075 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31298/
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-97517071

[Test build #31298 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31298/consoleFull) for PR 5392 at commit [`bd541e7`](https://github.com/apache/spark/commit/bd541e78b138fadc7680f29ebb540d24e4d862a7).

* This patch **fails Scala style tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * ` case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage`
  * ` case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage`
  * ` case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)`
  * ` case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage`
  * ` case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)`
* This patch does not change any dependencies.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-97516752 [Test build #31298 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31298/consoleFull) for PR 5392 at commit [`bd541e7`](https://github.com/apache/spark/commit/bd541e78b138fadc7680f29ebb540d24e4d862a7).
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-97516449 Merged build started.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-97516420 Merged build triggered.
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r29312354

--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---
@@ -78,32 +82,26 @@ private[worker] class Worker(
     val randomNumberGenerator = new Random(UUID.randomUUID.getMostSignificantBits)
     randomNumberGenerator.nextDouble + FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND
   }
-  private val INITIAL_REGISTRATION_RETRY_INTERVAL =
-    (math.round(10 * REGISTRATION_RETRY_FUZZ_MULTIPLIER)).seconds
-  private val PROLONGED_REGISTRATION_RETRY_INTERVAL =
-    (math.round(60 * REGISTRATION_RETRY_FUZZ_MULTIPLIER)).seconds
+  private val INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS =
+    (math.round(10 * REGISTRATION_RETRY_FUZZ_MULTIPLIER))
+  private val PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS =
+    (math.round(60 * REGISTRATION_RETRY_FUZZ_MULTIPLIER))
   private val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false)
   // How often worker will clean up old app folders
   private val CLEANUP_INTERVAL_MILLIS = conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
   // TTL for app folders/data; after TTL expires it will be cleaned up
-  private val APP_DATA_RETENTION_SECS =
+  private val APP_DATA_RETENTION_SECS =

--- End diff --

should use consistent naming. change this to SECONDS
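The hunk above replaces duration-typed retry intervals with second-valued ones derived from a per-worker fuzz multiplier. A minimal Python sketch of that fuzzing; note the 0.5 lower bound is an assumption based on the constant's name, since the diff does not show its value:

```python
import random

# Assumed value; the diff only shows the constant's name, not its definition.
FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND = 0.5

def fuzz_multiplier(rng: random.Random) -> float:
    # One multiplier in [0.5, 1.5) per worker, so workers restarted together
    # do not retry registration in lockstep.
    return rng.random() + FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND

def retry_intervals_seconds(multiplier: float):
    initial = round(10 * multiplier)    # INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS
    prolonged = round(60 * multiplier)  # PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS
    return initial, prolonged
```

Scattering the retry times this way avoids a thundering herd of re-registrations when many workers lose the master at once.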
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-95494121 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30819/ Test PASSed.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-95494104

[Test build #30819 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30819/consoleFull) for PR 5392 at commit [`25a84d8`](https://github.com/apache/spark/commit/25a84d87a74ec843d2d0a0c9a1a6e8f62e3661f2).

* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage`
  * `case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage`
  * `case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)`
  * `case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage`
  * `case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)`
* This patch does not change any dependencies.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-95472295 [Test build #30819 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30819/consoleFull) for PR 5392 at commit [`25a84d8`](https://github.com/apache/spark/commit/25a84d87a74ec843d2d0a0c9a1a6e8f62e3661f2).
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-95257197 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30763/ Test FAILed.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-95257184

[Test build #30763 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30763/consoleFull) for PR 5392 at commit [`dbfc916`](https://github.com/apache/spark/commit/dbfc91601d8ab85a437df1e387bfd466c209ccea).

* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage`
  * `case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage`
  * `case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)`
  * `case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage`
  * `case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)`
* This patch does not change any dependencies.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5392#issuecomment-95237267 [Test build #30763 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30763/consoleFull) for PR 5392 at commit [`dbfc916`](https://github.com/apache/spark/commit/dbfc91601d8ab85a437df1e387bfd466c209ccea).
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r28758602

--- Diff: core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala ---
@@ -40,98 +36,128 @@ import org.apache.spark.util.{ActorLogReceive, Utils, AkkaUtils}
  * @param masterUrls Each url should look like spark://host:port.
  */
 private[spark] class AppClient(
-    actorSystem: ActorSystem,
+    rpcEnv: RpcEnv,
     masterUrls: Array[String],
     appDescription: ApplicationDescription,
     listener: AppClientListener,
     conf: SparkConf)
   extends Logging {

-  private val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
+  private val masterRpcAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))

-  private val REGISTRATION_TIMEOUT = 20.seconds
+  private val REGISTRATION_TIMEOUT_SECONDS = 20
   private val REGISTRATION_RETRIES = 3

-  private var masterAddress: Address = null
-  private var actor: ActorRef = null
+  private var endpoint: RpcEndpointRef = null
   private var appId: String = null
-  private var registered = false
-  private var activeMasterUrl: String = null
+  @volatile private var registered = false
+
+  private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint
+    with Logging {
+
+    private var master: Option[RpcEndpointRef] = None
+    // To avoid calling listener.disconnected() multiple times
+    private var alreadyDisconnected = false
+    @volatile private var alreadyDead = false // To avoid calling listener.dead() multiple times
+    @volatile private var registerMasterFutures: Array[Future[_]] = null
+    @volatile private var registrationRetryTimer: ScheduledFuture[_] = null

-  private class ClientActor extends Actor with ActorLogReceive with Logging {
-    var master: ActorSelection = null
-    var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
-    var alreadyDead = false // To avoid calling listener.dead() multiple times
-    var registrationRetryTimer: Option[Cancellable] = None
+    private val registerMasterThreadPool = new ThreadPoolExecutor(
+      0,
+      masterRpcAddresses.size, // Make sure we can register with all masters at the same time
+      60L, TimeUnit.SECONDS,
+      new SynchronousQueue[Runnable](),
+      Utils.namedThreadFactory("appclient-register-master-threadpool"))

-    override def preStart() {
-      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+    private val registrationRetryThread =
+      Utils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread")
+
+    override def onStart(): Unit = {
       try {
-        registerWithMaster()
+        registerWithMaster(1)
       } catch {
         case e: Exception =>
           logWarning("Failed to connect to master", e)
           markDisconnected()
-          context.stop(self)
+          stop()
       }
     }

-    def tryRegisterAllMasters() {
-      for (masterAkkaUrl <- masterAkkaUrls) {
-        logInfo("Connecting to master " + masterAkkaUrl + "...")
-        val actor = context.actorSelection(masterAkkaUrl)
-        actor ! RegisterApplication(appDescription)
+    private def tryRegisterAllMasters(): Array[Future[_]] = {

--- End diff --

and actually - can you document the logic for registration so it is easier to understand for review and future changes?
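The registration logic rxin asks to have documented is a fan-out: one registration attempt per configured master, submitted to a pool sized so that all attempts run concurrently, which keeps one unreachable master from blocking registration with a live one. A hedged Python analogue of that pattern (function and thread names here are illustrative, not Spark's API):

```python
from concurrent.futures import ThreadPoolExecutor

def try_register_all_masters(master_addresses, register_fn, pool):
    """Submit one registration attempt per master address; return the futures
    so the caller can cancel outstanding attempts once any master acknowledges."""
    return [pool.submit(register_fn, addr) for addr in master_addresses]

if __name__ == "__main__":
    seen = []
    addrs = ["spark://master1:7077", "spark://master2:7077"]
    # max_workers mirrors masterRpcAddresses.size in the Scala diff: large
    # enough that every attempt can run at the same time.
    with ThreadPoolExecutor(max_workers=len(addrs),
                            thread_name_prefix="appclient-register-master") as pool:
        for f in try_register_all_masters(addrs, seen.append, pool):
            f.result()
    assert sorted(seen) == addrs
```

The Scala code achieves the same "no idle threads, but full fan-out" effect with a `ThreadPoolExecutor` over a `SynchronousQueue` and a max pool size equal to the number of masters.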
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r28757696

--- Diff: core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala ---
@@ -40,98 +36,128 @@ import org.apache.spark.util.{ActorLogReceive, Utils, AkkaUtils}
  * @param masterUrls Each url should look like spark://host:port.
  */
 private[spark] class AppClient(
-    actorSystem: ActorSystem,
+    rpcEnv: RpcEnv,
     masterUrls: Array[String],
     appDescription: ApplicationDescription,
     listener: AppClientListener,
     conf: SparkConf)
   extends Logging {

-  private val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
+  private val masterRpcAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))

-  private val REGISTRATION_TIMEOUT = 20.seconds
+  private val REGISTRATION_TIMEOUT_SECONDS = 20
   private val REGISTRATION_RETRIES = 3

-  private var masterAddress: Address = null
-  private var actor: ActorRef = null
+  private var endpoint: RpcEndpointRef = null
   private var appId: String = null
-  private var registered = false
-  private var activeMasterUrl: String = null
+  @volatile private var registered = false
+
+  private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint
+    with Logging {
+
+    private var master: Option[RpcEndpointRef] = None
+    // To avoid calling listener.disconnected() multiple times
+    private var alreadyDisconnected = false
+    @volatile private var alreadyDead = false // To avoid calling listener.dead() multiple times
+    @volatile private var registerMasterFutures: Array[Future[_]] = null
+    @volatile private var registrationRetryTimer: ScheduledFuture[_] = null

-  private class ClientActor extends Actor with ActorLogReceive with Logging {
-    var master: ActorSelection = null
-    var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
-    var alreadyDead = false // To avoid calling listener.dead() multiple times
-    var registrationRetryTimer: Option[Cancellable] = None
+    private val registerMasterThreadPool = new ThreadPoolExecutor(
+      0,
+      masterRpcAddresses.size, // Make sure we can register with all masters at the same time
+      60L, TimeUnit.SECONDS,
+      new SynchronousQueue[Runnable](),
+      Utils.namedThreadFactory("appclient-register-master-threadpool"))

-    override def preStart() {
-      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+    private val registrationRetryThread =

--- End diff --

and this one
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r28757688

--- Diff: core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala ---
@@ -40,98 +36,128 @@ import org.apache.spark.util.{ActorLogReceive, Utils, AkkaUtils}
  * @param masterUrls Each url should look like spark://host:port.
  */
 private[spark] class AppClient(
-    actorSystem: ActorSystem,
+    rpcEnv: RpcEnv,
     masterUrls: Array[String],
     appDescription: ApplicationDescription,
     listener: AppClientListener,
     conf: SparkConf)
   extends Logging {

-  private val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
+  private val masterRpcAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))

-  private val REGISTRATION_TIMEOUT = 20.seconds
+  private val REGISTRATION_TIMEOUT_SECONDS = 20
   private val REGISTRATION_RETRIES = 3

-  private var masterAddress: Address = null
-  private var actor: ActorRef = null
+  private var endpoint: RpcEndpointRef = null
   private var appId: String = null
-  private var registered = false
-  private var activeMasterUrl: String = null
+  @volatile private var registered = false
+
+  private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint
+    with Logging {
+
+    private var master: Option[RpcEndpointRef] = None
+    // To avoid calling listener.disconnected() multiple times
+    private var alreadyDisconnected = false
+    @volatile private var alreadyDead = false // To avoid calling listener.dead() multiple times
+    @volatile private var registerMasterFutures: Array[Future[_]] = null
+    @volatile private var registrationRetryTimer: ScheduledFuture[_] = null

-  private class ClientActor extends Actor with ActorLogReceive with Logging {
-    var master: ActorSelection = null
-    var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
-    var alreadyDead = false // To avoid calling listener.dead() multiple times
-    var registrationRetryTimer: Option[Cancellable] = None
+    private val registerMasterThreadPool = new ThreadPoolExecutor(

--- End diff --

document what this thread does
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r28757518

--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---
@@ -512,13 +573,24 @@ private[worker] class Worker(
     }
   }

+  private def sendToMaster(message: Any): Unit = {
+    master match {
+      case Some(masterRef) => masterRef.send(message)
+      case None =>
+        logWarning(s"Dropping $message because the connection to master has not yet established")

--- End diff --

has not yet BEEN established?
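For context, the helper under review routes all outbound messages through an optional master reference, dropping them with a warning until a connection exists. An illustrative Python sketch of that pattern (a hypothetical analogue, not the Spark API):

```python
import logging

def send_to_master(master, message):
    """Analogue of Worker.sendToMaster: deliver via the optional master
    reference if one exists; otherwise drop the message and warn."""
    if master is not None:
        master(message)  # stands in for masterRef.send(message)
        return True
    logging.warning(
        "Dropping %s because the connection to master has not yet been established",
        message)
    return False
```

Centralizing sends this way means every call site gets the drop-and-warn behavior for free instead of each one checking the connection state.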
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r28757203

--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -922,6 +922,24 @@ private[spark] object Utils extends Logging {
   }

   /**
+   * Wrapper over newSingleThreadExecutor. Thread names are formatted as prefix-ID, where ID is a
+   * unique, sequentially assigned integer.
+   */
+  def newDaemonSingleThreadExecutor(prefix: String): ExecutorService = {
+    val threadFactory = namedThreadFactory(prefix)
+    Executors.newSingleThreadExecutor(threadFactory)
+  }
+
+  /**
+   * Wrapper over newSingleThreadScheduledExecutor. Thread names are formatted as prefix-ID, where
+   * ID is a unique, sequentially assigned integer.

--- End diff --

since this is a single thread, ID is always 1 right?
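The prefix-ID naming scheme rxin is commenting on can be illustrated with a small Python thread factory (a sketch of the idea, not Spark's actual `namedThreadFactory`); as he notes, a single-thread executor built on such a factory would only ever see ID 1 unless its thread dies and is replaced:

```python
import itertools
import threading

def named_thread_factory(prefix):
    """Create daemon threads named <prefix>-<ID>, ID counting up from 1."""
    counter = itertools.count(1)
    def factory(target, *args):
        return threading.Thread(target=target, args=args,
                                name=f"{prefix}-{next(counter)}",
                                daemon=True)
    return factory
```

Meaningful prefixes like `appclient-registration-retry-thread` pay off in thread dumps, where dozens of anonymous pool threads are otherwise indistinguishable.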
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r28757176

--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -922,6 +922,24 @@ private[spark] object Utils extends Logging {
   }

   /**
+   * Wrapper over newSingleThreadExecutor. Thread names are formatted as prefix-ID, where ID is a
+   * unique, sequentially assigned integer.
+   */
+  def newDaemonSingleThreadExecutor(prefix: String): ExecutorService = {

--- End diff --

probably as a separate PR - might be good to create a ThreadUtils.scala and move all the thread pool/executor stuff there.
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/5392#discussion_r28756662

--- Diff: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---
@@ -37,32 +35,35 @@
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}
+import org.apache.spark.rpc._
+import org.apache.spark.util.{SignalLogger, Utils}

-/**
- * @param masterAkkaUrls Each url should be a valid akka url.
- */
 private[worker] class Worker(
-    host: String,
-    port: Int,
+    override val rpcEnv: RpcEnv,
     webUiPort: Int,
     cores: Int,
     memory: Int,
-    masterAkkaUrls: Array[String],
-    actorSystemName: String,
-    actorName: String,
+    masterRpcAddresses: Array[RpcAddress],
+    systemName: String,
+    endpointName: String,
     workDirPath: String = null,
     val conf: SparkConf,
    val securityMgr: SecurityManager)
-  extends Actor with ActorLogReceive with Logging {
-  import context.dispatcher
+  extends ThreadSafeRpcEndpoint with Logging {
+
+  private val host = rpcEnv.address.host
+  private val port = rpcEnv.address.port

   Utils.checkHost(host, "Expected hostname")
   assert (port > 0)

-  // For worker and executor IDs
-  private def createDateFormat = new SimpleDateFormat("MMddHHmmss")
+  private val forwordMessageScheduler =

--- End diff --

would be great to document the following 2 too.
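The scheduler rxin wants documented (`forwordMessageScheduler`, typo from the diff) is used to forward periodic messages such as heartbeats back onto the endpoint's own inbox. A hypothetical Python analogue of a single daemon forwarding scheduler, just to illustrate the shape of that mechanism:

```python
import threading

class ForwardScheduler:
    """Periodically re-enqueue a message via a delivery callback, the way a
    worker endpoint schedules heartbeats to itself. Names are illustrative."""

    def __init__(self, deliver, interval_s):
        self._deliver = deliver        # callback that enqueues a message
        self._interval = interval_s
        self._timer = None
        self._stopped = False

    def _tick(self, message):
        if not self._stopped:
            self._deliver(message)
            self._schedule(message)    # re-arm for the next interval

    def _schedule(self, message):
        self._timer = threading.Timer(self._interval, self._tick, args=(message,))
        self._timer.daemon = True      # do not keep the process alive
        self._timer.start()

    def start(self, message):
        self._schedule(message)

    def stop(self):
        self._stopped = True
        if self._timer is not None:
            self._timer.cancel()
```

Delivering to the endpoint's own inbox rather than mutating state from the timer thread keeps all state changes on the endpoint's single message-processing thread.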