This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6d304909d8a [multistage] add maxRowsInJoin, maxRowsInWindow, numGroups
to query response (#17784)
6d304909d8a is described below
commit 6d304909d8a7794006792d8f918d389bfd07abc7
Author: dang-stripe <[email protected]>
AuthorDate: Sat Mar 14 01:09:53 2026 -0700
[multistage] add maxRowsInJoin, maxRowsInWindow, numGroups to query
response (#17784)
---
.../pinot/common/response/BrokerResponse.java | 2 +
.../response/broker/BrokerResponseNative.java | 2 +
.../response/broker/BrokerResponseNativeV2.java | 37 +++++++-
.../query/runtime/operator/AggregateOperator.java | 9 ++
.../query/runtime/operator/BaseJoinOperator.java | 15 +++
.../query/runtime/operator/MultiStageOperator.java | 3 +
.../runtime/operator/WindowAggregateOperator.java | 12 +++
.../runtime/operator/AggregateOperatorTest.java | 39 ++++++++
.../runtime/operator/HashJoinOperatorTest.java | 104 +++++++++++++++++++++
.../operator/WindowAggregateOperatorTest.java | 68 ++++++++++++++
.../runtime/plan/MultiStageQueryStatsTest.java | 48 +++++++++-
11 files changed, 336 insertions(+), 3 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
index fc800024719..603b0071d59 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
@@ -160,11 +160,13 @@ public interface BrokerResponse {
*/
boolean isMaxRowsInJoinReached();
+
/**
* Returns whether the limit for max rows in window has been reached.
*/
boolean isMaxRowsInWindowReached();
+
/**
* Returns the total time used for query execution in milliseconds.
*/
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
index 0e2eaeb77c6..8031395c911 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
@@ -238,12 +238,14 @@ public class BrokerResponseNative implements
BrokerResponse {
return false;
}
+
@JsonIgnore
@Override
public boolean isMaxRowsInWindowReached() {
return false;
}
+
@Override
public long getTimeUsedMs() {
return _timeUsedMs;
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
index 84cb82183b6..e485912feea 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
@@ -38,7 +38,8 @@ import org.apache.pinot.common.response.ProcessingException;
*/
@JsonPropertyOrder({
"resultTable", "numRowsResultSet", "partialResult", "exceptions",
"numGroupsLimitReached",
- "numGroupsWarningLimitReached", "maxRowsInJoinReached",
"maxRowsInWindowReached", "timeUsedMs", "stageStats",
+ "numGroupsWarningLimitReached", "numGroups", "maxRowsInJoinReached",
"maxRowsInJoin",
+ "maxRowsInWindowReached", "maxRowsInWindow", "timeUsedMs", "stageStats",
"maxRowsInOperator", "requestId", "clientRequestId", "brokerId",
"numDocsScanned", "totalDocs",
"numEntriesScannedInFilter", "numEntriesScannedPostFilter",
"numServersQueried", "numServersResponded",
"numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched",
"numConsumingSegmentsQueried",
@@ -59,7 +60,9 @@ public class BrokerResponseNativeV2 implements BrokerResponse
{
private ResultTable _resultTable;
private int _numRowsResultSet;
private boolean _maxRowsInJoinReached;
+ private long _maxRowsInJoin;
private boolean _maxRowsInWindowReached;
+ private long _maxRowsInWindow;
private long _timeUsedMs;
/**
* Statistics for each stage of the query execution.
@@ -143,6 +146,14 @@ public class BrokerResponseNativeV2 implements
BrokerResponse {
_brokerStats.merge(StatKey.NUM_GROUPS_LIMIT_REACHED,
numGroupsLimitReached);
}
+ public long getNumGroups() {
+ return _brokerStats.getLong(StatKey.NUM_GROUPS);
+ }
+
+ public void mergeNumGroups(long numGroups) {
+ _brokerStats.merge(StatKey.NUM_GROUPS, numGroups);
+ }
+
@Override
public boolean isNumGroupsWarningLimitReached() {
return _brokerStats.getBoolean(StatKey.NUM_GROUPS_WARNING_LIMIT_REACHED);
@@ -161,6 +172,14 @@ public class BrokerResponseNativeV2 implements
BrokerResponse {
_maxRowsInJoinReached |= maxRowsInJoinReached;
}
+ public long getMaxRowsInJoin() {
+ return _maxRowsInJoin;
+ }
+
+ public void mergeMaxRowsInJoin(long maxRowsInJoin) {
+ _maxRowsInJoin = Math.max(_maxRowsInJoin, maxRowsInJoin);
+ }
+
@Override
public boolean isMaxRowsInWindowReached() {
return _maxRowsInWindowReached;
@@ -170,6 +189,14 @@ public class BrokerResponseNativeV2 implements
BrokerResponse {
_maxRowsInWindowReached |= maxRowsInWindowReached;
}
+ public long getMaxRowsInWindow() {
+ return _maxRowsInWindow;
+ }
+
+ public void mergeMaxRowsInWindow(long maxRowsInWindow) {
+ _maxRowsInWindow = Math.max(_maxRowsInWindow, maxRowsInWindow);
+ }
+
/**
* Returns the stage statistics.
*/
@@ -453,7 +480,13 @@ public class BrokerResponseNativeV2 implements
BrokerResponse {
NUM_SEGMENTS_PRUNED_BY_VALUE(StatMap.Type.INT),
GROUPS_TRIMMED(StatMap.Type.BOOLEAN),
NUM_GROUPS_LIMIT_REACHED(StatMap.Type.BOOLEAN),
- NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN);
+ NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN),
+ NUM_GROUPS(StatMap.Type.LONG) {
+ @Override
+ public long merge(long value1, long value2) {
+ return Math.max(value1, value2);
+ }
+ };
private final StatMap.Type _type;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index 0a792c41120..f7f2da22666 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -228,6 +228,9 @@ public class AggregateOperator extends MultiStageOperator {
rows = _groupByExecutor.getResult(_groupTrimSize);
}
+ // Record the stat before checking the limit so it propagates to the query response
+ _statMap.merge(StatKey.NUM_GROUPS, _groupByExecutor.getNumGroups());
+
if (rows.isEmpty()) {
return _eosBlock;
} else {
@@ -472,6 +475,12 @@ public class AggregateOperator extends MultiStageOperator {
GROUPS_TRIMMED(StatMap.Type.BOOLEAN),
NUM_GROUPS_LIMIT_REACHED(StatMap.Type.BOOLEAN),
NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN),
+ NUM_GROUPS(StatMap.Type.LONG) {
+ @Override
+ public long merge(long value1, long value2) {
+ return Math.max(value1, value2);
+ }
+ },
/**
* Allocated memory in bytes for this operator or its children in the same
stage.
*/
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
index fe46b175469..8ec374ca4b5 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
@@ -212,6 +212,8 @@ public abstract class BaseJoinOperator extends
MultiStageOperator {
// Row based overflow check.
if (rows.size() + numRows > _maxRowsInJoin) {
if (_joinOverflowMode == JoinOverFlowMode.THROW) {
+ // Record stat before we throw so it propagates to query response
+ _statMap.merge(StatKey.MAX_ROWS_IN_JOIN, numRows + rows.size());
throwForJoinRowLimitExceeded(
"Cannot build in memory hash table for join operator, reached
number of rows limit: " + _maxRowsInJoin);
} else {
@@ -236,6 +238,7 @@ public abstract class BaseJoinOperator extends
MultiStageOperator {
} else {
_isRightTableBuilt = true;
finishBuildingRightTable();
+ _statMap.merge(StatKey.MAX_ROWS_IN_JOIN, numRows);
}
_statMap.merge(StatKey.TIME_BUILDING_HASH_TABLE_MS,
System.currentTimeMillis() - startTime);
@@ -343,6 +346,7 @@ public abstract class BaseJoinOperator extends
MultiStageOperator {
protected boolean isMaxRowsLimitReached(int numJoinedRows) {
if (numJoinedRows == _maxRowsInJoin) {
if (_joinOverflowMode == JoinOverFlowMode.THROW) {
+ _statMap.merge(StatKey.MAX_ROWS_IN_JOIN, numJoinedRows);
throwForJoinRowLimitExceeded(
"Cannot process join, reached number of rows limit: " +
_maxRowsInJoin);
} else {
@@ -350,6 +354,7 @@ public abstract class BaseJoinOperator extends
MultiStageOperator {
logger().info("Terminating join operator early as the maximum number
of rows limit was reached: {}",
_maxRowsInJoin);
earlyTerminateLeftInput();
+ _statMap.merge(StatKey.MAX_ROWS_IN_JOIN, numJoinedRows);
_statMap.merge(StatKey.MAX_ROWS_IN_JOIN_REACHED, true);
return true;
}
@@ -390,6 +395,16 @@ public abstract class BaseJoinOperator extends
MultiStageOperator {
* How long (CPU time) has been spent on building the hash table.
*/
TIME_BUILDING_HASH_TABLE_MS(StatMap.Type.LONG),
+ /**
+ * The max number of rows seen in the join. Recorded during right table build (normal and overflow paths)
+ * and at the joined-output limit check in {@link #isMaxRowsLimitReached}.
+ */
+ MAX_ROWS_IN_JOIN(StatMap.Type.LONG) {
+ @Override
+ public long merge(long value1, long value2) {
+ return Math.max(value1, value2);
+ }
+ },
/**
* Allocated memory in bytes for this operator or its children in the same
stage.
*/
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index c6227918cfa..b33391d1284 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -252,6 +252,7 @@ public abstract class MultiStageOperator implements
Operator<MseBlock>, AutoClos
response.mergeNumGroupsLimitReached(stats.getBoolean(AggregateOperator.StatKey.NUM_GROUPS_LIMIT_REACHED));
response.mergeNumGroupsWarningLimitReached(
stats.getBoolean(AggregateOperator.StatKey.NUM_GROUPS_WARNING_LIMIT_REACHED));
+
response.mergeNumGroups(stats.getLong(AggregateOperator.StatKey.NUM_GROUPS));
response.mergeMaxRowsInOperator(stats.getLong(AggregateOperator.StatKey.EMITTED_ROWS));
}
@@ -275,6 +276,7 @@ public abstract class MultiStageOperator implements
Operator<MseBlock>, AutoClos
StatMap<HashJoinOperator.StatKey> stats =
(StatMap<HashJoinOperator.StatKey>) map;
response.mergeMaxRowsInOperator(stats.getLong(HashJoinOperator.StatKey.EMITTED_ROWS));
response.mergeMaxRowsInJoinReached(stats.getBoolean(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN_REACHED));
+
response.mergeMaxRowsInJoin(stats.getLong(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN));
}
@Override
@@ -411,6 +413,7 @@ public abstract class MultiStageOperator implements
Operator<MseBlock>, AutoClos
response.mergeMaxRowsInOperator(stats.getLong(WindowAggregateOperator.StatKey.EMITTED_ROWS));
response.mergeMaxRowsInWindowReached(
stats.getBoolean(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW_REACHED));
+
response.mergeMaxRowsInWindow(stats.getLong(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW));
}
@Override
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
index 40e077751db..f7583d1b426 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
@@ -217,6 +217,8 @@ public class WindowAggregateOperator extends
MultiStageOperator {
int containerSize = container.size();
if (_numRows + containerSize > _maxRowsInWindowCache) {
if (_windowOverflowMode == WindowOverFlowMode.THROW) {
+ // Record stat before we throw so it propagates to query response
+ _statMap.merge(StatKey.MAX_ROWS_IN_WINDOW, _numRows + containerSize);
throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException(
"Cannot build in memory window cache for WINDOW operator, reach
number of rows limit: "
+ _maxRowsInWindowCache);
@@ -224,6 +226,9 @@ public class WindowAggregateOperator extends
MultiStageOperator {
// Just fill up the buffer.
int remainingRows = _maxRowsInWindowCache - _numRows;
container = container.subList(0, remainingRows);
+ // Keep containerSize in sync with the truncated container: MAX_ROWS_IN_WINDOW is recorded
+ // from _numRows after the loop exits, once the early-terminated input returns EOS.
+ containerSize = remainingRows;
_statMap.merge(StatKey.MAX_ROWS_IN_WINDOW_REACHED, true);
// setting the inputOperator to be early terminated and awaits EOS
block next.
_input.earlyTerminate();
@@ -239,6 +244,7 @@ public class WindowAggregateOperator extends
MultiStageOperator {
checkTerminationAndSampleUsage();
block = _input.nextBlock();
}
+ _statMap.merge(StatKey.MAX_ROWS_IN_WINDOW, _numRows);
MseBlock.Eos eosBlock = (MseBlock.Eos) block;
_eosBlock = eosBlock;
// Early termination if the block is an error block
@@ -301,6 +307,12 @@ public class WindowAggregateOperator extends
MultiStageOperator {
}
},
MAX_ROWS_IN_WINDOW_REACHED(StatMap.Type.BOOLEAN),
+ MAX_ROWS_IN_WINDOW(StatMap.Type.LONG) {
+ @Override
+ public long merge(long value1, long value2) {
+ return Math.max(value1, value2);
+ }
+ },
/**
* Allocated memory in bytes for this operator or its children in the same
stage.
*/
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
index c96805a1a15..60ab3e50356 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
@@ -55,6 +55,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.openMocks;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
@@ -145,6 +146,7 @@ public class AggregateOperatorTest {
when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inSchema, new
Object[]{2, 1.0}, new Object[]{2, 2.0}))
.thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 3.0}))
.thenReturn(SuccessMseBlock.INSTANCE);
+
when(_input.calculateStats()).thenReturn(MultiStageQueryStats.emptyStats(0));
DataSchema resultSchema = new DataSchema(new String[]{"group", "sum"}, new
ColumnDataType[]{INT, DOUBLE});
AggregateOperator operator = getOperator(resultSchema, aggCalls,
filterArgs, groupKeys);
@@ -156,6 +158,10 @@ public class AggregateOperatorTest {
assertEquals(resultRows.get(0), new Object[]{2, 6.0},
"Expected two columns (group by key, agg value), agg value is final
result");
assertTrue(operator.nextBlock().isSuccess(), "Second block is EOS (done
processing)");
+ MultiStageQueryStats stats = operator.calculateStats();
+ StatMap<AggregateOperator.StatKey> statMap =
OperatorTestUtil.getStatMap(AggregateOperator.StatKey.class, stats);
+ assertEquals(statMap.getLong(AggregateOperator.StatKey.NUM_GROUPS), 1,
+ "Num groups should equal the number of distinct group keys");
}
@Test
@@ -312,6 +318,8 @@ public class AggregateOperatorTest {
"num groups limit should be reached");
assertTrue(statMap.getBoolean(AggregateOperator.StatKey.NUM_GROUPS_WARNING_LIMIT_REACHED),
"num groups warning limit should be reached");
+ assertEquals(statMap.getLong(AggregateOperator.StatKey.NUM_GROUPS), 1,
+ "Num groups should equal the limit since only one group was accepted");
}
@Test
@@ -354,6 +362,37 @@ public class AggregateOperatorTest {
collations, limit));
}
+ @Test
+ public void shouldRecordNumGroupsBelowLimit() {
+ // Given: 1 distinct group key, limit = 2 — below limit, no overflow
+ List<RexExpression.FunctionCall> aggCalls = List.of(getSum(new
RexExpression.InputRef(1)));
+ List<Integer> filterArgs = List.of(-1);
+ List<Integer> groupKeys = List.of(0);
+ PlanNode.NodeHint nodeHint = new
PlanNode.NodeHint(Map.of(PinotHintOptions.AGGREGATE_HINT_OPTIONS,
+ Map.of(PinotHintOptions.AggregateOptions.NUM_GROUPS_LIMIT, "2")));
+ DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new
ColumnDataType[]{INT, DOUBLE});
+
+ _input = new BlockListMultiStageOperator.Builder(inSchema)
+ .addRow(2, 1.0)
+ .addRow(2, 2.0)
+ .buildWithEos();
+ DataSchema resultSchema = new DataSchema(new String[]{"group", "sum"}, new
ColumnDataType[]{INT, DOUBLE});
+ AggregateOperator operator = getOperator(resultSchema, aggCalls,
filterArgs, groupKeys, nodeHint, Map.of());
+
+ // When:
+ List<Object[]> resultRows = ((MseBlock.Data)
operator.nextBlock()).asRowHeap().getRows();
+
+ // Then:
+ assertEquals(resultRows.size(), 1);
+ assertTrue(operator.nextBlock().isEos());
+ MultiStageQueryStats stats = operator.calculateStats();
+ StatMap<AggregateOperator.StatKey> statMap =
OperatorTestUtil.getStatMap(AggregateOperator.StatKey.class, stats);
+
assertFalse(statMap.getBoolean(AggregateOperator.StatKey.NUM_GROUPS_LIMIT_REACHED),
+ "Num groups limit should not be reached when groups are below limit");
+ assertEquals(statMap.getLong(AggregateOperator.StatKey.NUM_GROUPS), 1,
+ "Num groups should equal 1");
+ }
+
private static RexExpression.FunctionCall getSum(RexExpression arg) {
return new RexExpression.FunctionCall(ColumnDataType.INT,
SqlKind.SUM.name(), List.of(arg));
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
index 4c101b17343..7ba9c8c64f4 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
@@ -41,6 +41,7 @@ import org.testng.annotations.Test;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.openMocks;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
@@ -107,6 +108,10 @@ public class HashJoinOperatorTest {
assertEquals(resultRows.size(), 2);
assertEquals(resultRows.get(0), new Object[]{2, "BB", 2, "Aa"});
assertEquals(resultRows.get(1), new Object[]{2, "BB", 2, "BB"});
+ StatMap<HashJoinOperator.StatKey> statMap =
+ OperatorTestUtil.getStatMap(HashJoinOperator.StatKey.class,
operator.calculateStats());
+ assertEquals(statMap.getLong(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN), 3,
+ "Max rows in join should equal right table size");
}
@Test
@@ -341,6 +346,10 @@ public class HashJoinOperatorTest {
.contains("reached number of rows limit"));
assertTrue(((ErrorMseBlock)
block).getErrorMessages().get(QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED)
.contains("Cannot build in memory hash table"));
+ StatMap<HashJoinOperator.StatKey> statMap =
+ OperatorTestUtil.getStatMap(HashJoinOperator.StatKey.class,
operator.calculateStats());
+ assertEquals(statMap.getLong(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN), 3,
+ "Max rows in join should be recorded even on THROW");
}
@Test
@@ -373,6 +382,8 @@ public class HashJoinOperatorTest {
OperatorTestUtil.getStatMap(HashJoinOperator.StatKey.class,
operator.calculateStats());
assertTrue(statMap.getBoolean(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN_REACHED),
"Max rows in join should be reached");
+ assertEquals(statMap.getLong(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN), 1,
+ "Max rows in join should equal the truncated right table size");
}
@Test
@@ -399,6 +410,10 @@ public class HashJoinOperatorTest {
.contains("reached number of rows limit"));
assertTrue(((ErrorMseBlock)
block).getErrorMessages().get(QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED)
.contains("Cannot process join"));
+ StatMap<HashJoinOperator.StatKey> statMap =
+ OperatorTestUtil.getStatMap(HashJoinOperator.StatKey.class,
operator.calculateStats());
+ assertEquals(statMap.getLong(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN), 2,
+ "Max rows in join should be recorded even on THROW during output
phase");
}
@Test
@@ -441,6 +456,8 @@ public class HashJoinOperatorTest {
OperatorTestUtil.getStatMap(HashJoinOperator.StatKey.class,
operator.calculateStats());
assertTrue(statMap.getBoolean(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN_REACHED),
"Max rows in join should be reached");
+ assertEquals(statMap.getLong(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN), 2,
+ "Max rows in join should equal the truncated right table size");
}
@Test
@@ -765,6 +782,93 @@ public class HashJoinOperatorTest {
assertTrue(containsRow(resultRows, new Object[]{3, "Cc", 3.0})); //
Unmatched preserved
}
+ @Test
+ public void shouldRecordMaxRowsInJoinWhenRightTableFitsExactlyAtLimit() {
+ _leftInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+ .addRow(1, "Aa")
+ .addRow(2, "BB")
+ .buildWithEos();
+ _rightInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+ .addRow(2, "Aa")
+ .addRow(3, "BB")
+ .buildWithEos();
+ DataSchema resultSchema = new DataSchema(new String[]{"int_col1",
"string_col1", "int_col2", "string_col2"},
+ new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.STRING,
ColumnDataType.INT, ColumnDataType.STRING});
+ PlanNode.NodeHint nodeHint = new
PlanNode.NodeHint(Map.of(PinotHintOptions.JOIN_HINT_OPTIONS,
+ Map.of(PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE, "BREAK",
+ PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN, "2")));
+ HashJoinOperator operator =
+ getOperator(resultSchema, JoinRelType.INNER, List.of(0), List.of(0),
List.of(), nodeHint);
+ operator.nextBlock(); // data block
+ operator.nextBlock(); // eos
+
+ StatMap<HashJoinOperator.StatKey> statMap =
+ OperatorTestUtil.getStatMap(HashJoinOperator.StatKey.class,
operator.calculateStats());
+
assertFalse(statMap.getBoolean(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN_REACHED),
+ "Max rows in join should not be reached when right table fits exactly
at limit");
+ assertEquals(statMap.getLong(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN), 2,
+ "Max rows in join should equal right table size");
+ }
+
+ @Test
+ public void shouldRecordZeroMaxRowsInJoinWhenRightTableIsEmpty() {
+ _leftInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+ .addRow(1, "Aa")
+ .buildWithEos();
+ _rightInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+ .buildWithEos();
+ DataSchema resultSchema = new DataSchema(new String[]{"int_col1",
"string_col1", "int_col2", "string_col2"},
+ new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.STRING,
ColumnDataType.INT, ColumnDataType.STRING});
+ HashJoinOperator operator =
+ getOperator(resultSchema, JoinRelType.INNER, List.of(0), List.of(0),
List.of());
+ operator.nextBlock(); // eos (no matches)
+
+ StatMap<HashJoinOperator.StatKey> statMap =
+ OperatorTestUtil.getStatMap(HashJoinOperator.StatKey.class,
operator.calculateStats());
+ assertEquals(statMap.getLong(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN), 0,
+ "Max rows in join should be 0 when right table is empty");
+ }
+
+ @Test
+ public void
shouldRecordJoinedOutputSizeWhenRightTableFitsButJoinedOutputExceedsLimit() {
+ // Right table has 2 rows (fits under the limit of 5), but each left row matches both right rows,
+ // producing 4 x 2 = 8 potential joined rows, which exceeds the limit.
+ // MAX_ROWS_IN_JOIN should reflect the joined output size (5), not the right table size (2).
+ _leftInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+ .spied()
+ .addRow(1, "Aa")
+ .addRow(2, "Aa")
+ .addRow(3, "Aa")
+ .addRow(4, "Aa")
+ .buildWithEos();
+ _rightInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+ .addRow(10, "Aa")
+ .addRow(20, "Aa")
+ .buildWithEos();
+ DataSchema resultSchema = new DataSchema(new String[]{"int_col1",
"string_col1", "int_col2", "string_col2"},
+ new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.STRING,
ColumnDataType.INT, ColumnDataType.STRING});
+ PlanNode.NodeHint nodeHint = new
PlanNode.NodeHint(Map.of(PinotHintOptions.JOIN_HINT_OPTIONS,
+ Map.of(PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE, "BREAK",
+ PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN, "5")));
+ HashJoinOperator operator =
+ getOperator(resultSchema, JoinRelType.INNER, List.of(1), List.of(1),
List.of(), nodeHint);
+
+ // When
+ List<Object[]> resultRows = ((MseBlock.Data)
operator.nextBlock()).asRowHeap().getRows();
+
+ // Then
+ Mockito.verify(_leftInput).earlyTerminate();
+ assertEquals(resultRows.size(), 5, "Should have exactly 5 joined rows
(truncated from potential 8)");
+ MseBlock block2 = operator.nextBlock();
+ assertTrue(block2.isSuccess());
+ StatMap<HashJoinOperator.StatKey> statMap =
+ OperatorTestUtil.getStatMap(HashJoinOperator.StatKey.class,
operator.calculateStats());
+
assertTrue(statMap.getBoolean(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN_REACHED),
+ "Max rows in join should be reached");
+ assertEquals(statMap.getLong(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN), 5,
+ "Max rows in join should reflect the joined output size, not the right
table size");
+ }
+
private HashJoinOperator getOperator(DataSchema leftSchema, DataSchema
resultSchema, JoinRelType joinType,
List<Integer> leftKeys, List<Integer> rightKeys, List<RexExpression>
nonEquiConditions,
PlanNode.NodeHint nodeHint) {
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
index db57f40a068..0ab3cab16ae 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
@@ -50,6 +50,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.openMocks;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@@ -137,6 +138,10 @@ public class WindowAggregateOperatorTest {
assertEquals(resultRows.size(), 1);
assertEquals(resultRows.get(0), new Object[]{2, 1, 1.0});
assertTrue(operator.nextBlock().isSuccess(), "Second block is EOS (done
processing)");
+ StatMap<WindowAggregateOperator.StatKey> windowStats =
+ OperatorTestUtil.getStatMap(WindowAggregateOperator.StatKey.class,
operator.calculateStats());
+
assertEquals(windowStats.getLong(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW),
1,
+ "Max rows in window should equal number of input rows");
}
@Test
@@ -498,6 +503,10 @@ public class WindowAggregateOperatorTest {
assertTrue(block.isError(), "expected ERROR block from window overflow");
assertTrue(((ErrorMseBlock)
block).getErrorMessages().get(QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED)
.contains("reach number of rows limit"));
+ StatMap<WindowAggregateOperator.StatKey> windowStats =
+ OperatorTestUtil.getStatMap(WindowAggregateOperator.StatKey.class,
operator.calculateStats());
+
assertEquals(windowStats.getLong(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW),
2,
+ "Max rows in window should be recorded even on THROW");
}
@Test
@@ -533,6 +542,8 @@ public class WindowAggregateOperatorTest {
OperatorTestUtil.getStatMap(WindowAggregateOperator.StatKey.class,
operator.calculateStats());
assertTrue(windowStats.getBoolean(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW_REACHED),
"Max rows in window should be reached");
+
assertEquals(windowStats.getLong(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW),
1,
+ "Max rows in window value should match the number of cached rows");
}
@Test
@@ -2987,6 +2998,63 @@ public class WindowAggregateOperatorTest {
assertEquals(e.getMessage(), "RANGE window frame with offset PRECEDING /
FOLLOWING is not supported");
}
+ @Test
+ public void testShouldRecordMaxRowsInWindowWhenInputFitsExactlyAtLimit() {
+ // Given: 1 input row, limit = 1 — fits exactly, no overflow
+ DataSchema inputSchema = new DataSchema(new String[]{"group", "arg"}, new
ColumnDataType[]{INT, INT});
+ MultiStageOperator input = new
BlockListMultiStageOperator.Builder(inputSchema)
+ .addBlock(new Object[]{2, 1})
+ .buildWithEos();
+ DataSchema resultSchema =
+ new DataSchema(new String[]{"group", "arg", "sum"}, new
ColumnDataType[]{INT, INT, DOUBLE});
+ List<Integer> keys = List.of(0);
+ List<RexExpression.FunctionCall> aggCalls = List.of(getSum(new
RexExpression.InputRef(1)));
+ PlanNode.NodeHint nodeHint = new
PlanNode.NodeHint(Map.of(PinotHintOptions.WINDOW_HINT_OPTIONS,
+ Map.of(PinotHintOptions.WindowHintOptions.WINDOW_OVERFLOW_MODE,
"BREAK",
+ PinotHintOptions.WindowHintOptions.MAX_ROWS_IN_WINDOW, "1")));
+ WindowAggregateOperator operator =
+ getOperator(inputSchema, resultSchema, keys, List.of(), aggCalls,
WindowNode.WindowFrameType.RANGE,
+ Integer.MIN_VALUE, Integer.MAX_VALUE, nodeHint, input);
+
+ // When:
+ List<Object[]> resultRows = ((MseBlock.Data)
operator.nextBlock()).asRowHeap().getRows();
+
+ // Then:
+ assertEquals(resultRows.size(), 1);
+ assertTrue(operator.nextBlock().isSuccess());
+ StatMap<WindowAggregateOperator.StatKey> windowStats =
+ OperatorTestUtil.getStatMap(WindowAggregateOperator.StatKey.class,
operator.calculateStats());
+
assertFalse(windowStats.getBoolean(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW_REACHED),
+ "Max rows in window should not be reached when input fits exactly at
limit");
+
assertEquals(windowStats.getLong(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW),
1,
+ "Max rows in window should equal number of input rows");
+ }
+
+ @Test
+ public void testShouldRecordZeroMaxRowsInWindowWhenInputIsEmpty() {
+ // Given: 0 input rows (just EOS)
+ DataSchema inputSchema = new DataSchema(new String[]{"group", "arg"}, new
ColumnDataType[]{INT, INT});
+ MultiStageOperator input = new
BlockListMultiStageOperator.Builder(inputSchema)
+ .buildWithEos();
+ DataSchema resultSchema =
+ new DataSchema(new String[]{"group", "arg", "sum"}, new
ColumnDataType[]{INT, INT, DOUBLE});
+ List<Integer> keys = List.of(0);
+ List<RexExpression.FunctionCall> aggCalls = List.of(getSum(new
RexExpression.InputRef(1)));
+ WindowAggregateOperator operator =
+ getOperator(inputSchema, resultSchema, keys, List.of(), aggCalls,
WindowNode.WindowFrameType.RANGE,
+ Integer.MIN_VALUE, Integer.MAX_VALUE, input);
+
+ // When:
+ MseBlock block = operator.nextBlock();
+
+ // Then:
+ assertTrue(block.isEos());
+ StatMap<WindowAggregateOperator.StatKey> windowStats =
+ OperatorTestUtil.getStatMap(WindowAggregateOperator.StatKey.class,
operator.calculateStats());
+
assertEquals(windowStats.getLong(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW),
0,
+ "Max rows in window should be 0 when input is empty");
+ }
+
private WindowAggregateOperator getOperator(DataSchema inputSchema,
DataSchema resultSchema, List<Integer> keys,
List<RelFieldCollation> collations, List<RexExpression.FunctionCall>
aggCalls,
WindowNode.WindowFrameType windowFrameType, int lowerBound, int
upperBound, PlanNode.NodeHint nodeHint,
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStatsTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStatsTest.java
index fee7dfdfd7a..c54e2496269 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStatsTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStatsTest.java
@@ -21,11 +21,14 @@ package org.apache.pinot.query.runtime.plan;
import java.io.IOException;
import java.util.List;
import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.query.runtime.operator.AggregateOperator;
import org.apache.pinot.query.runtime.operator.BaseMailboxReceiveOperator;
+import org.apache.pinot.query.runtime.operator.HashJoinOperator;
import org.apache.pinot.query.runtime.operator.LeafOperator;
import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
import org.apache.pinot.query.runtime.operator.MultiStageOperator;
import org.apache.pinot.query.runtime.operator.SortOperator;
+import org.apache.pinot.query.runtime.operator.WindowAggregateOperator;
import org.apache.pinot.segment.spi.memory.DataBuffer;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
@@ -57,7 +60,8 @@ public class MultiStageQueryStatsTest {
@DataProvider(name = "stats")
public static MultiStageQueryStats[] stats() {
return new MultiStageQueryStats[] {
- stats1()
+ stats1(),
+ stats2()
};
}
@@ -97,4 +101,46 @@ public class MultiStageQueryStatsTest {
.close())
.build();
}
+
+ public static MultiStageQueryStats stats2() {
+ return new MultiStageQueryStats.Builder(1)
+ .customizeOpen(open ->
+ open.addLastOperator(MultiStageOperator.Type.MAILBOX_RECEIVE,
+ new StatMap<>(BaseMailboxReceiveOperator.StatKey.class)
+
.merge(BaseMailboxReceiveOperator.StatKey.EXECUTION_TIME_MS, 50)
+
.merge(BaseMailboxReceiveOperator.StatKey.EMITTED_ROWS, 20))
+ .addLastOperator(MultiStageOperator.Type.HASH_JOIN,
+ new StatMap<>(HashJoinOperator.StatKey.class)
+ .merge(HashJoinOperator.StatKey.EXECUTION_TIME_MS, 30)
+ .merge(HashJoinOperator.StatKey.EMITTED_ROWS, 15)
+ .merge(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN,
100L))
+ .addLastOperator(MultiStageOperator.Type.WINDOW,
+ new StatMap<>(WindowAggregateOperator.StatKey.class)
+
.merge(WindowAggregateOperator.StatKey.EXECUTION_TIME_MS, 20)
+ .merge(WindowAggregateOperator.StatKey.EMITTED_ROWS,
15)
+
.merge(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW, 50L))
+ .addLastOperator(MultiStageOperator.Type.MAILBOX_SEND,
+ new StatMap<>(MailboxSendOperator.StatKey.class)
+ .merge(MailboxSendOperator.StatKey.STAGE, 1)
+ .merge(MailboxSendOperator.StatKey.EXECUTION_TIME_MS,
40)
+ .merge(MailboxSendOperator.StatKey.EMITTED_ROWS, 15))
+ )
+ .addLast(stageStats ->
+ stageStats.addLastOperator(MultiStageOperator.Type.LEAF,
+ new StatMap<>(LeafOperator.StatKey.class)
+ .merge(LeafOperator.StatKey.EXECUTION_TIME_MS, 80)
+ .merge(LeafOperator.StatKey.EMITTED_ROWS, 30))
+ .addLastOperator(MultiStageOperator.Type.AGGREGATE,
+ new StatMap<>(AggregateOperator.StatKey.class)
+ .merge(AggregateOperator.StatKey.EXECUTION_TIME_MS, 25)
+ .merge(AggregateOperator.StatKey.EMITTED_ROWS, 10)
+ .merge(AggregateOperator.StatKey.NUM_GROUPS, 5L))
+ .addLastOperator(MultiStageOperator.Type.MAILBOX_SEND,
+ new StatMap<>(MailboxSendOperator.StatKey.class)
+ .merge(MailboxSendOperator.StatKey.STAGE, 2)
+ .merge(MailboxSendOperator.StatKey.EXECUTION_TIME_MS,
60)
+ .merge(MailboxSendOperator.StatKey.EMITTED_ROWS, 10))
+ .close())
+ .build();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]