This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f3891e3  [SPARK-31235][YARN] Separates different categories of 
applications
f3891e3 is described below

commit f3891e377f320ad212d198521dcdf5414830c063
Author: wang-zhun <wangzhun6...@gmail.com>
AuthorDate: Tue May 5 08:40:57 2020 -0500

    [SPARK-31235][YARN] Separates different categories of applications
    
    ### What changes were proposed in this pull request?
    This PR adds `spark.yarn.applicationType` to identify the application type
    
    ### Why are the changes needed?
    The current application defaults to the SPARK type.
    In fact, different types of applications have different characteristics and 
are suitable for different scenarios. For example: SPARK-SQL, SPARK-STREAMING.
    I recommend distinguishing them by the parameter 
`spark.yarn.applicationType` so that we can more easily manage and maintain 
different types of applications.
    
    ### How was this patch tested?
    1. Added a unit test.
    2. Tested by verifying the `ApplicationType` shown in the YARN UI in the following cases:
    - client and cluster mode
    
    Additional explanation:
    The type should not exceed 20 characters (YARN truncates longer values); it may be empty or a single space.
    The reason is the following YARN code:
    ```
    // org.apache.hadoop.yarn.server.resourcemanager.submitApplication.
     if (submissionContext.getApplicationType() == null) {
          submissionContext
            .setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE);
    } else {
          // APPLICATION_TYPE_LENGTH = 20
          if (submissionContext.getApplicationType().length() > 
YarnConfiguration.APPLICATION_TYPE_LENGTH) {
            submissionContext.setApplicationType(submissionContext
              .getApplicationType().substring(0,
                YarnConfiguration.APPLICATION_TYPE_LENGTH));
          }
        }
    ```
    
    Closes #28009 from wang-zhun/SPARK-31235.
    
    Authored-by: wang-zhun <wangzhun6...@gmail.com>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 docs/running-on-yarn.md                            |  9 +++
 .../org/apache/spark/deploy/yarn/Client.scala      |  2 +-
 .../org/apache/spark/deploy/yarn/config.scala      |  8 +++
 .../org/apache/spark/deploy/yarn/ClientSuite.scala | 82 +++++++++++++++++++++-
 4 files changed, 97 insertions(+), 4 deletions(-)

diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 166fb87..b58cd24 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -157,6 +157,15 @@ To use a custom metrics.properties for the application 
master and executors, upd
   <td>3.0.0</td>
 </tr>
 <tr>
