Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5423#discussion_r33466020
  
    --- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryProvider.scala
 ---
    @@ -0,0 +1,1015 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.history.yarn
    +
    +import java.io.FileNotFoundException
    +import java.net.URI
    +import java.util.Date
    +import java.util.concurrent.LinkedBlockingQueue
    +import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean}
    +import java.util.zip.ZipOutputStream
    +
    +import scala.collection.JavaConversions._
    +
    +import org.apache.hadoop.security.UserGroupInformation
    +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
    +import org.apache.spark.deploy.history.yarn.rest.{JerseyBinding, 
TimelineQueryClient}
    +import org.apache.spark.deploy.history.{ApplicationHistoryInfo, 
ApplicationHistoryProvider, HistoryServer}
    +import org.apache.spark.scheduler.{ApplicationEventListener, 
SparkListenerBus}
    +import org.apache.spark.ui.SparkUI
    +import org.apache.spark.{SparkException, Logging, SecurityManager, 
SparkConf}
    +
    +/**
    + * A history provider which reads in the history from
    + * the YARN Timeline Service.
    + *
    + * The service is a remote HTTP service, so failure modes are
    + * different from simple file IO.
    + *
    + * 1. Application listings are asynchronous, and made on a schedule, though
    + * they can be forced (and the schedule disabled).
    + * 2. The results are cached and can be retrieved with 
[[getApplications()]].
    + * 3. The most recent failure of any operation is stored.
    + * The [[getLastFailure()]] call will return the last exception
    + * or `None`. It is shared across threads so is primarily there for
    + * tests and basic diagnostics.
    + * 4. Listing the details of a single application in [[getAppUI()]]
    + * is synchronous and *not* cached.
    + * 5. The [[maybeCheckHealth()]] call performs a health check as the 
initial
    + * binding operation of this instance. This call invokes 
[[TimelineQueryClient.healthCheck()]]
    + * for better diagnostics on binding failures, particularly configuration 
problems.
    + * 6. Every REST call, synchronous or asynchronous, will invoke 
[[maybeCheckHealth()]] until
    + * the health check eventually succeeds.
    + * <p>
    + * If the timeline is not enabled, the API calls used by the web UI
    + * downgrade gracefully (returning empty entries), rather than fail.
    + * 
    + *
    + * @param sparkConf configuration of the provider
    + */
    +private[spark] class YarnHistoryProvider(sparkConf: SparkConf)
    +  extends ApplicationHistoryProvider with Logging {
    +
    +  /**
    +   * The configuration here is a YarnConfiguration built off the spark 
configuration
    +   * supplied in the constructor; this operation ensures that 
`yarn-default.xml`
    +   * and `yarn-site.xml` are pulled in. Options in the spark conf will 
override
    +   * those in the -default and -site XML resources which are not marked as 
final.
    +   */
    +  private val yarnConf = {
    +    new YarnConfiguration(SparkHadoopUtil.get.newConfiguration(sparkConf))
    +  }
    +
    +  /**
    +   * UI ACL option
    +   */
    +  private val uiAclsEnabled = 
sparkConf.getBoolean("spark.history.ui.acls.enable", false)
    +
    +  private val detailedInfo = 
sparkConf.getBoolean(YarnHistoryProvider.OPTION_DETAILED_INFO, false)
    +  private val NOT_STARTED = "<Not Started>"
    +
    +  /* minimum interval between each check for event log updates */
    +  private val refreshInterval = 
sparkConf.getLong(YarnHistoryProvider.OPTION_MIN_REFRESH_INTERVAL,
    +    YarnHistoryProvider.DEFAULT_MIN_REFRESH_INTERVAL_SECONDS) * 1000
    +
    +  /**
    +   * Window limit in milliseconds
    +   */
    +  private val windowLimitMs = 
sparkConf.getLong(YarnHistoryProvider.OPTION_WINDOW_LIMIT,
    +    YarnHistoryProvider.DEFAULT_WINDOW_LIMIT) * 1000
    +
    +  /**
    +   * Number of events to get
    +   */
    +  private val eventFetchLimit = 
sparkConf.getLong(YarnHistoryProvider.OPTION_EVENT_FETCH_LIMIT,
    +    YarnHistoryProvider.DEFAULT_EVENT_FETCH_LIMIT)
    +
    +  private val eventFetchOption: Option[Long] = if (eventFetchLimit > 0) 
Some(eventFetchLimit) else None
    +
    +  /**
    +   * Start time. Doesn't use the `now` call as tests can subclass that and
    +   * it won't be valid until after the subclass has been constructed
    +   */
    +  val serviceStartTime = System.currentTimeMillis()
    +
    +  /**
    +   * Timeline endpoint URI
    +   */
    +  protected val timelineEndpoint = createTimelineEndpoint()
    +
    +  /**
    +   * The timeline query client which uses the `jersey`
    +   * Jersey instance to talk to a timeline service running
    +   * at [[timelineEndpoint]], and creates a timeline (write) client 
instance
    +   * to handle token renewal
    +   *
    +   */
    +  protected val timelineQueryClient = {
    +    createTimelineQueryClient()
    +  }
    +
    +
    +  /**
    +   * Override point: create the timeline endpoint
    +   * @return a URI to the timeline web service
    +   */
    +  protected def createTimelineEndpoint(): URI = {
    +    getTimelineEndpoint(yarnConf)
    +  }
    +
    +  /**
    +   * Override point: create the timeline query client.
    +   * This is called during instance creation.
    +   * @return a timeline query client to use for the duration
    +   *         of this instance
    +   */
    +  protected def createTimelineQueryClient(): TimelineQueryClient = {
    +    new TimelineQueryClient(timelineEndpoint, yarnConf, 
JerseyBinding.createClientConfig())
    +  }
    +
    +  /**
    +   * The empty listing, with a timestamp to indicate that the listing
    +   * has never taken place.
    +   */
    +  private val emptyListing = new ApplicationListingResults(0, Nil, None)
    +
    +  /**
    +   * List of applications. Initial result is empty
    +   */
    +  private var applications: ApplicationListingResults = emptyListing
    +  
    +  /**
    +   * Last exception seen and when
    +   */
    +  protected var lastFailureCause: Option[(Throwable, Date)] = None
    +
    +  private val refreshCount = new AtomicLong(0)
    +  private val refreshFailedCount = new AtomicLong(0)
    +
    +  /**
    +   * Health marker
    +   */
    +  private val healthy = new AtomicBoolean(false)
    +
    +  /**
    +   * Enabled flag
    +   */
    +  private val _enabled = timelineServiceEnabled(yarnConf)
    +
    +  /**
    +   * Atomic boolean used to signal to the refresh thread that it
    +   * must exit its loop.
    +   */
    +  private val stopRefresh = new AtomicBoolean(false)
    +
    +  /**
    +   * refresher
    +   */
    +  val refresher = new Refresher()
    +
    +  /**
    +   * Initialize the provider
    +   */
    +  init()
    +
    +  /**
    +   * Check the configuration and log whether or not it is enabled;
    +   * if it is enabled then the URL is logged too.
    +   */
    +  private def init(): Unit = {
    +    if (!enabled) {
    +      logError(YarnHistoryProvider.TEXT_SERVICE_DISABLED)
    --- End diff --
    
    I always use constants for strings that tests refer back to; it stops the 
tests from being brittle to change. This text is looked for in 
`DisabledProviderDiagnosticsSuite`, which verifies that a GET / returns 
an error message, rather than failing outright, when ATS is disabled.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to