This is an automated email from the ASF dual-hosted git repository.
pratik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c7fcda980b [timeseries] Add Support for limit and numGroupsLimit
(#14945)
c7fcda980b is described below
commit c7fcda980b686148952d055bed8d8e2390b7a530
Author: Ankit Sultana <[email protected]>
AuthorDate: Mon Feb 3 15:28:00 2025 -0600
[timeseries] Add Support for limit and numGroupsLimit (#14945)
* [timeseries] Add Support for limit and numGroupsLimit
* fix bug and test
* fix signature
* add plan serde test
---
.../requesthandler/TimeSeriesRequestHandler.java | 11 +++++++++-
.../timeseries/TimeSeriesOperatorUtils.java | 5 +----
.../pinot/tsdb/m3ql/M3TimeSeriesPlanner.java | 14 +++++++++---
.../PhysicalTimeSeriesServerPlanVisitor.java | 7 +++---
.../PhysicalTimeSeriesServerPlanVisitorTest.java | 17 ++++++++++++---
.../pinot/segment/spi/AggregationFunctionType.java | 2 +-
.../tsdb/planner/TimeSeriesPlanFragmenterTest.java | 7 +++++-
.../pinot/tsdb/spi/RangeTimeSeriesRequest.java | 19 +++++++++++++++-
.../tsdb/spi/plan/LeafTimeSeriesPlanNode.java | 25 +++++++++++++++++-----
.../tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java | 13 +++++++----
.../spi/plan/serde/TimeSeriesPlanSerdeTest.java | 13 +++++++++--
11 files changed, 105 insertions(+), 28 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
index d14f286013..ac6962c592 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
@@ -150,6 +150,8 @@ public class TimeSeriesRequestHandler extends
BaseBrokerRequestHandler {
Long endTs = null;
String step = null;
String timeoutStr = null;
+ int limit = RangeTimeSeriesRequest.DEFAULT_SERIES_LIMIT;
+ int numGroupsLimit = RangeTimeSeriesRequest.DEFAULT_NUM_GROUPS_LIMIT;
for (NameValuePair nameValuePair : pairs) {
switch (nameValuePair.getName()) {
case "query":
@@ -167,6 +169,12 @@ public class TimeSeriesRequestHandler extends
BaseBrokerRequestHandler {
case "timeout":
timeoutStr = nameValuePair.getValue();
break;
+ case "limit":
+ limit = Integer.parseInt(nameValuePair.getValue());
+ break;
+ case "numGroupsLimit":
+ numGroupsLimit = Integer.parseInt(nameValuePair.getValue());
+ break;
default:
/* Okay to ignore unknown parameters since the language implementor
may be using them. */
break;
@@ -182,7 +190,8 @@ public class TimeSeriesRequestHandler extends
BaseBrokerRequestHandler {
timeout = HumanReadableDuration.from(timeoutStr);
}
// TODO: Pass full raw query param string to the request
- return new RangeTimeSeriesRequest(language, query, startTs, endTs,
stepSeconds, timeout, queryParamString);
+ return new RangeTimeSeriesRequest(language, query, startTs, endTs,
stepSeconds, timeout, limit, numGroupsLimit,
+ queryParamString);
}
public static Long getStepSeconds(@Nullable String step) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesOperatorUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesOperatorUtils.java
index 236d4f858f..7f095c47d1 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesOperatorUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesOperatorUtils.java
@@ -44,10 +44,7 @@ public class TimeSeriesOperatorUtils {
if (groupByResultsBlock.getNumRows() == 0) {
return new TimeSeriesBlock(timeBuckets, new HashMap<>());
}
- if (groupByResultsBlock.isNumGroupsLimitReached()) {
- throw new IllegalStateException(String.format("Series limit reached.
Number of series: %s",
- groupByResultsBlock.getNumRows()));
- }
+ // TODO: Check isNumGroupsLimitReached, and propagate it somehow to the
caller.
Map<Long, List<TimeSeries>> timeSeriesMap = new
HashMap<>(groupByResultsBlock.getNumRows());
List<String> tagNames =
getTagNamesFromDataSchema(Objects.requireNonNull(groupByResultsBlock.getDataSchema(),
"DataSchema is null in leaf stage of time-series query"));
diff --git
a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
index 42515083c0..e38ae76d53 100644
---
a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
+++
b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
@@ -21,8 +21,10 @@ package org.apache.pinot.tsdb.m3ql;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Locale;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -78,7 +80,8 @@ public class M3TimeSeriesPlanner implements
TimeSeriesLogicalPlanner {
switch (command) {
case "fetch":
List<String> tokens = commands.get(commandId).subList(1,
commands.get(commandId).size());
- currentNode = handleFetchNode(planIdGenerator.generateId(), tokens,
children, aggInfo, groupByColumns);
+ currentNode = handleFetchNode(planIdGenerator.generateId(), tokens,
children, aggInfo, groupByColumns,
+ request);
break;
case "sum":
case "min":
@@ -118,7 +121,8 @@ public class M3TimeSeriesPlanner implements
TimeSeriesLogicalPlanner {
}
public BaseTimeSeriesPlanNode handleFetchNode(String planId, List<String>
tokens,
- List<BaseTimeSeriesPlanNode> children, AggInfo aggInfo, List<String>
groupByColumns) {
+ List<BaseTimeSeriesPlanNode> children, AggInfo aggInfo, List<String>
groupByColumns,
+ RangeTimeSeriesRequest request) {
Preconditions.checkState(tokens.size() % 2 == 0, "Mismatched args");
String tableName = null;
String timeColumn = null;
@@ -152,7 +156,11 @@ public class M3TimeSeriesPlanner implements
TimeSeriesLogicalPlanner {
Preconditions.checkNotNull(timeColumn, "Time column not set. Set via
time_col=");
Preconditions.checkNotNull(timeUnit, "Time unit not set. Set via
time_unit=");
Preconditions.checkNotNull(valueExpr, "Value expression not set. Set via
value=");
+ Map<String, String> queryOptions = new HashMap<>();
+ if (request.getNumGroupsLimit() > 0) {
+ queryOptions.put("numGroupsLimit",
Integer.toString(request.getNumGroupsLimit()));
+ }
return new LeafTimeSeriesPlanNode(planId, children, tableName, timeColumn,
timeUnit, 0L, filter, valueExpr, aggInfo,
- groupByColumns);
+ groupByColumns, request.getLimit(), queryOptions);
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitor.java
index cfcd80a9e7..72857d403b 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitor.java
@@ -19,7 +19,6 @@
package org.apache.pinot.query.runtime.timeseries;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -111,14 +110,16 @@ public class PhysicalTimeSeriesServerPlanVisitor {
leafNode.getTimeUnit(), timeBuckets, leafNode.getOffsetSeconds() ==
null ? 0 : leafNode.getOffsetSeconds());
ExpressionContext aggregation =
TimeSeriesAggregationFunction.create(context.getLanguage(),
leafNode.getValueExpression(), timeTransform, timeBuckets,
leafNode.getAggInfo());
+ Map<String, String> queryOptions = new
HashMap<>(leafNode.getQueryOptions());
+ queryOptions.put(QueryOptionKey.TIMEOUT_MS, Long.toString(Math.max(0L,
context.getRemainingTimeMs())));
return new QueryContext.Builder()
.setTableName(leafNode.getTableName())
.setFilter(filterContext)
.setGroupByExpressions(groupByExpressions)
.setSelectExpressions(List.of(aggregation))
- .setQueryOptions(ImmutableMap.of(QueryOptionKey.TIMEOUT_MS,
Long.toString(context.getRemainingTimeMs())))
+ .setQueryOptions(queryOptions)
.setAliasList(Collections.emptyList())
- .setLimit(Integer.MAX_VALUE)
+ .setLimit(leafNode.getLimit())
.build();
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java
index f98e4228e0..66654b0117 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java
@@ -18,8 +18,10 @@
*/
package org.apache.pinot.query.runtime.timeseries;
+import com.google.common.collect.ImmutableMap;
import java.time.Duration;
import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metrics.ServerMetrics;
@@ -27,6 +29,7 @@ import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.request.context.QueryContext;
import
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest;
import org.apache.pinot.tsdb.spi.TimeBuckets;
import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
import org.apache.pinot.tsdb.spi.series.SimpleTimeSeriesBuilderFactory;
@@ -43,6 +46,8 @@ import static org.testng.Assert.assertTrue;
public class PhysicalTimeSeriesServerPlanVisitorTest {
private static final String LANGUAGE = "m3ql";
private static final int DUMMY_DEADLINE_MS = 10_000;
+ private static final int SERIES_LIMIT = 1000;
+ private static final Map<String, String> QUERY_OPTIONS =
Collections.emptyMap();
@BeforeClass
public void setUp() {
@@ -65,20 +70,24 @@ public class PhysicalTimeSeriesServerPlanVisitorTest {
DUMMY_DEADLINE_MS, Collections.emptyMap(),
Collections.emptyMap(), Collections.emptyMap());
LeafTimeSeriesPlanNode leafNode =
new LeafTimeSeriesPlanNode(planId, Collections.emptyList(),
tableName, timeColumn, TimeUnit.SECONDS, 0L,
- filterExpr, "orderCount", aggInfo,
Collections.singletonList("cityName"));
+ filterExpr, "orderCount", aggInfo,
Collections.singletonList("cityName"), SERIES_LIMIT,
+ QUERY_OPTIONS);
QueryContext queryContext =
serverPlanVisitor.compileQueryContext(leafNode, context);
assertEquals(queryContext.getFilter().toString(),
"(cityName = 'Chicago' AND orderTime > '990' AND orderTime <=
'1990')");
assertTrue(isNumber(queryContext.getQueryOptions().get(QueryOptionKey.TIMEOUT_MS)));
+ assertEquals(queryContext.getLimit(), SERIES_LIMIT);
}
- // Case-2: With offset, complex group-by expression, complex value, and
non-empty filter
+ // Case-2: With offset, complex group-by expression, complex value,
non-empty filter, 0 limit, query options.
{
+ Map<String, String> queryOptions = ImmutableMap.of("numGroupsLimit",
"1000");
TimeSeriesExecutionContext context =
new TimeSeriesExecutionContext(LANGUAGE,
TimeBuckets.ofSeconds(1000L, Duration.ofSeconds(10), 100),
DUMMY_DEADLINE_MS, Collections.emptyMap(),
Collections.emptyMap(), Collections.emptyMap());
LeafTimeSeriesPlanNode leafNode =
new LeafTimeSeriesPlanNode(planId, Collections.emptyList(),
tableName, timeColumn, TimeUnit.SECONDS, 10L,
- filterExpr, "orderCount*2", aggInfo,
Collections.singletonList("concat(cityName, stateName, '-')"));
+ filterExpr, "orderCount*2", aggInfo,
Collections.singletonList("concat(cityName, stateName, '-')"),
+ 0 /* limit */, queryOptions);
QueryContext queryContext =
serverPlanVisitor.compileQueryContext(leafNode, context);
assertNotNull(queryContext);
assertNotNull(queryContext.getGroupByExpressions());
@@ -87,6 +96,8 @@ public class PhysicalTimeSeriesServerPlanVisitorTest {
assertEquals(queryContext.getFilter().toString(),
"(cityName = 'Chicago' AND orderTime > '980' AND orderTime <=
'1980')");
assertTrue(isNumber(queryContext.getQueryOptions().get(QueryOptionKey.TIMEOUT_MS)));
+ assertEquals(queryContext.getLimit(),
RangeTimeSeriesRequest.DEFAULT_SERIES_LIMIT);
+ assertEquals(queryContext.getQueryOptions().get("numGroupsLimit"),
"1000");
}
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 74e7a135b4..4f0f0c48f9 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -223,7 +223,7 @@ public enum AggregationFunctionType {
PERCENTILERAWKLLMV("percentileRawKLLMV", ReturnTypes.VARCHAR,
OperandTypes.family(List.of(SqlTypeFamily.ARRAY, SqlTypeFamily.NUMERIC,
SqlTypeFamily.INTEGER), i -> i == 2),
SqlTypeName.OTHER),
- TIMESERIESAGGREGATE("timeSeriesAggregate", SqlTypeName.OTHER,
SqlTypeName.VARCHAR);
+ TIMESERIESAGGREGATE("timeSeriesAggregate", SqlTypeName.OTHER,
SqlTypeName.OTHER);
private static final Set<String> NAMES = Arrays.stream(values())
.flatMap(func -> Stream.of(func.name(), func.getName(),
func.getName().toLowerCase()))
diff --git
a/pinot-timeseries/pinot-timeseries-planner/src/test/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenterTest.java
b/pinot-timeseries/pinot-timeseries-planner/src/test/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenterTest.java
index 8727f64ddc..8ae40ebbba 100644
---
a/pinot-timeseries/pinot-timeseries-planner/src/test/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenterTest.java
+++
b/pinot-timeseries/pinot-timeseries-planner/src/test/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenterTest.java
@@ -21,6 +21,7 @@ package org.apache.pinot.tsdb.planner;
import com.google.common.collect.ImmutableList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
@@ -31,6 +32,9 @@ import static org.testng.Assert.*;
public class TimeSeriesPlanFragmenterTest {
+ private static final int SERIES_LIMIT = 1000;
+ private static final Map<String, String> QUERY_OPTIONS =
Collections.emptyMap();
+
@Test
public void testGetFragmentsWithMultipleLeafNodes() {
/*
@@ -136,7 +140,8 @@ public class TimeSeriesPlanFragmenterTest {
private LeafTimeSeriesPlanNode createMockLeafNode(String id) {
return new LeafTimeSeriesPlanNode(id, Collections.emptyList(),
"someTableName", "someTimeColumn",
- TimeUnit.SECONDS, 0L, "", "", null, Collections.emptyList());
+ TimeUnit.SECONDS, 0L, "", "", null, Collections.emptyList(),
+ SERIES_LIMIT, QUERY_OPTIONS);
}
static class MockTimeSeriesPlanNode extends BaseTimeSeriesPlanNode {
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java
index ecbc3b3f6b..f44f181ff9 100644
---
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java
@@ -48,6 +48,9 @@ import java.time.Duration;
* </ul>
*/
public class RangeTimeSeriesRequest {
+ // TODO: It's not ideal to have another default, that plays with
numGroupsLimit, etc.
+ public static final int DEFAULT_SERIES_LIMIT = 100_000;
+ public static final int DEFAULT_NUM_GROUPS_LIMIT = -1;
/** Engine allows a Pinot cluster to support multiple Time-Series Query
Languages. */
private final String _language;
/** Query is the raw query sent by the caller. */
@@ -63,11 +66,15 @@ public class RangeTimeSeriesRequest {
private final long _stepSeconds;
/** E2E timeout for the query. */
private final Duration _timeout;
+ /** Series limit for the query */
+ private final int _limit;
+ /** The numGroupsLimit value used in Pinot's Grouping Algorithm. */
+ private final int _numGroupsLimit;
/** Full query string to allow language implementations to pass custom
parameters. */
private final String _fullQueryString;
public RangeTimeSeriesRequest(String language, String query, long
startSeconds, long endSeconds, long stepSeconds,
- Duration timeout, String fullQueryString) {
+ Duration timeout, int limit, int numGroupsLimit, String fullQueryString)
{
Preconditions.checkState(endSeconds >= startSeconds, "Invalid range.
startSeconds "
+ "should be greater than or equal to endSeconds. Found
startSeconds=%s and endSeconds=%s",
startSeconds, endSeconds);
@@ -77,6 +84,8 @@ public class RangeTimeSeriesRequest {
_endSeconds = endSeconds;
_stepSeconds = stepSeconds;
_timeout = timeout;
+ _limit = limit;
+ _numGroupsLimit = numGroupsLimit;
_fullQueryString = fullQueryString;
}
@@ -104,6 +113,14 @@ public class RangeTimeSeriesRequest {
return _timeout;
}
+ public int getLimit() {
+ return _limit;
+ }
+
+ public int getNumGroupsLimit() {
+ return _numGroupsLimit;
+ }
+
public String getFullQueryString() {
return _fullQueryString;
}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
index 3deb4c68e6..b0c7046466 100644
---
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
@@ -22,8 +22,10 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.Duration;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest;
import org.apache.pinot.tsdb.spi.TimeBuckets;
import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanner;
import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
@@ -44,6 +46,8 @@ public class LeafTimeSeriesPlanNode extends
BaseTimeSeriesPlanNode {
private final String _valueExpression;
private final AggInfo _aggInfo;
private final List<String> _groupByExpressions;
+ private final Map<String, String> _queryOptions;
+ private final int _limit;
@JsonCreator
public LeafTimeSeriesPlanNode(
@@ -52,7 +56,8 @@ public class LeafTimeSeriesPlanNode extends
BaseTimeSeriesPlanNode {
@JsonProperty("timeUnit") TimeUnit timeUnit,
@JsonProperty("offsetSeconds") Long offsetSeconds,
@JsonProperty("filterExpression") String filterExpression,
@JsonProperty("valueExpression") String valueExpression,
@JsonProperty("aggInfo") AggInfo aggInfo,
- @JsonProperty("groupByExpressions") List<String> groupByExpressions) {
+ @JsonProperty("groupByExpressions") List<String> groupByExpressions,
+ @JsonProperty("limit") int limit, @JsonProperty("queryOptions")
Map<String, String> queryOptions) {
super(id, inputs);
_tableName = tableName;
_timeColumn = timeColumn;
@@ -62,17 +67,19 @@ public class LeafTimeSeriesPlanNode extends
BaseTimeSeriesPlanNode {
_valueExpression = valueExpression;
_aggInfo = aggInfo;
_groupByExpressions = groupByExpressions;
+ _limit = limit <= 0 ? RangeTimeSeriesRequest.DEFAULT_SERIES_LIMIT : limit;
+ _queryOptions = queryOptions;
}
public LeafTimeSeriesPlanNode withAggInfo(AggInfo newAggInfo) {
return new LeafTimeSeriesPlanNode(_id, _inputs, _tableName, _timeColumn,
_timeUnit, _offsetSeconds,
- _filterExpression, _valueExpression, newAggInfo, _groupByExpressions);
+ _filterExpression, _valueExpression, newAggInfo, _groupByExpressions,
_limit, _queryOptions);
}
@Override
public BaseTimeSeriesPlanNode withInputs(List<BaseTimeSeriesPlanNode>
newInputs) {
return new LeafTimeSeriesPlanNode(_id, newInputs, _tableName, _timeColumn,
_timeUnit, _offsetSeconds,
- _filterExpression, _valueExpression, _aggInfo, _groupByExpressions);
+ _filterExpression, _valueExpression, _aggInfo, _groupByExpressions,
_limit, _queryOptions);
}
@Override
@@ -83,8 +90,8 @@ public class LeafTimeSeriesPlanNode extends
BaseTimeSeriesPlanNode {
@Override
public String getExplainName() {
return String.format("LEAF_TIME_SERIES_PLAN_NODE(%s, table=%s,
timeExpr=%s, valueExpr=%s, aggInfo=%s, "
- + "groupBy=%s, filter=%s, offsetSeconds=%s)", _id, _tableName,
_timeColumn, _valueExpression,
- _aggInfo.getAggFunction(), _groupByExpressions, _filterExpression,
_offsetSeconds);
+ + "groupBy=%s, filter=%s, offsetSeconds=%s, limit=%s)", _id,
_tableName, _timeColumn, _valueExpression,
+ _aggInfo.getAggFunction(), _groupByExpressions, _filterExpression,
_offsetSeconds, _limit);
}
@Override
@@ -124,6 +131,14 @@ public class LeafTimeSeriesPlanNode extends
BaseTimeSeriesPlanNode {
return _groupByExpressions;
}
+ public int getLimit() {
+ return _limit;
+ }
+
+ public Map<String, String> getQueryOptions() {
+ return _queryOptions;
+ }
+
public String getEffectiveFilter(TimeBuckets timeBuckets) {
String filter = _filterExpression == null ? "" : _filterExpression;
long startTime =
_timeUnit.convert(Duration.ofSeconds(timeBuckets.getTimeRangeStartExclusive() -
_offsetSeconds));
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
index d326ed49b5..bb65619ceb 100644
---
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
+++
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
@@ -18,8 +18,10 @@
*/
package org.apache.pinot.tsdb.spi.plan;
+import com.google.common.collect.ImmutableMap;
import java.time.Duration;
import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.tsdb.spi.AggInfo;
import org.apache.pinot.tsdb.spi.TimeBuckets;
@@ -33,6 +35,8 @@ public class LeafTimeSeriesPlanNodeTest {
private static final String TABLE = "myTable";
private static final String TIME_COLUMN = "orderTime";
private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
+ private static final int SERIES_LIMIT = 10;
+ private static final Map<String, String> QUERY_OPTIONS =
ImmutableMap.of("numGroupsLimit", "100000");
@Test
public void testGetEffectiveFilter() {
@@ -44,7 +48,7 @@ public class LeafTimeSeriesPlanNodeTest {
{
LeafTimeSeriesPlanNode planNode =
new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE,
TIME_COLUMN, TIME_UNIT, 0L, "", "value_col",
- new AggInfo("SUM", false, null),
Collections.singletonList("cityName"));
+ new AggInfo("SUM", false, null),
Collections.singletonList("cityName"), SERIES_LIMIT, QUERY_OPTIONS);
assertEquals(planNode.getEffectiveFilter(timeBuckets),
"orderTime > " + expectedStartTimeInFilter + " AND orderTime <= " +
expectedEndTimeInFilter);
}
@@ -52,7 +56,7 @@ public class LeafTimeSeriesPlanNodeTest {
{
LeafTimeSeriesPlanNode planNode =
new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE,
TIME_COLUMN, TIME_UNIT, 123L, "", "value_col",
- new AggInfo("SUM", false, null),
Collections.singletonList("cityName"));
+ new AggInfo("SUM", false, null),
Collections.singletonList("cityName"), SERIES_LIMIT, QUERY_OPTIONS);
assertEquals(planNode.getEffectiveFilter(timeBuckets),
"orderTime > " + (expectedStartTimeInFilter - 123) + " AND orderTime
<= " + (expectedEndTimeInFilter - 123));
}
@@ -60,7 +64,8 @@ public class LeafTimeSeriesPlanNodeTest {
{
LeafTimeSeriesPlanNode planNode =
new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE,
TIME_COLUMN, TIME_UNIT, 123L, nonEmptyFilter,
- "value_col", new AggInfo("SUM", false, Collections.emptyMap()),
Collections.singletonList("cityName"));
+ "value_col", new AggInfo("SUM", false, Collections.emptyMap()),
Collections.singletonList("cityName"),
+ SERIES_LIMIT, QUERY_OPTIONS);
assertEquals(planNode.getEffectiveFilter(timeBuckets),
String.format("(%s) AND (orderTime > %s AND orderTime <= %s)",
nonEmptyFilter,
(expectedStartTimeInFilter - 123), (expectedEndTimeInFilter -
123)));
@@ -70,7 +75,7 @@ public class LeafTimeSeriesPlanNodeTest {
LeafTimeSeriesPlanNode planNode =
new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE,
TIME_COLUMN, TimeUnit.MILLISECONDS, 123L,
nonEmptyFilter, "value_col", new AggInfo("SUM", false,
Collections.emptyMap()),
- Collections.singletonList("cityName"));
+ Collections.singletonList("cityName"), SERIES_LIMIT,
QUERY_OPTIONS);
assertEquals(planNode.getEffectiveFilter(timeBuckets),
String.format("(%s) AND (orderTime > %s AND orderTime <= %s)",
nonEmptyFilter,
(expectedStartTimeInFilter * 1000 - 123 * 1000),
(expectedEndTimeInFilter * 1000 - 123 * 1000)));
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
index 71bf2323fd..a8eb68a5b9 100644
---
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
+++
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.tsdb.spi.plan.serde;
+import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
@@ -34,18 +35,24 @@ import static org.testng.Assert.assertTrue;
public class TimeSeriesPlanSerdeTest {
+ private static final int SERIES_LIMIT = 1000;
+ private static final Map<String, String> QUERY_OPTIONS =
ImmutableMap.of("numGroupsLimit", "1000");
+
@Test
public void testSerdeForScanFilterProjectNode() {
Map<String, String> aggParams = new HashMap<>();
aggParams.put("window", "5m");
-
+ // create leaf node
LeafTimeSeriesPlanNode leafTimeSeriesPlanNode =
new LeafTimeSeriesPlanNode("sfp#0", new ArrayList<>(), "myTable",
"myTimeColumn", TimeUnit.MILLISECONDS, 0L,
- "myFilterExpression", "myValueExpression", new AggInfo("SUM",
false, aggParams), new ArrayList<>());
+ "myFilterExpression", "myValueExpression", new AggInfo("SUM",
false, aggParams), new ArrayList<>(),
+ SERIES_LIMIT, QUERY_OPTIONS);
+ // serialize and deserialize to re-create another node
BaseTimeSeriesPlanNode planNode =
TimeSeriesPlanSerde.deserialize(TimeSeriesPlanSerde.serialize(leafTimeSeriesPlanNode));
assertTrue(planNode instanceof LeafTimeSeriesPlanNode);
LeafTimeSeriesPlanNode deserializedNode = (LeafTimeSeriesPlanNode)
planNode;
+ // assert that deserialized node is same as serialized node
assertEquals(deserializedNode.getTableName(), "myTable");
assertEquals(deserializedNode.getTimeColumn(), "myTimeColumn");
assertEquals(deserializedNode.getTimeUnit(), TimeUnit.MILLISECONDS);
@@ -57,5 +64,7 @@ public class TimeSeriesPlanSerdeTest {
assertNotNull(deserializedNode.getAggInfo().getParams());
assertEquals(deserializedNode.getAggInfo().getParams().get("window"),
"5m");
assertEquals(deserializedNode.getGroupByExpressions().size(), 0);
+ assertEquals(deserializedNode.getLimit(), SERIES_LIMIT);
+ assertEquals(deserializedNode.getQueryOptions(), QUERY_OPTIONS);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]