This is an automated email from the ASF dual-hosted git repository.
jtao pushed a commit to branch hotfix-empty-schema
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/hotfix-empty-schema by this
push:
new 85e4761a2d protect usage MSQE compiler for empty schema polyfill with
config param disabled by default (#15078)
85e4761a2d is described below
commit 85e4761a2da62fe7c78db797e4c8c858f7c4bede
Author: Alberto Bastos <[email protected]>
AuthorDate: Wed Feb 19 15:57:20 2025 +0100
protect usage MSQE compiler for empty schema polyfill with config param
disabled by default (#15078)
---
.../BaseSingleStageBrokerRequestHandler.java | 12 +++++++++--
.../common/utils/config/QueryOptionsUtils.java | 5 +++++
.../tests/BasePauselessRealtimeIngestionTest.java | 6 ++++++
.../tests/EmptyResponseIntegrationTest.java | 7 ++++++
...mentGenerationMinionClusterIntegrationTest.java | 8 +++++++
.../pinot/query/parser/utils/ParserUtils.java | 25 +++++++++++++---------
.../apache/pinot/spi/utils/CommonConstants.java | 8 +++++++
7 files changed, 59 insertions(+), 12 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index 41824eaf05..e83fe780cc 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -146,6 +146,7 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
protected final int _defaultQueryLimit;
protected final Map<Long, QueryServers> _queriesById;
protected final boolean _enableMultistageMigrationMetric;
+ protected final boolean _useMSEToFillEmptyResponseSchema;
protected ExecutorService _multistageCompileExecutor;
protected BlockingQueue<Pair<String, String>> _multistageCompileQueryQueue;
@@ -175,6 +176,9 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
_multistageCompileQueryQueue = new LinkedBlockingQueue<>(1000);
}
+ _useMSEToFillEmptyResponseSchema =
_config.getProperty(Broker.USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA,
+ Broker.DEFAULT_USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA);
+
LOGGER.info("Initialized {} with broker id: {}, timeout: {}ms, query
response limit: {}, "
+ "default query limit {}, query log max length: {}, query log max
rate: {}, query cancellation "
+ "enabled: {}", getClass().getSimpleName(), _brokerId,
_brokerTimeoutMs, _queryResponseLimit,
@@ -853,7 +857,9 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
// server returns STRING as default dataType for all columns in (some)
scenarios where no rows are returned
// this is an attempt to return more faithful information based on other
sources
if (brokerResponse.getNumRowsResultSet() == 0) {
- ParserUtils.fillEmptyResponseSchema(brokerResponse, _tableCache,
schema, database, query);
+ boolean useMSE = QueryOptionsUtils.isUseMSEToFillEmptySchema(
+ pinotQuery.getQueryOptions(), _useMSEToFillEmptyResponseSchema);
+ ParserUtils.fillEmptyResponseSchema(useMSE, brokerResponse,
_tableCache, schema, database, query);
}
// Set total query processing time
@@ -928,7 +934,9 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
// Send empty response since we don't need to evaluate either offline or
realtime request.
BrokerResponseNative brokerResponse = BrokerResponseNative.empty();
- ParserUtils.fillEmptyResponseSchema(brokerResponse, _tableCache, schema,
database, query);
+ boolean useMSE = QueryOptionsUtils.isUseMSEToFillEmptySchema(
+ pinotQuery.getQueryOptions(), _useMSEToFillEmptyResponseSchema);
+ ParserUtils.fillEmptyResponseSchema(useMSE, brokerResponse, _tableCache,
schema, database, query);
brokerResponse.setTimeUsedMs(System.currentTimeMillis() -
requestContext.getRequestArrivalTimeMillis());
_queryLogger.log(
new QueryLogger.QueryLogParams(requestContext, tableName,
brokerResponse, requesterIdentity, null));
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 5e8ba86643..3aa24cd29f 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -356,6 +356,19 @@ public class QueryOptionsUtils {
     return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.IS_SECONDARY_WORKLOAD));
   }
 
