Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5423#discussion_r33467960
  
    --- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryProvider.scala
 ---
    @@ -0,0 +1,1015 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.history.yarn
    +
    +import java.io.FileNotFoundException
    +import java.net.URI
    +import java.util.Date
    +import java.util.concurrent.LinkedBlockingQueue
    +import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean}
    +import java.util.zip.ZipOutputStream
    +
    +import scala.collection.JavaConversions._
    +
    +import org.apache.hadoop.security.UserGroupInformation
    +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
    +import org.apache.spark.deploy.history.yarn.rest.{JerseyBinding, 
TimelineQueryClient}
    +import org.apache.spark.deploy.history.{ApplicationHistoryInfo, 
ApplicationHistoryProvider, HistoryServer}
    +import org.apache.spark.scheduler.{ApplicationEventListener, 
SparkListenerBus}
    +import org.apache.spark.ui.SparkUI
    +import org.apache.spark.{SparkException, Logging, SecurityManager, 
SparkConf}
    +
    +/**
     + * A history provider which reads in the history from
    + * the YARN Timeline Service.
    + *
    + * The service is a remote HTTP service, so failure modes are
    + * different from simple file IO.
    + *
    + * 1. Application listings are asynchronous, and made on a schedule, though
    + * they can be forced (and the schedule disabled).
    + * 2. The results are cached and can be retrieved with 
[[getApplications()]].
     + * 3. The most recent failure of any operation is stored;
     + * the [[getLastFailure()]] call will return the last exception
    + * or `None`. It is shared across threads so is primarily there for
    + * tests and basic diagnostics.
    + * 4. Listing the details of a single application in [[getAppUI()]]
    + * is synchronous and *not* cached.
    + * 5. the [[maybeCheckHealth()]] call performs a health check as the 
initial
    + * binding operation of this instance. This call invokes 
[[TimelineQueryClient.healthCheck()]]
    + * for better diagnostics on binding failures -particularly configuration 
problems.
    + * 6. Every REST call, synchronous or asynchronous, will invoke 
[[maybeCheckHealth()]] until
    + * the health check eventually succeeds.
    + * <p>
     + * If the timeline is not enabled, the API calls used by the web UI
    + * downgrade gracefully (returning empty entries), rather than fail.
    + * 
    + *
    + * @param sparkConf configuration of the provider
    + */
    +private[spark] class YarnHistoryProvider(sparkConf: SparkConf)
    +  extends ApplicationHistoryProvider with Logging {
    +
    +  /**
    +   * The configuration here is a YarnConfiguration built off the spark 
configuration
    +   * supplied in the constructor; this operation ensures that 
`yarn-default.xml`
    +   * and `yarn-site.xml` are pulled in. Options in the spark conf will 
override
    +   * those in the -default and -site XML resources which are not marked as 
final.
    +   */
    +  private val yarnConf = {
    +    new YarnConfiguration(SparkHadoopUtil.get.newConfiguration(sparkConf))
    +  }
    +
    +  /**
    +   * UI ACL option
    +   */
    +  private val uiAclsEnabled = 
sparkConf.getBoolean("spark.history.ui.acls.enable", false)
    +
    +  private val detailedInfo = 
sparkConf.getBoolean(YarnHistoryProvider.OPTION_DETAILED_INFO, false)
    +  private val NOT_STARTED = "<Not Started>"
    +
    +  /* minimum interval between each check for event log updates */
    +  private val refreshInterval = 
sparkConf.getLong(YarnHistoryProvider.OPTION_MIN_REFRESH_INTERVAL,
    +    YarnHistoryProvider.DEFAULT_MIN_REFRESH_INTERVAL_SECONDS) * 1000
    +
    +  /**
    +   * Window limit in milliseconds
    +   */
    +  private val windowLimitMs = 
sparkConf.getLong(YarnHistoryProvider.OPTION_WINDOW_LIMIT,
    +    YarnHistoryProvider.DEFAULT_WINDOW_LIMIT) * 1000
    +
    +  /**
    +   * Number of events to get
    +   */
    +  private val eventFetchLimit = 
sparkConf.getLong(YarnHistoryProvider.OPTION_EVENT_FETCH_LIMIT,
    +    YarnHistoryProvider.DEFAULT_EVENT_FETCH_LIMIT)
    +
    +  private val eventFetchOption: Option[Long] = if (eventFetchLimit > 0) 
Some(eventFetchLimit) else None
    +
    +  /**
    +   * Start time. Doesn't use the `now` call as tests can subclass that and
    +   * it won't be valid until after the subclass has been constructed
    +   */
    +  val serviceStartTime = System.currentTimeMillis()
    +
    +  /**
    +   * Timeline endpoint URI
    +   */
    +  protected val timelineEndpoint = createTimelineEndpoint()
    +
    +  /**
     +   * The timeline query client, which uses a Jersey client to talk to the
     +   * timeline service at [[timelineEndpoint]], and creates a timeline (write)
     +   * client instance to handle token renewal.
    +   *
    +   */
    +  protected val timelineQueryClient = {
    +    createTimelineQueryClient()
    +  }
    +
    +
    +  /**
    +   * Override point: create the timeline endpoint
    +   * @return a URI to the timeline web service
    +   */
    +  protected def createTimelineEndpoint(): URI = {
    +    getTimelineEndpoint(yarnConf)
    +  }
    +
    +  /**
    +   * Override point: create the timeline query client.
    +   * This is called during instance creation.
     +   * @return a timeline query client to use for the duration
    +   *         of this instance
    +   */
    +  protected def createTimelineQueryClient(): TimelineQueryClient = {
    +    new TimelineQueryClient(timelineEndpoint, yarnConf, 
JerseyBinding.createClientConfig())
    +  }
    +
    +  /**
    +   * The empty listing, with a timestamp to indicate that the listing
    +   * has never taken place.
    +   */
    +  private val emptyListing = new ApplicationListingResults(0, Nil, None)
    +
    +  /**
    +   * List of applications. Initial result is empty
    +   */
    +  private var applications: ApplicationListingResults = emptyListing
    +  
    +  /**
    +   * Last exception seen and when
    +   */
    +  protected var lastFailureCause: Option[(Throwable, Date)] = None
    +
    +  private val refreshCount = new AtomicLong(0)
    +  private val refreshFailedCount = new AtomicLong(0)
    +
    +  /**
    +   * Health marker
    +   */
    +  private val healthy = new AtomicBoolean(false)
    +
    +  /**
    +   * Enabled flag
    +   */
    +  private val _enabled = timelineServiceEnabled(yarnConf)
    +
    +  /**
    +   * Atomic boolean used to signal to the refresh thread that it
    +   * must exit its loop.
    +   */
    +  private val stopRefresh = new AtomicBoolean(false)
    +
    +  /**
    +   * refresher
    +   */
    +  val refresher = new Refresher()
    +
    +  /**
    +   * Initialize the provider
    +   */
    +  init()
    +
    +  /**
    +   * Check the configuration and log whether or not it is enabled;
    +   * if it is enabled then the URL is logged too.
    +   */
    +  private def init(): Unit = {
    +    if (!enabled) {
    +      logError(YarnHistoryProvider.TEXT_SERVICE_DISABLED)
    +    } else {
    +      logInfo(YarnHistoryProvider.TEXT_SERVICE_ENABLED)
    +      logInfo(YarnHistoryProvider.KEY_SERVICE_URL + ": " + 
timelineEndpoint)
    +      logDebug(sparkConf.toDebugString)
    +      // get the thread time
    +      logInfo(s"refresh interval $refreshInterval milliseconds")
    +      if (refreshInterval < 0) {
    +        throw new 
Exception(YarnHistoryProvider.TEXT_INVALID_UPDATE_INTERVAL +
    +            s": ${refreshInterval/1000}")
    +      }
    +      startRefreshThread()
    +    }
    +  }
    +
    +
    +  /**
    +   * Stop the service. After this point operations will fail.
    +   */
    +  override def stop(): Unit = {
    +    logDebug(s"Stopping $this")
    +    // attempt to stop the refresh thread
    +    if (!stopRefreshThread()) {
    +      closeQueryClient()
    +    }
    +
    +  }
    +
    +  /**
    +   * Close the query client
    +   */
    +  def closeQueryClient(): Unit = {
    +    // and otherwise, stop the query client
    +    logDebug("Stopping Timeline client")
    +    timelineQueryClient.close()
    +  }
    +
    +  /**
    +   * Is the timeline service (and therefore this provider) enabled.
    +   * (override point for tests).
    +   *
    +   * Important: this is called during construction, so test-time subclasses
    +   * will be invoked before their own construction has taken place.
    +   * Code appropriately.
    +   * @return true if the provider/YARN configuration enables the timeline
    +   *         service.
    +   */
    +  def enabled: Boolean = {
    +    _enabled
    +  }
    +  
    +  /**
    +   * Get the timeline query client. Used internally to ease testing
    +   * @return the client.
    +   */
    +  def getTimelineQueryClient(): TimelineQueryClient = {
    +    timelineQueryClient
    +  }
    +
    +  /**
    +   * Set the last exception
    +   * @param ex exception seen
    +   */
    +  private def setLastFailure(ex: Throwable): Unit = {
    +    setLastFailure(ex, now())
    +  }
    +
    +  /**
    +   * Set the last exception
    +   * @param ex exception seen
    +   * @param timestamp the timestamp of the failure
    +   */
    +  private def setLastFailure(ex: Throwable, timestamp: Long): Unit = {
    +    this.synchronized {
    +      lastFailureCause = Some(ex, new Date(timestamp))
    +    }
    +  }
    +
    +  /**
    +   * Reset the failure info
    +   */
    +  private def resetLastFailure(): Unit = {
    +    this.synchronized {
    +      lastFailureCause = None
    +    }
    +  }
    +
    +  /**
    +   * Get the last exception
     +   * @return the last exception and the time it was seen, or `None`
    +   */
    +  def getLastFailure(): Option[(Throwable, Date)] = {
    +    this.synchronized {
    +      lastFailureCause
    +    }
    +  }
    +
    +  /**
     +   * Query for the connection being healthy.
     +   * @return true if the endpoint is considered healthy
    +   */
    +  def isHealthy(): Boolean = {
    +    healthy.get()
    +  }
    +
    +  /**
     +   * Get the health flag itself. This allows test code to initialize it
     +   * properly. Also: if accessed and set to false, it will trigger another
     +   * health check.
     +   * @return the health flag
     +   */
     +  protected def getHealthFlag(): AtomicBoolean = {
     +    healthy
    +  }
    +
    +  /**
     +   * Thread-safe accessor to the application list
     +   * @return the current application listing results
    +   */
    +  def getApplications(): ApplicationListingResults = {
    +    this.synchronized(applications)
    +  }
    +
    +  /**
    +   * Thread safe call to update the application results
    +   * @param newVal new value
    +   */
    +  protected def setApplications(newVal: ApplicationListingResults): Unit = 
{
    +    this.synchronized {
    +      applications = newVal
    +    }
    +  }
    +
    +  /**
    +   * Health check to call before any other operation is attempted.
    +   * This is atomic, using the `healthy` flag to check.
    +   * If the endpoint is considered unhealthy then the healthy flag
    +   * is reset to false and an exception thrown.
    +   * @return true if the health check took place
    +   */
    +  protected def maybeCheckHealth(): Boolean = {
    +    val h = getHealthFlag();
    +    if (!h.getAndSet(true)) {
    +      val client = getTimelineQueryClient()
    +      try {
    +        client.healthCheck()
    +        true
    +      } catch {
    +        case e: Exception =>
    +          // failure
    +          logWarning(s"Health check of $client failed", e)
    +          setLastFailure(e)
    +          // reset health so another caller may attempt it.
    +          h.set(false)
    +          // propagate the failure
    +          throw e;
    +      }
    +    } else {
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Start the refresh thread with the given interval.
    +   *
    +   * When this thread exits, it will close the `timelineQueryClient`
    +   * instance
    +   */
    +  def startRefreshThread(): Unit = {
    +    logInfo(s"Starting timeline refresh thread")
    +    val thread = new Thread(refresher, s"YarnHistoryProvider Refresher")
    +    thread.setDaemon(true)
    +    refresher.start(thread)
    +  }
    +
    +  /**
    +   * Stop the refresh thread if there is one.
    +   *
    +   * This does not guarantee an immediate halt to the thread.
    +   * @return true if there was a refresh thread to stop
    +   */
    +  def stopRefreshThread(): Boolean = {
    +    refresher.stopRefresher()
    +  }
    +
    +  /**
    +   * Probe for the refresh thread running
    +   * @return true if the refresh thread has been created and is still alive
    +   */
    +  def isRefreshThreadRunning(): Boolean = {
    +    refresher.isRunning()
    +  }
    +
    +  def getRefreshCount(): Long = { refreshCount.get() }
    +  def getRefreshFailedCount(): Long = { refreshFailedCount.get() }
    +
    +  /**
    +   * List applications.
    +   * <p>
    +   * If the timeline is not enabled, returns an empty list
     +   * @return the results of the listing operation; on failure, an empty
     +   *         listing recording the failure cause
    +   */
    +   def listApplications(limit: Option[Long] = None,
    +      windowStart: Option[Long] = None,
    +      windowEnd: Option[Long] = None): ApplicationListingResults = {
    +    if (!enabled) {
    +      // Timeline is disabled: return the empty listing
    +      return emptyListing
    +    }
    +    try {
    +      maybeCheckHealth()
    +      val client = getTimelineQueryClient()
    +      logInfo(s"getListing from: $client")
    +      // get the timestamp after any health check
    +      val timestamp = now()
    +      val timelineEntities =
    +        client.listEntities(YarnHistoryService.SPARK_EVENT_ENTITY_TYPE,
    +          windowStart = windowStart,
    +          windowEnd = windowEnd,
    +          limit = limit)
    +
    +      val listing = timelineEntities.flatMap { en =>
    +        try {
    +          val historyInfo = toApplicationHistoryInfo(en)
    +          
logDebug(s"${YarnTimelineUtils.describeApplicationHistoryInfo(historyInfo)}")
    +          Some(historyInfo)
    +        } catch {
    +          case e: Exception =>
    +            logWarning(s"Failed to parse entity. 
${YarnTimelineUtils.describeEntity(en) }", e)
    +            // skip this result
    +            None
    +        }
    +      }
    +      val incomplete = countIncompleteApplications(listing)
    +      logInfo(s"Found ${listing.size} applications: " +
    +          s"${listing.size-incomplete} complete and $incomplete 
incomplete")
    +      new ApplicationListingResults(timestamp, listing, None)
    +    } catch {
    +      case e: Exception =>
    +        logWarning(s"Failed to list entities from $timelineEndpoint", e)
    +        new ApplicationListingResults(now(), Nil, Some(e))
    +    }
    +  }
    +
    +  /**
    +   * List applications. 
    +   *
     +   * Also updates the cached values of the listing/last failure, depending
     +   * upon the outcome.
     +   * If the timeline is not enabled, returns an empty list.
    +   * @param startup a flag to indicate this is the startup retrieval with 
different window policy
    +   * @return List of all known applications.
    +   */
    +  def listAndCacheApplications(startup: Boolean): 
ApplicationListingResults = {
    +    refreshCount.incrementAndGet()
    +    val history = getApplications().applications
    +
    +    val current = now()
    +    // work out the (exclusive) start of the new window
    +    val nextWindowStart = findStartOfWindow(history) match {
    +        // no window.
    +      case None => None
    +
    +      case Some(h) =>
    +        // inclusive on the one retrieved last time.
    +        // Why? we need to include the oldest incomplete entry in our range
    +        val inclusiveWindow = startTime(h) - 1
    +        // sanity check on window size
    +        val earliestWindow = if (windowLimitMs > 0) current - 
windowLimitMs else 0
    +        Some(Math.max(earliestWindow, inclusiveWindow))
    +    }
    +
    +    val results = listApplications(windowStart = nextWindowStart)
    +    this.synchronized {
    +      if (results.succeeded) {
    +        // on a success, the existing application list is merged
    +        // creating a new aggregate application list
    +        logDebug(s"Listed application count: ${results.size}")
    +        val merged = combineResults(history, results.applications)
    +        logDebug(s"Existing count: ${history.size}; merged = 
${merged.size} ")
    +        val sorted = sortApplicationsByStartTime(merged)
    +        // and a final result
    +        setApplications(new ApplicationListingResults(
    +          results.timestamp,
    +          sorted,
    +          None))
    +        resetLastFailure()
    +      } else {
    +        // on a failure, the failure cause is updated
    +        setLastFailure(results.failureCause.get, results.timestamp)
    +        // and the failure counter
    +        refreshFailedCount.incrementAndGet()
    +      }
    +    }
    +    results
    +  }
    +
    +  /**
    +   * List applications. This currently finds completed applications only.
    +   * 
     +   * If the timeline is not enabled, returns an empty list
    +   * @return List of all known applications.
    +   */
    +  override def getListing(): Seq[ApplicationHistoryInfo] = {
    +    // get the current list
    +    val listing = getApplications().applications
    +    // and queue another refresh
    +    triggerRefresh()
    +    listing
    +  }
    +
    +  /**
    +   * Trigger a refresh
    +   */
    +  def triggerRefresh(): Unit = {
    +    refresher.refresh(now())
    +  }
    +
    +  /**
    +   * Return the current time
     +   * @return the current time in milliseconds
    +   */
    +  def now(): Long = {
    +    System.currentTimeMillis()
    +  }
    +
    +  /**
     +   * Get the last refresh attempt (which may or may not have been successful)
    +   * @return the last refresh time
    +   */
    +  def getLastRefreshAttemptTime(): Long = {
    +    refresher.lastRefreshAttemptTime
    +  }
    +  
    +  /**
    +   * Look up the timeline entity
    +   * @param appId application ID
    +   * @return the entity associated with the given application
    +   * @throws FileNotFoundException if no entry was found
    +   */
    +  def getTimelineEntity(appId: String): TimelineEntity = {
    +    logDebug(s"GetTimelineEntity $appId")
    +    maybeCheckHealth()
    +    
getTimelineQueryClient().getEntity(YarnHistoryService.SPARK_EVENT_ENTITY_TYPE, 
appId)
    +  }
    +
    +
    +  /**
    +   * Returns the Spark UI for a specific application.
    +   * <p>
     +   * If the timeline is not enabled, returns `None`
    +   * @param appId The application ID.
    +   * @param attemptId The application attempt ID (or `None` if there is no 
attempt ID).
    +   * @return The application's UI, or `None` if application is not found.
    +   */
    +  override def getAppUI(appId: String, attemptId: Option[String]): 
Option[SparkUI] = {
    +    getAppUI(appId)
    +  }
    +
    +  /**
    +   * Build the application UI for an application
    +   * <p>
     +   * If the timeline is not enabled, returns `None`
    +   * @param appId The application ID.
    +   * @return The application's UI, or `None` if application is not found.
    +   */
    +  def getAppUI(appId: String): Option[SparkUI] = {
    +    logDebug(s"Request UI with appId $appId")
    +    if (!enabled) {
    +      // Timeline is disabled: return nothing
    +      return None
    +    }
    +    maybeCheckHealth()
    +    try {
    +      val entity = getTimelineEntity(appId)
    +
    +      if (log.isDebugEnabled) {
    +        logDebug(describeEntity(entity))
    +      }
    +      val bus = new SparkListenerBus() {}
    +      val appListener = new ApplicationEventListener()
    +      bus.addListener(appListener)
    +
    +      val ui = {
    +        val conf = this.sparkConf.clone()
    +        val appSecManager = new SecurityManager(conf)
    +        SparkUI.createHistoryUI(conf, bus, appSecManager, appId,
    +                                 HistoryServer.UI_PATH_PREFIX + s"/${appId 
}", entity.getStartTime)
    +      }
    +      val events = entity.getEvents
    +      logInfo(s"App $appId history contains ${events.size()} events")
    +
    +      events.reverse.foreach { event =>
    +        val sparkEvent = toSparkEvent(event)
    +        logDebug(s" event ${sparkEvent.toString }")
    +        bus.postToAll(sparkEvent)
    +      }
    +      ui.setAppName(s"${appListener.appName.getOrElse(NOT_STARTED) } 
($appId)")
    +
    +      ui.getSecurityManager.setAcls(uiAclsEnabled)
    +      // make sure to set admin acls before view acls so they are properly 
picked up
    +      
ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
    +      
ui.getSecurityManager.setViewAcls(appListener.sparkUser.getOrElse(NOT_STARTED),
    +                                         
appListener.viewAcls.getOrElse(""))
    +      Some(ui)
    +    } catch {
    +      case e: FileNotFoundException =>
    +        logInfo(s"Unknown application $appId", e)
    +        setLastFailure(e)
    +        None
    +      case e: Exception =>
    +        logWarning(s"Failed to get attempt information for $appId", e)
    +        setLastFailure(e)
    +        None
    +    }
    +  }
    +
    +  /**
    +   * Get configuration information for the Web UI
    +   * @return A map with the configuration data. Data is shown in the order 
returned by the map.
    +   */
    +  override def getConfig(): Map[String, String] = {
    +    val timelineURI = getEndpointURI()
    +    logDebug(s"getConfig $timelineURI")
    +    this.synchronized {
    +      val applications = getApplications()
    +      val failure = getLastFailure()
    +      var state = Map(
    +        YarnHistoryProvider.KEY_PROVIDER_NAME -> "Apache Hadoop YARN 
Timeline Service",
    +        YarnHistoryProvider.KEY_START_TIME ->
    +            humanDateCurrentTZ(serviceStartTime, "(not started)"),
    +        YarnHistoryProvider.KEY_SERVICE_URL -> s"$timelineURI",
    +        YarnHistoryProvider.KEY_ENABLED ->
    +           (if (enabled) YarnHistoryProvider.TEXT_SERVICE_ENABLED
    +            else YarnHistoryProvider.TEXT_SERVICE_DISABLED),
    +        YarnHistoryProvider.KEY_LAST_UPDATED -> applications.updated,
    +        YarnHistoryProvider.KEY_CURRENT_TIME -> humanDateCurrentTZ(now(), 
"unknown")
    +      )
    +      // in a secure cluster, list the user name
    +      if (UserGroupInformation.isSecurityEnabled) {
    +        state = state +
    +            (YarnHistoryProvider.KEY_USERNAME -> 
UserGroupInformation.getCurrentUser.getUserName)
    +
    +      }
    +
    +      // on a failure, add failure specifics to the operations
    +      failure match {
    +        case Some((ex , date)) =>
    +          state = state ++
    +            Map(
    +              YarnHistoryProvider.KEY_LAST_FAILURE_TIME ->
    +                humanDateCurrentTZ(date.getTime, 
YarnHistoryProvider.TEXT_NEVER_UPDATED),
    +              YarnHistoryProvider.KEY_LAST_FAILURE -> ex.toString)
    +        case None =>
    +          // nothing
    +      }
    +      // add detailed information if enabled
    +      if (detailedInfo) {
    +        state = state ++ Map(
    +          YarnHistoryProvider.KEY_TOKEN_RENEWAL ->
    +            humanDateCurrentTZ(timelineQueryClient.lastTokenRenewal,
    +              YarnHistoryProvider.TEXT_NEVER_UPDATED),
    +          YarnHistoryProvider.KEY_TOKEN_RENEWAL_COUNT ->
    +            timelineQueryClient.tokenRenewalCount.toString,
    +          YarnHistoryProvider.KEY_TO_STRING -> s"$this",
    +          YarnHistoryProvider.KEY_MIN_REFRESH_INTERVAL -> 
refreshInterval.toString,
    +          YarnHistoryProvider.KEY_EVENT_FETCH_LIMIT -> 
eventFetchLimit.toString
    +        
    +        )
    +      }
    +      state
    +    }
    +
    +  }
    +
    +  def getEndpointURI(): URI = {
    +    timelineEndpoint.resolve("/")
    +  }
    +
    +  /**
    +   * Stub implementation of the "write event logs" operation, which isn't 
supported
    +   * by the timeline service
    +   * @throws SparkException always
    +   */
    +  override def writeEventLogs(appId: String, attemptId: Option[String],
    +      zipStream: ZipOutputStream): Unit = {
    +    throw new SparkException("Unsupported Feature")
    +  }
    +
    +  override def toString(): String = {
    +    s"YarnHistoryProvider bound to history server at $timelineEndpoint," +
    +    s" enabled = $enabled;" +
    +    s" refresh count = ${getRefreshCount()}; failed count = 
${getRefreshFailedCount()};" +
    +    s" last update ${applications.updated};" +
    +    s" history size ${applications.size};" +
    +    s" ${refresher}"
    +  }
    +
    +  /**
    +   * Comparison function that defines the sort order for the application 
listing.
    +   *
    +   * @return Whether `i1` should precede `i2`.
    +   */
    +  private def compareAppInfo(
    +      i1: ApplicationHistoryInfo,
    +      i2: ApplicationHistoryInfo): Boolean = {
    +    val a1 = i1.attempts.head
    +    val a2 = i2.attempts.head
    +    if (a1.endTime != a2.endTime) a1.endTime >= a2.endTime else 
a1.startTime >= a2.startTime
    +  }
    +
    +
    +  /**
    +   * This is the implementation of the triggered refresh logic.
     +   * It awaits refresh requests on a queue and processes them until stopped.
    +   */
    +
    +  private[spark] class Refresher extends Runnable {
    +
    +    sealed trait RefreshActions;
    +    /** start the refresh **/
    +    case class Start() extends RefreshActions;
    +    /** refresh requested at the given time */
    +    case class RefreshRequest(time: Long) extends RefreshActions;
    +    /** stop */
    +    case class StopExecution() extends RefreshActions;
    +
    +    private val queue = new LinkedBlockingQueue[RefreshActions]()
    +    private val running = new AtomicBoolean(false)
    +    private var self: Thread = _
    +    private val _lastRefreshAttemptTime = new AtomicLong(0)
    +    private val _messagesProcessed = new AtomicLong(0)
    +    private val _refreshesExecuted = new AtomicLong(0)
    +
    +    /**
     +     * Bind to the thread, then start it
    +     * @param t thread
    +     */
    +    def start(t: Thread) {
    +      this.synchronized {
    +        self = t;
    +        running.set(true)
    +        queue.add(Start())
    +        t.start()
    +      }
    +    }
    +
    +    /**
    +     * Request a refresh. If the request queue is empty, a refresh request
    +     * is queued.
    +     * @param time time request was made
    +     */
    +    def refresh(time: Long): Unit = {
    +      if (queue.isEmpty) {
    +        queue.add(RefreshRequest(time))
    +      }
    +    }
    +
    +    /**
    +     * Stop operation.
    +     * @return true if the stop was scheduled
    +     */
    +    def stopRefresher(): Boolean = {
    +      this.synchronized {
    +        if (isRunning()) {
    +          // yes, more than one stop may get issued. but it will
    +          // replace the previous one.
    +          queue.clear()
    +          queue.add(StopExecution())
    +          self.interrupt()
    +          true
    +        } else {
    +          false
    +        }
    +      }
    +    }
    +
    +    /**
    +     * Thread routine
    +     */
    +    override def run(): Unit = {
    +      try {
    +        var stopped = false;
    +        while (!stopped) {
    +          take match {
    --- End diff --
    
    I'd have agreed with you about the over-engineering until about 10 days
ago. Its need becomes apparent once you have a few hundred applications in the
history. While the windowed GET reduces the amount of data to be marshalled,
you are still making regular requests and asking for the JSON back. And, since
those updates need to include all incomplete applications (how else would you
know they've finished?), old-yet-incomplete applications can lead to large
windows. The implementation in this patch doesn't have any scheduled load at
all, except at startup.
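
    A rough sketch of that windowing arithmetic (hypothetical `AppSummary` type
    and helper, not the patch's actual classes): because completion can only be
    observed by querying back to the start of the oldest still-incomplete
    application, one long-running or abandoned app keeps every later listing
    window large.

    ```scala
    // Illustrative only: a simplified model of the "next window start" reasoning.
    // The patch's own findStartOfWindow/startTime helpers are assumed to do
    // something similar; this is not their actual implementation.
    case class AppSummary(id: String, startTime: Long, completed: Boolean)

    def nextWindowStart(
        history: Seq[AppSummary],
        nowMs: Long,
        windowLimitMs: Long): Option[Long] = {
      // the oldest application that is still incomplete; its completion can only
      // be observed by querying at least as far back as its start time
      val oldestIncomplete = history.filterNot(_.completed).sortBy(_.startTime).headOption
      oldestIncomplete.map { app =>
        val inclusive = app.startTime - 1   // inclusive of that application itself
        val earliest = if (windowLimitMs > 0) nowMs - windowLimitMs else 0L
        math.max(earliest, inclusive)       // clamp to the configured window limit
      }
    }
    ```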
    
    I'm not a great fan of the UX (the refresh is triggered by the web UI but
isn't immediate), but I don't have a good solution that scales on large YARN
clusters without going near the web UI and, again, adding a refresh meta-header
if hinted by the history provider.
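
    As a minimal sketch of that meta-header idea (illustrative names only, not an
    existing Spark or provider API): a provider-supplied refresh hint, in seconds,
    could be turned into an HTML auto-refresh tag on the history page, so the
    browser re-polls instead of the user having to reload.

    ```scala
    // Hypothetical helper: emit a <meta http-equiv="refresh"> tag when the
    // history provider hints at an expected refresh interval.
    def metaRefreshTag(refreshHintSeconds: Option[Int]): String =
      refreshHintSeconds
        .map(secs => s"""<meta http-equiv="refresh" content="$secs"/>""")
        .getOrElse("")
    ```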

