praveenc7 commented on code in PR #16018:
URL: https://github.com/apache/pinot/pull/16018#discussion_r2220541322


##########
pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java:
##########
@@ -212,7 +212,9 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   /**
    * Approximate heap bytes used by the mutable JSON index at the time of index close.
    */
-  MUTABLE_JSON_INDEX_MEMORY_USAGE("bytes", false);
+  MUTABLE_JSON_INDEX_MEMORY_USAGE("bytes", false),
+  // Workload Budget exceeded counter
+  WORKLOAD_BUDGET_EXCEEDED("workloadBudgetExceeded", false, "Number of times workload budget exceeded");

Review Comment:
   Agreed, let me add it there and emit it at both the workload level and the global level. Do you think a table-level metric makes sense here? I see value in it when multiple tables share the same workload, since it would give insight into table-level budget exhaustion.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -199,6 +203,18 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption
         return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, errorMessage);
       }
 
+      String workloadName = sqlNodeAndOptions.getOptions().get(Broker.Request.QueryOptionKey.WORKLOAD_NAME);
+      boolean isSecondary = Boolean.parseBoolean(sqlNodeAndOptions.getOptions()
+          .getOrDefault(Broker.Request.QueryOptionKey.IS_SECONDARY_WORKLOAD, "false"));
+      if (workloadName != null && _workloadBudgetManager != null
+          && !_workloadBudgetManager.canAdmitQuery(workloadName, isSecondary)) {
+        String errorMessage = "Request " + requestId + ": " + query + " exceeds query quota for workload: "
+            + workloadName;
+        LOGGER.info(errorMessage);
+        requestContext.setErrorCode(QueryErrorCode.WORKLOAD_QUOTA_EXCEEDED);
+        return new BrokerResponseNative(QueryErrorCode.WORKLOAD_QUOTA_EXCEEDED, errorMessage);

Review Comment:
   We can, but isn't it better to exit fast and not waste resources if the query is eventually going to be killed anyway?



##########
pinot-spi/src/main/java/org/apache/pinot/core/accounting/WorkloadBudgetManager.java:
##########
@@ -145,6 +168,43 @@ private void startBudgetResetTask() {
     }, _enforcementWindowMs, _enforcementWindowMs, TimeUnit.MILLISECONDS);
   }
 
+  /**
+   * Determines whether a query for the given workload can be admitted under CPU-only budgets.
+   *
+   * <p>Admission rules:
+   * <ol>
+   *   <li>If the manager is disabled or no budget exists for the workload, always admit.</li>
+   *   <li>If CPU budget remains above zero, admit immediately.</li>
+   *   <li>Otherwise, reject (return false).</li>
+   * </ol>
+   *
+   * <p>Note: This method currently uses a strict check, where CPU and memory budgets must be above zero.
+   * This may be relaxed in the future to allow for a percentage of other remaining budget to be used. At that point,
+   * we can have different admission policies like: Strict, Stealing, etc.
+   *
+   * @param workload the workload identifier to check budget for
+   * @return true if the query may be accepted; false if budget is insufficient
+   */
+  public boolean canAdmitQuery(String workload, boolean isSecondary) {

Review Comment:
   Removed this



##########
pinot-spi/src/main/java/org/apache/pinot/core/accounting/WorkloadBudgetManager.java:
##########
@@ -47,10 +48,32 @@ public WorkloadBudgetManager(PinotConfiguration config) {
     }
     _enforcementWindowMs = config.getProperty(CommonConstants.Accounting.CONFIG_OF_WORKLOAD_ENFORCEMENT_WINDOW_MS,
         CommonConstants.Accounting.DEFAULT_WORKLOAD_ENFORCEMENT_WINDOW_MS);
+    initSecondaryWorkloadBudget(config);
     startBudgetResetTask();
     LOGGER.info("WorkloadBudgetManager initialized with enforcement window: {}ms", _enforcementWindowMs);
   }
 
+  /**
+   * This budget is primarily meant to be used for queries that need to be issued in a low priority manner.
+   * This is fixed budget allocated during host startup and used across all secondary queries.
+   */
+  private void initSecondaryWorkloadBudget(PinotConfiguration config) {
+    _secondaryWorkloadName = config.getProperty(
+        CommonConstants.Accounting.CONFIG_OF_SECONDARY_WORKLOAD_NAME,
+        CommonConstants.Accounting.DEFAULT_SECONDARY_WORKLOAD_NAME);
+
+    double secondaryCpuPercentage = config.getProperty(
+        CommonConstants.Accounting.CONFIG_OF_SECONDARY_WORKLOAD_CPU_PERCENTAGE,
+        CommonConstants.Accounting.DEFAULT_SECONDARY_WORKLOAD_CPU_PERCENTAGE);
+
+    // The Secondary CPU budget is based on the CPU percentage allocated for secondary workload.
+    // The memory budget is set to Long.MAX_VALUE for now, since we do not have a specific memory budget for
+    // secondary queries.
+    long secondaryCpuBudget = (long) (secondaryCpuPercentage * _enforcementWindowMs * 100_000L);

Review Comment:
   Right, I missed adding `availableProcessors`.
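
   For illustration only, here is a minimal sketch of how the budget could scale with the core count. It assumes the percentage is expressed on a 0-100 scale and the budget is tracked in CPU nanoseconds per enforcement window; the class and method names are hypothetical, and the scaling factor is not necessarily the one the PR ends up using.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the PR: shows the budget scaling with the number of cores.
class SecondaryBudgetSketch {

  // Assumption: cpuPercentage is on a 0-100 scale and the budget unit is CPU nanoseconds.
  static long secondaryCpuBudgetNs(double cpuPercentage, long enforcementWindowMs, int availableProcessors) {
    long windowNs = TimeUnit.MILLISECONDS.toNanos(enforcementWindowMs);
    // Total CPU time the host can spend in one window across all cores.
    double totalCpuNs = (double) windowNs * availableProcessors;
    return (long) (cpuPercentage * totalCpuNs / 100.0);
  }

  public static void main(String[] args) {
    int processors = Runtime.getRuntime().availableProcessors();
    System.out.println(secondaryCpuBudgetNs(10.0, 1_000L, processors));
  }
}
```

   Without the processor count, the budget would describe a share of a single core rather than a share of the host.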



##########
pinot-spi/src/main/java/org/apache/pinot/spi/exception/QueryErrorCode.java:
##########
@@ -52,6 +52,7 @@ public enum QueryErrorCode {
   BROKER_REQUEST_SEND(425, "BrokerRequestSend"),
   SERVER_NOT_RESPONDING(427, "ServerNotResponding"),
   TOO_MANY_REQUESTS(429, "TooManyRequests"),
+  WORKLOAD_QUOTA_EXCEEDED(429, "WorkloadQuotaExceeded"),

Review Comment:
   I think the other place would be the `WorkloadAggregator`. Let me see if there is a way to add this.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/WorkloadScheduler.java:
##########
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.scheduler;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.LongAccumulator;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerQueryPhase;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.core.accounting.WorkloadBudgetManager;
+import org.apache.pinot.core.query.executor.QueryExecutor;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.scheduler.resources.QueryExecutorService;
+import org.apache.pinot.core.query.scheduler.resources.WorkloadResourceManager;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.trace.Tracing;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Scheduler implementation that supports query admission control based on workload-specific budgets.
+ *
+ * <p>This class integrates with the {@link WorkloadBudgetManager} to apply CPU and memory budget enforcement
+ * for different workloads, including primary and secondary workloads.</p>
+ *
+ * <p>Secondary workload configuration is used for queries tagged as "secondary". Queries that exceed their budget
+ * will be rejected.</p>
+ *
+ */
+public class WorkloadScheduler extends QueryScheduler {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(WorkloadScheduler.class);
+
+  private final WorkloadBudgetManager _workloadBudgetManager;
+  private final ServerMetrics _serverMetrics;
+
+  public WorkloadScheduler(PinotConfiguration config, QueryExecutor queryExecutor, ServerMetrics metrics,
+                           LongAccumulator latestQueryTime, ThreadResourceUsageAccountant resourceUsageAccountant) {
+    super(config, queryExecutor, new WorkloadResourceManager(config, resourceUsageAccountant), metrics,
+        latestQueryTime);
+    _serverMetrics = metrics;
+    _workloadBudgetManager = Tracing.ThreadAccountantOps.getWorkloadBudgetManager();
+  }
+
+  @Override
+  public String name() {
+    return "WorkloadScheduler";
+  }
+
+  @Override
+  public ListenableFuture<byte[]> submit(ServerQueryRequest queryRequest) {
+    if (!_isRunning) {
+      return shuttingDown(queryRequest);
+    }
+
+    boolean isSecondary = QueryOptionsUtils.isSecondaryWorkload(queryRequest.getQueryContext().getQueryOptions());

Review Comment:
   Supporting secondary workloads breaks down into two concerns:
   
   1. Defining the secondary workload name
   2. Assigning a cost to that workload.
   
   I moved all cost initialization into the `WorkloadBudgetManager` to keep the scheduler focused on admission logic. However, passing the `isSecondary` flag into the `canAdmit` API pollutes its intent: it shouldn't need to know about secondary workloads. To clean this up, I'll separate how and where we look up the secondary workload:
   
   - In the scheduler, read the secondary workload name only for backward-compatibility routing.
   - In `WorkloadBudgetManager`, read it again at startup to establish its budget cost.
   
   It does repeat some code, but this keeps each component responsible for its own concern without overloading `canAdmit` with extra context.
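
   A rough sketch of the split described above, under stated assumptions: both classes are hypothetical stand-ins rather than the real `WorkloadScheduler` and `WorkloadBudgetManager`, the option key strings are made up for the example, and only a CPU budget is modelled.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the budget manager: it only knows workload names and budgets.
class BudgetManagerSketch {
  private final Map<String, AtomicLong> _remainingCpuNs = new ConcurrentHashMap<>();

  void setBudget(String workload, long cpuBudgetNs) {
    _remainingCpuNs.put(workload, new AtomicLong(cpuBudgetNs));
  }

  void charge(String workload, long cpuNs) {
    AtomicLong remaining = _remainingCpuNs.get(workload);
    if (remaining != null) {
      remaining.addAndGet(-cpuNs);
    }
  }

  // No isSecondary parameter: admit when no budget is registered or some budget remains.
  boolean canAdmitQuery(String workload) {
    AtomicLong remaining = _remainingCpuNs.get(workload);
    return remaining == null || remaining.get() > 0;
  }
}

// Hypothetical stand-in for the scheduler: it maps the legacy flag to a workload name.
class SchedulerSketch {
  static final String WORKLOAD_NAME_OPTION = "workloadName";         // assumed key
  static final String IS_SECONDARY_OPTION = "isSecondaryWorkload";   // assumed key

  private final String _secondaryWorkloadName;
  private final BudgetManagerSketch _budgetManager;

  SchedulerSketch(String secondaryWorkloadName, BudgetManagerSketch budgetManager) {
    _secondaryWorkloadName = secondaryWorkloadName;
    _budgetManager = budgetManager;
  }

  // Backward-compatibility routing: queries flagged as secondary use the secondary workload name.
  String resolveWorkload(Map<String, String> queryOptions) {
    if (Boolean.parseBoolean(queryOptions.getOrDefault(IS_SECONDARY_OPTION, "false"))) {
      return _secondaryWorkloadName;
    }
    return queryOptions.get(WORKLOAD_NAME_OPTION);
  }

  boolean admit(Map<String, String> queryOptions) {
    String workload = resolveWorkload(queryOptions);
    return workload == null || _budgetManager.canAdmitQuery(workload);
  }
}
```

   With this shape, the admission check takes a workload name only, and the secondary flag is resolved once at the edge of the scheduler.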



##########
pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/WorkloadResourceManager.java:
##########
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.scheduler.resources;
+
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+public class WorkloadResourceManager extends ResourceManager {

Review Comment:
   I know, currently it is no different from `UnboundedResourceManager`. The thought was to keep changes made to `UnboundedResourceManager` from affecting our scheduler, and to allow this resource manager to evolve differently in the future if needed. However, I get your point that we can create a separate ResourceManager once we have new changes or need to diverge from `UnboundedResourceManager`.



##########
pinot-spi/src/main/java/org/apache/pinot/core/accounting/WorkloadBudgetManager.java:
##########
@@ -47,10 +48,32 @@ public WorkloadBudgetManager(PinotConfiguration config) {
     }
     _enforcementWindowMs = config.getProperty(CommonConstants.Accounting.CONFIG_OF_WORKLOAD_ENFORCEMENT_WINDOW_MS,
         CommonConstants.Accounting.DEFAULT_WORKLOAD_ENFORCEMENT_WINDOW_MS);
+    initSecondaryWorkloadBudget(config);
     startBudgetResetTask();
     LOGGER.info("WorkloadBudgetManager initialized with enforcement window: {}ms", _enforcementWindowMs);
   }
 
+  /**
+   * This budget is primarily meant to be used for queries that need to be issued in a low priority manner.
+   * This is fixed budget allocated during host startup and used across all secondary queries.
+   */
+   */
+  private void initSecondaryWorkloadBudget(PinotConfiguration config) {
+    _secondaryWorkloadName = config.getProperty(

Review Comment:
   Set `DEFAULT_SECONDARY_WORKLOAD_CPU_PERCENTAGE` to 0, so the secondary workload won't be turned on by default.
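
   As a sketch of the intended behaviour (not the actual change): the config key below is hypothetical, `java.util.Properties` stands in for `PinotConfiguration`, and treating a non-positive percentage as "disabled" is an assumption about how the default of 0 takes effect.

```java
import java.util.Properties;

// Hypothetical illustration of a default-off secondary workload.
class SecondaryWorkloadDefaultsSketch {
  static final String CPU_PERCENTAGE_KEY = "secondary.workload.cpu.percentage"; // assumed key name
  static final double DEFAULT_CPU_PERCENTAGE = 0.0;

  // Falls back to the default of 0 when the operator has not configured a percentage.
  static double cpuPercentage(Properties config) {
    String raw = config.getProperty(CPU_PERCENTAGE_KEY);
    return raw == null ? DEFAULT_CPU_PERCENTAGE : Double.parseDouble(raw);
  }

  // Assumption: a non-positive percentage means the secondary workload gets no budget entry.
  static boolean isSecondaryWorkloadEnabled(Properties config) {
    return cpuPercentage(config) > 0.0;
  }
}
```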



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

