This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new f3891e3  [SPARK-31235][YARN] Separates different categories of applications
f3891e3 is described below

commit f3891e377f320ad212d198521dcdf5414830c063
Author: wang-zhun <wangzhun6...@gmail.com>
AuthorDate: Tue May 5 08:40:57 2020 -0500

[SPARK-31235][YARN] Separates different categories of applications

### What changes were proposed in this pull request?
This PR adds `spark.yarn.applicationType` to identify the application type.

### Why are the changes needed?
Currently every application defaults to the SPARK type. In practice, different categories of applications, for example SPARK-SQL and SPARK-STREAMING, have different characteristics and suit different scenarios. Distinguishing them through the parameter `spark.yarn.applicationType` makes the different categories of applications easier to manage and maintain.

### How was this patch tested?
1. Added a unit test.
2. Tested by verifying the Yarn-UI `ApplicationType` in the following cases:
   - client and cluster mode

One detail needs additional explanation: the type is limited to 20 characters (YARN truncates longer values) and may be empty or whitespace. The reason is the following ResourceManager logic:
```
// org.apache.hadoop.yarn.server.resourcemanager.RMAppManager#submitApplication
if (submissionContext.getApplicationType() == null) {
  submissionContext
    .setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE);
} else {
  // APPLICATION_TYPE_LENGTH = 20
  if (submissionContext.getApplicationType().length() > YarnConfiguration.APPLICATION_TYPE_LENGTH) {
    submissionContext.setApplicationType(submissionContext
      .getApplicationType().substring(0, YarnConfiguration.APPLICATION_TYPE_LENGTH));
  }
}
```

Closes #28009 from wang-zhun/SPARK-31235.

Authored-by: wang-zhun <wangzhun6...@gmail.com>
Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 docs/running-on-yarn.md                            |  9 +++
 .../org/apache/spark/deploy/yarn/Client.scala      |  2 +-
 .../org/apache/spark/deploy/yarn/config.scala      |  8 +++
 .../org/apache/spark/deploy/yarn/ClientSuite.scala | 82 +++++++++++++++++++++-
 4 files changed, 97 insertions(+), 4 deletions(-)
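As a quick illustration of the behavior described above, here is a minimal Scala sketch (not part of the patch; the application name and type values are hypothetical):
```
import org.apache.spark.SparkConf

// Set the new property when building the application's configuration.
val conf = new SparkConf()
  .setAppName("nightly-etl")
  .set("spark.yarn.applicationType", "SPARK-SQL")

// YARN's ResourceManager truncates types longer than 20 characters
// (YarnConfiguration.APPLICATION_TYPE_LENGTH), so an over-long value
// surfaces in the Yarn-UI as its first 20 characters:
val overLong = "012345678901234567890123"
val shownInYarnUI = overLong.take(20) // "01234567890123456789"
```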
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 166fb87..b58cd24 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -157,6 +157,15 @@ To use a custom metrics.properties for the application master and executors, upd
   <td>3.0.0</td>
 </tr>
 <tr>
+  <td><code>spark.yarn.applicationType</code></td>
+  <td><code>SPARK</code></td>
+  <td>
+  Defines more specific application types, e.g. <code>SPARK</code>, <code>SPARK-SQL</code>, <code>SPARK-STREAMING</code>,
+  <code>SPARK-MLLIB</code> and <code>SPARK-GRAPH</code>. Please be careful not to exceed 20 characters.
+  </td>
+  <td>3.1.0</td>
+</tr>
+<tr>
   <td><code>spark.yarn.driver.resource.{resource-type}.amount</code></td>
   <td><code>(none)</code></td>
   <td>
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 6da6a8d..fc429d6 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -256,7 +256,7 @@ private[spark] class Client(
     appContext.setApplicationName(sparkConf.get("spark.app.name", "Spark"))
     appContext.setQueue(sparkConf.get(QUEUE_NAME))
     appContext.setAMContainerSpec(containerContext)
-    appContext.setApplicationType("SPARK")
+    appContext.setApplicationType(sparkConf.get(APPLICATION_TYPE))
 
     sparkConf.get(APPLICATION_TAGS).foreach { tags =>
       appContext.setApplicationTags(new java.util.HashSet[String](tags.asJava))
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 3797491..b4257a4 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -111,6 +111,14 @@ package object config {
     .booleanConf
     .createWithDefault(false)
 
+  private[spark] val APPLICATION_TYPE = ConfigBuilder("spark.yarn.applicationType")
+    .doc("Type of this application," +
+      "it allows user to specify a more specific type for the application, such as SPARK," +
+      "SPARK-SQL, SPARK-STREAMING, SPARK-MLLIB and SPARK-GRAPH")
+    .version("3.1.0")
+    .stringConf
+    .createWithDefault("SPARK")
+
   /* File distribution. */
 
   private[spark] val SPARK_ARCHIVE = ConfigBuilder("spark.yarn.archive")
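Roughly, the new `ConfigBuilder` entry is consumed as a typed accessor. A sketch of the lookup semantics (Spark-internal only, since both `APPLICATION_TYPE` and `SparkConf.get(ConfigEntry)` are `private[spark]`):
```
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.config.APPLICATION_TYPE

// An unset key falls back to the declared default...
val defaultType: String = new SparkConf().get(APPLICATION_TYPE) // "SPARK"

// ...while an explicit setting is what Client.scala now passes to
// appContext.setApplicationType(...).
val conf = new SparkConf().set(APPLICATION_TYPE.key, "SPARK-STREAMING")
val appType: String = conf.get(APPLICATION_TYPE) // "SPARK-STREAMING"
```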
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 680ff99..b335e7f 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.yarn
 import java.io.{File, FileInputStream, FileNotFoundException, FileOutputStream}
 import java.net.URI
 import java.util.Properties
+import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{HashMap => MutableHashMap}
@@ -28,13 +29,20 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.MRJobConfig
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse
+import org.apache.hadoop.yarn.api.protocolrecords.{GetNewApplicationResponse, SubmitApplicationRequest}
 import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.YarnClientApplication
+import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication}
 import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.event.{Dispatcher, Event, EventHandler}
+import org.apache.hadoop.yarn.server.resourcemanager.{ClientRMService, RMAppManager, RMContext}
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager
 import org.apache.hadoop.yarn.util.Records
 import org.mockito.ArgumentMatchers.{any, anyBoolean, anyShort, eq => meq}
-import org.mockito.Mockito.{spy, verify}
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
 import org.scalatest.Matchers
 
 import org.apache.spark.{SparkConf, SparkException, SparkFunSuite, TestUtils}
@@ -206,6 +214,74 @@ class ClientSuite extends SparkFunSuite with Matchers {
     appContext.getPriority.getPriority should be (1)
   }
 
+  test("specify a more specific type for the application") {
+    // When the type exceeds 20 characters will be truncated by yarn
+    val appTypes = Map(
+      1 -> ("", ""),
+      2 -> (" ", " "),
+      3 -> ("SPARK-SQL", "SPARK-SQL"),
+      4 -> ("012345678901234567890123", "01234567890123456789"))
+
+    for ((id, (sourceType, targetType)) <- appTypes) {
+      val sparkConf = new SparkConf().set("spark.yarn.applicationType", sourceType)
+      val args = new ClientArguments(Array())
+
+      val appContext = spy(Records.newRecord(classOf[ApplicationSubmissionContext]))
+      val appId = ApplicationId.newInstance(123456, id)
+      appContext.setApplicationId(appId)
+      val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
+      val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])
+
+      val client = new Client(args, sparkConf, null)
+      val context = client.createApplicationSubmissionContext(
+        new YarnClientApplication(getNewApplicationResponse, appContext),
+        containerLaunchContext)
+
+      val yarnClient = mock(classOf[YarnClient])
+      when(yarnClient.submitApplication(any())).thenAnswer((invocationOnMock: InvocationOnMock) => {
+        val subContext = invocationOnMock.getArguments()(0)
+          .asInstanceOf[ApplicationSubmissionContext]
+        val request = Records.newRecord(classOf[SubmitApplicationRequest])
+        request.setApplicationSubmissionContext(subContext)
+
+        val rmContext = mock(classOf[RMContext])
+        val conf = mock(classOf[Configuration])
+        val map = new ConcurrentHashMap[ApplicationId, RMApp]()
+        when(rmContext.getRMApps).thenReturn(map)
+        val dispatcher = mock(classOf[Dispatcher])
+        when(rmContext.getDispatcher).thenReturn(dispatcher)
+        when[EventHandler[_]](dispatcher.getEventHandler).thenReturn(
+          new EventHandler[Event[_]] {
+            override def handle(event: Event[_]): Unit = {}
+          }
+        )
+        val writer = mock(classOf[RMApplicationHistoryWriter])
+        when(rmContext.getRMApplicationHistoryWriter).thenReturn(writer)
+        val publisher = mock(classOf[SystemMetricsPublisher])
+        when(rmContext.getSystemMetricsPublisher).thenReturn(publisher)
+        when(appContext.getUnmanagedAM).thenReturn(true)
+
+        val rmAppManager = new RMAppManager(rmContext,
+          null,
+          null,
+          mock(classOf[ApplicationACLsManager]),
+          conf)
+        val clientRMService = new ClientRMService(rmContext,
+          null,
+          rmAppManager,
+          null,
+          null,
+          null)
+        clientRMService.submitApplication(request)
+
+        assert(map.get(subContext.getApplicationId).getApplicationType === targetType)
+        null
+      })
+
+      yarnClient.submitApplication(context)
+    }
+  }
+
   test("spark.yarn.jars with multiple paths and globs") {
     val libs = Utils.createTempDir()
     val single = Utils.createTempDir()
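Beyond the suite's embedded ResourceManager, the result can also be spot-checked against a live cluster with Hadoop's public `YarnClient` API; a sketch (assumes a reachable ResourceManager and a default Hadoop configuration on the classpath):
```
import scala.collection.JavaConverters._

import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

// List applications whose type matches the custom value; this mirrors
// the ApplicationType column shown in the Yarn-UI.
val yarnClient = YarnClient.createYarnClient()
yarnClient.init(new YarnConfiguration())
yarnClient.start()
try {
  val sqlApps = yarnClient.getApplications(Set("SPARK-SQL").asJava)
  sqlApps.asScala.foreach { report =>
    println(s"${report.getApplicationId} -> ${report.getApplicationType}")
  }
} finally {
  yarnClient.stop()
}
```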
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org