This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new e0d0aeb3d4 [multistage] Make join operator more resilient (#11401) e0d0aeb3d4 is described below commit e0d0aeb3d4d47e4e63589f7f96e194403f839075 Author: Xiang Fu <xiangfu.1...@gmail.com> AuthorDate: Tue Aug 22 16:57:59 2023 -0700 [multistage] Make join operator more resilient (#11401) * Support hash join right table protection * early terminate right table operator --- .../pinot/common/exception/QueryException.java | 4 + .../common/utils/config/QueryOptionsUtils.java | 12 ++- .../apache/calcite/rel/hint/PinotHintOptions.java | 10 ++ .../planner/logical/RelToPlanNodeConverter.java | 2 +- .../query/planner/plannode/AbstractPlanNode.java | 1 + .../pinot/query/planner/plannode/JoinNode.java | 10 +- .../apache/pinot/query/runtime/QueryRunner.java | 37 +++++++ .../query/runtime/operator/HashJoinOperator.java | 109 +++++++++++++++++-- .../apache/pinot/query/service/QueryConfig.java | 6 ++ .../runtime/operator/HashJoinOperatorTest.java | 116 ++++++++++++++++++--- .../plan/pipeline/PipelineBreakerExecutorTest.java | 11 +- .../apache/pinot/spi/utils/CommonConstants.java | 17 +-- 12 files changed, 297 insertions(+), 38 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java index a6a1a2bfa4..f5aa77b83b 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java @@ -61,6 +61,7 @@ public class QueryException { public static final int SERVER_TABLE_MISSING_ERROR_CODE = 230; public static final int SERVER_SEGMENT_MISSING_ERROR_CODE = 235; public static final int QUERY_SCHEDULING_TIMEOUT_ERROR_CODE = 240; + public static final int SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE = 245; public static final int EXECUTION_TIMEOUT_ERROR_CODE = 250; public static final int DATA_TABLE_SERIALIZATION_ERROR_CODE = 260; public static final int BROKER_GATHER_ERROR_CODE = 300; @@ -105,6 +106,8 @@ public class QueryException { new ProcessingException(SERVER_SEGMENT_MISSING_ERROR_CODE); public static final ProcessingException QUERY_SCHEDULING_TIMEOUT_ERROR = new ProcessingException(QUERY_SCHEDULING_TIMEOUT_ERROR_CODE); + public static final ProcessingException SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR = + new ProcessingException(SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE); public static final ProcessingException EXECUTION_TIMEOUT_ERROR = new ProcessingException(EXECUTION_TIMEOUT_ERROR_CODE); public static final ProcessingException DATA_TABLE_SERIALIZATION_ERROR = @@ -147,6 +150,7 @@ public class QueryException { SERVER_TABLE_MISSING_ERROR.setMessage("ServerTableMissing"); SERVER_SEGMENT_MISSING_ERROR.setMessage("ServerSegmentMissing"); QUERY_SCHEDULING_TIMEOUT_ERROR.setMessage("QuerySchedulingTimeoutError"); + SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR.setMessage("ServerResourceLimitExceededError"); EXECUTION_TIMEOUT_ERROR.setMessage("ExecutionTimeoutError"); DATA_TABLE_DESERIALIZATION_ERROR.setMessage("DataTableSerializationError"); BROKER_GATHER_ERROR.setMessage("BrokerGatherError"); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java index 5b239fde33..36e6baf5bc 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java @@ -36,7 +36,6 @@ public class QueryOptionsUtils { private QueryOptionsUtils() { } - private static final Map<String, String> CONFIG_RESOLVER; private static final RuntimeException CLASS_LOAD_ERROR; @@ -189,4 +188,15 @@ public class QueryOptionsUtils { public static boolean shouldDropResults(Map<String, String> queryOptions) { return Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.QueryOptionKey.DROP_RESULTS)); } + + @Nullable + public static Integer getMaxRowsInJoin(Map<String, String> queryOptions) { + String maxRowsInJoin = queryOptions.get(QueryOptionKey.MAX_ROWS_IN_JOIN); + return maxRowsInJoin != null ? Integer.parseInt(maxRowsInJoin) : null; + } + + @Nullable + public static String getJoinOverflowMode(Map<String, String> queryOptions) { + return queryOptions.get(QueryOptionKey.JOIN_OVERFLOW_MODE); + } } diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java index 9ffeaf8f8c..7abe9a6f06 100644 --- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java +++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java @@ -65,6 +65,16 @@ public class PinotHintOptions { public static class JoinHintOptions { public static final String JOIN_STRATEGY = "join_strategy"; + /** + * Max rows allowed to build the right table hash collection. + */ + public static final String MAX_ROWS_IN_JOIN = "max_rows_in_join"; + /** + * Mode when join overflow happens, supported values: THROW or BREAK. + * THROW(default): Break right table build process, and throw exception, no JOIN with left table performed. + * BREAK: Break right table build process, continue to perform JOIN operation, results might be partial. + */ + public static final String JOIN_OVERFLOW_MODE = "join_overflow_mode"; } public static class TableHintOptions { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java index b0b7545677..4ff7014b49 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java @@ -186,7 +186,7 @@ public final class RelToPlanNodeConverter { List<RexExpression> joinClause = joinInfo.nonEquiConditions.stream().map(RexExpression::toRexExpression).collect(Collectors.toList()); return new JoinNode(currentStageId, toDataSchema(node.getRowType()), toDataSchema(node.getLeft().getRowType()), - toDataSchema(node.getRight().getRowType()), joinType, joinKeys, joinClause); + toDataSchema(node.getRight().getRowType()), joinType, joinKeys, joinClause, node.getHints()); } private static DataSchema toDataSchema(RelDataType rowType) { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/AbstractPlanNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/AbstractPlanNode.java index c4349097e4..f3ca4e705e 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/AbstractPlanNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/AbstractPlanNode.java @@ -89,6 +89,7 @@ public abstract class AbstractPlanNode implements PlanNode, ProtoSerializable { public static class NodeHint { @ProtoProperties public Map<String, Map<String, String>> _hintOptions; + public NodeHint() { } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java index 6d089c6239..fe55facf01 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java @@ -21,6 +21,7 @@ package org.apache.pinot.query.planner.plannode; import java.util.Arrays; import java.util.List; import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.hint.RelHint; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector; @@ -36,6 +37,8 @@ public class JoinNode extends AbstractPlanNode { @ProtoProperties private List<RexExpression> _joinClause; @ProtoProperties + private NodeHint _joinHints; + @ProtoProperties private List<String> _leftColumnNames; @ProtoProperties private List<String> _rightColumnNames; @@ -45,13 +48,14 @@ public class JoinNode extends AbstractPlanNode { } public JoinNode(int planFragmentId, DataSchema dataSchema, DataSchema leftSchema, DataSchema rightSchema, - JoinRelType joinRelType, JoinKeys joinKeys, List<RexExpression> joinClause) { + JoinRelType joinRelType, JoinKeys joinKeys, List<RexExpression> joinClause, List<RelHint> joinHints) { super(planFragmentId, dataSchema); _leftColumnNames = Arrays.asList(leftSchema.getColumnNames()); _rightColumnNames = Arrays.asList(rightSchema.getColumnNames()); _joinRelType = joinRelType; _joinKeys = joinKeys; _joinClause = joinClause; + _joinHints = new NodeHint(joinHints); } public JoinRelType getJoinRelType() { @@ -66,6 +70,10 @@ public class JoinNode extends AbstractPlanNode { return _joinClause; } + public NodeHint getJoinHints() { + return _joinHints; + } + public List<String> getLeftColumnNames() { return _leftColumnNames; } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index 1b33c8bab3..9e109431df 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -25,11 +25,13 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeoutException; +import javax.annotation.Nullable; import org.apache.helix.HelixManager; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl; @@ -78,6 +80,12 @@ public class QueryRunner { private OpChainSchedulerService _scheduler; + // Join Overflow configs + @Nullable + private Integer _maxRowsInJoin; + @Nullable + private String _joinOverflowMode; + /** * Initializes the query executor. * <p>Should be called only once and before calling any other method. @@ -89,6 +97,11 @@ public class QueryRunner { CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : instanceName; _port = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, QueryConfig.DEFAULT_QUERY_RUNNER_PORT); _helixManager = helixManager; + // Set Join Overflow configs + _joinOverflowMode = config.getProperty(QueryConfig.KEY_OF_JOIN_OVERFLOW_MODE); + _maxRowsInJoin = config.containsKey(QueryConfig.KEY_OF_MAX_ROWS_IN_JOIN) ? Integer.parseInt( + config.getProperty(QueryConfig.KEY_OF_MAX_ROWS_IN_JOIN)) : null; + try { //TODO: make this configurable _opChainExecutor = ExecutorServiceUtils.create(config, "pinot.query.runner.opchain", @@ -133,6 +146,9 @@ public class QueryRunner { PipelineBreakerResult pipelineBreakerResult = PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan, deadlineMs, requestId, isTraceEnabled); + // Set Join Overflow configs to StageMetadata from request + setJoinOverflowConfigs(distributedStagePlan, requestMetadataMap); + // run OpChain if (DistributedStagePlan.isLeafStage(distributedStagePlan)) { try { @@ -157,6 +173,27 @@ public class QueryRunner { } } + private void setJoinOverflowConfigs(DistributedStagePlan distributedStagePlan, + Map<String, String> requestMetadataMap) { + String joinOverflowMode = QueryOptionsUtils.getJoinOverflowMode(requestMetadataMap); + if (joinOverflowMode != null) { + distributedStagePlan.getStageMetadata().getCustomProperties() + .put(CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE, joinOverflowMode); + } else if (_joinOverflowMode != null) { + distributedStagePlan.getStageMetadata().getCustomProperties() + .put(CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE, _joinOverflowMode); + } + + Integer maxRowsInJoin = QueryOptionsUtils.getMaxRowsInJoin(requestMetadataMap); + if (maxRowsInJoin != null) { + distributedStagePlan.getStageMetadata().getCustomProperties() + .put(CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN, String.valueOf(maxRowsInJoin)); + } else if (_maxRowsInJoin != null) { + distributedStagePlan.getStageMetadata().getCustomProperties() + .put(CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN, String.valueOf(_maxRowsInJoin)); + } + } + public void cancel(long requestId) { _scheduler.cancel(requestId); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java index 7f68116586..c12a67ecf1 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java @@ -29,17 +29,23 @@ import java.util.Map; import java.util.Set; import javax.annotation.Nullable; import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.hint.PinotHintOptions; import org.apache.pinot.common.datablock.DataBlock; +import org.apache.pinot.common.exception.QueryException; +import org.apache.pinot.common.response.ProcessingException; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.data.table.Key; import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.planner.partitioning.KeySelector; +import org.apache.pinot.query.planner.plannode.AbstractPlanNode; import org.apache.pinot.query.planner.plannode.JoinNode; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.operator.operands.TransformOperand; import org.apache.pinot.query.runtime.operator.utils.TypeUtils; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; +import org.apache.pinot.query.runtime.plan.StageMetadata; +import org.apache.pinot.spi.utils.CommonConstants; /** @@ -55,9 +61,12 @@ import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; * The output is in the format of [left_row, right_row] */ // TODO: Move inequi out of hashjoin. (https://github.com/apache/pinot/issues/9728) +// TODO: Support memory size based resource limit. public class HashJoinOperator extends MultiStageOperator { private static final String EXPLAIN_NAME = "HASH_JOIN"; private static final int INITIAL_HEURISTIC_SIZE = 16; + private static final int DEFAULT_MAX_ROWS_IN_JOIN = 1024 * 1024; // 2^20, around 1MM rows + private static final JoinOverFlowMode DEFAULT_JOIN_OVERFLOW_MODE = JoinOverFlowMode.THROW; private static final Set<JoinRelType> SUPPORTED_JOIN_TYPES = ImmutableSet.of( JoinRelType.INNER, JoinRelType.LEFT, JoinRelType.RIGHT, JoinRelType.FULL, JoinRelType.SEMI, JoinRelType.ANTI); @@ -83,14 +92,32 @@ public class HashJoinOperator extends MultiStageOperator { // TODO: Remove this special handling by fixing data block EOS abstraction or operator's invariant. private boolean _isTerminated; private TransferableBlock _upstreamErrorBlock; - private KeySelector<Object[], Object[]> _leftKeySelector; - private KeySelector<Object[], Object[]> _rightKeySelector; + private final KeySelector<Object[], Object[]> _leftKeySelector; + private final KeySelector<Object[], Object[]> _rightKeySelector; + + // Below are specific parameters to protect the hash table from growing too large. + // Once the hash table reaches the limit, we will throw exception or break the right table build process. + /** + * Max rows allowed to build the right table hash collection. + */ + private final int _maxRowsInHashTable; + /** + * Mode when join overflow happens, supported values: THROW or BREAK. + * THROW(default): Break right table build process, and throw exception, no JOIN with left table performed. + * BREAK: Break right table build process, continue to perform JOIN operation, results might be partial. + */ + private final JoinOverFlowMode _joinOverflowMode; + + private int _currentRowsInHashTable = 0; + private ProcessingException _resourceLimitExceededException = null; public HashJoinOperator(OpChainExecutionContext context, MultiStageOperator leftTableOperator, MultiStageOperator rightTableOperator, DataSchema leftSchema, JoinNode node) { super(context); Preconditions.checkState(SUPPORTED_JOIN_TYPES.contains(node.getJoinRelType()), "Join type: " + node.getJoinRelType() + " is not supported!"); + _maxRowsInHashTable = getMaxRowInJoin(context.getStageMetadata(), node.getJoinHints()); + _joinOverflowMode = getJoinOverflowMode(context.getStageMetadata(), node.getJoinHints()); _joinType = node.getJoinRelType(); _leftKeySelector = node.getJoinKeys().getLeftJoinKeySelector(); _rightKeySelector = node.getJoinKeys().getRightJoinKeySelector(); @@ -119,6 +146,38 @@ public class HashJoinOperator extends MultiStageOperator { _upstreamErrorBlock = null; } + private JoinOverFlowMode getJoinOverflowMode(StageMetadata stageMetadata, AbstractPlanNode.NodeHint joinHints) { + if (joinHints != null && joinHints._hintOptions != null && joinHints._hintOptions + .containsKey(PinotHintOptions.JOIN_HINT_OPTIONS) && joinHints._hintOptions.get( + PinotHintOptions.JOIN_HINT_OPTIONS) + .containsKey(PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE)) { + return JoinOverFlowMode.valueOf(joinHints._hintOptions.get(PinotHintOptions.JOIN_HINT_OPTIONS) + .get(PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE)); + } + if (stageMetadata != null && stageMetadata.getCustomProperties() != null && stageMetadata.getCustomProperties() + .containsKey(CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE)) { + return JoinOverFlowMode.valueOf( + stageMetadata.getCustomProperties().get(CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE)); + } + return DEFAULT_JOIN_OVERFLOW_MODE; + } + + private int getMaxRowInJoin(StageMetadata stageMetadata, AbstractPlanNode.NodeHint joinHints) { + if (joinHints != null && joinHints._hintOptions != null && joinHints._hintOptions + .containsKey(PinotHintOptions.JOIN_HINT_OPTIONS) && joinHints._hintOptions.get( + PinotHintOptions.JOIN_HINT_OPTIONS) + .containsKey(PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN)) { + return Integer.parseInt(joinHints._hintOptions.get(PinotHintOptions.JOIN_HINT_OPTIONS) + .get(PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN)); + } + if (stageMetadata != null && stageMetadata.getCustomProperties() != null && stageMetadata.getCustomProperties() + .containsKey(CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN)) { + return Integer.parseInt( + stageMetadata.getCustomProperties().get(CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN)); + } + return DEFAULT_MAX_ROWS_IN_JOIN; + } + // TODO: Separate left and right table operator. @Override public List<MultiStageOperator> getChildOperators() { @@ -135,7 +194,7 @@ public class HashJoinOperator extends MultiStageOperator { protected TransferableBlock getNextBlock() { try { if (_isTerminated) { - return TransferableBlockUtils.getEndOfStreamTransferableBlock(); + return setPartialResultExceptionToBlock(TransferableBlockUtils.getEndOfStreamTransferableBlock()); } if (!_isHashTableBuilt) { // Build JOIN hash table @@ -146,26 +205,48 @@ public class HashJoinOperator extends MultiStageOperator { } TransferableBlock leftBlock = _leftTableOperator.nextBlock(); // JOIN each left block with the right block. - return buildJoinedDataBlock(leftBlock); + return setPartialResultExceptionToBlock(buildJoinedDataBlock(leftBlock)); } catch (Exception e) { return TransferableBlockUtils.getErrorTransferableBlock(e); } } - private void buildBroadcastHashTable() { + private void buildBroadcastHashTable() + throws ProcessingException { TransferableBlock rightBlock = _rightTableOperator.nextBlock(); while (!TransferableBlockUtils.isEndOfStream(rightBlock)) { List<Object[]> container = rightBlock.getContainer(); + // Row based overflow check. + if (container.size() + _currentRowsInHashTable > _maxRowsInHashTable) { + _resourceLimitExceededException = + new ProcessingException(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE); + _resourceLimitExceededException.setMessage( + "Cannot build in memory hash table for join operator, reach number of rows limit: " + + _maxRowsInHashTable); + if (_joinOverflowMode == JoinOverFlowMode.THROW) { + throw _resourceLimitExceededException; + } else { + // Just fill up the buffer. + int remainingRows = _maxRowsInHashTable - _currentRowsInHashTable; + container = container.subList(0, remainingRows); + } + } // put all the rows into corresponding hash collections keyed by the key selector function. for (Object[] row : container) { ArrayList<Object[]> hashCollection = _broadcastRightTable.computeIfAbsent( new Key(_rightKeySelector.getKey(row)), k -> new ArrayList<>(INITIAL_HEURISTIC_SIZE)); int size = hashCollection.size(); - if ((size & size - 1) == 0 && size < Integer.MAX_VALUE / 2) { // is power of 2 - hashCollection.ensureCapacity(size << 1); + if ((size & size - 1) == 0 && size < _maxRowsInHashTable && size < Integer.MAX_VALUE / 2) { // is power of 2 + hashCollection.ensureCapacity(Math.min(size << 1, _maxRowsInHashTable)); } hashCollection.add(row); } + _currentRowsInHashTable += container.size(); + if (_currentRowsInHashTable == _maxRowsInHashTable) { + // Early terminate right table operator. + _rightTableOperator.close(); + break; + } rightBlock = _rightTableOperator.nextBlock(); } if (rightBlock.isErrorBlock()) { @@ -175,8 +256,7 @@ public class HashJoinOperator extends MultiStageOperator { } } - private TransferableBlock buildJoinedDataBlock(TransferableBlock leftBlock) - throws Exception { + private TransferableBlock buildJoinedDataBlock(TransferableBlock leftBlock) { if (leftBlock.isErrorBlock()) { _upstreamErrorBlock = leftBlock; return _upstreamErrorBlock; @@ -228,6 +308,13 @@ public class HashJoinOperator extends MultiStageOperator { return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW); } + private TransferableBlock setPartialResultExceptionToBlock(TransferableBlock block) { + if (_resourceLimitExceededException != null) { + block.getDataBlock().addException(_resourceLimitExceededException); + } + return block; + } + private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock) { List<Object[]> container = leftBlock.getContainer(); List<Object[]> rows = new ArrayList<>(container.size()); @@ -321,4 +408,8 @@ public class HashJoinOperator extends MultiStageOperator { private boolean needUnmatchedLeftRows() { return _joinType == JoinRelType.LEFT || _joinType == JoinRelType.FULL; } + + enum JoinOverFlowMode { + THROW, BREAK + } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java index ac18ad9138..925f544ee6 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java @@ -58,4 +58,10 @@ public class QueryConfig { */ public static final String KEY_OF_SERVER_RESPONSE_STATUS_ERROR = "ERROR"; public static final String KEY_OF_SERVER_RESPONSE_STATUS_OK = "OK"; + + /** + * Configuration for join overflow. + */ + public static final String KEY_OF_JOIN_OVERFLOW_MODE = "pinot.query.join.overflow.mode"; + public static final String KEY_OF_MAX_ROWS_IN_JOIN = "pinot.query.join.max.rows"; } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java index cc4661ca21..730ccf9f67 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java @@ -19,10 +19,15 @@ package org.apache.pinot.query.runtime.operator; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Map; import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.hint.PinotHintOptions; +import org.apache.calcite.rel.hint.RelHint; import org.apache.calcite.sql.SqlKind; import org.apache.pinot.common.datablock.MetadataBlock; import org.apache.pinot.common.exception.QueryException; @@ -73,6 +78,12 @@ public class HashJoinOperatorTest { return new JoinNode.JoinKeys(leftSelect, rightSelect); } + private static List<RelHint> getJoinHints(Map<String, String> hintsMap) { + RelHint.Builder relHintBuilder = RelHint.builder(PinotHintOptions.JOIN_HINT_OPTIONS); + hintsMap.forEach(relHintBuilder::hintOption); + return ImmutableList.of(relHintBuilder.build()); + } + @Test public void shouldHandleHashJoinKeyCollisionInnerJoin() { DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ @@ -94,7 +105,7 @@ public class HashJoinOperatorTest { DataSchema.ColumnDataType.STRING }); JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER, - getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses); + getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, Collections.emptyList()); HashJoinOperator joinOnString = new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); @@ -129,7 +140,7 @@ public class HashJoinOperatorTest { DataSchema.ColumnDataType.STRING }); JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER, - getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses); + getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, Collections.emptyList()); HashJoinOperator joinOnInt = new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); TransferableBlock result = joinOnInt.nextBlock(); @@ -161,7 +172,7 @@ public class HashJoinOperatorTest { DataSchema.ColumnDataType.STRING }); JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER, - getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses); + getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses, Collections.emptyList()); HashJoinOperator joinOnInt = new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); TransferableBlock result = joinOnInt.nextBlock(); @@ -200,7 +211,7 @@ public class HashJoinOperatorTest { DataSchema.ColumnDataType.STRING }); JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.LEFT, - getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses); + getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, Collections.emptyList()); HashJoinOperator join = new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); @@ -232,7 +243,7 @@ public class HashJoinOperatorTest { }); List<RexExpression> joinClauses = new ArrayList<>(); JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER, - getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses); + getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, Collections.emptyList()); HashJoinOperator join = new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); @@ -261,7 +272,7 @@ public class HashJoinOperatorTest { DataSchema.ColumnDataType.STRING }); JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.LEFT, - getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses); + getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, Collections.emptyList()); HashJoinOperator join = new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); @@ -294,7 +305,7 @@ public class HashJoinOperatorTest { }); JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER, - getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses); + getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, Collections.emptyList()); HashJoinOperator join = new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); @@ -330,7 +341,7 @@ public class HashJoinOperatorTest { DataSchema.ColumnDataType.STRING }); JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER, - getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses); + getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses, Collections.emptyList()); HashJoinOperator join = new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); TransferableBlock result = join.nextBlock(); @@ -366,7 +377,7 @@ public class HashJoinOperatorTest { DataSchema.ColumnDataType.STRING }); JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER, - getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses); + getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses, Collections.emptyList()); HashJoinOperator join = new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); TransferableBlock result = join.nextBlock(); @@ -398,7 +409,7 @@ public class HashJoinOperatorTest { DataSchema.ColumnDataType.STRING }); JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.RIGHT, - getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses); + getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, Collections.emptyList()); HashJoinOperator joinOnNum = new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); TransferableBlock result = joinOnNum.nextBlock(); @@ -439,7 +450,7 @@ public class HashJoinOperatorTest { DataSchema.ColumnDataType.STRING }); JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.SEMI, - getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses); + getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, Collections.emptyList()); HashJoinOperator join = new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); TransferableBlock result = join.nextBlock(); @@ -473,7 +484,7 @@ public class HashJoinOperatorTest { DataSchema.ColumnDataType.STRING }); JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.FULL, - getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses); + getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, Collections.emptyList()); HashJoinOperator join = new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); TransferableBlock result = join.nextBlock(); @@ -517,7 +528,7 @@ public class HashJoinOperatorTest { DataSchema.ColumnDataType.STRING }); JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.ANTI, - getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses); + getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, Collections.emptyList()); HashJoinOperator join = new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); TransferableBlock result = join.nextBlock(); @@ -550,7 +561,7 @@ public class HashJoinOperatorTest { DataSchema.ColumnDataType.STRING }); JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER, - getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses); + getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, Collections.emptyList()); HashJoinOperator join = new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); @@ -581,7 +592,7 @@ public class HashJoinOperatorTest { DataSchema.ColumnDataType.STRING }); JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER, - getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses); + getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, Collections.emptyList()); HashJoinOperator join = new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); @@ -590,5 +601,80 @@ public class HashJoinOperatorTest { Assert.assertTrue(result.getDataBlock().getExceptions().get(QueryException.UNKNOWN_ERROR_CODE) .contains("testInnerJoinLeftError")); } + + @Test + public void shouldPropagateJoinLimitError() { + DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING + }); + DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING + }); + Mockito.when(_leftOperator.nextBlock()) + .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, new Object[]{2, "BB"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + Mockito.when(_rightOperator.nextBlock()).thenReturn( + OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + + List<RexExpression> joinClauses = new ArrayList<>(); + DataSchema resultSchema = new DataSchema(new String[]{"int_col1", "string_col1", "int_co2", "string_col2"}, + new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, + DataSchema.ColumnDataType.STRING + }); + Map<String, String> hintsMap = ImmutableMap.of( + PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE, "THROW", + PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN, "1" + ); + JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER, + getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, getJoinHints(hintsMap)); + HashJoinOperator join = + new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); + + TransferableBlock result = join.nextBlock(); + Assert.assertTrue(result.isErrorBlock()); + Assert.assertTrue( + result.getDataBlock().getExceptions().get(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE) + .contains("reach number of rows limit")); + } + + @Test + public void shouldHandleJoinWithPartialResultsWhenHitDataRowsLimit() { + DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING + }); + DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING + }); + Mockito.when(_leftOperator.nextBlock()) + .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, new Object[]{2, "BB"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + Mockito.when(_rightOperator.nextBlock()).thenReturn( + OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + + List<RexExpression> joinClauses = new ArrayList<>(); + DataSchema resultSchema = new DataSchema(new String[]{"int_col1", "string_col1", "int_co2", "string_col2"}, + new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, + DataSchema.ColumnDataType.STRING + }); + Map<String, String> hintsMap = ImmutableMap.of( + PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE, "BREAK", + PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN, "1" + ); + JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER, + getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, getJoinHints(hintsMap)); + HashJoinOperator join = + new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); + + TransferableBlock result = join.nextBlock(); + Assert.assertFalse(result.isErrorBlock()); + Assert.assertEquals(result.getNumRows(), 1); + Assert.assertTrue( + result.getDataBlock().getExceptions().get(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE) + .contains("reach number of rows limit")); + } } // TODO: Add more inequi join tests. diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java index 5c8a132954..7f33dc45a4 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java @@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime.plan.pipeline; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutorService; @@ -91,7 +92,6 @@ public class PipelineBreakerExecutorTest { ImmutableList.of(_server), ImmutableMap.of())) .build()).collect(Collectors.toList())).build(); - @AfterClass public void tearDownClass() { ExecutorServiceUtils.close(_executor); @@ -152,7 +152,8 @@ public class PipelineBreakerExecutorTest { MailboxReceiveNode mailboxReceiveNode2 = new MailboxReceiveNode(0, DATA_SCHEMA, 2, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER, null, null, false, false, null); - JoinNode joinNode = new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null); + JoinNode joinNode = + new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, Collections.emptyList()); joinNode.addInput(mailboxReceiveNode1); joinNode.addInput(mailboxReceiveNode2); DistributedStagePlan distributedStagePlan = @@ -247,7 +248,8 @@ public class PipelineBreakerExecutorTest { MailboxReceiveNode incorrectlyConfiguredMailboxNode = new MailboxReceiveNode(0, DATA_SCHEMA, 3, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER, null, null, false, false, null); - JoinNode joinNode = new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null); + JoinNode joinNode = + new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, Collections.emptyList()); joinNode.addInput(mailboxReceiveNode1); joinNode.addInput(incorrectlyConfiguredMailboxNode); DistributedStagePlan distributedStagePlan = @@ -285,7 +287,8 @@ public class PipelineBreakerExecutorTest { MailboxReceiveNode incorrectlyConfiguredMailboxNode = new MailboxReceiveNode(0, DATA_SCHEMA, 2, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER, null, null, false, false, null); - JoinNode joinNode = new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null); + JoinNode joinNode = + new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, Collections.emptyList()); joinNode.addInput(mailboxReceiveNode1); joinNode.addInput(incorrectlyConfiguredMailboxNode); DistributedStagePlan distributedStagePlan = diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 775ff1fc58..7371510ee5 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -96,7 +96,6 @@ public class CommonConstants { // https://datasketches.apache.org/docs/Theta/ThetaErrorTable.html public static final int DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES = 65536; - public static final int DEFAULT_TUPLE_SKETCH_LGK = 16; // Whether to rewrite DistinctCount to DistinctCountBitmap @@ -250,19 +249,19 @@ public class CommonConstants { // By default, Jersey uses the default unbounded thread pool to process queries. // By enabling it, BrokerManagedAsyncExecutorProvider will be used to create a bounded thread pool. public static final String CONFIG_OF_ENABLE_BOUNDED_JERSEY_THREADPOOL_EXECUTOR = - "pinot.broker.enable.bounded.jersey.threadpool.executor"; + "pinot.broker.enable.bounded.jersey.threadpool.executor"; public static final boolean DEFAULT_ENABLE_BOUNDED_JERSEY_THREADPOOL_EXECUTOR = false; // Default capacities for the bounded thread pool public static final String CONFIG_OF_JERSEY_THREADPOOL_EXECUTOR_MAX_POOL_SIZE = - "pinot.broker.jersey.threadpool.executor.max.pool.size"; + "pinot.broker.jersey.threadpool.executor.max.pool.size"; public static final int DEFAULT_JERSEY_THREADPOOL_EXECUTOR_MAX_POOL_SIZE = - Runtime.getRuntime().availableProcessors() * 2; + Runtime.getRuntime().availableProcessors() * 2; public static final String CONFIG_OF_JERSEY_THREADPOOL_EXECUTOR_CORE_POOL_SIZE = - "pinot.broker.jersey.threadpool.executor.core.pool.size"; + "pinot.broker.jersey.threadpool.executor.core.pool.size"; public static final int DEFAULT_JERSEY_THREADPOOL_EXECUTOR_CORE_POOL_SIZE = - Runtime.getRuntime().availableProcessors() * 2; + Runtime.getRuntime().availableProcessors() * 2; public static final String CONFIG_OF_JERSEY_THREADPOOL_EXECUTOR_QUEUE_SIZE = - "pinot.broker.jersey.threadpool.executor.queue.size"; + "pinot.broker.jersey.threadpool.executor.queue.size"; public static final int DEFAULT_JERSEY_THREADPOOL_EXECUTOR_QUEUE_SIZE = Integer.MAX_VALUE; // used for SQL GROUP BY during broker reduce @@ -350,6 +349,10 @@ public class CommonConstants { public static final String DROP_RESULTS = "dropResults"; + // Handle JOIN Overflow + public static final String MAX_ROWS_IN_JOIN = "maxRowsInJoin"; + public static final String JOIN_OVERFLOW_MODE = "joinOverflowMode"; + // TODO: Remove these keys (only apply to PQL) after releasing 0.11.0 @Deprecated public static final String PRESERVE_TYPE = "preserveType"; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org