This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 396f37daec Allow bloom filters for non dictionary columns (#9121)
396f37daec is described below
commit 396f37daec153eb584094fbaa9450010323d8933
Author: Saurabh Dubey <[email protected]>
AuthorDate: Fri Jul 29 11:04:09 2022 +0530
Allow bloom filters for non dictionary columns (#9121)
---
.../creator/impl/DefaultIndexCreatorProvider.java | 13 +-
.../loader/bloomfilter/BloomFilterHandler.java | 151 +++++++++++++++++++--
.../segment/local/utils/TableConfigUtils.java | 4 -
.../index/loader/SegmentPreProcessorTest.java | 2 -
.../segment/local/utils/TableConfigUtilsTest.java | 9 --
5 files changed, 151 insertions(+), 28 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java
index 371861cf4f..cd3a5603db 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java
@@ -44,6 +44,7 @@ import
org.apache.pinot.segment.local.segment.creator.impl.inv.text.LuceneFSTInd
import
org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator;
import
org.apache.pinot.segment.local.segment.creator.impl.text.NativeTextIndexCreator;
import org.apache.pinot.segment.local.utils.nativefst.NativeFSTIndexCreator;
+import org.apache.pinot.segment.spi.Constants;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.creator.IndexCreationContext;
import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
@@ -257,8 +258,16 @@ public final class DefaultIndexCreatorProvider implements
IndexCreatorProvider {
@Override
public BloomFilterCreator
newBloomFilterCreator(IndexCreationContext.BloomFilter context)
throws IOException {
- return new OnHeapGuavaBloomFilterCreator(context.getIndexDir(),
context.getFieldSpec().getName(),
- context.getCardinality(),
Objects.requireNonNull(context.getBloomFilterConfig()));
+ int cardinality = context.getCardinality();
+ if (cardinality == Constants.UNKNOWN_CARDINALITY) {
+ // This is when we're creating bloom filters for non dictionary encoded
cols where exact cardinality is not
+ // known beforehand.
+ // Since this field is only used for the estimate cardinality, using
total # of entries instead
+ // TODO (saurabh) Check if we can do a better estimate
+ cardinality = context.getTotalNumberOfEntries();
+ }
+ return new OnHeapGuavaBloomFilterCreator(context.getIndexDir(),
context.getFieldSpec().getName(), cardinality,
+ Objects.requireNonNull(context.getBloomFilterConfig()));
}
@Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java
index 16127f9aa8..ddb53e04b3 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java
@@ -43,11 +43,14 @@ import
org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.creator.BloomFilterCreator;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.spi.config.table.BloomFilterConfig;
import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.BytesUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -109,8 +112,137 @@ public class BloomFilterHandler implements IndexHandler {
}
private boolean shouldCreateBloomFilter(ColumnMetadata columnMetadata) {
- // TODO: Support raw index
- return columnMetadata != null && columnMetadata.hasDictionary();
+ return columnMetadata != null;
+ }
+
+ private void
createAndSealBloomFilterForDictionaryColumn(BloomFilterCreatorProvider
indexCreatorProvider,
+ File indexDir, ColumnMetadata columnMetadata, BloomFilterConfig
bloomFilterConfig,
+ SegmentDirectory.Writer segmentWriter)
+ throws Exception {
+ try (BloomFilterCreator bloomFilterCreator =
indexCreatorProvider.newBloomFilterCreator(
+
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata).build()
+ .forBloomFilter(bloomFilterConfig));
+ Dictionary dictionary = getDictionaryReader(columnMetadata,
segmentWriter)) {
+ int length = dictionary.length();
+ for (int i = 0; i < length; i++) {
+ bloomFilterCreator.add(dictionary.getStringValue(i));
+ }
+ bloomFilterCreator.seal();
+ }
+ }
+
+ private void
createAndSealBloomFilterForNonDictionaryColumn(BloomFilterCreatorProvider
indexCreatorProvider,
+ File indexDir, ColumnMetadata columnMetadata, BloomFilterConfig
bloomFilterConfig,
+ SegmentDirectory.Writer segmentWriter)
+ throws Exception {
+ int numDocs = columnMetadata.getTotalDocs();
+ try (BloomFilterCreator bloomFilterCreator =
indexCreatorProvider.newBloomFilterCreator(
+
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata).build()
+ .forBloomFilter(bloomFilterConfig));
+ ForwardIndexReader forwardIndexReader =
LoaderUtils.getForwardIndexReader(segmentWriter, columnMetadata);
+ ForwardIndexReaderContext readerContext =
forwardIndexReader.createContext()) {
+ if (columnMetadata.isSingleValue()) {
+ // SV
+ switch (columnMetadata.getDataType()) {
+ case INT:
+ for (int i = 0; i < numDocs; i++) {
+
bloomFilterCreator.add(Integer.toString(forwardIndexReader.getInt(i,
readerContext)));
+ }
+ break;
+ case LONG:
+ for (int i = 0; i < numDocs; i++) {
+
bloomFilterCreator.add(Long.toString(forwardIndexReader.getLong(i,
readerContext)));
+ }
+ break;
+ case FLOAT:
+ for (int i = 0; i < numDocs; i++) {
+
bloomFilterCreator.add(Float.toString(forwardIndexReader.getFloat(i,
readerContext)));
+ }
+ break;
+ case DOUBLE:
+ for (int i = 0; i < numDocs; i++) {
+
bloomFilterCreator.add(Double.toString(forwardIndexReader.getDouble(i,
readerContext)));
+ }
+ break;
+ case STRING:
+ for (int i = 0; i < numDocs; i++) {
+ bloomFilterCreator.add(forwardIndexReader.getString(i,
readerContext));
+ }
+ break;
+ case BYTES:
+ for (int i = 0; i < numDocs; i++) {
+
bloomFilterCreator.add(BytesUtils.toHexString(forwardIndexReader.getBytes(i,
readerContext)));
+ }
+ break;
+ default:
+ throw new IllegalStateException("Unsupported data type: " +
columnMetadata.getDataType() + " for column: "
+ + columnMetadata.getColumnName());
+ }
+ bloomFilterCreator.seal();
+ } else {
+ // MV
+ switch (columnMetadata.getDataType()) {
+ case INT:
+ for (int i = 0; i < numDocs; i++) {
+ int[] buffer = new
int[columnMetadata.getMaxNumberOfMultiValues()];
+ int length = forwardIndexReader.getIntMV(i, buffer,
readerContext);
+ for (int j = 0; j < length; j++) {
+ bloomFilterCreator.add(Integer.toString(buffer[j]));
+ }
+ }
+ break;
+ case LONG:
+ for (int i = 0; i < numDocs; i++) {
+ long[] buffer = new
long[columnMetadata.getMaxNumberOfMultiValues()];
+ int length = forwardIndexReader.getLongMV(i, buffer,
readerContext);
+ for (int j = 0; j < length; j++) {
+ bloomFilterCreator.add(Long.toString(buffer[j]));
+ }
+ }
+ break;
+ case FLOAT:
+ for (int i = 0; i < numDocs; i++) {
+ float[] buffer = new
float[columnMetadata.getMaxNumberOfMultiValues()];
+ int length = forwardIndexReader.getFloatMV(i, buffer,
readerContext);
+ for (int j = 0; j < length; j++) {
+ bloomFilterCreator.add(Float.toString(buffer[j]));
+ }
+ }
+ break;
+ case DOUBLE:
+ for (int i = 0; i < numDocs; i++) {
+ double[] buffer = new
double[columnMetadata.getMaxNumberOfMultiValues()];
+ int length = forwardIndexReader.getDoubleMV(i, buffer,
readerContext);
+ for (int j = 0; j < length; j++) {
+ bloomFilterCreator.add(Double.toString(buffer[j]));
+ }
+ }
+ break;
+ case STRING:
+ for (int i = 0; i < numDocs; i++) {
+ String[] buffer = new
String[columnMetadata.getMaxNumberOfMultiValues()];
+ int length = forwardIndexReader.getStringMV(i, buffer,
readerContext);
+ for (int j = 0; j < length; j++) {
+ bloomFilterCreator.add(buffer[j]);
+ }
+ }
+ break;
+ case BYTES:
+ for (int i = 0; i < numDocs; i++) {
+ byte[][] buffer = new
byte[columnMetadata.getMaxNumberOfMultiValues()][];
+ int length = forwardIndexReader.getBytesMV(i, buffer,
readerContext);
+ for (int j = 0; j < length; j++) {
+ bloomFilterCreator.add(BytesUtils.toHexString(buffer[j]));
+ }
+ }
+ break;
+ default:
+ throw new IllegalStateException("Unsupported data type: " +
columnMetadata.getDataType() + " for column: "
+ + columnMetadata.getColumnName());
+ }
+ bloomFilterCreator.seal();
+ }
+ }
}
private void createBloomFilterForColumn(SegmentDirectory.Writer
segmentWriter, ColumnMetadata columnMetadata,
@@ -136,15 +268,12 @@ public class BloomFilterHandler implements IndexHandler {
BloomFilterConfig bloomFilterConfig = _bloomFilterConfigs.get(columnName);
LOGGER.info("Creating new bloom filter for segment: {}, column: {} with
config: {}", segmentName, columnName,
bloomFilterConfig);
- try (BloomFilterCreator bloomFilterCreator =
indexCreatorProvider.newBloomFilterCreator(
-
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata)
- .build().forBloomFilter(bloomFilterConfig));
- Dictionary dictionary = getDictionaryReader(columnMetadata,
segmentWriter)) {
- int length = dictionary.length();
- for (int i = 0; i < length; i++) {
- bloomFilterCreator.add(dictionary.getStringValue(i));
- }
- bloomFilterCreator.seal();
+ if (columnMetadata.hasDictionary()) {
+ createAndSealBloomFilterForDictionaryColumn(indexCreatorProvider,
indexDir, columnMetadata, bloomFilterConfig,
+ segmentWriter);
+ } else {
+ createAndSealBloomFilterForNonDictionaryColumn(indexCreatorProvider,
indexDir, columnMetadata, bloomFilterConfig,
+ segmentWriter);
}
// For v3, write the generated bloom filter file into the single file and
remove it.
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 214458e7e2..88da20e0f5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -696,10 +696,6 @@ public final class TableConfigUtils {
bloomFilterColumns.addAll(indexingConfig.getBloomFilterConfigs().keySet());
}
for (String bloomFilterColumn : bloomFilterColumns) {
- if (noDictionaryColumnsSet.contains(bloomFilterColumn)) {
- throw new IllegalStateException("Cannot create a Bloom Filter on
column " + bloomFilterColumn
- + " specified in the noDictionaryColumns config");
- }
columnNameToConfigMap.put(bloomFilterColumn, "Bloom Filter Config");
}
if (indexingConfig.getInvertedIndexColumns() != null) {
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
index 861743ebb0..1350fab3b5 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
@@ -1267,8 +1267,6 @@ public class SegmentPreProcessorTest {
// In fact, the validation logic when updating index configs already
blocks this to happen.
testCases.put("addInvertedIndexOnNonDictColumn", (IndexLoadingConfig
config) -> config.setInvertedIndexColumns(
new HashSet<>(Collections.singletonList("column4"))));
- testCases.put("addBloomFilterOnNonDictColumn", (IndexLoadingConfig config)
-> config.setBloomFilterConfigs(
- ImmutableMap.of("column4", new BloomFilterConfig(0.1, 1024, true))));
// No index is added on non-existing columns.
// The validation logic when updating index configs already blocks this to
happen.
testCases.put("addInvertedIndexOnAbsentColumn", (IndexLoadingConfig
config) -> config.setInvertedIndexColumns(
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index b87ba4edc2..9bca0ff9ce 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1080,15 +1080,6 @@ public class TableConfigUtilsTest {
// expected
}
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setNoDictionaryColumns(columnList)
- .setBloomFilterColumns(columnList).build();
- try {
- TableConfigUtils.validate(tableConfig, schema);
- Assert.fail("Should fail for valid column name in both no dictionary and
bloom filter column config");
- } catch (Exception e) {
- // expected
- }
-
tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setJsonIndexColumns(Arrays.asList("non-existent-column")).build();
try {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]