This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c5fbe5b23d [timeseries] Make Time Buckets Half Open on the Left
(#14413)
c5fbe5b23d is described below
commit c5fbe5b23d4ab7ac7819b7a9744eb9e3b9562f8c
Author: Ankit Sultana <[email protected]>
AuthorDate: Mon Nov 11 15:24:47 2024 -0600
[timeseries] Make Time Buckets Half Open on the Left (#14413)
---
.../timeseries/TimeSeriesAggregationOperator.java | 38 ++++-----
.../TimeSeriesAggregationOperatorTest.java | 96 ++++++++++++++++++++++
.../core/query/executor/QueryExecutorTest.java | 6 +-
.../query/service/dispatch/QueryDispatcher.java | 11 ++-
.../PhysicalTimeSeriesPlanVisitorTest.java | 4 +-
.../org/apache/pinot/tsdb/spi/TimeBuckets.java | 37 ++++++---
.../tsdb/spi/plan/LeafTimeSeriesPlanNode.java | 12 ++-
.../org/apache/pinot/tsdb/spi/TimeBucketsTest.java | 53 ++++++++++++
.../tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java | 12 +--
9 files changed, 214 insertions(+), 55 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
index 25ee168ef8..fd895a4607 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.operator.timeseries;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import java.time.Duration;
import java.util.HashMap;
@@ -51,7 +52,7 @@ public class TimeSeriesAggregationOperator extends
BaseOperator<TimeSeriesResult
private static final String EXPLAIN_NAME = "TIME_SERIES_AGGREGATION";
private final String _timeColumn;
private final TimeUnit _storedTimeUnit;
- private final Long _timeOffset;
+ private final long _timeOffset;
private final AggInfo _aggInfo;
private final ExpressionContext _valueExpression;
private final List<String> _groupByExpressions;
@@ -62,7 +63,7 @@ public class TimeSeriesAggregationOperator extends
BaseOperator<TimeSeriesResult
public TimeSeriesAggregationOperator(
String timeColumn,
TimeUnit timeUnit,
- Long timeOffsetSeconds,
+ @Nullable Long timeOffsetSeconds,
AggInfo aggInfo,
ExpressionContext valueExpression,
List<String> groupByExpressions,
@@ -71,7 +72,7 @@ public class TimeSeriesAggregationOperator extends
BaseOperator<TimeSeriesResult
TimeSeriesBuilderFactory seriesBuilderFactory) {
_timeColumn = timeColumn;
_storedTimeUnit = timeUnit;
- _timeOffset = timeUnit.convert(Duration.ofSeconds(timeOffsetSeconds));
+ _timeOffset = timeOffsetSeconds == null ? 0L :
timeUnit.convert(Duration.ofSeconds(timeOffsetSeconds));
_aggInfo = aggInfo;
_valueExpression = valueExpression;
_groupByExpressions = groupByExpressions;
@@ -89,10 +90,8 @@ public class TimeSeriesAggregationOperator extends
BaseOperator<TimeSeriesResult
// TODO: This is quite unoptimized and allocates liberally
BlockValSet blockValSet = valueBlock.getBlockValueSet(_timeColumn);
long[] timeValues = blockValSet.getLongValuesSV();
- if (_timeOffset != null && _timeOffset != 0L) {
- timeValues = applyTimeshift(_timeOffset, timeValues, numDocs);
- }
- int[] timeValueIndexes = getTimeValueIndex(timeValues, _storedTimeUnit,
numDocs);
+ applyTimeOffset(timeValues, numDocs);
+ int[] timeValueIndexes = getTimeValueIndex(timeValues, numDocs);
Object[][] tagValues = new Object[_groupByExpressions.size()][];
for (int i = 0; i < _groupByExpressions.size(); i++) {
blockValSet = valueBlock.getBlockValueSet(_groupByExpressions.get(i));
@@ -152,23 +151,26 @@ public class TimeSeriesAggregationOperator extends
BaseOperator<TimeSeriesResult
return new ExecutionStatistics(0, 0, 0, 0);
}
- private int[] getTimeValueIndex(long[] actualTimeValues, TimeUnit timeUnit,
int numDocs) {
- if (timeUnit == TimeUnit.MILLISECONDS) {
+ @VisibleForTesting
+ protected int[] getTimeValueIndex(long[] actualTimeValues, int numDocs) {
+ if (_storedTimeUnit == TimeUnit.MILLISECONDS) {
return getTimeValueIndexMillis(actualTimeValues, numDocs);
}
int[] timeIndexes = new int[numDocs];
+ final long reference = _timeBuckets.getTimeRangeStartExclusive();
+ final long divisor = _timeBuckets.getBucketSize().getSeconds();
for (int index = 0; index < numDocs; index++) {
- timeIndexes[index] = (int) ((actualTimeValues[index] -
_timeBuckets.getStartTime())
- / _timeBuckets.getBucketSize().getSeconds());
+ timeIndexes[index] = (int) ((actualTimeValues[index] - reference - 1) /
divisor);
}
return timeIndexes;
}
private int[] getTimeValueIndexMillis(long[] actualTimeValues, int numDocs) {
int[] timeIndexes = new int[numDocs];
+ final long reference = _timeBuckets.getTimeRangeStartExclusive() * 1000L;
+ final long divisor = _timeBuckets.getBucketSize().toMillis();
for (int index = 0; index < numDocs; index++) {
- timeIndexes[index] = (int) ((actualTimeValues[index] -
_timeBuckets.getStartTime() * 1000L)
- / _timeBuckets.getBucketSize().toMillis());
+ timeIndexes[index] = (int) ((actualTimeValues[index] - reference - 1) /
divisor);
}
return timeIndexes;
}
@@ -240,14 +242,12 @@ public class TimeSeriesAggregationOperator extends
BaseOperator<TimeSeriesResult
}
}
- public static long[] applyTimeshift(long timeshift, long[] timeValues, int
numDocs) {
- if (timeshift == 0) {
- return timeValues;
+ private void applyTimeOffset(long[] timeValues, int numDocs) {
+ if (_timeOffset == 0L) {
+ return;
}
- long[] shiftedTimeValues = new long[numDocs];
for (int index = 0; index < numDocs; index++) {
- shiftedTimeValues[index] = timeValues[index] + timeshift;
+ timeValues[index] = timeValues[index] + _timeOffset;
}
- return shiftedTimeValues;
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java
new file mode 100644
index 0000000000..888d0658e9
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.timeseries;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.operator.BaseProjectOperator;
+import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+import static org.testng.Assert.*;
+
+
+public class TimeSeriesAggregationOperatorTest {
+ private static final String DUMMY_TIME_COLUMN = "someTimeColumn";
+ private static final AggInfo AGG_INFO = new AggInfo("",
Collections.emptyMap());
+ private static final ExpressionContext VALUE_EXPRESSION =
ExpressionContext.forIdentifier("someValueColumn");
+
+ @Test
+ public void testGetTimeValueIndexForSeconds() {
+ /*
+ * TimeBuckets: [10_000, 10_100, 10_200, ..., 10_900]
+ * storedTimeValues: [9_999, 10_000, 9_999, 9_901, 10_100, 10_899, 10_900]
+ * expected indexes: [0, 0, 0, 0, 1, 9, 9]
+ */
+ final int[] expectedIndexes = new int[]{0, 0, 0, 0, 1, 9, 9};
+ final TimeUnit storedTimeUnit = TimeUnit.SECONDS;
+ TimeBuckets timeBuckets = TimeBuckets.ofSeconds(10_000,
Duration.ofSeconds(100), 10);
+ TimeSeriesAggregationOperator aggregationOperator =
buildOperator(storedTimeUnit, timeBuckets);
+ long[] storedTimeValues = new long[]{9_999L, 10_000L, 9_999L, 9_901L,
10_100L, 10_899L, 10_900L};
+ int[] indexes = aggregationOperator.getTimeValueIndex(storedTimeValues,
storedTimeValues.length);
+ assertEquals(indexes, expectedIndexes);
+ }
+
+ @Test
+ public void testGetTimeValueIndexForMillis() {
+ /*
+ * TimeBuckets: [10_000, 10_100, 10_200, ..., 10_900]
+ * storedTimeValues: [9_999_000, 10_000_000, 10_500_000, 10_899_999,
10_800_001, 10_900_000]
+ * expected indexes: [0, 0, 5, 9, 9, 9]
+ */
+ final int[] expectedIndexes = new int[]{0, 0, 5, 9, 9, 9};
+ final TimeUnit storedTimeUnit = TimeUnit.MILLISECONDS;
+ TimeBuckets timeBuckets = TimeBuckets.ofSeconds(10_000,
Duration.ofSeconds(100), 10);
+ TimeSeriesAggregationOperator aggregationOperator =
buildOperator(storedTimeUnit, timeBuckets);
+ long[] storedTimeValues = new long[]{9_999_000L, 10_000_000L, 10_500_000L,
10_899_999L, 10_800_001L, 10_900_000L};
+ int[] indexes = aggregationOperator.getTimeValueIndex(storedTimeValues,
storedTimeValues.length);
+ assertEquals(indexes, expectedIndexes);
+ }
+
+ @Test
+ public void testGetTimeValueIndexOutOfBounds() {
+ final TimeUnit storedTimeUnit = TimeUnit.SECONDS;
+ final int numTimeBuckets = 10;
+ final int windowSeconds = 100;
+ TimeBuckets timeBuckets = TimeBuckets.ofSeconds(10_000,
Duration.ofSeconds(windowSeconds), numTimeBuckets);
+ TimeSeriesAggregationOperator aggregationOperator =
buildOperator(storedTimeUnit, timeBuckets);
+ testOutOfBoundsTimeValueIndex(new long[]{8_000}, numTimeBuckets,
aggregationOperator);
+ testOutOfBoundsTimeValueIndex(new
long[]{timeBuckets.getTimeRangeEndInclusive() + 1}, numTimeBuckets,
+ aggregationOperator);
+ }
+
+ private void testOutOfBoundsTimeValueIndex(long[] storedTimeValues, int
numTimeBuckets,
+ TimeSeriesAggregationOperator aggOperator) {
+ assertEquals(storedTimeValues.length, 1, "Misconfigured test: pass single
stored time value");
+ int[] indexes = aggOperator.getTimeValueIndex(storedTimeValues,
storedTimeValues.length);
+ assertTrue(indexes[0] < 0 || indexes[0] >= numTimeBuckets, "Expected time
index to spill beyond valid range");
+ }
+
+ private TimeSeriesAggregationOperator buildOperator(TimeUnit storedTimeUnit,
TimeBuckets timeBuckets) {
+ return new TimeSeriesAggregationOperator(
+ DUMMY_TIME_COLUMN, storedTimeUnit, 0L, AGG_INFO, VALUE_EXPRESSION,
Collections.emptyList(),
+ timeBuckets, mock(BaseProjectOperator.class),
mock(TimeSeriesBuilderFactory.class));
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
index 457e916b6d..404bd3efc3 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
@@ -82,6 +82,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
@@ -217,7 +218,7 @@ public class QueryExecutorTest {
@Test
public void testTimeSeriesSumQuery() {
- TimeBuckets timeBuckets =
TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofHours(2), 1);
+ TimeBuckets timeBuckets =
TimeBuckets.ofSeconds(TIME_SERIES_TEST_START_TIME, Duration.ofHours(2), 2);
ExpressionContext valueExpression =
ExpressionContext.forIdentifier("orderAmount");
TimeSeriesContext timeSeriesContext =
new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME,
TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets,
@@ -230,7 +231,8 @@ public class QueryExecutorTest {
TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock)
instanceResponse.getResultsBlock();
TimeSeriesBlock timeSeriesBlock =
resultsBlock.getTimeSeriesBuilderBlock().build();
assertEquals(timeSeriesBlock.getSeriesMap().size(), 1);
-
assertEquals(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getValues()[0],
29885544.0);
+
assertNull(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getValues()[0]);
+
assertEquals(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getValues()[1],
29885544.0);
}
@Test
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 1abe32d11b..fc10c96346 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -82,6 +82,7 @@ import
org.apache.pinot.tsdb.planner.TimeSeriesPlanConstants.WorkerRequestMetada
import
org.apache.pinot.tsdb.planner.TimeSeriesPlanConstants.WorkerResponseMetadataKeys;
import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan;
import org.apache.pinot.tsdb.planner.physical.TimeSeriesQueryServerInstance;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -301,13 +302,11 @@ public class QueryDispatcher {
Map<String, String>
initializeTimeSeriesMetadataMap(TimeSeriesDispatchablePlan dispatchablePlan) {
Map<String, String> result = new HashMap<>();
+ TimeBuckets timeBuckets = dispatchablePlan.getTimeBuckets();
result.put(WorkerRequestMetadataKeys.LANGUAGE,
dispatchablePlan.getLanguage());
- result.put(WorkerRequestMetadataKeys.START_TIME_SECONDS,
- Long.toString(dispatchablePlan.getTimeBuckets().getStartTime()));
- result.put(WorkerRequestMetadataKeys.WINDOW_SECONDS,
-
Long.toString(dispatchablePlan.getTimeBuckets().getBucketSize().getSeconds()));
- result.put(WorkerRequestMetadataKeys.NUM_ELEMENTS,
-
Long.toString(dispatchablePlan.getTimeBuckets().getTimeBuckets().length));
+ result.put(WorkerRequestMetadataKeys.START_TIME_SECONDS,
Long.toString(timeBuckets.getTimeBuckets()[0]));
+ result.put(WorkerRequestMetadataKeys.WINDOW_SECONDS,
Long.toString(timeBuckets.getBucketSize().getSeconds()));
+ result.put(WorkerRequestMetadataKeys.NUM_ELEMENTS,
Long.toString(timeBuckets.getTimeBuckets().length));
for (Map.Entry<String, List<String>> entry :
dispatchablePlan.getPlanIdToSegments().entrySet()) {
result.put(WorkerRequestMetadataKeys.encodeSegmentListKey(entry.getKey()),
String.join(",", entry.getValue()));
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java
index 8f877aec01..c9a687c5ff 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitorTest.java
@@ -54,7 +54,7 @@ public class PhysicalTimeSeriesPlanVisitorTest {
assertEquals(queryContext.getTimeSeriesContext().getTimeColumn(),
timeColumn);
assertEquals(queryContext.getTimeSeriesContext().getValueExpression().getIdentifier(),
"orderCount");
assertEquals(queryContext.getFilter().toString(),
- "(cityName = 'Chicago' AND orderTime >= '1000' AND orderTime <
'2000')");
+ "(cityName = 'Chicago' AND orderTime > '990' AND orderTime <=
'1990')");
}
// Case-2: With offset, complex group-by expression, complex value, and
non-empty filter
{
@@ -75,7 +75,7 @@ public class PhysicalTimeSeriesPlanVisitorTest {
assertEquals(queryContext.getTimeSeriesContext().getValueExpression().toString(),
"times(orderCount,'2')");
assertNotNull(queryContext.getFilter());
assertEquals(queryContext.getFilter().toString(),
- "(cityName = 'Chicago' AND orderTime >= '990' AND orderTime <
'1990')");
+ "(cityName = 'Chicago' AND orderTime > '980' AND orderTime <=
'1980')");
}
}
}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeBuckets.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeBuckets.java
index a92eb3bcc2..0dba0ec828 100644
---
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeBuckets.java
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeBuckets.java
@@ -25,7 +25,7 @@ import java.util.Objects;
/**
* Time buckets used for query execution. Each element (say x) in the {@link
#getTimeBuckets()} array represents a
- * time-range which is half open on the right side: [x, x +
bucketSize.getSeconds()). Some query languages allow some
+ * time-range which is half open on the left side: (x -
bucketSize.getSeconds(), x]. Some query languages allow some
* operators to mutate the time-buckets on the fly, so it is not guaranteed
that the time resolution and/or range
* will be the same across all operators. For instance, Uber's M3QL supports a
"summarize 1h sum" operator which will
* change the bucket resolution to 1 hour for all subsequent operators.
@@ -47,16 +47,16 @@ public class TimeBuckets {
return _bucketSize;
}
- public long getStartTime() {
- return _timeBuckets[0];
+ public long getTimeRangeStartExclusive() {
+ return _timeBuckets[0] - _bucketSize.getSeconds();
}
- public long getEndTime() {
+ public long getTimeRangeEndInclusive() {
return _timeBuckets[_timeBuckets.length - 1];
}
public long getRangeSeconds() {
- return _timeBuckets[_timeBuckets.length - 1] - _timeBuckets[0] +
_bucketSize.getSeconds();
+ return getTimeRangeEndInclusive() - getTimeRangeStartExclusive();
}
public int getNumBuckets() {
@@ -67,13 +67,12 @@ public class TimeBuckets {
if (_timeBuckets.length == 0) {
return -1;
}
- if (timeValue < _timeBuckets[0]) {
+ if (timeValue <= getTimeRangeStartExclusive() || timeValue >
getTimeRangeEndInclusive()) {
return -1;
}
- if (timeValue >= _timeBuckets[_timeBuckets.length - 1] +
_bucketSize.getSeconds()) {
- return -1;
- }
- return (int) ((timeValue - _timeBuckets[0]) / _bucketSize.getSeconds());
+ long offsetFromRangeStart = timeValue - getTimeRangeStartExclusive();
+ // Subtract 1 from the offset because we have intervals half-open on the
left.
+ return (int) ((offsetFromRangeStart - 1) / _bucketSize.getSeconds());
}
@Override
@@ -82,7 +81,8 @@ public class TimeBuckets {
return false;
}
TimeBuckets other = (TimeBuckets) o;
- return this.getStartTime() == other.getStartTime() && this.getEndTime() ==
other.getEndTime()
+ return this.getTimeRangeStartExclusive() ==
other.getTimeRangeStartExclusive()
+ && this.getTimeRangeEndInclusive() == other.getTimeRangeEndInclusive()
&& this.getBucketSize().equals(other.getBucketSize());
}
@@ -93,11 +93,22 @@ public class TimeBuckets {
return result;
}
- public static TimeBuckets ofSeconds(long startTimeSeconds, Duration
bucketSize, int numElements) {
+ /**
+ * Creates time-buckets, with the first value in the bucket being
firstBucketValue (FBV). The time range represented
+ * by the buckets are:
+ * <pre>
+ * (FBV - bucketSize.getSeconds(), FBV + (numElements - 1) *
bucketSize.getSeconds()]
+ * </pre>
+ * The raw Long[] time values are:
+ * <pre>
+ * FBV, FBV + bucketSize.getSeconds(), ... , FBV + (numElements - 1) *
bucketSize.getSeconds()
+ * </pre>
+ */
+ public static TimeBuckets ofSeconds(long firstBucketValue, Duration
bucketSize, int numElements) {
long stepSize = bucketSize.getSeconds();
Long[] timeBuckets = new Long[numElements];
for (int i = 0; i < numElements; i++) {
- timeBuckets[i] = startTimeSeconds + i * stepSize;
+ timeBuckets[i] = firstBucketValue + i * stepSize;
}
return new TimeBuckets(timeBuckets, bucketSize);
}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
index 856b81622a..773d675242 100644
---
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java
@@ -115,14 +115,12 @@ public class LeafTimeSeriesPlanNode extends
BaseTimeSeriesPlanNode {
public String getEffectiveFilter(TimeBuckets timeBuckets) {
String filter = _filterExpression == null ? "" : _filterExpression;
- long startTime =
_timeUnit.convert(Duration.ofSeconds(timeBuckets.getStartTime() -
_offsetSeconds));
- long endTime =
- _timeUnit.convert(Duration.ofSeconds(
- timeBuckets.getEndTime() + timeBuckets.getBucketSize().toSeconds()
- _offsetSeconds));
- String addnFilter = String.format("%s >= %d AND %s < %d", _timeColumn,
startTime, _timeColumn, endTime);
+ long startTime =
_timeUnit.convert(Duration.ofSeconds(timeBuckets.getTimeRangeStartExclusive() -
_offsetSeconds));
+ long endTime =
_timeUnit.convert(Duration.ofSeconds(timeBuckets.getTimeRangeEndInclusive() -
_offsetSeconds));
+ String timeFilter = String.format("%s > %d AND %s <= %d", _timeColumn,
startTime, _timeColumn, endTime);
if (filter.strip().isEmpty()) {
- return addnFilter;
+ return timeFilter;
}
- return String.format("(%s) AND (%s)", filter, addnFilter);
+ return String.format("(%s) AND (%s)", filter, timeFilter);
}
}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/TimeBucketsTest.java
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/TimeBucketsTest.java
new file mode 100644
index 0000000000..27d4ce7543
--- /dev/null
+++
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/TimeBucketsTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi;
+
+import java.time.Duration;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class TimeBucketsTest {
+ @Test
+ public void testTimeBucketsSemantics() {
+ /*
+ * time-bucket values: [10_000, 10_100, 10_200, ... , 10_900]
+ */
+ final int firstBucketValue = 10_000;
+ final int bucketSize = 100;
+ final int numElements = 10;
+ TimeBuckets timeBuckets = TimeBuckets.ofSeconds(firstBucketValue,
Duration.ofSeconds(bucketSize), numElements);
+ assertEquals(timeBuckets.getNumBuckets(), numElements);
+ assertEquals(timeBuckets.getBucketSize().getSeconds(), bucketSize);
+ assertEquals(timeBuckets.getTimeRangeStartExclusive(), firstBucketValue -
bucketSize);
+ assertEquals(timeBuckets.getTimeRangeEndInclusive(), firstBucketValue +
(numElements - 1) * bucketSize);
+ assertEquals(timeBuckets.getRangeSeconds(),
+ timeBuckets.getTimeRangeEndInclusive() -
timeBuckets.getTimeRangeStartExclusive());
+ assertEquals(timeBuckets.resolveIndex(10_000), 0);
+ assertEquals(timeBuckets.resolveIndex(9_999), 0);
+ assertEquals(timeBuckets.resolveIndex(9_901), 0);
+ assertEquals(timeBuckets.resolveIndex(10_100), 1);
+ assertEquals(timeBuckets.resolveIndex(10_101), 2);
+ assertEquals(timeBuckets.resolveIndex(10_900), 9);
+ // Test out of bound indexes
+ assertEquals(timeBuckets.resolveIndex(9_900), -1);
+ assertEquals(timeBuckets.resolveIndex(10_901), -1);
+ }
+}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
index f439bfc028..011cb6fbc6 100644
---
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
+++
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java
@@ -37,8 +37,8 @@ public class LeafTimeSeriesPlanNodeTest {
@Test
public void testGetEffectiveFilter() {
TimeBuckets timeBuckets = TimeBuckets.ofSeconds(1000,
Duration.ofSeconds(13), 9);
- final long expectedStartTimeInFilter = 1000;
- final long expectedEndTimeInFilter = 1000 + 13 * 9;
+ final long expectedStartTimeInFilter =
timeBuckets.getTimeRangeStartExclusive();
+ final long expectedEndTimeInFilter =
timeBuckets.getTimeRangeEndInclusive();
final String nonEmptyFilter = "cityName = 'Chicago'";
// Case-1: No offset, and empty filter.
{
@@ -46,7 +46,7 @@ public class LeafTimeSeriesPlanNodeTest {
new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE,
TIME_COLUMN, TIME_UNIT, 0L, "", "value_col",
new AggInfo("SUM", null), Collections.singletonList("cityName"));
assertEquals(planNode.getEffectiveFilter(timeBuckets),
- "orderTime >= " + expectedStartTimeInFilter + " AND orderTime < " +
expectedEndTimeInFilter);
+ "orderTime > " + expectedStartTimeInFilter + " AND orderTime <= " +
expectedEndTimeInFilter);
}
// Case-2: Offset, but empty filter
{
@@ -54,7 +54,7 @@ public class LeafTimeSeriesPlanNodeTest {
new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE,
TIME_COLUMN, TIME_UNIT, 123L, "", "value_col",
new AggInfo("SUM", null), Collections.singletonList("cityName"));
assertEquals(planNode.getEffectiveFilter(timeBuckets),
- "orderTime >= " + (expectedStartTimeInFilter - 123) + " AND
orderTime < " + (expectedEndTimeInFilter - 123));
+ "orderTime > " + (expectedStartTimeInFilter - 123) + " AND orderTime
<= " + (expectedEndTimeInFilter - 123));
}
// Case-3: Offset and non-empty filter
{
@@ -62,7 +62,7 @@ public class LeafTimeSeriesPlanNodeTest {
new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE,
TIME_COLUMN, TIME_UNIT, 123L, nonEmptyFilter,
"value_col", new AggInfo("SUM", null),
Collections.singletonList("cityName"));
assertEquals(planNode.getEffectiveFilter(timeBuckets),
- String.format("(%s) AND (orderTime >= %s AND orderTime < %s)",
nonEmptyFilter,
+ String.format("(%s) AND (orderTime > %s AND orderTime <= %s)",
nonEmptyFilter,
(expectedStartTimeInFilter - 123), (expectedEndTimeInFilter -
123)));
}
// Case-4: Offset, and non-empty filter, and time-unit that is not seconds
@@ -71,7 +71,7 @@ public class LeafTimeSeriesPlanNodeTest {
new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE,
TIME_COLUMN, TimeUnit.MILLISECONDS, 123L,
nonEmptyFilter, "value_col", new AggInfo("SUM", null),
Collections.singletonList("cityName"));
assertEquals(planNode.getEffectiveFilter(timeBuckets),
- String.format("(%s) AND (orderTime >= %s AND orderTime < %s)",
nonEmptyFilter,
+ String.format("(%s) AND (orderTime > %s AND orderTime <= %s)",
nonEmptyFilter,
(expectedStartTimeInFilter * 1000 - 123 * 1000),
(expectedEndTimeInFilter * 1000 - 123 * 1000)));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]