This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d1222e7de1 Reduce direct memory OOM chances on broker with a per 
server query response size budget (#11710)
d1222e7de1 is described below

commit d1222e7de147c313b2a65998326dd2c632b3275d
Author: Vivek Iyer Vaidyanathan <[email protected]>
AuthorDate: Wed Oct 18 09:35:11 2023 -0700

    Reduce direct memory OOM chances on broker with a per server query response 
size budget (#11710)
    
    * Add thresholds to limit query response size at server
    
    * Address review comments
    
    * Add table config and tests
    
    * Add perServer and perQuery response size limits
    
    * Address minor review comments
---
 .../requesthandler/BaseBrokerRequestHandler.java   |  83 ++++++++++++
 .../apache/pinot/common/metrics/ServerMeter.java   |   3 +-
 .../common/utils/config/QueryOptionsUtils.java     |  24 ++++
 .../common/utils/config/TableConfigSerDeTest.java  |   3 +-
 .../pinot/core/query/scheduler/QueryScheduler.java |  19 +++
 .../tests/BaseClusterIntegrationTest.java          |   2 +-
 .../tests/OfflineClusterIntegrationTest.java       | 140 ++++++++++++++++++++-
 .../apache/pinot/spi/config/table/QueryConfig.java |  30 ++++-
 .../apache/pinot/spi/utils/CommonConstants.java    |  17 +++
 9 files changed, 314 insertions(+), 7 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 49ae0d9d94..25b258907e 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -682,6 +682,24 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
         return new BrokerResponseNative(exceptions);
       }
 
+      // Set the maximum serialized response size per server
+      int numServers = 0;
+      if (offlineRoutingTable != null) {
+        numServers += offlineRoutingTable.size();
+      }
+      if (realtimeRoutingTable != null) {
+        numServers += realtimeRoutingTable.size();
+      }
+
+      if (offlineBrokerRequest != null) {
+        setMaxServerResponseSizeBytes(numServers, 
offlineBrokerRequest.getPinotQuery().getQueryOptions(),
+            offlineTableConfig);
+      }
+      if (realtimeBrokerRequest != null) {
+        setMaxServerResponseSizeBytes(numServers, 
realtimeBrokerRequest.getPinotQuery().getQueryOptions(),
+            realtimeTableConfig);
+      }
+
       // Execute the query
       // TODO: Replace ServerStats with ServerRoutingStatsEntry.
       ServerStats serverStats = new ServerStats();
@@ -1654,6 +1672,71 @@ public abstract class BaseBrokerRequestHandler 
implements BrokerRequestHandler {
     return remainingTimeMs;
   }
 
+  /**
+   * Sets a query option indicating the maximum response size that can be sent 
from a server to the broker. This size
+   * is measured for the serialized response.
+   */
+  private void setMaxServerResponseSizeBytes(int numServers, Map<String, 
String> queryOptions,
+      TableConfig tableConfig) {
+    if (numServers == 0) {
+      return;
+    }
+
+    // The overriding order of priority is:
+    // 1. QueryOption  -> maxServerResponseSizeBytes
+    // 2. QueryOption  -> maxQueryResponseSizeBytes
+    // 3. TableConfig  -> maxServerResponseSizeBytes
+    // 4. TableConfig  -> maxQueryResponseSizeBytes
+    // 5. BrokerConfig -> maxServerResponseSizeBytes
+    // 6. BrokerConfig -> maxServerResponseSizeBytes
+
+    // QueryOption
+    if (QueryOptionsUtils.getMaxServerResponseSizeBytes(queryOptions) != null) 
{
+      return;
+    }
+    Long maxQueryResponseSizeQOption = 
QueryOptionsUtils.getMaxQueryResponseSizeBytes(queryOptions);
+    if (maxQueryResponseSizeQOption != null) {
+      Long maxServerResponseSize = maxQueryResponseSizeQOption / numServers;
+      
queryOptions.put(Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
+          Long.toString(maxServerResponseSize));
+      return;
+    }
+
+    // TableConfig
+    Preconditions.checkState(tableConfig != null);
+    QueryConfig queryConfig = tableConfig.getQueryConfig();
+    if (queryConfig != null && queryConfig.getMaxServerResponseSizeBytes() != 
null) {
+      Long maxServerResponseSize = queryConfig.getMaxServerResponseSizeBytes();
+      
queryOptions.put(Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
+          Long.toString(maxServerResponseSize));
+      return;
+    }
+    if (queryConfig != null && queryConfig.getMaxQueryResponseSizeBytes() != 
null) {
+      Long maxQueryResponseSize = queryConfig.getMaxQueryResponseSizeBytes();
+      Long maxServerResponseSize = maxQueryResponseSize / numServers;
+      
queryOptions.put(Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
+          Long.toString(maxServerResponseSize));
+      return;
+    }
+
+    // BrokerConfig
+    Long maxServerResponseSizeCfg = 
_config.getProperty(Broker.CONFIG_OF_MAX_SERVER_RESPONSE_SIZE_BYTES,
+        Broker.DEFAULT_MAX_SERVER_RESPONSE_SIZE_BYTES);
+    Long maxQueryResponseSizeCfg = 
_config.getProperty(Broker.CONFIG_OF_MAX_QUERY_RESPONSE_SIZE_BYTES,
+        Broker.DEFAULT_MAX_QUERY_RESPONSE_SIZE_BYTES);
+
+    if (maxServerResponseSizeCfg > 0) {
+      
queryOptions.put(Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
+          Long.toString(maxServerResponseSizeCfg));
+      return;
+    }
+    if (maxQueryResponseSizeCfg > 0) {
+      Long maxServerResponseSize = maxQueryResponseSizeCfg / numServers;
+      
queryOptions.put(Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES,
+          Long.toString(maxServerResponseSize));
+    }
+  }
+
   /**
    * Broker side validation on the query.
    * <p>Throw exception if query does not pass validation.
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index ce2a4e0cda..e66199dcf5 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -104,7 +104,8 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   NUM_SEGMENTS_PRUNED_BY_LIMIT("numSegmentsPrunedByLimit", false),
   NUM_SEGMENTS_PRUNED_BY_VALUE("numSegmentsPrunedByValue", false),
   LARGE_QUERY_RESPONSES_SENT("largeResponses", false),
-  TOTAL_THREAD_CPU_TIME_MILLIS("millis", false);
+  TOTAL_THREAD_CPU_TIME_MILLIS("millis", false),
+  LARGE_QUERY_RESPONSE_SIZE_EXCEPTIONS("exceptions", false);
 
   private final String _meterName;
   private final String _unit;
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 06308a5983..f2589d7f37 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -101,6 +101,30 @@ public class QueryOptionsUtils {
     }
   }
 
+  @Nullable
+  public static Long getMaxServerResponseSizeBytes(Map<String, String> 
queryOptions) {
+    String responseSize = 
queryOptions.get(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES);
+    if (responseSize != null) {
+      long maxSize = Long.parseLong(responseSize);
+      Preconditions.checkState(maxSize > 0, "maxServerResponseSize must be 
positive. got %s", maxSize);
+      return maxSize;
+    }
+
+    return null;
+  }
+
+  @Nullable
+  public static Long getMaxQueryResponseSizeBytes(Map<String, String> 
queryOptions) {
+    String responseSize = 
queryOptions.get(QueryOptionKey.MAX_QUERY_RESPONSE_SIZE_BYTES);
+    if (responseSize != null) {
+      long maxSize = Long.parseLong(responseSize);
+      Preconditions.checkState(maxSize > 0, "maxQueryResponseSize must be 
positive. got %s", maxSize);
+      return maxSize;
+    }
+
+    return null;
+  }
+
   public static boolean isAndScanReorderingEnabled(Map<String, String> 
queryOptions) {
     return 
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.AND_SCAN_REORDERING));
   }
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
index eb1d31bab0..5a56d9e884 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
@@ -192,7 +192,8 @@ public class TableConfigSerDeTest {
     }
     {
       // With query config
-      QueryConfig queryConfig = new QueryConfig(1000L, true, true, 
Collections.singletonMap("func(a)", "b"));
+      QueryConfig queryConfig = new QueryConfig(1000L, true, true, 
Collections.singletonMap("func(a)", "b"), null,
+          null);
       TableConfig tableConfig = 
tableConfigBuilder.setQueryConfig(queryConfig).build();
 
       checkQueryConfig(tableConfig);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index a0d72a9a67..693e45c171 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -37,6 +37,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.metrics.ServerQueryPhase;
 import org.apache.pinot.common.metrics.ServerTimer;
 import org.apache.pinot.common.response.ProcessingException;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
@@ -168,6 +169,24 @@ public abstract class QueryScheduler {
 
       byte[] responseBytes = serializeResponse(queryRequest, instanceResponse);
 
+      Map<String, String> queryOptions = 
queryRequest.getQueryContext().getQueryOptions();
+      Long maxResponseSizeBytes = 
QueryOptionsUtils.getMaxServerResponseSizeBytes(queryOptions);
+
+      // TODO: Perform this check sooner during the serialization of DataTable.
+      if (maxResponseSizeBytes != null && responseBytes.length > 
maxResponseSizeBytes) {
+        String errMsg =
+            String.format("Serialized query response size %d exceeds threshold 
%d for requestId %d from broker %s",
+                responseBytes.length, maxResponseSizeBytes, 
queryRequest.getRequestId(), queryRequest.getBrokerId());
+        LOGGER.error(errMsg);
+        
_serverMetrics.addMeteredTableValue(queryRequest.getTableNameWithType(),
+            ServerMeter.LARGE_QUERY_RESPONSE_SIZE_EXCEPTIONS, 1);
+
+        instanceResponse = new InstanceResponseBlock();
+        
instanceResponse.addException(QueryException.getException(QueryException.QUERY_CANCELLATION_ERROR,
 errMsg));
+        instanceResponse.addMetadata(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
+        responseBytes = serializeResponse(queryRequest, instanceResponse);
+      }
+
       // Log the statistics
       String tableNameWithType = queryRequest.getTableNameWithType();
       long numDocsScanned =
diff --git 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 3157174e04..4d77be325b 100644
--- 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -246,7 +246,7 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
 
   protected QueryConfig getQueryConfig() {
     // Enable groovy for tables used in the tests
-    return new QueryConfig(null, false, null, null);
+    return new QueryConfig(null, false, null, null, null, null);
   }
 
   protected boolean getNullHandlingEnabled() {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index b129783170..1fc973f17e 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -357,7 +357,7 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
       throws Exception {
     // Set timeout as 5ms so that query will timeout
     TableConfig tableConfig = getOfflineTableConfig();
-    tableConfig.setQueryConfig(new QueryConfig(5L, null, null, null));
+    tableConfig.setQueryConfig(new QueryConfig(5L, null, null, null, null, 
null));
     updateTableConfig(tableConfig);
 
     // Wait for at most 1 minute for broker to receive and process the table 
config refresh message
@@ -621,6 +621,140 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     assertEquals(oneHourAgoTodayStr, expectedOneHourAgoTodayStr);
   }
 
+  @Test
+  public void testMaxServerResponseSizeQueryOption()
+      throws Exception {
+    String queryWithOption = "SET maxServerResponseSizeBytes=1000; " + 
SELECT_STAR_QUERY;
+    JsonNode response = postQuery(queryWithOption);
+    assert response.get("exceptions").size() > 0;
+    int errorCode = response.get("exceptions").get(0).get("errorCode").asInt();
+    assertEquals(errorCode, 503);
+  }
+
+  @Test
+  public void testMaxQueryResponseSizeQueryOption()
+      throws Exception {
+    String queryWithOption = "SET maxQueryResponseSizeBytes=1000; " + 
SELECT_STAR_QUERY;
+    JsonNode response = postQuery(queryWithOption);
+    assert response.get("exceptions").size() > 0;
+    int errorCode = response.get("exceptions").get(0).get("errorCode").asInt();
+    assertEquals(errorCode, 503);
+  }
+
+  @Test
+  public void testMaxQueryResponseSizeTableConfig() throws Exception {
+    TableConfig tableConfig = getOfflineTableConfig();
+    tableConfig.setQueryConfig(new QueryConfig(null, false, null, null, 1000L, 
null));
+    updateTableConfig(tableConfig);
+
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        // Server should return an exception
+        JsonNode response = postQuery(SELECT_STAR_QUERY);
+        assert response.get("exceptions").size() > 0;
+        int errorCode = 
response.get("exceptions").get(0).get("errorCode").asInt();
+        if (errorCode == 503) {
+          return true;
+        }
+        return false;
+      } catch (Exception e) {
+        return false;
+      }
+    }, 60_000L, "Failed to execute query");
+
+    tableConfig.setQueryConfig(null);
+    updateTableConfig(tableConfig);
+
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        // Server should not return an exception
+        JsonNode response = postQuery(SELECT_STAR_QUERY);
+        if (response.get("exceptions").size() == 0) {
+          return true;
+        }
+        return false;
+      } catch (Exception e) {
+        return false;
+      }
+    }, 60_000L, "Failed to execute query");
+  }
+
+  @Test
+  public void testMaxServerResponseSizeTableConfig() throws Exception {
+    TableConfig tableConfig = getOfflineTableConfig();
+    tableConfig.setQueryConfig(new QueryConfig(null, false, null, null, null, 
1000L));
+    updateTableConfig(tableConfig);
+
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        // Server should return an exception
+        JsonNode response = postQuery(SELECT_STAR_QUERY);
+        assert response.get("exceptions").size() > 0;
+        int errorCode = 
response.get("exceptions").get(0).get("errorCode").asInt();
+        if (errorCode == 503) {
+          return true;
+        }
+        return false;
+      } catch (Exception e) {
+        return false;
+      }
+    }, 60_000L, "Failed to execute query");
+
+    tableConfig.setQueryConfig(null);
+    updateTableConfig(tableConfig);
+
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        // Server should not return an exception
+        JsonNode response = postQuery(SELECT_STAR_QUERY);
+        if (response.get("exceptions").size() == 0) {
+          return true;
+        }
+        return false;
+      } catch (Exception e) {
+        return false;
+      }
+    }, 60_000L, "Failed to execute query");
+  }
+
+  @Test
+  public void testMaxResponseSizeTableConfigOrdering() throws Exception {
+    TableConfig tableConfig = getOfflineTableConfig();
+    tableConfig.setQueryConfig(new QueryConfig(null, false, null, null, 
1000000L, 1000L));
+    updateTableConfig(tableConfig);
+
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        // Server should return an exception
+        JsonNode response = postQuery(SELECT_STAR_QUERY);
+        assert response.get("exceptions").size() > 0;
+        int errorCode = 
response.get("exceptions").get(0).get("errorCode").asInt();
+        if (errorCode == 503) {
+          return true;
+        }
+        return false;
+      } catch (Exception e) {
+        return false;
+      }
+    }, 60_000L, "Failed to execute query");
+
+    tableConfig.setQueryConfig(null);
+    updateTableConfig(tableConfig);
+
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        // Server should not return an exception
+        JsonNode response = postQuery(SELECT_STAR_QUERY);
+        if (response.get("exceptions").size() == 0) {
+          return true;
+        }
+        return false;
+      } catch (Exception e) {
+        return false;
+      }
+    }, 60_000L, "Failed to execute query");
+  }
+
   @Test(dataProvider = "useBothQueryEngines")
   public void testRegexpReplace(boolean useMultiStageQueryEngine)
       throws Exception {
@@ -1366,7 +1500,7 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     String groovyQuery = "SELECT 
GROOVY('{\"returnType\":\"STRING\",\"isSingleValue\":true}', "
         + "'arg0 + arg1', FlightNum, Origin) FROM mytable";
     TableConfig tableConfig = getOfflineTableConfig();
-    tableConfig.setQueryConfig(new QueryConfig(null, false, null, null));
+    tableConfig.setQueryConfig(new QueryConfig(null, false, null, null, null, 
null));
     updateTableConfig(tableConfig);
 
     TestUtils.waitForCondition(aVoid -> {
@@ -1610,7 +1744,7 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     // Add expression override
     TableConfig tableConfig = getOfflineTableConfig();
     tableConfig.setQueryConfig(new QueryConfig(null, null, null,
-        Collections.singletonMap("DaysSinceEpoch * 24", 
"NewAddedDerivedHoursSinceEpoch")));
+        Collections.singletonMap("DaysSinceEpoch * 24", 
"NewAddedDerivedHoursSinceEpoch"), null, null));
     updateTableConfig(tableConfig);
 
     TestUtils.waitForCondition(aVoid -> {
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java
index 01c3993fe2..b2d6c28bea 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/QueryConfig.java
@@ -49,16 +49,32 @@ public class QueryConfig extends BaseJsonConfig {
   // the expressions within the query to the desired ones (e.g. override 
transform function to derived column).
   private final Map<String, String> _expressionOverrideMap;
 
+  // Indicates the maximum length of serialized response across all servers 
for a query. This value is equally
+  // divided across all servers processing the query.
+  private final Long _maxQueryResponseSizeBytes;
+
+  // Indicates the maximum length of the serialized response per server for a 
query.
+  private final Long _maxServerResponseSizeBytes;
+
   @JsonCreator
   public QueryConfig(@JsonProperty("timeoutMs") @Nullable Long timeoutMs,
       @JsonProperty("disableGroovy") @Nullable Boolean disableGroovy,
       @JsonProperty("useApproximateFunction") @Nullable Boolean 
useApproximateFunction,
-      @JsonProperty("expressionOverrideMap") @Nullable Map<String, String> 
expressionOverrideMap) {
+      @JsonProperty("expressionOverrideMap") @Nullable Map<String, String> 
expressionOverrideMap,
+      @JsonProperty("maxQueryResponseSizeBytes") @Nullable Long 
maxQueryResponseSizeBytes,
+      @JsonProperty("maxServerResponseSizeBytes") @Nullable Long 
maxServerResponseSizeBytes) {
     Preconditions.checkArgument(timeoutMs == null || timeoutMs > 0, "Invalid 
'timeoutMs': %s", timeoutMs);
+    Preconditions.checkArgument(maxQueryResponseSizeBytes == null || 
maxQueryResponseSizeBytes > 0,
+        "Invalid 'maxQueryResponseSizeBytes': %s", maxQueryResponseSizeBytes);
+    Preconditions.checkArgument(maxServerResponseSizeBytes == null || 
maxServerResponseSizeBytes > 0,
+        "Invalid 'maxServerResponseSizeBytes': %s", 
maxServerResponseSizeBytes);
+
     _timeoutMs = timeoutMs;
     _disableGroovy = disableGroovy;
     _useApproximateFunction = useApproximateFunction;
     _expressionOverrideMap = expressionOverrideMap;
+    _maxQueryResponseSizeBytes = maxQueryResponseSizeBytes;
+    _maxServerResponseSizeBytes = maxServerResponseSizeBytes;
   }
 
   @Nullable
@@ -84,4 +100,16 @@ public class QueryConfig extends BaseJsonConfig {
   public Map<String, String> getExpressionOverrideMap() {
     return _expressionOverrideMap;
   }
+
+  @Nullable
+  @JsonProperty("maxQueryResponseSizeBytes")
+  public Long getMaxQueryResponseSizeBytes() {
+    return _maxQueryResponseSizeBytes;
+  }
+
+  @Nullable
+  @JsonProperty("maxServerResponseSizeBytes")
+  public Long getMaxServerResponseSizeBytes() {
+    return _maxServerResponseSizeBytes;
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 80c1816702..303ad6c716 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -318,6 +318,16 @@ public class CommonConstants {
         "pinot.broker.enable.partition.metadata.manager";
     public static final boolean DEFAULT_ENABLE_PARTITION_METADATA_MANAGER = 
false;
 
+    // Broker config indicating the maximum serialized response size across 
all servers for a query. This value is
+    // equally divided across all servers processing the query.
+    public static final String CONFIG_OF_MAX_QUERY_RESPONSE_SIZE_BYTES = 
"pinot.broker.max.query.response.size.bytes";
+    public static final long DEFAULT_MAX_QUERY_RESPONSE_SIZE_BYTES = 
Long.MAX_VALUE;
+
+    // Broker config indicating the maximum length of the serialized response 
per server for a query.
+    public static final String CONFIG_OF_MAX_SERVER_RESPONSE_SIZE_BYTES = 
"pinot.broker.max.server.response.size.bytes";
+    public static final long DEFAULT_MAX_SERVER_RESPONSE_SIZE_BYTES = 
Long.MAX_VALUE;
+
+
     public static class Request {
       public static final String SQL = "sql";
       public static final String TRACE = "trace";
@@ -362,6 +372,13 @@ public class CommonConstants {
         public static final String MAX_ROWS_IN_JOIN = "maxRowsInJoin";
         public static final String JOIN_OVERFLOW_MODE = "joinOverflowMode";
 
+        // Indicates the maximum length of the serialized response per server 
for a query.
+        public static final String MAX_SERVER_RESPONSE_SIZE_BYTES = 
"maxServerResponseSizeBytes";
+
+        // Indicates the maximum length of serialized response across all 
servers for a query. This value is equally
+        // divided across all servers processing the query.
+        public static final String MAX_QUERY_RESPONSE_SIZE_BYTES = 
"maxQueryResponseSizeBytes";
+
         // TODO: Remove these keys (only apply to PQL) after releasing 0.11.0
         @Deprecated
         public static final String PRESERVE_TYPE = "preserveType";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to