praveenc7 commented on code in PR #16018:
URL: https://github.com/apache/pinot/pull/16018#discussion_r2220541322
##########
pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java:
##########
@@ -212,7 +212,9 @@ public enum ServerMeter implements AbstractMetrics.Meter {
/**
* Approximate heap bytes used by the mutable JSON index at the time of index close.
*/
- MUTABLE_JSON_INDEX_MEMORY_USAGE("bytes", false);
+ MUTABLE_JSON_INDEX_MEMORY_USAGE("bytes", false),
+ // Workload Budget exceeded counter
+ WORKLOAD_BUDGET_EXCEEDED("workloadBudgetExceeded", false, "Number of times workload budget exceeded");
Review Comment:
Agreed, let me add it there and emit it at both the workload level and the
global level. Do you think a table-level metric makes sense here? I see value
in cases where multiple tables share the same workload, since it would give
insight into table-level budget exhaustion.
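A minimal sketch of the three granularities discussed here, assuming plain
in-memory counters rather than Pinot's metrics registry. The class and method
names are hypothetical and only illustrate how one budget-exceeded event could
be counted globally, per workload, and per (workload, table) pair:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical counters; not the ServerMetrics API. */
final class BudgetExceededCountersSketch {
  private final LongAdder _global = new LongAdder();
  private final Map<String, LongAdder> _byWorkload = new ConcurrentHashMap<>();
  private final Map<String, LongAdder> _byWorkloadAndTable = new ConcurrentHashMap<>();

  /** Records one budget-exceeded event at all three levels. */
  void recordExceeded(String workload, String tableName) {
    _global.increment();
    _byWorkload.computeIfAbsent(workload, k -> new LongAdder()).increment();
    _byWorkloadAndTable.computeIfAbsent(key(workload, tableName), k -> new LongAdder()).increment();
  }

  long global() {
    return _global.sum();
  }

  long forWorkload(String workload) {
    LongAdder adder = _byWorkload.get(workload);
    return adder == null ? 0L : adder.sum();
  }

  long forTable(String workload, String tableName) {
    LongAdder adder = _byWorkloadAndTable.get(key(workload, tableName));
    return adder == null ? 0L : adder.sum();
  }

  // Assumed key format for the table-level counter.
  private static String key(String workload, String tableName) {
    return workload + "." + tableName;
  }
}
```

With two tables sharing one workload, the workload-level count shows total
exhaustion while the table-level counts show which table is driving it.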
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -199,6 +203,18 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption
return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, errorMessage);
}
+ String workloadName = sqlNodeAndOptions.getOptions().get(Broker.Request.QueryOptionKey.WORKLOAD_NAME);
+ boolean isSecondary = Boolean.parseBoolean(sqlNodeAndOptions.getOptions()
+ .getOrDefault(Broker.Request.QueryOptionKey.IS_SECONDARY_WORKLOAD, "false"));
+ if (workloadName != null && _workloadBudgetManager != null
+ && !_workloadBudgetManager.canAdmitQuery(workloadName, isSecondary)) {
+ String errorMessage = "Request " + requestId + ": " + query + " exceeds query quota for workload: "
+ + workloadName;
+ LOGGER.info(errorMessage);
+ requestContext.setErrorCode(QueryErrorCode.WORKLOAD_QUOTA_EXCEEDED);
+ return new BrokerResponseNative(QueryErrorCode.WORKLOAD_QUOTA_EXCEEDED, errorMessage);
Review Comment:
We can, but isn't it better to exit fast and not waste resources if the query
is eventually going to be killed anyway?
##########
pinot-spi/src/main/java/org/apache/pinot/core/accounting/WorkloadBudgetManager.java:
##########
@@ -145,6 +168,43 @@ private void startBudgetResetTask() {
}, _enforcementWindowMs, _enforcementWindowMs, TimeUnit.MILLISECONDS);
}
+ /**
+ * Determines whether a query for the given workload can be admitted under CPU-only budgets.
+ *
+ * <p>Admission rules:
+ * <ol>
+ * <li>If the manager is disabled or no budget exists for the workload, always admit.</li>
+ * <li>If CPU budget remains above zero, admit immediately.</li>
+ * <li>Otherwise, reject (return false).</li>
+ * </ol>
+ *
+ * <p>Note: This method currently uses a strict check, where CPU and memory budgets must be above zero.
+ * This may be relaxed in the future to allow for a percentage of other remaining budget to be used. At that point,
+ * we can have different admission policies like: Strict, Stealing, etc.
+ *
+ * @param workload the workload identifier to check budget for
+ * @return true if the query may be accepted; false if budget is insufficient
+ */
+ public boolean canAdmitQuery(String workload, boolean isSecondary) {
Review Comment:
Removed this
##########
pinot-spi/src/main/java/org/apache/pinot/core/accounting/WorkloadBudgetManager.java:
##########
@@ -47,10 +48,32 @@ public WorkloadBudgetManager(PinotConfiguration config) {
}
_enforcementWindowMs = config.getProperty(CommonConstants.Accounting.CONFIG_OF_WORKLOAD_ENFORCEMENT_WINDOW_MS,
CommonConstants.Accounting.DEFAULT_WORKLOAD_ENFORCEMENT_WINDOW_MS);
+ initSecondaryWorkloadBudget(config);
startBudgetResetTask();
LOGGER.info("WorkloadBudgetManager initialized with enforcement window: {}ms", _enforcementWindowMs);
}
+ /**
+ * This budget is primarily meant to be used for queries that need to be issued in a low priority manner.
+ * This is fixed budget allocated during host startup and used across all secondary queries.
+ */
+ private void initSecondaryWorkloadBudget(PinotConfiguration config) {
+ _secondaryWorkloadName = config.getProperty(
+ CommonConstants.Accounting.CONFIG_OF_SECONDARY_WORKLOAD_NAME,
+ CommonConstants.Accounting.DEFAULT_SECONDARY_WORKLOAD_NAME);
+
+ double secondaryCpuPercentage = config.getProperty(
+ CommonConstants.Accounting.CONFIG_OF_SECONDARY_WORKLOAD_CPU_PERCENTAGE,
+ CommonConstants.Accounting.DEFAULT_SECONDARY_WORKLOAD_CPU_PERCENTAGE);
+
+ // The Secondary CPU budget is based on the CPU percentage allocated for secondary workload.
+ // The memory budget is set to Long.MAX_VALUE for now, since we do not have a specific memory budget for
+ // secondary queries.
+ long secondaryCpuBudget = (long) (secondaryCpuPercentage * _enforcementWindowMs * 100_000L);
Review Comment:
Right, I missed multiplying by `availableProcessors`.
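A minimal sketch of how the core count could enter the budget, assuming the
budget is tracked in CPU nanoseconds per enforcement window and the percentage
is on a 0-100 scale. The class and method names are hypothetical, and the unit
conversion is an assumption rather than the PR's final formula:

```java
final class SecondaryCpuBudgetSketch {
  private static final long NANOS_PER_MILLI = 1_000_000L;

  private SecondaryCpuBudgetSketch() {
  }

  /**
   * Hypothetical helper: CPU-time budget in nanoseconds for one enforcement window.
   * Assumes cpuPercentage is on a 0-100 scale and that total CPU capacity is the
   * window length multiplied by the number of available processors.
   */
  static long cpuBudgetNs(double cpuPercentage, long enforcementWindowMs, int availableProcessors) {
    double totalCapacityNs = (double) enforcementWindowMs * NANOS_PER_MILLI * availableProcessors;
    return (long) (totalCapacityNs * cpuPercentage / 100.0);
  }

  /** Convenience overload that reads the core count from the running JVM. */
  static long cpuBudgetNs(double cpuPercentage, long enforcementWindowMs) {
    return cpuBudgetNs(cpuPercentage, enforcementWindowMs, Runtime.getRuntime().availableProcessors());
  }
}
```

Without the processor factor, the budget covers only a single core's worth of
CPU time per window, so a multi-core host would exhaust it far too early.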
##########
pinot-spi/src/main/java/org/apache/pinot/spi/exception/QueryErrorCode.java:
##########
@@ -52,6 +52,7 @@ public enum QueryErrorCode {
BROKER_REQUEST_SEND(425, "BrokerRequestSend"),
SERVER_NOT_RESPONDING(427, "ServerNotResponding"),
TOO_MANY_REQUESTS(429, "TooManyRequests"),
+ WORKLOAD_QUOTA_EXCEEDED(429, "WorkloadQuotaExceeded"),
Review Comment:
I think the other place would be the `WorkloadAggregator`; let me see if
there is a way to add this there.
##########
pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/WorkloadScheduler.java:
##########
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.scheduler;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.LongAccumulator;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerQueryPhase;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.core.accounting.WorkloadBudgetManager;
+import org.apache.pinot.core.query.executor.QueryExecutor;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.scheduler.resources.QueryExecutorService;
+import org.apache.pinot.core.query.scheduler.resources.WorkloadResourceManager;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.trace.Tracing;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Scheduler implementation that supports query admission control based on workload-specific budgets.
+ *
+ * <p>This class integrates with the {@link WorkloadBudgetManager} to apply CPU and memory budget enforcement
+ * for different workloads, including primary and secondary workloads.</p>
+ *
+ * <p>Secondary workload configuration is used for queries tagged as "secondary". Queries that exceed their budget
+ * will be rejected.</p>
+ *
+ */
+public class WorkloadScheduler extends QueryScheduler {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(WorkloadScheduler.class);
+
+ private final WorkloadBudgetManager _workloadBudgetManager;
+ private final ServerMetrics _serverMetrics;
+
+ public WorkloadScheduler(PinotConfiguration config, QueryExecutor queryExecutor, ServerMetrics metrics,
+ LongAccumulator latestQueryTime, ThreadResourceUsageAccountant resourceUsageAccountant) {
+ super(config, queryExecutor, new WorkloadResourceManager(config, resourceUsageAccountant), metrics,
+ latestQueryTime);
+ _serverMetrics = metrics;
+ _workloadBudgetManager = Tracing.ThreadAccountantOps.getWorkloadBudgetManager();
+ }
+
+ @Override
+ public String name() {
+ return "WorkloadScheduler";
+ }
+
+ @Override
+ public ListenableFuture<byte[]> submit(ServerQueryRequest queryRequest) {
+ if (!_isRunning) {
+ return shuttingDown(queryRequest);
+ }
+
+ boolean isSecondary = QueryOptionsUtils.isSecondaryWorkload(queryRequest.getQueryContext().getQueryOptions());
Review Comment:
Supporting secondary workloads breaks down into two concerns:
1. Defining the secondary workload name.
2. Assigning a cost to that workload.
I moved all cost initialization into `WorkloadBudgetManager` to keep the
scheduler focused on admission logic. However, passing the `isSecondary` flag
into the `canAdmit` API pollutes its intent: it shouldn't need to know about
secondary workloads. To clean this up, I'll separate how and where we look up
the secondary workload:
- In the scheduler, read the secondary workload name only for
backward-compatibility routing.
- In `WorkloadBudgetManager`, read it again at startup to establish its
budget.
This repeats some code, but it keeps each component responsible for its own
concern without overloading `canAdmit` with extra context.
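The split described above could look roughly like the following sketch. Both
classes are hypothetical stand-ins, not the PR's code: the budget manager only
knows workload names and budgets, while the scheduler maps the legacy secondary
flag to a workload name before asking for admission. Letting the flag take
precedence over an explicit workload name is an assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical budget manager: has no notion of "secondary", only named budgets. */
final class BudgetManagerSketch {
  private final Map<String, AtomicLong> _remainingCpuNs = new ConcurrentHashMap<>();

  /** At startup the secondary workload is registered like any other workload. */
  BudgetManagerSketch(String secondaryWorkloadName, long secondaryCpuBudgetNs) {
    addOrUpdateWorkload(secondaryWorkloadName, secondaryCpuBudgetNs);
  }

  void addOrUpdateWorkload(String workload, long cpuBudgetNs) {
    _remainingCpuNs.put(workload, new AtomicLong(cpuBudgetNs));
  }

  /** Deducts consumed CPU time; workloads without a budget are ignored. */
  void charge(String workload, long cpuNs) {
    AtomicLong remaining = _remainingCpuNs.get(workload);
    if (remaining != null) {
      remaining.addAndGet(-cpuNs);
    }
  }

  /** Admits when no budget is configured for the workload or CPU budget remains. */
  boolean canAdmitQuery(String workload) {
    AtomicLong remaining = _remainingCpuNs.get(workload);
    return remaining == null || remaining.get() > 0;
  }
}

/** Hypothetical scheduler: resolves the workload name, then delegates admission. */
final class SchedulerSketch {
  private final BudgetManagerSketch _budgetManager;
  private final String _secondaryWorkloadName;

  SchedulerSketch(BudgetManagerSketch budgetManager, String secondaryWorkloadName) {
    _budgetManager = budgetManager;
    _secondaryWorkloadName = secondaryWorkloadName;
  }

  /** Backward-compatibility routing: the legacy flag maps to the secondary workload. */
  String resolveWorkload(String workloadOption, boolean isSecondaryOption) {
    return isSecondaryOption ? _secondaryWorkloadName : workloadOption;
  }

  boolean admit(String workloadOption, boolean isSecondaryOption) {
    String workload = resolveWorkload(workloadOption, isSecondaryOption);
    return workload == null || _budgetManager.canAdmitQuery(workload);
  }
}
```

The cost of this split is that the secondary workload name is read in two
places; the benefit is that `canAdmitQuery` takes a single workload name and
stays unaware of how that name was chosen.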
##########
pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/WorkloadResourceManager.java:
##########
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.scheduler.resources;
+
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+public class WorkloadResourceManager extends ResourceManager {
Review Comment:
I know that it currently does not differ from `UnboundedResourceManager`. The
thought was to keep any change made to `UnboundedResourceManager` from
affecting our scheduler, and to allow this resource manager to evolve
differently if needed in the future. However, I get your point that we can
create a new ResourceManager once we have changes that diverge from
`UnboundedResourceManager`.
##########
pinot-spi/src/main/java/org/apache/pinot/core/accounting/WorkloadBudgetManager.java:
##########
@@ -47,10 +48,32 @@ public WorkloadBudgetManager(PinotConfiguration config) {
}
_enforcementWindowMs = config.getProperty(CommonConstants.Accounting.CONFIG_OF_WORKLOAD_ENFORCEMENT_WINDOW_MS,
CommonConstants.Accounting.DEFAULT_WORKLOAD_ENFORCEMENT_WINDOW_MS);
+ initSecondaryWorkloadBudget(config);
startBudgetResetTask();
LOGGER.info("WorkloadBudgetManager initialized with enforcement window: {}ms", _enforcementWindowMs);
}
+ /**
+ * This budget is primarily meant to be used for queries that need to be issued in a low priority manner.
+ * This is fixed budget allocated during host startup and used across all secondary queries.
+ */
+ private void initSecondaryWorkloadBudget(PinotConfiguration config) {
+ _secondaryWorkloadName = config.getProperty(
Review Comment:
I set `DEFAULT_SECONDARY_WORKLOAD_CPU_PERCENTAGE` to 0, so the secondary
workload won't be turned on by default.