Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8744#discussion_r39635411
  
    --- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---
    @@ -0,0 +1,981 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.history.yarn
    +
    +import java.net.{ConnectException, URI}
    +import java.util.concurrent.LinkedBlockingQueue
    +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
    +
    +import scala.collection.mutable.LinkedList
    +import scala.util.control.NonFatal
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.security.UserGroupInformation
    +import org.apache.hadoop.service.{AbstractService, Service}
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, 
ApplicationId}
    +import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, 
TimelineEntity, TimelinePutResponse}
    +import org.apache.hadoop.yarn.client.api.TimelineClient
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +
    +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
    +import org.apache.spark.scheduler._
    +import org.apache.spark.scheduler.cluster.YarnExtensionService
    +import org.apache.spark.util.{SystemClock, Clock, Utils}
    +import org.apache.spark.{Logging, SparkContext}
    +
    +/**
    + * Implements a Hadoop service with the init/start logic replaced by that
    + * of the YarnService.
    + * <p>
    + * As [[AbstractService]] implements `close()`, routing
    + * to its `stop` method, calling `close()` is sufficient
    + * to stop the service instance.
    + * <p>
    + * However, when registered to receive Spark events, the service will continue to
    + * receive them until the Spark context is stopped. Events received when this service
    + * is in a `STOPPED` state will be discarded.
    + */
    +private[spark] class YarnHistoryService  extends AbstractService("History 
Service")
    +  with YarnExtensionService with Logging {
    +
    +  import org.apache.spark.deploy.history.yarn.YarnHistoryService._
    +  /**
    +   * Spark context; valid once the service has been started.
    +   */
    +  private var sparkContext: SparkContext = _
    +
    +  /** YARN application ID. */
    +  private var _applicationId: ApplicationId = _
    +
    +  /** Attempt ID; this will be `None` if the service is started in yarn-client mode. */
    +  private var _attemptId: Option[ApplicationAttemptId] = None
    +
    +  /** YARN timeline client; `None` until one has been set. */
    +  private var timelineClient: Option[TimelineClient] = None
    +
    +  /** Registered event listener; `None` until one has been set. */
    +  private var listener: Option[YarnEventListener] = None
    +
    +  /** Application name. */
    +  private var appName: String = _
    +
    +  /** Application ID from the Spark start event. */
    +  private var sparkApplicationId: Option[String] = None
    +
    +  /** Optional attempt ID from the Spark start event. */
    +  private var sparkApplicationAttemptId: Option[String] = None
    +
    +  /** User name as derived from `SPARK_USER` env var or `UGI`. */
    +  private var userName: String = Utils.getCurrentUserName
    +
    +  /**
    +   * Clock for recording time.
    +   */
    +  private val clock: Clock = new SystemClock()
    +
    +  /**
    +   * Start time of the application, as received in the start event.
    +   */
    +  private var startTime: Long = _
    +
    +  /** Number of events to batch up before posting. */
    +  private var batchSize: Int = DEFAULT_BATCH_SIZE
    +
    +  /** Queue of actions. */
    +  private val actionQueue = new LinkedBlockingQueue[QueuedAction]
    +
    +  /**
    +   * Cache layer to handle timeline client failure.
    +   * NOTE(review): `scala.collection.mutable.LinkedList` is deprecated since
    +   * Scala 2.11; consider `mutable.ListBuffer` or another collection.
    +   */
    +  private var entityList = new LinkedList[TimelineEntity]
    +
    +  /** Current entity; will be created on demand. */
    +  private var curEntity: Option[TimelineEntity] = None
    +
    +  /** Has a start event been processed? */
    +  private val appStartEventProcessed = new AtomicBoolean(false)
    +
    +  /** Has the application end event been processed? */
    +  private val appEndEventProcessed = new AtomicBoolean(false)
    +
    +  /** How many events have been received in the current entity. */
    +  private var currentEventCount = 0
    +
    +  /** Counter of events processed, that is, events that have been through `handleEvent()`. */
    +  private val eventsProcessed: AtomicInteger = new AtomicInteger(0)
    +
    +  /** Counter of events queued. */
    +  private val eventsQueued: AtomicInteger = new AtomicInteger(0)
    +
    +  /** How many event postings failed? */
    +  private val eventPostFailures: AtomicInteger = new AtomicInteger(0)
    +
    +  /** How many flushes have taken place? */
    +  private val flushCount = new AtomicInteger(0)
    +
    +  /** Event handler thread; `None` until one has been set. */
    +  private var eventHandlingThread: Option[Thread] = None
    +
    +  /**
    +   * Flag to indicate the thread is stopped; events aren't being
    +   * processed. Initially `true`.
    +   */
    +  private val stopped: AtomicBoolean = new AtomicBoolean(true)
    +
    +  /**
    +   * Boolean to track whether a thread is active or not, for tests to
    +   * monitor and see if the thread has completed.
    +   */
    +  private val postThreadActive: AtomicBoolean = new AtomicBoolean(false)
    +
    +  /**
    +   * Object used for a lock on entity operations.
    +   */
    +  private val entityLock: AnyRef = new AnyRef
    +
    +  /**
    +   * How long to wait for shutdown before giving up.
    +   * NOTE(review): units presumably milliseconds; confirm against `SHUTDOWN_WAIT_TIME`.
    +   */
    +  private var maxTimeToWaitOnShutdown: Long = SHUTDOWN_WAIT_TIME
    +
    +  /** Domain ID for entities: may be null. */
    +  private var domainId: String = null
    +
    +  /** URI to timeline web application; valid after `serviceStart()`. */
    +  private var timelineWebappAddress: URI = _
    +
    +  /**
    +   * Create a timeline client and start it. This does not update the
    +   * `timelineClient` field, though it does verify that the field
    +   * is unset.
    +   *
    +   * The method is private to the package so that tests can access it, which
    +   * some of the mock tests do to override the timeline client creation.
    +   *
    +   * NOTE(review): creation (and starting) is delegated to
    +   * `YarnTimelineUtils.createTimelineClient`, which is not visible here.
    +   * `sparkContext` is passed straight through and is only documented as valid
    +   * once the service has started; confirm callers only invoke this after start.
    +   * @return the timeline client
    +   * @throws IllegalArgumentException if `timelineClient` is already set
    +   */
    +  private [yarn] def createTimelineClient(): TimelineClient = {
    +    // Refuse to create a client while one is already held; the caller assigns the result.
    +    require(timelineClient.isEmpty, "timeline client already set")
    +    YarnTimelineUtils.createTimelineClient(sparkContext)
    +  }
    +
    +  /**
    +   * Get the timeline client.
    +   * @return the client
    +   * @throws Exception if the timeline client is not currently running
    +   */
    +  def getTimelineClient: TimelineClient = {
    +    timelineClient.getOrElse(throw new Exception("Timeline client not 
running"))
    --- End diff --
    
    OK


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to