Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5423#discussion_r33466420
  
    --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryProvider.scala ---
    @@ -0,0 +1,1015 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.history.yarn
    +
    +import java.io.FileNotFoundException
    +import java.net.URI
    +import java.util.Date
    +import java.util.concurrent.LinkedBlockingQueue
    +import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean}
    +import java.util.zip.ZipOutputStream
    +
    +import scala.collection.JavaConversions._
    +
    +import org.apache.hadoop.security.UserGroupInformation
    +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
    +import org.apache.spark.deploy.history.yarn.rest.{JerseyBinding, TimelineQueryClient}
    +import org.apache.spark.deploy.history.{ApplicationHistoryInfo, ApplicationHistoryProvider, HistoryServer}
    +import org.apache.spark.scheduler.{ApplicationEventListener, SparkListenerBus}
    +import org.apache.spark.ui.SparkUI
    +import org.apache.spark.{SparkException, Logging, SecurityManager, SparkConf}
    +
    +/**
    + * A history provider which reads in the history from
    + * the YARN Timeline Service.
    + *
    + * The service is a remote HTTP service, so failure modes are
    + * different from simple file IO.
    + *
    + * 1. Application listings are asynchronous, and made on a schedule, though
    + * they can be forced (and the schedule disabled).
    + * 2. The results are cached and can be retrieved with [[getApplications()]].
    + * 3. The most recent failure of any operation is stored.
    + * The [[getLastFailure()]] call will return the last exception
    + * or `None`. It is shared across threads so is primarily there for
    + * tests and basic diagnostics.
    + * 4. Listing the details of a single application in [[getAppUI()]]
    + * is synchronous and *not* cached.
    + * 5. The [[maybeCheckHealth()]] call performs a health check as the initial
    + * binding operation of this instance. This call invokes [[TimelineQueryClient.healthCheck()]]
    + * for better diagnostics on binding failures, particularly configuration problems.
    + * 6. Every REST call, synchronous or asynchronous, will invoke [[maybeCheckHealth()]] until
    + * the health check eventually succeeds.
    + * <p>
    + * If the timeline is not enabled, the API calls used by the web UI
    + * downgrade gracefully (returning empty entries), rather than fail.
    + * 
    + *
    + * @param sparkConf configuration of the provider
    + */
    +private[spark] class YarnHistoryProvider(sparkConf: SparkConf)
    +  extends ApplicationHistoryProvider with Logging {
    +
    +  /**
    +   * The configuration here is a YarnConfiguration built off the spark configuration
    +   * supplied in the constructor; this operation ensures that `yarn-default.xml`
    +   * and `yarn-site.xml` are pulled in. Options in the spark conf will override
    +   * those in the -default and -site XML resources which are not marked as final.
    +   */
    +  private val yarnConf = {
    +    new YarnConfiguration(SparkHadoopUtil.get.newConfiguration(sparkConf))
    +  }
    +
    +  /**
    +   * UI ACL option
    +   */
    +  private val uiAclsEnabled = sparkConf.getBoolean("spark.history.ui.acls.enable", false)
    +
    +  private val detailedInfo = sparkConf.getBoolean(YarnHistoryProvider.OPTION_DETAILED_INFO, false)
    +  private val NOT_STARTED = "<Not Started>"
    +
    +  /* minimum interval between each check for event log updates */
    +  private val refreshInterval = sparkConf.getLong(YarnHistoryProvider.OPTION_MIN_REFRESH_INTERVAL,
    +    YarnHistoryProvider.DEFAULT_MIN_REFRESH_INTERVAL_SECONDS) * 1000
    +
    +  /**
    +   * Window limit in milliseconds
    +   */
    +  private val windowLimitMs = sparkConf.getLong(YarnHistoryProvider.OPTION_WINDOW_LIMIT,
    +    YarnHistoryProvider.DEFAULT_WINDOW_LIMIT) * 1000
    +
    +  /**
    +   * Number of events to get
    +   */
    +  private val eventFetchLimit = sparkConf.getLong(YarnHistoryProvider.OPTION_EVENT_FETCH_LIMIT,
    +    YarnHistoryProvider.DEFAULT_EVENT_FETCH_LIMIT)
    +
    +  private val eventFetchOption: Option[Long] = if (eventFetchLimit > 0) Some(eventFetchLimit) else None
    +
    +  /**
    +   * Start time. Doesn't use the `now` call as tests can subclass that and
    +   * it won't be valid until after the subclass has been constructed
    +   */
    +  val serviceStartTime = System.currentTimeMillis()
    +
    +  /**
    +   * Timeline endpoint URI
    +   */
    +  protected val timelineEndpoint = createTimelineEndpoint()
    +
    +  /**
    +   * The timeline query client which uses the `jersey`
    +   * Jersey instance to talk to a timeline service running
    +   * at [[timelineEndpoint]], and creates a timeline (write) client instance
    +   * to handle token renewal
    +   *
    +   */
    +  protected val timelineQueryClient = {
    +    createTimelineQueryClient()
    +  }
    +
    +
    +  /**
    +   * Override point: create the timeline endpoint
    +   * @return a URI to the timeline web service
    +   */
    +  protected def createTimelineEndpoint(): URI = {
    +    getTimelineEndpoint(yarnConf)
    +  }
    +
    +  /**
    +   * Override point: create the timeline query client.
    +   * This is called during instance creation.
    +   * @return a timeline query client to use for the duration
    +   *         of this instance
    +   */
    +  protected def createTimelineQueryClient(): TimelineQueryClient = {
    +    new TimelineQueryClient(timelineEndpoint, yarnConf, JerseyBinding.createClientConfig())
    +  }
    +
    +  /**
    +   * The empty listing, with a timestamp to indicate that the listing
    +   * has never taken place.
    +   */
    +  private val emptyListing = new ApplicationListingResults(0, Nil, None)
    +
    +  /**
    +   * List of applications. Initial result is empty
    +   */
    +  private var applications: ApplicationListingResults = emptyListing
    +  
    +  /**
    +   * Last exception seen and when
    +   */
    +  protected var lastFailureCause: Option[(Throwable, Date)] = None
    +
    +  private val refreshCount = new AtomicLong(0)
    +  private val refreshFailedCount = new AtomicLong(0)
    +
    +  /**
    +   * Health marker
    +   */
    +  private val healthy = new AtomicBoolean(false)
    +
    +  /**
    +   * Enabled flag
    +   */
    +  private val _enabled = timelineServiceEnabled(yarnConf)
    +
    +  /**
    +   * Atomic boolean used to signal to the refresh thread that it
    +   * must exit its loop.
    +   */
    +  private val stopRefresh = new AtomicBoolean(false)
    +
    +  /**
    +   * refresher
    +   */
    +  val refresher = new Refresher()
    +
    +  /**
    +   * Initialize the provider
    +   */
    +  init()
    +
    +  /**
    +   * Check the configuration and log whether or not it is enabled;
    +   * if it is enabled then the URL is logged too.
    +   */
    +  private def init(): Unit = {
    +    if (!enabled) {
    +      logError(YarnHistoryProvider.TEXT_SERVICE_DISABLED)
    +    } else {
    +      logInfo(YarnHistoryProvider.TEXT_SERVICE_ENABLED)
    +      logInfo(YarnHistoryProvider.KEY_SERVICE_URL + ": " + timelineEndpoint)
    +      logDebug(sparkConf.toDebugString)
    +      // get the thread time
    +      logInfo(s"refresh interval $refreshInterval milliseconds")
    +      if (refreshInterval < 0) {
    +        throw new Exception(YarnHistoryProvider.TEXT_INVALID_UPDATE_INTERVAL +
    +            s": ${refreshInterval/1000}")
    +      }
    +      startRefreshThread()
    +    }
    +  }
    +
    +
    +  /**
    +   * Stop the service. After this point operations will fail.
    +   */
    +  override def stop(): Unit = {
    +    logDebug(s"Stopping $this")
    +    // attempt to stop the refresh thread
    +    if (!stopRefreshThread()) {
    +      closeQueryClient()
    +    }
    +
    +  }
    +
    +  /**
    +   * Close the query client
    +   */
    +  def closeQueryClient(): Unit = {
    +    // and otherwise, stop the query client
    +    logDebug("Stopping Timeline client")
    +    timelineQueryClient.close()
    +  }
    +
    +  /**
    +   * Is the timeline service (and therefore this provider) enabled.
    +   * (override point for tests).
    +   *
    +   * Important: this is called during construction, so test-time subclasses
    +   * will be invoked before their own construction has taken place.
    +   * Code appropriately.
    +   * @return true if the provider/YARN configuration enables the timeline
    +   *         service.
    +   */
    +  def enabled: Boolean = {
    +    _enabled
    +  }
    +  
    +  /**
    +   * Get the timeline query client. Used internally to ease testing
    +   * @return the client.
    +   */
    +  def getTimelineQueryClient(): TimelineQueryClient = {
    +    timelineQueryClient
    +  }
    +
    +  /**
    +   * Set the last exception
    +   * @param ex exception seen
    +   */
    +  private def setLastFailure(ex: Throwable): Unit = {
    +    setLastFailure(ex, now())
    +  }
    +
    +  /**
    +   * Set the last exception
    +   * @param ex exception seen
    +   * @param timestamp the timestamp of the failure
    +   */
    +  private def setLastFailure(ex: Throwable, timestamp: Long): Unit = {
    +    this.synchronized {
    +      lastFailureCause = Some(ex, new Date(timestamp))
    +    }
    +  }
    +
    +  /**
    +   * Reset the failure info
    +   */
    +  private def resetLastFailure(): Unit = {
    +    this.synchronized {
    +      lastFailureCause = None
    +    }
    +  }
    +
    +  /**
    +   * Get the last exception
    +   * @return the last exception or `None`
    +   */
    +  def getLastFailure(): Option[(Throwable, Date)] = {
    +    this.synchronized {
    +      lastFailureCause
    +    }
    +  }
    +
    +  /**
    +   * Query for the connection being healthy
    +   * @return
    +   */
    +  def isHealthy(): Boolean = {
    +    healthy.get()
    +  }
    +
    +  /**
    +   * Get the health flag itself. This allows test code to initialize it properly.
    +   * Also: if accessed and set to false, it will trigger another health check.
    +   * @return
    +   */
    +  protected def getHealthFlag(): AtomicBoolean = {
    +    healthy;
    --- End diff --
    
    It tries to bootstrap the connection with some basic health checks, rather than relying on exceptions alone. The most common problem, in my own experience, is getting the URL to the timeline service wrong, so that HTML comes back instead of JSON. Jersey then just raises a JSON-unmarshalling error, which is meaningless except to people who recognise that stack trace as "something other than JSON came back".
    
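    As a rough illustration of the kind of check I mean (a sketch only, not the code in this patch): `ResponseCheck` and `requireJson` are hypothetical names, and the content type and body are assumed to have already been read from the HTTP response by whatever client is in use.

```scala
import java.io.IOException

// Hypothetical helper, not part of the patch: turn "HTML came back instead
// of JSON" into an error message that points at the likely cause.
object ResponseCheck {

  /**
   * Fail with a readable message when the reply is not JSON.
   * @param url the URL that was queried
   * @param contentType the Content-Type header of the reply (may be null)
   * @param body the reply body (may be null)
   */
  def requireJson(url: String, contentType: String, body: String): Unit = {
    val isJson = Option(contentType)
      .exists(_.trim.toLowerCase.startsWith("application/json"))
    if (!isJson) {
      // Include a short preview of the body to help diagnose a wrong URL.
      val preview = Option(body).getOrElse("").take(80)
      throw new IOException(
        s"Expected JSON from $url but got content type '$contentType'; " +
          s"check the timeline service URL. Body starts: $preview")
    }
  }
}
```
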
    The flag is flipped to true briefly to disable the health check and stop re-entrant calls on different threads; if the single executing thread fails, the flag is reverted to unhealthy. That is: the first check assumes it will work.
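
    The flip-then-revert pattern can be sketched roughly as follows. This is a minimal illustration rather than the code in the patch: `HealthGate` is a hypothetical class, and the use of `compareAndSet` to pick the single executing thread is an assumption.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the provider's health logic; names are illustrative.
class HealthGate(check: () => Unit) {

  private val healthy = new AtomicBoolean(false)

  def isHealthy: Boolean = healthy.get()

  /**
   * Run the health check unless the flag is already set.
   * @return true if this call executed the check and it succeeded,
   *         false if the check was skipped
   */
  def maybeCheckHealth(): Boolean = {
    // Assumption: only the thread that flips false -> true runs the check;
    // any other caller sees "healthy" and skips it.
    if (healthy.compareAndSet(false, true)) {
      try {
        check()
        true
      } catch {
        case e: Exception =>
          // Revert to unhealthy so that a later call retries the check.
          healthy.set(false)
          throw e
      }
    } else {
      false
    }
  }
}
```

    With this shape, a failing check leaves the flag false and is retried on the next call, while a successful check leaves it true so that later calls skip the check.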


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
