This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e0d0aeb3d4 [multistage] Make join operator more resilient (#11401)
e0d0aeb3d4 is described below

commit e0d0aeb3d4d47e4e63589f7f96e194403f839075
Author: Xiang Fu <xiangfu.1...@gmail.com>
AuthorDate: Tue Aug 22 16:57:59 2023 -0700

    [multistage] Make join operator more resilient (#11401)
    
    * Support hash join right table protection
    
    * Early terminate right table operator
---
 .../pinot/common/exception/QueryException.java     |   4 +
 .../common/utils/config/QueryOptionsUtils.java     |  12 ++-
 .../apache/calcite/rel/hint/PinotHintOptions.java  |  10 ++
 .../planner/logical/RelToPlanNodeConverter.java    |   2 +-
 .../query/planner/plannode/AbstractPlanNode.java   |   1 +
 .../pinot/query/planner/plannode/JoinNode.java     |  10 +-
 .../apache/pinot/query/runtime/QueryRunner.java    |  37 +++++++
 .../query/runtime/operator/HashJoinOperator.java   | 109 +++++++++++++++++--
 .../apache/pinot/query/service/QueryConfig.java    |   6 ++
 .../runtime/operator/HashJoinOperatorTest.java     | 116 ++++++++++++++++++---
 .../plan/pipeline/PipelineBreakerExecutorTest.java |  11 +-
 .../apache/pinot/spi/utils/CommonConstants.java    |  17 +--
 12 files changed, 297 insertions(+), 38 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
index a6a1a2bfa4..f5aa77b83b 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
@@ -61,6 +61,7 @@ public class QueryException {
   public static final int SERVER_TABLE_MISSING_ERROR_CODE = 230;
   public static final int SERVER_SEGMENT_MISSING_ERROR_CODE = 235;
   public static final int QUERY_SCHEDULING_TIMEOUT_ERROR_CODE = 240;
+  public static final int SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE = 245;
   public static final int EXECUTION_TIMEOUT_ERROR_CODE = 250;
   public static final int DATA_TABLE_SERIALIZATION_ERROR_CODE = 260;
   public static final int BROKER_GATHER_ERROR_CODE = 300;
@@ -105,6 +106,8 @@ public class QueryException {
       new ProcessingException(SERVER_SEGMENT_MISSING_ERROR_CODE);
   public static final ProcessingException QUERY_SCHEDULING_TIMEOUT_ERROR =
       new ProcessingException(QUERY_SCHEDULING_TIMEOUT_ERROR_CODE);
+  public static final ProcessingException SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR =
+      new ProcessingException(SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE);
   public static final ProcessingException EXECUTION_TIMEOUT_ERROR =
       new ProcessingException(EXECUTION_TIMEOUT_ERROR_CODE);
   public static final ProcessingException DATA_TABLE_SERIALIZATION_ERROR =
@@ -147,6 +150,7 @@ public class QueryException {
     SERVER_TABLE_MISSING_ERROR.setMessage("ServerTableMissing");
     SERVER_SEGMENT_MISSING_ERROR.setMessage("ServerSegmentMissing");
     QUERY_SCHEDULING_TIMEOUT_ERROR.setMessage("QuerySchedulingTimeoutError");
+    SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR.setMessage("ServerResourceLimitExceededError");
     EXECUTION_TIMEOUT_ERROR.setMessage("ExecutionTimeoutError");
     DATA_TABLE_DESERIALIZATION_ERROR.setMessage("DataTableSerializationError");
     BROKER_GATHER_ERROR.setMessage("BrokerGatherError");
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 5b239fde33..36e6baf5bc 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -36,7 +36,6 @@ public class QueryOptionsUtils {
   private QueryOptionsUtils() {
   }
 
-
   private static final Map<String, String> CONFIG_RESOLVER;
   private static final RuntimeException CLASS_LOAD_ERROR;
 
@@ -189,4 +188,15 @@ public class QueryOptionsUtils {
   public static boolean shouldDropResults(Map<String, String> queryOptions) {
    return Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.QueryOptionKey.DROP_RESULTS));
   }
+
+  @Nullable
+  public static Integer getMaxRowsInJoin(Map<String, String> queryOptions) {
+    String maxRowsInJoin = queryOptions.get(QueryOptionKey.MAX_ROWS_IN_JOIN);
+    return maxRowsInJoin != null ? Integer.parseInt(maxRowsInJoin) : null;
+  }
+
+  @Nullable
+  public static String getJoinOverflowMode(Map<String, String> queryOptions) {
+    return queryOptions.get(QueryOptionKey.JOIN_OVERFLOW_MODE);
+  }
 }
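
(For context: a minimal sketch of how these helpers resolve per-query options. The map values below are hypothetical; the option keys "maxRowsInJoin" and "joinOverflowMode" come from the CommonConstants change at the end of this diff. The same options can also be set server-wide via the QueryConfig keys added below.)

    import com.google.common.collect.ImmutableMap;
    import java.util.Map;
    import org.apache.pinot.common.utils.config.QueryOptionsUtils;

    Map<String, String> queryOptions =
        ImmutableMap.of("maxRowsInJoin", "500000", "joinOverflowMode", "BREAK");
    Integer maxRows = QueryOptionsUtils.getMaxRowsInJoin(queryOptions);  // 500000
    String mode = QueryOptionsUtils.getJoinOverflowMode(queryOptions);   // "BREAK"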
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java
index 9ffeaf8f8c..7abe9a6f06 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java
@@ -65,6 +65,16 @@ public class PinotHintOptions {
 
   public static class JoinHintOptions {
     public static final String JOIN_STRATEGY = "join_strategy";
+    /**
+     * Max rows allowed to build the right table hash collection.
+     */
+    public static final String MAX_ROWS_IN_JOIN = "max_rows_in_join";
+    /**
+     * Mode when join overflow happens, supported values: THROW or BREAK.
+     *   THROW (default): break the right table build process and throw an exception; no JOIN with the left table is performed.
+     *   BREAK: break the right table build process and continue the JOIN operation; results might be partial.
+     */
+    public static final String JOIN_OVERFLOW_MODE = "join_overflow_mode";
   }
 
   public static class TableHintOptions {
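
(Illustration: mirroring the getJoinHints helper added in HashJoinOperatorTest below, a join hint carrying these options can be built programmatically; the limit value here is hypothetical.)

    import com.google.common.collect.ImmutableList;
    import java.util.List;
    import org.apache.calcite.rel.hint.PinotHintOptions;
    import org.apache.calcite.rel.hint.RelHint;

    RelHint.Builder builder = RelHint.builder(PinotHintOptions.JOIN_HINT_OPTIONS)
        .hintOption(PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN, "100000")
        .hintOption(PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE, "BREAK");
    List<RelHint> joinHints = ImmutableList.of(builder.build());  // passed through to JoinNode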
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
index b0b7545677..4ff7014b49 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
@@ -186,7 +186,7 @@ public final class RelToPlanNodeConverter {
     List<RexExpression> joinClause =
         joinInfo.nonEquiConditions.stream().map(RexExpression::toRexExpression).collect(Collectors.toList());
     return new JoinNode(currentStageId, toDataSchema(node.getRowType()), toDataSchema(node.getLeft().getRowType()),
-        toDataSchema(node.getRight().getRowType()), joinType, joinKeys, joinClause);
+        toDataSchema(node.getRight().getRowType()), joinType, joinKeys, joinClause, node.getHints());
   }
 
   private static DataSchema toDataSchema(RelDataType rowType) {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/AbstractPlanNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/AbstractPlanNode.java
index c4349097e4..f3ca4e705e 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/AbstractPlanNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/AbstractPlanNode.java
@@ -89,6 +89,7 @@ public abstract class AbstractPlanNode implements PlanNode, ProtoSerializable {
   public static class NodeHint {
     @ProtoProperties
     public Map<String, Map<String, String>> _hintOptions;
+
     public NodeHint() {
     }
 
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java
index 6d089c6239..fe55facf01 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java
@@ -21,6 +21,7 @@ package org.apache.pinot.query.planner.plannode;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.hint.RelHint;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
@@ -36,6 +37,8 @@ public class JoinNode extends AbstractPlanNode {
   @ProtoProperties
   private List<RexExpression> _joinClause;
   @ProtoProperties
+  private NodeHint _joinHints;
+  @ProtoProperties
   private List<String> _leftColumnNames;
   @ProtoProperties
   private List<String> _rightColumnNames;
@@ -45,13 +48,14 @@ public class JoinNode extends AbstractPlanNode {
   }
 
   public JoinNode(int planFragmentId, DataSchema dataSchema, DataSchema leftSchema, DataSchema rightSchema,
-      JoinRelType joinRelType, JoinKeys joinKeys, List<RexExpression> joinClause) {
+      JoinRelType joinRelType, JoinKeys joinKeys, List<RexExpression> joinClause, List<RelHint> joinHints) {
     super(planFragmentId, dataSchema);
     _leftColumnNames = Arrays.asList(leftSchema.getColumnNames());
     _rightColumnNames = Arrays.asList(rightSchema.getColumnNames());
     _joinRelType = joinRelType;
     _joinKeys = joinKeys;
     _joinClause = joinClause;
+    _joinHints = new NodeHint(joinHints);
   }
 
   public JoinRelType getJoinRelType() {
@@ -66,6 +70,10 @@ public class JoinNode extends AbstractPlanNode {
     return _joinClause;
   }
 
+  public NodeHint getJoinHints() {
+    return _joinHints;
+  }
+
   public List<String> getLeftColumnNames() {
     return _leftColumnNames;
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 1b33c8bab3..9e109431df 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -25,11 +25,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
@@ -78,6 +80,12 @@ public class QueryRunner {
 
   private OpChainSchedulerService _scheduler;
 
+  // Join Overflow configs
+  @Nullable
+  private Integer _maxRowsInJoin;
+  @Nullable
+  private String _joinOverflowMode;
+
   /**
    * Initializes the query executor.
    * <p>Should be called only once and before calling any other method.
@@ -89,6 +97,11 @@ public class QueryRunner {
         CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : instanceName;
     _port = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, QueryConfig.DEFAULT_QUERY_RUNNER_PORT);
     _helixManager = helixManager;
+    // Set Join Overflow configs
+    _joinOverflowMode = config.getProperty(QueryConfig.KEY_OF_JOIN_OVERFLOW_MODE);
+    _maxRowsInJoin = config.containsKey(QueryConfig.KEY_OF_MAX_ROWS_IN_JOIN)
+        ? Integer.parseInt(config.getProperty(QueryConfig.KEY_OF_MAX_ROWS_IN_JOIN)) : null;
+
     try {
       //TODO: make this configurable
      _opChainExecutor = ExecutorServiceUtils.create(config, "pinot.query.runner.opchain",
@@ -133,6 +146,9 @@ public class QueryRunner {
    PipelineBreakerResult pipelineBreakerResult = PipelineBreakerExecutor.executePipelineBreakers(_scheduler,
        _mailboxService, distributedStagePlan, deadlineMs, requestId, isTraceEnabled);
 
+    // Set Join Overflow configs to StageMetadata from request
+    setJoinOverflowConfigs(distributedStagePlan, requestMetadataMap);
+
     // run OpChain
     if (DistributedStagePlan.isLeafStage(distributedStagePlan)) {
       try {
@@ -157,6 +173,27 @@ public class QueryRunner {
     }
   }
 
+  private void setJoinOverflowConfigs(DistributedStagePlan distributedStagePlan,
+      Map<String, String> requestMetadataMap) {
+    String joinOverflowMode = QueryOptionsUtils.getJoinOverflowMode(requestMetadataMap);
+    if (joinOverflowMode != null) {
+      distributedStagePlan.getStageMetadata().getCustomProperties()
+          .put(CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE, joinOverflowMode);
+    } else if (_joinOverflowMode != null) {
+      distributedStagePlan.getStageMetadata().getCustomProperties()
+          .put(CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE, _joinOverflowMode);
+    }
+
+    Integer maxRowsInJoin = QueryOptionsUtils.getMaxRowsInJoin(requestMetadataMap);
+    if (maxRowsInJoin != null) {
+      distributedStagePlan.getStageMetadata().getCustomProperties()
+          .put(CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN, String.valueOf(maxRowsInJoin));
+    } else if (_maxRowsInJoin != null) {
+      distributedStagePlan.getStageMetadata().getCustomProperties()
+          .put(CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN, String.valueOf(_maxRowsInJoin));
+    }
+  }
+
   public void cancel(long requestId) {
     _scheduler.cancel(requestId);
   }
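
(Note on precedence: setJoinOverflowConfigs lets a per-request query option override the instance-level default captured in init. A hedged standalone sketch of that resolution order; the method name is illustrative, not part of the patch.)

    import javax.annotation.Nullable;

    @Nullable
    static String resolveOption(@Nullable String requestOption, @Nullable String instanceDefault) {
      // Per-query option wins; otherwise fall back to the server-wide config.
      // If both are null, nothing is written to StageMetadata and HashJoinOperator
      // applies its built-in defaults (THROW, 2^20 rows).
      return requestOption != null ? requestOption : instanceDefault;
    }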
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 7f68116586..c12a67ecf1 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -29,17 +29,23 @@ import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.hint.PinotHintOptions;
 import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
+import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
 import org.apache.pinot.query.planner.plannode.JoinNode;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.operands.TransformOperand;
 import org.apache.pinot.query.runtime.operator.utils.TypeUtils;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
+import org.apache.pinot.spi.utils.CommonConstants;
 
 
 /**
@@ -55,9 +61,12 @@ import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
  * The output is in the format of [left_row, right_row]
  */
 // TODO: Move inequi out of hashjoin. (https://github.com/apache/pinot/issues/9728)
+// TODO: Support memory size based resource limit.
 public class HashJoinOperator extends MultiStageOperator {
   private static final String EXPLAIN_NAME = "HASH_JOIN";
   private static final int INITIAL_HEURISTIC_SIZE = 16;
+  private static final int DEFAULT_MAX_ROWS_IN_JOIN = 1024 * 1024; // 2^20, around 1 million rows
+  private static final JoinOverFlowMode DEFAULT_JOIN_OVERFLOW_MODE = JoinOverFlowMode.THROW;
 
   private static final Set<JoinRelType> SUPPORTED_JOIN_TYPES = ImmutableSet.of(
       JoinRelType.INNER, JoinRelType.LEFT, JoinRelType.RIGHT, JoinRelType.FULL, JoinRelType.SEMI, JoinRelType.ANTI);
@@ -83,14 +92,32 @@ public class HashJoinOperator extends MultiStageOperator {
   // TODO: Remove this special handling by fixing data block EOS abstraction or operator's invariant.
   private boolean _isTerminated;
   private TransferableBlock _upstreamErrorBlock;
-  private KeySelector<Object[], Object[]> _leftKeySelector;
-  private KeySelector<Object[], Object[]> _rightKeySelector;
+  private final KeySelector<Object[], Object[]> _leftKeySelector;
+  private final KeySelector<Object[], Object[]> _rightKeySelector;
+
+  // Below are specific parameters to protect the hash table from growing too large.
+  // Once the hash table reaches the limit, we will throw an exception or break the right table build process.
+  /**
+   * Max rows allowed to build the right table hash collection.
+   */
+  private final int _maxRowsInHashTable;
+  /**
+   * Mode when join overflow happens, supported values: THROW or BREAK.
+   *   THROW (default): break the right table build process and throw an exception; no JOIN with the left table is performed.
+   *   BREAK: break the right table build process and continue the JOIN operation; results might be partial.
+   */
+  private final JoinOverFlowMode _joinOverflowMode;
+
+  private int _currentRowsInHashTable = 0;
+  private ProcessingException _resourceLimitExceededException = null;
 
   public HashJoinOperator(OpChainExecutionContext context, MultiStageOperator leftTableOperator,
       MultiStageOperator rightTableOperator, DataSchema leftSchema, JoinNode node) {
     super(context);
     Preconditions.checkState(SUPPORTED_JOIN_TYPES.contains(node.getJoinRelType()),
         "Join type: " + node.getJoinRelType() + " is not supported!");
+    _maxRowsInHashTable = getMaxRowInJoin(context.getStageMetadata(), node.getJoinHints());
+    _joinOverflowMode = getJoinOverflowMode(context.getStageMetadata(), node.getJoinHints());
     _joinType = node.getJoinRelType();
     _leftKeySelector = node.getJoinKeys().getLeftJoinKeySelector();
     _rightKeySelector = node.getJoinKeys().getRightJoinKeySelector();
@@ -119,6 +146,38 @@ public class HashJoinOperator extends MultiStageOperator {
     _upstreamErrorBlock = null;
   }
 
+  private JoinOverFlowMode getJoinOverflowMode(StageMetadata stageMetadata, AbstractPlanNode.NodeHint joinHints) {
+    if (joinHints != null && joinHints._hintOptions != null
+        && joinHints._hintOptions.containsKey(PinotHintOptions.JOIN_HINT_OPTIONS)
+        && joinHints._hintOptions.get(PinotHintOptions.JOIN_HINT_OPTIONS)
+            .containsKey(PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE)) {
+      return JoinOverFlowMode.valueOf(joinHints._hintOptions.get(PinotHintOptions.JOIN_HINT_OPTIONS)
+          .get(PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE));
+    }
+    if (stageMetadata != null && stageMetadata.getCustomProperties() != null && stageMetadata.getCustomProperties()
+        .containsKey(CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE)) {
+      return JoinOverFlowMode.valueOf(
+          stageMetadata.getCustomProperties().get(CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE));
+    }
+    return DEFAULT_JOIN_OVERFLOW_MODE;
+  }
+
+  private int getMaxRowInJoin(StageMetadata stageMetadata, AbstractPlanNode.NodeHint joinHints) {
+    if (joinHints != null && joinHints._hintOptions != null
+        && joinHints._hintOptions.containsKey(PinotHintOptions.JOIN_HINT_OPTIONS)
+        && joinHints._hintOptions.get(PinotHintOptions.JOIN_HINT_OPTIONS)
+            .containsKey(PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN)) {
+      return Integer.parseInt(joinHints._hintOptions.get(PinotHintOptions.JOIN_HINT_OPTIONS)
+          .get(PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN));
+    }
+    if (stageMetadata != null && stageMetadata.getCustomProperties() != null && stageMetadata.getCustomProperties()
+        .containsKey(CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN)) {
+      return Integer.parseInt(
+          stageMetadata.getCustomProperties().get(CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN));
+    }
+    return DEFAULT_MAX_ROWS_IN_JOIN;
+  }
+
   // TODO: Separate left and right table operator.
   @Override
   public List<MultiStageOperator> getChildOperators() {
@@ -135,7 +194,7 @@ public class HashJoinOperator extends MultiStageOperator {
   protected TransferableBlock getNextBlock() {
     try {
       if (_isTerminated) {
-        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+        return setPartialResultExceptionToBlock(TransferableBlockUtils.getEndOfStreamTransferableBlock());
       }
       if (!_isHashTableBuilt) {
         // Build JOIN hash table
@@ -146,26 +205,48 @@ public class HashJoinOperator extends MultiStageOperator {
       }
       TransferableBlock leftBlock = _leftTableOperator.nextBlock();
       // JOIN each left block with the right block.
-      return buildJoinedDataBlock(leftBlock);
+      return setPartialResultExceptionToBlock(buildJoinedDataBlock(leftBlock));
     } catch (Exception e) {
       return TransferableBlockUtils.getErrorTransferableBlock(e);
     }
   }
 
-  private void buildBroadcastHashTable() {
+  private void buildBroadcastHashTable()
+      throws ProcessingException {
     TransferableBlock rightBlock = _rightTableOperator.nextBlock();
     while (!TransferableBlockUtils.isEndOfStream(rightBlock)) {
       List<Object[]> container = rightBlock.getContainer();
+      // Row based overflow check.
+      if (container.size() + _currentRowsInHashTable > _maxRowsInHashTable) {
+        _resourceLimitExceededException =
+            new ProcessingException(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE);
+        _resourceLimitExceededException.setMessage(
+            "Cannot build in memory hash table for join operator, reach number of rows limit: " + _maxRowsInHashTable);
+        if (_joinOverflowMode == JoinOverFlowMode.THROW) {
+          throw _resourceLimitExceededException;
+        } else {
+          // Just fill up the buffer.
+          int remainingRows = _maxRowsInHashTable - _currentRowsInHashTable;
+          container = container.subList(0, remainingRows);
+        }
+      }
       // put all the rows into corresponding hash collections keyed by the key selector function.
       for (Object[] row : container) {
         ArrayList<Object[]> hashCollection = _broadcastRightTable.computeIfAbsent(
             new Key(_rightKeySelector.getKey(row)), k -> new ArrayList<>(INITIAL_HEURISTIC_SIZE));
         int size = hashCollection.size();
-        if ((size & size - 1) == 0 && size < Integer.MAX_VALUE / 2) { // is power of 2
-          hashCollection.ensureCapacity(size << 1);
+        if ((size & size - 1) == 0 && size < _maxRowsInHashTable && size < Integer.MAX_VALUE / 2) { // is power of 2
+          hashCollection.ensureCapacity(Math.min(size << 1, _maxRowsInHashTable));
         }
         hashCollection.add(row);
       }
+      _currentRowsInHashTable += container.size();
+      if (_currentRowsInHashTable == _maxRowsInHashTable) {
+        // Early terminate right table operator.
+        _rightTableOperator.close();
+        break;
+      }
       rightBlock = _rightTableOperator.nextBlock();
     }
     if (rightBlock.isErrorBlock()) {
@@ -175,8 +256,7 @@ public class HashJoinOperator extends MultiStageOperator {
     }
   }
 
-  private TransferableBlock buildJoinedDataBlock(TransferableBlock leftBlock)
-      throws Exception {
+  private TransferableBlock buildJoinedDataBlock(TransferableBlock leftBlock) {
     if (leftBlock.isErrorBlock()) {
       _upstreamErrorBlock = leftBlock;
       return _upstreamErrorBlock;
@@ -228,6 +308,13 @@ public class HashJoinOperator extends MultiStageOperator {
     return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
   }
 
+  private TransferableBlock setPartialResultExceptionToBlock(TransferableBlock block) {
+    if (_resourceLimitExceededException != null) {
+      block.getDataBlock().addException(_resourceLimitExceededException);
+    }
+    return block;
+  }
+
   private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock) {
     List<Object[]> container = leftBlock.getContainer();
     List<Object[]> rows = new ArrayList<>(container.size());
@@ -321,4 +408,8 @@ public class HashJoinOperator extends MultiStageOperator {
   private boolean needUnmatchedLeftRows() {
     return _joinType == JoinRelType.LEFT || _joinType == JoinRelType.FULL;
   }
+
+  enum JoinOverFlowMode {
+    THROW, BREAK
+  }
 }
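
(A self-contained sketch of the BREAK-mode row cap applied in buildBroadcastHashTable above; the class and method names here are illustrative, not part of the patch.)

    import java.util.List;

    final class RowCapSketch {
      /** Truncate an incoming right-table block so the running total never exceeds maxRows. */
      static <T> List<T> capBlock(List<T> block, int currentRows, int maxRows) {
        if (block.size() + currentRows > maxRows) {
          // BREAK mode keeps only the remaining capacity; THROW mode raises instead.
          return block.subList(0, maxRows - currentRows);
        }
        return block;
      }
    }

With maxRows = 4 and right-table blocks of 3 and 3 rows, the second block is cut to 1 row; once the total hits the cap, the right-table operator is closed early and downstream blocks carry the partial-result exception, matching the tests below.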
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
index ac18ad9138..925f544ee6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
@@ -58,4 +58,10 @@ public class QueryConfig {
    */
   public static final String KEY_OF_SERVER_RESPONSE_STATUS_ERROR = "ERROR";
   public static final String KEY_OF_SERVER_RESPONSE_STATUS_OK = "OK";
+
+  /**
+   * Configuration for join overflow.
+   */
+  public static final String KEY_OF_JOIN_OVERFLOW_MODE = "pinot.query.join.overflow.mode";
+  public static final String KEY_OF_MAX_ROWS_IN_JOIN = "pinot.query.join.max.rows";
 }
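
(Example instance-level defaults these keys enable; the values below are illustrative, and the exact server configuration file varies by deployment:

    pinot.query.join.overflow.mode=BREAK
    pinot.query.join.max.rows=1048576

Per-query options, when present in the request metadata, take precedence over these server-wide settings.)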
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
index cc4661ca21..730ccf9f67 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
@@ -19,10 +19,15 @@
 package org.apache.pinot.query.runtime.operator;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.hint.PinotHintOptions;
+import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.exception.QueryException;
@@ -73,6 +78,12 @@ public class HashJoinOperatorTest {
     return new JoinNode.JoinKeys(leftSelect, rightSelect);
   }
 
+  private static List<RelHint> getJoinHints(Map<String, String> hintsMap) {
+    RelHint.Builder relHintBuilder = RelHint.builder(PinotHintOptions.JOIN_HINT_OPTIONS);
+    hintsMap.forEach(relHintBuilder::hintOption);
+    return ImmutableList.of(relHintBuilder.build());
+  }
+
   @Test
   public void shouldHandleHashJoinKeyCollisionInnerJoin() {
    DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
@@ -94,7 +105,7 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.STRING
         });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
-        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
+        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, Collections.emptyList());
     HashJoinOperator joinOnString =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node);
 
@@ -129,7 +140,7 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.STRING
         });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
-        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
+        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, Collections.emptyList());
     HashJoinOperator joinOnInt =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node);
     TransferableBlock result = joinOnInt.nextBlock();
@@ -161,7 +172,7 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.STRING
         });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
-        getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses);
+        getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses, Collections.emptyList());
     HashJoinOperator joinOnInt =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node);
     TransferableBlock result = joinOnInt.nextBlock();
@@ -200,7 +211,7 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.STRING
         });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.LEFT,
-        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
+        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, Collections.emptyList());
     HashJoinOperator join =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node);
 
@@ -232,7 +243,7 @@ public class HashJoinOperatorTest {
         });
     List<RexExpression> joinClauses = new ArrayList<>();
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
-        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
+        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, Collections.emptyList());
     HashJoinOperator join =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node);
 
@@ -261,7 +272,7 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.STRING
         });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.LEFT,
-        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
+        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, Collections.emptyList());
     HashJoinOperator join =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node);
 
@@ -294,7 +305,7 @@ public class HashJoinOperatorTest {
         });
 
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
-        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
+        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, Collections.emptyList());
     HashJoinOperator join =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node);
 
@@ -330,7 +341,7 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.STRING
         });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
-        getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses);
+        getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses, Collections.emptyList());
     HashJoinOperator join =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node);
     TransferableBlock result = join.nextBlock();
@@ -366,7 +377,7 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.STRING
         });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
-        getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses);
+        getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses, Collections.emptyList());
     HashJoinOperator join =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node);
     TransferableBlock result = join.nextBlock();
@@ -398,7 +409,7 @@ public class HashJoinOperatorTest {
         DataSchema.ColumnDataType.STRING
     });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.RIGHT,
-        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
+        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, Collections.emptyList());
     HashJoinOperator joinOnNum =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node);
     TransferableBlock result = joinOnNum.nextBlock();
@@ -439,7 +450,7 @@ public class HashJoinOperatorTest {
         DataSchema.ColumnDataType.STRING
     });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.SEMI,
-        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
+        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, Collections.emptyList());
     HashJoinOperator join =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node);
     TransferableBlock result = join.nextBlock();
@@ -473,7 +484,7 @@ public class HashJoinOperatorTest {
         DataSchema.ColumnDataType.STRING
     });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.FULL,
-        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
+        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, Collections.emptyList());
     HashJoinOperator join =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node);
     TransferableBlock result = join.nextBlock();
@@ -517,7 +528,7 @@ public class HashJoinOperatorTest {
         DataSchema.ColumnDataType.STRING
     });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.ANTI,
-        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
+        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, Collections.emptyList());
     HashJoinOperator join =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node);
     TransferableBlock result = join.nextBlock();
@@ -550,7 +561,7 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.STRING
         });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
-        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
+        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, Collections.emptyList());
     HashJoinOperator join =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node);
 
@@ -581,7 +592,7 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.STRING
         });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
-        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
+        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, Collections.emptyList());
     HashJoinOperator join =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node);
 
@@ -590,5 +601,80 @@
     Assert.assertTrue(result.getDataBlock().getExceptions().get(QueryException.UNKNOWN_ERROR_CODE)
         .contains("testInnerJoinLeftError"));
   }
+
+  @Test
+  public void shouldPropagateJoinLimitError() {
+    DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, new Object[]{2, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_rightOperator.nextBlock()).thenReturn(
+            OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    List<RexExpression> joinClauses = new ArrayList<>();
+    DataSchema resultSchema = new DataSchema(new String[]{"int_col1", "string_col1", "int_col2", "string_col2"},
+        new DataSchema.ColumnDataType[]{
+            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
+            DataSchema.ColumnDataType.STRING
+        });
+    Map<String, String> hintsMap = ImmutableMap.of(
+        PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE, "THROW",
+        PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN, "1"
+    );
+    JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
+        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, getJoinHints(hintsMap));
+    HashJoinOperator join =
+        new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node);
+
+    TransferableBlock result = join.nextBlock();
+    Assert.assertTrue(result.isErrorBlock());
+    Assert.assertTrue(
+        result.getDataBlock().getExceptions().get(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE)
+            .contains("reach number of rows limit"));
+  }
+
+  @Test
+  public void shouldHandleJoinWithPartialResultsWhenHitDataRowsLimit() {
+    DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, new Object[]{2, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_rightOperator.nextBlock()).thenReturn(
+            OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    List<RexExpression> joinClauses = new ArrayList<>();
+    DataSchema resultSchema = new DataSchema(new String[]{"int_col1", "string_col1", "int_col2", "string_col2"},
+        new DataSchema.ColumnDataType[]{
+            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
+            DataSchema.ColumnDataType.STRING
+        });
+    Map<String, String> hintsMap = ImmutableMap.of(
+        PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE, "BREAK",
+        PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN, "1"
+    );
+    JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER,
+        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, getJoinHints(hintsMap));
+    HashJoinOperator join =
+        new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node);
+
+    TransferableBlock result = join.nextBlock();
+    Assert.assertFalse(result.isErrorBlock());
+    Assert.assertEquals(result.getNumRows(), 1);
+    Assert.assertTrue(
+        result.getDataBlock().getExceptions().get(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE)
+            .contains("reach number of rows limit"));
+  }
 }
 // TODO: Add more inequi join tests.
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
index 5c8a132954..7f33dc45a4 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime.plan.pipeline;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -91,7 +92,6 @@ public class PipelineBreakerExecutorTest {
               ImmutableList.of(_server), ImmutableMap.of()))
           .build()).collect(Collectors.toList())).build();
 
-
   @AfterClass
   public void tearDownClass() {
     ExecutorServiceUtils.close(_executor);
@@ -152,7 +152,8 @@ public class PipelineBreakerExecutorTest {
     MailboxReceiveNode mailboxReceiveNode2 =
         new MailboxReceiveNode(0, DATA_SCHEMA, 2, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
             null, null, false, false, null);
-    JoinNode joinNode = new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null);
+    JoinNode joinNode =
+        new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, Collections.emptyList());
     joinNode.addInput(mailboxReceiveNode1);
     joinNode.addInput(mailboxReceiveNode2);
     DistributedStagePlan distributedStagePlan =
@@ -247,7 +248,8 @@ public class PipelineBreakerExecutorTest {
     MailboxReceiveNode incorrectlyConfiguredMailboxNode =
         new MailboxReceiveNode(0, DATA_SCHEMA, 3, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
             null, null, false, false, null);
-    JoinNode joinNode = new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null);
+    JoinNode joinNode =
+        new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, Collections.emptyList());
     joinNode.addInput(mailboxReceiveNode1);
     joinNode.addInput(incorrectlyConfiguredMailboxNode);
     DistributedStagePlan distributedStagePlan =
@@ -285,7 +287,8 @@ public class PipelineBreakerExecutorTest {
     MailboxReceiveNode incorrectlyConfiguredMailboxNode =
         new MailboxReceiveNode(0, DATA_SCHEMA, 2, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
             null, null, false, false, null);
-    JoinNode joinNode = new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null);
+    JoinNode joinNode =
+        new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, Collections.emptyList());
     joinNode.addInput(mailboxReceiveNode1);
     joinNode.addInput(incorrectlyConfiguredMailboxNode);
     DistributedStagePlan distributedStagePlan =
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 775ff1fc58..7371510ee5 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -96,7 +96,6 @@ public class CommonConstants {
     // https://datasketches.apache.org/docs/Theta/ThetaErrorTable.html
     public static final int DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES = 65536;
 
-
     public static final int DEFAULT_TUPLE_SKETCH_LGK = 16;
 
     // Whether to rewrite DistinctCount to DistinctCountBitmap
@@ -250,19 +249,19 @@ public class CommonConstants {
    // By default, Jersey uses the default unbounded thread pool to process queries.
    // By enabling it, BrokerManagedAsyncExecutorProvider will be used to create a bounded thread pool.
     public static final String CONFIG_OF_ENABLE_BOUNDED_JERSEY_THREADPOOL_EXECUTOR =
-            "pinot.broker.enable.bounded.jersey.threadpool.executor";
+        "pinot.broker.enable.bounded.jersey.threadpool.executor";
     public static final boolean DEFAULT_ENABLE_BOUNDED_JERSEY_THREADPOOL_EXECUTOR = false;
     // Default capacities for the bounded thread pool
     public static final String CONFIG_OF_JERSEY_THREADPOOL_EXECUTOR_MAX_POOL_SIZE =
-            "pinot.broker.jersey.threadpool.executor.max.pool.size";
+        "pinot.broker.jersey.threadpool.executor.max.pool.size";
     public static final int DEFAULT_JERSEY_THREADPOOL_EXECUTOR_MAX_POOL_SIZE =
-            Runtime.getRuntime().availableProcessors() * 2;
+        Runtime.getRuntime().availableProcessors() * 2;
     public static final String CONFIG_OF_JERSEY_THREADPOOL_EXECUTOR_CORE_POOL_SIZE =
-            "pinot.broker.jersey.threadpool.executor.core.pool.size";
+        "pinot.broker.jersey.threadpool.executor.core.pool.size";
     public static final int DEFAULT_JERSEY_THREADPOOL_EXECUTOR_CORE_POOL_SIZE =
-            Runtime.getRuntime().availableProcessors() * 2;
+        Runtime.getRuntime().availableProcessors() * 2;
     public static final String CONFIG_OF_JERSEY_THREADPOOL_EXECUTOR_QUEUE_SIZE =
-            "pinot.broker.jersey.threadpool.executor.queue.size";
+        "pinot.broker.jersey.threadpool.executor.queue.size";
     public static final int DEFAULT_JERSEY_THREADPOOL_EXECUTOR_QUEUE_SIZE = Integer.MAX_VALUE;
 
     // used for SQL GROUP BY during broker reduce
@@ -350,6 +349,10 @@ public class CommonConstants {
 
         public static final String DROP_RESULTS = "dropResults";
 
+        // Handle JOIN Overflow
+        public static final String MAX_ROWS_IN_JOIN = "maxRowsInJoin";
+        public static final String JOIN_OVERFLOW_MODE = "joinOverflowMode";
+
         // TODO: Remove these keys (only apply to PQL) after releasing 0.11.0
         @Deprecated
         public static final String PRESERVE_TYPE = "preserveType";

