This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d1222e7de1 Reduce direct memory OOM chances on broker with a per
server query response size budget (#11710)
d1222e7de1 is described below
commit d1222e7de147c313b2a65998326dd2c632b3275d
Author: Vivek Iyer Vaidyanathan <[email protected]>
AuthorDate: Wed Oct 18 09:35:11 2023 -0700
Reduce direct memory OOM chances on broker with a per server query response
size budget (#11710)
* Add thresholds to limit query response size at server
* Address review comments
* Add table config and tests
* Add perServer and perQuery response size limits
* Address minor review comments
---
.../requesthandler/BaseBrokerRequestHandler.java | 83 ++++++++++++
.../apache/pinot/common/metrics/ServerMeter.java | 3 +-
.../common/utils/config/QueryOptionsUtils.java | 24 ++++
.../common/utils/config/TableConfigSerDeTest.java | 3 +-
.../pinot/core/query/scheduler/QueryScheduler.java | 19 +++
.../tests/BaseClusterIntegrationTest.java | 2 +-
.../tests/OfflineClusterIntegrationTest.java | 140 ++++++++++++++++++++-
.../apache/pinot/spi/config/table/QueryConfig.java | 30 ++++-
.../apache/pinot/spi/utils/CommonConstants.java | 17 +++
9 files changed, 314 insertions(+), 7 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 49ae0d9d94..25b258907e 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -682,6 +682,24 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
return new BrokerResponseNative(exceptions);
}
+ // Set the maximum serialized response size per server
+ int numServers = 0;
+ if (offlineRoutingTable != null) {
+ numServers += offlineRoutingTable.size();
+ }
+ if (realtimeRoutingTable != null) {
+ numServers += realtimeRoutingTable.size();
+ }
+
+ if (offlineBrokerRequest != null) {
+ setMaxServerResponseSizeBytes(numServers,
offlineBrokerRequest.getPinotQuery().getQueryOptions(),
+ offlineTableConfig);
+ }
+ if (realtimeBrokerRequest != null) {
+ setMaxServerResponseSizeBytes(numServers,
realtimeBrokerRequest.getPinotQuery().getQueryOptions(),
+ realtimeTableConfig);
+ }
+
// Execute the query
// TODO: Replace ServerStats with ServerRoutingStatsEntry.
ServerStats serverStats = new ServerStats();
@@ -1654,6 +1672,71 @@ public abstract class BaseBrokerRequestHandler
implements BrokerRequestHandler {
return remainingTimeMs;
}
+ /**
+ * Sets a query option indicating the maximum response size that can be sent
from a server to the broker. This size
+ * is measured for the serialized response.
+ */
+ private void setMaxServerResponseSizeBytes(int numServers, Map<String,
String> queryOptions,
+ TableConfig tableConfig) {
+ if (numServers == 0) {
+ return;
+ }
+
+ // The overriding order of priority is:
+ // 1. QueryOption -> maxServerResponseSizeBytes
+ // 2. QueryOption -> maxQueryResponseSizeBytes
+ // 3. TableConfig -> maxServerResponseSizeBytes
+ // 4. TableConfig -> maxQueryResponseSizeBytes
+ // 5. BrokerConfig -> maxServerResponseSizeBytes
+ // 6. BrokerConfig -> maxServerResponseSizeBytes
+
+ // QueryOption
+ if (QueryOptionsUtils.getMaxServerResponseSizeBytes(queryOptions) != null)
{
+ return;
+ }
+ Long maxQueryResponseSizeQOption =
QueryOptionsUtils.getMaxQueryResponseSizeBytes(queryOptions);
+ if (maxQueryResponseSizeQOption != null) {
+ Long maxServerResponseSize = maxQueryResponseSizeQOption / numServers;
+
queryOptions.put(Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
+ Long.toString(maxServerResponseSize));
+ return;
+ }
+
+ // TableConfig
+ Preconditions.checkState(tableConfig != null);
+ QueryConfig queryConfig = tableConfig.getQueryConfig();
+ if (queryConfig != null && queryConfig.getMaxServerResponseSizeBytes() !=
null) {
+ Long maxServerResponseSize = queryConfig.getMaxServerResponseSizeBytes();
+
queryOptions.put(Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
+ Long.toString(maxServerResponseSize));
+ return;
+ }
+ if (queryConfig != null && queryConfig.getMaxQueryResponseSizeBytes() !=
null) {
+ Long maxQueryResponseSize = queryConfig.getMaxQueryResponseSizeBytes();
+ Long maxServerResponseSize = maxQueryResponseSize / numServers;
+
queryOptions.put(Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
+ Long.toString(maxServerResponseSize));
+ return;
+ }
+
+ // BrokerConfig
+ Long maxServerResponseSizeCfg =
_config.getProperty(Broker.CONFIG_OF_MAX_SERVER_RESPONSE_SIZE_BYTES,
+ Broker.DEFAULT_MAX_SERVER_RESPONSE_SIZE_BYTES);
+ Long maxQueryResponseSizeCfg =
_config.getProperty(Broker.CONFIG_OF_MAX_QUERY_RESPONSE_SIZE_BYTES,
+ Broker.DEFAULT_MAX_QUERY_RESPONSE_SIZE_BYTES);
+
+ if (maxServerResponseSizeCfg > 0) {
+
queryOptions.put(Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
+ Long.toString(maxServerResponseSizeCfg));
+ return;
+ }
+ if (maxQueryResponseSizeCfg > 0) {
+ Long maxServerResponseSize = maxQueryResponseSizeCfg / numServers;
+
queryOptions.put(Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
+ Long.toString(maxServerResponseSize));
+ }
+ }
+
/**
* Broker side validation on the query.
* <p>Throw exception if query does not pass validation.
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index ce2a4e0cda..e66199dcf5 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -104,7 +104,8 @@ public enum ServerMeter implements AbstractMetrics.Meter {
NUM_SEGMENTS_PRUNED_BY_LIMIT("numSegmentsPrunedByLimit", false),
NUM_SEGMENTS_PRUNED_BY_VALUE("numSegmentsPrunedByValue", false),
LARGE_QUERY_RESPONSES_SENT("largeResponses", false),
- TOTAL_THREAD_CPU_TIME_MILLIS("millis", false);
+ TOTAL_THREAD_CPU_TIME_MILLIS("millis", false),
+ LARGE_QUERY_RESPONSE_SIZE_EXCEPTIONS("exceptions", false);
private final String _meterName;
private final String _unit;
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 06308a5983..f2589d7f37 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -101,6 +101,30 @@ public class QueryOptionsUtils {
}
}
+ @Nullable
+ public static Long getMaxServerResponseSizeBytes(Map<String, String>
queryOptions) {
+ String responseSize =
queryOptions.get(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES);
+ if (responseSize != null) {
+ long maxSize = Long.parseLong(responseSize);
+ Preconditions.checkState(maxSize > 0, "maxServerResponseSize must be
positive. got %s", maxSize);
+ return maxSize;
+ }
+
+ return null;
+ }
+
+ @Nullable
+ public static Long getMaxQueryResponseSizeBytes(Map<String, String>
queryOptions) {
+ String responseSize =
queryOptions.get(QueryOptionKey.MAX_QUERY_RESPONSE_SIZE_BYTES);
+ if (responseSize != null) {
+ long maxSize = Long.parseLong(responseSize);
+ Preconditions.checkState(maxSize > 0, "maxQueryResponseSize must be
positive. got %s", maxSize);
+ return maxSize;
+ }
+
+ return null;
+ }
+
public static boolean isAndScanReorderingEnabled(Map<String, String>
queryOptions) {
return
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.AND_SCAN_REORDERING));
}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
index eb1d31bab0..5a56d9e884 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
@@ -192,7 +192,8 @@ public class TableConfigSerDeTest {
}
{
// With query config
- QueryConfig queryConfig = new QueryConfig(1000L, true, true,
Collections.singletonMap("func(a)", "b"));
+ QueryConfig queryConfig = new QueryConfig(1000L, true, true,
Collections.singletonMap("func(a)", "b"), null,
+ null);
TableConfig tableConfig =
tableConfigBuilder.setQueryConfig(queryConfig).build();
checkQueryConfig(tableConfig);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index a0d72a9a67..693e45c171 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -37,6 +37,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerQueryPhase;
import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.response.ProcessingException;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.request.ServerQueryRequest;
@@ -168,6 +169,24 @@ public abstract class QueryScheduler {
byte[] responseBytes = serializeResponse(queryRequest, instanceResponse);
+ Map<String, String> queryOptions =
queryRequest.getQueryContext().getQueryOptions();
+ Long maxResponseSizeBytes =
QueryOptionsUtils.getMaxServerResponseSizeBytes(queryOptions);
+
+ // TODO: Perform this check sooner during the serialization of DataTable.
+ if (maxResponseSizeBytes != null && responseBytes.length >
maxResponseSizeBytes) {
+ String errMsg =
+ String.format("Serialized query response size %d exceeds threshold
%d for requestId %d from broker %s",
+ responseBytes.length, maxResponseSizeBytes,
queryRequest.getRequestId(), queryRequest.getBrokerId());
+ LOGGER.error(errMsg);
+
_serverMetrics.addMeteredTableValue(queryRequest.getTableNameWithType(),
+ ServerMeter.LARGE_QUERY_RESPONSE_SIZE_EXCEPTIONS, 1);
+
+ instanceResponse = new InstanceResponseBlock();
+
instanceResponse.addException(QueryException.getException(QueryException.QUERY_CANCELLATION_ERROR,
errMsg));
+ instanceResponse.addMetadata(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId));
+ responseBytes = serializeResponse(queryRequest, instanceResponse);
+ }
+
// Log the statistics
String tableNameWithType = queryRequest.getTableNameWithType();
long numDocsScanned =
diff --git
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 3157174e04..4d77be325b 100644
---
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -246,7 +246,7 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
protected QueryConfig getQueryConfig() {
// Enable groovy for tables used in the tests
- return new QueryConfig(null, false, null, null);
+ return new QueryConfig(null, false, null, null, null, null);
}
protected boolean getNullHandlingEnabled() {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index b129783170..1fc973f17e 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -357,7 +357,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
throws Exception {
// Set timeout as 5ms so that query will timeout
TableConfig tableConfig = getOfflineTableConfig();
- tableConfig.setQueryConfig(new QueryConfig(5L, null, null, null));
+ tableConfig.setQueryConfig(new QueryConfig(5L, null, null, null, null,
null));
updateTableConfig(tableConfig);
// Wait for at most 1 minute for broker to receive and process the table
config refresh message
@@ -621,6 +621,140 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
assertEquals(oneHourAgoTodayStr, expectedOneHourAgoTodayStr);
}
+ @Test
+ public void testMaxServerResponseSizeQueryOption()
+ throws Exception {
+ String queryWithOption = "SET maxServerResponseSizeBytes=1000; " +
SELECT_STAR_QUERY;
+ JsonNode response = postQuery(queryWithOption);
+ assert response.get("exceptions").size() > 0;
+ int errorCode = response.get("exceptions").get(0).get("errorCode").asInt();
+ assertEquals(errorCode, 503);
+ }
+
+ @Test
+ public void testMaxQueryResponseSizeQueryOption()
+ throws Exception {
+ String queryWithOption = "SET maxQueryResponseSizeBytes=1000; " +
SELECT_STAR_QUERY;
+ JsonNode response = postQuery(queryWithOption);
+ assert response.get("exceptions").size() > 0;
+ int errorCode = response.get("exceptions").get(0).get("errorCode").asInt();
+ assertEquals(errorCode, 503);
+ }
+
+ @Test
+ public void testMaxQueryResponseSizeTableConfig() throws Exception {
+ TableConfig tableConfig = getOfflineTableConfig();
+ tableConfig.setQueryConfig(new QueryConfig(null, false, null, null, 1000L,
null));
+ updateTableConfig(tableConfig);
+
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ // Server should return an exception
+ JsonNode response = postQuery(SELECT_STAR_QUERY);
+ assert response.get("exceptions").size() > 0;
+ int errorCode =
response.get("exceptions").get(0).get("errorCode").asInt();
+ if (errorCode == 503) {
+ return true;
+ }
+ return false;
+ } catch (Exception e) {
+ return false;
+ }
+ }, 60_000L, "Failed to execute query");
+
+ tableConfig.setQueryConfig(null);
+ updateTableConfig(tableConfig);
+
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ // Server should not return an exception
+ JsonNode response = postQuery(SELECT_STAR_QUERY);
+ if (response.get("exceptions").size() == 0) {
+ return true;
+ }
+ return false;
+ } catch (Exception e) {
+ return false;
+ }
+ }, 60_000L, "Failed to execute query");
+ }
+
+ @Test
+ public void testMaxServerResponseSizeTableConfig() throws Exception {
+ TableConfig tableConfig = getOfflineTableConfig();
+ tableConfig.setQueryConfig(new QueryConfig(null, false, null, null, null,
1000L));
+ updateTableConfig(tableConfig);
+
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ // Server should return an exception
+ JsonNode response = postQuery(SELECT_STAR_QUERY);
+ assert response.get("exceptions").size() > 0;
+ int errorCode =
response.get("exceptions").get(0).get("errorCode").asInt();
+ if (errorCode == 503) {
+ return true;
+ }
+ return false;
+ } catch (Exception e) {
+ return false;
+ }
+ }, 60_000L, "Failed to execute query");
+
+ tableConfig.setQueryConfig(null);
+ updateTableConfig(tableConfig);
+
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ // Server should not return an exception
+ JsonNode response = postQuery(SELECT_STAR_QUERY);
+ if (response.get("exceptions").size() == 0) {
+ return true;
+ }
+ return false;
+ } catch (Exception e) {
+ return false;
+ }
+ }, 60_000L, "Failed to execute query");
+ }
+
+ @Test
+ public void testMaxResponseSizeTableConfigOrdering() throws Exception {
+ TableConfig tableConfig = getOfflineTableConfig();
+ tableConfig.setQueryConfig(new QueryConfig(null, false, null, null,
1000000L, 1000L));
+ updateTableConfig(tableConfig);
+
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ // Server should return an exception
+ JsonNode response = postQuery(SELECT_STAR_QUERY);
+ assert response.get("exceptions").size() > 0;
+ int errorCode =
response.get("exceptions").get(0).get("errorCode").asInt();
+ if (errorCode == 503) {
+ return true;
+ }
+ return false;
+ } catch (Exception e) {
+ return false;
+ }
+ }, 60_000L, "Failed to execute query");
+
+ tableConfig.setQueryConfig(null);
+ updateTableConfig(tableConfig);
+
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ // Server should not return an exception
+ JsonNode response = postQuery(SELECT_STAR_QUERY);
+ if (response.get("exceptions").size() == 0) {
+ return true;
+ }
+ return false;
+ } catch (Exception e) {
+ return false;
+ }
+ }, 60_000L, "Failed to execute query");
+ }
+
@Test(dataProvider = "useBothQueryEngines")
public void testRegexpReplace(boolean useMultiStageQueryEngine)
throws Exception {
@@ -1366,7 +1500,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
String groovyQuery = "SELECT
GROOVY('{\"returnType\":\"STRING\",\"isSingleValue\":true}', "
+ "'arg0 + arg1', FlightNum, Origin) FROM mytable";
TableConfig tableConfig = getOfflineTableConfig();
- tableConfig.setQueryConfig(new QueryConfig(null, false, null, null));
+ tableConfig.setQueryConfig(new QueryConfig(null, false, null, null, null,
null));
updateTableConfig(tableConfig);
TestUtils.waitForCondition(aVoid -> {
@@ -1610,7 +1744,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
// Add expression override
TableConfig tableConfig = getOfflineTableConfig();
tableConfig.setQueryConfig(new QueryConfig(null, null, null,
- Collections.singletonMap("DaysSinceEpoch * 24",
"NewAddedDerivedHoursSinceEpoch")));
+ Collections.singletonMap("DaysSinceEpoch * 24",
"NewAddedDerivedHoursSinceEpoch"), null, null));
updateTableConfig(tableConfig);
TestUtils.waitForCondition(aVoid -> {
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java
index 01c3993fe2..b2d6c28bea 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java
@@ -49,16 +49,32 @@ public class QueryConfig extends BaseJsonConfig {
// the expressions within the query to the desired ones (e.g. override
transform function to derived column).
private final Map<String, String> _expressionOverrideMap;
+ // Indicates the maximum length of serialized response across all servers
for a query. This value is equally
+ // divided across all servers processing the query.
+ private final Long _maxQueryResponseSizeBytes;
+
+ // Indicates the maximum length of the serialized response per server for a
query.
+ private final Long _maxServerResponseSizeBytes;
+
@JsonCreator
public QueryConfig(@JsonProperty("timeoutMs") @Nullable Long timeoutMs,
@JsonProperty("disableGroovy") @Nullable Boolean disableGroovy,
@JsonProperty("useApproximateFunction") @Nullable Boolean
useApproximateFunction,
- @JsonProperty("expressionOverrideMap") @Nullable Map<String, String>
expressionOverrideMap) {
+ @JsonProperty("expressionOverrideMap") @Nullable Map<String, String>
expressionOverrideMap,
+ @JsonProperty("maxQueryResponseSizeBytes") @Nullable Long
maxQueryResponseSizeBytes,
+ @JsonProperty("maxServerResponseSizeBytes") @Nullable Long
maxServerResponseSizeBytes) {
Preconditions.checkArgument(timeoutMs == null || timeoutMs > 0, "Invalid
'timeoutMs': %s", timeoutMs);
+ Preconditions.checkArgument(maxQueryResponseSizeBytes == null ||
maxQueryResponseSizeBytes > 0,
+ "Invalid 'maxQueryResponseSizeBytes': %s", maxQueryResponseSizeBytes);
+ Preconditions.checkArgument(maxServerResponseSizeBytes == null ||
maxServerResponseSizeBytes > 0,
+ "Invalid 'maxServerResponseSizeBytes': %s",
maxServerResponseSizeBytes);
+
_timeoutMs = timeoutMs;
_disableGroovy = disableGroovy;
_useApproximateFunction = useApproximateFunction;
_expressionOverrideMap = expressionOverrideMap;
+ _maxQueryResponseSizeBytes = maxQueryResponseSizeBytes;
+ _maxServerResponseSizeBytes = maxServerResponseSizeBytes;
}
@Nullable
@@ -84,4 +100,16 @@ public class QueryConfig extends BaseJsonConfig {
public Map<String, String> getExpressionOverrideMap() {
return _expressionOverrideMap;
}
+
+ @Nullable
+ @JsonProperty("maxQueryResponseSizeBytes")
+ public Long getMaxQueryResponseSizeBytes() {
+ return _maxQueryResponseSizeBytes;
+ }
+
+ @Nullable
+ @JsonProperty("maxServerResponseSizeBytes")
+ public Long getMaxServerResponseSizeBytes() {
+ return _maxServerResponseSizeBytes;
+ }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 80c1816702..303ad6c716 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -318,6 +318,16 @@ public class CommonConstants {
"pinot.broker.enable.partition.metadata.manager";
public static final boolean DEFAULT_ENABLE_PARTITION_METADATA_MANAGER =
false;
+ // Broker config indicating the maximum serialized response size across
all servers for a query. This value is
+ // equally divided across all servers processing the query.
+ public static final String CONFIG_OF_MAX_QUERY_RESPONSE_SIZE_BYTES =
"pinot.broker.max.query.response.size.bytes";
+ public static final long DEFAULT_MAX_QUERY_RESPONSE_SIZE_BYTES =
Long.MAX_VALUE;
+
+ // Broker config indicating the maximum length of the serialized response
per server for a query.
+ public static final String CONFIG_OF_MAX_SERVER_RESPONSE_SIZE_BYTES =
"pinot.broker.max.server.response.size.bytes";
+ public static final long DEFAULT_MAX_SERVER_RESPONSE_SIZE_BYTES =
Long.MAX_VALUE;
+
+
public static class Request {
public static final String SQL = "sql";
public static final String TRACE = "trace";
@@ -362,6 +372,13 @@ public class CommonConstants {
public static final String MAX_ROWS_IN_JOIN = "maxRowsInJoin";
public static final String JOIN_OVERFLOW_MODE = "joinOverflowMode";
+ // Indicates the maximum length of the serialized response per server
for a query.
+ public static final String MAX_SERVER_RESPONSE_SIZE_BYTES =
"maxServerResponseSizeBytes";
+
+ // Indicates the maximum length of serialized response across all
servers for a query. This value is equally
+ // divided across all servers processing the query.
+ public static final String MAX_QUERY_RESPONSE_SIZE_BYTES =
"maxQueryResponseSizeBytes";
+
// TODO: Remove these keys (only apply to PQL) after releasing 0.11.0
@Deprecated
public static final String PRESERVE_TYPE = "preserveType";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]