This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ba11e52a7 Part-5: Fix Offset Handling and Effective Time Filter + Enable Group-By Expressions + Add Unit Tests and Minor Cleanup (#14104)
4ba11e52a7 is described below

commit 4ba11e52a76252082772529acb8fa37e2d12fb00
Author: Ankit Sultana <ankitsult...@uber.com>
AuthorDate: Mon Sep 30 12:57:13 2024 -0500

    Part-5: Fix Offset Handling and Effective Time Filter + Enable Group-By Expressions + Add Unit Tests and Minor Cleanup (#14104)
---
 .../common/request/context/TimeSeriesContext.java  | 10 +--
 .../timeseries/TimeSeriesAggregationOperator.java  |  5 +-
 .../apache/pinot/core/plan/CombinePlanNode.java    |  2 +-
 .../apache/pinot/core/plan/TimeSeriesPlanNode.java |  2 +-
 .../core/query/executor/QueryExecutorTest.java     | 10 +--
 .../timeseries/PhysicalTimeSeriesPlanVisitor.java  | 28 ++++----
 .../PhysicalTimeSeriesPlanVisitorTest.java         | 80 ++++++++++++++++++++++
 .../tsdb/spi/plan/LeafTimeSeriesPlanNode.java      | 32 ++++-----
 .../tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java  | 78 +++++++++++++++++++++
 .../spi/plan/serde/TimeSeriesPlanSerdeTest.java    |  4 +-
 10 files changed, 204 insertions(+), 47 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/request/context/TimeSeriesContext.java b/pinot-common/src/main/java/org/apache/pinot/common/request/context/TimeSeriesContext.java
index 2290a617cc..ba7858ea11 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/request/context/TimeSeriesContext.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/request/context/TimeSeriesContext.java
@@ -24,7 +24,7 @@ import org.apache.pinot.tsdb.spi.TimeBuckets;
 
 
 public class TimeSeriesContext {
-  private final String _engine;
+  private final String _language;
   private final String _timeColumn;
   private final TimeUnit _timeUnit;
   private final TimeBuckets _timeBuckets;
@@ -32,9 +32,9 @@ public class TimeSeriesContext {
   private final ExpressionContext _valueExpression;
   private final AggInfo _aggInfo;
 
-  public TimeSeriesContext(String engine, String timeColumn, TimeUnit 
timeUnit, TimeBuckets timeBuckets,
+  public TimeSeriesContext(String language, String timeColumn, TimeUnit 
timeUnit, TimeBuckets timeBuckets,
       Long offsetSeconds, ExpressionContext valueExpression, AggInfo aggInfo) {
-    _engine = engine;
+    _language = language;
     _timeColumn = timeColumn;
     _timeUnit = timeUnit;
     _timeBuckets = timeBuckets;
@@ -43,8 +43,8 @@ public class TimeSeriesContext {
     _aggInfo = aggInfo;
   }
 
-  public String getEngine() {
-    return _engine;
+  public String getLanguage() {
+    return _language;
   }
 
   public String getTimeColumn() {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
index 93ef05949b..c67dbfe240 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.operator.timeseries;
 
 import com.google.common.collect.ImmutableList;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -61,7 +62,7 @@ public class TimeSeriesAggregationOperator extends 
BaseOperator<TimeSeriesResult
   public TimeSeriesAggregationOperator(
       String timeColumn,
       TimeUnit timeUnit,
-      Long timeOffset,
+      Long timeOffsetSeconds,
       AggInfo aggInfo,
       ExpressionContext valueExpression,
       List<String> groupByExpressions,
@@ -70,7 +71,7 @@ public class TimeSeriesAggregationOperator extends 
BaseOperator<TimeSeriesResult
       TimeSeriesBuilderFactory seriesBuilderFactory) {
     _timeColumn = timeColumn;
     _storedTimeUnit = timeUnit;
-    _timeOffset = timeOffset;
+    _timeOffset = timeUnit.convert(Duration.ofSeconds(timeOffsetSeconds));
     _aggInfo = aggInfo;
     _valueExpression = valueExpression;
     _groupByExpressions = groupByExpressions;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
index 43f6df531e..26a9208225 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
@@ -127,7 +127,7 @@ public class CombinePlanNode implements PlanNode {
 
     if (QueryContextUtils.isTimeSeriesQuery(_queryContext)) {
       return new TimeSeriesCombineOperator(new TimeSeriesAggResultsBlockMerger(
-          
TimeSeriesBuilderFactoryProvider.getSeriesBuilderFactory(_queryContext.getTimeSeriesContext().getEngine()),
+          
TimeSeriesBuilderFactoryProvider.getSeriesBuilderFactory(_queryContext.getTimeSeriesContext().getLanguage()),
           _queryContext.getTimeSeriesContext().getAggInfo()), operators, 
_queryContext, _executorService);
     } else if (_streamer != null
           && QueryContextUtils.isSelectionOnlyQuery(_queryContext) && 
_queryContext.getLimit() != 0) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java
index b5e51e8e29..22e3d7b912 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java
@@ -47,7 +47,7 @@ public class TimeSeriesPlanNode implements PlanNode {
     _queryContext = queryContext;
     _timeSeriesContext = 
Objects.requireNonNull(queryContext.getTimeSeriesContext(),
         "Missing time-series context in TimeSeriesPlanNode");
-    _seriesBuilderFactory = 
TimeSeriesBuilderFactoryProvider.getSeriesBuilderFactory(_timeSeriesContext.getEngine());
+    _seriesBuilderFactory = 
TimeSeriesBuilderFactoryProvider.getSeriesBuilderFactory(_timeSeriesContext.getLanguage());
   }
 
   @Override
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
index ff0e0ace8f..6ad0cc3fcb 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
@@ -95,7 +95,7 @@ public class QueryExecutorTest {
   private static final int NUM_SEGMENTS_TO_GENERATE = 2;
   private static final int NUM_EMPTY_SEGMENTS_TO_GENERATE = 2;
   private static final ExecutorService QUERY_RUNNERS = 
Executors.newFixedThreadPool(20);
-  private static final String TIME_SERIES_ENGINE_NAME = "QueryExecutorTest";
+  private static final String TIME_SERIES_LANGUAGE_NAME = "QueryExecutorTest";
   private static final String TIME_SERIES_TIME_COL_NAME = 
"orderCreatedTimestamp";
   private static final Long TIME_SERIES_TEST_START_TIME = 1726228400L;
 
@@ -171,7 +171,7 @@ public class QueryExecutorTest {
     _queryExecutor.init(new PinotConfiguration(queryExecutorConfig), 
instanceDataManager, ServerMetrics.get());
 
     // Setup time series builder factory
-    
TimeSeriesBuilderFactoryProvider.registerSeriesBuilderFactory(TIME_SERIES_ENGINE_NAME,
+    
TimeSeriesBuilderFactoryProvider.registerSeriesBuilderFactory(TIME_SERIES_LANGUAGE_NAME,
         new SimpleTimeSeriesBuilderFactory());
   }
 
@@ -219,7 +219,7 @@ public class QueryExecutorTest {
   public void testTimeSeriesSumQuery() {
     TimeBuckets timeBuckets = 
TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofMinutes(1), 100);
     ExpressionContext valueExpression = 
ExpressionContext.forIdentifier("orderAmount");
-    TimeSeriesContext timeSeriesContext = new 
TimeSeriesContext(TIME_SERIES_ENGINE_NAME, TIME_SERIES_TIME_COL_NAME,
+    TimeSeriesContext timeSeriesContext = new 
TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME,
         TimeUnit.SECONDS, timeBuckets, 0L /* offsetSeconds */, 
valueExpression, new AggInfo("SUM"));
     QueryContext queryContext = 
getQueryContextForTimeSeries(timeSeriesContext);
     ServerQueryRequest serverQueryRequest = new ServerQueryRequest(
@@ -235,7 +235,7 @@ public class QueryExecutorTest {
   public void testTimeSeriesMaxQuery() {
     TimeBuckets timeBuckets = 
TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofMinutes(1), 100);
     ExpressionContext valueExpression = 
ExpressionContext.forIdentifier("orderItemCount");
-    TimeSeriesContext timeSeriesContext = new 
TimeSeriesContext(TIME_SERIES_ENGINE_NAME, TIME_SERIES_TIME_COL_NAME,
+    TimeSeriesContext timeSeriesContext = new 
TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME,
         TimeUnit.SECONDS, timeBuckets, 0L /* offsetSeconds */, 
valueExpression, new AggInfo("MAX"));
     QueryContext queryContext = 
getQueryContextForTimeSeries(timeSeriesContext);
     ServerQueryRequest serverQueryRequest = new ServerQueryRequest(
@@ -267,7 +267,7 @@ public class QueryExecutorTest {
   public void testTimeSeriesMinQuery() {
     TimeBuckets timeBuckets = 
TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofMinutes(1), 100);
     ExpressionContext valueExpression = 
ExpressionContext.forIdentifier("orderItemCount");
-    TimeSeriesContext timeSeriesContext = new 
TimeSeriesContext(TIME_SERIES_ENGINE_NAME, TIME_SERIES_TIME_COL_NAME,
+    TimeSeriesContext timeSeriesContext = new 
TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME,
         TimeUnit.SECONDS, timeBuckets, 0L /* offsetSeconds */, 
valueExpression, new AggInfo("MIN"));
     QueryContext queryContext = 
getQueryContextForTimeSeries(timeSeriesContext);
     ServerQueryRequest serverQueryRequest = new ServerQueryRequest(
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java
index 5e42d1b4b5..dc7c704f29 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java
@@ -63,9 +63,9 @@ public class PhysicalTimeSeriesPlanVisitor {
     for (int index = 0; index < planNode.getChildren().size(); index++) {
       BaseTimeSeriesPlanNode childNode = planNode.getChildren().get(index);
       if (childNode instanceof LeafTimeSeriesPlanNode) {
-        LeafTimeSeriesPlanNode sfpNode = (LeafTimeSeriesPlanNode) childNode;
-        List<String> segments = 
context.getPlanIdToSegmentsMap().get(sfpNode.getId());
-        ServerQueryRequest serverQueryRequest = 
compileLeafServerQueryRequest(sfpNode, segments, context);
+        LeafTimeSeriesPlanNode leafNode = (LeafTimeSeriesPlanNode) childNode;
+        List<String> segments = 
context.getPlanIdToSegmentsMap().get(leafNode.getId());
+        ServerQueryRequest serverQueryRequest = 
compileLeafServerQueryRequest(leafNode, segments, context);
         TimeSeriesPhysicalTableScan physicalTableScan = new 
TimeSeriesPhysicalTableScan(childNode.getId(),
             serverQueryRequest, _queryExecutor, _executorService);
         planNode.getChildren().set(index, physicalTableScan);
@@ -75,26 +75,24 @@ public class PhysicalTimeSeriesPlanVisitor {
     }
   }
 
-  public ServerQueryRequest 
compileLeafServerQueryRequest(LeafTimeSeriesPlanNode sfpNode, List<String> 
segments,
+  public ServerQueryRequest 
compileLeafServerQueryRequest(LeafTimeSeriesPlanNode leafNode, List<String> 
segments,
       TimeSeriesExecutionContext context) {
-    return new ServerQueryRequest(compileQueryContext(sfpNode, context),
+    return new ServerQueryRequest(compileQueryContext(leafNode, context),
         segments, /* TODO: Pass metadata from request */ 
Collections.emptyMap(), _serverMetrics);
   }
 
-  public QueryContext compileQueryContext(LeafTimeSeriesPlanNode sfpNode, 
TimeSeriesExecutionContext context) {
+  public QueryContext compileQueryContext(LeafTimeSeriesPlanNode leafNode, 
TimeSeriesExecutionContext context) {
     FilterContext filterContext =
         RequestContextUtils.getFilter(CalciteSqlParser.compileToExpression(
-            sfpNode.getEffectiveFilter(context.getInitialTimeBuckets())));
-    List<ExpressionContext> groupByExpressions = 
sfpNode.getGroupByColumns().stream()
-        .map(ExpressionContext::forIdentifier).collect(Collectors.toList());
-    ExpressionContext valueExpression = 
RequestContextUtils.getExpression(sfpNode.getValueExpression());
+            leafNode.getEffectiveFilter(context.getInitialTimeBuckets())));
+    List<ExpressionContext> groupByExpressions = 
leafNode.getGroupByExpressions().stream()
+        .map(RequestContextUtils::getExpression).collect(Collectors.toList());
+    ExpressionContext valueExpression = 
RequestContextUtils.getExpression(leafNode.getValueExpression());
     TimeSeriesContext timeSeriesContext = new 
TimeSeriesContext(context.getLanguage(),
-        sfpNode.getTimeColumn(),
-        sfpNode.getTimeUnit(), context.getInitialTimeBuckets(), 
sfpNode.getOffset(),
-        valueExpression,
-        sfpNode.getAggInfo());
+        leafNode.getTimeColumn(), leafNode.getTimeUnit(), 
context.getInitialTimeBuckets(), leafNode.getOffsetSeconds(),
+        valueExpression, leafNode.getAggInfo());
     return new QueryContext.Builder()
-        .setTableName(sfpNode.getTableName())
+        .setTableName(leafNode.getTableName())
         .setFilter(filterContext)
         .setGroupByExpressions(groupByExpressions)
         .setSelectExpressions(Collections.emptyList())
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java
new file mode 100644
index 0000000000..81b03fa131
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.timeseries;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class PhysicalTimeSeriesPlanVisitorTest {
+  @Test
+  public void testCompileQueryContext() {
+    final String planId = "id";
+    final String tableName = "orderTable";
+    final String timeColumn = "orderTime";
+    final AggInfo aggInfo = new AggInfo("SUM");
+    final String filterExpr = "cityName = 'Chicago'";
+    // Case-1: Without offset, simple column based group-by expression, simple 
column based value, and non-empty filter.
+    {
+      TimeSeriesExecutionContext context =
+          new TimeSeriesExecutionContext("m3ql", TimeBuckets.ofSeconds(1000L, 
Duration.ofSeconds(10), 100),
+              Collections.emptyMap());
+      LeafTimeSeriesPlanNode leafNode =
+          new LeafTimeSeriesPlanNode(planId, Collections.emptyList(), 
tableName, timeColumn, TimeUnit.SECONDS, 0L,
+              filterExpr, "orderCount", aggInfo, 
Collections.singletonList("cityName"));
+      QueryContext queryContext = 
PhysicalTimeSeriesPlanVisitor.INSTANCE.compileQueryContext(leafNode, context);
+      assertNotNull(queryContext.getTimeSeriesContext());
+      assertEquals(queryContext.getTimeSeriesContext().getLanguage(), "m3ql");
+      assertEquals(queryContext.getTimeSeriesContext().getOffsetSeconds(), 0L);
+      assertEquals(queryContext.getTimeSeriesContext().getTimeColumn(), 
timeColumn);
+      
assertEquals(queryContext.getTimeSeriesContext().getValueExpression().getIdentifier(),
 "orderCount");
+      assertEquals(queryContext.getFilter().toString(),
+          "(cityName = 'Chicago' AND orderTime >= '1000' AND orderTime <= 
'2000')");
+    }
+    // Case-2: With offset, complex group-by expression, complex value, and 
non-empty filter
+    {
+      TimeSeriesExecutionContext context =
+          new TimeSeriesExecutionContext("m3ql", TimeBuckets.ofSeconds(1000L, 
Duration.ofSeconds(10), 100),
+              Collections.emptyMap());
+      LeafTimeSeriesPlanNode leafNode =
+          new LeafTimeSeriesPlanNode(planId, Collections.emptyList(), 
tableName, timeColumn, TimeUnit.SECONDS, 10L,
+              filterExpr, "orderCount*2", aggInfo, 
Collections.singletonList("concat(cityName, stateName, '-')"));
+      QueryContext queryContext = 
PhysicalTimeSeriesPlanVisitor.INSTANCE.compileQueryContext(leafNode, context);
+      assertNotNull(queryContext);
+      assertNotNull(queryContext.getGroupByExpressions());
+      assertEquals("concat(cityName,stateName,'-')", 
queryContext.getGroupByExpressions().get(0).toString());
+      assertNotNull(queryContext.getTimeSeriesContext());
+      assertEquals(queryContext.getTimeSeriesContext().getLanguage(), "m3ql");
+      assertEquals(queryContext.getTimeSeriesContext().getOffsetSeconds(), 
10L);
+      assertEquals(queryContext.getTimeSeriesContext().getTimeColumn(), 
timeColumn);
+      
assertEquals(queryContext.getTimeSeriesContext().getValueExpression().toString(),
 "times(orderCount,'2')");
+      assertNotNull(queryContext.getFilter());
+      assertEquals(queryContext.getFilter().toString(),
+          "(cityName = 'Chicago' AND orderTime >= '990' AND orderTime <= 
'1990')");
+    }
+  }
+}
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
index 18d6316776..c5f438596c 100644
--- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
@@ -40,30 +40,29 @@ public class LeafTimeSeriesPlanNode extends 
BaseTimeSeriesPlanNode {
   private final String _tableName;
   private final String _timeColumn;
   private final TimeUnit _timeUnit;
-  private final Long _offset;
+  private final Long _offsetSeconds;
   private final String _filterExpression;
   private final String _valueExpression;
   private final AggInfo _aggInfo;
-  private final List<String> _groupByColumns;
+  private final List<String> _groupByExpressions;
 
   @JsonCreator
   public LeafTimeSeriesPlanNode(
       @JsonProperty("id") String id, @JsonProperty("children") 
List<BaseTimeSeriesPlanNode> children,
       @JsonProperty("tableName") String tableName, @JsonProperty("timeColumn") 
String timeColumn,
-      @JsonProperty("timeUnit") TimeUnit timeUnit, @JsonProperty("offset") 
Long offset,
+      @JsonProperty("timeUnit") TimeUnit timeUnit, 
@JsonProperty("offsetSeconds") Long offsetSeconds,
       @JsonProperty("filterExpression") String filterExpression,
-      @JsonProperty("valueExpression") String valueExpression,
-      @JsonProperty("aggInfo") AggInfo aggInfo, 
@JsonProperty("groupByColumns") List<String> groupByColumns) {
+      @JsonProperty("valueExpression") String valueExpression, 
@JsonProperty("aggInfo") AggInfo aggInfo,
+      @JsonProperty("groupByExpressions") List<String> groupByExpressions) {
     super(id, children);
     _tableName = tableName;
     _timeColumn = timeColumn;
     _timeUnit = timeUnit;
-    // TODO: This is broken technically. Adjust offset to meet TimeUnit 
resolution. For now use 0 offset.
-    _offset = offset;
+    _offsetSeconds = offsetSeconds;
     _filterExpression = filterExpression;
     _valueExpression = valueExpression;
     _aggInfo = aggInfo;
-    _groupByColumns = groupByColumns;
+    _groupByExpressions = groupByExpressions;
   }
 
   @Override
@@ -78,7 +77,7 @@ public class LeafTimeSeriesPlanNode extends 
BaseTimeSeriesPlanNode {
 
   @Override
   public BaseTimeSeriesOperator run() {
-    throw new UnsupportedOperationException("");
+    throw new UnsupportedOperationException("Leaf plan node is replaced with a 
physical plan node at runtime");
   }
 
   public String getTableName() {
@@ -93,8 +92,8 @@ public class LeafTimeSeriesPlanNode extends 
BaseTimeSeriesPlanNode {
     return _timeUnit;
   }
 
-  public Long getOffset() {
-    return _offset;
+  public Long getOffsetSeconds() {
+    return _offsetSeconds;
   }
 
   public String getFilterExpression() {
@@ -109,15 +108,16 @@ public class LeafTimeSeriesPlanNode extends 
BaseTimeSeriesPlanNode {
     return _aggInfo;
   }
 
-  public List<String> getGroupByColumns() {
-    return _groupByColumns;
+  public List<String> getGroupByExpressions() {
+    return _groupByExpressions;
   }
 
   public String getEffectiveFilter(TimeBuckets timeBuckets) {
     String filter = _filterExpression == null ? "" : _filterExpression;
-    // TODO: This is wrong. offset should be converted to seconds before 
arithmetic. For now use 0 offset.
-    long startTime = 
_timeUnit.convert(Duration.ofSeconds(timeBuckets.getStartTime() - _offset));
-    long endTime = 
_timeUnit.convert(Duration.ofSeconds(timeBuckets.getEndTime() - _offset));
+    long startTime = 
_timeUnit.convert(Duration.ofSeconds(timeBuckets.getStartTime() - 
_offsetSeconds));
+    long endTime =
+        _timeUnit.convert(Duration.ofSeconds(
+            timeBuckets.getEndTime() + timeBuckets.getBucketSize().toSeconds() 
- _offsetSeconds));
     String addnFilter = String.format("%s >= %d AND %s <= %d", _timeColumn, 
startTime, _timeColumn, endTime);
     if (filter.strip().isEmpty()) {
       return addnFilter;
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
new file mode 100644
index 0000000000..82694e19a0
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.plan;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class LeafTimeSeriesPlanNodeTest {
+  private static final String ID = "plan_id123";
+  private static final String TABLE = "myTable";
+  private static final String TIME_COLUMN = "orderTime";
+  private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
+
+  @Test
+  public void testGetEffectiveFilter() {
+    TimeBuckets timeBuckets = TimeBuckets.ofSeconds(1000, 
Duration.ofSeconds(13), 9);
+    final long expectedStartTimeInFilter = 1000;
+    final long expectedEndTimeInFilter = 1000 + 13 * 9;
+    final String nonEmptyFilter = "cityName = 'Chicago'";
+    // Case-1: No offset, and empty filter.
+    {
+      LeafTimeSeriesPlanNode planNode = new LeafTimeSeriesPlanNode(ID, 
Collections.emptyList(), TABLE, TIME_COLUMN,
+          TIME_UNIT, 0L, "", "value_col", new AggInfo("SUM"),
+          Collections.singletonList("cityName"));
+      assertEquals(planNode.getEffectiveFilter(timeBuckets),
+          "orderTime >= " + expectedStartTimeInFilter + " AND orderTime <= " + 
expectedEndTimeInFilter);
+    }
+    // Case-2: Offset, but empty filter
+    {
+      LeafTimeSeriesPlanNode planNode = new LeafTimeSeriesPlanNode(ID, 
Collections.emptyList(), TABLE, TIME_COLUMN,
+          TIME_UNIT, 123L, "", "value_col", new AggInfo("SUM"),
+          Collections.singletonList("cityName"));
+      assertEquals(planNode.getEffectiveFilter(timeBuckets),
+          "orderTime >= " + (expectedStartTimeInFilter - 123) + " AND 
orderTime <= " + (expectedEndTimeInFilter - 123));
+    }
+    // Case-3: Offset and non-empty filter
+    {
+      LeafTimeSeriesPlanNode planNode = new LeafTimeSeriesPlanNode(ID, 
Collections.emptyList(), TABLE, TIME_COLUMN,
+          TIME_UNIT, 123L, nonEmptyFilter, "value_col", new AggInfo("SUM"),
+          Collections.singletonList("cityName"));
+      assertEquals(planNode.getEffectiveFilter(timeBuckets),
+          String.format("(%s) AND (orderTime >= %s AND orderTime <= %s)", 
nonEmptyFilter,
+              (expectedStartTimeInFilter - 123), (expectedEndTimeInFilter - 
123)));
+    }
+    // Case-4: Offset, and non-empty filter, and time-unit that is not seconds
+    {
+      LeafTimeSeriesPlanNode planNode = new LeafTimeSeriesPlanNode(ID, 
Collections.emptyList(), TABLE, TIME_COLUMN,
+          TimeUnit.MILLISECONDS, 123L, nonEmptyFilter, "value_col", new 
AggInfo("SUM"),
+          Collections.singletonList("cityName"));
+      assertEquals(planNode.getEffectiveFilter(timeBuckets),
+          String.format("(%s) AND (orderTime >= %s AND orderTime <= %s)", 
nonEmptyFilter,
+              (expectedStartTimeInFilter * 1000 - 123 * 1000), 
(expectedEndTimeInFilter * 1000 - 123 * 1000)));
+    }
+  }
+}
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
index a5015dc991..df66ea8fd9 100644
--- a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
+++ b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
@@ -43,10 +43,10 @@ public class TimeSeriesPlanSerdeTest {
     assertEquals(deserializedNode.getTableName(), "myTable");
     assertEquals(deserializedNode.getTimeColumn(), "myTimeColumn");
     assertEquals(deserializedNode.getTimeUnit(), TimeUnit.MILLISECONDS);
-    assertEquals(deserializedNode.getOffset(), 0L);
+    assertEquals(deserializedNode.getOffsetSeconds(), 0L);
     assertEquals(deserializedNode.getFilterExpression(), "myFilterExpression");
     assertEquals(deserializedNode.getValueExpression(), "myValueExpression");
     assertNotNull(deserializedNode.getAggInfo());
-    assertEquals(deserializedNode.getGroupByColumns().size(), 0);
+    assertEquals(deserializedNode.getGroupByExpressions().size(), 0);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to