This is an automated email from the ASF dual-hosted git repository.

pratik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c7fcda980b [timeseries] Add Support for limit and numGroupsLimit 
(#14945)
c7fcda980b is described below

commit c7fcda980b686148952d055bed8d8e2390b7a530
Author: Ankit Sultana <[email protected]>
AuthorDate: Mon Feb 3 15:28:00 2025 -0600

    [timeseries] Add Support for limit and numGroupsLimit (#14945)
    
    * [timeseries] Add Support for limit and numGroupsLimit
    
    * fix bug and test
    
    * fix signature
    
    * add plan serde test
---
 .../requesthandler/TimeSeriesRequestHandler.java   | 11 +++++++++-
 .../timeseries/TimeSeriesOperatorUtils.java        |  5 +----
 .../pinot/tsdb/m3ql/M3TimeSeriesPlanner.java       | 14 +++++++++---
 .../PhysicalTimeSeriesServerPlanVisitor.java       |  7 +++---
 .../PhysicalTimeSeriesServerPlanVisitorTest.java   | 17 ++++++++++++---
 .../pinot/segment/spi/AggregationFunctionType.java |  2 +-
 .../tsdb/planner/TimeSeriesPlanFragmenterTest.java |  7 +++++-
 .../pinot/tsdb/spi/RangeTimeSeriesRequest.java     | 19 +++++++++++++++-
 .../tsdb/spi/plan/LeafTimeSeriesPlanNode.java      | 25 +++++++++++++++++-----
 .../tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java  | 13 +++++++----
 .../spi/plan/serde/TimeSeriesPlanSerdeTest.java    | 13 +++++++++--
 11 files changed, 105 insertions(+), 28 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
index d14f286013..ac6962c592 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
@@ -150,6 +150,8 @@ public class TimeSeriesRequestHandler extends 
BaseBrokerRequestHandler {
     Long endTs = null;
     String step = null;
     String timeoutStr = null;
+    int limit = RangeTimeSeriesRequest.DEFAULT_SERIES_LIMIT;
+    int numGroupsLimit = RangeTimeSeriesRequest.DEFAULT_NUM_GROUPS_LIMIT;
     for (NameValuePair nameValuePair : pairs) {
       switch (nameValuePair.getName()) {
         case "query":
@@ -167,6 +169,12 @@ public class TimeSeriesRequestHandler extends 
BaseBrokerRequestHandler {
         case "timeout":
           timeoutStr = nameValuePair.getValue();
           break;
+        case "limit":
+          limit = Integer.parseInt(nameValuePair.getValue());
+          break;
+        case "numGroupsLimit":
+          numGroupsLimit = Integer.parseInt(nameValuePair.getValue());
+          break;
         default:
           /* Okay to ignore unknown parameters since the language implementor 
may be using them. */
           break;
@@ -182,7 +190,8 @@ public class TimeSeriesRequestHandler extends 
BaseBrokerRequestHandler {
       timeout = HumanReadableDuration.from(timeoutStr);
     }
     // TODO: Pass full raw query param string to the request
-    return new RangeTimeSeriesRequest(language, query, startTs, endTs, 
stepSeconds, timeout, queryParamString);
+    return new RangeTimeSeriesRequest(language, query, startTs, endTs, 
stepSeconds, timeout, limit, numGroupsLimit,
+        queryParamString);
   }
 
   public static Long getStepSeconds(@Nullable String step) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesOperatorUtils.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesOperatorUtils.java
index 236d4f858f..7f095c47d1 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesOperatorUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesOperatorUtils.java
@@ -44,10 +44,7 @@ public class TimeSeriesOperatorUtils {
     if (groupByResultsBlock.getNumRows() == 0) {
       return new TimeSeriesBlock(timeBuckets, new HashMap<>());
     }
-    if (groupByResultsBlock.isNumGroupsLimitReached()) {
-      throw new IllegalStateException(String.format("Series limit reached. 
Number of series: %s",
-          groupByResultsBlock.getNumRows()));
-    }
+    // TODO: Check isNumGroupsLimitReached, and propagate it somehow to the 
caller.
     Map<Long, List<TimeSeries>> timeSeriesMap = new 
HashMap<>(groupByResultsBlock.getNumRows());
     List<String> tagNames = 
getTagNamesFromDataSchema(Objects.requireNonNull(groupByResultsBlock.getDataSchema(),
         "DataSchema is null in leaf stage of time-series query"));
diff --git 
a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
 
b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
index 42515083c0..e38ae76d53 100644
--- 
a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
+++ 
b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java
@@ -21,8 +21,10 @@ package org.apache.pinot.tsdb.m3ql;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -78,7 +80,8 @@ public class M3TimeSeriesPlanner implements 
TimeSeriesLogicalPlanner {
       switch (command) {
         case "fetch":
           List<String> tokens = commands.get(commandId).subList(1, 
commands.get(commandId).size());
-          currentNode = handleFetchNode(planIdGenerator.generateId(), tokens, 
children, aggInfo, groupByColumns);
+          currentNode = handleFetchNode(planIdGenerator.generateId(), tokens, 
children, aggInfo, groupByColumns,
+              request);
           break;
         case "sum":
         case "min":
@@ -118,7 +121,8 @@ public class M3TimeSeriesPlanner implements 
TimeSeriesLogicalPlanner {
   }
 
   public BaseTimeSeriesPlanNode handleFetchNode(String planId, List<String> 
tokens,
-      List<BaseTimeSeriesPlanNode> children, AggInfo aggInfo, List<String> 
groupByColumns) {
+      List<BaseTimeSeriesPlanNode> children, AggInfo aggInfo, List<String> 
groupByColumns,
+      RangeTimeSeriesRequest request) {
     Preconditions.checkState(tokens.size() % 2 == 0, "Mismatched args");
     String tableName = null;
     String timeColumn = null;
@@ -152,7 +156,11 @@ public class M3TimeSeriesPlanner implements 
TimeSeriesLogicalPlanner {
     Preconditions.checkNotNull(timeColumn, "Time column not set. Set via 
time_col=");
     Preconditions.checkNotNull(timeUnit, "Time unit not set. Set via 
time_unit=");
     Preconditions.checkNotNull(valueExpr, "Value expression not set. Set via 
value=");
+    Map<String, String> queryOptions = new HashMap<>();
+    if (request.getNumGroupsLimit() > 0) {
+      queryOptions.put("numGroupsLimit", 
Integer.toString(request.getNumGroupsLimit()));
+    }
     return new LeafTimeSeriesPlanNode(planId, children, tableName, timeColumn, 
timeUnit, 0L, filter, valueExpr, aggInfo,
-        groupByColumns);
+        groupByColumns, request.getLimit(), queryOptions);
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitor.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitor.java
index cfcd80a9e7..72857d403b 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitor.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitor.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.query.runtime.timeseries;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -111,14 +110,16 @@ public class PhysicalTimeSeriesServerPlanVisitor {
         leafNode.getTimeUnit(), timeBuckets, leafNode.getOffsetSeconds() == 
null ? 0 : leafNode.getOffsetSeconds());
     ExpressionContext aggregation = 
TimeSeriesAggregationFunction.create(context.getLanguage(),
         leafNode.getValueExpression(), timeTransform, timeBuckets, 
leafNode.getAggInfo());
+    Map<String, String> queryOptions = new 
HashMap<>(leafNode.getQueryOptions());
+    queryOptions.put(QueryOptionKey.TIMEOUT_MS, Long.toString(Math.max(0L, 
context.getRemainingTimeMs())));
     return new QueryContext.Builder()
         .setTableName(leafNode.getTableName())
         .setFilter(filterContext)
         .setGroupByExpressions(groupByExpressions)
         .setSelectExpressions(List.of(aggregation))
-        .setQueryOptions(ImmutableMap.of(QueryOptionKey.TIMEOUT_MS, 
Long.toString(context.getRemainingTimeMs())))
+        .setQueryOptions(queryOptions)
         .setAliasList(Collections.emptyList())
-        .setLimit(Integer.MAX_VALUE)
+        .setLimit(leafNode.getLimit())
         .build();
   }
 
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java
index f98e4228e0..66654b0117 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pinot.query.runtime.timeseries;
 
+import com.google.common.collect.ImmutableMap;
 import java.time.Duration;
 import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -27,6 +29,7 @@ import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
 import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest;
 import org.apache.pinot.tsdb.spi.TimeBuckets;
 import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
 import org.apache.pinot.tsdb.spi.series.SimpleTimeSeriesBuilderFactory;
@@ -43,6 +46,8 @@ import static org.testng.Assert.assertTrue;
 public class PhysicalTimeSeriesServerPlanVisitorTest {
   private static final String LANGUAGE = "m3ql";
   private static final int DUMMY_DEADLINE_MS = 10_000;
+  private static final int SERIES_LIMIT = 1000;
+  private static final Map<String, String> QUERY_OPTIONS = 
Collections.emptyMap();
 
   @BeforeClass
   public void setUp() {
@@ -65,20 +70,24 @@ public class PhysicalTimeSeriesServerPlanVisitorTest {
               DUMMY_DEADLINE_MS, Collections.emptyMap(), 
Collections.emptyMap(), Collections.emptyMap());
       LeafTimeSeriesPlanNode leafNode =
           new LeafTimeSeriesPlanNode(planId, Collections.emptyList(), 
tableName, timeColumn, TimeUnit.SECONDS, 0L,
-              filterExpr, "orderCount", aggInfo, 
Collections.singletonList("cityName"));
+              filterExpr, "orderCount", aggInfo, 
Collections.singletonList("cityName"), SERIES_LIMIT,
+              QUERY_OPTIONS);
       QueryContext queryContext = 
serverPlanVisitor.compileQueryContext(leafNode, context);
       assertEquals(queryContext.getFilter().toString(),
           "(cityName = 'Chicago' AND orderTime > '990' AND orderTime <= 
'1990')");
       
assertTrue(isNumber(queryContext.getQueryOptions().get(QueryOptionKey.TIMEOUT_MS)));
+      assertEquals(queryContext.getLimit(), SERIES_LIMIT);
     }
-    // Case-2: With offset, complex group-by expression, complex value, and 
non-empty filter
+    // Case-2: With offset, complex group-by expression, complex value, 
non-empty filter, 0 limit, query options.
     {
+      Map<String, String> queryOptions = ImmutableMap.of("numGroupsLimit", 
"1000");
       TimeSeriesExecutionContext context =
           new TimeSeriesExecutionContext(LANGUAGE, 
TimeBuckets.ofSeconds(1000L, Duration.ofSeconds(10), 100),
               DUMMY_DEADLINE_MS, Collections.emptyMap(), 
Collections.emptyMap(), Collections.emptyMap());
       LeafTimeSeriesPlanNode leafNode =
           new LeafTimeSeriesPlanNode(planId, Collections.emptyList(), 
tableName, timeColumn, TimeUnit.SECONDS, 10L,
-              filterExpr, "orderCount*2", aggInfo, 
Collections.singletonList("concat(cityName, stateName, '-')"));
+              filterExpr, "orderCount*2", aggInfo, 
Collections.singletonList("concat(cityName, stateName, '-')"),
+              0 /* limit */, queryOptions);
       QueryContext queryContext = 
serverPlanVisitor.compileQueryContext(leafNode, context);
       assertNotNull(queryContext);
       assertNotNull(queryContext.getGroupByExpressions());
@@ -87,6 +96,8 @@ public class PhysicalTimeSeriesServerPlanVisitorTest {
       assertEquals(queryContext.getFilter().toString(),
           "(cityName = 'Chicago' AND orderTime > '980' AND orderTime <= 
'1980')");
       
assertTrue(isNumber(queryContext.getQueryOptions().get(QueryOptionKey.TIMEOUT_MS)));
+      assertEquals(queryContext.getLimit(), 
RangeTimeSeriesRequest.DEFAULT_SERIES_LIMIT);
+      assertEquals(queryContext.getQueryOptions().get("numGroupsLimit"), 
"1000");
     }
   }
 
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 74e7a135b4..4f0f0c48f9 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -223,7 +223,7 @@ public enum AggregationFunctionType {
   PERCENTILERAWKLLMV("percentileRawKLLMV", ReturnTypes.VARCHAR,
       OperandTypes.family(List.of(SqlTypeFamily.ARRAY, SqlTypeFamily.NUMERIC, 
SqlTypeFamily.INTEGER), i -> i == 2),
       SqlTypeName.OTHER),
-  TIMESERIESAGGREGATE("timeSeriesAggregate", SqlTypeName.OTHER, 
SqlTypeName.VARCHAR);
+  TIMESERIESAGGREGATE("timeSeriesAggregate", SqlTypeName.OTHER, 
SqlTypeName.OTHER);
 
   private static final Set<String> NAMES = Arrays.stream(values())
       .flatMap(func -> Stream.of(func.name(), func.getName(), 
func.getName().toLowerCase()))
diff --git 
a/pinot-timeseries/pinot-timeseries-planner/src/test/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenterTest.java
 
b/pinot-timeseries/pinot-timeseries-planner/src/test/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenterTest.java
index 8727f64ddc..8ae40ebbba 100644
--- 
a/pinot-timeseries/pinot-timeseries-planner/src/test/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenterTest.java
+++ 
b/pinot-timeseries/pinot-timeseries-planner/src/test/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenterTest.java
@@ -21,6 +21,7 @@ package org.apache.pinot.tsdb.planner;
 import com.google.common.collect.ImmutableList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
 import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
@@ -31,6 +32,9 @@ import static org.testng.Assert.*;
 
 
 public class TimeSeriesPlanFragmenterTest {
+  private static final int SERIES_LIMIT = 1000;
+  private static final Map<String, String> QUERY_OPTIONS = 
Collections.emptyMap();
+
   @Test
   public void testGetFragmentsWithMultipleLeafNodes() {
     /*
@@ -136,7 +140,8 @@ public class TimeSeriesPlanFragmenterTest {
 
   private LeafTimeSeriesPlanNode createMockLeafNode(String id) {
     return new LeafTimeSeriesPlanNode(id, Collections.emptyList(), 
"someTableName", "someTimeColumn",
-        TimeUnit.SECONDS, 0L, "", "", null, Collections.emptyList());
+        TimeUnit.SECONDS, 0L, "", "", null, Collections.emptyList(),
+        SERIES_LIMIT, QUERY_OPTIONS);
   }
 
   static class MockTimeSeriesPlanNode extends BaseTimeSeriesPlanNode {
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java
index ecbc3b3f6b..f44f181ff9 100644
--- 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java
@@ -48,6 +48,9 @@ import java.time.Duration;
  * </ul>
  */
 public class RangeTimeSeriesRequest {
+  // TODO: It's not ideal to have another default, that plays with 
numGroupsLimit, etc.
+  public static final int DEFAULT_SERIES_LIMIT = 100_000;
+  public static final int DEFAULT_NUM_GROUPS_LIMIT = -1;
   /** Engine allows a Pinot cluster to support multiple Time-Series Query 
Languages. */
   private final String _language;
   /** Query is the raw query sent by the caller. */
@@ -63,11 +66,15 @@ public class RangeTimeSeriesRequest {
   private final long _stepSeconds;
   /** E2E timeout for the query. */
   private final Duration _timeout;
+  /** Series limit for the query */
+  private final int _limit;
+  /** The numGroupsLimit value used in Pinot's Grouping Algorithm. */
+  private final int _numGroupsLimit;
   /** Full query string to allow language implementations to pass custom 
parameters. */
   private final String _fullQueryString;
 
   public RangeTimeSeriesRequest(String language, String query, long 
startSeconds, long endSeconds, long stepSeconds,
-      Duration timeout, String fullQueryString) {
+      Duration timeout, int limit, int numGroupsLimit, String fullQueryString) 
{
     Preconditions.checkState(endSeconds >= startSeconds, "Invalid range. 
startSeconds "
         + "should be greater than or equal to endSeconds. Found 
startSeconds=%s and endSeconds=%s",
         startSeconds, endSeconds);
@@ -77,6 +84,8 @@ public class RangeTimeSeriesRequest {
     _endSeconds = endSeconds;
     _stepSeconds = stepSeconds;
     _timeout = timeout;
+    _limit = limit;
+    _numGroupsLimit = numGroupsLimit;
     _fullQueryString = fullQueryString;
   }
 
@@ -104,6 +113,14 @@ public class RangeTimeSeriesRequest {
     return _timeout;
   }
 
+  public int getLimit() {
+    return _limit;
+  }
+
+  public int getNumGroupsLimit() {
+    return _numGroupsLimit;
+  }
+
   public String getFullQueryString() {
     return _fullQueryString;
   }
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
index 3deb4c68e6..b0c7046466 100644
--- 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
@@ -22,8 +22,10 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import java.time.Duration;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest;
 import org.apache.pinot.tsdb.spi.TimeBuckets;
 import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanner;
 import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
@@ -44,6 +46,8 @@ public class LeafTimeSeriesPlanNode extends 
BaseTimeSeriesPlanNode {
   private final String _valueExpression;
   private final AggInfo _aggInfo;
   private final List<String> _groupByExpressions;
+  private final Map<String, String> _queryOptions;
+  private final int _limit;
 
   @JsonCreator
   public LeafTimeSeriesPlanNode(
@@ -52,7 +56,8 @@ public class LeafTimeSeriesPlanNode extends 
BaseTimeSeriesPlanNode {
       @JsonProperty("timeUnit") TimeUnit timeUnit, 
@JsonProperty("offsetSeconds") Long offsetSeconds,
       @JsonProperty("filterExpression") String filterExpression,
       @JsonProperty("valueExpression") String valueExpression, 
@JsonProperty("aggInfo") AggInfo aggInfo,
-      @JsonProperty("groupByExpressions") List<String> groupByExpressions) {
+      @JsonProperty("groupByExpressions") List<String> groupByExpressions,
+      @JsonProperty("limit") int limit, @JsonProperty("queryOptions") 
Map<String, String> queryOptions) {
     super(id, inputs);
     _tableName = tableName;
     _timeColumn = timeColumn;
@@ -62,17 +67,19 @@ public class LeafTimeSeriesPlanNode extends 
BaseTimeSeriesPlanNode {
     _valueExpression = valueExpression;
     _aggInfo = aggInfo;
     _groupByExpressions = groupByExpressions;
+    _limit = limit <= 0 ? RangeTimeSeriesRequest.DEFAULT_SERIES_LIMIT : limit;
+    _queryOptions = queryOptions;
   }
 
   public LeafTimeSeriesPlanNode withAggInfo(AggInfo newAggInfo) {
     return new LeafTimeSeriesPlanNode(_id, _inputs, _tableName, _timeColumn, 
_timeUnit, _offsetSeconds,
-        _filterExpression, _valueExpression, newAggInfo, _groupByExpressions);
+        _filterExpression, _valueExpression, newAggInfo, _groupByExpressions, 
_limit, _queryOptions);
   }
 
   @Override
   public BaseTimeSeriesPlanNode withInputs(List<BaseTimeSeriesPlanNode> 
newInputs) {
     return new LeafTimeSeriesPlanNode(_id, newInputs, _tableName, _timeColumn, 
_timeUnit, _offsetSeconds,
-        _filterExpression, _valueExpression, _aggInfo, _groupByExpressions);
+        _filterExpression, _valueExpression, _aggInfo, _groupByExpressions, 
_limit, _queryOptions);
   }
 
   @Override
@@ -83,8 +90,8 @@ public class LeafTimeSeriesPlanNode extends 
BaseTimeSeriesPlanNode {
   @Override
   public String getExplainName() {
     return String.format("LEAF_TIME_SERIES_PLAN_NODE(%s, table=%s, 
timeExpr=%s, valueExpr=%s, aggInfo=%s, "
-        + "groupBy=%s, filter=%s, offsetSeconds=%s)", _id, _tableName, 
_timeColumn, _valueExpression,
-        _aggInfo.getAggFunction(), _groupByExpressions, _filterExpression, 
_offsetSeconds);
+        + "groupBy=%s, filter=%s, offsetSeconds=%s, limit=%s)", _id, 
_tableName, _timeColumn, _valueExpression,
+        _aggInfo.getAggFunction(), _groupByExpressions, _filterExpression, 
_offsetSeconds, _limit);
   }
 
   @Override
@@ -124,6 +131,14 @@ public class LeafTimeSeriesPlanNode extends 
BaseTimeSeriesPlanNode {
     return _groupByExpressions;
   }
 
+  public int getLimit() {
+    return _limit;
+  }
+
+  public Map<String, String> getQueryOptions() {
+    return _queryOptions;
+  }
+
   public String getEffectiveFilter(TimeBuckets timeBuckets) {
     String filter = _filterExpression == null ? "" : _filterExpression;
     long startTime = 
_timeUnit.convert(Duration.ofSeconds(timeBuckets.getTimeRangeStartExclusive() - 
_offsetSeconds));
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
index d326ed49b5..bb65619ceb 100644
--- 
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pinot.tsdb.spi.plan;
 
+import com.google.common.collect.ImmutableMap;
 import java.time.Duration;
 import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.tsdb.spi.AggInfo;
 import org.apache.pinot.tsdb.spi.TimeBuckets;
@@ -33,6 +35,8 @@ public class LeafTimeSeriesPlanNodeTest {
   private static final String TABLE = "myTable";
   private static final String TIME_COLUMN = "orderTime";
   private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
+  private static final int SERIES_LIMIT = 10;
+  private static final Map<String, String> QUERY_OPTIONS = 
ImmutableMap.of("numGroupsLimit", "100000");
 
   @Test
   public void testGetEffectiveFilter() {
@@ -44,7 +48,7 @@ public class LeafTimeSeriesPlanNodeTest {
     {
       LeafTimeSeriesPlanNode planNode =
           new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, 
TIME_COLUMN, TIME_UNIT, 0L, "", "value_col",
-              new AggInfo("SUM", false, null), 
Collections.singletonList("cityName"));
+              new AggInfo("SUM", false, null), 
Collections.singletonList("cityName"), SERIES_LIMIT, QUERY_OPTIONS);
       assertEquals(planNode.getEffectiveFilter(timeBuckets),
           "orderTime > " + expectedStartTimeInFilter + " AND orderTime <= " + 
expectedEndTimeInFilter);
     }
@@ -52,7 +56,7 @@ public class LeafTimeSeriesPlanNodeTest {
     {
       LeafTimeSeriesPlanNode planNode =
           new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, 
TIME_COLUMN, TIME_UNIT, 123L, "", "value_col",
-              new AggInfo("SUM", false, null), 
Collections.singletonList("cityName"));
+              new AggInfo("SUM", false, null), 
Collections.singletonList("cityName"), SERIES_LIMIT, QUERY_OPTIONS);
       assertEquals(planNode.getEffectiveFilter(timeBuckets),
           "orderTime > " + (expectedStartTimeInFilter - 123) + " AND orderTime 
<= " + (expectedEndTimeInFilter - 123));
     }
@@ -60,7 +64,8 @@ public class LeafTimeSeriesPlanNodeTest {
     {
       LeafTimeSeriesPlanNode planNode =
           new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, 
TIME_COLUMN, TIME_UNIT, 123L, nonEmptyFilter,
-              "value_col", new AggInfo("SUM", false, Collections.emptyMap()), 
Collections.singletonList("cityName"));
+              "value_col", new AggInfo("SUM", false, Collections.emptyMap()), 
Collections.singletonList("cityName"),
+              SERIES_LIMIT, QUERY_OPTIONS);
       assertEquals(planNode.getEffectiveFilter(timeBuckets),
           String.format("(%s) AND (orderTime > %s AND orderTime <= %s)", 
nonEmptyFilter,
               (expectedStartTimeInFilter - 123), (expectedEndTimeInFilter - 
123)));
@@ -70,7 +75,7 @@ public class LeafTimeSeriesPlanNodeTest {
       LeafTimeSeriesPlanNode planNode =
           new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, 
TIME_COLUMN, TimeUnit.MILLISECONDS, 123L,
               nonEmptyFilter, "value_col", new AggInfo("SUM", false, 
Collections.emptyMap()),
-              Collections.singletonList("cityName"));
+              Collections.singletonList("cityName"), SERIES_LIMIT, 
QUERY_OPTIONS);
       assertEquals(planNode.getEffectiveFilter(timeBuckets),
           String.format("(%s) AND (orderTime > %s AND orderTime <= %s)", 
nonEmptyFilter,
               (expectedStartTimeInFilter * 1000 - 123 * 1000), 
(expectedEndTimeInFilter * 1000 - 123 * 1000)));
diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
 
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
index 71bf2323fd..a8eb68a5b9 100644
--- 
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
+++ 
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.tsdb.spi.plan.serde;
 
+import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
@@ -34,18 +35,24 @@ import static org.testng.Assert.assertTrue;
 
 
 public class TimeSeriesPlanSerdeTest {
+  private static final int SERIES_LIMIT = 1000;
+  private static final Map<String, String> QUERY_OPTIONS = 
ImmutableMap.of("numGroupsLimit", "1000");
+
   @Test
   public void testSerdeForScanFilterProjectNode() {
     Map<String, String> aggParams = new HashMap<>();
     aggParams.put("window", "5m");
-
+    // create leaf node
     LeafTimeSeriesPlanNode leafTimeSeriesPlanNode =
         new LeafTimeSeriesPlanNode("sfp#0", new ArrayList<>(), "myTable", 
"myTimeColumn", TimeUnit.MILLISECONDS, 0L,
-            "myFilterExpression", "myValueExpression", new AggInfo("SUM", 
false, aggParams), new ArrayList<>());
+            "myFilterExpression", "myValueExpression", new AggInfo("SUM", 
false, aggParams), new ArrayList<>(),
+            SERIES_LIMIT, QUERY_OPTIONS);
+    // serialize and deserialize to re-create another node
     BaseTimeSeriesPlanNode planNode =
         
TimeSeriesPlanSerde.deserialize(TimeSeriesPlanSerde.serialize(leafTimeSeriesPlanNode));
     assertTrue(planNode instanceof LeafTimeSeriesPlanNode);
     LeafTimeSeriesPlanNode deserializedNode = (LeafTimeSeriesPlanNode) 
planNode;
+    // assert that deserialized node is same as serialized node
     assertEquals(deserializedNode.getTableName(), "myTable");
     assertEquals(deserializedNode.getTimeColumn(), "myTimeColumn");
     assertEquals(deserializedNode.getTimeUnit(), TimeUnit.MILLISECONDS);
@@ -57,5 +64,7 @@ public class TimeSeriesPlanSerdeTest {
     assertNotNull(deserializedNode.getAggInfo().getParams());
     assertEquals(deserializedNode.getAggInfo().getParams().get("window"), 
"5m");
     assertEquals(deserializedNode.getGroupByExpressions().size(), 0);
+    assertEquals(deserializedNode.getLimit(), SERIES_LIMIT);
+    assertEquals(deserializedNode.getQueryOptions(), QUERY_OPTIONS);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to