This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 51729e67a86 extend disk utilization check to offline segment upload (#17579)
51729e67a86 is described below

commit 51729e67a86724b4edda0061c34cd201d27044fe
Author: mluvin-stripe <[email protected]>
AuthorDate: Tue Mar 3 16:08:30 2026 -0500

    extend disk utilization check to offline segment upload (#17579)
---
 .../pinot/controller/BaseControllerStarter.java    | 16 ++++++++-----
 .../apache/pinot/controller/ControllerConf.java    | 20 +++++++++++++++++
 .../PinotSegmentUploadDownloadRestletResource.java | 26 ++++++++++++++++++++++
 .../controller/validation/UtilizationChecker.java  |  4 +++-
 4 files changed, 60 insertions(+), 6 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 4560e265fa7..d8373ca4486 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -231,7 +231,6 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
   protected TableSizeReader _tableSizeReader;
   protected StorageQuotaChecker _storageQuotaChecker;
   protected final List<UtilizationChecker> _utilizationCheckers = new 
ArrayList<>();
-  protected DiskUtilizationChecker _diskUtilizationChecker;
   protected ResourceUtilizationManager _resourceUtilizationManager;
   protected RebalancePreChecker _rebalancePreChecker;
   protected TableRebalanceManager _tableRebalanceManager;
@@ -379,6 +378,13 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
             MAX_STATE_TRANSITIONS_PER_RESOURCE, constraintItemResource);
   }
 
+  protected boolean shouldAddUtilizationChecker(boolean 
isSpecificUtilizationCheckerEnabled) {
+    // Add utilization checker if:
+    // 1. All resource utilization checkers are enabled (on by default for backwards compatibility), OR
+    // 2. This specific utilization checker is enabled
+    return _config.isAllResourceUtilizationCheckersEnabled() || 
isSpecificUtilizationCheckerEnabled;
+  }
+
   protected void addUtilizationChecker(UtilizationChecker utilizationChecker) {
     _utilizationCheckers.add(utilizationChecker);
   }
@@ -616,7 +622,6 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
       throw new RuntimeException("Failed to register cluster config change 
handler", e);
     }
 
-
     SegmentCompletionConfig segmentCompletionConfig = new 
SegmentCompletionConfig(_config);
 
     _segmentCompletionManager =
@@ -636,8 +641,10 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
     _storageQuotaChecker = new StorageQuotaChecker(_tableSizeReader, 
_controllerMetrics, _leadControllerManager,
         _helixResourceManager, _config);
 
-    _diskUtilizationChecker = new 
DiskUtilizationChecker(_helixResourceManager, _config);
-    addUtilizationChecker(_diskUtilizationChecker);
+    DiskUtilizationChecker diskUtilizationChecker = new 
DiskUtilizationChecker(_helixResourceManager, _config);
+    if 
(shouldAddUtilizationChecker(_config.isDiskUtilizationCheckerEnabled())) {
+      addUtilizationChecker(diskUtilizationChecker);
+    }
     _resourceUtilizationManager = new ResourceUtilizationManager(_config, 
_utilizationCheckers);
     _rebalancePreChecker = 
RebalancePreCheckerFactory.create(_config.getRebalancePreCheckerClass());
     _rebalancePreChecker.init(_helixResourceManager, _executorService, 
_config.getRebalanceDiskUtilizationThreshold());
@@ -711,7 +718,6 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
         bind(_tenantRebalancer).to(TenantRebalancer.class);
         bind(_tableSizeReader).to(TableSizeReader.class);
         bind(_storageQuotaChecker).to(StorageQuotaChecker.class);
-        bind(_diskUtilizationChecker).to(DiskUtilizationChecker.class);
         bind(_resourceUtilizationManager).to(ResourceUtilizationManager.class);
         