+  /**
+   * Returns whether the multi-stage engine compiler should be used to fill the schema of an empty response.
+   *
+   * @param queryOptions query options; the {@link QueryOptionKey#USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA} option,
+   *                     when present, overrides {@code defaultValue}
+   * @param defaultValue value used when the query option is absent (the broker-level setting at the call sites)
+   * @return {@code true} if the multi-stage engine compiler should be used
+   */
+  public static boolean isUseMSEToFillEmptySchema(Map<String, String> queryOptions, boolean defaultValue) {
+    String useMSEToFillEmptySchema = queryOptions.get(QueryOptionKey.USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA);
+    return useMSEToFillEmptySchema != null ? Boolean.parseBoolean(useMSEToFillEmptySchema) : defaultValue;
+  }
+
   @Nullable
   private static Integer uncheckedParseInt(String optionName, @Nullable String optionValue) {
     if (optionValue == null) {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java
index 5d7f7caa4a..036fc50079 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java
@@ -88,6 +88,12 @@ public abstract class BasePauselessRealtimeIngestionTest
extends BaseClusterInte
500);
}
+ @Override
+ protected void overrideBrokerConf(PinotConfiguration brokerConf) {
+ super.overrideBrokerConf(brokerConf);
+
brokerConf.setProperty(CommonConstants.Broker.USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA,
true);
+ }
+
@Override
protected void overrideServerConf(PinotConfiguration serverConf) {
try {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java
index 5c0c3454a9..97470be599 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java
@@ -36,6 +36,7 @@ import org.apache.pinot.spi.config.table.RoutingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.InstanceTypeUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -82,6 +83,12 @@ public class EmptyResponseIntegrationTest extends
BaseClusterIntegrationTestSet
CompressionCodec.MV_ENTRY_DICT, null));
}
+ @Override
+ protected void overrideBrokerConf(PinotConfiguration brokerConf) {
+ super.overrideBrokerConf(brokerConf);
+
brokerConf.setProperty(CommonConstants.Broker.USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA,
true);
+ }
+
@BeforeClass
public void setUp()
throws Exception {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java
index bd4673f853..d9352a801c 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java
@@ -29,7 +29,9 @@ import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.task.AdhocTaskConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.util.TestUtils;
@@ -46,6 +48,12 @@ import static org.testng.Assert.assertEquals;
public class SegmentGenerationMinionClusterIntegrationTest extends
BaseClusterIntegrationTest {
private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentGenerationMinionClusterIntegrationTest.class);
+ @Override
+ protected void overrideBrokerConf(PinotConfiguration brokerConf) {
+ super.overrideBrokerConf(brokerConf);
+
brokerConf.setProperty(CommonConstants.Broker.USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA,
true);
+ }
+
@BeforeClass
public void setUp()
throws Exception {
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java
index ec58dd296c..eb308f45cd 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java
@@ -62,27 +62,32 @@ public class ParserUtils {
* 2. Data schema has all columns set to default type (STRING) (when all
segments pruned on server).
*
* Priority is:
- * - Types from multi-stage engine validation for the given query.
+ * - Types from multi-stage engine validation for the given query (if
allowed).
* - Types from schema for the given table (only applicable to selection
fields).
* - Types from single-stage engine response (no action).
*
* Multi-stage engine schema will be available only if query compiles.
*/
- public static void fillEmptyResponseSchema(BrokerResponse response,
TableCache tableCache, Schema schema,
- String database, String query) {
+ public static void fillEmptyResponseSchema(boolean useMSE, BrokerResponse
response, TableCache tableCache,
+ Schema schema, String database, String query) {
Preconditions.checkState(response.getNumRowsResultSet() == 0, "Cannot fill
schema for non-empty response");
DataSchema dataSchema = response.getResultTable() != null ?
response.getResultTable().getDataSchema() : null;
List<RelDataTypeField> dataTypeFields = null;
- try {
- QueryEnvironment queryEnvironment = new QueryEnvironment(database,
tableCache, null);
- RelRoot node = queryEnvironment.getRelRootIfCanCompile(query);
- if (node != null && node.validatedRowType != null) {
- dataTypeFields = node.validatedRowType.getFieldList();
+ // Turn on (with pinot.broker.use.mse.to.fill.empty.response.schema=true
or query option
+ // useMSEToFillEmptyResponseSchema=true) only for clusters where no
queries with huge IN clauses are expected
+ // (see https://github.com/apache/pinot/issues/15064)
+ if (useMSE) {
+ try {
+ QueryEnvironment queryEnvironment = new QueryEnvironment(database,
tableCache, null);
+ RelRoot node = queryEnvironment.getRelRootIfCanCompile(query);
+ if (node != null && node.validatedRowType != null) {
+ dataTypeFields = node.validatedRowType.getFieldList();
+ }
+ } catch (Exception ignored) {
+ // Ignored
}
- } catch (Exception ignored) {
- // Ignored
}
if (dataSchema == null && dataTypeFields == null) {
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index f1695e1119..736ec34b73 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -557,6 +557,10 @@ public class CommonConstants {
public static final String GET_CURSOR = "getCursor";
// Number of rows that the cursor should contain
public static final String CURSOR_NUM_ROWS = "cursorNumRows";
+
+ // Use MSE compiler when trying to fill a response with no schema
metadata
+ // (overrides the "pinot.broker.use.mse.to.fill.empty.response.schema"
broker conf)
+ public static final String USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA =
"useMSEToFillEmptyResponseSchema";
}
public static class QueryOptionValue {
@@ -677,6 +681,10 @@ public class CommonConstants {
}
public static final String PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY =
"pinot.broker.storage.factory";
+
+ public static final String USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA =
+ "pinot.broker.use.mse.to.fill.empty.response.schema";
+ public static final boolean DEFAULT_USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA
= false;
}
public static class Server {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]