This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new abb38c3f26 Add support for application-level query quota. (#14226)
abb38c3f26 is described below
commit abb38c3f26ac27f13302417dd298db4b7eb3d90b
Author: Bolek Ziobrowski <[email protected]>
AuthorDate: Tue Oct 22 15:55:53 2024 +0200
Add support for application-level query quota. (#14226)
Adds a way to throttle queries (executed with both v1 or v2 engine) based
on applicationName query option.
Queries such as :
```
set applicationName='test';
select * from tables
```
---
.../broker/api/resources/PinotBrokerDebug.java | 11 +
.../BrokerUserDefinedMessageHandlerFactory.java | 27 ++
.../HelixExternalViewBasedQueryQuotaManager.java | 284 ++++++++++++++++++---
.../pinot/broker/queryquota/QueryQuotaManager.java | 14 +
.../requesthandler/BaseBrokerRequestHandler.java | 13 +
.../MultiStageBrokerRequestHandler.java | 2 +-
...elixExternalViewBasedQueryQuotaManagerTest.java | 153 ++++++++++-
.../BaseSingleStageBrokerRequestHandlerTest.java | 1 +
.../ApplicationQpsQuotaRefreshMessage.java | 61 +++++
.../pinot/common/metadata/ZKMetadataProvider.java | 72 ++++++
.../pinot/controller/api/resources/Constants.java | 1 +
.../PinotApplicationQuotaRestletResource.java | 139 ++++++++++
.../resources/PinotDatabaseRestletResource.java | 2 +-
.../helix/core/PinotHelixResourceManager.java | 44 ++++
.../java/org/apache/pinot/core/auth/Actions.java | 2 +
.../tests/BaseClusterIntegrationTest.java | 29 ++-
.../tests/QueryQuotaClusterIntegrationTest.java | 168 ++++++++++--
.../apache/pinot/spi/utils/CommonConstants.java | 4 +
18 files changed, 955 insertions(+), 72 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
index 78a6dd324f..a220bc53a5 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
@@ -323,4 +323,15 @@ public class PinotBrokerDebug {
@ApiParam(value = "Name of the database") @PathParam("databaseName")
String databaseName) {
return
String.valueOf(_queryQuotaManager.getDatabaseQueryQuota(databaseName));
}
+
+ @GET
+ @Path("debug/applicationQuotas/{applicationName}")
+ @Produces(MediaType.TEXT_PLAIN)
+ @Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.GET_APPLICATION_QUERY_QUOTA)
+ @ApiOperation(value = "Get the active query quota being imposed on the
application", notes = "This is a debug "
+ + "endpoint, and won't maintain backward compatibility")
+ public String getApplicationQueryQuota(
+ @ApiParam(value = "Name of the application")
@PathParam("applicationName") String applicationName) {
+ return
String.valueOf(_queryQuotaManager.getApplicationQueryQuota(applicationName));
+ }
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
index 2c2cc33532..f4da13621e 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerUserDefinedMessageHandlerFactory.java
@@ -25,6 +25,7 @@ import
org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.Message;
import
org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManager;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
+import org.apache.pinot.common.messages.ApplicationQpsQuotaRefreshMessage;
import org.apache.pinot.common.messages.DatabaseConfigRefreshMessage;
import org.apache.pinot.common.messages.RoutingTableRebuildMessage;
import org.apache.pinot.common.messages.SegmentRefreshMessage;
@@ -65,6 +66,8 @@ public class BrokerUserDefinedMessageHandlerFactory
implements MessageHandlerFac
return new RebuildRoutingTableMessageHandler(new
RoutingTableRebuildMessage(message), context);
case DatabaseConfigRefreshMessage.REFRESH_DATABASE_CONFIG_MSG_SUB_TYPE:
return new RefreshDatabaseConfigMessageHandler(new
DatabaseConfigRefreshMessage(message), context);
+ case ApplicationQpsQuotaRefreshMessage.REFRESH_APP_QUOTA_MSG_SUB_TYPE:
+ return new RefreshApplicationQpsQuotaMessageHandler(new
ApplicationQpsQuotaRefreshMessage(message), context);
default:
// NOTE: Log a warning and return no-op message handler for
unsupported message sub-types. This can happen when
// a new message sub-type is added, and the sender gets deployed
first while receiver is still running the
@@ -162,6 +165,30 @@ public class BrokerUserDefinedMessageHandlerFactory
implements MessageHandlerFac
}
}
+ private class RefreshApplicationQpsQuotaMessageHandler extends
MessageHandler {
+ final String _applicationName;
+
+ RefreshApplicationQpsQuotaMessageHandler(ApplicationQpsQuotaRefreshMessage
applicationQpsAuotaRefreshMessage,
+ NotificationContext context) {
+ super(applicationQpsAuotaRefreshMessage, context);
+ _applicationName =
applicationQpsAuotaRefreshMessage.getApplicationName();
+ }
+
+ @Override
+ public HelixTaskResult handleMessage() {
+
_queryQuotaManager.createOrUpdateApplicationRateLimiter(_applicationName);
+ HelixTaskResult result = new HelixTaskResult();
+ result.setSuccess(true);
+ return result;
+ }
+
+ @Override
+ public void onError(Exception e, ErrorCode code, ErrorType type) {
+ LOGGER.error("Got error while refreshing query quota for application: {}
(error code: {}, error type: {})",
+ _applicationName, code, type, e);
+ }
+ }
+
private class RebuildRoutingTableMessageHandler extends MessageHandler {
final String _tableNameWithType;
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
index b0684d0bc8..48c5c33d0a 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
@@ -58,7 +58,7 @@ import org.slf4j.LoggerFactory;
/**
* This class is to support the qps quota feature.
- * It allows performing qps quota check at table level and database level
+ * It allows performing qps quota check at table level, database and
application level.
* For table level check it depends on the broker source change to update the
dynamic rate limit,
* which means it gets updated when a new table added or a broker restarted.
* For database level check it depends on the broker as well as cluster config
and database config change
@@ -67,6 +67,11 @@ import org.slf4j.LoggerFactory;
* - the database config is updated
* - new table is assigned to the broker (rate limiter is created if not
present)
* - broker added or removed from cluster
+ * For application level check it depends on the broker as well as cluster
config and application quota change
+ * to update the dynamic rate limit, which means it gets updated when
+ * - the default query quota at cluster config is updated
+ * - the application quota is updated (e.g. via rest api)
+ * - broker added or removed from cluster
*/
public class HelixExternalViewBasedQueryQuotaManager implements
ClusterChangeHandler, QueryQuotaManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(HelixExternalViewBasedQueryQuotaManager.class);
@@ -81,7 +86,9 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
private final AtomicInteger _lastKnownBrokerResourceVersion = new
AtomicInteger(-1);
private final Map<String, QueryQuotaEntity> _rateLimiterMap = new
ConcurrentHashMap<>();
private final Map<String, QueryQuotaEntity> _databaseRateLimiterMap = new
ConcurrentHashMap<>();
+ private final Map<String, QueryQuotaEntity> _applicationRateLimiterMap = new
ConcurrentHashMap<>();
private double _defaultQpsQuotaForDatabase;
+ private double _defaultQpsQuotaForApplication;
private HelixManager _helixManager;
private ZkHelixPropertyStore<ZNRecord> _propertyStore;
@@ -98,29 +105,66 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
_helixManager = helixManager;
_propertyStore = _helixManager.getHelixPropertyStore();
_defaultQpsQuotaForDatabase = getDefaultQueryQuotaForDatabase();
+ _defaultQpsQuotaForApplication = getDefaultQueryQuotaForApplication();
getQueryQuotaEnabledFlagFromInstanceConfig();
+
+ initializeApplicationQpsQuotas();
+ }
+
+ // read all app quotas from ZK and create rate limiters
+ private void initializeApplicationQpsQuotas() {
+ Map<String, Double> quotas =
+
ZKMetadataProvider.getApplicationQpsQuotas(_helixManager.getHelixPropertyStore());
+
+ if (quotas == null || quotas.isEmpty()) {
+ return;
+ }
+
+ ExternalView brokerResource = getBrokerResource();
+ int numOnlineBrokers = getNumOnlineBrokers(brokerResource);
+
+ for (Map.Entry<String, Double> entry : quotas.entrySet()) {
+ if (entry.getKey() == null) {
+ continue;
+ }
+
+ String appName = entry.getKey();
+ double appQpsQuota =
+ entry.getValue() != null && entry.getValue() != -1.0d ?
entry.getValue() : _defaultQpsQuotaForApplication;
+
+ if (appQpsQuota < 0) {
+ buildEmptyOrResetApplicationRateLimiter(appName);
+ continue;
+ }
+
+ double perBrokerQpsQuota = appQpsQuota / numOnlineBrokers;
+ LOGGER.info("Adding new query rate limiter for application {} with rate
{}.", appName, perBrokerQpsQuota);
+ QueryQuotaEntity queryQuotaEntity =
+ new QueryQuotaEntity(RateLimiter.create(perBrokerQpsQuota), new
HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND),
+ new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND),
numOnlineBrokers, appQpsQuota, -1);
+ _applicationRateLimiterMap.put(appName, queryQuotaEntity);
+ }
+
+ return;
}
@Override
public void processClusterChange(HelixConstants.ChangeType changeType) {
Preconditions.checkState(CHANGE_TYPES_TO_PROCESS.contains(changeType),
"Illegal change type: " + changeType);
if (changeType == HelixConstants.ChangeType.EXTERNAL_VIEW) {
- ExternalView brokerResourceEV = HelixHelper
- .getExternalViewForResource(_helixManager.getClusterManagmentTool(),
_helixManager.getClusterName(),
- CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+ ExternalView brokerResourceEV = getBrokerResource();
processQueryRateLimitingExternalViewChange(brokerResourceEV);
} else if (changeType == HelixConstants.ChangeType.INSTANCE_CONFIG) {
processQueryRateLimitingInstanceConfigChange();
} else {
processQueryRateLimitingClusterConfigChange();
+ processApplicationQueryRateLimitingClusterConfigChange();
}
}
public void initOrUpdateTableQueryQuota(String tableNameWithType) {
TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
- ExternalView brokerResourceEV = HelixHelper
- .getExternalViewForResource(_helixManager.getClusterManagmentTool(),
_helixManager.getClusterName(),
- CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+ ExternalView brokerResourceEV = getBrokerResource();
initOrUpdateTableQueryQuota(tableConfig, brokerResourceEV);
}
@@ -264,52 +308,103 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName));
}
+ /**
+ * Updates the application rate limiter if it already exists. It won't
create a new rate limiter.
+ *
+ * @param applicationName application name for which rate limiter needs to
be updated
+ */
+ public void updateApplicationRateLimiter(String applicationName) {
+ if (!_applicationRateLimiterMap.containsKey(applicationName)) {
+ return;
+ }
+ createOrUpdateApplicationRateLimiter(applicationName);
+ }
+
// Caller method need not worry about getting lock on _databaseRateLimiterMap
// as this method will do idempotent updates to the database rate limiters
private synchronized void createOrUpdateDatabaseRateLimiter(List<String>
databaseNames) {
- ExternalView brokerResource = HelixHelper
- .getExternalViewForResource(_helixManager.getClusterManagmentTool(),
_helixManager.getClusterName(),
- CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+ ExternalView brokerResource = getBrokerResource();
for (String databaseName : databaseNames) {
- double databaseQpsQuota = getEffectiveQueryQuotaOnDatabase(databaseName);
- if (databaseQpsQuota < 0) {
+ double qpsQuota = getEffectiveQueryQuotaOnDatabase(databaseName);
+ if (qpsQuota < 0) {
buildEmptyOrResetDatabaseRateLimiter(databaseName);
continue;
}
int numOnlineBrokers = getNumOnlineBrokers(databaseName, brokerResource);
- double perBrokerQpsQuota = databaseQpsQuota / numOnlineBrokers;
- QueryQuotaEntity oldQueryQuotaEntity =
_databaseRateLimiterMap.get(databaseName);
- if (oldQueryQuotaEntity == null) {
+ double perBrokerQpsQuota = qpsQuota / numOnlineBrokers;
+ QueryQuotaEntity oldEntity = _databaseRateLimiterMap.get(databaseName);
+ if (oldEntity == null) {
LOGGER.info("Adding new query rate limiter for database {} with rate
{}.", databaseName, perBrokerQpsQuota);
- QueryQuotaEntity queryQuotaEntity = new
QueryQuotaEntity(RateLimiter.create(perBrokerQpsQuota),
- new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND), new
MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND),
- numOnlineBrokers, databaseQpsQuota, -1);
+ QueryQuotaEntity queryQuotaEntity =
+ new QueryQuotaEntity(RateLimiter.create(perBrokerQpsQuota),
+ new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND),
+ new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND),
+ numOnlineBrokers, qpsQuota, -1);
_databaseRateLimiterMap.put(databaseName, queryQuotaEntity);
continue;
}
- boolean changeDetected = false;
- double oldQuota = oldQueryQuotaEntity.getRateLimiter() != null ?
oldQueryQuotaEntity.getRateLimiter().getRate()
- : -1;
- if (oldQueryQuotaEntity.getOverallRate() != databaseQpsQuota) {
- changeDetected = true;
- LOGGER.info("Overall quota changed for the database from {} to {}",
oldQueryQuotaEntity.getOverallRate(),
- databaseQpsQuota);
- oldQueryQuotaEntity.setOverallRate(databaseQpsQuota);
- }
- if (oldQueryQuotaEntity.getNumOnlineBrokers() != numOnlineBrokers) {
- changeDetected = true;
- LOGGER.info("Number of online brokers changed for the database from {}
to {}",
- oldQueryQuotaEntity.getNumOnlineBrokers(), numOnlineBrokers);
- oldQueryQuotaEntity.setNumOnlineBrokers(numOnlineBrokers);
+ checkQueryQuotaChanged(databaseName, oldEntity, qpsQuota, "database",
numOnlineBrokers, perBrokerQpsQuota);
+ }
+ }
+
+ public synchronized void createOrUpdateApplicationRateLimiter(String
applicationName) {
+
createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName));
+ }
+
+ // Caller method need not worry about getting lock on
_applicationRateLimiterMap
+ // as this method will do idempotent updates to the application rate limiters
+ private synchronized void createOrUpdateApplicationRateLimiter(List<String>
applicationNames) {
+ ExternalView brokerResource = getBrokerResource();
+ for (String appName : applicationNames) {
+ double qpsQuota = getEffectiveQueryQuotaOnApplication(appName);
+ if (qpsQuota < 0) {
+ buildEmptyOrResetApplicationRateLimiter(appName);
+ continue;
}
- if (!changeDetected) {
- LOGGER.info("No change detected with the query rate limiter for
database {}", databaseName);
+ int numOnlineBrokers = getNumOnlineBrokers(brokerResource);
+ double perBrokerQpsQuota = qpsQuota / numOnlineBrokers;
+ QueryQuotaEntity oldEntity = _applicationRateLimiterMap.get(appName);
+ if (oldEntity == null) {
+ LOGGER.info("Adding new query rate limiter for application {} with
rate {}.", appName, perBrokerQpsQuota);
+ QueryQuotaEntity queryQuotaEntity =
+ new QueryQuotaEntity(RateLimiter.create(perBrokerQpsQuota), new
HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND),
+ new
MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), numOnlineBrokers, qpsQuota,
+ -1);
+ _applicationRateLimiterMap.put(appName, queryQuotaEntity);
continue;
}
- LOGGER.info("Updating existing query rate limiter for database {} from
rate {} to {}", databaseName, oldQuota,
- perBrokerQpsQuota);
-
oldQueryQuotaEntity.setRateLimiter(RateLimiter.create(perBrokerQpsQuota));
+ checkQueryQuotaChanged(appName, oldEntity, qpsQuota, "application",
numOnlineBrokers, perBrokerQpsQuota);
+ }
+ }
+
+ private void checkQueryQuotaChanged(String appName, QueryQuotaEntity
oldEntity, double qpsQuota, String quotaType,
+ int numOnlineBrokers, double
perBrokerQpsQuota) {
+ boolean isChange = false;
+ double oldQuota = oldEntity.getRateLimiter() != null ?
oldEntity.getRateLimiter().getRate() : -1;
+ if (oldEntity.getOverallRate() != qpsQuota) {
+ isChange = true;
+ LOGGER.info("Overall quota changed for the {} {} from {} to {}",
quotaType, appName, oldEntity.getOverallRate(),
+ qpsQuota);
+ oldEntity.setOverallRate(qpsQuota);
+ }
+ if (oldEntity.getNumOnlineBrokers() != numOnlineBrokers) {
+ isChange = true;
+ LOGGER.info("Number of online brokers changed for the {} {} from {} to
{}",
+ quotaType, appName, oldEntity.getNumOnlineBrokers(),
numOnlineBrokers);
+ oldEntity.setNumOnlineBrokers(numOnlineBrokers);
+ }
+ if (!isChange) {
+ LOGGER.info("No change detected with the query rate limiter for {} {}",
quotaType, appName);
+ return;
}
+ LOGGER.info("Updating existing query rate limiter for {} {} from rate {}
to {}", quotaType, appName, oldQuota,
+ perBrokerQpsQuota);
+ oldEntity.setRateLimiter(RateLimiter.create(perBrokerQpsQuota));
+ }
+
+ private ExternalView getBrokerResource() {
+ return
HelixHelper.getExternalViewForResource(_helixManager.getClusterManagmentTool(),
+ _helixManager.getClusterName(),
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
}
// Pulling this logic to a separate placeholder method so that the quota
split logic
@@ -321,6 +416,10 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
return
HelixHelper.getOnlineInstanceFromExternalView(brokerResource).size();
}
+ private int getNumOnlineBrokers(ExternalView brokerResource) {
+ return
HelixHelper.getOnlineInstanceFromExternalView(brokerResource).size();
+ }
+
/**
* Utility to get the effective query quota being imposed on a database.
* It is computed based on the default quota set at cluster config and
override set at database config
@@ -337,6 +436,22 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
return _defaultQpsQuotaForDatabase;
}
+ /**
+ * Utility to get the effective query quota being imposed on an application.
It is computed based on the default quota
+ * set at cluster config.
+ *
+ * @param applicationName application name to get the query quota on.
+ * @return effective query quota limit being applied
+ */
+ private double getEffectiveQueryQuotaOnApplication(String applicationName) {
+ Map<String, Double> quotas =
+
ZKMetadataProvider.getApplicationQpsQuotas(_helixManager.getHelixPropertyStore());
+ if (quotas != null && quotas.get(applicationName) != null &&
quotas.get(applicationName) != -1.0d) {
+ return quotas.get(applicationName);
+ }
+ return _defaultQpsQuotaForApplication;
+ }
+
/**
* Creates a new database rate limiter. Will not update the database rate
limiter if it already exists.
* @param databaseName database name for which rate limiter needs to be
created
@@ -348,6 +463,18 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
createOrUpdateDatabaseRateLimiter(Collections.singletonList(databaseName));
}
+ /**
+ * Creates a new database rate limiter. Will not update the database rate
limiter if it already exists.
+ *
+ * @param applicationName database name for which rate limiter needs to be
created
+ */
+ public void createApplicationRateLimiter(String applicationName) {
+ if (_applicationRateLimiterMap.containsKey(applicationName)) {
+ return;
+ }
+
createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName));
+ }
+
/**
* Build an empty rate limiter in the new query quota entity, or set the
rate limiter to null in an existing query
* quota entity.
@@ -365,6 +492,23 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
}
}
+ /**
+ * Build an empty rate limiter in the new query quota entity, or set the
rate limiter to null in an existing query
+ * quota entity.
+ */
+ private void buildEmptyOrResetApplicationRateLimiter(String applicationName)
{
+ QueryQuotaEntity quotaEntity =
_applicationRateLimiterMap.get(applicationName);
+ if (quotaEntity == null) {
+ // Create an QueryQuotaEntity object without setting a rate limiter.
+ quotaEntity = new QueryQuotaEntity(null, new
HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND),
+ new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), 0, 0, 0);
+ _applicationRateLimiterMap.put(applicationName, quotaEntity);
+ } else {
+ // Set rate limiter to null for an existing QueryQuotaEntity object.
+ quotaEntity.setRateLimiter(null);
+ }
+ }
+
/**
* Build an empty rate limiter in the new query quota entity, or set the
rate limiter to null in an existing query
* quota entity.
@@ -428,6 +572,25 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
return tryAcquireToken(databaseName, queryQuota);
}
+ @Override
+ public boolean acquireApplication(String applicationName) {
+ if (isQueryRateLimitDisabled()) {
+ return true;
+ }
+ QueryQuotaEntity queryQuota =
_applicationRateLimiterMap.get(applicationName);
+ if (queryQuota == null) {
+ if (getDefaultQueryQuotaForApplication() < 0) {
+ return true;
+ } else {
+ createOrUpdateApplicationRateLimiter(applicationName);
+ queryQuota = _applicationRateLimiterMap.get(applicationName);
+ }
+ }
+
+ LOGGER.debug("Trying to acquire token for application: {}",
applicationName);
+ return tryAcquireToken(applicationName, queryQuota);
+ }
+
@Override
public double getTableQueryQuota(String tableNameWithType) {
return getQueryQuota(_rateLimiterMap.get(tableNameWithType));
@@ -438,6 +601,11 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
return getQueryQuota(_databaseRateLimiterMap.get(databaseName));
}
+ @Override
+ public double getApplicationQueryQuota(String applicationName) {
+ return getQueryQuota(_applicationRateLimiterMap.get(applicationName));
+ }
+
private double getQueryQuota(QueryQuotaEntity quotaEntity) {
return quotaEntity == null || quotaEntity.getRateLimiter() == null ? 0 :
quotaEntity.getRateLimiter().getRate();
}
@@ -503,11 +671,11 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
if (rateLimiter == null) {
return true;
}
- double perBrokerRate = rateLimiter.getRate();
// Emit the qps capacity utilization rate.
- int numHits = queryQuotaEntity.getQpsTracker().getHitCount();
if (!rateLimiter.tryAcquire()) {
+ int numHits = queryQuotaEntity.getQpsTracker().getHitCount();
+ double perBrokerRate = rateLimiter.getRate();
LOGGER.info("Quota is exceeded for table/database: {}. Per-broker rate:
{}. Current qps: {}", resourceName,
perBrokerRate, numHits);
return false;
@@ -526,6 +694,11 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
return _databaseRateLimiterMap;
}
+ @VisibleForTesting
+ public Map<String, QueryQuotaEntity> getApplicationRateLimiterMap() {
+ return _applicationRateLimiterMap;
+ }
+
@VisibleForTesting
public void cleanUpRateLimiterMap() {
_rateLimiterMap.clear();
@@ -625,7 +798,20 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
quota.setNumOnlineBrokers(onlineBrokerCount);
}
if (quota.getOverallRate() > 0) {
- quota.setRateLimiter(RateLimiter.create(quota.getOverallRate() /
onlineBrokerCount));
+ double qpsQuota = quota.getOverallRate() / onlineBrokerCount;
+ quota.setRateLimiter(RateLimiter.create(qpsQuota));
+ }
+ }
+
+ // handle EV change for application query quotas
+ for (Map.Entry<String, QueryQuotaEntity> it :
_applicationRateLimiterMap.entrySet()) {
+ QueryQuotaEntity quota = it.getValue();
+ if (quota.getNumOnlineBrokers() != onlineBrokerCount) {
+ quota.setNumOnlineBrokers(onlineBrokerCount);
+ }
+ if (quota.getOverallRate() > 0) {
+ double qpsQuota = quota.getOverallRate() / onlineBrokerCount;
+ quota.setRateLimiter(RateLimiter.create(qpsQuota));
}
}
@@ -651,6 +837,15 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
createOrUpdateDatabaseRateLimiter(new
ArrayList<>(_databaseRateLimiterMap.keySet()));
}
+ public void processApplicationQueryRateLimitingClusterConfigChange() {
+ double oldQpsQuota = _defaultQpsQuotaForApplication;
+ _defaultQpsQuotaForApplication = getDefaultQueryQuotaForApplication();
+ if (oldQpsQuota == _defaultQpsQuotaForApplication) {
+ return;
+ }
+ createOrUpdateApplicationRateLimiter(new
ArrayList<>(_applicationRateLimiterMap.keySet()));
+ }
+
private double getDefaultQueryQuotaForDatabase() {
HelixAdmin helixAdmin = _helixManager.getClusterManagmentTool();
HelixConfigScope configScope = new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
@@ -660,6 +855,15 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
.getOrDefault(CommonConstants.Helix.DATABASE_MAX_QUERIES_PER_SECOND, "-1"));
}
+ private double getDefaultQueryQuotaForApplication() {
+ HelixAdmin helixAdmin = _helixManager.getClusterManagmentTool();
+ HelixConfigScope configScope = new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
+ _helixManager.getClusterName()).build();
+ return Double.parseDouble(helixAdmin.getConfig(configScope,
+
Collections.singletonList(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND))
+
.getOrDefault(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND, "-1"));
+ }
+
/**
* Process query quota state change when instance config gets changed
*/
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java
index 50d2a8c7ae..70c3ef7588 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java
@@ -34,6 +34,13 @@ public interface QueryQuotaManager {
*/
boolean acquireDatabase(String databaseName);
+ /**
+ * Try to acquire a quota for the given application.
+ * @param applicationName application name
+ * @return {@code true} if the application quota has not been reached,
{@code false} otherwise
+ */
+ boolean acquireApplication(String applicationName);
+
/**
* Get the QPS quota in effect for the table
* @param tableNameWithType table name with type
@@ -47,4 +54,11 @@ public interface QueryQuotaManager {
* @return effective quota qps. 0 if no qps quota is set.
*/
double getDatabaseQueryQuota(String databaseName);
+
+ /**
+ * Get the QPS quota in effect for the application
+ * @param applicationName table name with type
+ * @return effective quota qps. 0 if no qps quota is set.
+ */
+ double getApplicationQueryQuota(String applicationName);
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 406d3d032a..9a5e0e94a4 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -53,10 +53,13 @@ import
org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.utils.CommonConstants.Broker;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@ThreadSafe
public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler
{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(BaseBrokerRequestHandler.class);
protected final PinotConfiguration _config;
protected final String _brokerId;
protected final BrokerRoutingManager _routingManager;
@@ -145,6 +148,16 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
}
}
+ // check app qps before doing anything
+ String application =
sqlNodeAndOptions.getOptions().get(Broker.Request.QueryOptionKey.APPLICATION_NAME);
+ if (application != null &&
!_queryQuotaManager.acquireApplication(application)) {
+ String errorMessage =
+ "Request " + requestId + ": " + query + " exceeds query quota for
application: " + application;
+ LOGGER.info(errorMessage);
+ requestContext.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE);
+ return new
BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR,
errorMessage));
+ }
+
// Add null handling option from broker config only if there is no
override in the query
if (_enableNullHandling != null) {
sqlNodeAndOptions.getOptions()
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 70dadd2f24..8aef51ebd1 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -349,7 +349,7 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
}
/**
- * Returns true if the QPS quota of the tables has exceeded.
+ * Returns true if the QPS quota of query tables, database or application
has been exceeded.
*/
private boolean hasExceededQPSQuota(@Nullable String database, Set<String>
tableNames,
RequestContext requestContext) {
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
index a9ac37f544..131faee022 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
@@ -60,6 +60,7 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
private HelixExternalViewBasedQueryQuotaManager _queryQuotaManager;
private ZkStarter.ZookeeperInstance _zookeeperInstance;
private static final Map<String, String> CLUSTER_CONFIG_MAP = new
HashMap<>();
+ private static final String APP_NAME = "app";
private static final String RAW_TABLE_NAME = "testTable";
private static final String OFFLINE_TABLE_NAME = RAW_TABLE_NAME + "_OFFLINE";
private static final String REALTIME_TABLE_NAME = RAW_TABLE_NAME +
"_REALTIME";
@@ -138,10 +139,12 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
ZKMetadataProvider.removeResourceConfigFromPropertyStore(_testPropertyStore,
OFFLINE_TABLE_NAME);
ZKMetadataProvider.removeResourceConfigFromPropertyStore(_testPropertyStore,
REALTIME_TABLE_NAME);
ZKMetadataProvider.removeDatabaseConfig(_testPropertyStore,
CommonConstants.DEFAULT_DATABASE);
+ ZKMetadataProvider.removeApplicationQuotas(_testPropertyStore);
CLUSTER_CONFIG_MAP.clear();
}
_queryQuotaManager.cleanUpRateLimiterMap();
_queryQuotaManager.getDatabaseRateLimiterMap().clear();
+ _queryQuotaManager.getApplicationRateLimiterMap().clear();
}
@AfterTest
@@ -255,6 +258,112 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
}
+ @Test
+ public void testWhenNoTableOrDatabaseOrApplicationQuotasSetQueriesRunWild()
+ throws InterruptedException {
+ ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
+ TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
+ ZKMetadataProvider.setTableConfig(_testPropertyStore, tableConfig);
+ _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig,
brokerResource);
+
_queryQuotaManager.createDatabaseRateLimiter(CommonConstants.DEFAULT_DATABASE);
+ _queryQuotaManager.createApplicationRateLimiter(APP_NAME);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+ Assert.assertEquals(_queryQuotaManager.getDatabaseRateLimiterMap().size(),
1);
+
Assert.assertEquals(_queryQuotaManager.getApplicationRateLimiterMap().size(),
1);
+
+ setDefaultDatabaseQps("-1");
+ setDefaultApplicationQps("-1");
+
+ runQueries(25, false);
+ runQueries(40, false);
+ runQueries(100, false);
+
+ _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
+ }
+
+ @Test
+ public void testWhenOnlySpecificAppQuotaIsSetItAffectsQueriesWithAppOption()
+ throws InterruptedException {
+ ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
+ TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
+ ZKMetadataProvider.setTableConfig(_testPropertyStore, tableConfig);
+ _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig,
brokerResource);
+
_queryQuotaManager.createDatabaseRateLimiter(CommonConstants.DEFAULT_DATABASE);
+
+ ZKMetadataProvider.setApplicationQpsQuota(_testPropertyStore, APP_NAME,
50d);
+ _queryQuotaManager.createApplicationRateLimiter(APP_NAME);
+
+ setDefaultDatabaseQps("-1");
+ setDefaultApplicationQps("-1");
+
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+ Assert.assertEquals(_queryQuotaManager.getDatabaseRateLimiterMap().size(),
1);
+
Assert.assertEquals(_queryQuotaManager.getApplicationRateLimiterMap().size(),
1);
+
+ runQueries(50, false);
+ runQueries(100, true);
+
+ _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
+ }
+
+ @Test
+ public void testWhenOnlyDefaultAppQuotaIsSetItAffectsAllApplications()
+ throws InterruptedException {
+ ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
+ TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
+ ZKMetadataProvider.setTableConfig(_testPropertyStore, tableConfig);
+ _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig,
brokerResource);
+
_queryQuotaManager.createDatabaseRateLimiter(CommonConstants.DEFAULT_DATABASE);
+
+ setDefaultDatabaseQps("-1");
+ setDefaultApplicationQps("50");
+
+ ZKMetadataProvider.setApplicationQpsQuota(_testPropertyStore, "someApp",
100d);
+ _queryQuotaManager.createApplicationRateLimiter("someApp");
+
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+ Assert.assertEquals(_queryQuotaManager.getDatabaseRateLimiterMap().size(),
1);
+
Assert.assertEquals(_queryQuotaManager.getApplicationRateLimiterMap().size(),
1);
+
+ runQueries(100, true, APP_NAME);
+ runQueries(100, true, "otherApp");
+ runQueries(100, false, "someApp");
+ runQueries(201, true, "someApp");
+
+
Assert.assertEquals(_queryQuotaManager.getApplicationRateLimiterMap().size(),
3);
+ _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
+ }
+
+ @Test
+ public void tesCreateAndUpdateAppRateLimiterChangesRateLimiterMap() {
+ Map<String, Double> apps = new HashMap<>();
+ apps.put("app1", null);
+ apps.put("app2", 1d);
+ apps.put("app3", 2d);
+
+ apps.entrySet().stream().forEach(e -> {
+ ZKMetadataProvider.setApplicationQpsQuota(_testPropertyStore,
e.getKey(), e.getValue());
+ });
+ apps.entrySet().forEach(app ->
_queryQuotaManager.createApplicationRateLimiter(app.getKey()));
+ Map<String, QueryQuotaEntity> appQuotaMap =
_queryQuotaManager.getApplicationRateLimiterMap();
+
+ Assert.assertNull(appQuotaMap.get("app1").getRateLimiter());
+ Assert.assertEquals(appQuotaMap.get("app2").getRateLimiter().getRate(), 1);
+ Assert.assertEquals(appQuotaMap.get("app3").getRateLimiter().getRate(), 2);
+
+ ZKMetadataProvider.setApplicationQpsQuota(_testPropertyStore, "app1", 1d);
+ ZKMetadataProvider.setApplicationQpsQuota(_testPropertyStore, "app2", 2d);
+
+ apps.entrySet().forEach(e ->
_queryQuotaManager.updateApplicationRateLimiter(e.getKey()));
+
+ Assert.assertEquals(appQuotaMap.get("app1").getRateLimiter().getRate(), 1);
+ Assert.assertEquals(appQuotaMap.get("app2").getRateLimiter().getRate(), 2);
+ Assert.assertEquals(appQuotaMap.get("app3").getRateLimiter().getRate(), 2);
+ }
+
@Test
public void testCreateOrUpdateDatabaseRateLimiter() {
List<String> dbList = new ArrayList<>(2);
@@ -264,19 +373,23 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
DatabaseConfig db1 = new DatabaseConfig(dbList.get(0), new
QuotaConfig(null, null));
DatabaseConfig db2 = new DatabaseConfig(dbList.get(1), new
QuotaConfig(null, "1"));
DatabaseConfig db3 = new DatabaseConfig(dbList.get(2), new
QuotaConfig(null, "2"));
+
ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, db1);
ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, db2);
ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, db3);
+
dbList.forEach(db -> _queryQuotaManager.createDatabaseRateLimiter(db));
Map<String, QueryQuotaEntity> dbQuotaMap =
_queryQuotaManager.getDatabaseRateLimiterMap();
Assert.assertNull(dbQuotaMap.get(dbList.get(0)).getRateLimiter());
Assert.assertEquals(dbQuotaMap.get(dbList.get(1)).getRateLimiter().getRate(),
1);
Assert.assertEquals(dbQuotaMap.get(dbList.get(2)).getRateLimiter().getRate(),
2);
+
db1.setQuotaConfig(new QuotaConfig(null, "1"));
db2.setQuotaConfig(new QuotaConfig(null, "2"));
ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, db1);
ZKMetadataProvider.setDatabaseConfig(_testPropertyStore, db2);
dbList.forEach(db -> _queryQuotaManager.updateDatabaseRateLimiter(db));
+
Assert.assertEquals(dbQuotaMap.get(dbList.get(0)).getRateLimiter().getRate(),
1);
Assert.assertEquals(dbQuotaMap.get(dbList.get(1)).getRateLimiter().getRate(),
2);
Assert.assertEquals(dbQuotaMap.get(dbList.get(2)).getRateLimiter().getRate(),
2);
@@ -509,6 +622,11 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
_queryQuotaManager.processQueryRateLimitingClusterConfigChange();
}
+ private void setDefaultApplicationQps(String maxQps) {
+
CLUSTER_CONFIG_MAP.put(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND,
maxQps);
+
_queryQuotaManager.processApplicationQueryRateLimitingClusterConfigChange();
+ }
+
private void setDatabaseQps(DatabaseConfig databaseConfig, String maxQps) {
QuotaConfig quotaConfig = new QuotaConfig(null, maxQps);
databaseConfig.setQuotaConfig(quotaConfig);
@@ -516,11 +634,21 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
_queryQuotaManager.createDatabaseRateLimiter(CommonConstants.DEFAULT_DATABASE);
}
+ private void setApplicationQps(String appName, Double maxQps) {
+ ZKMetadataProvider.setApplicationQpsQuota(_testPropertyStore, appName,
maxQps);
+ _queryQuotaManager.createApplicationRateLimiter(appName);
+ }
+
private void setQps(TableConfig tableConfig) {
QuotaConfig quotaConfig = new QuotaConfig(null, TABLE_MAX_QPS_STR);
tableConfig.setQuotaConfig(quotaConfig);
}
+ private void setQps(TableConfig tableConfig, String value) {
+ QuotaConfig quotaConfig = new QuotaConfig(null, value);
+ tableConfig.setQuotaConfig(quotaConfig);
+ }
+
private static ExternalView generateBrokerResource(String tableName) {
ExternalView brokerResource = new
ExternalView(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
brokerResource.setState(tableName, BROKER_INSTANCE_ID, "ONLINE");
@@ -531,17 +659,29 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
private void runQueries()
throws InterruptedException {
runQueries(TABLE_MAX_QPS, false);
- //increase the qps and some of the queries should be throttled.
- runQueries(TABLE_MAX_QPS * 2, true);
+ // increase the qps and some of the queries should be throttled.
+ // keep in mind that permits are 'regenerated' on every call based on how
much time elapsed since last one
+ // that means for 25 QPS we get new permit every 40 ms or 0.5 every 20 ms
+ // if we start with 25 permits at time t1 then if we want to exceed the
qps in the next second we've to do more
+ // double requests, because 25 will regenerate
+ runQueries(TABLE_MAX_QPS * 2 + 1, true);
+ }
+
+ private void runQueries(double qps, boolean shouldFail)
+ throws InterruptedException {
+ runQueries(qps, shouldFail, APP_NAME);
}
// try to keep the qps below 50 to ensure that the time lost between 2 query
runs on top of the sleepMillis
// is not comparable to sleepMillis, else the actual qps would end being lot
lower than required qps
- private void runQueries(double qps, boolean shouldFail)
+ private void runQueries(double qps, boolean shouldFail, String appName)
throws InterruptedException {
int failCount = 0;
long sleepMillis = (long) (1000 / qps);
for (int i = 0; i < qps; i++) {
+ if (!_queryQuotaManager.acquireApplication(appName)) {
+ failCount++;
+ }
if
(!_queryQuotaManager.acquireDatabase(CommonConstants.DEFAULT_DATABASE)) {
failCount++;
}
@@ -550,6 +690,11 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
}
Thread.sleep(sleepMillis);
}
- Assert.assertTrue((failCount == 0 && !shouldFail) || (failCount != 0 &&
shouldFail));
+
+ if (shouldFail) {
+ Assert.assertTrue(failCount != 0, "Expected failure with qps: " + qps +
" and app :" + appName);
+ } else {
+ Assert.assertTrue(failCount == 0, "Expected no failure with qps: " + qps
+ " and app :" + appName);
+ }
}
}
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java
index 0677d9dc5d..df4b1b6bf8 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java
@@ -173,6 +173,7 @@ public class BaseSingleStageBrokerRequestHandlerTest {
QueryQuotaManager queryQuotaManager = mock(QueryQuotaManager.class);
when(queryQuotaManager.acquire(anyString())).thenReturn(true);
when(queryQuotaManager.acquireDatabase(anyString())).thenReturn(true);
+ when(queryQuotaManager.acquireApplication(anyString())).thenReturn(true);
CountDownLatch latch = new CountDownLatch(1);
long[] testRequestId = {-1};
BrokerMetrics.register(mock(BrokerMetrics.class));
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/messages/ApplicationQpsQuotaRefreshMessage.java
b/pinot-common/src/main/java/org/apache/pinot/common/messages/ApplicationQpsQuotaRefreshMessage.java
new file mode 100644
index 0000000000..11768f7b37
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/messages/ApplicationQpsQuotaRefreshMessage.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.messages;
+
+import java.util.UUID;
+import org.apache.helix.model.Message;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+
+
+/**
+ * This (Helix) message is sent from the controller to brokers when a request
is received to update the application
+ * quota.
+ */
+public class ApplicationQpsQuotaRefreshMessage extends Message {
+ public static final String REFRESH_APP_QUOTA_MSG_SUB_TYPE =
"REFRESH_APPLICATION_QUOTA";
+
+ private static final String APPLICATION_NAME_KEY = "applicationName";
+
+ /**
+ * Constructor for the sender.
+ */
+ public ApplicationQpsQuotaRefreshMessage(String applicationName) {
+ super(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
+ setMsgSubType(REFRESH_APP_QUOTA_MSG_SUB_TYPE);
+ // Give it infinite time to process the message, as long as session is
alive
+ setExecutionTimeout(-1);
+ // Set the Pinot specific fields
+ ZNRecord znRecord = getRecord();
+ znRecord.setSimpleField(APPLICATION_NAME_KEY, applicationName);
+ }
+
+ /**
+ * Constructor for the receiver.
+ */
+ public ApplicationQpsQuotaRefreshMessage(Message message) {
+ super(message.getRecord());
+ if (!message.getMsgSubType().equals(REFRESH_APP_QUOTA_MSG_SUB_TYPE)) {
+ throw new IllegalArgumentException("Invalid message subtype:" +
message.getMsgSubType());
+ }
+ }
+
+ public String getApplicationName() {
+ return getRecord().getSimpleField(APPLICATION_NAME_KEY);
+ }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index 588b9df026..7d1143b0cb 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -63,6 +63,7 @@ public class ZKMetadataProvider {
private static final Logger LOGGER =
LoggerFactory.getLogger(ZKMetadataProvider.class);
private static final String CLUSTER_TENANT_ISOLATION_ENABLED_KEY =
"tenantIsolationEnabled";
+ private static final String CLUSTER_APPLICATION_QUOTAS = "applicationQuotas";
private static final String PROPERTYSTORE_CONTROLLER_JOBS_PREFIX =
"/CONTROLLER_JOBS";
private static final String PROPERTYSTORE_SEGMENTS_PREFIX = "/SEGMENTS";
private static final String PROPERTYSTORE_SCHEMAS_PREFIX = "/SCHEMAS";
@@ -112,6 +113,15 @@ public class ZKMetadataProvider {
propertyStore.remove(constructPropertyStorePathForDatabaseConfig(databaseName),
AccessOption.PERSISTENT);
}
+ /**
+ * Remove database config.
+ */
+ @VisibleForTesting
+ public static void removeApplicationQuotas(ZkHelixPropertyStore<ZNRecord>
propertyStore) {
+
propertyStore.remove(constructPropertyStorePathForControllerConfig(CLUSTER_APPLICATION_QUOTAS),
+ AccessOption.PERSISTENT);
+ }
+
private static ZNRecord toZNRecord(DatabaseConfig databaseConfig) {
ZNRecord databaseConfigZNRecord = new
ZNRecord(databaseConfig.getDatabaseName());
Map<String, String> simpleFields = new HashMap<>();
@@ -758,4 +768,66 @@ public class ZKMetadataProvider {
return true;
}
}
+
+ public static boolean setApplicationQpsQuota(ZkHelixPropertyStore<ZNRecord>
propertyStore, String applicationName,
+ Double value) {
+ final ZNRecord znRecord;
+ final String path =
constructPropertyStorePathForControllerConfig(CLUSTER_APPLICATION_QUOTAS);
+
+ boolean doCreate;
+ if (!propertyStore.exists(path, AccessOption.PERSISTENT)) {
+ znRecord = new ZNRecord(CLUSTER_APPLICATION_QUOTAS);
+ doCreate = true;
+ } else {
+ znRecord = propertyStore.get(path, null, AccessOption.PERSISTENT);
+ doCreate = false;
+ }
+
+ Map<String, String> quotas =
znRecord.getMapField(CLUSTER_APPLICATION_QUOTAS);
+ if (quotas == null) {
+ quotas = new HashMap<>();
+ znRecord.setMapField(CLUSTER_APPLICATION_QUOTAS, quotas);
+ }
+ quotas.put(applicationName, value != null ? value.toString() : null);
+
+ if (doCreate) {
+ return propertyStore.create(path, znRecord, AccessOption.PERSISTENT);
+ } else {
+ return propertyStore.set(path, znRecord, AccessOption.PERSISTENT);
+ }
+ }
+
+ @Nullable
+ public static Map<String, Double>
getApplicationQpsQuotas(ZkHelixPropertyStore<ZNRecord> propertyStore) {
+ String controllerConfigPath =
constructPropertyStorePathForControllerConfig(CLUSTER_APPLICATION_QUOTAS);
+ if (propertyStore.exists(controllerConfigPath, AccessOption.PERSISTENT)) {
+ ZNRecord znRecord = propertyStore.get(controllerConfigPath, null,
AccessOption.PERSISTENT);
+ if (znRecord.getMapFields().containsKey(CLUSTER_APPLICATION_QUOTAS)) {
+ return
toApplicationQpsQuotas(znRecord.getMapField(CLUSTER_APPLICATION_QUOTAS));
+ } else {
+ return null;
+ }
+ } else {
+ return null;
+ }
+ }
+
+ private static Map<String, Double> toApplicationQpsQuotas(Map<String,
String> quotas) {
+ if (quotas == null) {
+ return new HashMap<>();
+ } else {
+ HashMap<String, Double> result = new HashMap<>();
+ for (Map.Entry<String, String> entry : quotas.entrySet()) {
+ if (entry.getValue() != null) {
+ try {
+ double value = Double.parseDouble(entry.getValue());
+ result.put(entry.getKey(), value);
+ } catch (NumberFormatException nfe) {
+ continue;
+ }
+ }
+ }
+ return result;
+ }
+ }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
index 78476f603c..fea05fc8b2 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
@@ -33,6 +33,7 @@ public class Constants {
private static final Logger LOGGER =
LoggerFactory.getLogger(Constants.class);
+ public static final String APPLICATION_TAG = "Application";
public static final String CLUSTER_TAG = "Cluster";
public static final String DATABASE_TAG = "Database";
public static final String TABLE_TAG = "Table";
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotApplicationQuotaRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotApplicationQuotaRestletResource.java
new file mode 100644
index 0000000000..db050168fa
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotApplicationQuotaRestletResource.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import java.util.Collections;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.core.auth.Actions;
+import org.apache.pinot.core.auth.Authorize;
+import org.apache.pinot.core.auth.TargetType;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+
+
+@Api(tags = Constants.APPLICATION_TAG, authorizations = {@Authorization(value
= SWAGGER_AUTHORIZATION_KEY)})
+@SwaggerDefinition(securityDefinition =
@SecurityDefinition(apiKeyAuthDefinitions = {
+ @ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in =
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key =
+ SWAGGER_AUTHORIZATION_KEY, description =
+ "The format of the key is ```\"Basic <token>\" or \"Bearer "
+ + "<token>\"```"), @ApiKeyAuthDefinition(name =
CommonConstants.APPLICATION, in =
+ ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key =
CommonConstants.APPLICATION, description =
+ "Application context passed through http header. If no context is provided
'default' application "
+ + "context will be considered.")
+}))
+@Path("/")
+public class PinotApplicationQuotaRestletResource {
+ public static final Logger LOGGER =
LoggerFactory.getLogger(PinotApplicationQuotaRestletResource.class);
+
+ @Inject
+ PinotHelixResourceManager _pinotHelixResourceManager;
+
+ /**
+ * API to get application quota configs. Will return null if application
quotas are not defined
+ */
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/applicationQuotas")
+ @Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.GET_APPLICATION_QUERY_QUOTA)
+ @ApiOperation(value = "Get all application qps quotas", notes = "Get all
application qps quotas")
+ public Map<String, Double> getApplicationQuotas(@Context HttpHeaders
httpHeaders) {
+ Map<String, Double> quotas =
_pinotHelixResourceManager.getApplicationQuotas();
+ if (quotas != null) {
+ return quotas;
+ } else {
+ return Collections.emptyMap();
+ }
+ }
+
+ /**
+ * API to get application quota configs. Will return null if application
quotas are not defined
+ */
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/applicationQuotas/{appName}")
+ @Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.GET_APPLICATION_QUERY_QUOTA)
+ @ApiOperation(value = "Get application qps quota", notes = "Get application
qps quota")
+ public Double getApplicationQuota(@Context HttpHeaders httpHeaders,
@PathParam("appName") String appName) {
+
+ Map<String, Double> quotas =
_pinotHelixResourceManager.getApplicationQuotas();
+ if (quotas != null && quotas.containsKey(appName)) {
+ return quotas.get(appName);
+ }
+
+ HelixConfigScope scope = new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
+ _pinotHelixResourceManager.getHelixClusterName()).build();
+ HelixAdmin helixAdmin = _pinotHelixResourceManager.getHelixAdmin();
+ String defaultQuota =
+ helixAdmin.getConfig(scope,
Collections.singletonList(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND))
+
.getOrDefault(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND, null);
+ return defaultQuota != null ? Double.parseDouble(defaultQuota) : null;
+ }
+
+ /**
+ * API to update the quota configs for application
+ */
+ @POST
+ @Produces(MediaType.APPLICATION_JSON)
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Path("/applicationQuotas/{appName}")
+ @Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.UPDATE_APPLICATION_QUOTA)
+ @ApiOperation(value = "Update application quota", notes = "Update
application quota")
+ public SuccessResponse setApplicationQuota(@PathParam("appName") String
appName,
+ @QueryParam("maxQueriesPerSecond") String queryQuota, @Context
HttpHeaders httpHeaders) {
+ try {
+ try {
+ Double newQuota = queryQuota != null ? Double.parseDouble(queryQuota)
: null;
+ _pinotHelixResourceManager.updateApplicationQpsQuota(appName,
newQuota);
+ } catch (NumberFormatException nfe) {
+ throw new ControllerApplicationException(LOGGER, "Application query
quota value is not a number",
+ Response.Status.INTERNAL_SERVER_ERROR, nfe);
+ }
+
+ return new SuccessResponse("Query quota for application " + appName + "
successfully updated");
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotDatabaseRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotDatabaseRestletResource.java
index eecf2d0778..ad5067f595 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotDatabaseRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotDatabaseRestletResource.java
@@ -137,7 +137,7 @@ public class PinotDatabaseRestletResource {
@Path("/databases/{databaseName}/quotas")
@Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.UPDATE_DATABASE_QUOTA)
@ApiOperation(value = "Update database quotas", notes = "Update database
quotas")
- public SuccessResponse addTable(
+ public SuccessResponse setDatabaseQuota(
@PathParam("databaseName") String databaseName,
@QueryParam("maxQueriesPerSecond") String queryQuota,
@Context HttpHeaders httpHeaders) {
if
(!databaseName.equals(DatabaseUtils.extractDatabaseFromHttpHeaders(httpHeaders)))
{
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 1b4b722a7e..7ce94f04c8 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -102,6 +102,7 @@ import org.apache.pinot.common.lineage.LineageEntryState;
import org.apache.pinot.common.lineage.SegmentLineage;
import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
import org.apache.pinot.common.lineage.SegmentLineageUtils;
+import org.apache.pinot.common.messages.ApplicationQpsQuotaRefreshMessage;
import org.apache.pinot.common.messages.DatabaseConfigRefreshMessage;
import org.apache.pinot.common.messages.RoutingTableRebuildMessage;
import org.apache.pinot.common.messages.RunPeriodicTaskMessage;
@@ -197,6 +198,7 @@ public class PinotHelixResourceManager {
private static final int DEFAULT_IDEAL_STATE_UPDATER_LOCKERS_SIZE = 500;
private static final int DEFAULT_LINEAGE_UPDATER_LOCKERS_SIZE = 500;
private static final String API_REQUEST_ID_PREFIX = "api-";
+ private static final int INFINITE_TIMEOUT = -1;
private enum LineageUpdateType {
START, END, REVERT
@@ -1653,6 +1655,19 @@ public class PinotHelixResourceManager {
sendDatabaseConfigRefreshMessage(databaseConfig.getDatabaseName());
}
+ /**
+ * Updates application quota and sends out a refresh message.
+ *
+ * @param applicationName name of application to set quota for
+ * @param value quota value to set
+ */
+ public void updateApplicationQpsQuota(String applicationName, Double value) {
+ if (!ZKMetadataProvider.setApplicationQpsQuota(_propertyStore,
applicationName, value)) {
+ throw new RuntimeException("Failed to create query quota for
application: " + applicationName);
+ }
+ sendApplicationQpsQuotaRefreshMessage(applicationName);
+ }
+
/**
* Updates database config and sends out a database config refresh message.
* @param databaseConfig database config to be created
@@ -2884,6 +2899,25 @@ public class PinotHelixResourceManager {
}
}
+ private void sendApplicationQpsQuotaRefreshMessage(String appName) {
+ ApplicationQpsQuotaRefreshMessage message = new
ApplicationQpsQuotaRefreshMessage(appName);
+
+ // Send database config refresh message to brokers
+ Criteria criteria = new Criteria();
+ criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+ criteria.setInstanceName("%");
+ criteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
+ criteria.setSessionSpecific(true);
+
+ int numMessagesSent = _helixZkManager.getMessagingService().send(criteria,
message, null, INFINITE_TIMEOUT);
+ if (numMessagesSent > 0) {
+ LOGGER.info("Sent {} applcation qps quota refresh messages to brokers
for application: {}", numMessagesSent,
+ appName);
+ } else {
+ LOGGER.warn("No application qps quota refresh message sent to brokers
for application: {}", appName);
+ }
+ }
+
private void sendDatabaseConfigRefreshMessage(String databaseName) {
DatabaseConfigRefreshMessage databaseConfigRefreshMessage = new
DatabaseConfigRefreshMessage(databaseName);
@@ -3162,6 +3196,16 @@ public class PinotHelixResourceManager {
return ZKMetadataProvider.getDatabaseConfig(_propertyStore, databaseName);
}
+ /**
+ * Get the database config for the given database name.
+ *
+ * @return map of application name to quotas
+ */
+ @Nullable
+ public Map<String, Double> getApplicationQuotas() {
+ return ZKMetadataProvider.getApplicationQpsQuotas(_propertyStore);
+ }
+
/**
* Get the table config for the given table name with type suffix.
*
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
index 51512eeeb4..d92ee5f1b4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
@@ -72,6 +72,7 @@ public class Actions {
public static final String GET_USER = "GetUser";
public static final String GET_VERSION = "GetVersion";
public static final String GET_ZNODE = "GetZnode";
+ public static final String GET_APPLICATION_QUERY_QUOTA =
"GetApplicationQueryQuota";
public static final String GET_DATABASE_QUOTA = "GetDatabaseQuota";
public static final String GET_DATABASE_QUERY_QUOTA =
"GetDatabaseQueryQuota";
public static final String INGEST_FILE = "IngestFile";
@@ -91,6 +92,7 @@ public class Actions {
public static final String UPDATE_TIME_INTERVAL = "UpdateTimeInterval";
public static final String UPDATE_USER = "UpdateUser";
public static final String UPDATE_DATABASE_QUOTA = "UpdateDatabaseQuota";
+ public static final String UPDATE_APPLICATION_QUOTA =
"UpdateApplicationQuota";
public static final String UPDATE_ZNODE = "UpdateZnode";
public static final String UPLOAD_SEGMENT = "UploadSegment";
public static final String GET_INSTANCE_PARTITIONS =
"GetInstancePartitions";
diff --git
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index ffe846cf9c..cf1fce6fb0 100644
---
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -292,15 +292,28 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
* Creates a new OFFLINE table config.
*/
protected TableConfig createOfflineTableConfig() {
- return new
TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()).setTimeColumnName(getTimeColumnName())
-
.setSortedColumn(getSortedColumn()).setInvertedIndexColumns(getInvertedIndexColumns())
-
.setNoDictionaryColumns(getNoDictionaryColumns()).setRangeIndexColumns(getRangeIndexColumns())
-
.setBloomFilterColumns(getBloomFilterColumns()).setFieldConfigList(getFieldConfigs())
-
.setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode())
-
.setTaskConfig(getTaskConfig()).setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant())
-
.setIngestionConfig(getIngestionConfig()).setQueryConfig(getQueryConfig())
-
.setNullHandlingEnabled(getNullHandlingEnabled()).setSegmentPartitionConfig(getSegmentPartitionConfig())
+ // @formatter:off
+ return new TableConfigBuilder(TableType.OFFLINE)
+ .setTableName(getTableName())
+ .setTimeColumnName(getTimeColumnName())
+ .setSortedColumn(getSortedColumn())
+ .setInvertedIndexColumns(getInvertedIndexColumns())
+ .setNoDictionaryColumns(getNoDictionaryColumns())
+ .setRangeIndexColumns(getRangeIndexColumns())
+ .setBloomFilterColumns(getBloomFilterColumns())
+ .setFieldConfigList(getFieldConfigs())
+ .setNumReplicas(getNumReplicas())
+ .setSegmentVersion(getSegmentVersion())
+ .setLoadMode(getLoadMode())
+ .setTaskConfig(getTaskConfig())
+ .setBrokerTenant(getBrokerTenant())
+ .setServerTenant(getServerTenant())
+ .setIngestionConfig(getIngestionConfig())
+ .setQueryConfig(getQueryConfig())
+ .setNullHandlingEnabled(getNullHandlingEnabled())
+ .setSegmentPartitionConfig(getSegmentPartitionConfig())
.build();
+ // @formatter:on
}
/**
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
index dfd9d39727..8ac736e507 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
@@ -79,14 +79,20 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
.buildTransport();
_pinotConnection = ConnectionFactory.fromZookeeper(properties, getZkUrl()
+ "/" + getHelixClusterName(),
_pinotClientTransport);
+
+ // create default application rate limiter manually, otherwise
verifyQuotaUpdate will fail
+ setQueryQuotaForApplication(null);
}
@AfterMethod
void resetQuotas()
throws Exception {
addQueryQuotaToClusterConfig(null);
+ addAppQueryQuotaToClusterConfig(null);
+ setQueryQuotaForApplication(null);
addQueryQuotaToDatabaseConfig(null);
addQueryQuotaToTableConfig(null);
+
_brokerHostPort = LOCAL_HOST + ":" + _brokerPorts.get(0);
verifyQuotaUpdate(0);
}
@@ -98,6 +104,13 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
testQueryRate(40);
}
+ @Test
+ public void testDefaultApplicationQueryQuota()
+ throws Exception {
+ addAppQueryQuotaToClusterConfig(50);
+ testQueryRate(50);
+ }
+
@Test
public void testDatabaseConfigQueryQuota()
throws Exception {
@@ -105,6 +118,13 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
testQueryRate(10);
}
+ @Test
+ public void testApplicationQueryQuota()
+ throws Exception {
+ setQueryQuotaForApplication(15);
+ testQueryRate(15);
+ }
+
@Test
public void testDefaultDatabaseQueryQuotaOverride()
throws Exception {
@@ -117,6 +137,18 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
testQueryRate(40);
}
+ @Test
+ public void testDefaultApplicationQueryQuotaOverride()
+ throws Exception {
+ addAppQueryQuotaToClusterConfig(25);
+ // override lower than default quota
+ setQueryQuotaForApplication(10);
+ testQueryRate(10);
+ // override higher than default quota
+ setQueryQuotaForApplication(40);
+ testQueryRate(40);
+ }
+
@Test
public void testDatabaseQueryQuotaWithTableQueryQuota()
throws Exception {
@@ -129,6 +161,18 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
testQueryRate(25);
}
+ @Test
+ public void testApplicationQueryQuotaWithTableQueryQuota()
+ throws Exception {
+ setQueryQuotaForApplication(25);
+ // table quota within database quota. Queries should fail upon table quota
(10 qps) breach
+ addQueryQuotaToTableConfig(10);
+ testQueryRate(10);
+ // table quota more than database quota. Queries should fail upon database
quota (25 qps) breach
+ addQueryQuotaToTableConfig(50);
+ testQueryRate(25);
+ }
+
@Test
public void testDatabaseQueryQuotaWithTableQueryQuotaWithExtraBroker()
throws Exception {
@@ -152,6 +196,39 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
}
}
+ @Test
+ public void
testApplicationAndDatabaseQueryQuotaWithTableQueryQuotaWithExtraBroker()
+ throws Exception {
+ BaseBrokerStarter brokerStarter = null;
+ try {
+ addAppQueryQuotaToClusterConfig(null);
+ addQueryQuotaToClusterConfig(null);
+ setQueryQuotaForApplication(50);
+ addQueryQuotaToDatabaseConfig(25);
+ addQueryQuotaToTableConfig(10);
+ //
+ // Add one more broker such that quota gets distributed equally among
them
+ brokerStarter = startOneBroker(2);
+ _brokerHostPort = LOCAL_HOST + ":" + brokerStarter.getPort();
+ // query only one broker across the divided quota
+ testQueryRateOnBroker(5);
+
+ // drop table level quota so that database quota comes into effect
+ addQueryQuotaToTableConfig(null);
+ // query only one broker across the divided quota
+ testQueryRateOnBroker(12.5f);
+
+ // drop database level quota so that app quota comes into effect
+ addQueryQuotaToDatabaseConfig(null);
+ // query only one broker across the divided quota
+ testQueryRateOnBroker(25f);
+ } finally {
+ if (brokerStarter != null) {
+ brokerStarter.stop();
+ }
+ }
+ }
+
/**
* Runs the query load with the max rate that the quota can allow and
ensures queries are not failing.
* Then runs the query load with double the max rate and expects queries to
fail due to quota breach.
@@ -181,7 +258,8 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
long sleepMillis = (long) (1000 / qps);
Thread.sleep(sleepMillis);
for (int i = 0; i < qps * 2; i++) {
- ResultSetGroup resultSetGroup = _pinotConnection.execute("SELECT
COUNT(*) FROM " + getTableName());
+ ResultSetGroup resultSetGroup =
+ _pinotConnection.execute("SET applicationName='default'; SELECT
COUNT(*) FROM " + getTableName());
for (PinotClientException exception : resultSetGroup.getExceptions()) {
if (exception.getMessage().contains("QuotaExceededError")) {
failCount++;
@@ -190,24 +268,48 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
}
Thread.sleep(sleepMillis);
}
- assertTrue((failCount == 0 && !shouldFail) || (failCount != 0 &&
shouldFail));
+ if (shouldFail) {
+ assertTrue(failCount != 0, "Expected >0 failures for qps: " + qps);
+ } else {
+ assertTrue(failCount == 0, "Expected 0 failures for qps: " + qps);
+ }
}
+ private static volatile float _quota;
+ private static volatile String _quotaSource;
+
private void verifyQuotaUpdate(float quotaQps) {
- TestUtils.waitForCondition(aVoid -> {
- try {
- float tableQuota =
Float.parseFloat(sendGetRequest(String.format("http://%s/debug/tables/queryQuota/%s_OFFLINE",
- _brokerHostPort, getTableName())));
- tableQuota = tableQuota == 0 ? Long.MAX_VALUE : tableQuota;
- float dbQuota =
Float.parseFloat(sendGetRequest(String.format("http://%s/debug/databases/queryQuota/default",
- _brokerHostPort)));
- dbQuota = dbQuota == 0 ? Long.MAX_VALUE : dbQuota;
- return quotaQps == Math.min(tableQuota, dbQuota)
- || (quotaQps == 0 && tableQuota == Long.MAX_VALUE && dbQuota ==
Long.MAX_VALUE);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }, 5000, "Failed to reflect query quota on rate limiter in 5s");
+ try {
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ float tableQuota = Float.parseFloat(sendGetRequest(
+ String.format("http://%s/debug/tables/queryQuota/%s_OFFLINE",
_brokerHostPort, getTableName())));
+ tableQuota = tableQuota == 0 ? Long.MAX_VALUE : tableQuota;
+ float dbQuota = Float.parseFloat(
+
sendGetRequest(String.format("http://%s/debug/databases/queryQuota/default",
_brokerHostPort)));
+ float appQuota = Float.parseFloat(
+
sendGetRequest(String.format("http://%s/debug/applicationQuotas/default",
_brokerHostPort)));
+ dbQuota = dbQuota == 0 ? Long.MAX_VALUE : dbQuota;
+ appQuota = appQuota == 0 ? Long.MAX_VALUE : appQuota;
+ float actualQuota = Math.min(Math.min(tableQuota, dbQuota),
appQuota);
+ _quota = actualQuota;
+ if (_quota == dbQuota) {
+ _quotaSource = "database";
+ } else if (_quota == tableQuota) {
+ _quotaSource = "table";
+ } else {
+ _quotaSource = "application";
+ }
+ return quotaQps == actualQuota || (quotaQps == 0 && tableQuota ==
Long.MAX_VALUE && dbQuota == Long.MAX_VALUE
+ && appQuota == Long.MAX_VALUE);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }, 5000, "Failed to reflect query quota on rate limiter in 5s.");
+ } catch (AssertionError ae) {
+ throw new AssertionError(
+ ae.getMessage() + " Expected quota:" + quotaQps + " but is: " +
_quota + " set on: " + _quotaSource, ae);
+ }
}
private BrokerResponse executeQueryOnBroker(String query) {
@@ -220,7 +322,8 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
long sleepMillis = (long) (1000 / qps);
Thread.sleep(sleepMillis);
for (int i = 0; i < qps * 2; i++) {
- BrokerResponse resultSetGroup = executeQueryOnBroker("SELECT COUNT(*)
FROM " + getTableName());
+ BrokerResponse resultSetGroup =
+ executeQueryOnBroker("SET applicationName='default'; SELECT COUNT(*)
FROM " + getTableName());
for (Iterator<JsonNode> it = resultSetGroup.getExceptions().elements();
it.hasNext(); ) {
JsonNode exception = it.next();
if (exception.toPrettyString().contains("QuotaExceededError")) {
@@ -230,7 +333,12 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
}
Thread.sleep(sleepMillis);
}
- assertTrue((failCount == 0 && !shouldFail) || (failCount != 0 &&
shouldFail));
+
+ if (shouldFail) {
+ assertTrue(failCount != 0, "Expected >0 failures for qps: " + qps);
+ } else {
+ assertTrue(failCount == 0, "Expected 0 failures for qps: " + qps);
+ }
}
public void addQueryQuotaToTableConfig(Integer maxQps)
@@ -251,6 +359,16 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
// to allow change propagation to QueryQuotaManager
}
+ public void setQueryQuotaForApplication(Integer maxQps)
+ throws Exception {
+ String url = _controllerRequestURLBuilder.getBaseUrl() +
"/applicationQuotas/default";
+ if (maxQps != null) {
+ url += "?maxQueriesPerSecond=" + maxQps;
+ }
+ HttpClient.wrapAndThrowHttpException(_httpClient.sendPostRequest(new
URI(url), null, null));
+ // to allow change propagation to QueryQuotaManager
+ }
+
public void addQueryQuotaToClusterConfig(Integer maxQps)
throws Exception {
if (maxQps == null) {
@@ -264,4 +382,18 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
}
// to allow change propagation to QueryQuotaManager
}
+
+ public void addAppQueryQuotaToClusterConfig(Integer maxQps)
+ throws Exception {
+ if (maxQps == null) {
+ HttpClient.wrapAndThrowHttpException(_httpClient.sendDeleteRequest(new
URI(
+ _controllerRequestURLBuilder.forClusterConfigs() + "/"
+ + CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND)));
+ } else {
+ String payload = "{\"" +
CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND + "\":\"" + maxQps +
"\"}";
+ HttpClient.wrapAndThrowHttpException(
+ _httpClient.sendJsonPostRequest(new
URI(_controllerRequestURLBuilder.forClusterConfigs()), payload));
+ }
+ // to allow change propagation to QueryQuotaManager
+ }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index f62efb2062..e889b5e0f7 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -57,6 +57,8 @@ public class CommonConstants {
public static final String CONFIG_OF_SWAGGER_RESOURCES_PATH =
"META-INF/resources/webjars/swagger-ui/";
public static final String CONFIG_OF_TIMEZONE = "pinot.timezone";
+ public static final String APPLICATION = "application";
+
public static final String DATABASE = "database";
public static final String DEFAULT_DATABASE = "default";
public static final String CONFIG_OF_PINOT_INSECURE_MODE =
"pinot.insecure.mode";
@@ -86,6 +88,7 @@ public class CommonConstants {
public static final String QUERIES_DISABLED = "queriesDisabled";
public static final String QUERY_RATE_LIMIT_DISABLED =
"queryRateLimitDisabled";
public static final String DATABASE_MAX_QUERIES_PER_SECOND =
"databaseMaxQueriesPerSecond";
+ public static final String APPLICATION_MAX_QUERIES_PER_SECOND =
"applicationMaxQueriesPerSecond";
public static final String INSTANCE_CONNECTED_METRIC_NAME =
"helix.connected";
@@ -401,6 +404,7 @@ public class CommonConstants {
public static final String USE_MULTISTAGE_ENGINE =
"useMultistageEngine";
public static final String INFER_PARTITION_HINT = "inferPartitionHint";
public static final String ENABLE_NULL_HANDLING = "enableNullHandling";
+ public static final String APPLICATION_NAME = "applicationName";
/**
* If set, changes the explain behavior in multi-stage engine.
*
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]