This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 221db828cf Let applyAnd to be applied using different window sizes (#10372)
221db828cf is described below
commit 221db828cf535eb751267124bca35c9a3b1f9c52
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Thu Mar 9 19:45:33 2023 +0100
Let applyAnd to be applied using different window sizes (#10372)
---
.../core/operator/BitmapDocIdSetOperator.java | 5 +++
.../ExpressionScanDocIdIterator.java | 16 +++++++
.../dociditerators/MVScanDocIdIterator.java | 20 ++++++---
.../dociditerators/SVScanDocIdIterator.java | 50 ++++++++++++++--------
.../dociditerators/ScanBasedDocIdIterator.java | 11 ++++-
.../core/operator/docidsets/SVScanDocIdSet.java | 4 +-
.../operator/filter/ScanBasedFilterOperator.java | 13 +++++-
7 files changed, 89 insertions(+), 30 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/BitmapDocIdSetOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/BitmapDocIdSetOperator.java
index b251c552d8..4f203599c1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/BitmapDocIdSetOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/BitmapDocIdSetOperator.java
@@ -52,6 +52,11 @@ public class BitmapDocIdSetOperator extends
BaseOperator<DocIdSetBlock> {
_docIdBuffer = new int[Math.min(numDocs,
DocIdSetPlanNode.MAX_DOC_PER_CALL)];
}
+ public BitmapDocIdSetOperator(IntIterator intIterator, int[] docIdBuffer) {
+ _intIterator = intIterator;
+ _docIdBuffer = docIdBuffer;
+ }
+
public BitmapDocIdSetOperator(ImmutableBitmapDataProvider bitmap, int[]
docIdBuffer) {
_intIterator = bitmap.getIntIterator();
_docIdBuffer = docIdBuffer;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java
index 64987293bc..b5b2273644 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.operator.dociditerators;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.OptionalInt;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.BitmapDocIdSetOperator;
@@ -33,7 +34,9 @@ import
org.apache.pinot.core.operator.transform.function.TransformFunction;
import org.apache.pinot.core.plan.DocIdSetPlanNode;
import org.apache.pinot.segment.spi.Constants;
import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.roaringbitmap.BatchIterator;
import org.roaringbitmap.BitmapDataProvider;
+import org.roaringbitmap.IntIterator;
import org.roaringbitmap.PeekableIntIterator;
import org.roaringbitmap.RoaringBitmap;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
@@ -110,6 +113,19 @@ public final class ExpressionScanDocIdIterator implements
ScanBasedDocIdIterator
return next();
}
+ @Override
+ public MutableRoaringBitmap applyAnd(BatchIterator batchIterator,
OptionalInt firstDoc, OptionalInt lastDoc) {
+ IntIterator intIterator = batchIterator.asIntIterator(new
int[OPTIMAL_ITERATOR_BATCH_SIZE]);
+ ProjectionOperator projectionOperator =
+ new ProjectionOperator(_dataSourceMap, new
BitmapDocIdSetOperator(intIterator, _docIdBuffer));
+ MutableRoaringBitmap matchingDocIds = new MutableRoaringBitmap();
+ ProjectionBlock projectionBlock;
+ while ((projectionBlock = projectionOperator.nextBlock()) != null) {
+ processProjectionBlock(projectionBlock, matchingDocIds);
+ }
+ return matchingDocIds;
+ }
+
@Override
public MutableRoaringBitmap applyAnd(ImmutableRoaringBitmap docIds) {
ProjectionOperator projectionOperator =
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/MVScanDocIdIterator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/MVScanDocIdIterator.java
index 558c4003d9..1c794e81c0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/MVScanDocIdIterator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/MVScanDocIdIterator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.operator.dociditerators;
+import java.util.OptionalInt;
import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
import org.apache.pinot.segment.spi.Constants;
import org.apache.pinot.segment.spi.datasource.DataSource;
@@ -26,7 +27,6 @@ import
org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
import org.apache.pinot.spi.utils.CommonConstants.Query.OptimizationConstants;
import org.roaringbitmap.BatchIterator;
import org.roaringbitmap.RoaringBitmapWriter;
-import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
@@ -79,13 +79,21 @@ public final class MVScanDocIdIterator implements
ScanBasedDocIdIterator {
}
@Override
- public MutableRoaringBitmap applyAnd(ImmutableRoaringBitmap docIds) {
- if (docIds.isEmpty()) {
+ public MutableRoaringBitmap applyAnd(BatchIterator docIdIterator,
OptionalInt firstDoc, OptionalInt lastDoc) {
+ if (!docIdIterator.hasNext()) {
return new MutableRoaringBitmap();
}
- RoaringBitmapWriter<MutableRoaringBitmap> result =
RoaringBitmapWriter.bufferWriter()
- .expectedRange(docIds.first(), docIds.last()).runCompress(false).get();
- BatchIterator docIdIterator = docIds.getBatchIterator();
+ RoaringBitmapWriter<MutableRoaringBitmap> result;
+ if (firstDoc.isPresent() && lastDoc.isPresent()) {
+ result = RoaringBitmapWriter.bufferWriter()
+ .expectedRange(firstDoc.getAsInt(), lastDoc.getAsInt())
+ .runCompress(false)
+ .get();
+ } else {
+ result = RoaringBitmapWriter.bufferWriter()
+ .runCompress(false)
+ .get();
+ }
int[] buffer = new int[OPTIMAL_ITERATOR_BATCH_SIZE];
while (docIdIterator.hasNext()) {
int limit = docIdIterator.nextBatch(buffer);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/SVScanDocIdIterator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/SVScanDocIdIterator.java
index 1a4f9c2c54..baf07c16aa 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/SVScanDocIdIterator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/SVScanDocIdIterator.java
@@ -18,7 +18,9 @@
*/
package org.apache.pinot.core.operator.dociditerators;
+import java.util.OptionalInt;
import javax.annotation.Nullable;
+import org.apache.pinot.core.common.BlockDocIdIterator;
import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
import org.apache.pinot.segment.spi.Constants;
import org.apache.pinot.segment.spi.datasource.DataSource;
@@ -44,7 +46,7 @@ public final class SVScanDocIdIterator implements
ScanBasedDocIdIterator {
private final ForwardIndexReaderContext _readerContext;
private final int _numDocs;
private final ValueMatcher _valueMatcher;
- private final int[] _batch = new int[OPTIMAL_ITERATOR_BATCH_SIZE];
+ private final int[] _batch;
private int _firstMismatch;
private int _cursor;
private final int _cardinality;
@@ -53,7 +55,8 @@ public final class SVScanDocIdIterator implements
ScanBasedDocIdIterator {
private long _numEntriesScanned = 0L;
public SVScanDocIdIterator(PredicateEvaluator predicateEvaluator, DataSource
dataSource, int numDocs,
- @Nullable NullValueVectorReader nullValueReader) {
+ @Nullable NullValueVectorReader nullValueReader, int batchSize) {
+ _batch = new int[batchSize];
_predicateEvaluator = predicateEvaluator;
_reader = dataSource.getForwardIndex();
_readerContext = _reader.createContext();
@@ -69,6 +72,7 @@ public final class SVScanDocIdIterator implements
ScanBasedDocIdIterator {
// for testing
public SVScanDocIdIterator(PredicateEvaluator predicateEvaluator,
ForwardIndexReader reader, int numDocs,
@Nullable NullValueVectorReader nullValueReader) {
+ _batch = new int[BlockDocIdIterator.OPTIMAL_ITERATOR_BATCH_SIZE];
_predicateEvaluator = predicateEvaluator;
_reader = reader;
_readerContext = reader.createContext();
@@ -87,7 +91,7 @@ public final class SVScanDocIdIterator implements
ScanBasedDocIdIterator {
int limit;
int batchSize = 0;
do {
- limit = Math.min(_numDocs - _nextDocId, OPTIMAL_ITERATOR_BATCH_SIZE);
+ limit = Math.min(_numDocs - _nextDocId, _batch.length);
if (limit > 0) {
for (int i = 0; i < limit; i++) {
_batch[i] = _nextDocId + i;
@@ -121,14 +125,22 @@ public final class SVScanDocIdIterator implements
ScanBasedDocIdIterator {
}
@Override
- public MutableRoaringBitmap applyAnd(ImmutableRoaringBitmap docIds) {
- if (docIds.isEmpty()) {
+ public MutableRoaringBitmap applyAnd(BatchIterator docIdIterator,
OptionalInt firstDoc, OptionalInt lastDoc) {
+ if (!docIdIterator.hasNext()) {
return new MutableRoaringBitmap();
}
- RoaringBitmapWriter<MutableRoaringBitmap> result =
RoaringBitmapWriter.bufferWriter()
- .expectedRange(docIds.first(), docIds.last()).runCompress(false).get();
- BatchIterator docIdIterator = docIds.getBatchIterator();
- int[] buffer = new int[OPTIMAL_ITERATOR_BATCH_SIZE];
+ RoaringBitmapWriter<MutableRoaringBitmap> result;
+ if (firstDoc.isPresent() && lastDoc.isPresent()) {
+ result = RoaringBitmapWriter.bufferWriter()
+ .expectedRange(firstDoc.getAsInt(), lastDoc.getAsInt())
+ .runCompress(false)
+ .get();
+ } else {
+ result = RoaringBitmapWriter.bufferWriter()
+ .runCompress(false)
+ .get();
+ }
+ int[] buffer = new int[_batch.length];
while (docIdIterator.hasNext()) {
int limit = docIdIterator.nextBatch(buffer);
if (limit > 0) {
@@ -264,7 +276,7 @@ public final class SVScanDocIdIterator implements
ScanBasedDocIdIterator {
private class DictIdMatcher implements ValueMatcher {
- private final int[] _buffer = new int[OPTIMAL_ITERATOR_BATCH_SIZE];
+ private final int[] _buffer = new int[_batch.length];
@Override
public boolean doesValueMatch(int docId) {
@@ -280,7 +292,7 @@ public final class SVScanDocIdIterator implements
ScanBasedDocIdIterator {
private class DictIdMatcherAndNullHandler implements ValueMatcher {
- private final int[] _buffer = new int[OPTIMAL_ITERATOR_BATCH_SIZE];
+ private final int[] _buffer = new int[_batch.length];
private final ImmutableRoaringBitmap _nullBitmap;
public DictIdMatcherAndNullHandler(ImmutableRoaringBitmap nullBitmap) {
@@ -308,7 +320,7 @@ public final class SVScanDocIdIterator implements
ScanBasedDocIdIterator {
private class IntMatcher implements ValueMatcher {
- private final int[] _buffer = new int[OPTIMAL_ITERATOR_BATCH_SIZE];
+ private final int[] _buffer = new int[_batch.length];
@Override
public boolean doesValueMatch(int docId) {
@@ -325,7 +337,7 @@ public final class SVScanDocIdIterator implements
ScanBasedDocIdIterator {
private class IntMatcherAndNullHandler implements ValueMatcher {
private final ImmutableRoaringBitmap _nullBitmap;
- private final int[] _buffer = new int[OPTIMAL_ITERATOR_BATCH_SIZE];
+ private final int[] _buffer = new int[_batch.length];
public IntMatcherAndNullHandler(ImmutableRoaringBitmap nullBitmap) {
_nullBitmap = nullBitmap;
@@ -349,7 +361,7 @@ public final class SVScanDocIdIterator implements
ScanBasedDocIdIterator {
private class LongMatcher implements ValueMatcher {
- private final long[] _buffer = new long[OPTIMAL_ITERATOR_BATCH_SIZE];
+ private final long[] _buffer = new long[_batch.length];
@Override
public boolean doesValueMatch(int docId) {
@@ -366,7 +378,7 @@ public final class SVScanDocIdIterator implements
ScanBasedDocIdIterator {
private class LongMatcherAndNullHandler implements ValueMatcher {
private final ImmutableRoaringBitmap _nullBitmap;
- private final long[] _buffer = new long[OPTIMAL_ITERATOR_BATCH_SIZE];
+ private final long[] _buffer = new long[_batch.length];
public LongMatcherAndNullHandler(ImmutableRoaringBitmap nullBitmap) {
_nullBitmap = nullBitmap;
@@ -390,7 +402,7 @@ public final class SVScanDocIdIterator implements
ScanBasedDocIdIterator {
private class FloatMatcher implements ValueMatcher {
- private final float[] _buffer = new float[OPTIMAL_ITERATOR_BATCH_SIZE];
+ private final float[] _buffer = new float[_batch.length];
@Override
public boolean doesValueMatch(int docId) {
@@ -407,7 +419,7 @@ public final class SVScanDocIdIterator implements
ScanBasedDocIdIterator {
private class FloatMatcherAndNullHandler implements ValueMatcher {
private final ImmutableRoaringBitmap _nullBitmap;
- private final float[] _buffer = new float[OPTIMAL_ITERATOR_BATCH_SIZE];
+ private final float[] _buffer = new float[_batch.length];
public FloatMatcherAndNullHandler(ImmutableRoaringBitmap nullBitmap) {
_nullBitmap = nullBitmap;
@@ -431,7 +443,7 @@ public final class SVScanDocIdIterator implements
ScanBasedDocIdIterator {
private class DoubleMatcher implements ValueMatcher {
- private final double[] _buffer = new double[OPTIMAL_ITERATOR_BATCH_SIZE];
+ private final double[] _buffer = new double[_batch.length];
@Override
public boolean doesValueMatch(int docId) {
@@ -448,7 +460,7 @@ public final class SVScanDocIdIterator implements
ScanBasedDocIdIterator {
private class DoubleMatcherAndNullHandler implements ValueMatcher {
private final ImmutableRoaringBitmap _nullBitmap;
- private final double[] _buffer = new double[OPTIMAL_ITERATOR_BATCH_SIZE];
+ private final double[] _buffer = new double[_batch.length];
public DoubleMatcherAndNullHandler(ImmutableRoaringBitmap nullBitmap) {
_nullBitmap = nullBitmap;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ScanBasedDocIdIterator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ScanBasedDocIdIterator.java
index 1ed890cc83..1caa526444 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ScanBasedDocIdIterator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ScanBasedDocIdIterator.java
@@ -18,7 +18,9 @@
*/
package org.apache.pinot.core.operator.dociditerators;
+import java.util.OptionalInt;
import org.apache.pinot.core.common.BlockDocIdIterator;
+import org.roaringbitmap.BatchIterator;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
@@ -34,10 +36,17 @@ import org.roaringbitmap.buffer.MutableRoaringBitmap;
*/
public interface ScanBasedDocIdIterator extends BlockDocIdIterator {
+ MutableRoaringBitmap applyAnd(BatchIterator batchIterator, OptionalInt
firstDoc, OptionalInt lastDoc);
+
/**
* Applies AND operation to the given bitmap of document ids, returns a
bitmap of the matching document ids.
*/
- MutableRoaringBitmap applyAnd(ImmutableRoaringBitmap docIds);
+ default MutableRoaringBitmap applyAnd(ImmutableRoaringBitmap docIds) {
+ if (docIds.isEmpty()) {
+ return new MutableRoaringBitmap();
+ }
+ return applyAnd(docIds.getBatchIterator(), OptionalInt.of(docIds.first()),
OptionalInt.of(docIds.last()));
+ }
/**
* Returns the number of entries (SV value contains one entry, MV value
contains multiple entries) scanned during the
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/SVScanDocIdSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/SVScanDocIdSet.java
index 85ca55bacc..40fc00a621 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/SVScanDocIdSet.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/SVScanDocIdSet.java
@@ -29,9 +29,9 @@ public final class SVScanDocIdSet implements BlockDocIdSet {
private final SVScanDocIdIterator _docIdIterator;
public SVScanDocIdSet(PredicateEvaluator predicateEvaluator, DataSource
dataSource, int numDocs,
- boolean nullHandlingEnabled) {
+ boolean nullHandlingEnabled, int batchSize) {
NullValueVectorReader nullValueVector = nullHandlingEnabled ?
dataSource.getNullValueVector() : null;
- _docIdIterator = new SVScanDocIdIterator(predicateEvaluator, dataSource,
numDocs, nullValueVector);
+ _docIdIterator = new SVScanDocIdIterator(predicateEvaluator, dataSource,
numDocs, nullValueVector, batchSize);
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ScanBasedFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ScanBasedFilterOperator.java
index 89305d24fb..b2241cfc6f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ScanBasedFilterOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ScanBasedFilterOperator.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.operator.filter;
import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.List;
+import org.apache.pinot.core.common.BlockDocIdIterator;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.blocks.FilterBlock;
import org.apache.pinot.core.operator.docidsets.MVScanDocIdSet;
@@ -37,9 +38,15 @@ public class ScanBasedFilterOperator extends
BaseFilterOperator {
private final DataSource _dataSource;
private final int _numDocs;
private final boolean _nullHandlingEnabled;
+ private final int _batchSize;
- ScanBasedFilterOperator(PredicateEvaluator predicateEvaluator, DataSource
dataSource, int numDocs,
+ public ScanBasedFilterOperator(PredicateEvaluator predicateEvaluator,
DataSource dataSource, int numDocs,
boolean nullHandlingEnabled) {
+ this(predicateEvaluator, dataSource, numDocs, nullHandlingEnabled,
BlockDocIdIterator.OPTIMAL_ITERATOR_BATCH_SIZE);
+ }
+
+ public ScanBasedFilterOperator(PredicateEvaluator predicateEvaluator,
DataSource dataSource, int numDocs,
+ boolean nullHandlingEnabled, int batchSize) {
_predicateEvaluator = predicateEvaluator;
_dataSource = dataSource;
_numDocs = numDocs;
@@ -47,13 +54,15 @@ public class ScanBasedFilterOperator extends
BaseFilterOperator {
Preconditions.checkState(_dataSource.getForwardIndex() != null,
"Forward index disabled for column: %s, scan based filtering not
supported!",
_dataSource.getDataSourceMetadata().getFieldSpec().getName());
+ _batchSize = batchSize;
}
@Override
protected FilterBlock getNextBlock() {
DataSourceMetadata dataSourceMetadata =
_dataSource.getDataSourceMetadata();
if (dataSourceMetadata.isSingleValue()) {
- return new FilterBlock(new SVScanDocIdSet(_predicateEvaluator,
_dataSource, _numDocs, _nullHandlingEnabled));
+ return new FilterBlock(new SVScanDocIdSet(_predicateEvaluator,
_dataSource, _numDocs, _nullHandlingEnabled,
+ _batchSize));
} else {
return new FilterBlock(new MVScanDocIdSet(_predicateEvaluator,
_dataSource, _numDocs));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]