bind(controllerStartTime).named(ControllerAdminApiApplication.START_TIME);
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 22d7accb777..308b3062f58 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -97,6 +97,7 @@ public class ControllerConf extends PinotConfiguration {
   // Used to determine whether to use group commit idealstate on segment 
completion
   public static final String 
CONTROLLER_SEGMENT_COMPLETION_GROUP_COMMIT_ENABLED =
       "controller.segment.completion.group.commit.enabled";
+
   public enum ControllerMode {
     DUAL, PINOT_ONLY, HELIX_ONLY
   }
@@ -371,6 +372,14 @@ public class ControllerConf extends PinotConfiguration {
   public static final String DISK_UTILIZATION_CHECK_TIMEOUT_MS = 
"controller.disk.utilization.check.timeoutMs";
   public static final String DISK_UTILIZATION_PATH = 
"controller.disk.utilization.path";
   public static final String ENABLE_RESOURCE_UTILIZATION_CHECK = 
"controller.enable.resource.utilization.check";
+  // Explicitly enables all resource utilization checkers
+  public static final String ENABLE_ALL_RESOURCE_UTILIZATION_CHECKERS =
+      "controller.enable.all.resource.utilization.checkers";
+  // If controller.enable.all.resource.utilization.checkers = false, each individual utilization checker can be enabled.
+  // When a new resource utilization checker is added, a new config must be added to have the option to specifically
+  // enable/disable it.
+  public static final String ENABLE_DISK_UTILIZATION_CHECKER =
+      "controller.enable.disk.utilization.checker";
   public static final String RESOURCE_UTILIZATION_CHECKER_INITIAL_DELAY =
       "controller.resource.utilization.checker.initial.delay";
   public static final String RESOURCE_UTILIZATION_CHECKER_FREQUENCY =
@@ -405,6 +414,9 @@ public class ControllerConf extends PinotConfiguration {
   public static final int DEFAULT_DISK_UTILIZATION_CHECK_TIMEOUT_MS = 30_000;
   public static final String DEFAULT_DISK_UTILIZATION_PATH = 
"/home/pinot/data";
   public static final boolean DEFAULT_ENABLE_RESOURCE_UTILIZATION_CHECK = 
false;
+  // Include all resource utilization checkers by default
+  public static final boolean DEFAULT_ENABLE_ALL_RESOURCE_UTILIZATION_CHECKERS 
= true;
+  public static final boolean DEFAULT_ENABLE_DISK_UTILIZATION_CHECKER = false;
   public static final long DEFAULT_RESOURCE_UTILIZATION_CHECKER_INITIAL_DELAY 
= 300L; // 5 minutes
   public static final long DEFAULT_RESOURCE_UTILIZATION_CHECKER_FREQUENCY = 
300L; // 5 minutes
   public static final boolean DEFAULT_ENABLE_BATCH_MESSAGE_MODE = false;
@@ -1140,6 +1152,14 @@ public class ControllerConf extends PinotConfiguration {
     return getProperty(ENABLE_RESOURCE_UTILIZATION_CHECK, 
DEFAULT_ENABLE_RESOURCE_UTILIZATION_CHECK);
   }
 
+  public boolean isAllResourceUtilizationCheckersEnabled() {
+    return getProperty(ENABLE_ALL_RESOURCE_UTILIZATION_CHECKERS, 
DEFAULT_ENABLE_ALL_RESOURCE_UTILIZATION_CHECKERS);
+  }
+
+  public boolean isDiskUtilizationCheckerEnabled() {
+    return getProperty(ENABLE_DISK_UTILIZATION_CHECKER, 
DEFAULT_ENABLE_DISK_UTILIZATION_CHECKER);
+  }
+
   public boolean getEnableBatchMessageMode() {
     return getProperty(ENABLE_BATCH_MESSAGE_MODE, 
DEFAULT_ENABLE_BATCH_MESSAGE_MODE);
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index 75ff7eb298f..d57c7926b77 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -96,7 +96,9 @@ import 
org.apache.pinot.controller.api.upload.SegmentValidationUtils;
 import org.apache.pinot.controller.api.upload.ZKOperator;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.retention.RetentionManager;
+import org.apache.pinot.controller.validation.ResourceUtilizationManager;
 import org.apache.pinot.controller.validation.StorageQuotaChecker;
+import org.apache.pinot.controller.validation.UtilizationChecker;
 import org.apache.pinot.core.auth.Actions;
 import org.apache.pinot.core.auth.Authorize;
 import org.apache.pinot.core.auth.TargetType;
@@ -159,6 +161,9 @@ public class PinotSegmentUploadDownloadRestletResource {
   @Inject
   AccessControlFactory _accessControlFactory;
 
+  @Inject
+  ResourceUtilizationManager _resourceUtilizationManager;
+
   @GET
   @Produces(MediaType.APPLICATION_OCTET_STREAM)
   @Path("/segments/{tableName}/{segmentName}")
@@ -406,6 +411,27 @@ public class PinotSegmentUploadDownloadRestletResource {
       SegmentValidationUtils.checkStorageQuota(segmentName, 
segmentSizeInBytes, untarredSegmentSizeInBytes, tableConfig,
           _storageQuotaChecker);
 
+      // Perform resource utilization checks
+      UtilizationChecker.CheckResult isResourceUtilizationWithinLimits =
+          
_resourceUtilizationManager.isResourceUtilizationWithinLimits(tableNameWithType,
+              UtilizationChecker.CheckPurpose.OFFLINE_SEGMENT_UPLOAD);
+      if (isResourceUtilizationWithinLimits == 
UtilizationChecker.CheckResult.FAIL) {
+        _controllerMetrics.setOrUpdateTableGauge(tableNameWithType, 
ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED,
+            1L);
+        throw new ControllerApplicationException(LOGGER,
+            String.format("Resource utilization limit exceeded for table: %s, 
rejecting upload for segment: %s",
+                tableNameWithType,
+                segmentName),
+            Response.Status.FORBIDDEN);
+      } else if (isResourceUtilizationWithinLimits == 
UtilizationChecker.CheckResult.UNDETERMINED) {
+        LOGGER.warn(
+            "Resource utilization status could not be determined for table: 
{}. Will allow segment upload to "
+                + "proceed.",
+            tableNameWithType);
+      }
+      _controllerMetrics.setOrUpdateTableGauge(tableNameWithType, 
ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED,
+          0L);
+
       // Encrypt segment
       String crypterNameInTableConfig = 
tableConfig.getValidationConfig().getCrypterClassName();
       Pair<String, File> encryptionInfo =
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/UtilizationChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/UtilizationChecker.java
index 7b122bb98f1..7fd1fddaeec 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/UtilizationChecker.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/UtilizationChecker.java
@@ -54,7 +54,9 @@ public interface UtilizationChecker {
   enum CheckPurpose {
     // REALTIME_INGESTION if the check is performed from the realtime 
ingestion code path to pause ingestion
     // TASK_GENERATION if the check is performed from the task generation 
framework to pause creation of new tasks
-    REALTIME_INGESTION, TASK_GENERATION
+    // OFFLINE_SEGMENT_UPLOAD if the check is performed from the offline segment upload code path to reject a segment
+    // upload
+    REALTIME_INGESTION, TASK_GENERATION, OFFLINE_SEGMENT_UPLOAD
   }
 
   enum CheckResult {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to