This is an automated email from the ASF dual-hosted git repository.

taoyang pushed a commit to branch app-backoff-mechanism
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit d82c488b4f48a6fd294d3d47948f44d8e04fc837
Author: Tao Yang <taoy...@apache.org>
AuthorDate: Wed Apr 9 09:43:40 2025 +0800

    YARN-11809. Support application backoff mechanism for CapacityScheduler.
---
 .../scheduler/SchedulerApplicationAttempt.java     |  23 ++-
 .../activities/ActivityDiagnosticConstant.java     |   3 +
 .../scheduler/capacity/AbstractLeafQueue.java      |  76 ++++++-
 .../capacity/CapacitySchedulerConfiguration.java   | 128 ++++++++++++
 .../capacity/TestCapacitySchedulerAppBackoff.java  | 220 +++++++++++++++++++++
 5 files changed, 448 insertions(+), 2 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 7f36b5c7e06..b080b994cb6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -184,7 +184,16 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   private ConcurrentHashMultiset<SchedulerRequestKey>
       missedNonPartitionedReqSchedulingOpportunity =
       ConcurrentHashMultiset.create();
-  
+
+  /**
+   * Tracks the total number of times the application has missed scheduling
+   * opportunities. It is incremented when the scheduler cannot allocate
+   * resources for the application, and reset to 0 when the scheduler
+   * successfully allocates resources for the application or transitions
+   * the application to the backoff state.
+   */
+  private final AtomicLong appMissedSchedulingOpportunities = new AtomicLong();
+
   // Time of the last container scheduled at the current allowed level
   protected Map<SchedulerRequestKey, Long> lastScheduledContainer =
       new ConcurrentHashMap<>();
@@ -1106,6 +1115,18 @@ void setSchedulingOpportunities(SchedulerRequestKey schedulerKey, int count) {
     schedulingOpportunities.setCount(schedulerKey, count);
   }
 
