GitHub user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5268#discussion_r27490525
  
    --- Diff: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala ---
    @@ -21,39 +21,43 @@ import java.net.URL
     import java.nio.ByteBuffer
     
     import scala.collection.mutable
    -import scala.concurrent.Await
    +import scala.util.{Failure, Success}
     
    -import akka.actor.{Actor, ActorSelection, Props}
    -import akka.pattern.Patterns
    -import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent}
    -
    -import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
    +import org.apache.spark.rpc._
    +import org.apache.spark._
     import org.apache.spark.TaskState.TaskState
     import org.apache.spark.deploy.SparkHadoopUtil
     import org.apache.spark.deploy.worker.WorkerWatcher
     import org.apache.spark.scheduler.TaskDescription
     import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
    -import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}
    +import org.apache.spark.util.{SignalLogger, Utils}
     
     private[spark] class CoarseGrainedExecutorBackend(
    +    override val rpcEnv: RpcEnv,
         driverUrl: String,
         executorId: String,
         hostPort: String,
         cores: Int,
         userClassPath: Seq[URL],
         env: SparkEnv)
    -  extends Actor with ActorLogReceive with ExecutorBackend with Logging {
    +  extends RpcEndpoint with ExecutorBackend with Logging {
     
       Utils.checkHostPort(hostPort, "Expected hostport")
     
       var executor: Executor = null
    -  var driver: ActorSelection = null
    +  @volatile var driver: Option[RpcEndpointRef] = None
     
    -  override def preStart() {
    +  override def onStart() {
    +    import scala.concurrent.ExecutionContext.Implicits.global
         logInfo("Connecting to driver: " + driverUrl)
    -    driver = context.actorSelection(driverUrl)
    -    driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
    -    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
    +    rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    +      driver = Some(ref)
    +      ref.sendWithReply[RegisteredExecutor.type](
    +        RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
    +    } onComplete {
    +      case Success(msg) => self.send(msg)
    +      case Failure(e) => logError(s"Cannot register to driver: 
$driverUrl", e)
    --- End diff --
    
    Fixed.
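
For context, the new registration path in the diff chains two asynchronous steps (resolve the driver endpoint, then send RegisterExecutor) and branches on the combined outcome in a single onComplete. Below is a minimal, self-contained sketch of that Future flatMap/onComplete pattern in plain Scala; the names RegistrationSketch, lookupDriver, register, and Registered are hypothetical stand-ins for illustration only, not Spark's RPC API.

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.{Failure, Success}

    object RegistrationSketch {
      case object Registered

      // Hypothetical stand-in for asynchronously resolving the driver endpoint.
      def lookupDriver(url: String): Future[String] = Future(s"ref-for-$url")

      // Hypothetical stand-in for sending a registration message and awaiting a typed reply.
      def register(driverRef: String): Future[Registered.type] = Future(Registered)

      def main(args: Array[String]): Unit = {
        // Chain the two asynchronous steps, as the diff's onStart does with flatMap.
        val done = lookupDriver("spark://driver:7077").flatMap(register)

        // Handle success and failure of the whole chain in one place.
        done.onComplete {
          case Success(msg) => println(s"registration succeeded: $msg")
          case Failure(e)   => println(s"cannot register to driver: $e")
        }

        Await.ready(done, 10.seconds)
        Thread.sleep(100) // let the onComplete callback print before the JVM exits
      }
    }

In the diff itself, the Success branch forwards the reply back to the endpoint with self.send, presumably so the RegisteredExecutor message is processed on the endpoint's own message loop rather than on the future callback's thread.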

