Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r39653626 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,981 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history.yarn + +import java.net.{ConnectException, URI} +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} + +import scala.collection.mutable.LinkedList +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.service.{AbstractService, Service} +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} +import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelinePutResponse} +import org.apache.hadoop.yarn.client.api.TimelineClient +import org.apache.hadoop.yarn.conf.YarnConfiguration + +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._ +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.YarnExtensionService +import org.apache.spark.util.{SystemClock, Clock, Utils} +import org.apache.spark.{Logging, SparkContext} + +/** + * Implements a Hadoop service with the init/start logic replaced by that + * of the YarnService. + * <p> + * As [[AbstractService]] implements `close()`, routing + * to its `stop` method, calling `close()` is sufficient + * to stop the service instance. + * <p> + * However, when registered to receive spark events, the service will continue to + * receive them until the spark context is stopped. Events received when this service + * is in a `STOPPED` state will be discarded. + */ +private[spark] class YarnHistoryService extends AbstractService("History Service") + with YarnExtensionService with Logging { + + import org.apache.spark.deploy.history.yarn.YarnHistoryService._ + /** + * Spark context; valid once started + */ + private var sparkContext: SparkContext = _ + + /** application ID. */ + private var _applicationId: ApplicationId = _ + + /** attempt ID this will be null if the service is started in yarn-client mode */ + private var _attemptId: Option[ApplicationAttemptId] = None + + /** YARN timeline client */ + private var timelineClient: Option[TimelineClient] = None + + /** registered event listener */ + private var listener: Option[YarnEventListener] = None + + /** Applicaton name */ + private var appName: String = _ + + /** Application ID from the spark start event */ + private var sparkApplicationId: Option[String] = None + + /** Optional Attempt ID from the spark start event */ + private var sparkApplicationAttemptId: Option[String] = None + + /** user name as derived from `SPARK_USER` env var or `UGI` */ + private var userName: String = Utils.getCurrentUserName + + /** + * Clock for recording time + */ + private val clock: Clock = new SystemClock() + + /** + * Start time of the application, as received in the start event. + */ + private var startTime: Long = _ + + /** number of events to batch up before posting*/ + private var batchSize: Int = DEFAULT_BATCH_SIZE + + /** queue of actions*/ + private val actionQueue = new LinkedBlockingQueue[QueuedAction] + + /** cache layer to handle timeline client failure.*/ + private var entityList = new LinkedList[TimelineEntity] + + /** current entity; wil be created on demand. */ + private var curEntity: Option[TimelineEntity] = None + + /** Has a start event been processed? */ + private val appStartEventProcessed = new AtomicBoolean(false) + + /* has the application event event been processed */ + private val appEndEventProcessed = new AtomicBoolean(false) + + /** How many events have been received in the current entity */ + private var currentEventCount = 0 + + /** counter of events processed -that is have been through handleEvent()*/ + private val eventsProcessed: AtomicInteger = new AtomicInteger(0) + + /** counter of events queued. */ + private val eventsQueued: AtomicInteger = new AtomicInteger(0) + + /** how many event postings failed? */ + private val eventPostFailures: AtomicInteger = new AtomicInteger(0) + + /** how many flushes have taken place? */ + private val flushCount = new AtomicInteger(0) + + /** Event handler */ + private var eventHandlingThread: Option[Thread] = None + + /** + * Flag to indicate the thread is stopped; events aren't being + * processed. + */ + private val stopped: AtomicBoolean = new AtomicBoolean(true) + + /** + * boolean to track whether a thread is active or not, for tests to + * monitor and see if the thread has completed. + */ + private val postThreadActive: AtomicBoolean = new AtomicBoolean(false) + + /** + * object used for a lock on entity operations + */ + private val entityLock: AnyRef = new AnyRef + + /** + * How long to wait for shutdown before giving up + */ + private var maxTimeToWaitOnShutdown: Long = SHUTDOWN_WAIT_TIME + + /** Domain ID for entities: may be null */ + private var domainId: String = null + + /** URI to timeline web application -valid after `serviceStart()` */ + private var timelineWebappAddress: URI = _ + + /** + * Create a timeline client and start it. This does not update the + * `timelineClient` field, though it does verify that the field + * is unset. + * + * The method is private to the package so that tests can access it, which + * some of the mock tests do to override the timeline client creation. + * @return the timeline client + */ + private [yarn] def createTimelineClient(): TimelineClient = { + require(timelineClient.isEmpty, "timeline client already set") + YarnTimelineUtils.createTimelineClient(sparkContext) + } + + /** + * Get the timeline client. + * @return the client + * @throws Exception if the timeline client is not currently running + */ + def getTimelineClient: TimelineClient = { + timelineClient.getOrElse(throw new Exception("Timeline client not running")) + } + + /** + * Get the total number of processed events + * @return counter of events processed + */ + def getEventsProcessed: Int = { + eventsProcessed.get + } + + /** + * Get the total number of events queued + * @return the total event count + */ + def getEventsQueued: Int = { + eventsQueued.get + } + + /** + * Get the current size of the queue + * @return the current queue length + */ + def getQueueSize: Int = { + actionQueue.size + } + + /** + * Get the current batch size + * @return the batch size + */ + def getBatchSize: Int = { + batchSize + } + + /** + * Get the total number of failed posts events + * @return counter of timeline post operations which failed + */ + def getEventPostFailures: Int = { + eventPostFailures.get + } + + /** + * is the asynchronous posting thread active? + * @return true if the post thread has started; false if it has not yet/ever started, or + * if it has finished. + */ + def isPostThreadActive: Boolean = { + postThreadActive.get + } + + /** + * The YARN application ID of this history service + * @return the application ID provided when the service started + */ + def applicationId: ApplicationId = { _applicationId } + + /** + * The YARN attempt ID of this history service + * @return the attempt ID provided when the service started + */ + def attemptId: Option[ApplicationAttemptId] = { _attemptId } + + /** + * Reset the timeline client + * <p> + * 1. Stop the timeline client service if running. + * 2. set the `timelineClient` field to `None` + */ + def stopTimelineClient(): Unit = { + stopOptionalService(timelineClient) + timelineClient = None + } + + /** + * Create the timeline domain + * @return a domain string or null + */ + private def createTimelineDomain(): String = { + val sparkConf = sparkContext.getConf + val aclsFlag = sparkConf.getOption("spark.acls.enable") + val aclsOn = aclsFlag.getOrElse( + sparkConf.get("spark.ui.acls.enable", "false")).toBoolean + if (!aclsOn) { + logDebug("ACLs are disabled; not creating the timeline domain") + return null + } + val predefDomain = sparkConf.getOption(TIMELINE_DOMAIN) + if (predefDomain.isDefined) { + return predefDomain.get + } + val current = UserGroupInformation.getCurrentUser.getShortUserName + val adminAcls = stringToSet(sparkConf.get("spark.admin.acls", "")) + val viewAcls = stringToSet(sparkConf.get("spark.ui.view.acls", "")) + val modifyAcls = stringToSet(sparkConf.get("spark.modify.acls", "")) + + val readers = (adminAcls ++ modifyAcls ++ viewAcls).foldLeft(current)(_ + " " + _) + val writers = (adminAcls ++ modifyAcls).foldLeft(current)(_ + " " + _) + var domainId = DOMAIN_ID_PREFIX + _applicationId + logInfo(s"Creating domain $domainId with" + + s" readers: readers and writers: $writers") + + // create the timeline domain with the reader and writer permissions + val timelineDomain = new TimelineDomain() + timelineDomain.setId(domainId) + timelineDomain.setReaders(readers) + timelineDomain.setWriters(writers) + try { + getTimelineClient.putDomain(timelineDomain) + } catch { + case e: Exception => { + logError("cannot create the domain") + // fallback to default + domainId = null + } + } + domainId + } + + /** + * Start the service, calling the service's `init()` and `start()` actions in the + * correct order + * @param context spark context + * @param appId YARN application ID + * @param attemptId YARN attempt ID if known. Otherwise, `None`. + * @return true if the service is hooked up to the timeline service; that is: it is live + */ + def start(context: SparkContext, + appId: ApplicationId, + attemptId: Option[ApplicationAttemptId]): Boolean = { + require(context != null, "Null context parameter") + bindToYarnApplication(appId, attemptId) + + val yarnConf = new YarnConfiguration(context.hadoopConfiguration) + // the init() operation checks the state machine & prevents invocation out of sequence + init(yarnConf) + this.sparkContext = context + + // work out the attempt ID from the YARN attempt ID. No attempt, assume "1". + // this is assumed by the AM, which uses it when creating a path to an attempt + val attempt1 = attemptId match { + case Some(attempt) => attempt.getAttemptId.toString + case None => CLIENT_BACKEND_ATTEMPT_ID + } + setContextAppAndAttemptInfo(Some(appId.toString), Some(attempt1)) + batchSize = sparkContext.conf.getInt(BATCH_SIZE, batchSize) + + start() + if (timelineServiceEnabled) { + true + } else { + logInfo("Yarn timeline service not available, disabling client.") + false + } + } + + /** + * Service start. + * + * If the timeline client is enabled, + * Create the timeline client, the timeline domain, and the event handling thread. + * + * Irrespective of the timeline enabled flag, the service will attempt to register + * as a listener for events. They will merely be discarded. + */ + override protected def serviceStart { + require(sparkContext != null, "No spark context set") + val conf: Configuration = getConfig + if (timelineServiceEnabled) { + timelineWebappAddress = getTimelineEndpoint(conf) + logInfo(s"Starting $this") + logInfo(s"Spark events will be published to the Timeline service at $timelineWebappAddress") + timelineClient = Some(createTimelineClient) + domainId = createTimelineDomain + // declare that the processing is started + stopped.set(false) + eventHandlingThread = Some(new Thread(new Dequeue(), "HistoryEventHandlingThread")) + eventHandlingThread.get.start + } else { + logInfo("Timeline service is disabled") + } + // irrespective of state, hook up to the listener + val registered = registerListener + if (registered) { + logInfo(s"History Service listening for events: $this") + } else { + logInfo(s"History Service is not listening for events: $this") + } + super.serviceStart() + } + + /** + * Check the service configuration to see if the timeline service is enabled + * @return true if `YarnConfiguration.TIMELINE_SERVICE_ENABLED` + * is set. + */ + def timelineServiceEnabled: Boolean = { + YarnTimelineUtils.timelineServiceEnabled(getConfig) + } + + /** + * Return a summary of the service state to help diagnose problems + * during test runs, possibly even production + * @return a summary of the current service state + */ + override def toString: String = { + s"YarnHistoryService for application $applicationId attempt $attemptId;" + + s" endpoint=$timelineWebappAddress;" + + s" bonded to ATS=$bondedToATS;" + + s" listening=$listening;" + + s" batchSize=$batchSize;" + + s" flush count=$getFlushCount;" + + s" current queue size=$getQueueSize;" + + s" total number queued=$getEventsQueued, processed=$getEventsProcessed;" + + s" post failures=$getEventPostFailures;" + } + + /** + * Is the service listening to events from the spark context? + * @return true if it has registered as a listener + */ + def listening: Boolean = { + listener.isDefined + } + + /** + * Is the service hooked up to an ATS server. This does not + * check the validity of the link, only whether or not the service + * has been set up to talk to ATS. + * @return true if the service has a timeline client + */ + def bondedToATS: Boolean = { + timelineClient.isDefined + } + + /** + * Set the YARN binding information. This is called during startup. It is private + * to the package so that tests may update this data + * @param appId YARN application ID + * @param attemptId optional attempt ID + */ + private[yarn] def bindToYarnApplication(appId: ApplicationId, + attemptId: Option[ApplicationAttemptId]): Unit = { + require(appId != null, "Null appId parameter") + this._applicationId = appId + this._attemptId = attemptId + } + + /** + * Set the "spark" application and attempt information -the information + * provided in the start event. The attempt ID here may be `None`; even + * if set it may only be unique amongst the attempts of this application. + * That is: not unique enough to be used as the entity ID + * @param appId application ID + * @param attemptId attempt ID + */ + private def setContextAppAndAttemptInfo(appId: Option[String], + attemptId: Option[String]): Unit = { + logDebug(s"Setting application ID to $appId; attempt ID to $attemptId") + sparkApplicationId = appId + sparkApplicationAttemptId = attemptId + } + + /** + * Add the listener if it is not disabled. + * This is accessible in the same package purely for testing + * @return true if the register was enabled + */ + private [yarn] def registerListener: Boolean = { + assert(sparkContext != null, "Null context") + if (sparkContext.conf.getBoolean(REGISTER_LISTENER, true)) { + logDebug("Registering listener to spark context") + val l = new YarnEventListener(sparkContext, this) + listener = Some(l) + sparkContext.listenerBus.addListener(l) + true + } else { + false + } + } + + /** + * Queue an action, or if the service's `stopped` flag + * is set, discard it + * @param action action to process + * @return true if the event was queued + */ + def enqueue(action: QueuedAction): Boolean = { + if (!stopped.get) { + innerEnqueue(action) + true + } else { + if (timelineServiceEnabled) { + // if the timeline service was ever enabled, log the fact the event + // is being discarded. Don't do this if it was not, as it will + // only make the logs noisy. + logInfo(s"History service stopped; ignoring queued event : $action") + } + false + } + } + + /** + * Inner operation to queue the event. This does not check for service state + * @param event event to queue + */ + private def innerEnqueue(event: QueuedAction) = { + eventsQueued.incrementAndGet + logDebug(s"Enqueue $event") + actionQueue.add(event) + } + + /** + * Stop the service; this triggers flushing the queue and, if not already processed, + * a pushing out of an application end event. + * + * This operation will block for up to `maxTimeToWaitOnShutdown` milliseconds + * to await the asynchronous action queue completing. + */ + override protected def serviceStop { + // if the queue is live + if (!stopped.get) { + + if (!appEndEventProcessed.get()) { + // push out an application stop event if none has been received + logDebug("Generating a SparkListenerApplicationEnd during service stop()") + val current = now() + innerEnqueue(new HandleSparkEvent(SparkListenerApplicationEnd(current), current)) + } + + // flush + logInfo(s"Shutting down: pushing out ${actionQueue.size} events") + innerEnqueue(FlushTimelineEvents()) + + // stop operation + postThreadActive synchronized { + innerEnqueue(StopQueue()) + // now await that marker flag + postThreadActive.wait(maxTimeToWaitOnShutdown) + } + + if (!actionQueue.isEmpty) { + // likely cause is ATS not responding. + // interrupt the thread, albeit at the risk of app stop events + // being lost. There's not much else that can be done if + // ATS is being unresponsive + logWarning(s"Did not finish flushing actionQueue before " + + s"stopping ATSService, eventQueueBacklog= ${actionQueue.size}") + eventHandlingThread.foreach(_.interrupt()) + } + stopTimelineClient + logInfo(s"Stopped: $this") + } + } + + /** + * Get the current entity, creating one on demand if needed, setting up its entity Id + * from the current appID and attempt ID values, filters from the current name and user + * @return the current entity + */ + private def getOrCreateCurrentEntity() = { + curEntity.getOrElse { + currentEventCount = 0 + val entity = createTimelineEntity( + _applicationId, + _attemptId, + sparkApplicationId, + sparkApplicationAttemptId, + appStartEventProcessed.get(), + appName, + userName) + curEntity = Some(entity) + logDebug(s"Demand creation of new entity ${describeEntity(entity)}") + entity + } + } + + /** + * Perform any preflight checks. + * @param entity timeline entity to review. + */ + private[yarn] def preflightCheck(entity: TimelineEntity): Unit = { + require(entity.getStartTime != null, + s"No start time in ${describeEntity(entity)}") + } + + /** + * If there is any available entity to be sent, push to timeline server + */ + private def flushEntity(): Unit = { + if (entityList.nonEmpty) { + val count = flushCount.incrementAndGet() + logDebug(s"flushEntity #$count: list size ${entityList.size}") + var client = getTimelineClient + val lastUpdated = now() + // attempt to post each entity, removing from the list all of those + // which were successfully posted. The remainder are retained for + // followon attempts + entityList = entityList.filter { entity => { + if (entity == null) { + false + } else { + entity.addOtherInfo(FIELD_LAST_UPDATED, lastUpdated) + if (domainId != null) { + entity.setDomainId(domainId) + } + val entityDescription = describeEntity(entity) + logDebug(s"About to POST $entityDescription") + try { + val response: TimelinePutResponse = client.putEntities(entity) + val errors = response.getErrors + if (!response.getErrors.isEmpty) { + val err = errors.get(0) + eventPostFailures.addAndGet(errors.size()) + if (err.getErrorCode != 0) { + logError(s"Failed to post ${entityDescription}\n:${describeError(err)}") + } + } else { + // successful submission + logDebug(s"entity successfully posted: $entityDescription") + } + // whatever the outcome, this request is not re-issued + false + } catch { + case e: ConnectException => + eventPostFailures.incrementAndGet + logWarning(s"Connection exception submitting $entityDescription\n$e", e) + true + + case e: RuntimeException => + // this is probably a retry timeout event; some Hadoop versions don't + // rethrow the exception causing the problem, instead raising an RTE + eventPostFailures.incrementAndGet + logWarning(s"Runtime exception submitting $entityDescription\n$e", e) + // same policy as before: retry on these + true + + case e: Exception => + // something else has gone wrong. + eventPostFailures.incrementAndGet + logWarning(s"Could not handle history entity: $entityDescription\n$e", e) + false + } + } + } + } + logDebug(s"after pushEntities: ${entityList.size}") + } + } + + /** + * Process the action placed in the queue + * @param action action + * @return true if the queue processor must now exit + */ + private def processAction(action: QueuedAction): Boolean = { + action match { + + case StopQueue() => + logDebug("Stop queue action received") + true + + case event: HandleSparkEvent => + try { + handleEvent(event) + false + } catch { + case NonFatal(e) => false + } + + case FlushTimelineEvents() => + logDebug("Flush queue action received") + flushCurrentEventsToATS + false + } + } + + /** + * If the event reaches the batch size or flush is true, push events to ATS. + * + * @param event event. If null no event is queued, but the post-queue flush logic + * still applies + * @return true if the event should stop the service. + */ + private def handleEvent(event: HandleSparkEvent): Unit = { + var push = false + // if we receive a new appStart event, we always push + // not much contention here, only happens when service is stopped + entityLock synchronized { + logDebug(s"Processing event $event") + if (eventsProcessed.get() % 1000 == 0) { + logDebug(s"${eventsProcessed} events are processed") + } + val sparkEvent = event.sparkEvent + sparkEvent match { + case start: SparkListenerApplicationStart => + // we already have all information, + // flush it for old one to switch to new one + logDebug(s"Handling application start event: $event") + appName = start.appName + userName = start.sparkUser + startTime = start.time + if (startTime == 0) { + startTime = now() + } + setContextAppAndAttemptInfo( + start.appId, + start.appAttemptId) + // flush old entity + resetCurrentEntity + appStartEventProcessed.set(true) + appEndEventProcessed.set(false) + // force create the new one + val en = started(getOrCreateCurrentEntity, startTime) --- End diff -- cutting
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org