+  <td><code>spark.yarn.applicationType</code></td>
+  <td><code>SPARK</code></td>
+  <td>
+    Defines more specific application types, e.g. <code>SPARK</code>, 
<code>SPARK-SQL</code>, <code>SPARK-STREAMING</code>,
+    <code>SPARK-MLLIB</code> and <code>SPARK-GRAPH</code>. Please be careful 
not to exceed 20 characters.
+  </td>
+  <td>3.1.0</td>
+</tr>
+<tr>
   <td><code>spark.yarn.driver.resource.{resource-type}.amount</code></td>
   <td><code>(none)</code></td>
   <td>
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 6da6a8d..fc429d6 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -256,7 +256,7 @@ private[spark] class Client(
     appContext.setApplicationName(sparkConf.get("spark.app.name", "Spark"))
     appContext.setQueue(sparkConf.get(QUEUE_NAME))
     appContext.setAMContainerSpec(containerContext)
-    appContext.setApplicationType("SPARK")
+    appContext.setApplicationType(sparkConf.get(APPLICATION_TYPE))
 
     sparkConf.get(APPLICATION_TAGS).foreach { tags =>
       appContext.setApplicationTags(new java.util.HashSet[String](tags.asJava))
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 3797491..b4257a4 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -111,6 +111,14 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  // YARN application type reported on submission; YARN truncates values over 20 characters.
+  private[spark] val APPLICATION_TYPE = ConfigBuilder("spark.yarn.applicationType")
+    .doc("Type of this application. It allows the user to specify a more specific type for " +
+      "the application, such as SPARK, SPARK-SQL, SPARK-STREAMING, SPARK-MLLIB and SPARK-GRAPH.")
+    .version("3.1.0")
+    .stringConf
+    .createWithDefault("SPARK")
+
   /* File distribution. */
 
   private[spark] val SPARK_ARCHIVE = ConfigBuilder("spark.yarn.archive")
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 680ff99..b335e7f 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.yarn
 import java.io.{File, FileInputStream, FileNotFoundException, FileOutputStream}
 import java.net.URI
 import java.util.Properties
+import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{HashMap => MutableHashMap}
@@ -28,13 +29,20 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.MRJobConfig
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse
+import org.apache.hadoop.yarn.api.protocolrecords.{GetNewApplicationResponse, 
SubmitApplicationRequest}
 import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.YarnClientApplication
+import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication}
 import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.event.{Dispatcher, Event, EventHandler}
+import org.apache.hadoop.yarn.server.resourcemanager.{ClientRMService, 
RMAppManager, RMContext}
+import 
org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter
+import 
org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager
 import org.apache.hadoop.yarn.util.Records
 import org.mockito.ArgumentMatchers.{any, anyBoolean, anyShort, eq => meq}
-import org.mockito.Mockito.{spy, verify}
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
 import org.scalatest.Matchers
 
 import org.apache.spark.{SparkConf, SparkException, SparkFunSuite, TestUtils}
@@ -206,6 +214,74 @@ class ClientSuite extends SparkFunSuite with Matchers {
     appContext.getPriority.getPriority should be (1)
   }
 
+  test("specify a more specific type for the application") {
+    // id -> (configured type, expected type); YARN truncates types longer than 20 characters
+    val appTypes = Map(
+      1 -> ("", ""), // an empty type is expected to be kept as-is
+      2 -> (" ", " "), // a single space is expected to be kept as-is
+      3 -> ("SPARK-SQL", "SPARK-SQL"),
+      4 -> ("012345678901234567890123", "01234567890123456789")) // 24 chars in, 20 expected
+
+    for ((id, (sourceType, targetType)) <- appTypes) {
+      val sparkConf = new SparkConf().set("spark.yarn.applicationType", 
sourceType)
+      val args = new ClientArguments(Array())
+
+      val appContext = 
spy(Records.newRecord(classOf[ApplicationSubmissionContext]))
+      val appId = ApplicationId.newInstance(123456, id) // distinct application id per case
+      appContext.setApplicationId(appId)
+      val getNewApplicationResponse = 
Records.newRecord(classOf[GetNewApplicationResponse])
+      val containerLaunchContext = 
Records.newRecord(classOf[ContainerLaunchContext])
+
+      val client = new Client(args, sparkConf, null)
+      val context = client.createApplicationSubmissionContext(
+        new YarnClientApplication(getNewApplicationResponse, appContext),
+        containerLaunchContext)
+
+      val yarnClient = mock(classOf[YarnClient])
+      when(yarnClient.submitApplication(any())).thenAnswer((invocationOnMock: 
InvocationOnMock) => {
+        val subContext = invocationOnMock.getArguments()(0)
+          .asInstanceOf[ApplicationSubmissionContext]
+        val request = Records.newRecord(classOf[SubmitApplicationRequest])
+        request.setApplicationSubmissionContext(subContext)
+
+        val rmContext = mock(classOf[RMContext])
+        val conf = mock(classOf[Configuration])
+        val map = new ConcurrentHashMap[ApplicationId, RMApp]() // backs rmContext.getRMApps
+        when(rmContext.getRMApps).thenReturn(map)
+        val dispatcher = mock(classOf[Dispatcher])
+        when(rmContext.getDispatcher).thenReturn(dispatcher)
+        when[EventHandler[_]](dispatcher.getEventHandler).thenReturn(
+          new EventHandler[Event[_]] {
+            override def handle(event: Event[_]): Unit = {} // no-op: events are ignored
+          }
+        )
+        val writer = mock(classOf[RMApplicationHistoryWriter])
+        when(rmContext.getRMApplicationHistoryWriter).thenReturn(writer)
+        val publisher = mock(classOf[SystemMetricsPublisher])
+        when(rmContext.getSystemMetricsPublisher).thenReturn(publisher)
+        when(appContext.getUnmanagedAM).thenReturn(true)
+
+        val rmAppManager = new RMAppManager(rmContext,
+          null,
+          null,
+          mock(classOf[ApplicationACLsManager]),
+          conf)
+        val clientRMService = new ClientRMService(rmContext,
+          null,
+          rmAppManager,
+          null,
+          null,
+          null)
+        clientRMService.submitApplication(request) // submit through ClientRMService on mocks
+
+        assert(map.get(subContext.getApplicationId).getApplicationType === 
targetType)
+        null // the stubbed call's result is discarded by the caller below
+      })
+
+      yarnClient.submitApplication(context)
+    }
+  }
+
   test("spark.yarn.jars with multiple paths and globs") {
     val libs = Utils.createTempDir()
     val single = Utils.createTempDir()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to