This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e9a70fd15 Timestamp in MSE (#14690)
6e9a70fd15 is described below

commit 6e9a70fd15f268967126fe303528e0ab1e7b0ee8
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Tue Jan 7 10:26:43 2025 +0100

    Timestamp in MSE (#14690)
---
 .../BaseSingleStageBrokerRequestHandler.java       |  20 +--
 .../pinot/common/utils/request/RequestUtils.java   |  31 ++++
 .../core/plan/maker/InstancePlanMakerImplV2.java   |  14 ++
 .../tests/BaseClusterIntegrationTest.java          |   8 +-
 .../pinot/integration/tests/ClusterTest.java       |   2 +-
 .../tests/ExplainIntegrationTestTrait.java         | 123 +++++++++++++
 .../MultiStageEngineExplainIntegrationTest.java    |  57 +-----
 .../tests/custom/TimestampIndexMseTest.java        | 200 +++++++++++++++++++++
 .../tests/custom/TimestampIndexSseTest.java        | 146 +++++++++++++++
 .../plan/server/ServerPlanRequestVisitor.java      |  30 +++-
 10 files changed, 547 insertions(+), 84 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index b031e47617..72b69a24fa 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -106,7 +106,6 @@ import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Broker;
 import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
 import org.apache.pinot.spi.utils.DataSizeUtils;
-import org.apache.pinot.spi.utils.TimestampIndexUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.sql.FilterKind;
 import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
@@ -938,24 +937,7 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
       return;
     }
     Function function = expression.getFunctionCall();
-    switch (function.getOperator()) {
-      case "datetrunc":
-        String granularString = 
function.getOperands().get(0).getLiteral().getStringValue().toUpperCase();
-        Expression timeExpression = function.getOperands().get(1);
-        if (((function.getOperandsSize() == 2) || (function.getOperandsSize() 
== 3 && "MILLISECONDS".equalsIgnoreCase(
-            function.getOperands().get(2).getLiteral().getStringValue()))) && 
TimestampIndexUtils.isValidGranularity(
-            granularString) && timeExpression.getIdentifier() != null) {
-          String timeColumn = timeExpression.getIdentifier().getName();
-          String timeColumnWithGranularity = 
TimestampIndexUtils.getColumnWithGranularity(timeColumn, granularString);
-          if (timestampIndexColumns.contains(timeColumnWithGranularity)) {
-            pinotQuery.putToExpressionOverrideHints(expression,
-                
RequestUtils.getIdentifierExpression(timeColumnWithGranularity));
-          }
-        }
-        break;
-      default:
-        break;
-    }
+    RequestUtils.applyTimestampIndexOverrideHints(expression, pinotQuery, 
timestampIndexColumns::contains);
     function.getOperands()
         .forEach(operand -> setTimestampIndexExpressionOverrideHints(operand, 
timestampIndexColumns, pinotQuery));
   }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
index b8c013427d..2d1e38d84a 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -42,6 +43,7 @@ import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNumericLiteral;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.function.TransformFunctionType;
 import org.apache.pinot.common.request.DataSource;
 import org.apache.pinot.common.request.Expression;
 import org.apache.pinot.common.request.ExpressionType;
@@ -53,6 +55,7 @@ import 
org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
+import org.apache.pinot.spi.utils.TimestampIndexUtils;
 import org.apache.pinot.sql.FilterKind;
 import org.apache.pinot.sql.parsers.CalciteSqlParser;
 import org.apache.pinot.sql.parsers.SqlCompilationException;
