This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c3fc1b9156 [Multi-stage] Only track max joined rows within each block (#13981)
c3fc1b9156 is described below
commit c3fc1b915675366163bd0e54204ccec47b3ae2e6
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu Sep 12 11:37:40 2024 -0700
[Multi-stage] Only track max joined rows within each block (#13981)
---
.../query/runtime/operator/HashJoinOperator.java | 98 ++++++++++------------
1 file changed, 44 insertions(+), 54 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 8289d1c86f..9d845561c8 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -110,8 +110,6 @@ public class HashJoinOperator extends MultiStageOperator {
private final JoinOverFlowMode _joinOverflowMode;
private boolean _isHashTableBuilt;
- private int _currentRowsInHashTable;
- private int _currentJoinedRows;
private TransferableBlock _upstreamErrorBlock;
private MultiStageQueryStats _leftSideStats;
private MultiStageQueryStats _rightSideStats;
@@ -150,8 +148,6 @@ public class HashJoinOperator extends MultiStageOperator {
PlanNode.NodeHint nodeHint = node.getNodeHint();
_maxRowsInJoin = getMaxRowsInJoin(metadata, nodeHint);
_joinOverflowMode = getJoinOverflowMode(metadata, nodeHint);
- _currentRowsInHashTable = 0;
- _currentJoinedRows = 0;
}
@Override
@@ -225,17 +221,18 @@ public class HashJoinOperator extends MultiStageOperator {
private void buildBroadcastHashTable()
throws ProcessingException {
long startTime = System.currentTimeMillis();
+ int numRowsInHashTable = 0;
TransferableBlock rightBlock = _rightInput.nextBlock();
while (!TransferableBlockUtils.isEndOfStream(rightBlock)) {
List<Object[]> container = rightBlock.getContainer();
// Row based overflow check.
- if (container.size() + _currentRowsInHashTable > _maxRowsInJoin) {
+ if (container.size() + numRowsInHashTable > _maxRowsInJoin) {
if (_joinOverflowMode == JoinOverFlowMode.THROW) {
- throwProcessingExceptionForJoinRowLimitExceeded("Cannot build in
memory hash table for join operator, "
- + "reached number of rows limit: " + _maxRowsInJoin);
+ throwProcessingExceptionForJoinRowLimitExceeded(
+ "Cannot build in memory hash table for join operator, reached
number of rows limit: " + _maxRowsInJoin);
} else {
// Just fill up the buffer.
- int remainingRows = _maxRowsInJoin - _currentRowsInHashTable;
+ int remainingRows = _maxRowsInJoin - numRowsInHashTable;
container = container.subList(0, remainingRows);
_statMap.merge(StatKey.MAX_ROWS_IN_JOIN_REACHED, true);
// setting only the rightTableOperator to be early terminated and
awaits EOS block next.
@@ -252,7 +249,7 @@ public class HashJoinOperator extends MultiStageOperator {
}
hashCollection.add(row);
}
- _currentRowsInHashTable += container.size();
+ numRowsInHashTable += container.size();
sampleAndCheckInterruption();
rightBlock = _rightInput.nextBlock();
}
@@ -319,25 +316,6 @@ public class HashJoinOperator extends MultiStageOperator {
}
}
- private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock)
- throws ProcessingException {
- List<Object[]> container = leftBlock.getContainer();
- List<Object[]> rows = new ArrayList<>(container.size());
-
- for (Object[] leftRow : container) {
- Object key = _leftKeySelector.getKey(leftRow);
- // SEMI-JOIN only checks existence of the key
- if (_broadcastRightTable.containsKey(key)) {
- if (incrementJoinedRowsAndCheckLimit()) {
- break;
- }
- rows.add(joinRow(leftRow, null));
- }
- }
-
- return rows;
- }
-
private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock
leftBlock)
throws ProcessingException {
List<Object[]> container = leftBlock.getContainer();
@@ -349,7 +327,7 @@ public class HashJoinOperator extends MultiStageOperator {
List<Object[]> rightRows = _broadcastRightTable.get(key);
if (rightRows == null) {
if (needUnmatchedLeftRows()) {
- if (incrementJoinedRowsAndCheckLimit()) {
+ if (isMaxRowsLimitReached(rows.size())) {
break;
}
rows.add(joinRow(leftRow, null));
@@ -359,13 +337,15 @@ public class HashJoinOperator extends MultiStageOperator {
boolean hasMatchForLeftRow = false;
int numRightRows = rightRows.size();
rows.ensureCapacity(rows.size() + numRightRows);
+ boolean maxRowsLimitReached = false;
for (int i = 0; i < numRightRows; i++) {
Object[] rightRow = rightRows.get(i);
// TODO: Optimize this to avoid unnecessary object copy.
Object[] resultRow = joinRow(leftRow, rightRow);
if (_nonEquiEvaluators.isEmpty() || _nonEquiEvaluators.stream()
.allMatch(evaluator ->
BooleanUtils.isTrueInternalValue(evaluator.apply(resultRow)))) {
- if (incrementJoinedRowsAndCheckLimit()) {
+ if (isMaxRowsLimitReached(rows.size())) {
+ maxRowsLimitReached = true;
break;
}
rows.add(resultRow);
@@ -375,11 +355,11 @@ public class HashJoinOperator extends MultiStageOperator {
}
}
}
- if (_currentJoinedRows > _maxRowsInJoin) {
+ if (maxRowsLimitReached) {
break;
}
if (!hasMatchForLeftRow && needUnmatchedLeftRows()) {
- if (incrementJoinedRowsAndCheckLimit()) {
+ if (isMaxRowsLimitReached(rows.size())) {
break;
}
rows.add(joinRow(leftRow, null));
@@ -389,8 +369,22 @@ public class HashJoinOperator extends MultiStageOperator {
return rows;
}
- private List<Object[]> buildJoinedDataBlockAnti(TransferableBlock leftBlock)
- throws ProcessingException {
+ private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock)
{
+ List<Object[]> container = leftBlock.getContainer();
+ List<Object[]> rows = new ArrayList<>(container.size());
+
+ for (Object[] leftRow : container) {
+ Object key = _leftKeySelector.getKey(leftRow);
+ // SEMI-JOIN only checks existence of the key
+ if (_broadcastRightTable.containsKey(key)) {
+ rows.add(joinRow(leftRow, null));
+ }
+ }
+
+ return rows;
+ }
+
+ private List<Object[]> buildJoinedDataBlockAnti(TransferableBlock leftBlock)
{
List<Object[]> container = leftBlock.getContainer();
List<Object[]> rows = new ArrayList<>(container.size());
@@ -398,9 +392,6 @@ public class HashJoinOperator extends MultiStageOperator {
Object key = _leftKeySelector.getKey(leftRow);
// ANTI-JOIN only checks non-existence of the key
if (!_broadcastRightTable.containsKey(key)) {
- if (incrementJoinedRowsAndCheckLimit()) {
- break;
- }
rows.add(joinRow(leftRow, null));
}
}
@@ -475,18 +466,17 @@ public class HashJoinOperator extends MultiStageOperator {
}
/**
- * Increments {@link #_currentJoinedRows} and checks if the limit has been
exceeded. If the limit has been exceeded,
- * either an exception is thrown or the left input is early terminated based
on the {@link #_joinOverflowMode}.
+ * Checks if we have reached the rows limit for joined rows. If the limit
has been reached, either an exception is
+ * thrown or the left input is early terminated based on the {@link
#_joinOverflowMode}.
*
- * @return {@code true} if the limit has been exceeded, {@code false}
otherwise
+ * @return {@code true} if the limit has been reached, {@code false}
otherwise.
*/
- private boolean incrementJoinedRowsAndCheckLimit()
+ private boolean isMaxRowsLimitReached(int numJoinedRows)
throws ProcessingException {
- _currentJoinedRows++;
- if (_currentJoinedRows > _maxRowsInJoin) {
+ if (numJoinedRows == _maxRowsInJoin) {
if (_joinOverflowMode == JoinOverFlowMode.THROW) {
- throwProcessingExceptionForJoinRowLimitExceeded("Cannot process join,
reached number of rows limit: "
- + _maxRowsInJoin);
+ throwProcessingExceptionForJoinRowLimitExceeded(
+ "Cannot process join, reached number of rows limit: " +
_maxRowsInJoin);
} else {
// Skip over remaining blocks until we reach the end of stream since
we already breached the rows limit.
logger().info("Terminating join operator early as the maximum number
of rows limit was reached: {}",
@@ -504,15 +494,15 @@ public class HashJoinOperator extends MultiStageOperator {
throws ProcessingException {
ProcessingException resourceLimitExceededException =
new
ProcessingException(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE);
- resourceLimitExceededException.setMessage(
- reason + ". Consider increasing the limit for the maximum number of
rows in a join either via the query "
- + "option '" +
CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN + "' or the '"
- + PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN + "' hint in
the '"
- + PinotHintOptions.JOIN_HINT_OPTIONS + "'. Alternatively, if
partial results are acceptable, the join"
- + " overflow mode can be set to '" + JoinOverFlowMode.BREAK.name()
+ "' either via the query option '"
- + CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE
+ "' or the '"
- + PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE + "' hint in
the '"
- + PinotHintOptions.JOIN_HINT_OPTIONS + "'.");
+ resourceLimitExceededException.setMessage(reason
+ + ". Consider increasing the limit for the maximum number of rows in a
join either via the query option '"
+ + CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN + "'
or the '"
+ + PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN + "' hint in the
'" + PinotHintOptions.JOIN_HINT_OPTIONS
+ + "'. Alternatively, if partial results are acceptable, the join
overflow mode can be set to '"
+ + JoinOverFlowMode.BREAK.name() + "' either via the query option '"
+ + CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE +
"' or the '"
+ + PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE + "' hint in the
'" + PinotHintOptions.JOIN_HINT_OPTIONS
+ + "'.");
throw resourceLimitExceededException;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]