Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8744#discussion_r42539027
  
    --- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---
    @@ -0,0 +1,1048 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.history.yarn
    +
    +import java.net.{ConnectException, URI}
    +import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, 
AtomicInteger}
    +import java.util.concurrent.{TimeUnit, LinkedBlockingDeque}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +import org.apache.hadoop.security.UserGroupInformation
    +import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, 
TimelineEntity, TimelineEvent, TimelinePutResponse}
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, 
ApplicationId}
    +import org.apache.hadoop.yarn.client.api.TimelineClient
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +
    +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
    +import org.apache.spark.scheduler._
    +import org.apache.spark.scheduler.cluster.{YarnExtensionService, 
YarnExtensionServiceBinding}
    +import org.apache.spark.util.{SystemClock, Utils}
    +import org.apache.spark.{Logging, SparkContext}
    +
    +/**
    + * A Yarn Extension Service to post lifecycle events to a registered
    + * YARN Timeline Server.
    + */
    +private[spark] class YarnHistoryService extends YarnExtensionService with 
Logging {
    +
    +  import org.apache.spark.deploy.history.yarn.YarnHistoryService._
    +
    +  /** Simple state model implemented in an atomic integer */
    +  private val _serviceState = new AtomicInteger(CreatedState)
    +
    +  /**
    +   * Query the current service state.
    +   * @return the current value of the state field
    +   */
    +  def serviceState: Int = _serviceState.get()
    +  /**
    +   * Enter a new state, logging the transition at debug level.
    +   * @param state the state to enter
    +   * @return the state the service was in before the change
    +   */
    +  def enterState(state: Int): Int = {
    +    logDebug(s"Entering state $state from $serviceState")
    +    val previousState = _serviceState.getAndSet(state)
    +    previousState
    +  }
    +
    +  /**
    +   * Spark context; valid once started
    +   */
    +  private var sparkContext: SparkContext = _
    +
    +  /** YARN configuration from the spark context */
    +  private var config: YarnConfiguration = _
    +
    +  /** YARN application ID. */
    +  private var _applicationId: ApplicationId = _
    +
    +  /** YARN attempt ID; this will be `None` if the service is started in yarn-client mode */
    +  private var _attemptId: Option[ApplicationAttemptId] = None
    +
    +  /** YARN timeline client; `None` when no client is set */
    +  private var _timelineClient: Option[TimelineClient] = None
    +
    +  /** registered event listener */
    +  private var listener: Option[YarnEventListener] = None
    +
    +  /** Application name from the spark start event */
    +  private var applicationName: String = _
    +
    +  /** Spark application ID, if known */
    +  private var sparkApplicationId: Option[String] = None
    +
    +  /** Optional Attempt ID from the spark start event */
    +  private var sparkApplicationAttemptId: Option[String] = None
    +
    +  /** user name as derived from `SPARK_USER` env var or `UGI` */
    +  private var userName = Utils.getCurrentUserName()
    +
    +  /** Clock for recording time */
    +  private val clock = new SystemClock()
    +
    +  /**
    +   * Start time of the application, as received in the start event.
    +   */
    +  private var startTime: Long = _
    +
    +  /**
    +   * End time of the application, as received in the end event.
    +   */
    +  private var endTime: Long = _
    +
    +  /** number of events to batch up before posting */
    +  private var _batchSize = DEFAULT_BATCH_SIZE
    +
    +  /** queue of entities to asynchronously post, plus the number of events in each entry */
    +  private var _entityQueue = new LinkedBlockingDeque[TimelineEntity]()
    +
    +  /** limit on the total number of events permitted */
    +  private var _postQueueLimit = DEFAULT_POST_QUEUE_LIMIT
    +
    +  /**
    +   * List of events which will be pulled into a timeline
    +   * entity when created
    +   */
    +  private var pendingEvents = new mutable.LinkedList[TimelineEvent]()
    +
    +  private var applicationStartEvent: Option[SparkListenerApplicationStart] 
= None
    +  private var applicationEndEvent: Option[SparkListenerApplicationEnd] = 
None
    +
    +  /** Has a start event been processed? */
    +  private val appStartEventProcessed = new AtomicBoolean(false)
    +
    +  /** Has the application end event been processed? */
    +  private val appEndEventProcessed = new AtomicBoolean(false)
    +
    +  /** counter of events processed, that is, those which have been through handleEvent() */
    +  private val _eventsProcessed = new AtomicLong(0)
    +
    +  /** counter of events queued. */
    +  private val _eventsQueued = new AtomicLong(0)
    +
    +  /** how many attempts to post entities have been made? */
    +  private val _entityPostAttempts = new AtomicLong(0)
    +  /** how many entity postings succeeded? */
    +  private val _entityPostSuccesses = new AtomicLong(0)
    +  /** how many entity postings failed? */
    +  private val _entityPostFailures = new AtomicLong(0)
    +  /** counter of events dropped */
    +  private val _eventsDropped = new AtomicLong(0)
    +
    +  /** how many flushes have taken place? */
    +  private val flushCount = new AtomicLong(0)
    +
    +  /** Event handler */
    +  private var eventHandlingThread: Option[Thread] = None
    +
    +  /**
    +   * Flag to indicate the thread is stopped; events aren't being
    +   * processed.
    +   */
    +  private val stopped = new AtomicBoolean(true)
    +
    +  /**
    +   * Boolean to track whether a thread is active or not, for tests to
    +   * monitor and see if the thread has completed.
    +   */
    +  private val postThreadActive = new AtomicBoolean(false)
    +
    +  /** How long to wait for shutdown before giving up */
    +  private var shutdownWaitTime = 0L
    +
    +  /**
    +   * What is the initial and incrementing interval for POST retries?
    +   */
    +  private var retryInterval = 0L
    +
    +  /** Domain ID for entities: `None` if no domain is in use */
    +  private var domainId: Option[String] = None
    +
    +  /** URI to timeline web application -valid after `serviceStart()` */
    +  private var _timelineWebappAddress: URI = _
    +
    +  /**
    +   * Build a new timeline client from the spark context.
    +   *
    +   * The `timelineClient` field is not updated by this call; it is only
    +   * checked to verify that no client has been set yet.
    +   *
    +   * Visibility is package-private so that tests can reach it: some of the
    +   * mock tests override the timeline client creation.
    +   * @return the timeline client
    +   */
    +  private[yarn] def createTimelineClient(): TimelineClient = {
    +    require(!_timelineClient.isDefined, "timeline client already set")
    +    // the call must stay qualified: an unqualified `createTimelineClient`
    +    // would resolve to this method rather than the utility function
    +    val client = YarnTimelineUtils.createTimelineClient(sparkContext)
    +    client
    +  }
    +
    +  /**
    +   * Get the timeline client.
    +   * @return the client
    +   * @throws IllegalArgumentException if no timeline client is currently set
    +   */
    +  def timelineClient: TimelineClient = {
    +    // same exception type that `require` raises, but with a diagnostic message
    +    // and without an unguarded `Option.get`
    +    _timelineClient.getOrElse(
    +      throw new IllegalArgumentException("requirement failed: timeline client not set"))
    +  }
    +
    +  /**
    +   * Get the total number of events dropped due to the queue of
    +   * outstanding posts being too long
    +   * @return counter of events dropped
    +   */
    +  def eventsDropped: Long = _eventsDropped.get()
    +
    +  /**
    +   * Count of processed events: those handled in the back-end thread
    +   * without being rejected
    +   * @return counter of events processed
    +   */
    +  def eventsProcessed: Long = _eventsProcessed.get()
    +
    +  /**
    +   * Count of all events which have been queued
    +   * @return the total event count
    +   */
    +  def eventsQueued: Long = _eventsQueued.get()
    +
    +  /**
    +   * Query the number of entities currently in the queue
    +   * @return the current queue length
    +   */
    +  def getQueueSize: Int = _entityQueue.size()
    +
    +  /**
    +   * Query the batch size currently in use
    +   * @return the batch size
    +   */
    +  def batchSize: Int = _batchSize
    +
    +  /**
    +   * Query the counter of attempts to post events
    +   * @return the number of post attempts
    +   */
    +  def postAttempts: Long = {
    +    _entityPostAttempts.get()
    +  }
    +
    +  /**
    +   * Query the counter of failed post operations
    +   * @return counter of timeline post operations which failed
    +   */
    +  def postFailures: Long = _entityPostFailures.get()
    +
    +  /**
    +   * Query the counter of post successes
    +   * @return the number of successful posts
    +   */
    +  def postSuccesses: Long = {
    +    _entityPostSuccesses.get()
    +  }
    +
    +  /**
    +   * Is the asynchronous posting thread active?
    +   * @return true if the post thread has started; false if it has not yet/ever started,
    +   *         or if it has finished.
    +   */
    +  def isPostThreadActive: Boolean = postThreadActive.get()
    +
    +  /**
    +   * The YARN application ID of this history service
    +   * @return the application ID provided when the service started
    +   */
    +  def applicationId: ApplicationId = {
    +    _applicationId
    +  }
    +
    +  /**
    +   * The YARN attempt ID of this history service
    +   * @return the attempt ID provided when the service started
    +   */
    +  def attemptId: Option[ApplicationAttemptId] = {
    +    _attemptId
    +  }
    +
    +  /**
    +   * Reset the timeline client
    +   * <p>
    +   * 1. If a timeline client is set, stop it.
    +   * 2. Set the `timelineClient` field to `None`
    +   */
    +  def stopTimelineClient(): Unit = {
    +    for (client <- _timelineClient) {
    +      client.stop()
    +    }
    +    _timelineClient = None
    +  }
    +
    +  /**
    +   * Create the timeline domain.
    +   *
    +   * A Timeline Domain is a uniquely identified 'namespace' for accessing parts of the timeline.
    +   * Security levels are managed at the domain level, so a domain is only used if the
    +   * spark ACLs are enabled (`"spark.ui.acls.enable"`, falling back to `"spark.acls.enable"`).
    +   *
    +   * If a domain is predefined under the configuration key `TIMELINE_DOMAIN`, that domain is
    +   * returned and nothing is created. Otherwise a domain named after the application ID is
    +   * created. Full access is then granted to the current user and all users in the
    +   * configuration options `"spark.modify.acls"` and `"spark.admin.acls"`;
    +   * read access to those users and those listed in `"spark.ui.view.acls"`
    +   *
    +   * @return an optional domain string. If `None`, then no domain is in use: either
    +   *         ACLs are disabled or the attempt to create the domain failed.
    +   */
    +  private def createTimelineDomain(): Option[String] = {
    +    val sparkConf = sparkContext.getConf
    +    val aclsOn = sparkConf.getBoolean("spark.ui.acls.enable",
    +        sparkConf.getBoolean("spark.acls.enable", false))
    +    if (!aclsOn) {
    +      logDebug("ACLs are disabled; not creating the timeline domain")
    +      None
    +    } else {
    +      sparkConf.getOption(TIMELINE_DOMAIN) match {
    +        case predefined @ Some(predefinedId) =>
    +          // log the domain itself rather than its `Some(...)` wrapper
    +          logDebug(s"Using predefined domain $predefinedId")
    +          predefined
    +
    +        case None =>
    +          val current = UserGroupInformation.getCurrentUser.getShortUserName
    +          val adminAcls = stringToSet(sparkConf.get("spark.admin.acls", ""))
    +          val viewAcls = stringToSet(sparkConf.get("spark.ui.view.acls", ""))
    +          val modifyAcls = stringToSet(sparkConf.get("spark.modify.acls", ""))
    +
    +          val readers = (Seq(current) ++ adminAcls ++ modifyAcls ++ viewAcls).mkString(" ")
    +          val writers = (Seq(current) ++ adminAcls ++ modifyAcls).mkString(" ")
    +          val domain = DOMAIN_ID_PREFIX + _applicationId
    +          logInfo(s"Creating domain $domain with readers: $readers and writers: $writers")
    +
    +          // create the timeline domain with the reader and writer permissions
    +          val timelineDomain = new TimelineDomain()
    +          timelineDomain.setId(domain)
    +          timelineDomain.setReaders(readers)
    +          timelineDomain.setWriters(writers)
    +          try {
    +            timelineClient.putDomain(timelineDomain)
    +            Some(domain)
    +          } catch {
    +            case e: Exception =>
    +              logError(s"cannot create the domain $domain", e)
    +              // fallback to default
    +              None
    +          }
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Start the service, calling the service's `init()` and `start()` 
actions in the
    +   * correct order
    +   * @param binding binding to the spark application and YARN
    +   */
    +  override def start(binding: YarnExtensionServiceBinding): Unit = {
    +    val oldstate = enterState(StartedState)
    +    if (oldstate != CreatedState) {
    +      // state model violation
    +      _serviceState.set(oldstate)
    +      throw new IllegalArgumentException(s"Cannot start the service from 
state $oldstate")
    +    }
    +    val context = binding.sparkContext
    +    val appId = binding.applicationId
    +    val attemptId = binding.attemptId
    +    require(context != null, "Null context parameter")
    +    bindToYarnApplication(appId, attemptId)
    +    this.sparkContext = context
    +
    +    this.config = new YarnConfiguration(context.hadoopConfiguration)
    +
    +    val sparkConf = sparkContext.conf
    +
    +    // work out the attempt ID from the YARN attempt ID. No attempt, 
assume "1".
    +    // this is assumed by the AM, which uses it when creating a path to an 
attempt
    +    val attempt1 = attemptId match {
    +      case Some(attempt) => attempt.getAttemptId.toString
    +      case None => CLIENT_BACKEND_ATTEMPT_ID
    +    }
    +    setContextAppAndAttemptInfo(Some(appId.toString), Some(attempt1))
    +    _batchSize = sparkConf.getInt(BATCH_SIZE, _batchSize)
    +    _postQueueLimit = sparkConf.getInt(POST_QUEUE_LIMIT, _postQueueLimit)
    +    retryInterval = 1000 * sparkConf.getTimeAsSeconds(POST_RETRY_INTERVAL,
    --- End diff --
    
    `getTimeAsMs` would mean that a value given without a unit suffix is 
interpreted as milliseconds. Asking for the time in seconds and then 
multiplying by 1000 makes seconds the default unit, which is slightly better 
for humans. This is the same pattern as used in `FsHistoryProvider`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to