Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6870#discussion_r33731682
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/ExecutorBlacklistTracker.scala ---
    @@ -0,0 +1,175 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler
    +
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark._
    +import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
    +
    +/**
    + * ExecutorBlacklistTracker blacklists the executors by tracking the status of running tasks with
    + * heuristic algorithm.
    + *
    + * A executor will be considered bad enough only when:
    + * 1. The failure task number on this executor is more than
    + *    spark.scheduler.blacklist.executorFaultThreshold.
    + * 2. The failure task number on this executor is
    + *    spark.scheduler.blacklist.averageBlacklistThreshold more than average failure task number
    + *    of this cluster.
    + *
    + * Also max number of blacklisted executors will not exceed the
    + * spark.scheduler.blacklist.maxBlacklistFraction of whole cluster, and blacklisted executors
    + * will be forgiven when there is no failure tasks in the
    + * spark.scheduler.blacklist.executorFaultTimeoutWindowInMinutes.
    + */
    +private[spark] class ExecutorBlacklistTracker(conf: SparkConf) extends SparkListener {
    +  import ExecutorBlacklistTracker._
    +
    +  private val maxBlacklistFraction = conf.getDouble(
    +    "spark.scheduler.blacklist.maxBlacklistFraction", MAX_BLACKLIST_FRACTION)
    +  private val avgBlacklistThreshold = conf.getDouble(
    +    "spark.scheduler.blacklist.averageBlacklistThreshold", AVERAGE_BLACKLIST_THRESHOLD)
    +  private val executorFaultThreshold = conf.getInt(
    +    "spark.scheduler.blacklist.executorFaultThreshold", EXECUTOR_FAULT_THRESHOLD)
    +  private val executorFaultTimeoutWindowInMinutes = conf.getInt(
    +    "spark.scheduler.blacklist.executorFaultTimeoutWindowInMinutes", EXECUTOR_FAULT_TIMEOUT_WINDOW)
    +
    +  // Count the number of executors registered
    +  var numExecutorsRegistered: Int = 0
    +
    +  // Track the number of failure tasks and time of latest failure to executor id
    +  val executorIdToTaskFailures = new mutable.HashMap[String, ExecutorFailureStatus]()
    +
    +  // Clock used to update and exclude the executors which are out of time window.
    +  private var clock: Clock = new SystemClock()
    +
    +  // Executor that handles the scheduling task
    +  private val executor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
    +    "spark-scheduler-blacklist-expire-timer")
    +
    +  def start(): Unit = {
    +    val scheduleTask = new Runnable() {
    +      override def run(): Unit = {
    +        Utils.logUncaughtExceptions(expireTimeoutExecutorBlacklist())
    +      }
    +    }
    +    executor.scheduleAtFixedRate(scheduleTask, 0L, 60, TimeUnit.SECONDS)
    +  }
    +
    +  def stop(): Unit = {
    +    executor.shutdown()
    +    executor.awaitTermination(10, TimeUnit.SECONDS)
    +  }
    +
    +  def setClock(newClock: Clock): Unit = {
    +    clock = newClock
    +  }
    +
    +  def getExecutorBlacklist: Set[String] = synchronized {
    +    executorIdToTaskFailures.filter(_._2.isBlackListed).keys.toSet
    +  }
    +
    +  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    +    taskEnd.reason match {
    +      case _: FetchFailed | _: ExceptionFailure | TaskResultLost |
    --- End diff --
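
    For reference, the criteria described in the class comment above could be sketched roughly as below. This is only one reading of that comment: it assumes both conditions must hold, that `averageBlacklistThreshold` acts as a multiplier on the cluster-wide average, and that the cap is rounded down. The object, function and parameter names are hypothetical and are not taken from the PR.

```scala
// Hypothetical sketch of the blacklist heuristic; not the code under review.
object BlacklistHeuristicSketch {

  // An executor is "bad enough" when its failure count exceeds the absolute
  // threshold AND exceeds the cluster average scaled by the relative threshold.
  // Assumption: the relative threshold is a multiplier, not an additive offset.
  def isBadExecutor(
      failures: Int,
      clusterAvgFailures: Double,
      executorFaultThreshold: Int,
      averageBlacklistThreshold: Double): Boolean = {
    failures > executorFaultThreshold &&
      failures > averageBlacklistThreshold * clusterAvgFailures
  }

  // Upper bound on how many executors may be blacklisted at once.
  // Assumption: the fraction is applied to the registered executor count
  // and rounded down.
  def maxBlacklisted(numExecutors: Int, maxBlacklistFraction: Double): Int =
    math.floor(numExecutors * maxBlacklistFraction).toInt
}
```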
    
    More random thoughts from me: I have a nagging worry about building functionality on top of `SparkListener`, since the listener bus is free to drop events when it gets backlogged.  Even if this listener is fast, as we add more listeners like it, plus maybe a slow user listener and lots of small tasks, the bus could fall behind.  Each listener might be fine on its own, yet events could still be dropped, and any component that depends on them would start behaving erratically.
    
    But maybe the right answer is to fix the listener bus in the future, for example by adding back pressure, and for now keep building components on top of it.
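
    To make the concern concrete, here is a minimal sketch of the two behaviors using a plain bounded `java.util.concurrent.LinkedBlockingQueue`. It is not Spark's listener bus code: `Event`, `BoundedBusSketch` and the capacity of 4 are made up for illustration. Posting with `offer` drops events once the queue is full, while posting with `put` blocks the producer, which is one simple form of back pressure.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a listener event; not Spark's SparkListenerEvent.
final case class Event(id: Int)

object BoundedBusSketch {

  // Non-blocking post: `offer` returns false when the queue is full, so the
  // event is silently lost. Returns the number of dropped events.
  def postDropping(queue: LinkedBlockingQueue[Event], n: Int): Int =
    (1 to n).count(i => !queue.offer(Event(i)))

  // Blocking post: `put` waits for free space, so a slow consumer slows the
  // producer down instead of losing events.
  def postWithBackPressure(queue: LinkedBlockingQueue[Event], n: Int): Unit =
    (1 to n).foreach(i => queue.put(Event(i)))

  // Runs a deliberately slow consumer thread against a blocking producer and
  // returns how many events the consumer received.
  def runWithBackPressure(capacity: Int, n: Int): Int = {
    val queue = new LinkedBlockingQueue[Event](capacity)
    val delivered = new AtomicInteger(0)
    val consumer = new Thread(new Runnable {
      override def run(): Unit = (1 to n).foreach { _ =>
        queue.take()
        Thread.sleep(1) // mimic a slow listener
        delivered.incrementAndGet()
      }
    })
    consumer.start()
    postWithBackPressure(queue, n)
    consumer.join()
    delivered.get()
  }

  def main(args: Array[String]): Unit = {
    // Nothing drains this queue, which mimics a backlogged bus.
    val backlogged = new LinkedBlockingQueue[Event](4)
    val dropped = postDropping(backlogged, 10)
    println(s"offer: queued=${backlogged.size}, dropped=$dropped") // offer: queued=4, dropped=6
    println(s"put: delivered=${runWithBackPressure(4, 10)}")       // put: delivered=10
  }
}
```

    The trade-off is that blocking the producer would stall whatever thread posts the events, so real back pressure on the listener bus would need more thought than this.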


