Github user skonto commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17031#discussion_r103281854
  
    --- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
 ---
    @@ -582,141 +688,33 @@ private[spark] class MesosClusterScheduler(
         }
       }
     
    -  override def resourceOffers(driver: SchedulerDriver, offers: 
JList[Offer]): Unit = {
    -    logTrace(s"Received offers from Mesos: 
\n${offers.asScala.mkString("\n")}")
    -    val tasks = new mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]()
    -    val currentTime = new Date()
    -
    -    val currentOffers = offers.asScala.map {
    -      o => new ResourceOffer(o.getId, o.getSlaveId, o.getResourcesList)
    -    }.toList
    -
    -    stateLock.synchronized {
    -      // We first schedule all the supervised drivers that are ready to 
retry.
    -      // This list will be empty if none of the drivers are marked as 
supervise.
    -      val driversToRetry = pendingRetryDrivers.filter { d =>
    -        d.retryState.get.nextRetry.before(currentTime)
    -      }
    -
    -      scheduleTasks(
    -        copyBuffer(driversToRetry),
    -        removeFromPendingRetryDrivers,
    -        currentOffers,
    -        tasks)
    -
    -      // Then we walk through the queued drivers and try to schedule them.
    -      scheduleTasks(
    -        copyBuffer(queuedDrivers),
    -        removeFromQueuedDrivers,
    -        currentOffers,
    -        tasks)
    -    }
    -    tasks.foreach { case (offerId, taskInfos) =>
    -      driver.launchTasks(Collections.singleton(offerId), taskInfos.asJava)
    -    }
    -
    -    for (o <- currentOffers if !tasks.contains(o.offerId)) {
    -      driver.declineOffer(o.offerId)
    -    }
    -  }
    -
    -  private def copyBuffer(
    -      buffer: ArrayBuffer[MesosDriverDescription]): 
ArrayBuffer[MesosDriverDescription] = {
    -    val newBuffer = new ArrayBuffer[MesosDriverDescription](buffer.size)
    -    buffer.copyToBuffer(newBuffer)
    -    newBuffer
    -  }
    -
    -  def getSchedulerState(): MesosClusterSchedulerState = {
    -    stateLock.synchronized {
    -      new MesosClusterSchedulerState(
    -        frameworkId,
    -        masterInfo.map(m => s"http://${m.getIp}:${m.getPort}";),
    -        copyBuffer(queuedDrivers),
    -        launchedDrivers.values.map(_.copy()).toList,
    -        finishedDrivers.map(_.copy()).toList,
    -        copyBuffer(pendingRetryDrivers))
    -    }
    -  }
    -
    -  override def offerRescinded(driver: SchedulerDriver, offerId: OfferID): 
Unit = {}
    -  override def disconnected(driver: SchedulerDriver): Unit = {}
    -  override def reregistered(driver: SchedulerDriver, masterInfo: 
MasterInfo): Unit = {
    -    logInfo(s"Framework re-registered with master ${masterInfo.getId}")
    -  }
    -  override def slaveLost(driver: SchedulerDriver, slaveId: SlaveID): Unit 
= {}
    -  override def error(driver: SchedulerDriver, error: String): Unit = {
    -    logError("Error received: " + error)
    -    markErr()
    -  }
    +  private def createTaskInfo(desc: MesosDriverDescription, offer: 
ResourceOffer): TaskInfo = {
    +    val taskId = TaskID.newBuilder().setValue(desc.submissionId).build()
     
    -  /**
    -   * Check if the task state is a recoverable state that we can relaunch 
the task.
    -   * Task state like TASK_ERROR are not relaunchable state since it wasn't 
able
    -   * to be validated by Mesos.
    -   */
    -  private def shouldRelaunch(state: MesosTaskState): Boolean = {
    -    state == MesosTaskState.TASK_FAILED ||
    -      state == MesosTaskState.TASK_LOST
    -  }
    +    val (remainingResources, cpuResourcesToUse) =
    +      partitionResources(offer.resources, "cpus", desc.cores)
    +    val (finalResources, memResourcesToUse) =
    +      partitionResources(remainingResources.asJava, "mem", desc.mem)
    +    offer.resources = finalResources.asJava
     
    -  override def statusUpdate(driver: SchedulerDriver, status: TaskStatus): 
Unit = {
    -    val taskId = status.getTaskId.getValue
    -    stateLock.synchronized {
    -      if (launchedDrivers.contains(taskId)) {
    -        if (status.getReason == Reason.REASON_RECONCILIATION &&
    -          !pendingRecover.contains(taskId)) {
    -          // Task has already received update and no longer requires 
reconciliation.
    -          return
    -        }
    -        val state = launchedDrivers(taskId)
    -        // Check if the driver is supervise enabled and can be relaunched.
    -        if (state.driverDescription.supervise && 
shouldRelaunch(status.getState)) {
    -          removeFromLaunchedDrivers(taskId)
    -          state.finishDate = Some(new Date())
    -          val retryState: Option[MesosClusterRetryState] = 
state.driverDescription.retryState
    -          val (retries, waitTimeSec) = retryState
    -            .map { rs => (rs.retries + 1, Math.min(maxRetryWaitTime, 
rs.waitTime * 2)) }
    -            .getOrElse{ (1, 1) }
    -          val nextRetry = new Date(new Date().getTime + waitTimeSec * 
1000L)
    -
    -          val newDriverDescription = state.driverDescription.copy(
    -            retryState = Some(new MesosClusterRetryState(status, retries, 
nextRetry, waitTimeSec)))
    -          pendingRetryDrivers += newDriverDescription
    -          pendingRetryDriversState.persist(taskId, newDriverDescription)
    -        } else if 
(TaskState.isFinished(mesosToTaskState(status.getState))) {
    -          removeFromLaunchedDrivers(taskId)
    -          state.finishDate = Some(new Date())
    -          if (finishedDrivers.size >= retainedDrivers) {
    -            val toRemove = math.max(retainedDrivers / 10, 1)
    -            finishedDrivers.trimStart(toRemove)
    -          }
    -          finishedDrivers += state
    -        }
    -        state.mesosTaskStatus = Option(status)
    -      } else {
    -        logError(s"Unable to find driver $taskId in status update")
    -      }
    -    }
    +    val appName = desc.conf.get("spark.app.name")
    +    val taskInfo = TaskInfo.newBuilder()
    +      .setTaskId(taskId)
    +      .setName(s"Driver for ${appName}")
    --- End diff --
    
    brackets are redundant.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to