This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 396f37daec Allow bloom filters for non dictionary columns (#9121)
396f37daec is described below

commit 396f37daec153eb584094fbaa9450010323d8933
Author: Saurabh Dubey <[email protected]>
AuthorDate: Fri Jul 29 11:04:09 2022 +0530

    Allow bloom filters for non dictionary columns (#9121)
---
 .../creator/impl/DefaultIndexCreatorProvider.java  |  13 +-
 .../loader/bloomfilter/BloomFilterHandler.java     | 151 +++++++++++++++++++--
 .../segment/local/utils/TableConfigUtils.java      |   4 -
 .../index/loader/SegmentPreProcessorTest.java      |   2 -
 .../segment/local/utils/TableConfigUtilsTest.java  |   9 --
 5 files changed, 151 insertions(+), 28 deletions(-)

diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java
index 371861cf4f..cd3a5603db 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java
@@ -44,6 +44,7 @@ import 
org.apache.pinot.segment.local.segment.creator.impl.inv.text.LuceneFSTInd
 import 
org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator;
 import 
org.apache.pinot.segment.local.segment.creator.impl.text.NativeTextIndexCreator;
 import org.apache.pinot.segment.local.utils.nativefst.NativeFSTIndexCreator;
+import org.apache.pinot.segment.spi.Constants;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.creator.IndexCreationContext;
 import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
@@ -257,8 +258,16 @@ public final class DefaultIndexCreatorProvider implements 
IndexCreatorProvider {
   @Override
   public BloomFilterCreator 
newBloomFilterCreator(IndexCreationContext.BloomFilter context)
       throws IOException {
-    return new OnHeapGuavaBloomFilterCreator(context.getIndexDir(), 
context.getFieldSpec().getName(),
-        context.getCardinality(), 
Objects.requireNonNull(context.getBloomFilterConfig()));
+    int cardinality = context.getCardinality();
+    if (cardinality == Constants.UNKNOWN_CARDINALITY) {
+      // This is when we're creating bloom filters for non-dictionary-encoded 
columns where the exact cardinality is not
+      // known beforehand.
+      // Since this field is only used for the cardinality estimate, use the 
total # of entries instead
+      // TODO (saurabh) Check if we can do a better estimate
+      cardinality = context.getTotalNumberOfEntries();
+    }
+    return new OnHeapGuavaBloomFilterCreator(context.getIndexDir(), 
context.getFieldSpec().getName(), cardinality,
+        Objects.requireNonNull(context.getBloomFilterConfig()));
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java
index 16127f9aa8..ddb53e04b3 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java
@@ -43,11 +43,14 @@ import 
org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.creator.BloomFilterCreator;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.segment.spi.store.ColumnIndexType;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.spi.config.table.BloomFilterConfig;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.BytesUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -109,8 +112,137 @@ public class BloomFilterHandler implements IndexHandler {
   }
 
   private boolean shouldCreateBloomFilter(ColumnMetadata columnMetadata) {
-    // TODO: Support raw index
-    return columnMetadata != null && columnMetadata.hasDictionary();
+    return columnMetadata != null;
+  }
+
+  private void 
createAndSealBloomFilterForDictionaryColumn(BloomFilterCreatorProvider 
indexCreatorProvider,
+      File indexDir, ColumnMetadata columnMetadata, BloomFilterConfig 
bloomFilterConfig,
+      SegmentDirectory.Writer segmentWriter)
+      throws Exception {
+    try (BloomFilterCreator bloomFilterCreator = 
indexCreatorProvider.newBloomFilterCreator(
+        
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata).build()
+            .forBloomFilter(bloomFilterConfig));
+        Dictionary dictionary = getDictionaryReader(columnMetadata, 
segmentWriter)) {
+      int length = dictionary.length();
+      for (int i = 0; i < length; i++) {
+        bloomFilterCreator.add(dictionary.getStringValue(i));
+      }
+      bloomFilterCreator.seal();
+    }
+  }
+
+  private void 
createAndSealBloomFilterForNonDictionaryColumn(BloomFilterCreatorProvider 
indexCreatorProvider,
+      File indexDir, ColumnMetadata columnMetadata, BloomFilterConfig 
bloomFilterConfig,
+      SegmentDirectory.Writer segmentWriter)
+      throws Exception {
+    int numDocs = columnMetadata.getTotalDocs();
+    try (BloomFilterCreator bloomFilterCreator = 
indexCreatorProvider.newBloomFilterCreator(
+        
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata).build()
+            .forBloomFilter(bloomFilterConfig));
+        ForwardIndexReader forwardIndexReader = 
LoaderUtils.getForwardIndexReader(segmentWriter, columnMetadata);
+        ForwardIndexReaderContext readerContext = 
forwardIndexReader.createContext()) {
+      if (columnMetadata.isSingleValue()) {
+        // SV
+        switch (columnMetadata.getDataType()) {
+          case INT:
+            for (int i = 0; i < numDocs; i++) {
+              
bloomFilterCreator.add(Integer.toString(forwardIndexReader.getInt(i, 
readerContext)));
+            }
+            break;
+          case LONG:
+            for (int i = 0; i < numDocs; i++) {
+              
bloomFilterCreator.add(Long.toString(forwardIndexReader.getLong(i, 
readerContext)));
+            }
+            break;
+          case FLOAT:
+            for (int i = 0; i < numDocs; i++) {
+              
bloomFilterCreator.add(Float.toString(forwardIndexReader.getFloat(i, 
readerContext)));
+            }
+            break;
+          case DOUBLE:
+            for (int i = 0; i < numDocs; i++) {
+              
bloomFilterCreator.add(Double.toString(forwardIndexReader.getDouble(i, 
readerContext)));
+            }
+            break;
+          case STRING:
+            for (int i = 0; i < numDocs; i++) {
+              bloomFilterCreator.add(forwardIndexReader.getString(i, 
readerContext));
+            }
+            break;
+          case BYTES:
+            for (int i = 0; i < numDocs; i++) {
+              
bloomFilterCreator.add(BytesUtils.toHexString(forwardIndexReader.getBytes(i, 
readerContext)));
+            }
+            break;
+          default:
+            throw new IllegalStateException("Unsupported data type: " + 
columnMetadata.getDataType() + " for column: "
+                + columnMetadata.getColumnName());
+        }
+        bloomFilterCreator.seal();
+      } else {
+        // MV
+        switch (columnMetadata.getDataType()) {
+          case INT:
+            for (int i = 0; i < numDocs; i++) {
+              int[] buffer = new 
int[columnMetadata.getMaxNumberOfMultiValues()];
+              int length = forwardIndexReader.getIntMV(i, buffer, 
readerContext);
+              for (int j = 0; j < length; j++) {
+                bloomFilterCreator.add(Integer.toString(buffer[j]));
+              }
+            }
+            break;
+          case LONG:
+            for (int i = 0; i < numDocs; i++) {
+              long[] buffer = new 
long[columnMetadata.getMaxNumberOfMultiValues()];
+              int length = forwardIndexReader.getLongMV(i, buffer, 
readerContext);
+              for (int j = 0; j < length; j++) {
+                bloomFilterCreator.add(Long.toString(buffer[j]));
+              }
+            }
+            break;
+          case FLOAT:
+            for (int i = 0; i < numDocs; i++) {
+              float[] buffer = new 
float[columnMetadata.getMaxNumberOfMultiValues()];
+              int length = forwardIndexReader.getFloatMV(i, buffer, 
readerContext);
+              for (int j = 0; j < length; j++) {
+                bloomFilterCreator.add(Float.toString(buffer[j]));
+              }
+            }
+            break;
+          case DOUBLE:
+            for (int i = 0; i < numDocs; i++) {
+              double[] buffer = new 
double[columnMetadata.getMaxNumberOfMultiValues()];
+              int length = forwardIndexReader.getDoubleMV(i, buffer, 
readerContext);
+              for (int j = 0; j < length; j++) {
+                bloomFilterCreator.add(Double.toString(buffer[j]));
+              }
+            }
+            break;
+          case STRING:
+            for (int i = 0; i < numDocs; i++) {
+              String[] buffer = new 
String[columnMetadata.getMaxNumberOfMultiValues()];
+              int length = forwardIndexReader.getStringMV(i, buffer, 
readerContext);
+              for (int j = 0; j < length; j++) {
+                bloomFilterCreator.add(buffer[j]);
+              }
+            }
+            break;
+          case BYTES:
+            for (int i = 0; i < numDocs; i++) {
+              byte[][] buffer = new 
byte[columnMetadata.getMaxNumberOfMultiValues()][];
+              int length = forwardIndexReader.getBytesMV(i, buffer, 
readerContext);
+              for (int j = 0; j < length; j++) {
+                bloomFilterCreator.add(BytesUtils.toHexString(buffer[j]));
+              }
+            }
+            break;
+          default:
+            throw new IllegalStateException("Unsupported data type: " + 
columnMetadata.getDataType() + " for column: "
+                + columnMetadata.getColumnName());
+        }
+        bloomFilterCreator.seal();
+      }
+    }
   }
 
   private void createBloomFilterForColumn(SegmentDirectory.Writer 
segmentWriter, ColumnMetadata columnMetadata,
@@ -136,15 +268,12 @@ public class BloomFilterHandler implements IndexHandler {
     BloomFilterConfig bloomFilterConfig = _bloomFilterConfigs.get(columnName);
     LOGGER.info("Creating new bloom filter for segment: {}, column: {} with 
config: {}", segmentName, columnName,
         bloomFilterConfig);
-    try (BloomFilterCreator bloomFilterCreator = 
indexCreatorProvider.newBloomFilterCreator(
-        
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata)
-            .build().forBloomFilter(bloomFilterConfig));
-        Dictionary dictionary = getDictionaryReader(columnMetadata, 
segmentWriter)) {
-      int length = dictionary.length();
-      for (int i = 0; i < length; i++) {
-        bloomFilterCreator.add(dictionary.getStringValue(i));
-      }
-      bloomFilterCreator.seal();
+    if (columnMetadata.hasDictionary()) {
+      createAndSealBloomFilterForDictionaryColumn(indexCreatorProvider, 
indexDir, columnMetadata, bloomFilterConfig,
+          segmentWriter);
+    } else {
+      createAndSealBloomFilterForNonDictionaryColumn(indexCreatorProvider, 
indexDir, columnMetadata, bloomFilterConfig,
+          segmentWriter);
     }
 
     // For v3, write the generated bloom filter file into the single file and 
remove it.
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 214458e7e2..88da20e0f5 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -696,10 +696,6 @@ public final class TableConfigUtils {
       
bloomFilterColumns.addAll(indexingConfig.getBloomFilterConfigs().keySet());
     }
     for (String bloomFilterColumn : bloomFilterColumns) {
-      if (noDictionaryColumnsSet.contains(bloomFilterColumn)) {
-        throw new IllegalStateException("Cannot create a Bloom Filter on 
column " + bloomFilterColumn
-            + " specified in the noDictionaryColumns config");
-      }
       columnNameToConfigMap.put(bloomFilterColumn, "Bloom Filter Config");
     }
     if (indexingConfig.getInvertedIndexColumns() != null) {
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
index 861743ebb0..1350fab3b5 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
@@ -1267,8 +1267,6 @@ public class SegmentPreProcessorTest {
     // In fact, the validation logic when updating index configs already 
blocks this to happen.
     testCases.put("addInvertedIndexOnNonDictColumn", (IndexLoadingConfig 
config) -> config.setInvertedIndexColumns(
         new HashSet<>(Collections.singletonList("column4"))));
-    testCases.put("addBloomFilterOnNonDictColumn", (IndexLoadingConfig config) 
-> config.setBloomFilterConfigs(
-        ImmutableMap.of("column4", new BloomFilterConfig(0.1, 1024, true))));
     // No index is added on non-existing columns.
     // The validation logic when updating index configs already blocks this to 
happen.
     testCases.put("addInvertedIndexOnAbsentColumn", (IndexLoadingConfig 
config) -> config.setInvertedIndexColumns(
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index b87ba4edc2..9bca0ff9ce 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1080,15 +1080,6 @@ public class TableConfigUtilsTest {
       // expected
     }
 
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setNoDictionaryColumns(columnList)
-        .setBloomFilterColumns(columnList).build();
-    try {
-      TableConfigUtils.validate(tableConfig, schema);
-      Assert.fail("Should fail for valid column name in both no dictionary and 
bloom filter column config");
-    } catch (Exception e) {
-      // expected
-    }
-
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
         .setJsonIndexColumns(Arrays.asList("non-existent-column")).build();
     try {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to