This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f76d7cc31e7 [SPARK-41585][YARN] The Spark exclude node functionality 
for YARN should work independently of dynamic allocation
f76d7cc31e7 is described below

commit f76d7cc31e72778b62647264e5b90059de888c20
Author: Luca Canali <luca.can...@cern.ch>
AuthorDate: Mon Mar 20 14:01:53 2023 -0500

    [SPARK-41585][YARN] The Spark exclude node functionality for YARN should 
work independently of dynamic allocation
    
    ### What changes were proposed in this pull request?
    The Spark exclude node functionality for Spark on YARN, introduced in 
[SPARK-26688](https://issues.apache.org/jira/browse/SPARK-26688), allows users 
to specify a list of node names that are excluded from resource allocation. 
This is done using the configuration parameter: `spark.yarn.exclude.nodes`
    The feature currently works only for executors allocated via dynamic 
allocation. To use the feature on Spark 3.3.1, for example, one may also need 
to configure `spark.dynamicAllocation.minExecutors=0` and 
`spark.executor.instances=0`, therefore relying on executor resource allocation 
only via dynamic allocation.
    
    ### Why are the changes needed?
    This proposes to extend the use of the Spark exclude node functionality for 
YARN beyond dynamic allocation, which I believe also makes it more consistent 
with what the documentation states for this feature/configuration parameter.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, this allows using the executor exclude nodes feature for Spark on YARN 
    also when not using dynamic allocation.
    
    ### How was this patch tested?
    A unit test has been added for this + manual tests on a YARN cluster.
    
    Closes #39127 from LucaCanali/excludeNodesBesidesDynamicAllocation.
    
    Authored-by: Luca Canali <luca.can...@cern.ch>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../spark/deploy/yarn/YarnAllocatorNodeHealthTracker.scala     |  2 ++
 .../spark/deploy/yarn/YarnAllocatorHealthTrackerSuite.scala    | 10 ++++++++++
 2 files changed, 12 insertions(+)

diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorNodeHealthTracker.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorNodeHealthTracker.scala
index bfe0face8c2..2f5297107e6 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorNodeHealthTracker.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorNodeHealthTracker.scala
@@ -144,6 +144,8 @@ private[spark] class YarnAllocatorNodeHealthTracker(
     val now = failureTracker.clock.getTimeMillis()
     allocatorExcludedNodeList.retain { (_, expiryTime) => expiryTime > now }
   }
+
+  refreshExcludedNodes
 }
 
 /**
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorHealthTrackerSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorHealthTrackerSuite.scala
index c2fd5ff3165..c722b565549 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorHealthTrackerSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorHealthTrackerSuite.scala
@@ -97,6 +97,16 @@ class YarnAllocatorHealthTrackerSuite extends SparkFunSuite 
with Matchers
     verify(amClientMock, times(0)).updateBlacklist(Collections.emptyList(), 
Collections.emptyList())
   }
 
+  test("SPARK-41585 YARN Exclude Nodes should work independently of dynamic 
allocation") {
+    sparkConf.set(YARN_EXCLUDE_NODES, Seq("host1", "host2"))
+    val yarnHealthTracker = createYarnAllocatorHealthTracker(sparkConf)
+
+    // Check that host1 and host2 are in the exclude list
+    // Note, this covers also non-dynamic allocation
+    verify(amClientMock)
+      .updateBlacklist(Arrays.asList("host1", "host2"), 
Collections.emptyList())
+  }
+
   test("combining scheduler and allocation excluded node list") {
     sparkConf.set(YARN_EXCLUDE_NODES, Seq("initial1", "initial2"))
     val yarnHealthTracker = createYarnAllocatorHealthTracker(sparkConf)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to