This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6e9a70fd15 Timestamp in MSE (#14690)
6e9a70fd15 is described below
commit 6e9a70fd15f268967126fe303528e0ab1e7b0ee8
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Tue Jan 7 10:26:43 2025 +0100
Timestamp in MSE (#14690)
---
.../BaseSingleStageBrokerRequestHandler.java | 20 +--
.../pinot/common/utils/request/RequestUtils.java | 31 ++++
.../core/plan/maker/InstancePlanMakerImplV2.java | 14 ++
.../tests/BaseClusterIntegrationTest.java | 8 +-
.../pinot/integration/tests/ClusterTest.java | 2 +-
.../tests/ExplainIntegrationTestTrait.java | 123 +++++++++++++
.../MultiStageEngineExplainIntegrationTest.java | 57 +-----
.../tests/custom/TimestampIndexMseTest.java | 200 +++++++++++++++++++++
.../tests/custom/TimestampIndexSseTest.java | 146 +++++++++++++++
.../plan/server/ServerPlanRequestVisitor.java | 30 +++-
10 files changed, 547 insertions(+), 84 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index b031e47617..72b69a24fa 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -106,7 +106,6 @@ import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Broker;
import
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
import org.apache.pinot.spi.utils.DataSizeUtils;
-import org.apache.pinot.spi.utils.TimestampIndexUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.FilterKind;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
@@ -938,24 +937,7 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
return;
}
Function function = expression.getFunctionCall();
- switch (function.getOperator()) {
- case "datetrunc":
- String granularString =
function.getOperands().get(0).getLiteral().getStringValue().toUpperCase();
- Expression timeExpression = function.getOperands().get(1);
- if (((function.getOperandsSize() == 2) || (function.getOperandsSize()
== 3 && "MILLISECONDS".equalsIgnoreCase(
- function.getOperands().get(2).getLiteral().getStringValue()))) &&
TimestampIndexUtils.isValidGranularity(
- granularString) && timeExpression.getIdentifier() != null) {
- String timeColumn = timeExpression.getIdentifier().getName();
- String timeColumnWithGranularity =
TimestampIndexUtils.getColumnWithGranularity(timeColumn, granularString);
- if (timestampIndexColumns.contains(timeColumnWithGranularity)) {
- pinotQuery.putToExpressionOverrideHints(expression,
-
RequestUtils.getIdentifierExpression(timeColumnWithGranularity));
- }
- }
- break;
- default:
- break;
- }
+ RequestUtils.applyTimestampIndexOverrideHints(expression, pinotQuery,
timestampIndexColumns::contains);
function.getOperands()
.forEach(operand -> setTimestampIndexExpressionOverrideHints(operand,
timestampIndexColumns, pinotQuery));
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
index b8c013427d..2d1e38d84a 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -42,6 +43,7 @@ import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNumericLiteral;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.function.TransformFunctionType;
import org.apache.pinot.common.request.DataSource;
import org.apache.pinot.common.request.Expression;
import org.apache.pinot.common.request.ExpressionType;
@@ -53,6 +55,7 @@ import
org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.spi.utils.BigDecimalUtils;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
+import org.apache.pinot.spi.utils.TimestampIndexUtils;
import org.apache.pinot.sql.FilterKind;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.apache.pinot.sql.parsers.SqlCompilationException;
@@ -631,4 +634,32 @@ public class RequestUtils {
public static Map<String, String> getOptionsFromString(String optionStr) {
return
Splitter.on(';').omitEmptyStrings().trimResults().withKeyValueSeparator('=').split(optionStr);
}
+
+ public static void applyTimestampIndexOverrideHints(Expression expression,
PinotQuery query) {
+ applyTimestampIndexOverrideHints(expression, query,
timeColumnWithGranularity -> true);
+ }
+
+ public static void applyTimestampIndexOverrideHints(
+ Expression expression, PinotQuery query, Predicate<String>
timeColumnWithGranularityPredicate
+ ) {
+ if (!expression.isSetFunctionCall()) {
+ return;
+ }
+ Function function = expression.getFunctionCall();
+ if
(!function.getOperator().equalsIgnoreCase(TransformFunctionType.DATE_TRUNC.getName()))
{
+ return;
+ }
+ String granularString =
function.getOperands().get(0).getLiteral().getStringValue().toUpperCase();
+ Expression timeExpression = function.getOperands().get(1);
+ if (((function.getOperandsSize() == 2) || (function.getOperandsSize() == 3
&& "MILLISECONDS".equalsIgnoreCase(
+ function.getOperands().get(2).getLiteral().getStringValue()))) &&
TimestampIndexUtils.isValidGranularity(
+ granularString) && timeExpression.getIdentifier() != null) {
+ String timeColumn = timeExpression.getIdentifier().getName();
+ String timeColumnWithGranularity =
TimestampIndexUtils.getColumnWithGranularity(timeColumn, granularString);
+
+ if (timeColumnWithGranularityPredicate.test(timeColumnWithGranularity)) {
+ query.putToExpressionOverrideHints(expression,
getIdentifierExpression(timeColumnWithGranularity));
+ }
+ }
+ }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index cadce4bcf6..ca74245606 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FilterContext;
@@ -46,6 +47,7 @@ import org.apache.pinot.core.plan.SelectionPlanNode;
import org.apache.pinot.core.plan.StreamingInstanceResponsePlanNode;
import org.apache.pinot.core.plan.StreamingSelectionPlanNode;
import org.apache.pinot.core.plan.TimeSeriesPlanNode;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.executor.ResultsBlockStreamer;
import org.apache.pinot.core.query.prefetch.FetchPlanner;
import org.apache.pinot.core.query.prefetch.FetchPlannerRegistry;
@@ -321,6 +323,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
public PlanNode makeStreamingSegmentPlanNode(SegmentContext segmentContext,
QueryContext queryContext) {
if (QueryContextUtils.isSelectionOnlyQuery(queryContext) &&
queryContext.getLimit() != 0) {
// Use streaming operator only for non-empty selection-only query
+ rewriteQueryContextWithHints(queryContext,
segmentContext.getIndexSegment());
return new StreamingSelectionPlanNode(segmentContext, queryContext);
} else {
return makeSegmentPlanNode(segmentContext, queryContext);
@@ -344,6 +347,17 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
selectExpressions.replaceAll(
expression -> overrideWithExpressionHints(expression, indexSegment,
expressionOverrideHints));
+ List<Pair<AggregationFunction, FilterContext>> filtAggrFuns =
queryContext.getFilteredAggregationFunctions();
+ if (filtAggrFuns != null) {
+ for (Pair<AggregationFunction, FilterContext>
filteredAggregationFunction : filtAggrFuns) {
+ FilterContext right = filteredAggregationFunction.getRight();
+ if (right != null) {
+ Predicate predicate = right.getPredicate();
+ predicate.setLhs(overrideWithExpressionHints(predicate.getLhs(),
indexSegment, expressionOverrideHints));
+ }
+ }
+ }
+
List<ExpressionContext> groupByExpressions =
queryContext.getGroupByExpressions();
if (CollectionUtils.isNotEmpty(groupByExpressions)) {
groupByExpressions.replaceAll(
diff --git
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index a3b46ad270..a6cbad653e 100644
---
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -186,22 +186,22 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
@Nullable
protected List<String> getInvertedIndexColumns() {
- return DEFAULT_INVERTED_INDEX_COLUMNS;
+ return new ArrayList<>(DEFAULT_INVERTED_INDEX_COLUMNS);
}
@Nullable
protected List<String> getNoDictionaryColumns() {
- return DEFAULT_NO_DICTIONARY_COLUMNS;
+ return new ArrayList<>(DEFAULT_NO_DICTIONARY_COLUMNS);
}
@Nullable
protected List<String> getRangeIndexColumns() {
- return DEFAULT_RANGE_INDEX_COLUMNS;
+ return new ArrayList<>(DEFAULT_RANGE_INDEX_COLUMNS);
}
@Nullable
protected List<String> getBloomFilterColumns() {
- return DEFAULT_BLOOM_FILTER_COLUMNS;
+ return new ArrayList<>(DEFAULT_BLOOM_FILTER_COLUMNS);
}
@Nullable
diff --git
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 1338e9f529..05e534a203 100644
---
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -509,7 +509,7 @@ public abstract class ClusterTest extends ControllerTest {
/**
* Queries the broker's sql query endpoint (/query/sql)
*/
- protected JsonNode postQuery(String query)
+ public JsonNode postQuery(String query)
throws Exception {
return postQuery(query, getBrokerQueryApiUrl(getBrokerBaseApiUrl(),
useMultiStageQueryEngine()), null,
getExtraQueryProperties());
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExplainIntegrationTestTrait.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExplainIntegrationTestTrait.java
new file mode 100644
index 0000000000..cbe0ffd09f
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExplainIntegrationTestTrait.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.intellij.lang.annotations.Language;
+import org.testng.Assert;
+
+
+public interface ExplainIntegrationTestTrait {
+
+ JsonNode postQuery(@Language("sql") String query)
+ throws Exception;
+
+ default void explainLogical(@Language("sql") String query, String expected) {
+ try {
+ JsonNode jsonNode = postQuery("explain plan without implementation for "
+ query);
+ JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1);
+
+ Assert.assertEquals(plan.asText(), expected);
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ default void explainSse(boolean verbose, @Language("sql") String query,
Object... expected) {
+ try {
+ @Language("sql")
+ String actualQuery = "SET useMultistageEngine=false; explain plan for "
+ query;
+ if (verbose) {
+ actualQuery = "SET explainPlanVerbose=true; " + actualQuery;
+ }
+ JsonNode jsonNode = postQuery(actualQuery);
+ JsonNode plan = jsonNode.get("resultTable").get("rows");
+ List<String> planAsStrList = (List<String>)
JsonUtils.jsonNodeToObject(plan, List.class).stream()
+ .map(Object::toString)
+ .collect(Collectors.toList());
+
+ if (planAsStrList.size() != expected.length) {
+ Assert.fail("Actual: " + planAsStrList + ", Expected: " +
Arrays.toString(expected)
+ + ". Size mismatch. Actual: " + planAsStrList.size() + ",
Expected: " + expected.length);
+ }
+ for (int i = 0; i < planAsStrList.size(); i++) {
+ String planAsStr = planAsStrList.get(i);
+ Object expectedObj = expected[i];
+ if (expectedObj instanceof Pattern) {
+ Assert.assertTrue(((Pattern)
expectedObj).matcher(planAsStr).matches(),
+ "Pattern doesn't match. Actual: " + planAsStr + ", Expected: " +
expectedObj
+ + ", Actual complete plan: " + planAsStrList);
+ } else if (expectedObj instanceof String) {
+ Assert.assertEquals(planAsStr, expectedObj, "Actual: " + planAsStr +
", Expected: " + expectedObj
+ + ", Actual complete plan: " + planAsStrList);
+ } else {
+ Assert.fail("Expected object should be either Pattern or String in
position " + i + ". Actual: "
+ + expectedObj + " of type " + expectedObj.getClass());
+ }
+ }
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ default void explainSse(@Language("sql") String query, Object... expected) {
+ explainSse(false, query, expected);
+ }
+
+ default void explain(@Language("sql") String query, String expected) {
+ try {
+ JsonNode jsonNode = postQuery("explain plan for " + query);
+ JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1);
+
+ Assert.assertEquals(plan.asText(), expected);
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ default void explainVerbose(@Language("sql") String query, String expected) {
+ try {
+ JsonNode jsonNode = postQuery("set explainPlanVerbose=true; explain plan
for " + query);
+ JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1);
+
+ String actual = plan.asText()
+ .replaceAll("numDocs=\\[[^\\]]*]", "numDocs=[any]")
+ .replaceAll("segment=\\[[^\\]]*]", "segment=[any]")
+ .replaceAll("totalDocs=\\[[^\\]]*]", "totalDocs=[any]");
+
+
+ Assert.assertEquals(actual, expected);
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineExplainIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineExplainIntegrationTest.java
index 8303a583d3..52c5687801 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineExplainIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineExplainIntegrationTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.integration.tests;
-import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.util.List;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -26,16 +25,15 @@ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.util.TestUtils;
-import org.intellij.lang.annotations.Language;
import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
-import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-public class MultiStageEngineExplainIntegrationTest extends
BaseClusterIntegrationTest {
+public class MultiStageEngineExplainIntegrationTest extends
BaseClusterIntegrationTest
+ implements ExplainIntegrationTestTrait {
@BeforeClass
public void setUp()
@@ -78,7 +76,6 @@ public class MultiStageEngineExplainIntegrationTest extends
BaseClusterIntegrati
@Test
public void simpleQuery() {
explain("SELECT 1 FROM mytable",
- //@formatter:off
"Execution Plan\n"
+ "PinotLogicalExchange(distribution=[broadcast])\n"
+ " LeafStageCombineOperator(table=[mytable])\n"
@@ -89,13 +86,11 @@ public class MultiStageEngineExplainIntegrationTest extends
BaseClusterIntegrati
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[120000])\n"
+ " FilterMatchEntireSegment(numDocs=[115545])\n");
- //@formatter:on
}
@Test
public void simpleQueryVerbose() {
explainVerbose("SELECT 1 FROM mytable",
- //@formatter:off
"Execution Plan\n"
+ "PinotLogicalExchange(distribution=[broadcast])\n"
+ " LeafStageCombineOperator(table=[mytable])\n"
@@ -161,17 +156,14 @@ public class MultiStageEngineExplainIntegrationTest
extends BaseClusterIntegrati
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[10000])\n"
+ " FilterMatchEntireSegment(numDocs=[any])\n");
- //@formatter:on
}
@Test
public void simpleQueryLogical() {
explainLogical("SELECT 1 FROM mytable",
- //@formatter:off
"Execution Plan\n"
+ "LogicalProject(EXPR$0=[1])\n"
+ " LogicalTableScan(table=[[default, mytable]])\n");
- //@formatter:on
}
@AfterClass
@@ -186,49 +178,4 @@ public class MultiStageEngineExplainIntegrationTest
extends BaseClusterIntegrati
FileUtils.deleteDirectory(_tempDir);
}
-
- private void explainVerbose(@Language("sql") String query, String expected) {
- try {
- JsonNode jsonNode = postQuery("set explainPlanVerbose=true; explain plan
for " + query);
- JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1);
-
- String actual = plan.asText()
- .replaceAll("numDocs=\\[[^\\]]*]", "numDocs=[any]")
- .replaceAll("segment=\\[[^\\]]*]", "segment=[any]")
- .replaceAll("totalDocs=\\[[^\\]]*]", "totalDocs=[any]");
-
-
- Assert.assertEquals(actual, expected);
- } catch (RuntimeException e) {
- throw e;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- private void explain(@Language("sql") String query, String expected) {
- try {
- JsonNode jsonNode = postQuery("explain plan for " + query);
- JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1);
-
- Assert.assertEquals(plan.asText(), expected);
- } catch (RuntimeException e) {
- throw e;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- private void explainLogical(@Language("sql") String query, String expected) {
- try {
- JsonNode jsonNode = postQuery("set explainAskingServers=false; explain
plan for " + query);
- JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1);
-
- Assert.assertEquals(plan.asText(), expected);
- } catch (RuntimeException e) {
- throw e;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampIndexMseTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampIndexMseTest.java
new file mode 100644
index 0000000000..072b21f3bc
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampIndexMseTest.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.integration.tests.BaseClusterIntegrationTest;
+import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
+import org.apache.pinot.integration.tests.ExplainIntegrationTestTrait;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TimestampConfig;
+import org.apache.pinot.spi.config.table.TimestampIndexGranularity;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TimestampIndexMseTest extends BaseClusterIntegrationTest
implements ExplainIntegrationTestTrait {
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+ // Start the Pinot cluster
+ startZk();
+ startController();
+ startBroker();
+ startServers(2);
+
+ // Create and upload the schema and table config
+ Schema schema = createSchema();
+ addSchema(schema);
+ TableConfig tableConfig = createOfflineTableConfig();
+ addTableConfig(tableConfig);
+
+ // Unpack the Avro files
+ List<File> avroFiles = unpackAvroData(_tempDir);
+
+ // Create and upload segments
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig,
schema, 0, _segmentDir, _tarDir);
+ uploadSegments(getTableName(), _tarDir);
+
+ // Wait for all documents loaded
+ waitForAllDocsLoaded(600_000L);
+ }
+
+ protected void overrideBrokerConf(PinotConfiguration brokerConf) {
+ String property =
CommonConstants.MultiStageQueryRunner.KEY_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN;
+ brokerConf.setProperty(property, "true");
+ }
+
+ @Test
+ public void timestampIndexSubstitutedInProjections() {
+ setUseMultiStageQueryEngine(true);
+ explain("SELECT datetrunc('SECOND',ArrTime) FROM mytable",
+ "Execution Plan\n"
+ + "PinotLogicalExchange(distribution=[broadcast])\n"
+ + " LeafStageCombineOperator(table=[mytable])\n"
+ + " StreamingInstanceResponse\n"
+ + " StreamingCombineSelect\n"
+ + " SelectStreaming(table=[mytable], totalDocs=[115545])\n"
+ + " Project(columns=[[$ArrTime$SECOND]])\n"
+ + " DocIdSet(maxDocs=[120000])\n"
+ + " FilterMatchEntireSegment(numDocs=[115545])\n");
+ }
+
+ @Test
+ public void timestampIndexSubstitutedInFilters() {
+ setUseMultiStageQueryEngine(true);
+ explain("SELECT 1 FROM mytable where datetrunc('SECOND',ArrTime) > 1",
+ "Execution Plan\n"
+ + "PinotLogicalExchange(distribution=[broadcast])\n"
+ + " LeafStageCombineOperator(table=[mytable])\n"
+ + " StreamingInstanceResponse\n"
+ + " StreamingCombineSelect\n"
+ + " SelectStreaming(table=[mytable], totalDocs=[115545])\n"
+ + " Transform(expressions=[['1']])\n"
+ + " Project(columns=[[]])\n"
+ + " DocIdSet(maxDocs=[120000])\n"
+ + " FilterRangeIndex(predicate=[$ArrTime$SECOND >
'1'], indexLookUp=[range_index], "
+ + "operator=[RANGE])\n");
+ }
+
+ @Test
+ public void timestampIndexSubstitutedInAggregateFilter() {
+ setUseMultiStageQueryEngine(true);
+ explain("SELECT sum(case when datetrunc('SECOND',ArrTime) > 1 then 2 else
0 end) FROM mytable",
+ "Execution Plan\n"
+ + "LogicalProject(EXPR$0=[CASE(=($1, 0), null:BIGINT, $0)])\n"
+ + " PinotLogicalAggregate(group=[{}], agg#0=[$SUM0($0)],
agg#1=[COUNT($1)], aggType=[FINAL])\n"
+ + " PinotLogicalExchange(distribution=[hash])\n"
+ + " LeafStageCombineOperator(table=[mytable])\n"
+ + " StreamingInstanceResponse\n"
+ + " CombineAggregate\n"
+ + " AggregateFiltered(aggregations=[[sum('2'),
count(*)]])\n"
+ + " Transform(expressions=[['2']])\n"
+ + " Project(columns=[[]])\n"
+ + " DocIdSet(maxDocs=[120000])\n"
+ + " FilterRangeIndex(predicate=[$ArrTime$SECOND
> '1'], indexLookUp=[range_index], "
+ + "operator=[RANGE])\n"
+ + " Project(columns=[[]])\n"
+ + " DocIdSet(maxDocs=[120000])\n"
+ + "
FilterMatchEntireSegment(numDocs=[115545])\n");
+ }
+
+ @Test
+ public void timestampIndexSubstitutedInGroupBy() {
+ setUseMultiStageQueryEngine(true);
+ explain("SELECT count(*) FROM mytable group by
datetrunc('SECOND',ArrTime)",
+ "Execution Plan\n"
+ + "LogicalProject(EXPR$0=[$1])\n"
+ + " PinotLogicalAggregate(group=[{0}], agg#0=[COUNT($1)],
aggType=[FINAL])\n"
+ + " PinotLogicalExchange(distribution=[hash[0]])\n"
+ + " LeafStageCombineOperator(table=[mytable])\n"
+ + " StreamingInstanceResponse\n"
+ + " CombineGroupBy\n"
+ + " GroupBy(groupKeys=[[$ArrTime$SECOND]],
aggregations=[[count(*)]])\n"
+ + " Project(columns=[[$ArrTime$SECOND]])\n"
+ + " DocIdSet(maxDocs=[120000])\n"
+ + "
FilterMatchEntireSegment(numDocs=[115545])\n");
+ }
+
+ @Test
+ public void timestampIndexSubstitutedInJoinMSE() {
+ setUseMultiStageQueryEngine(true);
+ explain("SELECT 1 "
+ + "FROM mytable as a1 "
+ + "join mytable as a2 "
+ + "on datetrunc('SECOND',a1.ArrTime) =
datetrunc('DAY',a2.ArrTime)",
+ "Execution Plan\n"
+ + "LogicalProject(EXPR$0=[1])\n"
+ + " LogicalJoin(condition=[=($0, $1)], joinType=[inner])\n"
+ + " PinotLogicalExchange(distribution=[hash[0]])\n"
+ + " LeafStageCombineOperator(table=[mytable])\n"
+ + " StreamingInstanceResponse\n"
+ + " StreamingCombineSelect\n"
+ + " SelectStreaming(table=[mytable],
totalDocs=[115545])\n"
+ + " Project(columns=[[$ArrTime$SECOND]])\n" //
substituted because we have SECOND granularity
+ + " DocIdSet(maxDocs=[120000])\n"
+ + " FilterMatchEntireSegment(numDocs=[115545])\n"
+ + " PinotLogicalExchange(distribution=[hash[0]])\n"
+ + " LeafStageCombineOperator(table=[mytable])\n"
+ + " StreamingInstanceResponse\n"
+ + " StreamingCombineSelect\n"
+ + " SelectStreaming(table=[mytable],
totalDocs=[115545])\n"
+ + "
Transform(expressions=[[datetrunc('DAY',ArrTime)]])\n" // we don't set the DAY
granularity
+ + " Project(columns=[[ArrTime]])\n"
+ + " DocIdSet(maxDocs=[120000])\n"
+ + "
FilterMatchEntireSegment(numDocs=[115545])\n");
+ }
+
+
+ protected TableConfig createOfflineTableConfig() {
+ String colName = "ArrTime";
+
+ TableConfig tableConfig = super.createOfflineTableConfig();
+ List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
+ if (fieldConfigList == null) {
+ fieldConfigList = new ArrayList<>();
+ tableConfig.setFieldConfigList(fieldConfigList);
+ } else {
+ fieldConfigList.stream()
+ .filter(fieldConfig -> fieldConfig.getName().equals(colName))
+ .findFirst()
+ .ifPresent(
+ fieldConfig -> {
+ throw new IllegalStateException("Time column already exists in
the field config list");
+ }
+ );
+ }
+ FieldConfig newTimeFieldConfig = new FieldConfig.Builder(colName)
+ .withTimestampConfig(
+ new TimestampConfig(List.of(TimestampIndexGranularity.SECOND))
+ )
+ .build();
+ fieldConfigList.add(newTimeFieldConfig);
+ return tableConfig;
+ }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampIndexSseTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampIndexSseTest.java
new file mode 100644
index 0000000000..0620778693
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampIndexSseTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+import org.apache.pinot.integration.tests.BaseClusterIntegrationTest;
+import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
+import org.apache.pinot.integration.tests.ExplainIntegrationTestTrait;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TimestampConfig;
+import org.apache.pinot.spi.config.table.TimestampIndexGranularity;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TimestampIndexSseTest extends BaseClusterIntegrationTest
implements ExplainIntegrationTestTrait {
+ @BeforeClass // Class-level setup: starts the cluster, registers schema/table and uploads the segments
+ public void setUp() // NOTE(review): this class declares no teardown counterpart; confirm cleanup happens elsewhere
+ throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+ // Start the Pinot cluster
+ startZk();
+ startController();
+ startBroker();
+ startServers(2);
+
+ // Create and upload the schema and table config
+ Schema schema = createSchema();
+ addSchema(schema);
+ TableConfig tableConfig = createOfflineTableConfig(); // overridden in this class to add the ArrTime timestamp config
+ addTableConfig(tableConfig);
+
+ // Unpack the Avro files
+ List<File> avroFiles = unpackAvroData(_tempDir);
+
+ // Create and upload segments
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig,
schema, 0, _segmentDir, _tarDir);
+ uploadSegments(getTableName(), _tarDir);
+
+ // Wait for all documents loaded
+ waitForAllDocsLoaded(600_000L); // presumably a timeout in milliseconds; TODO confirm
+ }
+
+ @Test // Single-stage explain: datetrunc('SECOND',ArrTime) in the select list is expected as $ArrTime$SECOND
+ public void timestampIndexSubstitutedInProjections() {
+ setUseMultiStageQueryEngine(false); // explain through the single-stage engine
+ explainSse("SELECT datetrunc('SECOND',ArrTime) FROM mytable",
+ "[BROKER_REDUCE(limit:10), 1, 0]",
+ "[COMBINE_SELECT, 2, 1]",
+ "[PLAN_START(numSegmentsForThisPlan:1), -1, -1]",
+ "[SELECT(selectList:$ArrTime$SECOND), 3, 2]", // expected plan names the $ArrTime$SECOND column, not the function
+ "[PROJECT($ArrTime$SECOND), 4, 3]",
+ "[DOC_ID_SET, 5, 4]", // the last entry is a regex because the doc count is matched with [0-9]+
+ Pattern.compile("\\[FILTER_MATCH_ENTIRE_SEGMENT\\(docs:[0-9]+\\), 6,
5]"));
+ }
+
+ @Test // Single-stage explain: datetrunc('SECOND',ArrTime) in the WHERE clause is expected to use $ArrTime$SECOND
+ public void timestampIndexSubstitutedInFilters() {
+ setUseMultiStageQueryEngine(false); // explain through the single-stage engine
+ explainSse("SELECT ArrTime FROM mytable where datetrunc('SECOND',ArrTime)
> 1",
+ "[BROKER_REDUCE(limit:10), 1, 0]",
+ "[COMBINE_SELECT, 2, 1]",
+ "[PLAN_START(numSegmentsForThisPlan:12), -1, -1]",
+ "[SELECT(selectList:ArrTime), 3, 2]", // the select list keeps the plain ArrTime column
+ "[PROJECT(ArrTime), 4, 3]",
+ "[DOC_ID_SET, 5, 4]", // the final entry expects a range-index filter whose predicate is on $ArrTime$SECOND
+
"[FILTER_RANGE_INDEX(indexLookUp:range_index,operator:RANGE,predicate:$ArrTime$SECOND
> '1'), 6, 5]");
+ }
+
+ @Test // Single-stage explain: datetrunc inside an aggregated CASE expression is expected to use $ArrTime$SECOND
+ public void timestampIndexSubstitutedInAggregateFilter() {
+ setUseMultiStageQueryEngine(false); // explain through the single-stage engine
+ explainSse("SELECT sum(case when datetrunc('SECOND',ArrTime) > 1 then 2
else 0 end) FROM mytable",
+ "[BROKER_REDUCE(limit:10), 1, 0]",
+ "[COMBINE_AGGREGATE, 2, 1]",
+ "[PLAN_START(numSegmentsForThisPlan:1), -1, -1]", // next entry: the aggregation argument uses $ArrTime$SECOND
+
"[AGGREGATE(aggregations:sum(case(greater_than($ArrTime$SECOND,'1'),'2','0'))),
3, 2]",
+ "[TRANSFORM(case(greater_than($ArrTime$SECOND,'1'),'2','0')), 4, 3]",
+ "[PROJECT($ArrTime$SECOND), 5, 4]",
+ "[DOC_ID_SET, 6, 5]", // the last entry is a regex because the doc count is matched with [0-9]+
+ Pattern.compile("\\[FILTER_MATCH_ENTIRE_SEGMENT\\(docs:[0-9]+\\), 7,
6]"));
+ }
+
+ @Test // Single-stage explain: datetrunc('SECOND',ArrTime) as the group-by key is expected as $ArrTime$SECOND
+ public void timestampIndexSubstitutedInGroupBy() {
+ setUseMultiStageQueryEngine(false); // explain through the single-stage engine
+ explainSse("SELECT count(*) FROM mytable group by
datetrunc('SECOND',ArrTime)",
+ "[BROKER_REDUCE(limit:10), 1, 0]",
+ "[COMBINE_GROUP_BY, 2, 1]",
+ "[PLAN_START(numSegmentsForThisPlan:1), -1, -1]",
+ "[GROUP_BY(groupKeys:$ArrTime$SECOND, aggregations:count(*)), 3, 2]", // group key is the $ArrTime$SECOND column
+ "[PROJECT($ArrTime$SECOND), 4, 3]",
+ "[DOC_ID_SET, 5, 4]", // the last entry is a regex because the doc count is matched with [0-9]+
+ Pattern.compile("\\[FILTER_MATCH_ENTIRE_SEGMENT\\(docs:[0-9]+\\), 6,
5]"));
+ }
+
+  /**
+   * Builds the offline table config for this test by taking the superclass config and appending a
+   * {@link FieldConfig} for the {@code ArrTime} column that carries a {@link TimestampConfig} with
+   * {@code SECOND} granularity.
+   *
+   * <p>A new field config list is created and set on the table config when the base config has none.
+   *
+   * @return the superclass table config with the additional {@code ArrTime} field config
+   * @throws IllegalStateException if the base config already contains a field config named {@code ArrTime}
+   */
+  @Override
+  protected TableConfig createOfflineTableConfig() {
+    String colName = "ArrTime";
+
+    TableConfig tableConfig = super.createOfflineTableConfig();
+    List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
+    if (fieldConfigList == null) {
+      fieldConfigList = new ArrayList<>();
+      tableConfig.setFieldConfigList(fieldConfigList);
+    } else if (fieldConfigList.stream().anyMatch(fieldConfig -> fieldConfig.getName().equals(colName))) {
+      // Fail fast rather than adding a second field config for the same column
+      throw new IllegalStateException("Time column already exists in the field config list");
+    }
+    FieldConfig newTimeFieldConfig = new FieldConfig.Builder(colName)
+        .withTimestampConfig(new TimestampConfig(List.of(TimestampIndexGranularity.SECOND)))
+        .build();
+    fieldConfigList.add(newTimeFieldConfig);
+    return tableConfig;
+  }
+}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
index 1ac11809aa..8db3784719 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
@@ -27,6 +27,7 @@ import
org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.request.DataSource;
import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.Function;
import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.request.RequestUtils;
@@ -76,9 +77,12 @@ public class ServerPlanRequestVisitor implements
PlanNodeVisitor<Void, ServerPla
if (!groupByList.isEmpty()) {
pinotQuery.setGroupByList(groupByList);
}
- pinotQuery.setSelectList(
- CalciteRexExpressionParser.convertAggregateList(groupByList,
node.getAggCalls(), node.getFilterArgs(),
- pinotQuery));
+ List<Expression> selectList =
CalciteRexExpressionParser.convertAggregateList(groupByList, node.getAggCalls(),
+ node.getFilterArgs(), pinotQuery);
+ for (Expression expression : selectList) {
+ applyTimestampIndex(expression, pinotQuery);
+ }
+ pinotQuery.setSelectList(selectList);
if (node.getAggType() == AggregateNode.AggType.DIRECT) {
pinotQuery.putToQueryOptions(CommonConstants.Broker.Request.QueryOptionKey.SERVER_RETURN_FINAL_RESULT,
"true");
} else if (node.isLeafReturnFinalResult()) {
@@ -127,7 +131,9 @@ public class ServerPlanRequestVisitor implements
PlanNodeVisitor<Void, ServerPla
if (visit(node.getInputs().get(0), context)) {
PinotQuery pinotQuery = context.getPinotQuery();
if (pinotQuery.getFilterExpression() == null) {
-
pinotQuery.setFilterExpression(CalciteRexExpressionParser.toExpression(node.getCondition(),
pinotQuery));
+ Expression expression =
CalciteRexExpressionParser.toExpression(node.getCondition(), pinotQuery);
+ applyTimestampIndex(expression, pinotQuery);
+ pinotQuery.setFilterExpression(expression);
} else {
// if filter is already applied then it cannot have another one on
leaf.
context.setLeafStageBoundaryNode(node.getInputs().get(0));
@@ -191,7 +197,11 @@ public class ServerPlanRequestVisitor implements
PlanNodeVisitor<Void, ServerPla
public Void visitProject(ProjectNode node, ServerPlanRequestContext context)
{
if (visit(node.getInputs().get(0), context)) {
PinotQuery pinotQuery = context.getPinotQuery();
-
pinotQuery.setSelectList(CalciteRexExpressionParser.convertRexNodes(node.getProjects(),
pinotQuery));
+ List<Expression> selectList =
CalciteRexExpressionParser.convertRexNodes(node.getProjects(), pinotQuery);
+ for (Expression expression : selectList) {
+ applyTimestampIndex(expression, pinotQuery);
+ }
+ pinotQuery.setSelectList(selectList);
}
return null;
}
@@ -249,4 +259,14 @@ public class ServerPlanRequestVisitor implements
PlanNodeVisitor<Void, ServerPla
node.visit(this, context);
return context.getLeafStageBoundaryNode() == null;
}
+
+  /**
+   * Calls {@link RequestUtils#applyTimestampIndexOverrideHints} on the given expression and then repeats the same
+   * step recursively for every operand when the expression is a function call.
+   *
+   * <p>The expression itself is processed before its operands, so the function call is read only after the
+   * override hints have been applied to it.
+   *
+   * @param expression the expression to process; expressions without a function call are not descended into
+   * @param pinotQuery the query passed through to {@code applyTimestampIndexOverrideHints}
+   */
+  private void applyTimestampIndex(Expression expression, PinotQuery pinotQuery) {
+    RequestUtils.applyTimestampIndexOverrideHints(expression, pinotQuery);
+    if (!expression.isSetFunctionCall()) {
+      // No function call set, so there are no operands to visit
+      return;
+    }
+    Function function = expression.getFunctionCall();
+    for (Expression child : function.getOperands()) {
+      applyTimestampIndex(child, pinotQuery);
+    }
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]