Repository: spark
Updated Branches:
  refs/heads/master 6f671d04f -> b92d823ad
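The first hunk below replaces an explicit null check around the rack lookup with Option(...). A minimal standalone sketch of that idiom, assuming a hypothetical lookupRack stub in place of the real YarnSparkHadoopUtil.lookupRack (which returns null when no rack mapping is known for a host):

// Sketch only: lookupRack is a hypothetical stand-in for
// YarnSparkHadoopUtil.lookupRack, which returns null when the rack
// for a host is unknown.
object RackLookupSketch {
  private def lookupRack(host: String): String =
    if (host.startsWith("rack1-")) "/rack1" else null

  // Option(x) yields Some(x) for non-null x and None for null, replacing
  // the old "if (retval != null) Some(retval) else None" branch.
  def getRackForHost(host: String): Option[String] =
    Option(lookupRack(host))

  def main(args: Array[String]): Unit = {
    assert(getRackForHost("rack1-node7") == Some("/rack1"))
    assert(getRackForHost("elsewhere").isEmpty)
    println("rack lookup sketch ok")
  }
}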
http://git-wip-us.apache.org/repos/asf/spark/blob/b92d823a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
index 3474112..d162b4c 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -19,22 +19,21 @@ package org.apache.spark.scheduler.cluster
 
 import org.apache.spark._
 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.deploy.yarn.YarnAllocationHandler
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.util.Utils
 
 /**
- *
- * This scheduler launches executors through Yarn - by calling into Client to launch ExecutorLauncher as AM.
+ * This scheduler launches executors through Yarn - by calling into Client to launch the Spark AM.
  */
-private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {
+private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration)
+  extends TaskSchedulerImpl(sc) {
 
   def this(sc: SparkContext) = this(sc, new Configuration())
 
   // By default, rack is unknown
   override def getRackForHost(hostPort: String): Option[String] = {
     val host = Utils.parseHostPort(hostPort)._1
-    val retval = YarnAllocationHandler.lookupRack(conf, host)
-    if (retval != null) Some(retval) else None
+    Option(YarnSparkHadoopUtil.lookupRack(conf, host))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b92d823a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 833e249..a5f537d 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster
 
 import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
 import org.apache.spark.{SparkException, Logging, SparkContext}
-import org.apache.spark.deploy.yarn.{Client, ClientArguments, ExecutorLauncher, YarnSparkHadoopUtil}
+import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -60,10 +60,7 @@ private[spark] class YarnClientSchedulerBackend(
 
     val argsArrayBuf = new ArrayBuffer[String]()
     argsArrayBuf += (
-      "--class", "notused",
-      "--jar", null, // The primary jar will be added dynamically in SparkContext.
-      "--args", hostport,
-      "--am-class", classOf[ExecutorLauncher].getName
+      "--args", hostport
     )
 
     // process any optional arguments, given either as environment variables

http://git-wip-us.apache.org/repos/asf/spark/blob/b92d823a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index 9aeca4a..69f4022 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -18,16 +18,17 @@ package org.apache.spark.scheduler.cluster
 
 import org.apache.spark._
-import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnAllocationHandler}
+import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.util.Utils
 import org.apache.hadoop.conf.Configuration
 
 /**
- *
- * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of ApplicationMaster, etc is done
+ * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of
+ * ApplicationMaster, etc is done
  */
-private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {
+private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
+  extends TaskSchedulerImpl(sc) {
 
   logInfo("Created YarnClusterScheduler")
 
@@ -42,7 +43,7 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
   // By default, rack is unknown
   override def getRackForHost(hostPort: String): Option[String] = {
     val host = Utils.parseHostPort(hostPort)._1
-    val retval = YarnAllocationHandler.lookupRack(conf, host)
+    val retval = YarnSparkHadoopUtil.lookupRack(conf, host)
     if (retval != null) Some(retval) else None
   }
 
@@ -51,4 +52,10 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
     super.postStartHook()
     logInfo("YarnClusterScheduler.postStartHook done")
   }
+
+  override def stop() {
+    super.stop()
+    ApplicationMaster.sparkContextStopped(sc)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b92d823a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
deleted file mode 100644
index 1c4005f..0000000
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ /dev/null
@@ -1,413 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import java.io.IOException -import java.util.concurrent.CopyOnWriteArrayList -import java.util.concurrent.atomic.AtomicReference - -import scala.collection.JavaConversions._ - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.util.ShutdownHookManager -import org.apache.hadoop.yarn.api._ -import org.apache.hadoop.yarn.api.protocolrecords._ -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.client.api.AMRMClient -import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.util.ConverterUtils -import org.apache.hadoop.yarn.webapp.util.WebAppUtils - -import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext} -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.util.{SignalLogger, Utils} - - -/** - * An application master that runs the user's driver program and allocates executors. - */ -class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, - sparkConf: SparkConf) extends Logging { - - def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = - this(args, new Configuration(), sparkConf) - - def this(args: ApplicationMasterArguments) = this(args, new SparkConf()) - - private val yarnConf: YarnConfiguration = new YarnConfiguration(conf) - private var appAttemptId: ApplicationAttemptId = _ - private var userThread: Thread = _ - private val fs = FileSystem.get(yarnConf) - - private var yarnAllocator: YarnAllocationHandler = _ - private var isFinished: Boolean = false - private var uiAddress: String = _ - private var uiHistoryAddress: String = _ - private val maxAppAttempts: Int = conf.getInt( - YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS) - private var isLastAMRetry: Boolean = true - private var amClient: AMRMClient[ContainerRequest] = _ - - // Default to numExecutors * 2, with minimum of 3 - private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures", - sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3))) - - private var registered = false - - def run() { - // Set the web ui port to be ephemeral for yarn so we don't conflict with - // other spark processes running on the same box - System.setProperty("spark.ui.port", "0") - - // When running the AM, the Spark master is always "yarn-cluster" - System.setProperty("spark.master", "yarn-cluster") - - // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using. 
- ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30) - - appAttemptId = ApplicationMaster.getApplicationAttemptId() - logInfo("ApplicationAttemptId: " + appAttemptId) - isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts - amClient = AMRMClient.createAMRMClient() - amClient.init(yarnConf) - amClient.start() - - // setup AmIpFilter for the SparkUI - do this before we start the UI - addAmIpFilter() - - ApplicationMaster.register(this) - - // Call this to force generation of secret so it gets populated into the - // Hadoop UGI. This has to happen before the startUserClass which does a - // doAs in order for the credentials to be passed on to the executor containers. - val securityMgr = new SecurityManager(sparkConf) - - // Start the user's JAR - userThread = startUserClass() - - // This a bit hacky, but we need to wait until the spark.driver.port property has - // been set by the Thread executing the user class. - waitForSparkContextInitialized() - - // Do this after Spark master is up and SparkContext is created so that we can register UI Url. - synchronized { - if (!isFinished) { - registerApplicationMaster() - registered = true - } - } - - // Allocate all containers - allocateExecutors() - - // Launch thread that will heartbeat to the RM so it won't think the app has died. - launchReporterThread() - - // Wait for the user class to finish - userThread.join() - - System.exit(0) - } - - // add the yarn amIpFilter that Yarn requires for properly securing the UI - private def addAmIpFilter() { - val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter" - System.setProperty("spark.ui.filters", amFilter) - val proxy = WebAppUtils.getProxyHostAndPort(conf) - val parts : Array[String] = proxy.split(":") - val uriBase = "http://" + proxy + - System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV) - - val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase - System.setProperty( - "spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params) - } - - private def registerApplicationMaster(): RegisterApplicationMasterResponse = { - logInfo("Registering the ApplicationMaster") - amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress) - } - - private def startUserClass(): Thread = { - logInfo("Starting the user JAR in a separate Thread") - System.setProperty("spark.executor.instances", args.numExecutors.toString) - val mainMethod = Class.forName( - args.userClass, - false, - Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]]) - val t = new Thread { - override def run() { - var succeeded = false - try { - // Copy - val mainArgs = new Array[String](args.userArgs.size) - args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size) - mainMethod.invoke(null, mainArgs) - // Some apps have "System.exit(0)" at the end. The user thread will stop here unless - // it has an uncaught exception thrown out. It needs a shutdown hook to set SUCCEEDED. 
- succeeded = true - } finally { - logDebug("Finishing main") - isLastAMRetry = true - if (succeeded) { - ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED) - } else { - ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED) - } - } - } - } - t.setName("Driver") - t.start() - t - } - - // This needs to happen before allocateExecutors() - private def waitForSparkContextInitialized() { - logInfo("Waiting for Spark context initialization") - try { - var sparkContext: SparkContext = null - ApplicationMaster.sparkContextRef.synchronized { - var numTries = 0 - val waitTime = 10000L - val maxNumTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10) - while (ApplicationMaster.sparkContextRef.get() == null && numTries < maxNumTries - && !isFinished) { - logInfo("Waiting for Spark context initialization ... " + numTries) - numTries = numTries + 1 - ApplicationMaster.sparkContextRef.wait(waitTime) - } - sparkContext = ApplicationMaster.sparkContextRef.get() - assert(sparkContext != null || numTries >= maxNumTries) - - if (sparkContext != null) { - uiAddress = sparkContext.ui.appUIHostPort - uiHistoryAddress = YarnSparkHadoopUtil.getUIHistoryAddress(sparkContext, sparkConf) - this.yarnAllocator = YarnAllocationHandler.newAllocator( - yarnConf, - amClient, - appAttemptId, - args, - sparkContext.preferredNodeLocationData, - sparkContext.getConf) - } else { - logWarning("Unable to retrieve SparkContext in spite of waiting for %d, maxNumTries = %d". - format(numTries * waitTime, maxNumTries)) - this.yarnAllocator = YarnAllocationHandler.newAllocator( - yarnConf, - amClient, - appAttemptId, - args, - sparkContext.getConf) - } - } - } - } - - private def allocateExecutors() { - try { - logInfo("Requesting" + args.numExecutors + " executors.") - // Wait until all containers have launched - yarnAllocator.addResourceRequests(args.numExecutors) - yarnAllocator.allocateResources() - // Exits the loop if the user thread exits. - - while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive - && !isFinished) { - checkNumExecutorsFailed() - allocateMissingExecutor() - yarnAllocator.allocateResources() - Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL) - } - } - logInfo("All executors have launched.") - } - - private def allocateMissingExecutor() { - val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning - - yarnAllocator.getNumPendingAllocate - if (missingExecutorCount > 0) { - logInfo("Allocating %d containers to make up for (potentially) lost containers". - format(missingExecutorCount)) - yarnAllocator.addResourceRequests(missingExecutorCount) - } - } - - private def checkNumExecutorsFailed() { - if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) { - logInfo("max number of executor failures reached") - finishApplicationMaster(FinalApplicationStatus.FAILED, - "max number of executor failures reached") - // make sure to stop the user thread - val sparkContext = ApplicationMaster.sparkContextRef.get() - if (sparkContext != null) { - logInfo("Invoking sc stop from checkNumExecutorsFailed") - sparkContext.stop() - } else { - logError("sparkContext is null when should shutdown") - } - } - } - - private def launchReporterThread(): Thread = { - // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses. 
- val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) - - // we want to be reasonably responsive without causing too many requests to RM. - val schedulerInterval = - sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000) - - // must be <= timeoutInterval / 2. - val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval)) - - val t = new Thread { - override def run() { - while (userThread.isAlive && !isFinished) { - checkNumExecutorsFailed() - allocateMissingExecutor() - logDebug("Sending progress") - yarnAllocator.allocateResources() - Thread.sleep(interval) - } - } - } - // Setting to daemon status, though this is usually not a good idea. - t.setDaemon(true) - t.start() - logInfo("Started progress reporter thread - heartbeat interval : " + interval) - t - } - - def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") { - synchronized { - if (isFinished) { - return - } - isFinished = true - - logInfo("Unregistering ApplicationMaster with " + status) - if (registered) { - amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress) - } - } - } - - /** - * Clean up the staging directory. - */ - private def cleanupStagingDir() { - var stagingDirPath: Path = null - try { - val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean - if (!preserveFiles) { - stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR")) - if (stagingDirPath == null) { - logError("Staging directory is null") - return - } - logInfo("Deleting staging directory " + stagingDirPath) - fs.delete(stagingDirPath, true) - } - } catch { - case ioe: IOException => - logError("Failed to cleanup staging dir " + stagingDirPath, ioe) - } - } - - // The shutdown hook that runs when a signal is received AND during normal close of the JVM. - class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable { - - def run() { - logInfo("AppMaster received a signal.") - // We need to clean up staging dir before HDFS is shut down - // make sure we don't delete it until this is the last AM - if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir() - } - } - -} - -object ApplicationMaster extends Logging { - // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be - // optimal as more containers are available. Might need to handle this better. - private val ALLOCATE_HEARTBEAT_INTERVAL = 100 - - private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]() - - val sparkContextRef: AtomicReference[SparkContext] = - new AtomicReference[SparkContext](null) - - def register(master: ApplicationMaster) { - applicationMasters.add(master) - } - - /** - * Called from YarnClusterScheduler to notify the AM code that a SparkContext has been - * initialized in the user code. - */ - def sparkContextInitialized(sc: SparkContext): Boolean = { - var modified = false - sparkContextRef.synchronized { - modified = sparkContextRef.compareAndSet(null, sc) - sparkContextRef.notifyAll() - } - - // Add a shutdown hook - as a best effort in case users do not call sc.stop or do - // System.exit. - // Should not really have to do this, but it helps YARN to evict resources earlier. - // Not to mention, prevent the Client from declaring failure even though we exited properly. - // Note that this will unfortunately not properly clean up the staging files because it gets - // called too late, after the filesystem is already shutdown. 
- if (modified) { - Runtime.getRuntime().addShutdownHook(new Thread with Logging { - // This is not only logs, but also ensures that log system is initialized for this instance - // when we are actually 'run'-ing. - logInfo("Adding shutdown hook for context " + sc) - - override def run() { - logInfo("Invoking sc stop from shutdown hook") - sc.stop() - // Best case ... - for (master <- applicationMasters) { - master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED) - } - } - }) - } - - // Wait for initialization to complete and at least 'some' nodes to get allocated. - modified - } - - def getApplicationAttemptId(): ApplicationAttemptId = { - val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()) - val containerId = ConverterUtils.toContainerId(containerIdString) - val appAttemptId = containerId.getApplicationAttemptId() - appAttemptId - } - - def main(argStrings: Array[String]) { - SignalLogger.register(log) - val args = new ApplicationMasterArguments(argStrings) - SparkHadoopUtil.get.runAsSparkUser { () => - new ApplicationMaster(args).run() - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/b92d823a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala ---------------------------------------------------------------------- diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala deleted file mode 100644 index e093fe4..0000000 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ /dev/null @@ -1,276 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import java.net.Socket -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.yarn.api.ApplicationConstants -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.api.protocolrecords._ -import org.apache.hadoop.yarn.conf.YarnConfiguration -import akka.actor._ -import akka.remote._ -import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv} -import org.apache.spark.util.{Utils, AkkaUtils} -import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter -import org.apache.spark.scheduler.SplitInfo -import org.apache.hadoop.yarn.client.api.AMRMClient -import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.hadoop.yarn.webapp.util.WebAppUtils - -/** - * An application master that allocates executors on behalf of a driver that is running outside - * the cluster. 
- * - * This is used only in yarn-client mode. - */ -class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) - extends Logging { - - def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = - this(args, new Configuration(), sparkConf) - - def this(args: ApplicationMasterArguments) = this(args, new SparkConf()) - - private var appAttemptId: ApplicationAttemptId = _ - private var reporterThread: Thread = _ - private val yarnConf: YarnConfiguration = new YarnConfiguration(conf) - - private var yarnAllocator: YarnAllocationHandler = _ - private var driverClosed: Boolean = false - private var isFinished: Boolean = false - private var registered: Boolean = false - - private var amClient: AMRMClient[ContainerRequest] = _ - - // Default to numExecutors * 2, with minimum of 3 - private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures", - sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3))) - - val securityManager = new SecurityManager(sparkConf) - val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, - conf = sparkConf, securityManager = securityManager)._1 - var actor: ActorRef = _ - - // This actor just working as a monitor to watch on Driver Actor. - class MonitorActor(driverUrl: String) extends Actor { - - var driver: ActorSelection = _ - - override def preStart() { - logInfo("Listen to driver: " + driverUrl) - driver = context.actorSelection(driverUrl) - // Send a hello message to establish the connection, after which - // we can monitor Lifecycle Events. - driver ! "Hello" - context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) - } - - override def receive = { - case x: DisassociatedEvent => - logInfo(s"Driver terminated or disconnected! Shutting down. $x") - driverClosed = true - case x: AddWebUIFilter => - logInfo(s"Add WebUI Filter. $x") - driver ! x - } - } - - def run() { - amClient = AMRMClient.createAMRMClient() - amClient.init(yarnConf) - amClient.start() - - appAttemptId = ApplicationMaster.getApplicationAttemptId() - synchronized { - if (!isFinished) { - registerApplicationMaster() - registered = true - } - } - - waitForSparkMaster() - addAmIpFilter() - - // Allocate all containers - allocateExecutors() - - // Launch a progress reporter thread, else app will get killed after expiration - // (def: 10mins) timeout ensure that progress is sent before - // YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse. - - val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) - // we want to be reasonably responsive without causing too many requests to RM. - val schedulerInterval = - System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong - // must be <= timeoutInterval / 2. - val interval = math.min(timeoutInterval / 2, schedulerInterval) - - reporterThread = launchReporterThread(interval) - - - // Wait for the reporter thread to Finish. 
- reporterThread.join() - - finishApplicationMaster(FinalApplicationStatus.SUCCEEDED) - actorSystem.shutdown() - - logInfo("Exited") - System.exit(0) - } - - private def registerApplicationMaster(): RegisterApplicationMasterResponse = { - val appUIAddress = sparkConf.get("spark.driver.appUIAddress", "") - logInfo(s"Registering the ApplicationMaster with appUIAddress: $appUIAddress") - amClient.registerApplicationMaster(Utils.localHostName(), 0, appUIAddress) - } - - // add the yarn amIpFilter that Yarn requires for properly securing the UI - private def addAmIpFilter() { - val proxy = WebAppUtils.getProxyHostAndPort(conf) - val parts = proxy.split(":") - val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV) - val uriBase = "http://" + proxy + proxyBase - val amFilter = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase - val amFilterName = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter" - actor ! AddWebUIFilter(amFilterName, amFilter, proxyBase) - } - - private def waitForSparkMaster() { - logInfo("Waiting for Spark driver to be reachable.") - var driverUp = false - val hostport = args.userArgs(0) - val (driverHost, driverPort) = Utils.parseHostPort(hostport) - while(!driverUp) { - try { - val socket = new Socket(driverHost, driverPort) - socket.close() - logInfo("Driver now available: %s:%s".format(driverHost, driverPort)) - driverUp = true - } catch { - case e: Exception => - logError("Failed to connect to driver at %s:%s, retrying ...". - format(driverHost, driverPort)) - Thread.sleep(100) - } - } - sparkConf.set("spark.driver.host", driverHost) - sparkConf.set("spark.driver.port", driverPort.toString) - - val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( - SparkEnv.driverActorSystemName, - driverHost, - driverPort.toString, - CoarseGrainedSchedulerBackend.ACTOR_NAME) - - actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM") - } - - - private def allocateExecutors() { - // TODO: should get preferredNodeLocationData from SparkContext, just fake a empty one for now. - val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = - scala.collection.immutable.Map() - - yarnAllocator = YarnAllocationHandler.newAllocator( - yarnConf, - amClient, - appAttemptId, - args, - preferredNodeLocationData, - sparkConf) - - logInfo("Requesting " + args.numExecutors + " executors.") - // Wait until all containers have launched - yarnAllocator.addResourceRequests(args.numExecutors) - yarnAllocator.allocateResources() - while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed) && - !isFinished) { - checkNumExecutorsFailed() - allocateMissingExecutor() - yarnAllocator.allocateResources() - Thread.sleep(100) - } - - logInfo("All executors have launched.") - } - - private def allocateMissingExecutor() { - val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning - - yarnAllocator.getNumPendingAllocate - if (missingExecutorCount > 0) { - logInfo("Allocating %d containers to make up for (potentially) lost containers". 
- format(missingExecutorCount)) - yarnAllocator.addResourceRequests(missingExecutorCount) - } - } - - private def checkNumExecutorsFailed() { - if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) { - finishApplicationMaster(FinalApplicationStatus.FAILED, - "max number of executor failures reached") - } - } - - private def launchReporterThread(_sleepTime: Long): Thread = { - val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime - - val t = new Thread { - override def run() { - while (!driverClosed && !isFinished) { - checkNumExecutorsFailed() - allocateMissingExecutor() - logDebug("Sending progress") - yarnAllocator.allocateResources() - Thread.sleep(sleepTime) - } - } - } - // setting to daemon status, though this is usually not a good idea. - t.setDaemon(true) - t.start() - logInfo("Started progress reporter thread - sleep time : " + sleepTime) - t - } - - def finishApplicationMaster(status: FinalApplicationStatus, appMessage: String = "") { - synchronized { - if (isFinished) { - return - } - logInfo("Unregistering ApplicationMaster with " + status) - if (registered) { - val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "") - amClient.unregisterApplicationMaster(status, appMessage, trackingUrl) - } - isFinished = true - } - } - -} - -object ExecutorLauncher { - def main(argStrings: Array[String]) { - val args = new ApplicationMasterArguments(argStrings) - SparkHadoopUtil.get.runAsSparkUser { () => - new ExecutorLauncher(args).run() - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/b92d823a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala ---------------------------------------------------------------------- diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 0a46174..4d51449 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -17,12 +17,9 @@ package org.apache.spark.deploy.yarn -import java.lang.{Boolean => JBoolean} -import java.util.{Collections, Set => JSet} import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap} import java.util.concurrent.atomic.AtomicInteger -import scala.collection import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} @@ -32,20 +29,13 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol import org.apache.hadoop.yarn.api.records.ApplicationAttemptId -import org.apache.hadoop.yarn.api.records.{Container, ContainerId, ContainerStatus} +import org.apache.hadoop.yarn.api.records.{Container, ContainerId} import org.apache.hadoop.yarn.api.records.{Priority, Resource, ResourceRequest} import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse} import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest -import org.apache.hadoop.yarn.util.{RackResolver, Records} - - -object AllocationType extends Enumeration { - type AllocationType = Value - val HOST, RACK, ANY = Value -} +import org.apache.hadoop.yarn.util.Records // TODO: // Too many params. 
@@ -61,16 +51,14 @@ object AllocationType extends Enumeration { * Acquires resources for executors from a ResourceManager and launches executors in new containers. */ private[yarn] class YarnAllocationHandler( - val conf: Configuration, - val amClient: AMRMClient[ContainerRequest], - val appAttemptId: ApplicationAttemptId, - val maxExecutors: Int, - val executorMemory: Int, - val executorCores: Int, - val preferredHostToCount: Map[String, Int], - val preferredRackToCount: Map[String, Int], - val sparkConf: SparkConf) - extends Logging { + conf: Configuration, + sparkConf: SparkConf, + amClient: AMRMClient[ContainerRequest], + appAttemptId: ApplicationAttemptId, + args: ApplicationMasterArguments, + preferredNodes: collection.Map[String, collection.Set[SplitInfo]]) + extends YarnAllocator with Logging { + // These three are locked on allocatedHostToContainersMap. Complementary data structures // allocatedHostToContainersMap : containers which are running : host, Set<containerid> // allocatedContainerToHostMap: container to host mapping. @@ -92,7 +80,7 @@ private[yarn] class YarnAllocationHandler( // Additional memory overhead - in mb. private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead", - YarnAllocationHandler.MEMORY_OVERHEAD) + YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD) // Number of container requests that have been sent to, but not yet allocated by the // ApplicationMaster. @@ -103,11 +91,15 @@ private[yarn] class YarnAllocationHandler( private val lastResponseId = new AtomicInteger() private val numExecutorsFailed = new AtomicInteger() - def getNumPendingAllocate: Int = numPendingAllocate.intValue + private val maxExecutors = args.numExecutors + private val executorMemory = args.executorMemory + private val executorCores = args.executorCores + private val (preferredHostToCount, preferredRackToCount) = + generateNodeToWeight(conf, preferredNodes) - def getNumExecutorsRunning: Int = numExecutorsRunning.intValue + override def getNumExecutorsRunning: Int = numExecutorsRunning.intValue - def getNumExecutorsFailed: Int = numExecutorsFailed.intValue + override def getNumExecutorsFailed: Int = numExecutorsFailed.intValue def isResourceConstraintSatisfied(container: Container): Boolean = { container.getResource.getMemory >= (executorMemory + memoryOverhead) @@ -119,7 +111,9 @@ private[yarn] class YarnAllocationHandler( amClient.releaseAssignedContainer(containerId) } - def allocateResources() { + override def allocateResources() = { + addResourceRequests(maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get()) + // We have already set the container request. Poll the ResourceManager for a response. // This doubles as a heartbeat if there are no pending container requests. val progressIndicator = 0.1f @@ -204,7 +198,7 @@ private[yarn] class YarnAllocationHandler( // For rack local containers if (remainingContainers != null) { - val rack = YarnAllocationHandler.lookupRack(conf, candidateHost) + val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost) if (rack != null) { val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0) val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) - @@ -273,7 +267,7 @@ private[yarn] class YarnAllocationHandler( // To be safe, remove the container from `pendingReleaseContainers`. 
pendingReleaseContainers.remove(containerId) - val rack = YarnAllocationHandler.lookupRack(conf, executorHostname) + val rack = YarnSparkHadoopUtil.lookupRack(conf, executorHostname) allocatedHostToContainersMap.synchronized { val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname, new HashSet[ContainerId]()) @@ -360,7 +354,7 @@ private[yarn] class YarnAllocationHandler( allocatedContainerToHostMap.remove(containerId) // TODO: Move this part outside the synchronized block? - val rack = YarnAllocationHandler.lookupRack(conf, host) + val rack = YarnSparkHadoopUtil.lookupRack(conf, host) if (rack != null) { val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1 if (rackCount > 0) { @@ -393,9 +387,9 @@ private[yarn] class YarnAllocationHandler( for (container <- hostContainers) { val candidateHost = container.getNodes.last - assert(YarnAllocationHandler.ANY_HOST != candidateHost) + assert(YarnSparkHadoopUtil.ANY_HOST != candidateHost) - val rack = YarnAllocationHandler.lookupRack(conf, candidateHost) + val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost) if (rack != null) { var count = rackToCounts.getOrElse(rack, 0) count += 1 @@ -409,7 +403,7 @@ private[yarn] class YarnAllocationHandler( AllocationType.RACK, rack, count, - YarnAllocationHandler.PRIORITY) + YarnSparkHadoopUtil.RM_REQUEST_PRIORITY) } requestedContainers @@ -431,7 +425,7 @@ private[yarn] class YarnAllocationHandler( retval } - def addResourceRequests(numExecutors: Int) { + private def addResourceRequests(numExecutors: Int) { val containerRequests: List[ContainerRequest] = if (numExecutors <= 0 || preferredHostToCount.isEmpty) { logDebug("numExecutors: " + numExecutors + ", host preferences: " + @@ -440,9 +434,9 @@ private[yarn] class YarnAllocationHandler( AllocationType.ANY, resource = null, numExecutors, - YarnAllocationHandler.PRIORITY).toList + YarnSparkHadoopUtil.RM_REQUEST_PRIORITY).toList } else { - // Request for all hosts in preferred nodes and for numExecutors - + // Request for all hosts in preferred nodes and for numExecutors - // candidates.size, request by default allocation policy. val hostContainerRequests = new ArrayBuffer[ContainerRequest](preferredHostToCount.size) for ((candidateHost, candidateCount) <- preferredHostToCount) { @@ -453,7 +447,7 @@ private[yarn] class YarnAllocationHandler( AllocationType.HOST, candidateHost, requiredCount, - YarnAllocationHandler.PRIORITY) + YarnSparkHadoopUtil.RM_REQUEST_PRIORITY) } } val rackContainerRequests: List[ContainerRequest] = createRackResourceRequests( @@ -463,7 +457,7 @@ private[yarn] class YarnAllocationHandler( AllocationType.ANY, resource = null, numExecutors, - YarnAllocationHandler.PRIORITY) + YarnSparkHadoopUtil.RM_REQUEST_PRIORITY) val containerRequestBuffer = new ArrayBuffer[ContainerRequest]( hostContainerRequests.size + rackContainerRequests.size() + anyContainerRequests.size) @@ -512,7 +506,7 @@ private[yarn] class YarnAllocationHandler( // There must be a third request, which is ANY. That will be specially handled. requestType match { case AllocationType.HOST => { - assert(YarnAllocationHandler.ANY_HOST != resource) + assert(YarnSparkHadoopUtil.ANY_HOST != resource) val hostname = resource val nodeLocal = constructContainerRequests( Array(hostname), @@ -521,7 +515,7 @@ private[yarn] class YarnAllocationHandler( priority) // Add `hostname` to the global (singleton) host->rack mapping in YarnAllocationHandler. 
- YarnAllocationHandler.populateRackInfo(conf, hostname) + YarnSparkHadoopUtil.populateRackInfo(conf, hostname) nodeLocal } case AllocationType.RACK => { @@ -554,88 +548,6 @@ private[yarn] class YarnAllocationHandler( } requests } -} - -object YarnAllocationHandler { - - val ANY_HOST = "*" - // All requests are issued with same priority : we do not (yet) have any distinction between - // request types (like map/reduce in hadoop for example) - val PRIORITY = 1 - - // Additional memory overhead - in mb. - val MEMORY_OVERHEAD = 384 - - // Host to rack map - saved from allocation requests. We are expecting this not to change. - // Note that it is possible for this to change : and ResurceManager will indicate that to us via - // update response to allocate. But we are punting on handling that for now. - private val hostToRack = new ConcurrentHashMap[String, String]() - private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]() - - - def newAllocator( - conf: Configuration, - amClient: AMRMClient[ContainerRequest], - appAttemptId: ApplicationAttemptId, - args: ApplicationMasterArguments, - sparkConf: SparkConf - ): YarnAllocationHandler = { - new YarnAllocationHandler( - conf, - amClient, - appAttemptId, - args.numExecutors, - args.executorMemory, - args.executorCores, - Map[String, Int](), - Map[String, Int](), - sparkConf) - } - - def newAllocator( - conf: Configuration, - amClient: AMRMClient[ContainerRequest], - appAttemptId: ApplicationAttemptId, - args: ApplicationMasterArguments, - map: collection.Map[String, - collection.Set[SplitInfo]], - sparkConf: SparkConf - ): YarnAllocationHandler = { - val (hostToSplitCount, rackToSplitCount) = generateNodeToWeight(conf, map) - new YarnAllocationHandler( - conf, - amClient, - appAttemptId, - args.numExecutors, - args.executorMemory, - args.executorCores, - hostToSplitCount, - rackToSplitCount, - sparkConf) - } - - def newAllocator( - conf: Configuration, - amClient: AMRMClient[ContainerRequest], - appAttemptId: ApplicationAttemptId, - maxExecutors: Int, - executorMemory: Int, - executorCores: Int, - map: collection.Map[String, collection.Set[SplitInfo]], - sparkConf: SparkConf - ): YarnAllocationHandler = { - val (hostToCount, rackToCount) = generateNodeToWeight(conf, map) - new YarnAllocationHandler( - conf, - amClient, - appAttemptId, - maxExecutors, - executorMemory, - executorCores, - hostToCount, - rackToCount, - sparkConf) - } // A simple method to copy the split info map. private def generateNodeToWeight( @@ -654,7 +566,7 @@ object YarnAllocationHandler { val hostCount = hostToCount.getOrElse(host, 0) hostToCount.put(host, hostCount + splits.size) - val rack = lookupRack(conf, host) + val rack = YarnSparkHadoopUtil.lookupRack(conf, host) if (rack != null){ val rackCount = rackToCount.getOrElse(host, 0) rackToCount.put(host, rackCount + splits.size) @@ -664,42 +576,4 @@ object YarnAllocationHandler { (hostToCount.toMap, rackToCount.toMap) } - def lookupRack(conf: Configuration, host: String): String = { - if (!hostToRack.contains(host)) { - populateRackInfo(conf, host) - } - hostToRack.get(host) - } - - def fetchCachedHostsForRack(rack: String): Option[Set[String]] = { - Option(rackToHostSet.get(rack)).map { set => - val convertedSet: collection.mutable.Set[String] = set - // TODO: Better way to get a Set[String] from JSet. 
-      convertedSet.toSet
-    }
-  }
-
-  def populateRackInfo(conf: Configuration, hostname: String) {
-    Utils.checkHost(hostname)
-
-    if (!hostToRack.containsKey(hostname)) {
-      // If there are repeated failures to resolve, all to an ignore list.
-      val rackInfo = RackResolver.resolve(conf, hostname)
-      if (rackInfo != null && rackInfo.getNetworkLocation != null) {
-        val rack = rackInfo.getNetworkLocation
-        hostToRack.put(hostname, rack)
-        if (! rackToHostSet.containsKey(rack)) {
-          rackToHostSet.putIfAbsent(rack,
-            Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]()))
-        }
-        rackToHostSet.get(rack).add(hostname)
-
-        // TODO(harvey): Figure out what this comment means...
-        // Since RackResolver caches, we are disabling this for now ...
-      } /* else {
-        // right ? Else we will keep calling rack resolver in case we cant resolve rack info ...
-        hostToRack.put(hostname, null)
-      } */
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b92d823a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
new file mode 100644
index 0000000..e8b8d9b
--- /dev/null
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.{Map, Set}
+
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.scheduler.SplitInfo
+import org.apache.spark.util.Utils
+
+
+/**
+ * YarnRMClient implementation for the Yarn stable API.
+ */
+private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMClient with Logging {
+
+  private var amClient: AMRMClient[ContainerRequest] = _
+  private var uiHistoryAddress: String = _
+
+  override def register(
+      conf: YarnConfiguration,
+      sparkConf: SparkConf,
+      preferredNodeLocations: Map[String, Set[SplitInfo]],
+      uiAddress: String,
+      uiHistoryAddress: String) = {
+    amClient = AMRMClient.createAMRMClient()
+    amClient.init(conf)
+    amClient.start()
+    this.uiHistoryAddress = uiHistoryAddress
+
+    logInfo("Registering the ApplicationMaster")
+    amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
+    new YarnAllocationHandler(conf, sparkConf, amClient, getAttemptId(), args,
+      preferredNodeLocations)
+  }
+
+  override def shutdown(status: FinalApplicationStatus, diagnostics: String = "") =
+    amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress)
+
+  override def getAttemptId() = {
+    val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
+    val containerId = ConverterUtils.toContainerId(containerIdString)
+    val appAttemptId = containerId.getApplicationAttemptId()
+    appAttemptId
+  }
+
+  override def getProxyHostAndPort(conf: YarnConfiguration) = WebAppUtils.getProxyHostAndPort(conf)
+
+  override def getMaxRegAttempts(conf: YarnConfiguration) =
+    conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
+
+}
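The YarnRMClient trait that YarnRMClientImpl implements is not included in this diff. Inferred from the overrides above, its shape is roughly the sketch below; member signatures are reconstructed from the implementation and may differ from the committed trait, and the YarnAllocator stub is likewise inferred from the members YarnAllocationHandler now overrides:

import scala.collection.{Map, Set}

import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, FinalApplicationStatus}
import org.apache.hadoop.yarn.conf.YarnConfiguration

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.SplitInfo

// Inferred stub: the diff shows YarnAllocationHandler now
// "extends YarnAllocator with Logging" and overrides these members.
trait YarnAllocator {
  def allocateResources(): Unit
  def getNumExecutorsRunning: Int
  def getNumExecutorsFailed: Int
}

// Reconstructed shape of the RM client abstraction, inferred from
// YarnRMClientImpl's overrides; names and defaults are assumptions.
trait YarnRMClient {

  /** Registers the AM with the RM and returns an allocator for executor containers. */
  def register(
      conf: YarnConfiguration,
      sparkConf: SparkConf,
      preferredNodeLocations: Map[String, Set[SplitInfo]],
      uiAddress: String,
      uiHistoryAddress: String): YarnAllocator

  /** Unregisters the AM with a final status and optional diagnostics. */
  def shutdown(status: FinalApplicationStatus, diagnostics: String = ""): Unit

  /** Attempt ID of this AM, derived from the container ID in the environment. */
  def getAttemptId(): ApplicationAttemptId

  /** host:port of the YARN web proxy that fronts the AM's UI. */
  def getProxyHostAndPort(conf: YarnConfiguration): String

  /** Maximum number of AM registration attempts the RM is configured to allow. */
  def getMaxRegAttempts(conf: YarnConfiguration): Int
}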