+  /**
+   * Records one more missed scheduling opportunity for this application by
+   * atomically incrementing the app-level counter.
+   */
+  public void addAppMissedSchedulingOpportunities() {
+    appMissedSchedulingOpportunities.getAndIncrement();
+  }
+
+  /**
+   * Clears the app-level counter of missed scheduling opportunities by
+   * setting it back to 0.
+   */
+  public void resetAppMissedSchedulingOpportunities() {
+    appMissedSchedulingOpportunities.set(0L);
+  }
+
+  /**
+   * Reads the app-level counter of missed scheduling opportunities.
+   *
+   * @return the current value of the counter
+   */
+  public long getAppMissedSchedulingOpportunities() {
+    final long missed = appMissedSchedulingOpportunities.get();
+    return missed;
+  }
+
   private AggregateAppResourceUsage getRunningAggregateAppResourceUsage() {
     long currentTimeMillis = System.currentTimeMillis();
     // Don't walk the whole container list if the resources were computed
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java
index 3eb6dc24e09..6d98ff750dd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java
@@ -70,6 +70,9 @@ public class ActivityDiagnosticConstant {
   public final static String APPLICATION_DO_NOT_NEED_RESOURCE =
       "Application does not need more resource";
 
+  public static final String APPLICATION_IN_BACKOFF_STATE =
+      "Application is in backoff state due to reaching missed scheduling threshold";
+
   /*
    * Request level diagnostics
    */
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java
index 6f176a69691..c6ea4b0fa11 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java
@@ -31,6 +31,7 @@
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
 import org.apache.commons.lang3.StringUtils;
@@ -39,6 +40,8 @@
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.thirdparty.com.google.common.cache.Cache;
+import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
 import org.apache.hadoop.util.Sets;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -154,6 +157,13 @@ public class AbstractLeafQueue extends AbstractCSQueue {
   private final List<FiCaSchedulerApp> runnableApps = new ArrayList<>();
   private final List<FiCaSchedulerApp> nonRunnableApps = new ArrayList<>();
 
+  // Backoff related variables
+  private final boolean appBackoffEnabled;
+  private long appBackoffIntervalMs = 0L;
+  private long appBackoffMissedThreshold = 0L;
+  // Cache of applications that are in backoff state
+  private Cache<ApplicationId, Boolean> appsInBackoffState = null;
+
   public AbstractLeafQueue(CapacitySchedulerQueueContext queueContext,
       String queueName, CSQueue parent, CSQueue old) throws IOException {
     this(queueContext, queueName, parent, old, false);
@@ -170,6 +180,26 @@ public AbstractLeafQueue(CapacitySchedulerQueueContext queueContext,
 
     // One time initialization is enough since it is static ordering policy
     this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps<>();
+
+    // Initialize the backoff configurations
+    CapacitySchedulerConfiguration conf = queueContext.getConfiguration();
+    appBackoffEnabled = conf.isAppBackoffEnabled(queuePath);
+    if (appBackoffEnabled) {
+      appBackoffIntervalMs = conf.getAppBackoffIntervalMs(queuePath);
+      if (appBackoffIntervalMs <= 0) {
+        throw new IOException(
+            "Backoff interval must be greater than 0 for queue: " + queuePath);
+      }
+      appBackoffMissedThreshold =
+          conf.getAppBackoffMissedThreshold(queuePath);
+      if (appBackoffMissedThreshold <= 0) {
+        throw new IOException(
+            "Backoff app missed threshold must be greater than 0 for queue: "
+                + queuePath);
+      }
+      appsInBackoffState = CacheBuilder.newBuilder().expireAfterWrite(
+          appBackoffIntervalMs, TimeUnit.MILLISECONDS).build();
+    }
   }
 
   @SuppressWarnings("checkstyle:nowhitespaceafter")
@@ -314,7 +344,10 @@ protected void setupQueueConfigs(Resource clusterResource) throws
               + defaultAppPriorityPerQueue + "\npriority = " + priority
               + "\nmaxLifetime = " + getMaximumApplicationLifetime()
               + " seconds" + "\ndefaultLifetime = "
-              + getDefaultApplicationLifetime() + " seconds");
+              + getDefaultApplicationLifetime() + " seconds"
+              + "\nbackoffEnabled = " + appBackoffEnabled
+              + "\nbackoffIntervalMs = " + appBackoffIntervalMs
+              + "\nbackoffAppMissedThreshold = " + appBackoffMissedThreshold);
     } finally {
       writeLock.unlock();
     }
@@ -1212,6 +1245,33 @@ public CSAssignment assignContainers(Resource clusterResource,
          assignmentIterator.hasNext();) {
       FiCaSchedulerApp application = assignmentIterator.next();
 
+      // Check for backoff state
+      if (isAppInBackoffState(application.getApplicationId())) {
+        // Skip if this app is still in backoff state
+        ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
+            activitiesManager, node, application, application.getPriority(),
+            ActivityDiagnosticConstant.APPLICATION_IN_BACKOFF_STATE);
+        continue;
+      }
+
+      // Check for missed scheduling opportunities
+      if (isAppShouldEnterBackoffState(application)) {
+        // Don't assign containers to this app when the missed opportunities
+        // reached the threshold.
+        LOG.info("Skip scheduling for application {} as it has reached the "
+            + "missed scheduling threshold {}, the backoff interval is {} ms.",
+            application.getApplicationId(), appBackoffMissedThreshold,
+            appBackoffIntervalMs);
+        ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
+            activitiesManager, node, application, application.getPriority(),
+            ActivityDiagnosticConstant.APPLICATION_IN_BACKOFF_STATE);
+        // Add the app to the backoff state, to prevent further scheduling
+        // attempts during the backoff period.
+        appsInBackoffState.put(application.getApplicationId(), true);
+        // Reset missed scheduling opportunities
+        application.resetAppMissedSchedulingOpportunities();
+        continue;
+      }
+
       ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
           node, SystemClock.getInstance().getTime(), application);
 
