This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 51729e67a86 extend disk utilization check to offline segment upload (#17579)
51729e67a86 is described below
commit 51729e67a86724b4edda0061c34cd201d27044fe
Author: mluvin-stripe <[email protected]>
AuthorDate: Tue Mar 3 16:08:30 2026 -0500
extend disk utilization check to offline segment upload (#17579)
---
.../pinot/controller/BaseControllerStarter.java | 16 ++++++++-----
.../apache/pinot/controller/ControllerConf.java | 20 +++++++++++++++++
.../PinotSegmentUploadDownloadRestletResource.java | 26 ++++++++++++++++++++++
.../controller/validation/UtilizationChecker.java | 4 +++-
4 files changed, 60 insertions(+), 6 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 4560e265fa7..d8373ca4486 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -231,7 +231,6 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
protected TableSizeReader _tableSizeReader;
protected StorageQuotaChecker _storageQuotaChecker;
protected final List<UtilizationChecker> _utilizationCheckers = new
ArrayList<>();
- protected DiskUtilizationChecker _diskUtilizationChecker;
protected ResourceUtilizationManager _resourceUtilizationManager;
protected RebalancePreChecker _rebalancePreChecker;
protected TableRebalanceManager _tableRebalanceManager;
@@ -379,6 +378,13 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
MAX_STATE_TRANSITIONS_PER_RESOURCE, constraintItemResource);
}
+ protected boolean shouldAddUtilizationChecker(boolean
isSpecificUtilizationCheckerEnabled) {
+ // Add utilization checker if:
+ // 1. All resource utilization checkers are enabled (on by default for
backwards compatibility), OR
+ // 2. This specific utilization checker is enabled
+ return _config.isAllResourceUtilizationCheckersEnabled() ||
isSpecificUtilizationCheckerEnabled;
+ }
+
protected void addUtilizationChecker(UtilizationChecker utilizationChecker) {
_utilizationCheckers.add(utilizationChecker);
}
@@ -616,7 +622,6 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
throw new RuntimeException("Failed to register cluster config change
handler", e);
}
-
SegmentCompletionConfig segmentCompletionConfig = new
SegmentCompletionConfig(_config);
_segmentCompletionManager =
@@ -636,8 +641,10 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
_storageQuotaChecker = new StorageQuotaChecker(_tableSizeReader,
_controllerMetrics, _leadControllerManager,
_helixResourceManager, _config);
- _diskUtilizationChecker = new
DiskUtilizationChecker(_helixResourceManager, _config);
- addUtilizationChecker(_diskUtilizationChecker);
+ DiskUtilizationChecker diskUtilizationChecker = new
DiskUtilizationChecker(_helixResourceManager, _config);
+ if
(shouldAddUtilizationChecker(_config.isDiskUtilizationCheckerEnabled())) {
+ addUtilizationChecker(diskUtilizationChecker);
+ }
_resourceUtilizationManager = new ResourceUtilizationManager(_config,
_utilizationCheckers);
_rebalancePreChecker =
RebalancePreCheckerFactory.create(_config.getRebalancePreCheckerClass());
_rebalancePreChecker.init(_helixResourceManager, _executorService,
_config.getRebalanceDiskUtilizationThreshold());
@@ -711,7 +718,6 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
bind(_tenantRebalancer).to(TenantRebalancer.class);
bind(_tableSizeReader).to(TableSizeReader.class);
bind(_storageQuotaChecker).to(StorageQuotaChecker.class);
- bind(_diskUtilizationChecker).to(DiskUtilizationChecker.class);
bind(_resourceUtilizationManager).to(ResourceUtilizationManager.class);
bind(controllerStartTime).named(ControllerAdminApiApplication.START_TIME);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 22d7accb777..308b3062f58 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -97,6 +97,7 @@ public class ControllerConf extends PinotConfiguration {
// Used to determine whether to use group commit idealstate on segment
completion
public static final String
CONTROLLER_SEGMENT_COMPLETION_GROUP_COMMIT_ENABLED =
"controller.segment.completion.group.commit.enabled";
+
public enum ControllerMode {
DUAL, PINOT_ONLY, HELIX_ONLY
}
@@ -371,6 +372,14 @@ public class ControllerConf extends PinotConfiguration {
public static final String DISK_UTILIZATION_CHECK_TIMEOUT_MS =
"controller.disk.utilization.check.timeoutMs";
public static final String DISK_UTILIZATION_PATH =
"controller.disk.utilization.path";
public static final String ENABLE_RESOURCE_UTILIZATION_CHECK =
"controller.enable.resource.utilization.check";
+ // When true (the default), every resource utilization checker is registered regardless of its individual flag
+ public static final String ENABLE_ALL_RESOURCE_UTILIZATION_CHECKERS =
+ "controller.enable.all.resource.utilization.checkers";
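+ // Individual checker flags are only consulted when controller.enable.all.resource.utilization.checkers = false;
then each utilization checker is registered only if its own flag is true.
+ // When a new resource utilization checker is added, a matching config must be added so that it can be enabled
individually. Note that an individual flag cannot disable a checker
+ // while controller.enable.all.resource.utilization.checkers is true.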
+ public static final String ENABLE_DISK_UTILIZATION_CHECKER =
+ "controller.enable.disk.utilization.checker";
public static final String RESOURCE_UTILIZATION_CHECKER_INITIAL_DELAY =
"controller.resource.utilization.checker.initial.delay";
public static final String RESOURCE_UTILIZATION_CHECKER_FREQUENCY =
@@ -405,6 +414,9 @@ public class ControllerConf extends PinotConfiguration {
public static final int DEFAULT_DISK_UTILIZATION_CHECK_TIMEOUT_MS = 30_000;
public static final String DEFAULT_DISK_UTILIZATION_PATH =
"/home/pinot/data";
public static final boolean DEFAULT_ENABLE_RESOURCE_UTILIZATION_CHECK =
false;
+ // Include all resource utilization checkers by default
+ public static final boolean DEFAULT_ENABLE_ALL_RESOURCE_UTILIZATION_CHECKERS
= true;
+ public static final boolean DEFAULT_ENABLE_DISK_UTILIZATION_CHECKER = false;
public static final long DEFAULT_RESOURCE_UTILIZATION_CHECKER_INITIAL_DELAY
= 300L; // 5 minutes
public static final long DEFAULT_RESOURCE_UTILIZATION_CHECKER_FREQUENCY =
300L; // 5 minutes
public static final boolean DEFAULT_ENABLE_BATCH_MESSAGE_MODE = false;
@@ -1140,6 +1152,14 @@ public class ControllerConf extends PinotConfiguration {
return getProperty(ENABLE_RESOURCE_UTILIZATION_CHECK,
DEFAULT_ENABLE_RESOURCE_UTILIZATION_CHECK);
}
+ public boolean isAllResourceUtilizationCheckersEnabled() {
+ return getProperty(ENABLE_ALL_RESOURCE_UTILIZATION_CHECKERS,
DEFAULT_ENABLE_ALL_RESOURCE_UTILIZATION_CHECKERS);
+ }
+
+ public boolean isDiskUtilizationCheckerEnabled() {
+ return getProperty(ENABLE_DISK_UTILIZATION_CHECKER,
DEFAULT_ENABLE_DISK_UTILIZATION_CHECKER);
+ }
+
public boolean getEnableBatchMessageMode() {
return getProperty(ENABLE_BATCH_MESSAGE_MODE,
DEFAULT_ENABLE_BATCH_MESSAGE_MODE);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index 75ff7eb298f..d57c7926b77 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -96,7 +96,9 @@ import
org.apache.pinot.controller.api.upload.SegmentValidationUtils;
import org.apache.pinot.controller.api.upload.ZKOperator;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.retention.RetentionManager;
+import org.apache.pinot.controller.validation.ResourceUtilizationManager;
import org.apache.pinot.controller.validation.StorageQuotaChecker;
+import org.apache.pinot.controller.validation.UtilizationChecker;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;
@@ -159,6 +161,9 @@ public class PinotSegmentUploadDownloadRestletResource {
@Inject
AccessControlFactory _accessControlFactory;
+ @Inject
+ ResourceUtilizationManager _resourceUtilizationManager;
+
@GET
@Produces(MediaType.APPLICATION_OCTET_STREAM)
@Path("/segments/{tableName}/{segmentName}")
@@ -406,6 +411,27 @@ public class PinotSegmentUploadDownloadRestletResource {
SegmentValidationUtils.checkStorageQuota(segmentName,
segmentSizeInBytes, untarredSegmentSizeInBytes, tableConfig,
_storageQuotaChecker);
+ // Perform resource utilization checks
+ UtilizationChecker.CheckResult isResourceUtilizationWithinLimits =
+
_resourceUtilizationManager.isResourceUtilizationWithinLimits(tableNameWithType,
+ UtilizationChecker.CheckPurpose.OFFLINE_SEGMENT_UPLOAD);
+ if (isResourceUtilizationWithinLimits ==
UtilizationChecker.CheckResult.FAIL) {
+ _controllerMetrics.setOrUpdateTableGauge(tableNameWithType,
ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED,
+ 1L);
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Resource utilization limit exceeded for table: %s,
rejecting upload for segment: %s",
+ tableNameWithType,
+ segmentName),
+ Response.Status.FORBIDDEN);
+ } else if (isResourceUtilizationWithinLimits ==
UtilizationChecker.CheckResult.UNDETERMINED) {
+ LOGGER.warn(
+ "Resource utilization status could not be determined for table:
{}. Will allow segment upload to "
+ + "proceed.",
+ tableNameWithType);
+ }
+ _controllerMetrics.setOrUpdateTableGauge(tableNameWithType,
ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED,
+ 0L);
+
// Encrypt segment
String crypterNameInTableConfig =
tableConfig.getValidationConfig().getCrypterClassName();
Pair<String, File> encryptionInfo =
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/UtilizationChecker.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/UtilizationChecker.java
index 7b122bb98f1..7fd1fddaeec 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/UtilizationChecker.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/UtilizationChecker.java
@@ -54,7 +54,9 @@ public interface UtilizationChecker {
enum CheckPurpose {
// REALTIME_INGESTION if the check is performed from the realtime
ingestion code path to pause ingestion
// TASK_GENERATION if the check is performed from the task generation
framework to pause creation of new tasks
- REALTIME_INGESTION, TASK_GENERATION
+ // OFFLINE_SEGMENT_UPLOAD if the check is performed from the offline
segment upload code path to reject a segment
+ // upload
+ REALTIME_INGESTION, TASK_GENERATION, OFFLINE_SEGMENT_UPLOAD
}
enum CheckResult {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]