GitHub user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5392#discussion_r28757696
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala ---
    @@ -40,98 +36,128 @@ import org.apache.spark.util.{ActorLogReceive, Utils, AkkaUtils}
      * @param masterUrls Each url should look like spark://host:port.
      */
     private[spark] class AppClient(
    -    actorSystem: ActorSystem,
    +    rpcEnv: RpcEnv,
         masterUrls: Array[String],
         appDescription: ApplicationDescription,
         listener: AppClientListener,
         conf: SparkConf)
       extends Logging {
     
    -  private val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
    +  private val masterRpcAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
     
    -  private val REGISTRATION_TIMEOUT = 20.seconds
    +  private val REGISTRATION_TIMEOUT_SECONDS = 20
       private val REGISTRATION_RETRIES = 3
     
    -  private var masterAddress: Address = null
    -  private var actor: ActorRef = null
    +  private var endpoint: RpcEndpointRef = null
       private var appId: String = null
    -  private var registered = false
    -  private var activeMasterUrl: String = null
    +  @volatile private var registered = false
    +
    +  private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint
    +    with Logging {
    +
    +    private var master: Option[RpcEndpointRef] = None
    +    // To avoid calling listener.disconnected() multiple times
    +    private var alreadyDisconnected = false
    +    @volatile private var alreadyDead = false // To avoid calling listener.dead() multiple times
    +    @volatile private var registerMasterFutures: Array[Future[_]] = null
    +    @volatile private var registrationRetryTimer: ScheduledFuture[_] = null
     
    -  private class ClientActor extends Actor with ActorLogReceive with Logging {
    -    var master: ActorSelection = null
    -    var alreadyDisconnected = false  // To avoid calling listener.disconnected() multiple times
    -    var alreadyDead = false  // To avoid calling listener.dead() multiple times
    -    var registrationRetryTimer: Option[Cancellable] = None
    +    private val registerMasterThreadPool = new ThreadPoolExecutor(
    +      0,
    +      masterRpcAddresses.size, // Make sure we can register with all masters at the same time
    +      60L, TimeUnit.SECONDS,
    +      new SynchronousQueue[Runnable](),
    +      Utils.namedThreadFactory("appclient-register-master-threadpool"))
     
    -    override def preStart() {
    -      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
    +    private val registrationRetryThread =
    --- End diff --
    
    and this one
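
For context on the pool being discussed: below is a minimal, self-contained sketch of the pattern behind registerMasterThreadPool in the diff above. This is not Spark's code; the object name, the hard-coded address list, and the dummy registration body are illustrative assumptions. The point is the executor shape: zero core threads, a maximum pool size equal to the number of masters, and a SynchronousQueue, so each submitted registration attempt gets its own thread and all masters can be contacted concurrently.

    import java.util.concurrent._
    import java.util.concurrent.atomic.AtomicInteger

    object RegisterAllMastersSketch {
      // Hypothetical master addresses; the real AppClient derives these from
      // masterRpcAddresses (parsed from spark://host:port URLs).
      val masterAddresses = Seq("host1:7077", "host2:7077")

      // Same shape as the pool in the diff: no core threads, one worker per
      // master, and a SynchronousQueue so tasks never queue behind each other.
      val registerMasterThreadPool = new ThreadPoolExecutor(
        0,
        masterAddresses.size,
        60L, TimeUnit.SECONDS,
        new SynchronousQueue[Runnable](),
        new ThreadFactory {
          private val count = new AtomicInteger(0)
          override def newThread(r: Runnable): Thread = {
            val t = new Thread(r, s"appclient-register-master-${count.incrementAndGet()}")
            t.setDaemon(true) // daemon threads, mirroring Utils.namedThreadFactory
            t
          }
        })

      // Submit one registration task per master and keep the Futures so they
      // could later be cancelled once any master acknowledges the registration.
      def tryRegisterAllMasters(): Seq[Future[_]] =
        masterAddresses.map { address =>
          registerMasterThreadPool.submit(new Runnable {
            override def run(): Unit = {
              // Placeholder for the real RPC call to the master at `address`.
              println(s"registering with $address")
            }
          })
        }

      def main(args: Array[String]): Unit = {
        val futures = tryRegisterAllMasters()
        futures.foreach(_.get()) // wait for the sketch's dummy tasks to finish
        registerMasterThreadPool.shutdown()
      }
    }

The SynchronousQueue is what makes the maximum pool size the real concurrency bound: submit hands each task directly to a thread rather than queueing it, which is why the pool in the diff is sized to masterRpcAddresses.size.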