@@ -631,4 +634,32 @@ public class RequestUtils {
   public static Map<String, String> getOptionsFromString(String optionStr) {
     return 
Splitter.on(';').omitEmptyStrings().trimResults().withKeyValueSeparator('=').split(optionStr);
   }
+
+  public static void applyTimestampIndexOverrideHints(Expression expression, 
PinotQuery query) {
+    applyTimestampIndexOverrideHints(expression, query, 
timeColumnWithGranularity -> true);
+  }
+
+  public static void applyTimestampIndexOverrideHints(
+      Expression expression, PinotQuery query, Predicate<String> 
timeColumnWithGranularityPredicate
+  ) {
+    if (!expression.isSetFunctionCall()) {
+      return;
+    }
+    Function function = expression.getFunctionCall();
+    if 
(!function.getOperator().equalsIgnoreCase(TransformFunctionType.DATE_TRUNC.getName()))
 {
+      return;
+    }
+    String granularString = 
function.getOperands().get(0).getLiteral().getStringValue().toUpperCase();
+    Expression timeExpression = function.getOperands().get(1);
+    if (((function.getOperandsSize() == 2) || (function.getOperandsSize() == 3 
&& "MILLISECONDS".equalsIgnoreCase(
+        function.getOperands().get(2).getLiteral().getStringValue()))) && 
TimestampIndexUtils.isValidGranularity(
+        granularString) && timeExpression.getIdentifier() != null) {
+      String timeColumn = timeExpression.getIdentifier().getName();
+      String timeColumnWithGranularity = 
TimestampIndexUtils.getColumnWithGranularity(timeColumn, granularString);
+
+      if (timeColumnWithGranularityPredicate.test(timeColumnWithGranularity)) {
+        query.putToExpressionOverrideHints(expression, 
getIdentifierExpression(timeColumnWithGranularity));
+      }
+    }
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index cadce4bcf6..ca74245606 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.FilterContext;
@@ -46,6 +47,7 @@ import org.apache.pinot.core.plan.SelectionPlanNode;
 import org.apache.pinot.core.plan.StreamingInstanceResponsePlanNode;
 import org.apache.pinot.core.plan.StreamingSelectionPlanNode;
 import org.apache.pinot.core.plan.TimeSeriesPlanNode;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.executor.ResultsBlockStreamer;
 import org.apache.pinot.core.query.prefetch.FetchPlanner;
 import org.apache.pinot.core.query.prefetch.FetchPlannerRegistry;
@@ -321,6 +323,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
   public PlanNode makeStreamingSegmentPlanNode(SegmentContext segmentContext, 
QueryContext queryContext) {
     if (QueryContextUtils.isSelectionOnlyQuery(queryContext) && 
queryContext.getLimit() != 0) {
       // Use streaming operator only for non-empty selection-only query
+      rewriteQueryContextWithHints(queryContext, 
segmentContext.getIndexSegment());
       return new StreamingSelectionPlanNode(segmentContext, queryContext);
     } else {
       return makeSegmentPlanNode(segmentContext, queryContext);
@@ -344,6 +347,17 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
     selectExpressions.replaceAll(
         expression -> overrideWithExpressionHints(expression, indexSegment, 
expressionOverrideHints));
 
+    List<Pair<AggregationFunction, FilterContext>> filtAggrFuns = 
queryContext.getFilteredAggregationFunctions();
+    if (filtAggrFuns != null) {
+      for (Pair<AggregationFunction, FilterContext> 
filteredAggregationFunction : filtAggrFuns) {
+        FilterContext right = filteredAggregationFunction.getRight();
+        if (right != null) {
+          Predicate predicate = right.getPredicate();
+          predicate.setLhs(overrideWithExpressionHints(predicate.getLhs(), 
indexSegment, expressionOverrideHints));
+        }
+      }
+    }
+
     List<ExpressionContext> groupByExpressions = 
queryContext.getGroupByExpressions();
     if (CollectionUtils.isNotEmpty(groupByExpressions)) {
       groupByExpressions.replaceAll(
diff --git 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index a3b46ad270..a6cbad653e 100644
--- 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -186,22 +186,22 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
 
   @Nullable
   protected List<String> getInvertedIndexColumns() {
-    return DEFAULT_INVERTED_INDEX_COLUMNS;
+    return new ArrayList<>(DEFAULT_INVERTED_INDEX_COLUMNS);
   }
 
   @Nullable
   protected List<String> getNoDictionaryColumns() {
-    return DEFAULT_NO_DICTIONARY_COLUMNS;
+    return new ArrayList<>(DEFAULT_NO_DICTIONARY_COLUMNS);
   }
 
   @Nullable
   protected List<String> getRangeIndexColumns() {
-    return DEFAULT_RANGE_INDEX_COLUMNS;
+    return new ArrayList<>(DEFAULT_RANGE_INDEX_COLUMNS);
   }
 
   @Nullable
   protected List<String> getBloomFilterColumns() {
-    return DEFAULT_BLOOM_FILTER_COLUMNS;
+    return new ArrayList<>(DEFAULT_BLOOM_FILTER_COLUMNS);
   }
 
   @Nullable
diff --git 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 1338e9f529..05e534a203 100644
--- 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -509,7 +509,7 @@ public abstract class ClusterTest extends ControllerTest {
   /**
    * Queries the broker's sql query endpoint (/query/sql)
    */
-  protected JsonNode postQuery(String query)
+  public JsonNode postQuery(String query)
       throws Exception {
     return postQuery(query, getBrokerQueryApiUrl(getBrokerBaseApiUrl(), 
useMultiStageQueryEngine()), null,
         getExtraQueryProperties());
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExplainIntegrationTestTrait.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExplainIntegrationTestTrait.java
new file mode 100644
index 0000000000..cbe0ffd09f
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExplainIntegrationTestTrait.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.intellij.lang.annotations.Language;
+import org.testng.Assert;
+
+
+public interface ExplainIntegrationTestTrait {
+
+  JsonNode postQuery(@Language("sql") String query)
+      throws Exception;
+
+  default void explainLogical(@Language("sql") String query, String expected) {
+    try {
+      JsonNode jsonNode = postQuery("explain plan without implementation for " 
+ query);
+      JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1);
+
+      Assert.assertEquals(plan.asText(), expected);
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  default void explainSse(boolean verbose, @Language("sql") String query, 
Object... expected) {
+    try {
+      @Language("sql")
+      String actualQuery = "SET useMultistageEngine=false; explain plan for " 
+ query;
+      if (verbose) {
+        actualQuery = "SET explainPlanVerbose=true; " + actualQuery;
+      }
+      JsonNode jsonNode = postQuery(actualQuery);
+      JsonNode plan = jsonNode.get("resultTable").get("rows");
+      List<String> planAsStrList = (List<String>) 
JsonUtils.jsonNodeToObject(plan, List.class).stream()
+          .map(Object::toString)
+          .collect(Collectors.toList());
+
+      if (planAsStrList.size() != expected.length) {
+        Assert.fail("Actual: " + planAsStrList + ", Expected: " + 
Arrays.toString(expected)
+                + ". Size mismatch. Actual: " + planAsStrList.size() + ", 
Expected: " + expected.length);
+      }
+      for (int i = 0; i < planAsStrList.size(); i++) {
+        String planAsStr = planAsStrList.get(i);
+        Object expectedObj = expected[i];
+        if (expectedObj instanceof Pattern) {
+          Assert.assertTrue(((Pattern) 
expectedObj).matcher(planAsStr).matches(),
+              "Pattern doesn't match. Actual: " + planAsStr + ", Expected: " + 
expectedObj
+              + ", Actual complete plan: " + planAsStrList);
+        } else if (expectedObj instanceof String) {
+          Assert.assertEquals(planAsStr, expectedObj, "Actual: " + planAsStr + 
", Expected: " + expectedObj
+            + ", Actual complete plan: " + planAsStrList);
+        } else {
+          Assert.fail("Expected object should be either Pattern or String in 
position " + i + ". Actual: "
+              + expectedObj + " of type " + expectedObj.getClass());
+        }
+      }
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  default void explainSse(@Language("sql") String query, Object... expected) {
+    explainSse(false, query, expected);
+  }
+
+  default void explain(@Language("sql") String query, String expected) {
+    try {
+      JsonNode jsonNode = postQuery("explain plan for " + query);
+      JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1);
+
+      Assert.assertEquals(plan.asText(), expected);
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  default void explainVerbose(@Language("sql") String query, String expected) {
+    try {
+      JsonNode jsonNode = postQuery("set explainPlanVerbose=true; explain plan 
for " + query);
+      JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1);
+
+      String actual = plan.asText()
+          .replaceAll("numDocs=\\[[^\\]]*]", "numDocs=[any]")
+          .replaceAll("segment=\\[[^\\]]*]", "segment=[any]")
+          .replaceAll("totalDocs=\\[[^\\]]*]", "totalDocs=[any]");
+
+
+      Assert.assertEquals(actual, expected);
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineExplainIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineExplainIntegrationTest.java
index 8303a583d3..52c5687801 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineExplainIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineExplainIntegrationTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.integration.tests;
 
-import com.fasterxml.jackson.databind.JsonNode;
 import java.io.File;
 import java.util.List;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -26,16 +25,15 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.util.TestUtils;
-import org.intellij.lang.annotations.Language;
 import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
-import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 
-public class MultiStageEngineExplainIntegrationTest extends 
BaseClusterIntegrationTest {
+public class MultiStageEngineExplainIntegrationTest extends 
BaseClusterIntegrationTest
+    implements ExplainIntegrationTestTrait {
 
   @BeforeClass
   public void setUp()
@@ -78,7 +76,6 @@ public class MultiStageEngineExplainIntegrationTest extends 
BaseClusterIntegrati
   @Test
   public void simpleQuery() {
     explain("SELECT 1 FROM mytable",
-        //@formatter:off
         "Execution Plan\n"
             + "PinotLogicalExchange(distribution=[broadcast])\n"
             + "  LeafStageCombineOperator(table=[mytable])\n"
@@ -89,13 +86,11 @@ public class MultiStageEngineExplainIntegrationTest extends 
BaseClusterIntegrati
             + "            Project(columns=[[]])\n"
             + "              DocIdSet(maxDocs=[120000])\n"
             + "                FilterMatchEntireSegment(numDocs=[115545])\n");
-        //@formatter:on
   }
 
   @Test
   public void simpleQueryVerbose() {
     explainVerbose("SELECT 1 FROM mytable",
-        //@formatter:off
         "Execution Plan\n"
             + "PinotLogicalExchange(distribution=[broadcast])\n"
             + "  LeafStageCombineOperator(table=[mytable])\n"
@@ -161,17 +156,14 @@ public class MultiStageEngineExplainIntegrationTest 
extends BaseClusterIntegrati
             + "            Project(columns=[[]])\n"
             + "              DocIdSet(maxDocs=[10000])\n"
             + "                FilterMatchEntireSegment(numDocs=[any])\n");
-    //@formatter:on
   }
 
   @Test
   public void simpleQueryLogical() {
     explainLogical("SELECT 1 FROM mytable",
-        //@formatter:off
         "Execution Plan\n"
             + "LogicalProject(EXPR$0=[1])\n"
             + "  LogicalTableScan(table=[[default, mytable]])\n");
-    //@formatter:on
   }
 
   @AfterClass
@@ -186,49 +178,4 @@ public class MultiStageEngineExplainIntegrationTest 
extends BaseClusterIntegrati
 
     FileUtils.deleteDirectory(_tempDir);
   }
-
-  private void explainVerbose(@Language("sql") String query, String expected) {
-    try {
-      JsonNode jsonNode = postQuery("set explainPlanVerbose=true; explain plan 
for " + query);
-      JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1);
-
-      String actual = plan.asText()
-          .replaceAll("numDocs=\\[[^\\]]*]", "numDocs=[any]")
-          .replaceAll("segment=\\[[^\\]]*]", "segment=[any]")
-          .replaceAll("totalDocs=\\[[^\\]]*]", "totalDocs=[any]");
-
-
-      Assert.assertEquals(actual, expected);
-    } catch (RuntimeException e) {
-      throw e;
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private void explain(@Language("sql") String query, String expected) {
-    try {
-      JsonNode jsonNode = postQuery("explain plan for " + query);
-      JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1);
-
-      Assert.assertEquals(plan.asText(), expected);
-    } catch (RuntimeException e) {
-      throw e;
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private void explainLogical(@Language("sql") String query, String expected) {
-    try {
-      JsonNode jsonNode = postQuery("set explainAskingServers=false; explain 
plan for " + query);
-      JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1);
-
-      Assert.assertEquals(plan.asText(), expected);
-    } catch (RuntimeException e) {
-      throw e;
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampIndexMseTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampIndexMseTest.java
new file mode 100644
index 0000000000..072b21f3bc
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampIndexMseTest.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.integration.tests.BaseClusterIntegrationTest;
+import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
+import org.apache.pinot.integration.tests.ExplainIntegrationTestTrait;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TimestampConfig;
+import org.apache.pinot.spi.config.table.TimestampIndexGranularity;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TimestampIndexMseTest extends BaseClusterIntegrationTest 
implements ExplainIntegrationTestTrait {
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    // Start the Pinot cluster
+    startZk();
+    startController();
+    startBroker();
+    startServers(2);
+
+    // Create and upload the schema and table config
+    Schema schema = createSchema();
+    addSchema(schema);
+    TableConfig tableConfig = createOfflineTableConfig();
+    addTableConfig(tableConfig);
+
+    // Unpack the Avro files
+    List<File> avroFiles = unpackAvroData(_tempDir);
+
+    // Create and upload segments
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, 
schema, 0, _segmentDir, _tarDir);
+    uploadSegments(getTableName(), _tarDir);
+
+    // Wait for all documents loaded
+    waitForAllDocsLoaded(600_000L);
+  }
+
+  protected void overrideBrokerConf(PinotConfiguration brokerConf) {
+    String property = 
CommonConstants.MultiStageQueryRunner.KEY_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN;
+    brokerConf.setProperty(property, "true");
+  }
+
+  @Test
+  public void timestampIndexSubstitutedInProjections() {
+    setUseMultiStageQueryEngine(true);
+    explain("SELECT datetrunc('SECOND',ArrTime) FROM mytable",
+        "Execution Plan\n"
+            + "PinotLogicalExchange(distribution=[broadcast])\n"
+            + "  LeafStageCombineOperator(table=[mytable])\n"
+            + "    StreamingInstanceResponse\n"
+            + "      StreamingCombineSelect\n"
+            + "        SelectStreaming(table=[mytable], totalDocs=[115545])\n"
+            + "          Project(columns=[[$ArrTime$SECOND]])\n"
+            + "            DocIdSet(maxDocs=[120000])\n"
+            + "              FilterMatchEntireSegment(numDocs=[115545])\n");
+  }
+
+  @Test
+  public void timestampIndexSubstitutedInFilters() {
+    setUseMultiStageQueryEngine(true);
+    explain("SELECT 1 FROM mytable where datetrunc('SECOND',ArrTime) > 1",
+        "Execution Plan\n"
+            + "PinotLogicalExchange(distribution=[broadcast])\n"
+            + "  LeafStageCombineOperator(table=[mytable])\n"
+            + "    StreamingInstanceResponse\n"
+            + "      StreamingCombineSelect\n"
+            + "        SelectStreaming(table=[mytable], totalDocs=[115545])\n"
+            + "          Transform(expressions=[['1']])\n"
+            + "            Project(columns=[[]])\n"
+            + "              DocIdSet(maxDocs=[120000])\n"
+            + "                FilterRangeIndex(predicate=[$ArrTime$SECOND > 
'1'], indexLookUp=[range_index], "
+            + "operator=[RANGE])\n");
+  }
+
+  @Test
+  public void timestampIndexSubstitutedInAggregateFilter() {
+    setUseMultiStageQueryEngine(true);
+    explain("SELECT sum(case when datetrunc('SECOND',ArrTime) > 1 then 2 else 
0 end) FROM mytable",
+        "Execution Plan\n"
+            + "LogicalProject(EXPR$0=[CASE(=($1, 0), null:BIGINT, $0)])\n"
+            + "  PinotLogicalAggregate(group=[{}], agg#0=[$SUM0($0)], 
agg#1=[COUNT($1)], aggType=[FINAL])\n"
+            + "    PinotLogicalExchange(distribution=[hash])\n"
+            + "      LeafStageCombineOperator(table=[mytable])\n"
+            + "        StreamingInstanceResponse\n"
+            + "          CombineAggregate\n"
+            + "            AggregateFiltered(aggregations=[[sum('2'), 
count(*)]])\n"
+            + "              Transform(expressions=[['2']])\n"
+            + "                Project(columns=[[]])\n"
+            + "                  DocIdSet(maxDocs=[120000])\n"
+            + "                    FilterRangeIndex(predicate=[$ArrTime$SECOND 
> '1'], indexLookUp=[range_index], "
+            + "operator=[RANGE])\n"
+            + "              Project(columns=[[]])\n"
+            + "                DocIdSet(maxDocs=[120000])\n"
+            + "                  
FilterMatchEntireSegment(numDocs=[115545])\n");
+  }
+
+  @Test
+  public void timestampIndexSubstitutedInGroupBy() {
+    setUseMultiStageQueryEngine(true);
+    explain("SELECT count(*) FROM mytable group by 
datetrunc('SECOND',ArrTime)",
+        "Execution Plan\n"
+            + "LogicalProject(EXPR$0=[$1])\n"
+            + "  PinotLogicalAggregate(group=[{0}], agg#0=[COUNT($1)], 
aggType=[FINAL])\n"
+            + "    PinotLogicalExchange(distribution=[hash[0]])\n"
+            + "      LeafStageCombineOperator(table=[mytable])\n"
+            + "        StreamingInstanceResponse\n"
+            + "          CombineGroupBy\n"
+            + "            GroupBy(groupKeys=[[$ArrTime$SECOND]], 
aggregations=[[count(*)]])\n"
+            + "              Project(columns=[[$ArrTime$SECOND]])\n"
+            + "                DocIdSet(maxDocs=[120000])\n"
+            + "                  
FilterMatchEntireSegment(numDocs=[115545])\n");
+  }
+
+  @Test
+  public void timestampIndexSubstitutedInJoinMSE() {
+    setUseMultiStageQueryEngine(true);
+    explain("SELECT 1 "
+            + "FROM mytable as a1 "
+            + "join mytable as a2 "
+            + "on datetrunc('SECOND',a1.ArrTime) = 
datetrunc('DAY',a2.ArrTime)",
+        "Execution Plan\n"
+            + "LogicalProject(EXPR$0=[1])\n"
+            + "  LogicalJoin(condition=[=($0, $1)], joinType=[inner])\n"
+            + "    PinotLogicalExchange(distribution=[hash[0]])\n"
+            + "      LeafStageCombineOperator(table=[mytable])\n"
+            + "        StreamingInstanceResponse\n"
+            + "          StreamingCombineSelect\n"
+            + "            SelectStreaming(table=[mytable], 
totalDocs=[115545])\n"
+            + "              Project(columns=[[$ArrTime$SECOND]])\n" // 
substituted because we have SECOND granularity
+            + "                DocIdSet(maxDocs=[120000])\n"
+            + "                  FilterMatchEntireSegment(numDocs=[115545])\n"
+            + "    PinotLogicalExchange(distribution=[hash[0]])\n"
+            + "      LeafStageCombineOperator(table=[mytable])\n"
+            + "        StreamingInstanceResponse\n"
+            + "          StreamingCombineSelect\n"
+            + "            SelectStreaming(table=[mytable], 
totalDocs=[115545])\n"
+            + "              
Transform(expressions=[[datetrunc('DAY',ArrTime)]])\n" // we don't set the DAY 
granularity
+            + "                Project(columns=[[ArrTime]])\n"
+            + "                  DocIdSet(maxDocs=[120000])\n"
+            + "                    
FilterMatchEntireSegment(numDocs=[115545])\n");
+  }
+
+
+  protected TableConfig createOfflineTableConfig() {
+    String colName = "ArrTime";
+
+    TableConfig tableConfig = super.createOfflineTableConfig();
+    List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
+    if (fieldConfigList == null) {
+      fieldConfigList = new ArrayList<>();
+      tableConfig.setFieldConfigList(fieldConfigList);
+    } else {
+      fieldConfigList.stream()
+          .filter(fieldConfig -> fieldConfig.getName().equals(colName))
+          .findFirst()
+          .ifPresent(
+              fieldConfig -> {
+                throw new IllegalStateException("Time column already exists in 
the field config list");
+              }
+          );
+    }
+    FieldConfig newTimeFieldConfig = new FieldConfig.Builder(colName)
+        .withTimestampConfig(
+            new TimestampConfig(List.of(TimestampIndexGranularity.SECOND))
+        )
+        .build();
+    fieldConfigList.add(newTimeFieldConfig);
+    return tableConfig;
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampIndexSseTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampIndexSseTest.java
new file mode 100644
index 0000000000..0620778693
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TimestampIndexSseTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+import org.apache.pinot.integration.tests.BaseClusterIntegrationTest;
+import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
+import org.apache.pinot.integration.tests.ExplainIntegrationTestTrait;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TimestampConfig;
+import org.apache.pinot.spi.config.table.TimestampIndexGranularity;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TimestampIndexSseTest extends BaseClusterIntegrationTest 
implements ExplainIntegrationTestTrait {
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    // Start the Pinot cluster
+    startZk();
+    startController();
+    startBroker();
+    startServers(2);
+
+    // Create and upload the schema and table config
+    Schema schema = createSchema();
+    addSchema(schema);
+    TableConfig tableConfig = createOfflineTableConfig();
+    addTableConfig(tableConfig);
+
+    // Unpack the Avro files
+    List<File> avroFiles = unpackAvroData(_tempDir);
+
+    // Create and upload segments
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, 
schema, 0, _segmentDir, _tarDir);
+    uploadSegments(getTableName(), _tarDir);
+
+    // Wait for all documents loaded
+    waitForAllDocsLoaded(600_000L);
+  }
+
+  @Test
+  public void timestampIndexSubstitutedInProjections() {
+    setUseMultiStageQueryEngine(false);
+    explainSse("SELECT datetrunc('SECOND',ArrTime) FROM mytable",
+    "[BROKER_REDUCE(limit:10), 1, 0]",
+        "[COMBINE_SELECT, 2, 1]",
+        "[PLAN_START(numSegmentsForThisPlan:1), -1, -1]",
+        "[SELECT(selectList:$ArrTime$SECOND), 3, 2]",
+        "[PROJECT($ArrTime$SECOND), 4, 3]",
+        "[DOC_ID_SET, 5, 4]",
+        Pattern.compile("\\[FILTER_MATCH_ENTIRE_SEGMENT\\(docs:[0-9]+\\), 6, 
5]"));
+  }
+
+  @Test
+  public void timestampIndexSubstitutedInFilters() {
+    setUseMultiStageQueryEngine(false);
+    explainSse("SELECT ArrTime FROM mytable where datetrunc('SECOND',ArrTime) 
> 1",
+    "[BROKER_REDUCE(limit:10), 1, 0]",
+        "[COMBINE_SELECT, 2, 1]",
+        "[PLAN_START(numSegmentsForThisPlan:12), -1, -1]",
+        "[SELECT(selectList:ArrTime), 3, 2]",
+        "[PROJECT(ArrTime), 4, 3]",
+        "[DOC_ID_SET, 5, 4]",
+        
"[FILTER_RANGE_INDEX(indexLookUp:range_index,operator:RANGE,predicate:$ArrTime$SECOND
 > '1'), 6, 5]");
+  }
+
+  @Test
+  public void timestampIndexSubstitutedInAggregateFilter() {
+    setUseMultiStageQueryEngine(false);
+    explainSse("SELECT sum(case when datetrunc('SECOND',ArrTime) > 1 then 2 
else 0 end) FROM mytable",
+    "[BROKER_REDUCE(limit:10), 1, 0]",
+        "[COMBINE_AGGREGATE, 2, 1]",
+        "[PLAN_START(numSegmentsForThisPlan:1), -1, -1]",
+        
"[AGGREGATE(aggregations:sum(case(greater_than($ArrTime$SECOND,'1'),'2','0'))), 
3, 2]",
+        "[TRANSFORM(case(greater_than($ArrTime$SECOND,'1'),'2','0')), 4, 3]",
+        "[PROJECT($ArrTime$SECOND), 5, 4]",
+        "[DOC_ID_SET, 6, 5]",
+        Pattern.compile("\\[FILTER_MATCH_ENTIRE_SEGMENT\\(docs:[0-9]+\\), 7, 
6]"));
+  }
+
+  @Test
+  public void timestampIndexSubstitutedInGroupBy() {
+    setUseMultiStageQueryEngine(false);
+    explainSse("SELECT count(*) FROM mytable group by 
datetrunc('SECOND',ArrTime)",
+    "[BROKER_REDUCE(limit:10), 1, 0]",
+        "[COMBINE_GROUP_BY, 2, 1]",
+        "[PLAN_START(numSegmentsForThisPlan:1), -1, -1]",
+        "[GROUP_BY(groupKeys:$ArrTime$SECOND, aggregations:count(*)), 3, 2]",
+        "[PROJECT($ArrTime$SECOND), 4, 3]",
+        "[DOC_ID_SET, 5, 4]",
+        Pattern.compile("\\[FILTER_MATCH_ENTIRE_SEGMENT\\(docs:[0-9]+\\), 6, 
5]"));
+  }
+
+  protected TableConfig createOfflineTableConfig() {
+    String colName = "ArrTime";
+
+    TableConfig tableConfig = super.createOfflineTableConfig();
+    List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
+    if (fieldConfigList == null) {
+      fieldConfigList = new ArrayList<>();
+      tableConfig.setFieldConfigList(fieldConfigList);
+    } else {
+      fieldConfigList.stream()
+          .filter(fieldConfig -> fieldConfig.getName().equals(colName))
+          .findFirst()
+          .ifPresent(
+              fieldConfig -> {
+                throw new IllegalStateException("Time column already exists in 
the field config list");
+              }
+          );
+    }
+    FieldConfig newTimeFieldConfig = new FieldConfig.Builder(colName)
+        .withTimestampConfig(
+            new TimestampConfig(List.of(TimestampIndexGranularity.SECOND))
+        )
+        .build();
+    fieldConfigList.add(newTimeFieldConfig);
+    return tableConfig;
+  }
+}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
index 1ac11809aa..8db3784719 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
@@ -27,6 +27,7 @@ import 
org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.request.DataSource;
 import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.Function;
 import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.request.RequestUtils;
@@ -76,9 +77,12 @@ public class ServerPlanRequestVisitor implements 
PlanNodeVisitor<Void, ServerPla
       if (!groupByList.isEmpty()) {
         pinotQuery.setGroupByList(groupByList);
       }
-      pinotQuery.setSelectList(
-          CalciteRexExpressionParser.convertAggregateList(groupByList, 
node.getAggCalls(), node.getFilterArgs(),
-              pinotQuery));
+      List<Expression> selectList = 
CalciteRexExpressionParser.convertAggregateList(groupByList, node.getAggCalls(),
+          node.getFilterArgs(), pinotQuery);
+      for (Expression expression : selectList) {
+        applyTimestampIndex(expression, pinotQuery);
+      }
+      pinotQuery.setSelectList(selectList);
       if (node.getAggType() == AggregateNode.AggType.DIRECT) {
         
pinotQuery.putToQueryOptions(CommonConstants.Broker.Request.QueryOptionKey.SERVER_RETURN_FINAL_RESULT,
 "true");
       } else if (node.isLeafReturnFinalResult()) {
@@ -127,7 +131,9 @@ public class ServerPlanRequestVisitor implements 
PlanNodeVisitor<Void, ServerPla
     if (visit(node.getInputs().get(0), context)) {
       PinotQuery pinotQuery = context.getPinotQuery();
       if (pinotQuery.getFilterExpression() == null) {
-        
pinotQuery.setFilterExpression(CalciteRexExpressionParser.toExpression(node.getCondition(),
 pinotQuery));
+        Expression expression = 
CalciteRexExpressionParser.toExpression(node.getCondition(), pinotQuery);
+        applyTimestampIndex(expression, pinotQuery);
+        pinotQuery.setFilterExpression(expression);
       } else {
         // if filter is already applied then it cannot have another one on 
leaf.
         context.setLeafStageBoundaryNode(node.getInputs().get(0));
@@ -191,7 +197,11 @@ public class ServerPlanRequestVisitor implements 
PlanNodeVisitor<Void, ServerPla
   public Void visitProject(ProjectNode node, ServerPlanRequestContext context) 
{
     if (visit(node.getInputs().get(0), context)) {
       PinotQuery pinotQuery = context.getPinotQuery();
-      
pinotQuery.setSelectList(CalciteRexExpressionParser.convertRexNodes(node.getProjects(),
 pinotQuery));
+      List<Expression> selectList = 
CalciteRexExpressionParser.convertRexNodes(node.getProjects(), pinotQuery);
+      for (Expression expression : selectList) {
+        applyTimestampIndex(expression, pinotQuery);
+      }
+      pinotQuery.setSelectList(selectList);
     }
     return null;
   }
@@ -249,4 +259,14 @@ public class ServerPlanRequestVisitor implements 
PlanNodeVisitor<Void, ServerPla
     node.visit(this, context);
     return context.getLeafStageBoundaryNode() == null;
   }
+
+  private void applyTimestampIndex(Expression expression, PinotQuery 
pinotQuery) {
+    RequestUtils.applyTimestampIndexOverrideHints(expression, pinotQuery);
+    Function functionCall = expression.getFunctionCall();
+    if (expression.isSetFunctionCall()) {
+      for (Expression operand : functionCall.getOperands()) {
+        applyTimestampIndex(operand, pinotQuery);
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to