gortiz commented on code in PR #13922:
URL: https://github.com/apache/pinot/pull/13922#discussion_r1740787947
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java:
##########
@@ -432,6 +447,66 @@ private boolean needUnmatchedLeftRows() {
return _joinType == JoinRelType.LEFT || _joinType == JoinRelType.FULL;
}
+ private void earlyTerminateLeftInput() {
+ _leftInput.earlyTerminate();
+ TransferableBlock leftBlock = _leftInput.nextBlock();
+
+ while (!leftBlock.isSuccessfulEndOfStreamBlock()) {
+ if (leftBlock.isErrorBlock()) {
+ _upstreamErrorBlock = leftBlock;
+ return;
+ }
+ leftBlock = _leftInput.nextBlock();
+ }
+
+ assert leftBlock.isSuccessfulEndOfStreamBlock();
+ assert _rightSideStats != null;
+ _leftSideStats = leftBlock.getQueryStats();
+ assert _leftSideStats != null;
+ _leftSideStats.mergeInOrder(_rightSideStats, getOperatorType(), _statMap);
+ _isTerminated = true;
+ }
+
+ /**
+ * Increments {@link #_currentJoinedRows} and checks if the limit has been exceeded. If the limit has been exceeded,
+ * either an exception is thrown or the left input is early terminated based on the {@link #_joinOverflowMode}.
+ *
+ * @return {@code true} if the limit has been exceeded, {@code false} otherwise
+ */
+ private boolean incrementJoinedRowsAndCheckLimit() throws ProcessingException {
+ _currentJoinedRows++;
+ if (_currentJoinedRows > _maxRowsInJoin) {
+ if (_joinOverflowMode == JoinOverFlowMode.THROW) {
+ throwProcessingExceptionForJoinRowLimitExceeded("Cannot process join, reached number of rows limit: "
+ + _maxRowsInJoin);
+ } else {
+ // Skip over remaining blocks until we reach the end of stream since we already breached the rows limit.
+ logger().info("Terminating join operator early as the maximum number of rows limit was reached: {}",
+ _maxRowsInJoin);
+ earlyTerminateLeftInput();
+ _statMap.merge(StatKey.MAX_ROWS_IN_JOIN_REACHED, true);
+ return true;
+ }
+ }
+
+ return false;
+ }
Review Comment:
Isn't `_currentJoinedRows` always equal to `_rows.size()`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]