Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/17031#discussion_r103281854 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala --- @@ -582,141 +688,33 @@ private[spark] class MesosClusterScheduler( } } - override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = { - logTrace(s"Received offers from Mesos: \n${offers.asScala.mkString("\n")}") - val tasks = new mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]() - val currentTime = new Date() - - val currentOffers = offers.asScala.map { - o => new ResourceOffer(o.getId, o.getSlaveId, o.getResourcesList) - }.toList - - stateLock.synchronized { - // We first schedule all the supervised drivers that are ready to retry. - // This list will be empty if none of the drivers are marked as supervise. - val driversToRetry = pendingRetryDrivers.filter { d => - d.retryState.get.nextRetry.before(currentTime) - } - - scheduleTasks( - copyBuffer(driversToRetry), - removeFromPendingRetryDrivers, - currentOffers, - tasks) - - // Then we walk through the queued drivers and try to schedule them. 
- scheduleTasks( - copyBuffer(queuedDrivers), - removeFromQueuedDrivers, - currentOffers, - tasks) - } - tasks.foreach { case (offerId, taskInfos) => - driver.launchTasks(Collections.singleton(offerId), taskInfos.asJava) - } - - for (o <- currentOffers if !tasks.contains(o.offerId)) { - driver.declineOffer(o.offerId) - } - } - - private def copyBuffer( - buffer: ArrayBuffer[MesosDriverDescription]): ArrayBuffer[MesosDriverDescription] = { - val newBuffer = new ArrayBuffer[MesosDriverDescription](buffer.size) - buffer.copyToBuffer(newBuffer) - newBuffer - } - - def getSchedulerState(): MesosClusterSchedulerState = { - stateLock.synchronized { - new MesosClusterSchedulerState( - frameworkId, - masterInfo.map(m => s"http://${m.getIp}:${m.getPort}"), - copyBuffer(queuedDrivers), - launchedDrivers.values.map(_.copy()).toList, - finishedDrivers.map(_.copy()).toList, - copyBuffer(pendingRetryDrivers)) - } - } - - override def offerRescinded(driver: SchedulerDriver, offerId: OfferID): Unit = {} - override def disconnected(driver: SchedulerDriver): Unit = {} - override def reregistered(driver: SchedulerDriver, masterInfo: MasterInfo): Unit = { - logInfo(s"Framework re-registered with master ${masterInfo.getId}") - } - override def slaveLost(driver: SchedulerDriver, slaveId: SlaveID): Unit = {} - override def error(driver: SchedulerDriver, error: String): Unit = { - logError("Error received: " + error) - markErr() - } + private def createTaskInfo(desc: MesosDriverDescription, offer: ResourceOffer): TaskInfo = { + val taskId = TaskID.newBuilder().setValue(desc.submissionId).build() - /** - * Check if the task state is a recoverable state that we can relaunch the task. - * Task state like TASK_ERROR are not relaunchable state since it wasn't able - * to be validated by Mesos. 
- */ - private def shouldRelaunch(state: MesosTaskState): Boolean = { - state == MesosTaskState.TASK_FAILED || - state == MesosTaskState.TASK_LOST - } + val (remainingResources, cpuResourcesToUse) = + partitionResources(offer.resources, "cpus", desc.cores) + val (finalResources, memResourcesToUse) = + partitionResources(remainingResources.asJava, "mem", desc.mem) + offer.resources = finalResources.asJava - override def statusUpdate(driver: SchedulerDriver, status: TaskStatus): Unit = { - val taskId = status.getTaskId.getValue - stateLock.synchronized { - if (launchedDrivers.contains(taskId)) { - if (status.getReason == Reason.REASON_RECONCILIATION && - !pendingRecover.contains(taskId)) { - // Task has already received update and no longer requires reconciliation. - return - } - val state = launchedDrivers(taskId) - // Check if the driver is supervise enabled and can be relaunched. - if (state.driverDescription.supervise && shouldRelaunch(status.getState)) { - removeFromLaunchedDrivers(taskId) - state.finishDate = Some(new Date()) - val retryState: Option[MesosClusterRetryState] = state.driverDescription.retryState - val (retries, waitTimeSec) = retryState - .map { rs => (rs.retries + 1, Math.min(maxRetryWaitTime, rs.waitTime * 2)) } - .getOrElse{ (1, 1) } - val nextRetry = new Date(new Date().getTime + waitTimeSec * 1000L) - - val newDriverDescription = state.driverDescription.copy( - retryState = Some(new MesosClusterRetryState(status, retries, nextRetry, waitTimeSec))) - pendingRetryDrivers += newDriverDescription - pendingRetryDriversState.persist(taskId, newDriverDescription) - } else if (TaskState.isFinished(mesosToTaskState(status.getState))) { - removeFromLaunchedDrivers(taskId) - state.finishDate = Some(new Date()) - if (finishedDrivers.size >= retainedDrivers) { - val toRemove = math.max(retainedDrivers / 10, 1) - finishedDrivers.trimStart(toRemove) - } - finishedDrivers += state - } - state.mesosTaskStatus = Option(status) - } else { - logError(s"Unable 
to find driver $taskId in status update") - } - } + val appName = desc.conf.get("spark.app.name") + val taskInfo = TaskInfo.newBuilder() + .setTaskId(taskId) + .setName(s"Driver for ${appName}") --- End diff -- The braces in `${appName}` are redundant — since `appName` is a simple identifier, `$appName` suffices.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org