This is an automated email from the ASF dual-hosted git repository. ankitsultana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 4ba11e52a7 Part-5: Fix Offset Handling and Effective Time Filter + Enable Group-By Expressions + Add Unit Tests and Minor Cleanup (#14104) 4ba11e52a7 is described below commit 4ba11e52a76252082772529acb8fa37e2d12fb00 Author: Ankit Sultana <ankitsult...@uber.com> AuthorDate: Mon Sep 30 12:57:13 2024 -0500 Part-5: Fix Offset Handling and Effective Time Filter + Enable Group-By Expressions + Add Unit Tests and Minor Cleanup (#14104) --- .../common/request/context/TimeSeriesContext.java | 10 +-- .../timeseries/TimeSeriesAggregationOperator.java | 5 +- .../apache/pinot/core/plan/CombinePlanNode.java | 2 +- .../apache/pinot/core/plan/TimeSeriesPlanNode.java | 2 +- .../core/query/executor/QueryExecutorTest.java | 10 +-- .../timeseries/PhysicalTimeSeriesPlanVisitor.java | 28 ++++---- .../PhysicalTimeSeriesPlanVisitorTest.java | 80 ++++++++++++++++++++++ .../tsdb/spi/plan/LeafTimeSeriesPlanNode.java | 32 ++++----- .../tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java | 78 +++++++++++++++++++++ .../spi/plan/serde/TimeSeriesPlanSerdeTest.java | 4 +- 10 files changed, 204 insertions(+), 47 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/request/context/TimeSeriesContext.java b/pinot-common/src/main/java/org/apache/pinot/common/request/context/TimeSeriesContext.java index 2290a617cc..ba7858ea11 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/request/context/TimeSeriesContext.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/request/context/TimeSeriesContext.java @@ -24,7 +24,7 @@ import org.apache.pinot.tsdb.spi.TimeBuckets; public class TimeSeriesContext { - private final String _engine; + private final String _language; private final String _timeColumn; private final TimeUnit _timeUnit; private final TimeBuckets _timeBuckets; @@ -32,9 +32,9 @@ public class TimeSeriesContext { private final ExpressionContext _valueExpression; private final AggInfo _aggInfo; - public TimeSeriesContext(String engine, String timeColumn, TimeUnit timeUnit, TimeBuckets timeBuckets, + public TimeSeriesContext(String language, String timeColumn, TimeUnit timeUnit, TimeBuckets timeBuckets, Long offsetSeconds, ExpressionContext valueExpression, AggInfo aggInfo) { - _engine = engine; + _language = language; _timeColumn = timeColumn; _timeUnit = timeUnit; _timeBuckets = timeBuckets; @@ -43,8 +43,8 @@ public class TimeSeriesContext { _aggInfo = aggInfo; } - public String getEngine() { - return _engine; + public String getLanguage() { + return _language; } public String getTimeColumn() { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java index 93ef05949b..c67dbfe240 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java @@ -19,6 +19,7 @@ package org.apache.pinot.core.operator.timeseries; import com.google.common.collect.ImmutableList; +import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -61,7 +62,7 @@ public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResult public TimeSeriesAggregationOperator( String timeColumn, TimeUnit timeUnit, - Long timeOffset, + Long timeOffsetSeconds, AggInfo aggInfo, ExpressionContext valueExpression, List<String> groupByExpressions, @@ -70,7 +71,7 @@ public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResult TimeSeriesBuilderFactory seriesBuilderFactory) { _timeColumn = timeColumn; _storedTimeUnit = timeUnit; - _timeOffset = timeOffset; + _timeOffset = timeUnit.convert(Duration.ofSeconds(timeOffsetSeconds)); _aggInfo = aggInfo; _valueExpression = valueExpression; _groupByExpressions = groupByExpressions; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java index 43f6df531e..26a9208225 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java @@ -127,7 +127,7 @@ public class CombinePlanNode implements PlanNode { if (QueryContextUtils.isTimeSeriesQuery(_queryContext)) { return new TimeSeriesCombineOperator(new TimeSeriesAggResultsBlockMerger( - TimeSeriesBuilderFactoryProvider.getSeriesBuilderFactory(_queryContext.getTimeSeriesContext().getEngine()), + TimeSeriesBuilderFactoryProvider.getSeriesBuilderFactory(_queryContext.getTimeSeriesContext().getLanguage()), _queryContext.getTimeSeriesContext().getAggInfo()), operators, _queryContext, _executorService); } else if (_streamer != null && QueryContextUtils.isSelectionOnlyQuery(_queryContext) && _queryContext.getLimit() != 0) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java index b5e51e8e29..22e3d7b912 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java @@ -47,7 +47,7 @@ public class TimeSeriesPlanNode implements PlanNode { _queryContext = queryContext; _timeSeriesContext = Objects.requireNonNull(queryContext.getTimeSeriesContext(), "Missing time-series context in TimeSeriesPlanNode"); - _seriesBuilderFactory = TimeSeriesBuilderFactoryProvider.getSeriesBuilderFactory(_timeSeriesContext.getEngine()); + _seriesBuilderFactory = TimeSeriesBuilderFactoryProvider.getSeriesBuilderFactory(_timeSeriesContext.getLanguage()); } @Override diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java index ff0e0ace8f..6ad0cc3fcb 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java @@ -95,7 +95,7 @@ public class QueryExecutorTest { private static final int NUM_SEGMENTS_TO_GENERATE = 2; private static final int NUM_EMPTY_SEGMENTS_TO_GENERATE = 2; private static final ExecutorService QUERY_RUNNERS = Executors.newFixedThreadPool(20); - private static final String TIME_SERIES_ENGINE_NAME = "QueryExecutorTest"; + private static final String TIME_SERIES_LANGUAGE_NAME = "QueryExecutorTest"; private static final String TIME_SERIES_TIME_COL_NAME = "orderCreatedTimestamp"; private static final Long TIME_SERIES_TEST_START_TIME = 1726228400L; @@ -171,7 +171,7 @@ public class QueryExecutorTest { _queryExecutor.init(new PinotConfiguration(queryExecutorConfig), instanceDataManager, ServerMetrics.get()); // Setup time series builder factory - TimeSeriesBuilderFactoryProvider.registerSeriesBuilderFactory(TIME_SERIES_ENGINE_NAME, + TimeSeriesBuilderFactoryProvider.registerSeriesBuilderFactory(TIME_SERIES_LANGUAGE_NAME, new SimpleTimeSeriesBuilderFactory()); } @@ -219,7 +219,7 @@ public class QueryExecutorTest { public void testTimeSeriesSumQuery() { TimeBuckets timeBuckets = TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofMinutes(1), 100); ExpressionContext valueExpression = ExpressionContext.forIdentifier("orderAmount"); - TimeSeriesContext timeSeriesContext = new TimeSeriesContext(TIME_SERIES_ENGINE_NAME, TIME_SERIES_TIME_COL_NAME, + TimeSeriesContext timeSeriesContext = new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets, 0L /* offsetSeconds */, valueExpression, new AggInfo("SUM")); QueryContext queryContext = getQueryContextForTimeSeries(timeSeriesContext); ServerQueryRequest serverQueryRequest = new ServerQueryRequest( @@ -235,7 +235,7 @@ public class QueryExecutorTest { public void testTimeSeriesMaxQuery() { TimeBuckets timeBuckets = TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofMinutes(1), 100); ExpressionContext valueExpression = ExpressionContext.forIdentifier("orderItemCount"); - TimeSeriesContext timeSeriesContext = new TimeSeriesContext(TIME_SERIES_ENGINE_NAME, TIME_SERIES_TIME_COL_NAME, + TimeSeriesContext timeSeriesContext = new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets, 0L /* offsetSeconds */, valueExpression, new AggInfo("MAX")); QueryContext queryContext = getQueryContextForTimeSeries(timeSeriesContext); ServerQueryRequest serverQueryRequest = new ServerQueryRequest( @@ -267,7 +267,7 @@ public class QueryExecutorTest { public void testTimeSeriesMinQuery() { TimeBuckets timeBuckets = TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofMinutes(1), 100); ExpressionContext valueExpression = ExpressionContext.forIdentifier("orderItemCount"); - TimeSeriesContext timeSeriesContext = new TimeSeriesContext(TIME_SERIES_ENGINE_NAME, TIME_SERIES_TIME_COL_NAME, + TimeSeriesContext timeSeriesContext = new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets, 0L /* offsetSeconds */, valueExpression, new AggInfo("MIN")); QueryContext queryContext = getQueryContextForTimeSeries(timeSeriesContext); ServerQueryRequest serverQueryRequest = new ServerQueryRequest( diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java index 5e42d1b4b5..dc7c704f29 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java @@ -63,9 +63,9 @@ public class PhysicalTimeSeriesPlanVisitor { for (int index = 0; index < planNode.getChildren().size(); index++) { BaseTimeSeriesPlanNode childNode = planNode.getChildren().get(index); if (childNode instanceof LeafTimeSeriesPlanNode) { - LeafTimeSeriesPlanNode sfpNode = (LeafTimeSeriesPlanNode) childNode; - List<String> segments = context.getPlanIdToSegmentsMap().get(sfpNode.getId()); - ServerQueryRequest serverQueryRequest = compileLeafServerQueryRequest(sfpNode, segments, context); + LeafTimeSeriesPlanNode leafNode = (LeafTimeSeriesPlanNode) childNode; + List<String> segments = context.getPlanIdToSegmentsMap().get(leafNode.getId()); + ServerQueryRequest serverQueryRequest = compileLeafServerQueryRequest(leafNode, segments, context); TimeSeriesPhysicalTableScan physicalTableScan = new TimeSeriesPhysicalTableScan(childNode.getId(), serverQueryRequest, _queryExecutor, _executorService); planNode.getChildren().set(index, physicalTableScan); @@ -75,26 +75,24 @@ public class PhysicalTimeSeriesPlanVisitor { } } - public ServerQueryRequest compileLeafServerQueryRequest(LeafTimeSeriesPlanNode sfpNode, List<String> segments, + public ServerQueryRequest compileLeafServerQueryRequest(LeafTimeSeriesPlanNode leafNode, List<String> segments, TimeSeriesExecutionContext context) { - return new ServerQueryRequest(compileQueryContext(sfpNode, context), + return new ServerQueryRequest(compileQueryContext(leafNode, context), segments, /* TODO: Pass metadata from request */ Collections.emptyMap(), _serverMetrics); } - public QueryContext compileQueryContext(LeafTimeSeriesPlanNode sfpNode, TimeSeriesExecutionContext context) { + public QueryContext compileQueryContext(LeafTimeSeriesPlanNode leafNode, TimeSeriesExecutionContext context) { FilterContext filterContext = RequestContextUtils.getFilter(CalciteSqlParser.compileToExpression( - sfpNode.getEffectiveFilter(context.getInitialTimeBuckets()))); - List<ExpressionContext> groupByExpressions = sfpNode.getGroupByColumns().stream() - .map(ExpressionContext::forIdentifier).collect(Collectors.toList()); - ExpressionContext valueExpression = RequestContextUtils.getExpression(sfpNode.getValueExpression()); + leafNode.getEffectiveFilter(context.getInitialTimeBuckets()))); + List<ExpressionContext> groupByExpressions = leafNode.getGroupByExpressions().stream() + .map(RequestContextUtils::getExpression).collect(Collectors.toList()); + ExpressionContext valueExpression = RequestContextUtils.getExpression(leafNode.getValueExpression()); TimeSeriesContext timeSeriesContext = new TimeSeriesContext(context.getLanguage(), - sfpNode.getTimeColumn(), - sfpNode.getTimeUnit(), context.getInitialTimeBuckets(), sfpNode.getOffset(), - valueExpression, - sfpNode.getAggInfo()); + leafNode.getTimeColumn(), leafNode.getTimeUnit(), context.getInitialTimeBuckets(), leafNode.getOffsetSeconds(), + valueExpression, leafNode.getAggInfo()); return new QueryContext.Builder() - .setTableName(sfpNode.getTableName()) + .setTableName(leafNode.getTableName()) .setFilter(filterContext) .setGroupByExpressions(groupByExpressions) .setSelectExpressions(Collections.emptyList()) diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java new file mode 100644 index 0000000000..81b03fa131 --- /dev/null +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.timeseries; + +import java.time.Duration; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.tsdb.spi.AggInfo; +import org.apache.pinot.tsdb.spi.TimeBuckets; +import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode; +import org.testng.annotations.Test; + +import static org.testng.Assert.*; + + +public class PhysicalTimeSeriesPlanVisitorTest { + @Test + public void testCompileQueryContext() { + final String planId = "id"; + final String tableName = "orderTable"; + final String timeColumn = "orderTime"; + final AggInfo aggInfo = new AggInfo("SUM"); + final String filterExpr = "cityName = 'Chicago'"; + // Case-1: Without offset, simple column based group-by expression, simple column based value, and non-empty filter. + { + TimeSeriesExecutionContext context = + new TimeSeriesExecutionContext("m3ql", TimeBuckets.ofSeconds(1000L, Duration.ofSeconds(10), 100), + Collections.emptyMap()); + LeafTimeSeriesPlanNode leafNode = + new LeafTimeSeriesPlanNode(planId, Collections.emptyList(), tableName, timeColumn, TimeUnit.SECONDS, 0L, + filterExpr, "orderCount", aggInfo, Collections.singletonList("cityName")); + QueryContext queryContext = PhysicalTimeSeriesPlanVisitor.INSTANCE.compileQueryContext(leafNode, context); + assertNotNull(queryContext.getTimeSeriesContext()); + assertEquals(queryContext.getTimeSeriesContext().getLanguage(), "m3ql"); + assertEquals(queryContext.getTimeSeriesContext().getOffsetSeconds(), 0L); + assertEquals(queryContext.getTimeSeriesContext().getTimeColumn(), timeColumn); + assertEquals(queryContext.getTimeSeriesContext().getValueExpression().getIdentifier(), "orderCount"); + assertEquals(queryContext.getFilter().toString(), + "(cityName = 'Chicago' AND orderTime >= '1000' AND orderTime <= '2000')"); + } + // Case-2: With offset, complex group-by expression, complex value, and non-empty filter + { + TimeSeriesExecutionContext context = + new TimeSeriesExecutionContext("m3ql", TimeBuckets.ofSeconds(1000L, Duration.ofSeconds(10), 100), + Collections.emptyMap()); + LeafTimeSeriesPlanNode leafNode = + new LeafTimeSeriesPlanNode(planId, Collections.emptyList(), tableName, timeColumn, TimeUnit.SECONDS, 10L, + filterExpr, "orderCount*2", aggInfo, Collections.singletonList("concat(cityName, stateName, '-')")); + QueryContext queryContext = PhysicalTimeSeriesPlanVisitor.INSTANCE.compileQueryContext(leafNode, context); + assertNotNull(queryContext); + assertNotNull(queryContext.getGroupByExpressions()); + assertEquals("concat(cityName,stateName,'-')", queryContext.getGroupByExpressions().get(0).toString()); + assertNotNull(queryContext.getTimeSeriesContext()); + assertEquals(queryContext.getTimeSeriesContext().getLanguage(), "m3ql"); + assertEquals(queryContext.getTimeSeriesContext().getOffsetSeconds(), 10L); + assertEquals(queryContext.getTimeSeriesContext().getTimeColumn(), timeColumn); + assertEquals(queryContext.getTimeSeriesContext().getValueExpression().toString(), "times(orderCount,'2')"); + assertNotNull(queryContext.getFilter()); + assertEquals(queryContext.getFilter().toString(), + "(cityName = 'Chicago' AND orderTime >= '990' AND orderTime <= '1990')"); + } + } +} diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java index 18d6316776..c5f438596c 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java @@ -40,30 +40,29 @@ public class LeafTimeSeriesPlanNode extends BaseTimeSeriesPlanNode { private final String _tableName; private final String _timeColumn; private final TimeUnit _timeUnit; - private final Long _offset; + private final Long _offsetSeconds; private final String _filterExpression; private final String _valueExpression; private final AggInfo _aggInfo; - private final List<String> _groupByColumns; + private final List<String> _groupByExpressions; @JsonCreator public LeafTimeSeriesPlanNode( @JsonProperty("id") String id, @JsonProperty("children") List<BaseTimeSeriesPlanNode> children, @JsonProperty("tableName") String tableName, @JsonProperty("timeColumn") String timeColumn, - @JsonProperty("timeUnit") TimeUnit timeUnit, @JsonProperty("offset") Long offset, + @JsonProperty("timeUnit") TimeUnit timeUnit, @JsonProperty("offsetSeconds") Long offsetSeconds, @JsonProperty("filterExpression") String filterExpression, - @JsonProperty("valueExpression") String valueExpression, - @JsonProperty("aggInfo") AggInfo aggInfo, @JsonProperty("groupByColumns") List<String> groupByColumns) { + @JsonProperty("valueExpression") String valueExpression, @JsonProperty("aggInfo") AggInfo aggInfo, + @JsonProperty("groupByExpressions") List<String> groupByExpressions) { super(id, children); _tableName = tableName; _timeColumn = timeColumn; _timeUnit = timeUnit; - // TODO: This is broken technically. Adjust offset to meet TimeUnit resolution. For now use 0 offset. - _offset = offset; + _offsetSeconds = offsetSeconds; _filterExpression = filterExpression; _valueExpression = valueExpression; _aggInfo = aggInfo; - _groupByColumns = groupByColumns; + _groupByExpressions = groupByExpressions; } @Override @@ -78,7 +77,7 @@ public class LeafTimeSeriesPlanNode extends BaseTimeSeriesPlanNode { @Override public BaseTimeSeriesOperator run() { - throw new UnsupportedOperationException(""); + throw new UnsupportedOperationException("Leaf plan node is replaced with a physical plan node at runtime"); } public String getTableName() { @@ -93,8 +92,8 @@ public class LeafTimeSeriesPlanNode extends BaseTimeSeriesPlanNode { return _timeUnit; } - public Long getOffset() { - return _offset; + public Long getOffsetSeconds() { + return _offsetSeconds; } public String getFilterExpression() { @@ -109,15 +108,16 @@ public class LeafTimeSeriesPlanNode extends BaseTimeSeriesPlanNode { return _aggInfo; } - public List<String> getGroupByColumns() { - return _groupByColumns; + public List<String> getGroupByExpressions() { + return _groupByExpressions; } public String getEffectiveFilter(TimeBuckets timeBuckets) { String filter = _filterExpression == null ? "" : _filterExpression; - // TODO: This is wrong. offset should be converted to seconds before arithmetic. For now use 0 offset. - long startTime = _timeUnit.convert(Duration.ofSeconds(timeBuckets.getStartTime() - _offset)); - long endTime = _timeUnit.convert(Duration.ofSeconds(timeBuckets.getEndTime() - _offset)); + long startTime = _timeUnit.convert(Duration.ofSeconds(timeBuckets.getStartTime() - _offsetSeconds)); + long endTime = + _timeUnit.convert(Duration.ofSeconds( + timeBuckets.getEndTime() + timeBuckets.getBucketSize().toSeconds() - _offsetSeconds)); String addnFilter = String.format("%s >= %d AND %s <= %d", _timeColumn, startTime, _timeColumn, endTime); if (filter.strip().isEmpty()) { return addnFilter; diff --git a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java new file mode 100644 index 0000000000..82694e19a0 --- /dev/null +++ b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tsdb.spi.plan; + +import java.time.Duration; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import org.apache.pinot.tsdb.spi.AggInfo; +import org.apache.pinot.tsdb.spi.TimeBuckets; +import org.testng.annotations.Test; + +import static org.testng.Assert.*; + + +public class LeafTimeSeriesPlanNodeTest { + private static final String ID = "plan_id123"; + private static final String TABLE = "myTable"; + private static final String TIME_COLUMN = "orderTime"; + private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS; + + @Test + public void testGetEffectiveFilter() { + TimeBuckets timeBuckets = TimeBuckets.ofSeconds(1000, Duration.ofSeconds(13), 9); + final long expectedStartTimeInFilter = 1000; + final long expectedEndTimeInFilter = 1000 + 13 * 9; + final String nonEmptyFilter = "cityName = 'Chicago'"; + // Case-1: No offset, and empty filter. + { + LeafTimeSeriesPlanNode planNode = new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, + TIME_UNIT, 0L, "", "value_col", new AggInfo("SUM"), + Collections.singletonList("cityName")); + assertEquals(planNode.getEffectiveFilter(timeBuckets), + "orderTime >= " + expectedStartTimeInFilter + " AND orderTime <= " + expectedEndTimeInFilter); + } + // Case-2: Offset, but empty filter + { + LeafTimeSeriesPlanNode planNode = new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, + TIME_UNIT, 123L, "", "value_col", new AggInfo("SUM"), + Collections.singletonList("cityName")); + assertEquals(planNode.getEffectiveFilter(timeBuckets), + "orderTime >= " + (expectedStartTimeInFilter - 123) + " AND orderTime <= " + (expectedEndTimeInFilter - 123)); + } + // Case-3: Offset and non-empty filter + { + LeafTimeSeriesPlanNode planNode = new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, + TIME_UNIT, 123L, nonEmptyFilter, "value_col", new AggInfo("SUM"), + Collections.singletonList("cityName")); + assertEquals(planNode.getEffectiveFilter(timeBuckets), + String.format("(%s) AND (orderTime >= %s AND orderTime <= %s)", nonEmptyFilter, + (expectedStartTimeInFilter - 123), (expectedEndTimeInFilter - 123))); + } + // Case-4: Offset, and non-empty filter, and time-unit that is not seconds + { + LeafTimeSeriesPlanNode planNode = new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, + TimeUnit.MILLISECONDS, 123L, nonEmptyFilter, "value_col", new AggInfo("SUM"), + Collections.singletonList("cityName")); + assertEquals(planNode.getEffectiveFilter(timeBuckets), + String.format("(%s) AND (orderTime >= %s AND orderTime <= %s)", nonEmptyFilter, + (expectedStartTimeInFilter * 1000 - 123 * 1000), (expectedEndTimeInFilter * 1000 - 123 * 1000))); + } + } +} diff --git a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java index a5015dc991..df66ea8fd9 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java @@ -43,10 +43,10 @@ public class TimeSeriesPlanSerdeTest { assertEquals(deserializedNode.getTableName(), "myTable"); assertEquals(deserializedNode.getTimeColumn(), "myTimeColumn"); assertEquals(deserializedNode.getTimeUnit(), TimeUnit.MILLISECONDS); - assertEquals(deserializedNode.getOffset(), 0L); + assertEquals(deserializedNode.getOffsetSeconds(), 0L); assertEquals(deserializedNode.getFilterExpression(), "myFilterExpression"); assertEquals(deserializedNode.getValueExpression(), "myValueExpression"); assertNotNull(deserializedNode.getAggInfo()); - assertEquals(deserializedNode.getGroupByColumns().size(), 0); + assertEquals(deserializedNode.getGroupByExpressions().size(), 0); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org