This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 70b5656610 [core] Support creating multiple indexes on one column and correct parameter name for specifying bitmap index version (#5589)
70b5656610 is described below

commit 70b56566104a8f6d66dd7e1f08ebab386ee0c44a
Author: Zhonghang Liu <[email protected]>
AuthorDate: Tue May 13 23:02:48 2025 +0800

    [core] Support creating multiple indexes on one column and correct parameter name for specifying bitmap index version (#5589)
---
 docs/content/concepts/spec/fileindex.md            |   2 +-
 .../org/apache/paimon/io/DataFileIndexWriter.java  |  31 +--
 .../apache/paimon/io/DataFileIndexWriterTest.java  | 211 +++++++++++++++++++++
 3 files changed, 232 insertions(+), 12 deletions(-)
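
A quick illustration of the first part of the change: with this patch a table can
declare more than one file index on the same column through the usual
`file-index.<type>.columns` options. The sketch below only mirrors the option keys
and Schema.Builder calls used in the new test further down; the table and column
names are illustrative, not part of the commit.

    // Sketch: declare bitmap + bsi indexes on column f0, bloom filter on f1.
    import org.apache.paimon.schema.Schema;
    import org.apache.paimon.types.DataTypes;

    import java.util.HashMap;
    import java.util.Map;

    public class MultiIndexSchemaSketch {
        public static Schema build() {
            Map<String, String> options = new HashMap<>();
            options.put("file-index.bitmap.columns", "f0"); // bitmap index on f0
            options.put("file-index.bsi.columns", "f0"); // second index type on the same column
            options.put("file-index.bloom-filter.columns", "f1");

            Schema.Builder schemaBuilder = Schema.newBuilder();
            schemaBuilder.options(options);
            schemaBuilder.column("f0", DataTypes.INT());
            schemaBuilder.column("f1", DataTypes.INT());
            return schemaBuilder.build();
        }
    }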

diff --git a/docs/content/concepts/spec/fileindex.md b/docs/content/concepts/spec/fileindex.md
index 2cede8afe1..c5c4835140 100644
--- a/docs/content/concepts/spec/fileindex.md
+++ b/docs/content/concepts/spec/fileindex.md
@@ -170,7 +170,7 @@ length:                        4 bytes int
 
 (Legacy) Bitmap file index format (V1):
 
-You can configure `file-index.bitmap.version` to use legacy bitmap version 1.
+You can configure `file-index.bitmap.<column_name>.version` to use legacy bitmap version 1.
 
 <pre>
 
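The second part of the change is the documentation fix above: the legacy bitmap layout
is now selected per column via `file-index.bitmap.<column_name>.version` instead of the
previously documented global `file-index.bitmap.version`. A hedged example of setting it
(the column name `f0` and the value `1` are assumptions for illustration only):

    // Sketch: pin column f0's bitmap index to the legacy V1 layout.
    import java.util.HashMap;
    import java.util.Map;

    public class LegacyBitmapVersionSketch {
        public static Map<String, String> options() {
            Map<String, String> options = new HashMap<>();
            options.put("file-index.bitmap.columns", "f0");
            options.put("file-index.bitmap.f0.version", "1"); // per-column key, replaces file-index.bitmap.version
            return options;
        }
    }
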
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileIndexWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileIndexWriter.java
index 2ba0283b7b..2c620dd143 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileIndexWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileIndexWriter.java
@@ -60,7 +60,8 @@ public final class DataFileIndexWriter implements Closeable {
     // if the filter size greater than fileIndexInManifestThreshold, we put it in file
     private final long inManifestThreshold;
 
-    private final Map<String, IndexMaintainer> indexMaintainers = new HashMap<>();
+    // index type, column name -> index maintainer
+    private final Map<String, Map<String, IndexMaintainer>> indexMaintainers = new HashMap<>();
 
     private String resultFileName;
 
@@ -101,7 +102,9 @@ public final class DataFileIndexWriter implements Closeable {
 
             for (Map.Entry<String, Options> typeEntry : entry.getValue().entrySet()) {
                 String indexType = typeEntry.getKey();
-                IndexMaintainer maintainer = indexMaintainers.get(columnName);
+                Map<String, IndexMaintainer> column2maintainers =
+                        indexMaintainers.computeIfAbsent(indexType, k -> new HashMap<>());
+                IndexMaintainer maintainer = column2maintainers.get(columnName);
                 if (entryColumn.isNestedColumn()) {
                     if (field.type().getTypeRoot() != DataTypeRoot.MAP) {
                         throw new IllegalArgumentException(
@@ -121,7 +124,7 @@ public final class DataFileIndexWriter implements Closeable {
                                         fileIndexOptions.getMapTopLevelOptions(
                                                 columnName, typeEntry.getKey()),
                                         index.get(columnName));
-                        indexMaintainers.put(columnName, mapMaintainer);
+                        column2maintainers.put(columnName, mapMaintainer);
                     }
                     mapMaintainer.add(entryColumn.getNestedColumnName(), typeEntry.getValue());
                 } else {
@@ -137,7 +140,7 @@ public final class DataFileIndexWriter implements Closeable {
                                                 .createWriter(),
                                         InternalRow.createFieldGetter(
                                                 field.type(), index.get(columnName)));
-                        indexMaintainers.put(columnName, maintainer);
+                        column2maintainers.put(columnName, maintainer);
                     }
                 }
             }
@@ -146,7 +149,11 @@ public final class DataFileIndexWriter implements Closeable {
     }
 
     public void write(InternalRow row) {
-        indexMaintainers.values().forEach(index -> index.write(row));
+        indexMaintainers
+                .values()
+                .forEach(
+                        column2maintainers ->
+                                column2maintainers.values().forEach(index -> index.write(row)));
     }
 
     @Override
@@ -170,12 +177,14 @@ public final class DataFileIndexWriter implements Closeable {
 
     public Map<String, Map<String, byte[]>> serializeMaintainers() {
         Map<String, Map<String, byte[]>> indexMaps = new HashMap<>();
-        for (IndexMaintainer indexMaintainer : indexMaintainers.values()) {
-            Map<String, byte[]> mapBytes = indexMaintainer.serializedBytes();
-            for (Map.Entry<String, byte[]> entry : mapBytes.entrySet()) {
-                indexMaps
-                        .computeIfAbsent(entry.getKey(), k -> new HashMap<>())
-                        .put(indexMaintainer.getIndexType(), entry.getValue());
+        for (Map<String, IndexMaintainer> columnIndexMaintainers : indexMaintainers.values()) {
+            for (IndexMaintainer indexMaintainer : columnIndexMaintainers.values()) {
+                Map<String, byte[]> mapBytes = indexMaintainer.serializedBytes();
+                for (Map.Entry<String, byte[]> entry : mapBytes.entrySet()) {
+                    indexMaps
+                            .computeIfAbsent(entry.getKey(), k -> new HashMap<>())
+                            .put(indexMaintainer.getIndexType(), entry.getValue());
+                }
             }
         }
         return indexMaps;
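
For readers skimming the hunks above: the writer now keys maintainers first by index type
and then by column, so a bitmap maintainer and a bsi maintainer for the same column no
longer overwrite each other, while serializeMaintainers() still flattens the result back to
the column -> (index type -> bytes) layout. A standalone sketch of that reshaping, using
plain maps and fake bytes rather than the Paimon classes:

    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;

    public class NestedMaintainerMapSketch {
        public static void main(String[] args) {
            // index type -> (column name -> serialized index), the new indexMaintainers shape
            Map<String, Map<String, byte[]>> maintainers = new HashMap<>();
            put(maintainers, "bitmap", "f0");
            put(maintainers, "bsi", "f0"); // a second index type on the same column now survives
            put(maintainers, "bloom-filter", "f1");

            // flatten to column name -> (index type -> bytes), as serializeMaintainers() does
            Map<String, Map<String, byte[]>> byColumn = new HashMap<>();
            for (Map.Entry<String, Map<String, byte[]>> typeEntry : maintainers.entrySet()) {
                for (Map.Entry<String, byte[]> columnEntry : typeEntry.getValue().entrySet()) {
                    byColumn.computeIfAbsent(columnEntry.getKey(), k -> new HashMap<>())
                            .put(typeEntry.getKey(), columnEntry.getValue());
                }
            }
            System.out.println(byColumn.get("f0").keySet()); // contains both "bitmap" and "bsi"
        }

        private static void put(
                Map<String, Map<String, byte[]>> maintainers, String type, String column) {
            maintainers
                    .computeIfAbsent(type, k -> new HashMap<>())
                    .put(column, (type + ":" + column).getBytes(StandardCharsets.UTF_8));
        }
    }
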
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFileIndexWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFileIndexWriterTest.java
new file mode 100644
index 0000000000..71c60fa4bc
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileIndexWriterTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.fileindex.FileIndexFormat;
+import org.apache.paimon.fileindex.FileIndexReader;
+import org.apache.paimon.fileindex.bitmap.BitmapIndexResult;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.RoaringBitmap32;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
+
+/** Tests for {@link DataFileIndexWriter}. */
+public class DataFileIndexWriterTest {
+
+    @TempDir java.nio.file.Path tempFile;
+
+    FileIO fileIO = LocalFileIO.create();
+
+    boolean bitmapExist = false;
+    boolean bsiExist = false;
+    boolean bloomExists = false;
+
+    @Test
+    public void testCreatingMultipleIndexesOnOneColumn() throws Exception {
+
+        String tableName = "test";
+        String col1 = "f0";
+        String col2 = "f1";
+        Identifier identifier = Identifier.create(tableName, tableName);
+
+        Map<String, String> optionsMap = new HashMap<>();
+        optionsMap.put("file-index.bitmap.columns", col1);
+        optionsMap.put("file-index.bsi.columns", col1);
+        optionsMap.put("file-index.bloom-filter.columns", col2);
+        optionsMap.put("file-index.read.enabled", "true");
+        optionsMap.put("file-index.in-manifest-threshold", "1B");
+
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.options(optionsMap);
+        schemaBuilder.column(col1, DataTypes.INT());
+        schemaBuilder.column(col2, DataTypes.INT());
+        Schema schema = schemaBuilder.build();
+
+        Options catalogOptions = new Options();
+        catalogOptions.set(CatalogOptions.WAREHOUSE, tempFile.toUri().toString());
+        catalogOptions.set(CACHE_ENABLED, false);
+        CatalogContext context = CatalogContext.create(catalogOptions);
+        FileSystemCatalog catalog = (FileSystemCatalog) CatalogFactory.createCatalog(context);
+        catalog.createDatabase(tableName, false);
+        catalog.createTable(identifier, schema, false);
+        Table table = catalog.getTable(identifier);
+
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        IOManager ioManager = new IOManagerImpl("/tmp");
+        BatchTableWrite write = writeBuilder.newWrite();
+        write.withIOManager(ioManager);
+        write.write(GenericRow.of(1, 1));
+        write.write(GenericRow.of(1, 2));
+        write.write(GenericRow.of(2, 3));
+        List<CommitMessage> commitMessages = write.prepareCommit();
+        writeBuilder.newCommit().commit(commitMessages);
+
+        foreachIndexReader(
+                catalog,
+                tableName,
+                col1,
+                fileIndexReader -> {
+                    String className = fileIndexReader.getClass().getName();
+                    if (className.endsWith(".BitmapFileIndex$Reader")) {
+                        bitmapExist = true;
+                    } else if (className.endsWith(".BitSliceIndexBitmapFileIndex$Reader")) {
+                        bsiExist = true;
+                    } else {
+                        throw new RuntimeException("unknown file index reader: " + className);
+                    }
+                    BitmapIndexResult result =
+                            (BitmapIndexResult)
+                                    fileIndexReader.visitEqual(
+                                            new FieldRef(0, col1, DataTypes.INT()), 1);
+                    assert result.get().equals(RoaringBitmap32.bitmapOf(0, 1));
+                });
+
+        foreachIndexReader(
+                catalog,
+                tableName,
+                col2,
+                fileIndexReader -> {
+                    String className = fileIndexReader.getClass().getName();
+                    if (className.endsWith(".BloomFilterFileIndex$Reader")) {
+                        bloomExists = true;
+                    }
+                });
+
+        assert bitmapExist;
+        assert bsiExist;
+        assert bloomExists;
+    }
+
+    protected void foreachIndexReader(
+            FileSystemCatalog fileSystemCatalog,
+            String tableName,
+            String columnName,
+            Consumer<FileIndexReader> consumer)
+            throws Catalog.TableNotExistException {
+        Path tableRoot =
+                fileSystemCatalog.getTableLocation(Identifier.create(tableName, tableName));
+        SchemaManager schemaManager = new SchemaManager(fileIO, tableRoot);
+        FileStorePathFactory pathFactory =
+                new FileStorePathFactory(
+                        tableRoot,
+                        RowType.of(),
+                        new CoreOptions(new Options()).partitionDefaultName(),
+                        CoreOptions.FILE_FORMAT.defaultValue(),
+                        CoreOptions.DATA_FILE_PREFIX.defaultValue(),
+                        CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
+                        CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
+                        CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+                        CoreOptions.FILE_COMPRESSION.defaultValue(),
+                        null,
+                        null);
+
+        Table table = fileSystemCatalog.getTable(Identifier.create(tableName, tableName));
+        ReadBuilder readBuilder = table.newReadBuilder();
+        List<Split> splits = readBuilder.newScan().plan().splits();
+        for (Split split : splits) {
+            DataSplit dataSplit = (DataSplit) split;
+            DataFilePathFactory dataFilePathFactory =
+                    pathFactory.createDataFilePathFactory(
+                            dataSplit.partition(), dataSplit.bucket());
+            for (DataFileMeta dataFileMeta : dataSplit.dataFiles()) {
+                TableSchema tableSchema = schemaManager.schema(dataFileMeta.schemaId());
+                List<String> indexFiles =
+                        dataFileMeta.extraFiles().stream()
+                                .filter(
+                                        name ->
+                                                name.endsWith(
+                                                        DataFilePathFactory.INDEX_PATH_SUFFIX))
+                                .collect(Collectors.toList());
+                // assert index file exist and only one index file
+                assert indexFiles.size() == 1;
+                try (FileIndexFormat.Reader reader =
+                        FileIndexFormat.createReader(
+                                fileIO.newInputStream(
+                                        dataFilePathFactory.toAlignedPath(
+                                                indexFiles.get(0), dataFileMeta)),
+                                tableSchema.logicalRowType())) {
+                    Set<FileIndexReader> fileIndexReaders = reader.readColumnIndex(columnName);
+                    for (FileIndexReader fileIndexReader : fileIndexReaders) {
+                        consumer.accept(fileIndexReader);
+                    }
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+}
