This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 41f90f9ef8 Add APIs to IndexSegment as a preparation to support
virtual DataSource (#15869)
41f90f9ef8 is described below
commit 41f90f9ef81d6d91205df95c2685a3abbb0d4f89
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu May 22 18:19:34 2025 -0600
Add APIs to IndexSegment as a preparation to support virtual DataSource
(#15869)
---
...xValueBasedSelectionOrderByCombineOperator.java | 8 +-
.../operator/filter/ExpressionFilterOperator.java | 2 +-
.../operator/query/SelectionOrderByOperator.java | 2 +-
.../pinot/core/plan/AggregationPlanNode.java | 13 +--
.../apache/pinot/core/plan/DistinctPlanNode.java | 2 +-
.../org/apache/pinot/core/plan/FilterPlanNode.java | 20 ++--
.../apache/pinot/core/plan/ProjectPlanNode.java | 3 +-
.../apache/pinot/core/plan/SelectionPlanNode.java | 2 +-
.../query/executor/LogicalTableExecutionInfo.java | 7 ++
.../query/executor/ServerQueryExecutorV1Impl.java | 2 +
.../query/executor/SingleTableExecutionInfo.java | 24 +++--
.../core/query/executor/TableExecutionInfo.java | 5 +
.../core/query/prefetch/DefaultFetchPlanner.java | 4 +-
.../core/query/request/context/QueryContext.java | 11 ++
.../plan/maker/QueryOverrideWithHintsTest.java | 113 +++++----------------
.../indexsegment/immutable/EmptyIndexSegment.java | 17 ++--
.../immutable/ImmutableSegmentImpl.java | 15 ++-
.../indexsegment/mutable/MutableSegmentImpl.java | 11 +-
.../org/apache/pinot/segment/spi/IndexSegment.java | 29 ++++--
19 files changed, 143 insertions(+), 147 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
index 776a838167..e9a513ad9e 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
@@ -36,6 +36,7 @@ import
org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.exception.QueryErrorMessage;
import org.slf4j.Logger;
@@ -85,7 +86,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator
_minMaxValueContexts = new ArrayList<>(_numOperators);
for (Operator<BaseResultsBlock> operator : _operators) {
- _minMaxValueContexts.add(new MinMaxValueContext(operator,
firstOrderByColumn));
+ _minMaxValueContexts.add(new MinMaxValueContext(operator,
firstOrderByColumn, queryContext.getSchema()));
}
if (firstOrderByExpression.isAsc()) {
// For ascending order, sort on column min value in ascending order
@@ -313,9 +314,10 @@ public class
MinMaxValueBasedSelectionOrderByCombineOperator
final Comparable _minValue;
final Comparable _maxValue;
- MinMaxValueContext(Operator<BaseResultsBlock> operator, String column) {
+ MinMaxValueContext(Operator<BaseResultsBlock> operator, String column,
Schema schema) {
_operator = operator;
- DataSourceMetadata dataSourceMetadata =
operator.getIndexSegment().getDataSource(column).getDataSourceMetadata();
+ DataSourceMetadata dataSourceMetadata =
+ operator.getIndexSegment().getDataSource(column,
schema).getDataSourceMetadata();
_minValue = dataSourceMetadata.getMinValue();
_maxValue = dataSourceMetadata.getMaxValue();
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ExpressionFilterOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ExpressionFilterOperator.java
index 6346be2372..27889f8ec3 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ExpressionFilterOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ExpressionFilterOperator.java
@@ -64,7 +64,7 @@ public class ExpressionFilterOperator extends
BaseFilterOperator {
_dataSourceMap = new HashMap<>(mapCapacity);
Map<String, ColumnContext> columnContextMap = new HashMap<>(mapCapacity);
columns.forEach(column -> {
- DataSource dataSource = segment.getDataSource(column);
+ DataSource dataSource = segment.getDataSource(column,
queryContext.getSchema());
_dataSourceMap.put(column, dataSource);
columnContextMap.put(column, ColumnContext.fromDataSource(dataSource));
});
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
index 95bf2a2e66..3321a3ae14 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
@@ -279,7 +279,7 @@ public class SelectionOrderByOperator extends
BaseOperator<SelectionResultsBlock
int numColumns = columns.size();
Map<String, DataSource> dataSourceMap = new HashMap<>();
for (String column : columns) {
- dataSourceMap.put(column, _indexSegment.getDataSource(column));
+ dataSourceMap.put(column, _indexSegment.getDataSource(column,
_queryContext.getSchema()));
}
try (ProjectionOperator projectionOperator =
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
index cca14f2704..7db0aefd29 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
@@ -112,13 +112,13 @@ public class AggregationPlanNode implements PlanNode {
boolean hasNullValues = _queryContext.isNullHandlingEnabled() &&
hasNullValues(aggregationFunctions);
if (!hasNullValues) {
// Priority 2: Check if non-scan based aggregation is feasible
- if (filterOperator.isResultMatchingAll() &&
isFitForNonScanBasedPlan(aggregationFunctions, _indexSegment)) {
+ if (filterOperator.isResultMatchingAll() && isFitForNonScanBasedPlan()) {
DataSource[] dataSources = new DataSource[aggregationFunctions.length];
for (int i = 0; i < aggregationFunctions.length; i++) {
List<?> inputExpressions =
aggregationFunctions[i].getInputExpressions();
if (!inputExpressions.isEmpty()) {
String column = ((ExpressionContext)
inputExpressions.get(0)).getIdentifier();
- dataSources[i] = _indexSegment.getDataSource(column);
+ dataSources[i] = _indexSegment.getDataSource(column,
_queryContext.getSchema());
}
}
return new NonScanBasedAggregationOperator(_queryContext, dataSources,
numTotalDocs);
@@ -148,7 +148,7 @@ public class AggregationPlanNode implements PlanNode {
for (ExpressionContext argument :
aggregationFunction.getInputExpressions()) {
switch (argument.getType()) {
case IDENTIFIER:
- DataSource dataSource =
_indexSegment.getDataSource(argument.getIdentifier());
+ DataSource dataSource =
_indexSegment.getDataSource(argument.getIdentifier(),
_queryContext.getSchema());
NullValueVectorReader nullValueVector =
dataSource.getNullValueVector();
if (nullValueVector != null &&
!nullValueVector.getNullBitmap().isEmpty()) {
return true;
@@ -172,8 +172,9 @@ public class AggregationPlanNode implements PlanNode {
* Returns {@code true} if the given aggregations can be solved with
dictionary or column metadata, {@code false}
* otherwise.
*/
- private static boolean isFitForNonScanBasedPlan(AggregationFunction[]
aggregationFunctions,
- IndexSegment indexSegment) {
+ private boolean isFitForNonScanBasedPlan() {
+ AggregationFunction[] aggregationFunctions =
_queryContext.getAggregationFunctions();
+ assert aggregationFunctions != null;
for (AggregationFunction<?, ?> aggregationFunction : aggregationFunctions)
{
if (aggregationFunction.getType() == COUNT) {
continue;
@@ -182,7 +183,7 @@ public class AggregationPlanNode implements PlanNode {
if (argument.getType() != ExpressionContext.Type.IDENTIFIER) {
return false;
}
- DataSource dataSource =
indexSegment.getDataSource(argument.getIdentifier());
+ DataSource dataSource =
_indexSegment.getDataSource(argument.getIdentifier(),
_queryContext.getSchema());
if (DICTIONARY_BASED_FUNCTIONS.contains(aggregationFunction.getType())) {
if (dataSource.getDictionary() != null) {
continue;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/DistinctPlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/DistinctPlanNode.java
index 44ffcd98aa..fae9b8efad 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/DistinctPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/DistinctPlanNode.java
@@ -54,7 +54,7 @@ public class DistinctPlanNode implements PlanNode {
if (_queryContext.getFilter() == null && expressions.size() == 1) {
String column = expressions.get(0).getIdentifier();
if (column != null) {
- DataSource dataSource = _indexSegment.getDataSource(column);
+ DataSource dataSource = _indexSegment.getDataSource(column,
_queryContext.getSchema());
if (dataSource.getDictionary() != null) {
if (!_queryContext.isNullHandlingEnabled()) {
return new DictionaryBasedDistinctOperator(dataSource,
_queryContext);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
index 73f148c9c4..805c70a1c2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
@@ -148,8 +148,12 @@ public class FilterPlanNode implements PlanNode {
findLiteral = true;
}
}
- return columnName != null &&
_indexSegment.getDataSource(columnName).getH3Index() != null && findLiteral
- && _queryContext.isIndexUseAllowed(columnName,
FieldConfig.IndexType.H3);
+ if (columnName == null || !findLiteral) {
+ return false;
+ }
+ DataSource dataSource = _indexSegment.getDataSourceNullable(columnName);
+ return dataSource != null && dataSource.getH3Index() != null &&
_queryContext.isIndexUseAllowed(columnName,
+ FieldConfig.IndexType.H3);
}
/**
@@ -179,16 +183,18 @@ public class FilterPlanNode implements PlanNode {
if (arguments.get(0).getType() == ExpressionContext.Type.IDENTIFIER
&& arguments.get(1).getType() == ExpressionContext.Type.LITERAL) {
String columnName = arguments.get(0).getIdentifier();
- return _indexSegment.getDataSource(columnName).getH3Index() != null
- && _queryContext.isIndexUseAllowed(columnName,
FieldConfig.IndexType.H3);
+ DataSource dataSource =
_indexSegment.getDataSourceNullable(columnName);
+ return dataSource != null && dataSource.getH3Index() != null &&
_queryContext.isIndexUseAllowed(columnName,
+ FieldConfig.IndexType.H3);
}
return false;
} else {
if (arguments.get(1).getType() == ExpressionContext.Type.IDENTIFIER
&& arguments.get(0).getType() == ExpressionContext.Type.LITERAL) {
String columnName = arguments.get(1).getIdentifier();
- return _indexSegment.getDataSource(columnName).getH3Index() != null
- && _queryContext.isIndexUseAllowed(columnName,
FieldConfig.IndexType.H3);
+ DataSource dataSource =
_indexSegment.getDataSourceNullable(columnName);
+ return dataSource != null && dataSource.getH3Index() != null &&
_queryContext.isIndexUseAllowed(columnName,
+ FieldConfig.IndexType.H3);
}
return false;
}
@@ -256,7 +262,7 @@ public class FilterPlanNode implements PlanNode {
}
} else {
String column = lhs.getIdentifier();
- DataSource dataSource = _indexSegment.getDataSource(column);
+ DataSource dataSource = _indexSegment.getDataSource(column,
_queryContext.getSchema());
PredicateEvaluator predicateEvaluator;
switch (predicate.getType()) {
case TEXT_CONTAINS:
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/ProjectPlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/ProjectPlanNode.java
index fdd47ba717..d9b4cb8aa8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/ProjectPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/ProjectPlanNode.java
@@ -75,7 +75,8 @@ public class ProjectPlanNode implements PlanNode {
}
}
Map<String, DataSource> dataSourceMap = new
HashMap<>(HashUtil.getHashMapCapacity(projectionColumns.size()));
- projectionColumns.forEach(column -> dataSourceMap.put(column,
_indexSegment.getDataSource(column)));
+ projectionColumns.forEach(
+ column -> dataSourceMap.put(column,
_indexSegment.getDataSource(column, _queryContext.getSchema())));
// NOTE: Skip creating DocIdSetOperator when maxDocsPerCall is 0 (for
selection query with LIMIT 0)
DocIdSetOperator docIdSetOperator =
_maxDocsPerCall > 0 ? new DocIdSetPlanNode(_segmentContext,
_queryContext, _maxDocsPerCall,
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java
index e936cd6949..d59345e184 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java
@@ -158,7 +158,7 @@ public class SelectionPlanNode implements PlanNode {
return false;
}
String column = orderByExpression.getExpression().getIdentifier();
- DataSource dataSource = _indexSegment.getDataSource(column);
+ DataSource dataSource = _indexSegment.getDataSource(column,
_queryContext.getSchema());
// If there are null values, we cannot trust
DataSourceMetadata.isSorted
if (isNullHandlingEnabled) {
NullValueVectorReader nullValueVector =
dataSource.getNullValueVector();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/LogicalTableExecutionInfo.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/LogicalTableExecutionInfo.java
index 3a9c63f599..cc65a51852 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/LogicalTableExecutionInfo.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/LogicalTableExecutionInfo.java
@@ -37,6 +37,7 @@ import
org.apache.pinot.core.query.request.context.TimerContext;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.SegmentContext;
+import org.apache.pinot.spi.data.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,6 +67,12 @@ public class LogicalTableExecutionInfo implements
TableExecutionInfo {
_tableExecutionInfos = tableExecutionInfos;
}
+ @Override
+ public Schema getSchema() {
+ // TODO: Return the schema of the logical table
+ return _tableExecutionInfos.get(0).getSchema();
+ }
+
@Override
public boolean hasRealtime() {
return _tableExecutionInfos.stream()
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index aca9b861f9..97ee75dedb 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -216,6 +216,8 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
.collect(Collectors.toList()));
}
+ queryContext.setSchema(executionInfo.getSchema());
+
// Gather stats for realtime consuming segments
// TODO: the freshness time should not be collected at query time because
there is no guarantee that the consuming
// segment is queried (consuming segment might be pruned, or the
server only contains relocated committed
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/SingleTableExecutionInfo.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/SingleTableExecutionInfo.java
index 2439dcbc35..aba2be49ad 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/SingleTableExecutionInfo.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/SingleTableExecutionInfo.java
@@ -45,6 +45,7 @@ import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.SegmentContext;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -151,18 +152,18 @@ public class SingleTableExecutionInfo implements
TableExecutionInfo {
_notAcquiredSegments = notAcquiredSegments;
}
- @Override
- public boolean hasRealtime() {
- return _tableDataManager instanceof RealtimeTableDataManager;
- }
-
-
public TableDataManager getTableDataManager() {
return _tableDataManager;
}
- public List<SegmentDataManager> getSegmentDataManagers() {
- return _segmentDataManagers;
+ @Override
+ public Schema getSchema() {
+ return _tableDataManager.getCachedTableConfigAndSchema().getRight();
+ }
+
+ @Override
+ public boolean hasRealtime() {
+ return _tableDataManager instanceof RealtimeTableDataManager;
}
@Override
@@ -189,14 +190,21 @@ public class SingleTableExecutionInfo implements
TableExecutionInfo {
return _providedSegmentContexts;
}
+ @Override
public List<String> getSegmentsToQuery() {
return _segmentsToQuery;
}
+ @Override
public List<String> getOptionalSegments() {
return _optionalSegments;
}
+ @Override
+ public List<SegmentDataManager> getSegmentDataManagers() {
+ return _segmentDataManagers;
+ }
+
@Override
public List<String> getNotAcquiredSegments() {
return _notAcquiredSegments;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/TableExecutionInfo.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/TableExecutionInfo.java
index a820bade0d..6f172ecc72 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/TableExecutionInfo.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/TableExecutionInfo.java
@@ -30,9 +30,14 @@ import
org.apache.pinot.core.query.request.context.TimerContext;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.SegmentContext;
+import org.apache.pinot.spi.data.Schema;
public interface TableExecutionInfo {
+
+ /// Returns the latest [Schema] for the table.
+ Schema getSchema();
+
/**
* Check if consuming segments are being queried.
* @return true if consuming segments are being queried, false otherwise
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/prefetch/DefaultFetchPlanner.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/prefetch/DefaultFetchPlanner.java
index d198fb97e0..2cfcbcc9c7 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/prefetch/DefaultFetchPlanner.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/prefetch/DefaultFetchPlanner.java
@@ -48,8 +48,8 @@ public class DefaultFetchPlanner implements FetchPlanner {
extractEqInColumns(Objects.requireNonNull(queryContext.getFilter()),
eqInColumns);
Map<String, List<IndexType<?, ?, ?>>> columnToIndexList = new HashMap<>();
for (String column : eqInColumns) {
- DataSource dataSource = indexSegment.getDataSource(column);
- if (dataSource.getBloomFilter() != null) {
+ DataSource dataSource = indexSegment.getDataSourceNullable(column);
+ if (dataSource != null && dataSource.getBloomFilter() != null) {
columnToIndexList.put(column,
Collections.singletonList(StandardIndexes.bloomFilter()));
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
index b1144e6044..d5f77b84fa 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
@@ -42,6 +42,7 @@ import
org.apache.pinot.core.query.aggregation.function.AggregationFunctionFacto
import org.apache.pinot.core.util.MemoizedClassAssociation;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants.Server;
@@ -98,6 +99,8 @@ public class QueryContext {
private Set<String> _columns;
// Other properties to be shared across all the segments
+ // Latest table schema at query time
+ private Schema _schema;
// End time in milliseconds for the query
private long _endTimeMs;
// Whether to enable prefetch for the query
@@ -314,6 +317,14 @@ public class QueryContext {
return _columns;
}
+ public Schema getSchema() {
+ return _schema;
+ }
+
+ public void setSchema(Schema schema) {
+ _schema = schema;
+ }
+
public long getEndTimeMs() {
return _endTimeMs;
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/QueryOverrideWithHintsTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/QueryOverrideWithHintsTest.java
index c6750af635..274b891b12 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/QueryOverrideWithHintsTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/QueryOverrideWithHintsTest.java
@@ -18,14 +18,10 @@
*/
package org.apache.pinot.core.plan.maker;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import javax.annotation.Nullable;
import org.apache.pinot.common.request.Expression;
import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.common.request.context.ExpressionContext;
@@ -37,82 +33,26 @@ import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.core.query.request.context.QueryContext;
import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.segment.spi.IndexSegment;
-import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.segment.spi.datasource.DataSource;
-import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
-import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
public class QueryOverrideWithHintsTest {
- private final IndexSegment _indexSegment = new IndexSegment() {
- @Override
- public String getSegmentName() {
- return null;
- }
+ private IndexSegment _indexSegment;
- @Override
- public SegmentMetadata getSegmentMetadata() {
- return null;
- }
-
- @Override
- public Set<String> getColumnNames() {
- return ImmutableSet.of("$ts$MONTH");
- }
-
- @Override
- public Set<String> getPhysicalColumnNames() {
- return null;
- }
-
- @Override
- public DataSource getDataSource(String columnName) {
- return null;
- }
-
- @Override
- public List<StarTreeV2> getStarTrees() {
- return null;
- }
-
- @Nullable
- @Override
- public ThreadSafeMutableRoaringBitmap getValidDocIds() {
- return null;
- }
-
- @Nullable
- @Override
- public ThreadSafeMutableRoaringBitmap getQueryableDocIds() {
- return null;
- }
-
- @Override
- public GenericRow getRecord(int docId, GenericRow reuse) {
- return null;
- }
-
- @Override
- public Object getValue(int docId, String column) {
- return null;
- }
-
- @Override
- public void offload() {
- }
-
- @Override
- public void destroy() {
- }
- };
+ @BeforeClass
+ public void setUp() {
+ _indexSegment = mock(IndexSegment.class);
+
Mockito.when(_indexSegment.getColumnNames()).thenReturn(Set.of("$ts$MONTH"));
+ }
@Test
public void testExpressionContextHashcode() {
@@ -138,19 +78,19 @@ public class QueryOverrideWithHintsTest {
assertNotEquals(expressionContext1.hashCode(),
expressionContext2.hashCode());
expressionContext1 = ExpressionContext.forFunction(new
FunctionContext(FunctionContext.Type.TRANSFORM, "func1",
- ImmutableList.of(ExpressionContext.forIdentifier("abc"),
+ List.of(ExpressionContext.forIdentifier("abc"),
ExpressionContext.forLiteral(FieldSpec.DataType.STRING, "abc"))));
expressionContext2 = ExpressionContext.forFunction(new
FunctionContext(FunctionContext.Type.TRANSFORM, "func1",
- ImmutableList.of(ExpressionContext.forIdentifier("abc"),
+ List.of(ExpressionContext.forIdentifier("abc"),
ExpressionContext.forLiteral(FieldSpec.DataType.STRING, "abc"))));
assertEquals(expressionContext1, expressionContext2);
assertEquals(expressionContext1.hashCode(), expressionContext2.hashCode());
expressionContext1 = ExpressionContext.forFunction(new
FunctionContext(FunctionContext.Type.TRANSFORM, "datetrunc",
-
ImmutableList.of(ExpressionContext.forLiteral(FieldSpec.DataType.STRING, "DAY"),
+ List.of(ExpressionContext.forLiteral(FieldSpec.DataType.STRING, "DAY"),
ExpressionContext.forLiteral(FieldSpec.DataType.STRING,
"event_time_ts"))));
expressionContext2 = ExpressionContext.forFunction(new
FunctionContext(FunctionContext.Type.TRANSFORM, "datetrunc",
-
ImmutableList.of(ExpressionContext.forLiteral(FieldSpec.DataType.STRING, "DAY"),
+ List.of(ExpressionContext.forLiteral(FieldSpec.DataType.STRING, "DAY"),
ExpressionContext.forLiteral(FieldSpec.DataType.STRING,
"event_time_ts"))));
assertEquals(expressionContext1, expressionContext2);
assertEquals(expressionContext1.hashCode(), expressionContext2.hashCode());
@@ -160,15 +100,14 @@ public class QueryOverrideWithHintsTest {
public void testOverrideFilterWithExpressionOverrideHints() {
ExpressionContext dateTruncFunctionExpr = ExpressionContext.forFunction(
new FunctionContext(FunctionContext.Type.TRANSFORM, "dateTrunc", new
ArrayList<>(new ArrayList<>(
-
ImmutableList.of(ExpressionContext.forLiteral(FieldSpec.DataType.STRING,
"MONTH"),
+ List.of(ExpressionContext.forLiteral(FieldSpec.DataType.STRING,
"MONTH"),
ExpressionContext.forIdentifier("ts"))))));
ExpressionContext timestampIndexColumn =
ExpressionContext.forIdentifier("$ts$MONTH");
ExpressionContext equalsExpression = ExpressionContext.forFunction(
new FunctionContext(FunctionContext.Type.TRANSFORM, "EQUALS", new
ArrayList<>(
- ImmutableList.of(dateTruncFunctionExpr,
- ExpressionContext.forLiteral(FieldSpec.DataType.INT, 1000)))));
+ List.of(dateTruncFunctionExpr,
ExpressionContext.forLiteral(FieldSpec.DataType.INT, 1000)))));
FilterContext filter = RequestContextUtils.getFilter(equalsExpression);
- Map<ExpressionContext, ExpressionContext> hints =
ImmutableMap.of(dateTruncFunctionExpr, timestampIndexColumn);
+ Map<ExpressionContext, ExpressionContext> hints =
Map.of(dateTruncFunctionExpr, timestampIndexColumn);
InstancePlanMakerImplV2.overrideWithExpressionHints(filter, _indexSegment,
hints);
assertEquals(filter.getType(), FilterContext.Type.PREDICATE);
assertEquals(filter.getPredicate().getLhs(), timestampIndexColumn);
@@ -176,7 +115,7 @@ public class QueryOverrideWithHintsTest {
FilterContext andFilter =
RequestContextUtils.getFilter(ExpressionContext.forFunction(
new FunctionContext(FunctionContext.Type.TRANSFORM, "AND",
- new ArrayList<>(ImmutableList.of(equalsExpression,
equalsExpression)))));
+ new ArrayList<>(List.of(equalsExpression, equalsExpression)))));
InstancePlanMakerImplV2.overrideWithExpressionHints(andFilter,
_indexSegment, hints);
assertEquals(andFilter.getChildren().get(0).getPredicate().getLhs(),
timestampIndexColumn);
assertEquals(andFilter.getChildren().get(1).getPredicate().getLhs(),
timestampIndexColumn);
@@ -186,14 +125,13 @@ public class QueryOverrideWithHintsTest {
public void testOverrideWithExpressionOverrideHints() {
ExpressionContext dateTruncFunctionExpr = ExpressionContext.forFunction(
new FunctionContext(FunctionContext.Type.TRANSFORM, "dateTrunc", new
ArrayList<>(
-
ImmutableList.of(ExpressionContext.forLiteral(FieldSpec.DataType.STRING,
"MONTH"),
+ List.of(ExpressionContext.forLiteral(FieldSpec.DataType.STRING,
"MONTH"),
ExpressionContext.forIdentifier("ts")))));
ExpressionContext timestampIndexColumn =
ExpressionContext.forIdentifier("$ts$MONTH");
ExpressionContext equalsExpression = ExpressionContext.forFunction(
new FunctionContext(FunctionContext.Type.TRANSFORM, "EQUALS", new
ArrayList<>(
- ImmutableList.of(dateTruncFunctionExpr,
- ExpressionContext.forLiteral(FieldSpec.DataType.INT, 1000)))));
- Map<ExpressionContext, ExpressionContext> hints =
ImmutableMap.of(dateTruncFunctionExpr, timestampIndexColumn);
+ List.of(dateTruncFunctionExpr,
ExpressionContext.forLiteral(FieldSpec.DataType.INT, 1000)))));
+ Map<ExpressionContext, ExpressionContext> hints =
Map.of(dateTruncFunctionExpr, timestampIndexColumn);
ExpressionContext newEqualsExpression =
InstancePlanMakerImplV2.overrideWithExpressionHints(equalsExpression,
_indexSegment, hints);
assertEquals(newEqualsExpression.getFunction().getFunctionName(),
"equals");
@@ -206,14 +144,13 @@ public class QueryOverrideWithHintsTest {
public void testNotOverrideWithExpressionOverrideHints() {
ExpressionContext dateTruncFunctionExpr = ExpressionContext.forFunction(
new FunctionContext(FunctionContext.Type.TRANSFORM, "dateTrunc", new
ArrayList<>(
-
ImmutableList.of(ExpressionContext.forLiteral(FieldSpec.DataType.STRING, "DAY"),
+ List.of(ExpressionContext.forLiteral(FieldSpec.DataType.STRING,
"DAY"),
ExpressionContext.forIdentifier("ts")))));
ExpressionContext timestampIndexColumn =
ExpressionContext.forIdentifier("$ts$DAY");
ExpressionContext equalsExpression = ExpressionContext.forFunction(
new FunctionContext(FunctionContext.Type.TRANSFORM, "EQUALS", new
ArrayList<>(
- ImmutableList.of(dateTruncFunctionExpr,
- ExpressionContext.forLiteral(FieldSpec.DataType.INT, 1000)))));
- Map<ExpressionContext, ExpressionContext> hints =
ImmutableMap.of(dateTruncFunctionExpr, timestampIndexColumn);
+ List.of(dateTruncFunctionExpr,
ExpressionContext.forLiteral(FieldSpec.DataType.INT, 1000)))));
+ Map<ExpressionContext, ExpressionContext> hints =
Map.of(dateTruncFunctionExpr, timestampIndexColumn);
ExpressionContext newEqualsExpression =
InstancePlanMakerImplV2.overrideWithExpressionHints(equalsExpression,
_indexSegment, hints);
assertEquals(newEqualsExpression.getFunction().getFunctionName(),
"equals");
@@ -231,7 +168,7 @@ public class QueryOverrideWithHintsTest {
RequestUtils.getFunctionExpression("datetrunc",
RequestUtils.getLiteralExpression("MONTH"),
RequestUtils.getIdentifierExpression("ts"));
Expression timestampIndexColumn =
RequestUtils.getIdentifierExpression("$ts$MONTH");
-
pinotQuery.setExpressionOverrideHints(ImmutableMap.of(dateTruncFunctionExpr,
timestampIndexColumn));
+ pinotQuery.setExpressionOverrideHints(Map.of(dateTruncFunctionExpr,
timestampIndexColumn));
QueryContext queryContext =
QueryContextConverterUtils.getQueryContext(pinotQuery);
InstancePlanMakerImplV2.rewriteQueryContextWithHints(queryContext,
_indexSegment);
assertEquals(queryContext.getSelectExpressions().get(0).getIdentifier(),
"$ts$MONTH");
@@ -246,7 +183,7 @@ public class QueryOverrideWithHintsTest {
RequestUtils.getFunctionExpression("datetrunc",
RequestUtils.getLiteralExpression("DAY"),
RequestUtils.getIdentifierExpression("ts"));
Expression timestampIndexColumn =
RequestUtils.getIdentifierExpression("$ts$DAY");
-
pinotQuery.setExpressionOverrideHints(ImmutableMap.of(dateTruncFunctionExpr,
timestampIndexColumn));
+ pinotQuery.setExpressionOverrideHints(Map.of(dateTruncFunctionExpr,
timestampIndexColumn));
QueryContext queryContext =
QueryContextConverterUtils.getQueryContext(pinotQuery);
InstancePlanMakerImplV2.rewriteQueryContextWithHints(queryContext,
_indexSegment);
assertEquals(queryContext.getSelectExpressions().get(0).getFunction(),
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java
index c4374829c3..b66edb8a0b 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.segment.local.indexsegment.immutable;
-import com.google.common.base.Preconditions;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
@@ -58,14 +57,6 @@ public class EmptyIndexSegment implements ImmutableSegment {
return _segmentMetadata;
}
- @Override
- public DataSource getDataSource(String column) {
- ColumnMetadata columnMetadata =
_segmentMetadata.getColumnMetadataFor(column);
- Preconditions.checkNotNull(columnMetadata,
- "ColumnMetadata for " + column + " should not be null. " +
"Potentially invalid column name specified.");
- return new EmptyDataSource(columnMetadata);
- }
-
@Override
public Set<String> getColumnNames() {
return _segmentMetadata.getSchema().getColumnNames();
@@ -84,6 +75,14 @@ public class EmptyIndexSegment implements ImmutableSegment {
public void destroy() {
}
+ @Nullable
+ @Override
+ public DataSource getDataSourceNullable(String column) {
+ ColumnMetadata columnMetadata =
_segmentMetadata.getColumnMetadataFor(column);
+ return columnMetadata != null ? new EmptyDataSource(columnMetadata) : null;
+ }
+
+ @Nullable
@Override
public List<StarTreeV2> getStarTrees() {
return null;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
index e5183f4479..28ea561e8a 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
@@ -234,14 +234,6 @@ public class ImmutableSegmentImpl implements
ImmutableSegment {
return _segmentMetadata;
}
- @Override
- public DataSource getDataSource(String column) {
- DataSource result = _dataSources.get(column);
- Preconditions.checkNotNull(result,
- "DataSource for %s should not be null. Potentially invalid column name
specified.", column);
- return result;
- }
-
@Override
public Set<String> getColumnNames() {
return _segmentMetadata.getSchema().getColumnNames();
@@ -306,6 +298,13 @@ public class ImmutableSegmentImpl implements
ImmutableSegment {
}
}
+ @Nullable
+ @Override
+ public DataSource getDataSourceNullable(String column) {
+ return _dataSources.get(column);
+ }
+
+ @Nullable
@Override
public List<StarTreeV2> getStarTrees() {
return _starTreeIndexContainer != null ?
_starTreeIndexContainer.getStarTrees() : null;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 17bf891010..ae1ffae0c3 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -1048,24 +1048,27 @@ public class MutableSegmentImpl implements
MutableSegment {
return physicalColumnNames;
}
+ @Nullable
@Override
- public DataSource getDataSource(String column) {
+ public DataSource getDataSourceNullable(String column) {
IndexContainer indexContainer = _indexContainerMap.get(column);
if (indexContainer != null) {
// Physical column
return indexContainer.toDataSource();
- } else {
+ }
+ FieldSpec fieldSpec = _schema.getFieldSpecFor(column);
+ if (fieldSpec != null && fieldSpec.isVirtualColumn()) {
// Virtual column
- FieldSpec fieldSpec = _schema.getFieldSpecFor(column);
- Preconditions.checkState(fieldSpec != null &&
fieldSpec.isVirtualColumn(), "Failed to find column: %s", column);
// TODO: Refactor virtual column provider to directly generate data
source
VirtualColumnContext virtualColumnContext = new
VirtualColumnContext(fieldSpec, _numDocsIndexed);
VirtualColumnProvider virtualColumnProvider =
VirtualColumnProviderFactory.buildProvider(virtualColumnContext);
return new
ImmutableDataSource(virtualColumnProvider.buildMetadata(virtualColumnContext),
virtualColumnProvider.buildColumnIndexContainer(virtualColumnContext));
}
+ return null;
}
+ @Nullable
@Override
public List<StarTreeV2> getStarTrees() {
return null;
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
index 53f7b99558..4fdf1d6fab 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.spi;
+import com.google.common.base.Preconditions;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
@@ -25,6 +26,7 @@ import org.apache.pinot.segment.spi.datasource.DataSource;
import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
import org.apache.pinot.spi.annotations.InterfaceAudience;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
@@ -59,17 +61,30 @@ public interface IndexSegment {
*/
Set<String> getPhysicalColumnNames();
- /**
- * Returns the {@link DataSource} for the given column.
- *
- * @param columnName Column name
- * @return Data source for the given column
- */
- DataSource getDataSource(String columnName);
+ /// Returns the [DataSource] for the given column.
+ /// TODO: Revisit all usage of this method to support virtual [DataSource].
+ default DataSource getDataSource(String column) {
+ DataSource dataSource = getDataSourceNullable(column);
+ Preconditions.checkState(dataSource != null, "Failed to find data source
for column: %s", column);
+ return dataSource;
+ }
+
+ /// Returns the [DataSource] for the given column, or `null` if the column
does not exist in the segment.
+ @Nullable
+ DataSource getDataSourceNullable(String column);
+
+ /// Returns the [DataSource] for the given column, or creates a virtual one
if it doesn't exist. The passed in
+ /// [Schema] should be the latest schema of the table, not the one from
[SegmentMetadata], and should contain the
+ /// asked column.
+ /// TODO: Add support for virtual [DataSource].
+ default DataSource getDataSource(String column, Schema schema) {
+ return getDataSource(column);
+ }
/**
* Returns a list of star-trees (V2), or null if there is no star-tree (V2)
in the segment.
*/
+ @Nullable
List<StarTreeV2> getStarTrees();
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]