GitHub user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6870#discussion_r33729664
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/ExecutorBlacklistTracker.scala ---
    @@ -0,0 +1,175 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler
    +
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark._
    +import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
    +
    +/**
    + * ExecutorBlacklistTracker blacklists the executors by tracking the status of running tasks with
    + * heuristic algorithm.
    + *
    + * A executor will be considered bad enough only when:
    + * 1. The failure task number on this executor is more than
    + *    spark.scheduler.blacklist.executorFaultThreshold.
    + * 2. The failure task number on this executor is
    + *    spark.scheduler.blacklist.averageBlacklistThreshold more than average failure task number
    + *    of this cluster.
    + *
    + * Also max number of blacklisted executors will not exceed the
    + * spark.scheduler.blacklist.maxBlacklistFraction of whole cluster, and blacklisted executors
    + * will be forgiven when there is no failure tasks in the
    + * spark.scheduler.blacklist.executorFaultTimeoutWindowInMinutes.
    + */
    +private[spark] class ExecutorBlacklistTracker(conf: SparkConf) extends SparkListener {
    +  import ExecutorBlacklistTracker._
    +
    +  private val maxBlacklistFraction = conf.getDouble(
    +    "spark.scheduler.blacklist.maxBlacklistFraction", MAX_BLACKLIST_FRACTION)
    +  private val avgBlacklistThreshold = conf.getDouble(
    +    "spark.scheduler.blacklist.averageBlacklistThreshold", AVERAGE_BLACKLIST_THRESHOLD)
    +  private val executorFaultThreshold = conf.getInt(
    +    "spark.scheduler.blacklist.executorFaultThreshold", EXECUTOR_FAULT_THRESHOLD)
    +  private val executorFaultTimeoutWindowInMinutes = conf.getInt(
    +    "spark.scheduler.blacklist.executorFaultTimeoutWindowInMinutes", EXECUTOR_FAULT_TIMEOUT_WINDOW)
    +
    +  // Count the number of executors registered
    +  var numExecutorsRegistered: Int = 0
    +
    +  // Track the number of failure tasks and time of latest failure to executor id
    +  val executorIdToTaskFailures = new mutable.HashMap[String, ExecutorFailureStatus]()
    +
    +  // Clock used to update and exclude the executors which are out of time window.
    +  private var clock: Clock = new SystemClock()
    +
    +  // Executor that handles the scheduling task
    +  private val executor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
    +    "spark-scheduler-blacklist-expire-timer")
    +
    +  def start(): Unit = {
    +    val scheduleTask = new Runnable() {
    +      override def run(): Unit = {
    +        Utils.logUncaughtExceptions(expireTimeoutExecutorBlacklist())
    +      }
    +    }
    +    executor.scheduleAtFixedRate(scheduleTask, 0L, 60, TimeUnit.SECONDS)
    +  }
    +
    +  def stop(): Unit = {
    +    executor.shutdown()
    +    executor.awaitTermination(10, TimeUnit.SECONDS)
    +  }
    +
    +  def setClock(newClock: Clock): Unit = {
    +    clock = newClock
    +  }
    +
    +  def getExecutorBlacklist: Set[String] = synchronized {
    +    executorIdToTaskFailures.filter(_._2.isBlackListed).keys.toSet
    --- End diff --
    
    This gets called a lot and (hopefully) is updated only rarely, so it should
    probably be computed only when it changes and then stored. (Also, I think the
    stored value could probably just be `@volatile`, so you wouldn't need to
    synchronize ... but I'm not 100% sure.)
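
    A minimal sketch of the caching idea, not the PR's actual code. It assumes
    the cached value is an immutable `Set[String]` that is replaced wholesale
    whenever an executor's status changes; `BlacklistCache`, `FailureStatus`,
    `setBlacklisted` and `cachedBlacklist` are hypothetical names used only for
    illustration. Under that assumption, a `@volatile` reference should be
    enough for readers, while writers still synchronize because they mutate the
    shared map:

```scala
import scala.collection.mutable

// Hypothetical stand-in for the PR's ExecutorFailureStatus; only the flag matters here.
final class FailureStatus(var isBlackListed: Boolean = false)

class BlacklistCache {
  private val executorIdToTaskFailures = new mutable.HashMap[String, FailureStatus]()

  // Immutable snapshot, swapped as a whole on every change. Readers see either
  // the old set or the new one, never a partially built set.
  @volatile private var cachedBlacklist: Set[String] = Set.empty

  // Writers synchronize because they mutate the shared map.
  def setBlacklisted(executorId: String, blacklisted: Boolean): Unit = synchronized {
    val status = executorIdToTaskFailures.getOrElseUpdate(executorId, new FailureStatus())
    if (status.isBlackListed != blacklisted) {
      status.isBlackListed = blacklisted
      // Recompute only when a status actually changed.
      cachedBlacklist = executorIdToTaskFailures.filter(_._2.isBlackListed).keys.toSet
    }
  }

  // Hot path: a single volatile read, no lock and no filtering.
  def getExecutorBlacklist: Set[String] = cachedBlacklist
}
```

    This only holds as long as the stored set is never mutated in place; if the
    cache were a mutable collection, readers would need synchronization again.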