@@ -1302,6 +1362,9 @@ public CSAssignment assignContainers(Resource clusterResource,
         ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
             parent.getQueuePath(), getQueuePath(),
             ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
+        // Reset missed scheduling opportunities after successfully allocating
+        // resources for the application.
+        application.resetAppMissedSchedulingOpportunities();
         return assignment;
       } else if (assignment.getSkippedType()
           == CSAssignment.SkippedType.OTHER) {
@@ -1309,6 +1372,8 @@ public CSAssignment assignContainers(Resource clusterResource,
             activitiesManager, application.getApplicationId(),
             ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
         application.updateNodeInfoForAMDiagnostics(node);
+        // Add missed scheduling opportunities for the application
+        application.addAppMissedSchedulingOpportunities();
       } else if (assignment.getSkippedType()
           == CSAssignment.SkippedType.QUEUE_LIMIT) {
         ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
@@ -1335,6 +1400,15 @@ public CSAssignment assignContainers(Resource clusterResource,
     return CSAssignment.NULL_ASSIGNMENT;
   }
 
+  /**
+   * Tells whether the given application is currently recorded in this
+   * queue's backoff cache.
+   *
+   * @param appId id of the application to look up
+   * @return true only when app-backoff is enabled for this queue and the
+   *         cache still holds an entry for {@code appId}
+   */
+  public boolean isAppInBackoffState(ApplicationId appId) {
+    // The cache is only created when backoff is enabled, so it must not be
+    // touched otherwise.
+    if (!appBackoffEnabled) {
+      return false;
+    }
+    return appsInBackoffState.getIfPresent(appId) != null;
+  }
+
+  /**
+   * Tells whether the application has missed enough scheduling opportunities
+   * to be put into the backoff state.
+   *
+   * @param application the application whose missed-opportunity counter is
+   *                    compared against the queue's threshold
+   * @return true only when app-backoff is enabled for this queue and the
+   *         counter has reached {@code appBackoffMissedThreshold}
+   */
+  public boolean isAppShouldEnterBackoffState(FiCaSchedulerApp application) {
+    if (!appBackoffEnabled) {
+      return false;
+    }
+    long missed = application.getAppMissedSchedulingOpportunities();
+    return missed >= appBackoffMissedThreshold;
+  }
+
   @Override
   public boolean accept(Resource cluster,
       ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index ea5c892ce3e..f7506d983fa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -76,6 +76,7 @@
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePrefixes.getAutoCreatedQueueObjectTemplateConfPrefix;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePrefixes.getNodeLabelPrefix;
@@ -3036,4 +3037,131 @@ private String normalizePolicyName(String policyName) {
           "Could not instantiate " + "NodesSortingPolicy: " + policyName, e);
     }
   }
+
+  /**
+   * Configuration key for enabling the app-backoff mechanism for a queue.
+   * When enabled, an application in the queue is temporarily skipped once
+   * it has missed a configured number of scheduling opportunities.
+   */
+  @Private
+  public static final String BACKOFF_ENABLED = "app-backoff.enabled";
+
+  /**
+   * Default value for enabling backoff mechanism.
+   */
+  @Private
+  public static final boolean DEFAULT_BACKOFF_ENABLED = false;
+
+  /**
+   * Configuration key indicating the duration for which an application
+   * in backoff state will be skipped during the scheduling process.
+   */
+  @Private
+  public static final String APP_BACKOFF_INTERVAL_MS = "app-backoff.interval-ms";
+
+  /**
+   * Default value for the backoff duration in milliseconds.
+   */
+  @Private
+  public static final long DEFAULT_APP_BACKOFF_INTERVAL_MS = 3000L;
+
+  /**
+   * Configuration key for the threshold of missed scheduling opportunities
+   * before an application is put into backoff state.
+   */
+  @Private
+  public static final String APP_BACKOFF_MISSED_THRESHOLD =
+      "app-backoff.missed-threshold";
+
+  /**
+   * Default value for missed opportunities' threshold.
+   */
+  @Private
+  public static final long DEFAULT_APP_BACKOFF_MISSED_THRESHOLD = 3L;
+
+  /**
+   * Reads the scheduler-wide app-backoff switch, which the per-queue lookup
+   * uses as its default.
+   * @return the value configured under {@code PREFIX + BACKOFF_ENABLED}, or
+   *         {@link #DEFAULT_BACKOFF_ENABLED} when it is not set
+   */
+  public boolean getGlobalAppBackoffEnabled() {
+    final String globalKey = PREFIX + BACKOFF_ENABLED;
+    return getBoolean(globalKey, DEFAULT_BACKOFF_ENABLED);
+  }
+
+  /**
+   * Reads the scheduler-wide app-backoff interval, which the per-queue
+   * lookup uses as its default.
+   * @return the duration configured under
+   *         {@code PREFIX + APP_BACKOFF_INTERVAL_MS} in milliseconds, or
+   *         {@link #DEFAULT_APP_BACKOFF_INTERVAL_MS} when it is not set
+   */
+  public long getGlobalAppBackoffIntervalMs() {
+    final String globalKey = PREFIX + APP_BACKOFF_INTERVAL_MS;
+    return getTimeDuration(globalKey, DEFAULT_APP_BACKOFF_INTERVAL_MS,
+        TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Reads the scheduler-wide missed-opportunities threshold, which the
+   * per-queue lookup uses as its default.
+   * @return the value configured under
+   *         {@code PREFIX + APP_BACKOFF_MISSED_THRESHOLD}, or
+   *         {@link #DEFAULT_APP_BACKOFF_MISSED_THRESHOLD} when it is not set
+   */
+  public long getGlobalAppBackoffMissedThreshold() {
+    final String globalKey = PREFIX + APP_BACKOFF_MISSED_THRESHOLD;
+    return getLong(globalKey, DEFAULT_APP_BACKOFF_MISSED_THRESHOLD);
+  }
+
+  /**
+   * Resolves whether app-backoff is enabled for a queue: the queue-level
+   * setting wins, otherwise the global setting applies.
+   * @param queue the queue path
+   * @return true if app-backoff is enabled for the queue, false otherwise
+   */
+  public boolean isAppBackoffEnabled(QueuePath queue) {
+    final String queueKey = getQueuePrefix(queue) + BACKOFF_ENABLED;
+    final boolean globalDefault = getGlobalAppBackoffEnabled();
+    return getBoolean(queueKey, globalDefault);
+  }
+
+  /**
+   * Resolves the app-backoff interval for a queue: the queue-level setting
+   * wins, otherwise the global interval applies.
+   * @param queue the queue path
+   * @return the app-backoff interval in milliseconds
+   */
+  public long getAppBackoffIntervalMs(QueuePath queue) {
+    final String queueKey = getQueuePrefix(queue) + APP_BACKOFF_INTERVAL_MS;
+    final long globalDefault = getGlobalAppBackoffIntervalMs();
+    return getTimeDuration(queueKey, globalDefault, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Resolves the missed-opportunities threshold for a queue: the queue-level
+   * setting wins, otherwise the global threshold applies.
+   * @param queue the queue path
+   * @return the missed opportunities threshold
+   */
+  public long getAppBackoffMissedThreshold(QueuePath queue) {
+    final String queueKey =
+        getQueuePrefix(queue) + APP_BACKOFF_MISSED_THRESHOLD;
+    final long globalDefault = getGlobalAppBackoffMissedThreshold();
+    return getLong(queueKey, globalDefault);
+  }
+
+  /**
+   * Writes the queue-level app-backoff switch (for testing).
+   * @param queue the queue path
+   * @param enabled the backoff enabled flag
+   */
+  @VisibleForTesting
+  public void setAppBackoffEnabled(QueuePath queue, boolean enabled) {
+    final String queueKey = getQueuePrefix(queue) + BACKOFF_ENABLED;
+    setBoolean(queueKey, enabled);
+  }
+
+  /**
+   * Writes the queue-level app-backoff interval in milliseconds
+   * (for testing).
+   * @param queue the queue path
+   * @param intervalMs the backoff interval in milliseconds
+   */
+  @VisibleForTesting
+  public void setAppBackoffIntervalMs(QueuePath queue, long intervalMs) {
+    final String queueKey = getQueuePrefix(queue) + APP_BACKOFF_INTERVAL_MS;
+    setLong(queueKey, intervalMs);
+  }
+
+  /**
+   * Writes the queue-level missed-opportunities threshold (for testing).
+   * @param queue the queue path
+   * @param threshold the missed opportunities threshold
+   */
+  @VisibleForTesting
+  public void setAppBackoffMissedThreshold(QueuePath queue, long threshold) {
+    final String queueKey =
+        getQueuePrefix(queue) + APP_BACKOFF_MISSED_THRESHOLD;
+    setLong(queueKey, threshold);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAppBackoff.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAppBackoff.java
new file mode 100644
index 00000000000..1affd6aa62c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAppBackoff.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageMultiNodeLookupPolicy;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.APP_BACKOFF_MISSED_THRESHOLD;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.BACKOFF_ENABLED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.APP_BACKOFF_INTERVAL_MS;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DEFAULT_APP_BACKOFF_MISSED_THRESHOLD;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DEFAULT_APP_BACKOFF_INTERVAL_MS;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A1;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A2;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A3;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B1;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B2;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B3;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfiguration;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.GB;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestCapacitySchedulerAppBackoff {
+
+  /**
+   * Checks that app-backoff settings are resolved per queue with a fallback
+   * to the global values and built-in defaults, both for the initial
+   * configuration and after the scheduler is reinitialized with an updated
+   * one.
+   */
+  @Test
+  public void testAppBackoffConfUpdate() throws Exception {
+    // Setup initial queue configuration
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    conf.setAppBackoffEnabled(A1, true);
+    conf.setAppBackoffIntervalMs(A1, 10000L);
+    conf.setAppBackoffMissedThreshold(A1, 50L);
+    conf.setAppBackoffEnabled(B2, true);
+
+    MockRM rm = new MockRM(conf);
+    // Stop the RM in a finally block so that a failed assertion does not
+    // leave a started MockRM behind.
+    try {
+      rm.start();
+      CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+      // Verify initial configuration: A1 uses its own values, B2 is enabled
+      // but falls back to the defaults, all other leaf queues are disabled.
+      assertTrue(cs.getConfiguration().isAppBackoffEnabled(A1));
+      assertEquals(10000L, cs.getConfiguration().getAppBackoffIntervalMs(A1));
+      assertEquals(50L,
+          cs.getConfiguration().getAppBackoffMissedThreshold(A1));
+      assertTrue(cs.getConfiguration().isAppBackoffEnabled(B2));
+      assertEquals(DEFAULT_APP_BACKOFF_INTERVAL_MS,
+          cs.getConfiguration().getAppBackoffIntervalMs(B2));
+      assertEquals(DEFAULT_APP_BACKOFF_MISSED_THRESHOLD,
+          cs.getConfiguration().getAppBackoffMissedThreshold(B2));
+      assertFalse(cs.getConfiguration().isAppBackoffEnabled(A2));
+      assertFalse(cs.getConfiguration().isAppBackoffEnabled(A3));
+      assertFalse(cs.getConfiguration().isAppBackoffEnabled(B1));
+      assertFalse(cs.getConfiguration().isAppBackoffEnabled(B3));
+
+      // Update configuration: enable backoff globally with new values
+      conf.setBoolean(PREFIX + BACKOFF_ENABLED, true);
+      conf.setLong(PREFIX + APP_BACKOFF_MISSED_THRESHOLD, 5L);
+      conf.setLong(PREFIX + APP_BACKOFF_INTERVAL_MS, 5000L);
+      // Disabled for A1
+      conf.setAppBackoffEnabled(A1, false);
+
+      // Reinitialize the scheduler with updated configuration
+      cs.reinitialize(conf, rm.getRMContext());
+
+      // Verify updated configuration: the global values now apply to every
+      // queue except A1, which is explicitly disabled.
+      CapacitySchedulerConfiguration newConf = cs.getConfiguration();
+      assertTrue(newConf.isAppBackoffEnabled(B2));
+      assertEquals(5L, newConf.getAppBackoffMissedThreshold(B2));
+      assertEquals(5000L, newConf.getAppBackoffIntervalMs(B2));
+      assertFalse(newConf.isAppBackoffEnabled(A1));
+      assertTrue(newConf.isAppBackoffEnabled(A2));
+      assertTrue(newConf.isAppBackoffEnabled(A3));
+      assertTrue(newConf.isAppBackoffEnabled(B1));
+      assertTrue(newConf.isAppBackoffEnabled(B3));
+    } finally {
+      rm.stop();
+    }
+  }
+
+  /**
+   * Walks one application through the backoff life cycle in a queue with
+   * app-backoff enabled: an unsatisfiable request accumulates missed
+   * scheduling opportunities, reaching the threshold puts the app into the
+   * backoff state and resets the counter, the state expires after the
+   * configured interval, and a successful allocation resets the counter.
+   */
+  @Test
+  public void testSchedulingWithAppBackoffEnabled() throws Exception {
+    // Setup backoff conf for queue A1
+    long appBackoffIntervalMs = 100L;
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    enabledMultiNodesPlacement(conf);
+    conf.setAppBackoffEnabled(A1, true);
+    conf.setAppBackoffIntervalMs(A1, appBackoffIntervalMs);
+    conf.setAppBackoffMissedThreshold(A1, 3L);
+
+    MockRM rm = new MockRM(conf);
+    // Stop the RM in a finally block so that a failed assertion does not
+    // leave a started MockRM behind.
+    try {
+      rm.start();
+      // Register a node
+      MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10 * GB);
+
+      // Submit an application in queue A1
+      MockRMAppSubmissionData data =
+          MockRMAppSubmissionData.Builder.createWithMemory(2 * GB, rm)
+              .withAppName("app1")
+              .withUser("user")
+              .withAcls(null)
+              .withQueue(A1.getLeafName())
+              .withUnmanagedAM(false)
+              .build();
+      RMApp app = MockRMAppSubmitter.submit(rm, data);
+      MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
+
+      // Submit a request that cannot be satisfied due to the
+      // placement-constraint condition
+      PlacementConstraint pc = targetIn("node",
+          allocationTag("hbase-master")).build();
+      SchedulingRequest schedulingRequest = SchedulingRequest.newInstance(
+          1, Priority.newInstance(1), ExecutionTypeRequest.newInstance(), null,
+          ResourceSizing.newInstance(1, Resource.newInstance(2 * GB, 1)), pc);
+      am.addSchedulingRequest(ImmutableList.of(schedulingRequest));
+      am.doHeartbeat();
+
+      CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+      AbstractLeafQueue queueA1 =
+          (AbstractLeafQueue) cs.getQueue(A1.getLeafName());
+      FiCaSchedulerApp schedulerApp =
+          cs.getApplicationAttempt(am.getApplicationAttemptId());
+
+      // Each run hands a fresh node-update event for nm1 to the scheduler.
+      Runnable nodeUpdate = () -> cs.handle(new NodeUpdateSchedulerEvent(
+          rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
+
+      // Simulate missed scheduling opportunities
+      for (int i = 0; i < 3; i++) {
+        nodeUpdate.run();
+      }
+      assertFalse(queueA1.isAppInBackoffState(app.getApplicationId()));
+      assertEquals(3L, schedulerApp.getAppMissedSchedulingOpportunities());
+
+      // Make the app enter backoff state when it reaches the missed threshold
+      nodeUpdate.run();
+
+      // Verify app is in backoff state
+      assertTrue(queueA1.isAppInBackoffState(app.getApplicationId()));
+      assertEquals(0L, schedulerApp.getAppMissedSchedulingOpportunities());
+
+      // Wait for the backoff interval to expire
+      GenericTestUtils.waitFor(
+          () -> !queueA1.isAppInBackoffState(app.getApplicationId()),
+          appBackoffIntervalMs, appBackoffIntervalMs * 2);
+
+      // Verify app is no longer in backoff state after the backoff interval
+      assertFalse(queueA1.isAppInBackoffState(app.getApplicationId()));
+
+      // Simulate another missed scheduling opportunity
+      nodeUpdate.run();
+      assertFalse(queueA1.isAppInBackoffState(app.getApplicationId()));
+      assertEquals(1L, schedulerApp.getAppMissedSchedulingOpportunities());
+
+      // Add a second request that can be allocated right away
+      am.allocate("*", 2 * GB, 1, new ArrayList<>());
+      nodeUpdate.run();
+
+      // new request should be allocated and app is not in backoff state
+      assertFalse(
+          queueA1.isAppInBackoffState(schedulerApp.getApplicationId()));
+      assertEquals(0L, schedulerApp.getAppMissedSchedulingOpportunities());
+    } finally {
+      rm.stop();
+    }
+  }
+
+  /**
+   * Switches the given configuration to the scheduler placement-constraints
+   * handler and enables multi-node placement with the "resource-based"
+   * sorting policy backed by {@link ResourceUsageMultiNodeLookupPolicy}.
+   *
+   * @param conf the configuration to update in place
+   */
+  private void enabledMultiNodesPlacement(
+      CapacitySchedulerConfiguration conf) {
+    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+    // MULTI_NODE_PLACEMENT_ENABLED is the statically imported
+    // CapacitySchedulerConfiguration constant, so setting it once is enough.
+    conf.setBoolean(MULTI_NODE_PLACEMENT_ENABLED, true);
+    // NOTE(review): if MULTI_NODE_PLACEMENT_ENABLED already starts with
+    // PREFIX, this key is double-prefixed and has no effect - confirm.
+    conf.setBoolean(PREFIX + MULTI_NODE_PLACEMENT_ENABLED, true);
+    conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES,
+        "resource-based");
+    conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME,
+        "resource-based");
+    String policyName =
+        CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME +
+            ".resource-based.class";
+    conf.set(policyName, ResourceUsageMultiNodeLookupPolicy.class.getName());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to