npawar commented on a change in pull request #7654:
URL: https://github.com/apache/pinot/pull/7654#discussion_r739432087
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
##########
@@ -72,29 +80,127 @@ public void init(PinotConfiguration config) {
}
@Override
- public boolean prune(IndexSegment segment, QueryContext query) {
+ public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext query) {
+ if (segments.isEmpty()) {
+ return segments;
+ }
FilterContext filter = query.getFilter();
if (filter == null) {
- return false;
+ return segments;
+ }
+
+ // Extract EQ/IN/RANGE predicate columns
+ Set<String> eqInColumns = new HashSet<>();
+ Set<String> rangeColumns = new HashSet<>();
+ extractPredicateColumns(filter, eqInColumns, rangeColumns);
+
+ if (eqInColumns.isEmpty() && rangeColumns.isEmpty()) {
+ return segments;
+ }
+
+ int numSegments = segments.size();
+ List<IndexSegment> selectedSegments = new ArrayList<>(numSegments);
+ if (!eqInColumns.isEmpty() && query.isEnablePrefetch()) {
+ Map[] dataSourceCaches = new Map[numSegments];
+ FetchContext[] fetchContexts = new FetchContext[numSegments];
+ try {
+ // Prefetch bloom filter for columns within the EQ/IN predicate if exists
+ for (int i = 0; i < numSegments; i++) {
+ IndexSegment segment = segments.get(i);
+ Map<String, DataSource> dataSourceCache = new HashMap<>();
+ Map<String, List<ColumnIndexType>> columnToIndexList = new HashMap<>();
+ for (String column : eqInColumns) {
+ DataSource dataSource = segment.getDataSource(column);
+ // NOTE: Column must exist after DataSchemaSegmentPruner
+ assert dataSource != null;
+ dataSourceCache.put(column, dataSource);
+ if (dataSource.getBloomFilter() != null) {
+ columnToIndexList.put(column, Collections.singletonList(ColumnIndexType.BLOOM_FILTER));
+ }
+ }
+ dataSourceCaches[i] = dataSourceCache;
+ if (!columnToIndexList.isEmpty()) {
+ FetchContext fetchContext =
+ new FetchContext(UUID.randomUUID(), segment.getSegmentName(), columnToIndexList);
+ segment.prefetch(fetchContext);
+ fetchContexts[i] = fetchContext;
+ }
+ }
+
+ // Prune segments
+ for (int i = 0; i < numSegments; i++) {
+ IndexSegment segment = segments.get(i);
+ if (!pruneSegment(segment, filter, dataSourceCaches[i], fetchContexts[i])) {
+ selectedSegments.add(segment);
Review comment:
It looks like we acquire for all segments first and only release for all of
them afterwards. This is unlike segment execution, where an acquired segment
is guaranteed to be released. Could this deadlock if not all of the acquires
are able to complete?
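
To make the concern concrete, here is a minimal sketch and not Pinot code. It assumes, purely for illustration, that each acquire draws one slot from a bounded pool, modeled with a `java.util.concurrent.Semaphore`. The class name, method names, and the idea of a fixed `capacity` are all hypothetical; whether the real acquire path is bounded this way is not stated in the diff. The sketch uses `tryAcquire()` so that it reports failure instead of blocking forever.

```java
import java.util.concurrent.Semaphore;

// Hypothetical illustration only: a Semaphore stands in for whatever bounded
// resource an acquire might need. None of these names exist in Pinot.
public class PrefetchOrderSketch {

    // Pattern in question: acquire for every segment first, release only at the end.
    // Returns false if some acquire could not complete while earlier ones are still held.
    static boolean acquireAllThenPrune(int numSegments, int capacity) {
        Semaphore slots = new Semaphore(capacity);
        int acquired = 0;
        try {
            for (int i = 0; i < numSegments; i++) {
                // A blocking acquire() here would wait forever once the pool is
                // exhausted, because nothing is released until the loop finishes.
                if (!slots.tryAcquire()) {
                    return false;
                }
                acquired++;
            }
            // ... prune all segments here ...
            return true;
        } finally {
            slots.release(acquired);
        }
    }

    // Alternative: acquire, prune, and release one segment at a time,
    // so at most one slot is held at any moment.
    static boolean acquirePruneReleasePerSegment(int numSegments, int capacity) {
        Semaphore slots = new Semaphore(capacity);
        for (int i = 0; i < numSegments; i++) {
            if (!slots.tryAcquire()) {
                return false;
            }
            try {
                // ... prune segment i here ...
            } finally {
                slots.release();
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("acquire-all, 4 segments, 2 slots: " + acquireAllThenPrune(4, 2));
        System.out.println("per-segment, 4 segments, 2 slots: " + acquirePruneReleasePerSegment(4, 2));
    }
}
```

Under these assumptions, the acquire-all variant cannot finish whenever the number of segments exceeds the available slots, while the per-segment variant always can. With concurrent queries the same shape could deadlock even when each query alone would fit, since each may hold part of the pool while waiting for the rest.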
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]