Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5423#discussion_r33467225
  
    --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryProvider.scala ---
    @@ -0,0 +1,1015 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.history.yarn
    +
    +import java.io.FileNotFoundException
    +import java.net.URI
    +import java.util.Date
    +import java.util.concurrent.LinkedBlockingQueue
    +import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean}
    +import java.util.zip.ZipOutputStream
    +
    +import scala.collection.JavaConversions._
    +
    +import org.apache.hadoop.security.UserGroupInformation
    +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
     +import org.apache.spark.deploy.history.yarn.rest.{JerseyBinding, TimelineQueryClient}
     +import org.apache.spark.deploy.history.{ApplicationHistoryInfo, ApplicationHistoryProvider, HistoryServer}
     +import org.apache.spark.scheduler.{ApplicationEventListener, SparkListenerBus}
    +import org.apache.spark.ui.SparkUI
     +import org.apache.spark.{SparkException, Logging, SecurityManager, SparkConf}
    +
    +/**
     + * A history provider which reads in the history from
    + * the YARN Timeline Service.
    + *
    + * The service is a remote HTTP service, so failure modes are
    + * different from simple file IO.
    + *
    + * 1. Application listings are asynchronous, and made on a schedule, though
    + * they can be forced (and the schedule disabled).
     + * 2. The results are cached and can be retrieved with [[getApplications()]].
     + * 3. The most recent failure of any operation is stored;
     + * the [[getLastFailure()]] call will return the last exception
     + * or `None`. It is shared across threads, so it is primarily there for
     + * tests and basic diagnostics.
    + * 4. Listing the details of a single application in [[getAppUI()]]
    + * is synchronous and *not* cached.
     + * 5. The [[maybeCheckHealth()]] call performs a health check as the initial
     + * binding operation of this instance. This call invokes [[TimelineQueryClient.healthCheck()]]
     + * for better diagnostics on binding failures, particularly configuration problems.
     + * 6. Every REST call, synchronous or asynchronous, will invoke [[maybeCheckHealth()]] until
    + * the health check eventually succeeds.
    + * <p>
     + * If the timeline is not enabled, the API calls used by the web UI
     + * downgrade gracefully (returning empty entries) rather than failing.
    + * 
    + *
    + * @param sparkConf configuration of the provider
    + */
    +private[spark] class YarnHistoryProvider(sparkConf: SparkConf)
    +  extends ApplicationHistoryProvider with Logging {
    +
    +  /**
     +   * The configuration here is a YarnConfiguration built off the Spark configuration
     +   * supplied in the constructor; this operation ensures that `yarn-default.xml`
     +   * and `yarn-site.xml` are pulled in. Options in the Spark conf will override
     +   * those in the -default and -site XML resources which are not marked as final.
    +   */
    +  private val yarnConf = {
    +    new YarnConfiguration(SparkHadoopUtil.get.newConfiguration(sparkConf))
    +  }
    +
    +  /**
    +   * UI ACL option
    +   */
     +  private val uiAclsEnabled = sparkConf.getBoolean("spark.history.ui.acls.enable", false)
    +
     +  private val detailedInfo = sparkConf.getBoolean(YarnHistoryProvider.OPTION_DETAILED_INFO, false)
    +  private val NOT_STARTED = "<Not Started>"
    +
    +  /* minimum interval between each check for event log updates */
     +  private val refreshInterval = sparkConf.getLong(YarnHistoryProvider.OPTION_MIN_REFRESH_INTERVAL,
    +    YarnHistoryProvider.DEFAULT_MIN_REFRESH_INTERVAL_SECONDS) * 1000
    +
    +  /**
    +   * Window limit in milliseconds
    +   */
     +  private val windowLimitMs = sparkConf.getLong(YarnHistoryProvider.OPTION_WINDOW_LIMIT,
    +    YarnHistoryProvider.DEFAULT_WINDOW_LIMIT) * 1000
    +
    +  /**
    +   * Number of events to get
    +   */
     +  private val eventFetchLimit = sparkConf.getLong(YarnHistoryProvider.OPTION_EVENT_FETCH_LIMIT,
    +    YarnHistoryProvider.DEFAULT_EVENT_FETCH_LIMIT)
    +
     +  private val eventFetchOption: Option[Long] =
     +    if (eventFetchLimit > 0) Some(eventFetchLimit) else None
    +
    +  /**
    +   * Start time. Doesn't use the `now` call as tests can subclass that and
    +   * it won't be valid until after the subclass has been constructed
    +   */
    +  val serviceStartTime = System.currentTimeMillis()
    +
    +  /**
    +   * Timeline endpoint URI
    +   */
    +  protected val timelineEndpoint = createTimelineEndpoint()
    +
    +  /**
     +   * The timeline query client, which uses a Jersey client
     +   * instance to talk to the timeline service running
     +   * at [[timelineEndpoint]], and creates a timeline (write) client instance
     +   * to handle token renewal.
    +   *
    +   */
    +  protected val timelineQueryClient = {
    +    createTimelineQueryClient()
    +  }
    +
    +
    +  /**
    +   * Override point: create the timeline endpoint
    +   * @return a URI to the timeline web service
    +   */
    +  protected def createTimelineEndpoint(): URI = {
    +    getTimelineEndpoint(yarnConf)
    +  }
    +
    +  /**
    +   * Override point: create the timeline query client.
    +   * This is called during instance creation.
     +   * @return a timeline query client to use for the duration
    +   *         of this instance
    +   */
    +  protected def createTimelineQueryClient(): TimelineQueryClient = {
     +    new TimelineQueryClient(timelineEndpoint, yarnConf, JerseyBinding.createClientConfig())
    +  }
    +
    +  /**
    +   * The empty listing, with a timestamp to indicate that the listing
    +   * has never taken place.
    +   */
    +  private val emptyListing = new ApplicationListingResults(0, Nil, None)
    +
    +  /**
    +   * List of applications. Initial result is empty
    +   */
    +  private var applications: ApplicationListingResults = emptyListing
    +  
    +  /**
    +   * Last exception seen and when
    +   */
    +  protected var lastFailureCause: Option[(Throwable, Date)] = None
    +
     +  /** Counter of listing refresh attempts. */
     +  private val refreshCount = new AtomicLong(0)
     +
     +  /** Counter of refresh attempts that failed. */
     +  private val refreshFailedCount = new AtomicLong(0)
    +
    +  /**
    +   * Health marker
    +   */
    +  private val healthy = new AtomicBoolean(false)
    +
    +  /**
    +   * Enabled flag
    +   */
    +  private val _enabled = timelineServiceEnabled(yarnConf)
    +
    +  /**
    +   * Atomic boolean used to signal to the refresh thread that it
    +   * must exit its loop.
    +   */
    +  private val stopRefresh = new AtomicBoolean(false)
    +
    +  /**
     +   * The background refresher, run in the thread started by [[startRefreshThread()]]
    +   */
    +  val refresher = new Refresher()
    +
    +  /**
    +   * Initialize the provider
    +   */
    +  init()
    +
    +  /**
    +   * Check the configuration and log whether or not it is enabled;
    +   * if it is enabled then the URL is logged too.
    +   */
    +  private def init(): Unit = {
    +    if (!enabled) {
    +      logError(YarnHistoryProvider.TEXT_SERVICE_DISABLED)
    +    } else {
    +      logInfo(YarnHistoryProvider.TEXT_SERVICE_ENABLED)
     +      logInfo(YarnHistoryProvider.KEY_SERVICE_URL + ": " + timelineEndpoint)
    +      logDebug(sparkConf.toDebugString)
     +      // log and validate the refresh interval
    +      logInfo(s"refresh interval $refreshInterval milliseconds")
    +      if (refreshInterval < 0) {
     +        throw new Exception(YarnHistoryProvider.TEXT_INVALID_UPDATE_INTERVAL +
    +            s": ${refreshInterval/1000}")
    +      }
    +      startRefreshThread()
    +    }
    +  }
    +
    +
    +  /**
    +   * Stop the service. After this point operations will fail.
    +   */
    +  override def stop(): Unit = {
    +    logDebug(s"Stopping $this")
    +    // attempt to stop the refresh thread
    +    if (!stopRefreshThread()) {
    +      closeQueryClient()
    +    }
    +
    +  }
    +
    +  /**
    +   * Close the query client
    +   */
    +  def closeQueryClient(): Unit = {
     +    // stop the query client directly (no refresh thread is running to do it)
    +    logDebug("Stopping Timeline client")
    +    timelineQueryClient.close()
    +  }
    +
    +  /**
    +   * Is the timeline service (and therefore this provider) enabled.
    +   * (override point for tests).
    +   *
    +   * Important: this is called during construction, so test-time subclasses
    +   * will be invoked before their own construction has taken place.
    +   * Code appropriately.
    +   * @return true if the provider/YARN configuration enables the timeline
    +   *         service.
    +   */
    +  def enabled: Boolean = {
    +    _enabled
    +  }
    +  
    +  /**
    +   * Get the timeline query client. Used internally to ease testing
    +   * @return the client.
    +   */
    +  def getTimelineQueryClient(): TimelineQueryClient = {
    +    timelineQueryClient
    +  }
    +
    +  /**
    +   * Set the last exception
    +   * @param ex exception seen
    +   */
    +  private def setLastFailure(ex: Throwable): Unit = {
    +    setLastFailure(ex, now())
    +  }
    +
    +  /**
    +   * Set the last exception
    +   * @param ex exception seen
    +   * @param timestamp the timestamp of the failure
    +   */
    +  private def setLastFailure(ex: Throwable, timestamp: Long): Unit = {
    +    this.synchronized {
     +      lastFailureCause = Some((ex, new Date(timestamp)))
    +    }
    +  }
    +
    +  /**
    +   * Reset the failure info
    +   */
    +  private def resetLastFailure(): Unit = {
    +    this.synchronized {
    +      lastFailureCause = None
    +    }
    +  }
    +
    +  /**
    +   * Get the last exception
     +   * @return the last exception and its timestamp, or `None` if there has been no failure
    +   */
    +  def getLastFailure(): Option[(Throwable, Date)] = {
    +    this.synchronized {
    +      lastFailureCause
    +    }
    +  }
    +
    +  /**
     +   * Query for the connection being healthy.
     +   * @return true if the health check has succeeded
    +   */
    +  def isHealthy(): Boolean = {
    +    healthy.get()
    +  }
    +
    +  /**
     +   * Get the health flag itself. This allows test code to initialize it properly.
     +   * Also: if accessed and set to false, it will trigger another health check.
     +   * @return the atomic boolean recording the health state
    +   */
    +  protected def getHealthFlag(): AtomicBoolean = {
     +    healthy
    +  }
    +
    +  /**
     +   * Thread-safe accessor to the application list
     +   * @return the current application listing results
    +   */
    +  def getApplications(): ApplicationListingResults = {
    +    this.synchronized(applications)
    +  }
    +
    +  /**
     +   * Thread-safe call to update the application results
    +   * @param newVal new value
    +   */
     +  protected def setApplications(newVal: ApplicationListingResults): Unit = {
    +    this.synchronized {
    +      applications = newVal
    +    }
    +  }
    +
    +  /**
    +   * Health check to call before any other operation is attempted.
    +   * This is atomic, using the `healthy` flag to check.
    +   * If the endpoint is considered unhealthy then the healthy flag
    +   * is reset to false and an exception thrown.
    +   * @return true if the health check took place
    +   */
    +  protected def maybeCheckHealth(): Boolean = {
     +    val h = getHealthFlag()
    +    if (!h.getAndSet(true)) {
    +      val client = getTimelineQueryClient()
    +      try {
    +        client.healthCheck()
    +        true
    +      } catch {
    +        case e: Exception =>
    +          // failure
    +          logWarning(s"Health check of $client failed", e)
    +          setLastFailure(e)
    +          // reset health so another caller may attempt it.
    +          h.set(false)
    +          // propagate the failure
     +          throw e
    +      }
    +    } else {
    +      false
    +    }
    +  }
    +
    +  /**
     +   * Start the refresh thread.
    +   *
    +   * When this thread exits, it will close the `timelineQueryClient`
    +   * instance
    +   */
    +  def startRefreshThread(): Unit = {
    +    logInfo(s"Starting timeline refresh thread")
    +    val thread = new Thread(refresher, s"YarnHistoryProvider Refresher")
    +    thread.setDaemon(true)
    +    refresher.start(thread)
    +  }
    +
    +  /**
    +   * Stop the refresh thread if there is one.
    +   *
    +   * This does not guarantee an immediate halt to the thread.
    +   * @return true if there was a refresh thread to stop
    +   */
    +  def stopRefreshThread(): Boolean = {
    +    refresher.stopRefresher()
    +  }
    +
    +  /**
    +   * Probe for the refresh thread running
    +   * @return true if the refresh thread has been created and is still alive
    +   */
    +  def isRefreshThreadRunning(): Boolean = {
    +    refresher.isRunning()
    +  }
    +
    +  def getRefreshCount(): Long = { refreshCount.get() }
    +  def getRefreshFailedCount(): Long = { refreshFailedCount.get() }
    +
    +  /**
    +   * List applications.
    +   * <p>
    +   * If the timeline is not enabled, returns an empty list
     +   * @return the result of the last successful listing operation,
     +   *         or the `emptyListing` result if no listing has yet been successful
    +   */
     +  def listApplications(limit: Option[Long] = None,
    +      windowStart: Option[Long] = None,
    +      windowEnd: Option[Long] = None): ApplicationListingResults = {
    +    if (!enabled) {
    +      // Timeline is disabled: return the empty listing
    +      return emptyListing
    +    }
    +    try {
    +      maybeCheckHealth()
    +      val client = getTimelineQueryClient()
    +      logInfo(s"getListing from: $client")
    +      // get the timestamp after any health check
    +      val timestamp = now()
    +      val timelineEntities =
    +        client.listEntities(YarnHistoryService.SPARK_EVENT_ENTITY_TYPE,
    +          windowStart = windowStart,
    +          windowEnd = windowEnd,
    +          limit = limit)
    +
    +      val listing = timelineEntities.flatMap { en =>
    +        try {
    +          val historyInfo = toApplicationHistoryInfo(en)
     +          logDebug(s"${YarnTimelineUtils.describeApplicationHistoryInfo(historyInfo)}")
    +          Some(historyInfo)
    +        } catch {
    +          case e: Exception =>
     +            logWarning(s"Failed to parse entity. ${YarnTimelineUtils.describeEntity(en)}", e)
    +            // skip this result
    +            None
    +        }
    +      }
    +      val incomplete = countIncompleteApplications(listing)
    +      logInfo(s"Found ${listing.size} applications: " +
    +          s"${listing.size-incomplete} complete and $incomplete 
incomplete")
    +      new ApplicationListingResults(timestamp, listing, None)
    +    } catch {
    +      case e: Exception =>
    +        logWarning(s"Failed to list entities from $timelineEndpoint", e)
    +        new ApplicationListingResults(now(), Nil, Some(e))
    +    }
    +  }
    +
    +  /**
     +   * List applications.
     +   *
     +   * Also updates the cached values of the listing/last failure, depending
     +   * upon the outcome.
     +   * If the timeline is not enabled, returns an empty list.
     +   * @param startup a flag to indicate this is the startup retrieval, which uses a different window policy
    +   * @return List of all known applications.
    +   */
     +  def listAndCacheApplications(startup: Boolean): ApplicationListingResults = {
    +    refreshCount.incrementAndGet()
    +    val history = getApplications().applications
    +
    +    val current = now()
    +    // work out the (exclusive) start of the new window
    +    val nextWindowStart = findStartOfWindow(history) match {
     +      // no window.
    +      case None => None
    +
    +      case Some(h) =>
    +        // inclusive on the one retrieved last time.
    +        // Why? we need to include the oldest incomplete entry in our range
    +        val inclusiveWindow = startTime(h) - 1
    +        // sanity check on window size
     +        val earliestWindow = if (windowLimitMs > 0) current - windowLimitMs else 0
    +        Some(Math.max(earliestWindow, inclusiveWindow))
    +    }
    +
    +    val results = listApplications(windowStart = nextWindowStart)
    +    this.synchronized {
    +      if (results.succeeded) {
    +        // on a success, the existing application list is merged
    +        // creating a new aggregate application list
    +        logDebug(s"Listed application count: ${results.size}")
    +        val merged = combineResults(history, results.applications)
    +        logDebug(s"Existing count: ${history.size}; merged = 
${merged.size} ")
    +        val sorted = sortApplicationsByStartTime(merged)
    +        // and a final result
    +        setApplications(new ApplicationListingResults(
    +          results.timestamp,
    +          sorted,
    +          None))
    +        resetLastFailure()
    +      } else {
    +        // on a failure, the failure cause is updated
    +        setLastFailure(results.failureCause.get, results.timestamp)
    +        // and the failure counter
    +        refreshFailedCount.incrementAndGet()
    +      }
    +    }
    +    results
    +  }
    +
    +  /**
    +   * List applications. This currently finds completed applications only.
    +   * 
     +   * If the timeline is not enabled, returns an empty list.
    +   * @return List of all known applications.
    +   */
    +  override def getListing(): Seq[ApplicationHistoryInfo] = {
    +    // get the current list
    +    val listing = getApplications().applications
    +    // and queue another refresh
    +    triggerRefresh()
    +    listing
    +  }
    +
    +  /**
    +   * Trigger a refresh
    +   */
    +  def triggerRefresh(): Unit = {
    +    refresher.refresh(now())
    +  }
    +
    +  /**
     +   * Return the current time.
     +   * @return the current time in milliseconds
    +   */
    +  def now(): Long = {
    +    System.currentTimeMillis()
    +  }
    +
    +  /**
     +   * Get the last refresh attempt (which may or may not have been successful).
     +   * @return the time of the last refresh attempt
    +   */
    +  def getLastRefreshAttemptTime(): Long = {
    +    refresher.lastRefreshAttemptTime
    +  }
    +  
    +  /**
    +   * Look up the timeline entity
    +   * @param appId application ID
    +   * @return the entity associated with the given application
    +   * @throws FileNotFoundException if no entry was found
    +   */
    +  def getTimelineEntity(appId: String): TimelineEntity = {
    +    logDebug(s"GetTimelineEntity $appId")
    +    maybeCheckHealth()
     +    getTimelineQueryClient().getEntity(YarnHistoryService.SPARK_EVENT_ENTITY_TYPE, appId)
    +  }
    +
    +
    +  /**
    +   * Returns the Spark UI for a specific application.
    +   * <p>
     +   * If the timeline is not enabled, returns `None`.
    +   * @param appId The application ID.
     +   * @param attemptId The application attempt ID (or `None` if there is no attempt ID).
    +   * @return The application's UI, or `None` if application is not found.
    +   */
     +  override def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI] = {
    +    getAppUI(appId)
    +  }
    +
    +  /**
    +   * Build the application UI for an application
    +   * <p>
     +   * If the timeline is not enabled, returns `None`.
    +   * @param appId The application ID.
    +   * @return The application's UI, or `None` if application is not found.
    +   */
    +  def getAppUI(appId: String): Option[SparkUI] = {
    +    logDebug(s"Request UI with appId $appId")
    +    if (!enabled) {
    +      // Timeline is disabled: return nothing
    +      return None
    +    }
    +    maybeCheckHealth()
    +    try {
    +      val entity = getTimelineEntity(appId)
    +
    +      if (log.isDebugEnabled) {
    +        logDebug(describeEntity(entity))
    +      }
    +      val bus = new SparkListenerBus() {}
    +      val appListener = new ApplicationEventListener()
    +      bus.addListener(appListener)
    +
    +      val ui = {
    +        val conf = this.sparkConf.clone()
    +        val appSecManager = new SecurityManager(conf)
    +        SparkUI.createHistoryUI(conf, bus, appSecManager, appId,
     +          HistoryServer.UI_PATH_PREFIX + s"/${appId}", entity.getStartTime)
    +      }
    +      val events = entity.getEvents
    --- End diff --
    
    1. There's no partial retrieval in the ATS APIs; the assumption in the timeline server is that you don't call this often, so there's less cost on the server. The only levers for bounding a listing here are the time window and the entity limit (a rough sketch follows this list).
    2. If you ever look at where CPU load goes on the server when making these (or any of the timeline server calls), marshalling is a major expense. Maybe we should push for a Thrift or protobuf format option in the future.
    3. There's an implicit assumption in the history server that the cache fetch lookup is fast and doesn't fail; something asynchronous in the UI with a meta-refresh could address that in the HTML view, but not in JSON.
    4. Note that [SPARK-7889] unintentionally reduces the load placed on the timeline server by in-progress applications.
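    
    To make point 1 concrete, here is a minimal sketch (not part of this patch) of the only two levers a caller has for bounding what the server marshals for a listing: the time window and the entity limit of `listApplications()`. The object name, the six-hour window and the 100-entity cap are made up for the example, and it sits in the provider's package only because the class is `private[spark]` and assumes `ApplicationListingResults` and its fields are reachable from there.
    
    ```scala
    package org.apache.spark.deploy.history.yarn
    
    import org.apache.spark.SparkConf
    
    // Illustrative only: bound a listing against the ATS, since the REST API
    // offers no partial retrieval. Window and limit values are assumptions.
    object ListingCostSketch {
      def main(args: Array[String]): Unit = {
        val provider = new YarnHistoryProvider(new SparkConf())
        try {
          val nowMs = System.currentTimeMillis()
          val windowMs = 6 * 60 * 60 * 1000L   // only look back six hours (assumed)
          val maxEntities = 100L               // cap on entities returned (assumed)
    
          // failures are recorded in the result rather than thrown
          val recent = provider.listApplications(
            limit = Some(maxEntities),
            windowStart = Some(nowMs - windowMs),
            windowEnd = Some(nowMs))
    
          if (recent.succeeded) {
            println(s"fetched ${recent.applications.size} applications")
          } else {
            recent.failureCause.foreach(e => println(s"listing failed: $e"))
          }
        } finally {
          provider.stop()
        }
      }
    }
    ```
    
    Even with both bounds applied, every matching entity still comes back in full, which is why the marshalling cost in point 2 matters.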

