This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b828280882 Part-1: Pinot Timeseries Engine SPI (#13885)
b828280882 is described below
commit b8282808826b143cabcbf59fa408493fe9d03665
Author: Ankit Sultana <[email protected]>
AuthorDate: Tue Sep 10 02:11:09 2024 +0530
Part-1: Pinot Timeseries Engine SPI (#13885)
---
.gitignore | 3 +-
pinot-timeseries/pinot-timeseries-spi/pom.xml | 49 ++++++++
.../java/org/apache/pinot/tsdb/spi/AggInfo.java | 44 +++++++
.../tsdb/spi/PinotTimeSeriesConfiguration.java | 57 +++++++++
.../pinot/tsdb/spi/RangeTimeSeriesRequest.java | 103 +++++++++++++++++
.../org/apache/pinot/tsdb/spi/TimeBuckets.java | 104 +++++++++++++++++
.../tsdb/spi/TimeSeriesLogicalPlanResult.java | 43 +++++++
.../pinot/tsdb/spi/TimeSeriesLogicalPlanner.java | 39 +++++++
.../tsdb/spi/operator/BaseTimeSeriesOperator.java | 65 +++++++++++
.../tsdb/spi/plan/BaseTimeSeriesPlanNode.java | 55 +++++++++
.../spi/plan/ScanFilterAndProjectPlanNode.java | 127 +++++++++++++++++++++
.../tsdb/spi/plan/serde/TimeSeriesPlanSerde.java | 95 +++++++++++++++
.../tsdb/spi/series/BaseTimeSeriesBuilder.java | 75 ++++++++++++
.../apache/pinot/tsdb/spi/series/TimeSeries.java | 113 ++++++++++++++++++
.../pinot/tsdb/spi/series/TimeSeriesBlock.java | 48 ++++++++
.../tsdb/spi/series/TimeSeriesBuilderFactory.java | 36 ++++++
.../series/TimeSeriesBuilderFactoryProvider.java | 63 ++++++++++
.../spi/series/builders/MaxTimeSeriesBuilder.java | 57 +++++++++
.../spi/series/builders/MinTimeSeriesBuilder.java | 57 +++++++++
.../series/builders/SummingTimeSeriesBuilder.java | 55 +++++++++
.../spi/plan/serde/TimeSeriesPlanSerdeTest.java | 52 +++++++++
pinot-timeseries/pom.xml | 44 +++++++
pom.xml | 1 +
23 files changed, 1384 insertions(+), 1 deletion(-)
diff --git a/.gitignore b/.gitignore
index ca69e90d0e..00404492f8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -45,7 +45,8 @@ yarn-error.log*
quickstart*
#build symlink directory
-build*
+build
+build/*
#helm related files
kubernetes/helm/**/charts/
diff --git a/pinot-timeseries/pinot-timeseries-spi/pom.xml
b/pinot-timeseries/pinot-timeseries-spi/pom.xml
new file mode 100644
index 0000000000..c21e9971d5
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-spi/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>pinot-timeseries-spi</artifactId>
+
+ <properties>
+ <pinot.root>${basedir}/../..</pinot.root>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-spi</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java
new file mode 100644
index 0000000000..03d9cc8aa9
--- /dev/null
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Describes the aggregation applied to a time-series. The function is kept as a plain string because each
+ * time-series language is allowed to implement its own aggregation functions.
+ * TODO: We will likely be adding more parameters to this. One candidate is partial/full aggregation information or
+ *   aggregation result type to allow for intermediate result types.
+ */
+public class AggInfo {
+  private final String _aggFunction;
+
+  /**
+   * @param aggFunction name of the aggregation function; a null value is rejected
+   */
+  @JsonCreator
+  public AggInfo(@JsonProperty("aggFunction") String aggFunction) {
+    // checkNotNull hands back its argument, so validation and assignment share one statement.
+    _aggFunction = Preconditions.checkNotNull(aggFunction, "Received null aggFunction in AggInfo");
+  }
+
+  /** Returns the aggregation function name supplied at construction. */
+  public String getAggFunction() {
+    return _aggFunction;
+  }
+}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/PinotTimeSeriesConfiguration.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/PinotTimeSeriesConfiguration.java
new file mode 100644
index 0000000000..d0fac4369f
--- /dev/null
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/PinotTimeSeriesConfiguration.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi;
+
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory;
+
+
+/**
+ * Static helpers that build the configuration keys used by the time-series engine. Not instantiable.
+ */
+public class PinotTimeSeriesConfiguration {
+  public static final String CONFIG_PREFIX = "pinot.timeseries";
+  private static final String ENABLE_LANGUAGES_SUFFIX = ".languages";
+  private static final String SERIES_BUILDER_FACTORY_SUFFIX = ".series.builder.factory";
+  private static final String LOGICAL_PLANNER_CLASS_SUFFIX = ".logical.planner.class";
+
+  private PinotTimeSeriesConfiguration() {
+  }
+
+  /**
+   * Config key that controls which time-series languages are enabled in a given Pinot cluster.
+   */
+  public static String getEnabledLanguagesConfigKey() {
+    return CONFIG_PREFIX + ENABLE_LANGUAGES_SUFFIX;
+  }
+
+  /**
+   * Returns the config key which determines the class name for the {@link TimeSeriesBuilderFactory} to be used for
+   * a given language. Each language can have its own {@link TimeSeriesBuilderFactory}, which allows each language
+   * to support custom time-series functions.
+   */
+  public static String getSeriesBuilderFactoryConfigKey(String language) {
+    return languageScopedKey(language, SERIES_BUILDER_FACTORY_SUFFIX);
+  }
+
+  /**
+   * Returns config key which determines the class name for the {@link TimeSeriesLogicalPlanner} to be used for a
+   * given language. Pinot broker will load this logical planner on start-up dynamically. This is called for each
+   * language configured via {@link #getEnabledLanguagesConfigKey()}.
+   */
+  public static String getLogicalPlannerConfigKey(String language) {
+    return languageScopedKey(language, LOGICAL_PLANNER_CLASS_SUFFIX);
+  }
+
+  /** Builds {@code <prefix>.<language><suffix>}; every suffix constant already starts with a dot. */
+  private static String languageScopedKey(String language, String suffix) {
+    return CONFIG_PREFIX + "." + language + suffix;
+  }
+}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java
new file mode 100644
index 0000000000..2c4fc045a4
--- /dev/null
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/RangeTimeSeriesRequest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi;
+
+import com.google.common.base.Preconditions;
+import java.time.Duration;
+
+
+/**
+ * A time-series request received by the Pinot Broker. This is passed to the {@link TimeSeriesLogicalPlanner} so
+ * each query language can parse and plan the query based on their spec.
+ * <br/>
+ * <br/>
+ * <b>Notes:</b>
+ * <ul>
+ *   <li>[start, end] are both inclusive.</li>
+ *   <li>
+ *     The result can contain time values outside [start, end], though we generally recommend to keep your results
+ *     within the requested range. This decision is left to the time-series query language implementations. In some
+ *     cases, returning data outside the requested time-range can help (e.g. for debugging purposes when you are
+ *     computing moving 1d sum but are only looking at data for the last 12 hours).
+ *   </li>
+ *   <li>stepSeconds is used to define the default resolution for the query</li>
+ *   <li>
+ *     Some query languages allow users to change the resolution via a function, and in those cases the returned
+ *     time-series may have a resolution different than stepSeconds
+ *   </li>
+ *   <li>
+ *     The query execution may scan and process data outside of the time-range [start, end]. The actual data scanned
+ *     and processed is defined by the {@link TimeBuckets} used by the operator.
+ *   </li>
+ * </ul>
+ */
+public class RangeTimeSeriesRequest {
+  /** Engine allows a Pinot cluster to support multiple Time-Series Query Languages. */
+  private final String _engine;
+  /** Query is the raw query sent by the caller. */
+  private final String _query;
+  /** Start time of the time-window being queried. */
+  private final long _startSeconds;
+  /** End time of the time-window being queried. */
+  private final long _endSeconds;
+  /**
+   * <b>Optional</b> field which the caller can use to suggest the default resolution for the query. Language
+   * implementations can choose to skip this suggestion and choose their own resolution based on their semantics.
+   */
+  private final long _stepSeconds;
+  /** E2E timeout for the query. */
+  private final Duration _timeout;
+
+  /**
+   * Creates a request for the inclusive window {@code [startSeconds, endSeconds]}.
+   *
+   * @throws IllegalStateException if {@code endSeconds} is smaller than {@code startSeconds}
+   */
+  public RangeTimeSeriesRequest(String engine, String query, long startSeconds, long endSeconds, long stepSeconds,
+      Duration timeout) {
+    // endSeconds may equal startSeconds (a single-instant window) but must never precede it.
+    Preconditions.checkState(endSeconds >= startSeconds, "Invalid range. endSeconds "
+        + "should be greater than or equal to startSeconds. Found startSeconds=%s and endSeconds=%s",
+        startSeconds, endSeconds);
+    _engine = engine;
+    _query = query;
+    _startSeconds = startSeconds;
+    _endSeconds = endSeconds;
+    _stepSeconds = stepSeconds;
+    _timeout = timeout;
+  }
+
+  public String getEngine() {
+    return _engine;
+  }
+
+  public String getQuery() {
+    return _query;
+  }
+
+  public long getStartSeconds() {
+    return _startSeconds;
+  }
+
+  public long getEndSeconds() {
+    return _endSeconds;
+  }
+
+  public long getStepSeconds() {
+    return _stepSeconds;
+  }
+
+  public Duration getTimeout() {
+    return _timeout;
+  }
+}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeBuckets.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeBuckets.java
new file mode 100644
index 0000000000..866249845e
--- /dev/null
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeBuckets.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Objects;
+
+
+/**
+ * Time buckets used for query execution. Each element (say x) in the {@link #getTimeBuckets()} array represents a
+ * time-range which is half open on the right side: [x, x + bucketSize.getSeconds()). Some query languages allow some
+ * operators to mutate the time-buckets on the fly, so it is not guaranteed that the time resolution and/or range
+ * will be the same across all operators. For instance, Uber's M3QL supports a "summarize 1h sum" operator which will
+ * change the bucket resolution to 1 hour for all subsequent operators.
+ */
+public class TimeBuckets {
+  private final Long[] _timeBuckets;
+  private final Duration _bucketSize;
+
+  private TimeBuckets(Long[] timeBuckets, Duration bucketSize) {
+    _timeBuckets = timeBuckets;
+    _bucketSize = bucketSize;
+  }
+
+  /** Returns the backing array of bucket start values (not a copy). */
+  public Long[] getTimeBuckets() {
+    return _timeBuckets;
+  }
+
+  public Duration getBucketSize() {
+    return _bucketSize;
+  }
+
+  /**
+   * Returns the start value of the first bucket.
+   *
+   * @throws ArrayIndexOutOfBoundsException if there are no buckets
+   */
+  public long getStartTime() {
+    return _timeBuckets[0];
+  }
+
+  /**
+   * Returns the start value of the last bucket, i.e. the last bucket's own width is not included.
+   *
+   * @throws ArrayIndexOutOfBoundsException if there are no buckets
+   */
+  public long getEndTime() {
+    return _timeBuckets[_timeBuckets.length - 1];
+  }
+
+  /**
+   * Returns the distance between the first and the last bucket start values.
+   *
+   * @throws ArrayIndexOutOfBoundsException if there are no buckets
+   */
+  public long getRangeSeconds() {
+    return _timeBuckets[_timeBuckets.length - 1] - _timeBuckets[0];
+  }
+
+  public int getNumBuckets() {
+    return _timeBuckets.length;
+  }
+
+  /**
+   * Maps a time value to the index of the bucket containing it.
+   *
+   * @return the bucket index, or -1 when there are no buckets or the value lies outside
+   *         [first bucket start, last bucket start + bucket size)
+   */
+  public int resolveIndex(long timeValue) {
+    if (_timeBuckets.length == 0) {
+      return -1;
+    }
+    if (timeValue < _timeBuckets[0]) {
+      return -1;
+    }
+    if (timeValue >= _timeBuckets[_timeBuckets.length - 1] + _bucketSize.getSeconds()) {
+      return -1;
+    }
+    return (int) ((timeValue - _timeBuckets[0]) / _bucketSize.getSeconds());
+  }
+
+  /**
+   * Two instances are equal when they have the same bucket size and the same bucket start values. Comparing the
+   * arrays keeps this method in line with {@link #hashCode()} and avoids indexing into an empty array.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof TimeBuckets)) {
+      return false;
+    }
+    TimeBuckets other = (TimeBuckets) o;
+    return _bucketSize.equals(other._bucketSize) && Arrays.equals(_timeBuckets, other._timeBuckets);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = Objects.hash(_bucketSize);
+    result = 31 * result + Arrays.hashCode(_timeBuckets);
+    return result;
+  }
+
+  /**
+   * Creates {@code numElements} buckets, the first starting at {@code startTimeSeconds} and each subsequent one
+   * {@code bucketSize.getSeconds()} later. Only the whole-second part of {@code bucketSize} is used for spacing.
+   */
+  public static TimeBuckets ofSeconds(long startTimeSeconds, Duration bucketSize, int numElements) {
+    long stepSize = bucketSize.getSeconds();
+    Long[] timeBuckets = new Long[numElements];
+    for (int i = 0; i < numElements; i++) {
+      timeBuckets[i] = startTimeSeconds + i * stepSize;
+    }
+    return new TimeBuckets(timeBuckets, bucketSize);
+  }
+}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeSeriesLogicalPlanResult.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeSeriesLogicalPlanResult.java
new file mode 100644
index 0000000000..829a4042bb
--- /dev/null
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeSeriesLogicalPlanResult.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi;
+
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+
+
+/**
+ * The result of logical planning: the root of the plan tree together with the {@link TimeBuckets} chosen by the
+ * planner.
+ */
+public class TimeSeriesLogicalPlanResult {
+  private final BaseTimeSeriesPlanNode _planNode;
+  private final TimeBuckets _timeBuckets;
+
+  /**
+   * @param planNode plan node produced by the planner
+   * @param timeBuckets time buckets produced by the planner
+   */
+  public TimeSeriesLogicalPlanResult(BaseTimeSeriesPlanNode planNode, TimeBuckets timeBuckets) {
+    _planNode = planNode;
+    _timeBuckets = timeBuckets;
+  }
+
+  /** Returns the plan node passed at construction. */
+  public BaseTimeSeriesPlanNode getPlanNode() {
+    return _planNode;
+  }
+
+  /** Returns the time buckets passed at construction. */
+  public TimeBuckets getTimeBuckets() {
+    return _timeBuckets;
+  }
+}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeSeriesLogicalPlanner.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeSeriesLogicalPlanner.java
new file mode 100644
index 0000000000..0c7e724ca8
--- /dev/null
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/TimeSeriesLogicalPlanner.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi;
+
+import java.util.Map;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode;
+
+
+/**
+ * Allows time-series query languages to implement their own logical planner. The input to this planner is a
+ * {@link RangeTimeSeriesRequest} and the output is a {@link TimeSeriesLogicalPlanResult}. Put simply, this
+ * abstraction takes in the query text and other parameters, and returns a logical plan which is a tree of
+ * {@link BaseTimeSeriesPlanNode}. Other than the plan-tree, the planner also returns a {@link TimeBuckets} which is
+ * the default TimeBuckets used by the query operators at runtime. Implementations are free to adjust them as they
+ * see fit. For instance, one query language might want to extend to the left or right of the time-range based on
+ * certain operators. Also, see {@link ScanFilterAndProjectPlanNode#getEffectiveFilter(TimeBuckets)}.
+ */
+public interface TimeSeriesLogicalPlanner {
+ /**
+ * Supplies the planner with its configuration.
+ * NOTE(review): presumably called once after the planner class is loaded and before any call to
+ * {@link #plan(RangeTimeSeriesRequest)}; the caller is not visible here, so confirm.
+ */
+ void init(Map<String, Object> config);
+
+ /**
+ * Plans the given request and returns the resulting plan tree together with its {@link TimeBuckets}.
+ */
+ TimeSeriesLogicalPlanResult plan(RangeTimeSeriesRequest request);
+}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/operator/BaseTimeSeriesOperator.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/operator/BaseTimeSeriesOperator.java
new file mode 100644
index 0000000000..28be511c37
--- /dev/null
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/operator/BaseTimeSeriesOperator.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.operator;
+
+import java.util.List;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
+
+
+/**
+ * Every time-series operator takes in a {@link TimeSeriesBlock} and returns another {@link TimeSeriesBlock}.
+ * Parent operators/callers must call {@link #nextBlock()} to get the next block from the child operators, and
+ * implement {@link #getNextBlock()} to implement the business logic for their operator. Also see
+ * {@link BaseTimeSeriesPlanNode}.
+ * TODO: Add common hierarchy with other operators like Multistage and Pinot core. This will likely require us to
+ *   define a pinot-query-spi or add/move some abstractions to pinot-spi.
+ */
+public abstract class BaseTimeSeriesOperator {
+  protected final List<BaseTimeSeriesOperator> _childOperators;
+
+  /**
+   * @param childOperators operators feeding this one; the list is stored as given, without copying
+   */
+  public BaseTimeSeriesOperator(List<BaseTimeSeriesOperator> childOperators) {
+    _childOperators = childOperators;
+  }
+
+  /**
+   * Called by parent time-series operators. Delegates to {@link #getNextBlock()}.
+   */
+  public final TimeSeriesBlock nextBlock() {
+    // TODO: add stats (e.g. record the time spent in getNextBlock()).
+    return getNextBlock();
+  }
+
+  /** Returns the child operators passed at construction. */
+  public List<BaseTimeSeriesOperator> getChildOperators() {
+    return _childOperators;
+  }
+
+  /**
+   * Time series query languages can implement their own business logic in their operators.
+   */
+  public abstract TimeSeriesBlock getNextBlock();
+
+  /**
+   * Name that will show up in the explain plan.
+   */
+  public abstract String getExplainName();
+}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/BaseTimeSeriesPlanNode.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/BaseTimeSeriesPlanNode.java
new file mode 100644
index 0000000000..dd7a951752
--- /dev/null
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/BaseTimeSeriesPlanNode.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.plan;
+
+import java.util.List;
+import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
+
+
+/**
+ * Generic plan node for time series queries. This allows each time-series query language to define their own plan
+ * nodes, which in turn generate the language specific {@link BaseTimeSeriesOperator}.
+ */
+public abstract class BaseTimeSeriesPlanNode {
+  protected final String _id;
+  protected final List<BaseTimeSeriesPlanNode> _children;
+
+  /**
+   * @param id identifier of this node
+   * @param children child nodes; the list is stored as given (not copied), so {@link #addChildNode} mutates it
+   */
+  public BaseTimeSeriesPlanNode(String id, List<BaseTimeSeriesPlanNode> children) {
+    _id = id;
+    _children = children;
+  }
+
+  /** Returns the identifier passed at construction. */
+  public String getId() {
+    return _id;
+  }
+
+  /** Returns the list of children held by this node (the same list instance, not a copy). */
+  public List<BaseTimeSeriesPlanNode> getChildren() {
+    return _children;
+  }
+
+  /** Appends {@code planNode} to the list of children held by this node. */
+  public void addChildNode(BaseTimeSeriesPlanNode planNode) {
+    _children.add(planNode);
+  }
+
+  /**
+   * NOTE(review): presumably the class name of the concrete node, as returned by
+   * {@code ScanFilterAndProjectPlanNode}; confirm how it is consumed.
+   */
+  public abstract String getKlass();
+
+  /** Name of this node for explain output. */
+  public abstract String getExplainName();
+
+  /** Produces the {@link BaseTimeSeriesOperator} for this node. */
+  public abstract BaseTimeSeriesOperator run();
+}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/ScanFilterAndProjectPlanNode.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/ScanFilterAndProjectPlanNode.java
new file mode 100644
index 0000000000..e2a0a15f27
--- /dev/null
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/ScanFilterAndProjectPlanNode.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.plan;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanner;
+import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
+
+
+/**
+ * This would typically be the leaf node of a plan-tree generated by a time-series engine's logical planner. At
+ * runtime, this gets compiled to a Combine Operator.
+ * <b>Note:</b> You don't need to pass the time-filter to the filter expression, since Pinot will automatically
+ * compute the time filter based on the computed time buckets in {@link TimeSeriesLogicalPlanner}.
+ */
+public class ScanFilterAndProjectPlanNode extends BaseTimeSeriesPlanNode {
+  private static final String EXPLAIN_NAME = "SCAN_FILTER_AND_PROJECT";
+  private final String _tableName;
+  private final String _timeColumn;
+  private final TimeUnit _timeUnit;
+  private final Long _offset;
+  private final String _filterExpression;
+  private final String _valueExpression;
+  private final AggInfo _aggInfo;
+  private final List<String> _groupByColumns;
+
+  @JsonCreator
+  public ScanFilterAndProjectPlanNode(
+      @JsonProperty("id") String id, @JsonProperty("children") List<BaseTimeSeriesPlanNode> children,
+      @JsonProperty("tableName") String tableName, @JsonProperty("timeColumn") String timeColumn,
+      @JsonProperty("timeUnit") TimeUnit timeUnit, @JsonProperty("offset") Long offset,
+      @JsonProperty("filterExpression") String filterExpression,
+      @JsonProperty("valueExpression") String valueExpression,
+      @JsonProperty("aggInfo") AggInfo aggInfo, @JsonProperty("groupByColumns") List<String> groupByColumns) {
+    super(id, children);
+    _tableName = tableName;
+    _timeColumn = timeColumn;
+    _timeUnit = timeUnit;
+    // TODO: This is broken technically. Adjust offset to meet TimeUnit resolution. For now use 0 offset.
+    _offset = offset;
+    _filterExpression = filterExpression;
+    _valueExpression = valueExpression;
+    _aggInfo = aggInfo;
+    _groupByColumns = groupByColumns;
+  }
+
+  @Override
+  public String getKlass() {
+    return ScanFilterAndProjectPlanNode.class.getName();
+  }
+
+  @Override
+  public String getExplainName() {
+    return EXPLAIN_NAME;
+  }
+
+  /**
+   * Not supported by this node.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public BaseTimeSeriesOperator run() {
+    throw new UnsupportedOperationException("run() is not supported for ScanFilterAndProjectPlanNode");
+  }
+
+  public String getTableName() {
+    return _tableName;
+  }
+
+  public String getTimeColumn() {
+    return _timeColumn;
+  }
+
+  public TimeUnit getTimeUnit() {
+    return _timeUnit;
+  }
+
+  /** Returns the offset exactly as supplied at construction; may be null. */
+  public Long getOffset() {
+    return _offset;
+  }
+
+  public String getFilterExpression() {
+    return _filterExpression;
+  }
+
+  public String getValueExpression() {
+    return _valueExpression;
+  }
+
+  public AggInfo getAggInfo() {
+    return _aggInfo;
+  }
+
+  public List<String> getGroupByColumns() {
+    return _groupByColumns;
+  }
+
+  /**
+   * Combines the user supplied filter expression with a time filter derived from {@code timeBuckets}.
+   *
+   * @return only the time filter when the filter expression is null or blank, otherwise
+   *         {@code (<filter>) AND (<time filter>)}
+   */
+  public String getEffectiveFilter(TimeBuckets timeBuckets) {
+    String filter = _filterExpression == null ? "" : _filterExpression;
+    // The offset is a nullable Long; a missing value means "no offset" and must not fail on unboxing.
+    long offset = _offset == null ? 0L : _offset;
+    // TODO: This is wrong. offset should be converted to seconds before arithmetic. For now use 0 offset.
+    // NOTE(review): getEndTime() is the start of the last bucket, so rows later inside that bucket fall outside
+    // the "<=" bound below. Confirm this is intended.
+    long startTime = _timeUnit.convert(Duration.ofSeconds(timeBuckets.getStartTime() - offset));
+    long endTime = _timeUnit.convert(Duration.ofSeconds(timeBuckets.getEndTime() - offset));
+    String addnFilter = String.format("%s >= %d AND %s <= %d", _timeColumn, startTime, _timeColumn, endTime);
+    if (filter.strip().isEmpty()) {
+      return addnFilter;
+    }
+    return String.format("(%s) AND (%s)", filter, addnFilter);
+  }
+}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerde.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerde.java
new file mode 100644
index 0000000000..e4e036e1ff
--- /dev/null
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerde.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.plan.serde;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode;
+
+
+/**
+ * We have implemented a custom serialization/deserialization mechanism for
time series plans. This allows users to
+ * use Jackson to annotate their plan nodes as shown in {@link
ScanFilterAndProjectPlanNode}, which is used for
+ * plan serde for broker/server communication.
+ * TODO: There are limitations to this and we will change this soon. Issues:
+ * 1. Pinot TS SPI is compiled in Pinot distribution and Jackson deps get
shaded usually.
+ * 2. The plugins have to shade the dependency in the exact same way, which
is obviously error-prone and not ideal.
+ */
[email protected]
+public class TimeSeriesPlanSerde {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ static {
+ OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
+ }
+
+ private TimeSeriesPlanSerde() {
+ }
+
+ public static String serialize(BaseTimeSeriesPlanNode planNode) {
+ try {
+ return OBJECT_MAPPER.writeValueAsString(planNode);
+ } catch (Exception e) {
+ throw new RuntimeException("Caught exception while serializing plan", e);
+ }
+ }
+
+ public static BaseTimeSeriesPlanNode deserialize(String planString) {
+ try {
+ JsonNode jsonNode = OBJECT_MAPPER.readTree(planString);
+ return create(jsonNode);
+ } catch (Exception e) {
+ throw new RuntimeException("Caught exception while deserializing plan",
e);
+ }
+ }
+
+ public static BaseTimeSeriesPlanNode create(JsonNode jsonNode)
+ throws JsonProcessingException, ClassNotFoundException {
+ JsonNode children = null;
+ if (jsonNode instanceof ObjectNode) {
+ // Remove children field to prevent Jackson from deserializing it. We
will handle it manually.
+ ObjectNode objectNode = (ObjectNode) jsonNode;
+ if (objectNode.has("children")) {
+ children = objectNode.get("children");
+ objectNode.remove("children");
+ }
+ objectNode.putIfAbsent("children", OBJECT_MAPPER.createArrayNode());
+ }
+ BaseTimeSeriesPlanNode planNode = null;
+ try {
+ String klassName = jsonNode.get("klass").asText();
+ Class<BaseTimeSeriesPlanNode> klass = (Class<BaseTimeSeriesPlanNode>)
Class.forName(klassName);
+ planNode = OBJECT_MAPPER.readValue(jsonNode.toString(), klass);
+ } finally {
+ if (planNode != null && children instanceof ArrayNode) {
+ ArrayNode childArray = (ArrayNode) children;
+ for (JsonNode childJsonNode : childArray) {
+ planNode.addChildNode(create(childJsonNode));
+ }
+ }
+ }
+ return planNode;
+ }
+}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java
new file mode 100644
index 0000000000..3509e7cfcd
--- /dev/null
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.series;
+
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+
+
+/**
+ * Base class that language implementations extend to build their own aggregations and other time-series functions.
+ * A time-series operator feeds data points through {@link #addValue} or {@link #addValueAtIndex} (or merges whole
+ * series through {@link #mergeSeries} / {@link #mergeAlignedSeries}) and finally calls {@link #build()} to obtain
+ * the resulting {@link TimeSeries}.
+ */
+public abstract class BaseTimeSeriesBuilder {
+  protected final String _id;
+  @Nullable
+  protected final Long[] _timeValues;
+  @Nullable
+  protected final TimeBuckets _timeBuckets;
+  protected final List<String> _tagNames;
+  protected final Object[] _tagValues;
+
+  /**
+   * Stores the given references as-is; nothing is copied.
+   */
+  public BaseTimeSeriesBuilder(String id, @Nullable Long[] timeValues, @Nullable TimeBuckets timeBuckets,
+      List<String> tagNames, Object[] tagValues) {
+    _id = id;
+    _timeValues = timeValues;
+    _timeBuckets = timeBuckets;
+    _tagNames = tagNames;
+    _tagValues = tagValues;
+  }
+
+  /** Adds a numeric value to the bucket at the given index. */
+  public abstract void addValueAtIndex(int timeBucketIndex, Double value);
+
+  /**
+   * Adds a string value to the bucket at the given index. This default implementation always throws; builders that
+   * accept string input override it.
+   *
+   * @throws IllegalStateException always, unless overridden
+   */
+  public void addValueAtIndex(int timeBucketIndex, String value) {
+    throw new IllegalStateException("This aggregation function does not support string input");
+  }
+
+  /** Adds a numeric value observed at the given time value. */
+  public abstract void addValue(long timeValue, Double value);
+
+  /**
+   * Feeds every data point of {@code series} into {@link #addValue}, pairing the i-th time value with the i-th value.
+   *
+   * @throws NullPointerException if the series has no time values
+   */
+  public void mergeSeries(TimeSeries series) {
+    Double[] seriesValues = series.getValues();
+    int pointCount = seriesValues.length;
+    Long[] seriesTimes = Objects.requireNonNull(series.getTimeValues(),
+        "Cannot merge series: found null timeValues");
+    int position = 0;
+    while (position < pointCount) {
+      addValue(seriesTimes[position], seriesValues[position]);
+      position++;
+    }
+  }
+
+  /**
+   * Feeds every value of {@code series} into {@link #addValueAtIndex(int, Double)}, using the position of the value
+   * in the series as the bucket index.
+   */
+  public void mergeAlignedSeries(TimeSeries series) {
+    Double[] seriesValues = series.getValues();
+    int pointCount = seriesValues.length;
+    int bucket = 0;
+    while (bucket < pointCount) {
+      addValueAtIndex(bucket, seriesValues[bucket]);
+      bucket++;
+    }
+  }
+
+  /** Produces the final {@link TimeSeries} from the values added so far. */
+  public abstract TimeSeries build();
+}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeries.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeries.java
new file mode 100644
index 0000000000..5fcf70b42e
--- /dev/null
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeries.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.series;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+
+
+/**
+ * Logically, a time-series is a list of pairs of time and data values, where
time is stored in increasing order.
+ * A time-series is identified using its ID, which can be retrieved using
{@link #getId()}.
+ * A time series typically also has a set of pairs of keys and values which
are called tags or labels.
+ * We allow a Series to store time either via {@link TimeBuckets} or via a
long array as in {@link #getTimeValues()}.
+ * Using {@link TimeBuckets} is ideal when your queries are working on evenly
spaced time ranges. The other option
+ * exists to support use-cases such as "Instant Vectors" in PromQL.
+ * <p>
+ * <b>Warning:</b> The time and value arrays passed to the Series are not
copied, and can be modified by anyone with
+ * access to them. This is by design, to make it easier to re-use buffers
during time-series operations.
+ * </p>
+ */
+public class TimeSeries {
+ private final String _id;
+ private final Long[] _timeValues;
+ private final TimeBuckets _timeBuckets;
+ private final Double[] _values;
+ private final List<String> _tagNames;
+ private final Object[] _tagValues;
+
+ public TimeSeries(String id, @Nullable Long[] timeValues, @Nullable
TimeBuckets timeBuckets, Double[] values,
+ List<String> tagNames, Object[] tagValues) {
+ _id = id;
+ _timeValues = timeValues;
+ _timeBuckets = timeBuckets;
+ _values = values;
+ _tagNames = Collections.unmodifiableList(tagNames);
+ _tagValues = tagValues;
+ }
+
+ public String getId() {
+ return _id;
+ }
+
+ @Nullable
+ public Long[] getTimeValues() {
+ return _timeValues;
+ }
+
+ @Nullable
+ public TimeBuckets getTimeBuckets() {
+ return _timeBuckets;
+ }
+
+ public Double[] getValues() {
+ return _values;
+ }
+
+ public List<String> getTagNames() {
+ return _tagNames;
+ }
+
+ public Object[] getTagValues() {
+ return _tagValues;
+ }
+
+ public Map<String, String> getTagKeyValuesAsMap() {
+ Map<String, String> result = new HashMap<>();
+ for (int index = 0; index < _tagNames.size(); index++) {
+ String tagValue = _tagValues[index] == null ? "null" :
_tagValues[index].toString();
+ result.put(_tagNames.get(index), tagValue);
+ }
+ return result;
+ }
+
+ public String getTagsSerialized() {
+ if (_tagNames.isEmpty()) {
+ return "*";
+ }
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < _tagNames.size(); i++) {
+ if (i > 0) {
+ sb.append(",");
+ }
+ sb.append(String.format("%s=%s", _tagNames.get(i), _tagValues[i]));
+ }
+ return sb.toString();
+ }
+
+ // TODO: This can be cleaned up
+ public static long hash(Object[] tagNamesAndValues) {
+ return Objects.hash(tagNamesAndValues);
+ }
+}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java
new file mode 100644
index 0000000000..fe7fd5be42
--- /dev/null
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBlock.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.series;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+
+
+/**
+ * Block used by time series operators. We store the series data in a map keyed by the series' ID. The value is a
+ * list of series, because some query languages support "union" operations which allow series with the same
+ * tags/labels to exist either in the query response or temporarily during execution before some n-ary series
+ * function is applied.
+ * <p>
+ * NOTE(review): the map key is a {@code Long} while {@link TimeSeries#getId()} returns a {@code String}; presumably
+ * the key is a hash of the series identity (e.g. via {@link TimeSeries#hash}) - confirm.
+ * <p>
+ * The block stores the references it is given; neither the buckets nor the map are copied.
+ */
+public class TimeSeriesBlock {
+ private final TimeBuckets _timeBuckets;
+ private final Map<Long, List<TimeSeries>> _seriesMap;
+
+ public TimeSeriesBlock(TimeBuckets timeBuckets, Map<Long, List<TimeSeries>>
seriesMap) {
+ _timeBuckets = timeBuckets;
+ _seriesMap = seriesMap;
+ }
+
+ /** Returns the time buckets given at construction. */
+ public TimeBuckets getTimeBuckets() {
+ return _timeBuckets;
+ }
+
+ /** Returns the series map given at construction (the same instance, not a copy). */
+ public Map<Long, List<TimeSeries>> getSeriesMap() {
+ return _seriesMap;
+ }
+}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactory.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactory.java
new file mode 100644
index 0000000000..088f9b3c85
--- /dev/null
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactory.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.series;
+
+import java.util.List;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+
+
+/**
+ * Factory for {@link BaseTimeSeriesBuilder} instances. {@link TimeSeriesBuilderFactoryProvider} creates one factory
+ * per configured language through its public no-arg constructor and then calls {@link #init}.
+ */
+public abstract class TimeSeriesBuilderFactory {
+ /**
+  * Creates a new series builder for the given aggregation.
+  *
+  * @param aggInfo aggregation the builder should compute
+  * @param id ID of the series being built
+  * @param timeBuckets time buckets of the series being built
+  * @param tagNames tag names of the series being built
+  * @param tagValues tag values of the series being built
+  * @return a new builder
+  */
+ public abstract BaseTimeSeriesBuilder newTimeSeriesBuilder(
+ AggInfo aggInfo,
+ String id,
+ TimeBuckets timeBuckets,
+ List<String> tagNames,
+ Object[] tagValues);
+
+ /**
+  * Initializes the factory. {@link TimeSeriesBuilderFactoryProvider} passes the subset of the Pinot configuration
+  * under the time-series config prefix followed by the language name.
+  */
+ public abstract void init(PinotConfiguration pinotConfiguration);
+}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactoryProvider.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactoryProvider.java
new file mode 100644
index 0000000000..b757579839
--- /dev/null
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactoryProvider.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.series;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.tsdb.spi.PinotTimeSeriesConfiguration;
+
+
+/**
+ * Loads the series builder factory for every configured time-series query language and serves them by language
+ * name. Factories are kept in a static map populated by {@link #init}.
+ */
+public class TimeSeriesBuilderFactoryProvider {
+  private static final Map<String, TimeSeriesBuilderFactory> FACTORY_MAP = new HashMap<>();
+
+  private TimeSeriesBuilderFactoryProvider() {
+  }
+
+  /**
+   * Instantiates and initializes one {@link TimeSeriesBuilderFactory} per enabled language.
+   * <p>
+   * The enabled languages are read as a comma-separated list. Entries are trimmed and blank entries are skipped, so
+   * an unset or empty list registers nothing. Each factory is created through its public no-arg constructor and
+   * initialized with the configuration subset under the time-series config prefix followed by the language name.
+   *
+   * @param pinotConfiguration configuration holding the enabled languages and their factory class names
+   * @throws IllegalStateException if an enabled language has no factory class configured
+   * @throws RuntimeException if a factory cannot be instantiated, is of the wrong type, or fails to initialize
+   */
+  public static void init(PinotConfiguration pinotConfiguration) {
+    String enabledLanguages =
+        pinotConfiguration.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey(), "");
+    for (String rawLanguage : enabledLanguages.split(",")) {
+      String language = rawLanguage.strip();
+      if (language.isEmpty()) {
+        // "".split(",") yields one empty token; without this guard an empty config would try to load a factory.
+        continue;
+      }
+      String seriesBuilderClass = pinotConfiguration
+          .getProperty(PinotTimeSeriesConfiguration.getSeriesBuilderFactoryConfigKey(language));
+      if (seriesBuilderClass == null || seriesBuilderClass.isBlank()) {
+        throw new IllegalStateException("No series builder factory class configured for language: " + language);
+      }
+      try {
+        Object untypedSeriesBuilderFactory = Class.forName(seriesBuilderClass).getConstructor().newInstance();
+        if (!(untypedSeriesBuilderFactory instanceof TimeSeriesBuilderFactory)) {
+          throw new RuntimeException("Series builder factory class " + seriesBuilderClass
+              + " does not extend TimeSeriesBuilderFactory");
+        }
+        TimeSeriesBuilderFactory seriesBuilderFactory = (TimeSeriesBuilderFactory) untypedSeriesBuilderFactory;
+        seriesBuilderFactory.init(pinotConfiguration.subset(
+            PinotTimeSeriesConfiguration.CONFIG_PREFIX + "." + language));
+        FACTORY_MAP.put(language, seriesBuilderFactory);
+      } catch (Exception e) {
+        throw new RuntimeException(
+            "Failed to load series builder factory " + seriesBuilderClass + " for language: " + language, e);
+      }
+    }
+  }
+
+  /**
+   * Returns the factory registered for the given engine (language) name.
+   *
+   * @throws NullPointerException if no factory was registered for {@code engine}
+   */
+  public static TimeSeriesBuilderFactory getSeriesBuilderFactory(String engine) {
+    return Objects.requireNonNull(FACTORY_MAP.get(engine),
+        "No series builder factory found for engine: " + engine);
+  }
+}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/MaxTimeSeriesBuilder.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/MaxTimeSeriesBuilder.java
new file mode 100644
index 0000000000..742b1b32c6
--- /dev/null
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/MaxTimeSeriesBuilder.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.series.builders;
+
+import java.util.List;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder;
+import org.apache.pinot.tsdb.spi.series.TimeSeries;
+
+
+/**
+ * MaxSeriesBuilder is a series builder that computes the maximum value in
each time bucket.
+ * <b>Context:</b>We provide some ready to use implementations for some of the
most common use-cases in the SPI. This
+ * reduces redundancy and also serves as a reference implementation for
language developers.
+ */
+public class MaxTimeSeriesBuilder extends BaseTimeSeriesBuilder {
+ private final Double[] _values;
+
+ public MaxTimeSeriesBuilder(String id, TimeBuckets timeBuckets, List<String>
tagNames, Object[] tagValues) {
+ super(id, null, timeBuckets, tagNames, tagValues);
+ _values = new Double[timeBuckets.getNumBuckets()];
+ }
+
+ @Override
+ public void addValueAtIndex(int timeBucketIndex, Double value) {
+ if (_values[timeBucketIndex] == null || value > _values[timeBucketIndex]) {
+ _values[timeBucketIndex] = value;
+ }
+ }
+
+ @Override
+ public void addValue(long timeValue, Double value) {
+ int timeBucketIndex = _timeBuckets.resolveIndex(timeValue);
+ addValueAtIndex(timeBucketIndex, value);
+ }
+
+ @Override
+ public TimeSeries build() {
+ return new TimeSeries(_id, null, _timeBuckets, _values, _tagNames,
_tagValues);
+ }
+}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/MinTimeSeriesBuilder.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/MinTimeSeriesBuilder.java
new file mode 100644
index 0000000000..93cdab77d4
--- /dev/null
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/MinTimeSeriesBuilder.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.series.builders;
+
+import java.util.List;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder;
+import org.apache.pinot.tsdb.spi.series.TimeSeries;
+
+
+/**
+ * MinSeriesBuilder is a series builder that computes the minimum value in
each time bucket.
+ * <b>Context:</b>We provide some ready to use implementations for some of the
most common use-cases in the SPI. This
+ * reduces redundancy and also serves as a reference implementation for
language developers.
+ */
+public class MinTimeSeriesBuilder extends BaseTimeSeriesBuilder {
+ private final Double[] _values;
+
+ public MinTimeSeriesBuilder(String id, TimeBuckets timeBuckets, List<String>
tagNames, Object[] tagValues) {
+ super(id, null, timeBuckets, tagNames, tagValues);
+ _values = new Double[timeBuckets.getNumBuckets()];
+ }
+
+ @Override
+ public void addValueAtIndex(int timeBucketIndex, Double value) {
+ if (_values[timeBucketIndex] == null || value < _values[timeBucketIndex]) {
+ _values[timeBucketIndex] = value;
+ }
+ }
+
+ @Override
+ public void addValue(long timeValue, Double value) {
+ int timeBucketIndex = _timeBuckets.resolveIndex(timeValue);
+ addValueAtIndex(timeBucketIndex, value);
+ }
+
+ @Override
+ public TimeSeries build() {
+ return new TimeSeries(_id, null, _timeBuckets, _values, _tagNames,
_tagValues);
+ }
+}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/SummingTimeSeriesBuilder.java
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/SummingTimeSeriesBuilder.java
new file mode 100644
index 0000000000..2cf723b8e4
--- /dev/null
+++
b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/builders/SummingTimeSeriesBuilder.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.series.builders;
+
+import java.util.List;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder;
+import org.apache.pinot.tsdb.spi.series.TimeSeries;
+
+
+/**
+ * SummingSeriesBuilder is a series builder that computes the sum of all
values in each time bucket.
+ * <b>Context:</b>We provide some ready to use implementations for some of the
most common use-cases in the SPI. This
+ * reduces redundancy and also serves as a reference implementation for
language developers.
+ */
+public class SummingTimeSeriesBuilder extends BaseTimeSeriesBuilder {
+ private final Double[] _values;
+
+ public SummingTimeSeriesBuilder(String id, TimeBuckets timeBuckets,
List<String> tagNames, Object[] tagValues) {
+ super(id, null, timeBuckets, tagNames, tagValues);
+ _values = new Double[timeBuckets.getNumBuckets()];
+ }
+
+ @Override
+ public void addValueAtIndex(int timeBucketIndex, Double value) {
+ _values[timeBucketIndex] = (_values[timeBucketIndex] == null ? 0 :
_values[timeBucketIndex]) + value;
+ }
+
+ @Override
+ public void addValue(long timeValue, Double value) {
+ int timeBucketIndex = _timeBuckets.resolveIndex(timeValue);
+ addValueAtIndex(timeBucketIndex, value);
+ }
+
+ @Override
+ public TimeSeries build() {
+ return new TimeSeries(_id, null, _timeBuckets, _values, _tagNames,
_tagValues);
+ }
+}
diff --git
a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
new file mode 100644
index 0000000000..ff74b6ef35
--- /dev/null
+++
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.spi.plan.serde;
+
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.plan.ScanFilterAndProjectPlanNode;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+/**
+ * Round-trip test for {@link TimeSeriesPlanSerde}: a plan node is serialized, deserialized, and compared field by
+ * field with the values it was built from.
+ */
+public class TimeSeriesPlanSerdeTest {
+  @Test
+  public void testSerdeForScanFilterProjectNode() {
+    // Leaf node without children and without group-by columns.
+    ScanFilterAndProjectPlanNode original = new ScanFilterAndProjectPlanNode("sfp#0", new ArrayList<>(), "myTable",
+        "myTimeColumn", TimeUnit.MILLISECONDS, 0L, "myFilterExpression", "myValueExpression", new AggInfo("SUM"),
+        new ArrayList<>());
+
+    String serializedPlan = TimeSeriesPlanSerde.serialize(original);
+    BaseTimeSeriesPlanNode roundTripped = TimeSeriesPlanSerde.deserialize(serializedPlan);
+
+    // The concrete node type must survive the round trip.
+    assertTrue(roundTripped instanceof ScanFilterAndProjectPlanNode);
+    ScanFilterAndProjectPlanNode restored = (ScanFilterAndProjectPlanNode) roundTripped;
+    assertEquals(restored.getTableName(), "myTable");
+    assertEquals(restored.getTimeColumn(), "myTimeColumn");
+    assertEquals(restored.getTimeUnit(), TimeUnit.MILLISECONDS);
+    assertEquals(restored.getOffset(), 0L);
+    assertEquals(restored.getFilterExpression(), "myFilterExpression");
+    assertEquals(restored.getValueExpression(), "myValueExpression");
+    assertNotNull(restored.getAggInfo());
+    assertEquals(restored.getGroupByColumns().size(), 0);
+  }
+}
diff --git a/pinot-timeseries/pom.xml b/pinot-timeseries/pom.xml
new file mode 100644
index 0000000000..f49d0cb922
--- /dev/null
+++ b/pinot-timeseries/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+ <packaging>pom</packaging>
+
+ <artifactId>pinot-timeseries</artifactId>
+
+ <properties>
+ <pinot.root>${basedir}/..</pinot.root>
+ </properties>
+
+ <modules>
+ <module>pinot-timeseries-spi</module>
+ </modules>
+
+</project>
diff --git a/pom.xml b/pom.xml
index 066b542250..3f701f474f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -58,6 +58,7 @@
<module>pinot-compatibility-verifier</module>
<module>pinot-query-planner</module>
<module>pinot-query-runtime</module>
+ <module>pinot-timeseries</module>
</modules>
<licenses>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]