This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d5040b5e9 [core] Enable file index for DV table (#4310)
d5040b5e9 is described below

commit d5040b5e91bfd28ac68376e525b8e9a4b0e8ff5d
Author: YeJunHao <[email protected]>
AuthorDate: Mon Nov 4 10:47:55 2024 +0800

    [core] Enable file index for DV table (#4310)
---
 .../java/org/apache/paimon/KeyValueFileStore.java  |  3 +-
 .../java/org/apache/paimon/io/DataFileMeta.java    | 37 ++++++++++
 .../apache/paimon/io/KeyValueDataFileWriter.java   | 34 +++++++++-
 .../paimon/io/KeyValueFileWriterFactory.java       |  6 +-
 .../paimon/operation/KeyValueFileStoreScan.java    | 44 +++++++++++-
 .../paimon/table/PrimaryKeyFileStoreTableTest.java | 79 ++++++++++++++++++++--
 6 files changed, 192 insertions(+), 11 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 2e3aa965f..8f2dbbf5f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -239,7 +239,8 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                 options.scanManifestParallelism(),
                 options.deletionVectorsEnabled(),
                 options.mergeEngine(),
-                options.changelogProducer());
+                options.changelogProducer(),
+                options.fileIndexReadEnabled() && 
options.deletionVectorsEnabled());
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index bfda80db9..b6cac5ae5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -152,6 +152,43 @@ public class DataFileMeta {
                 valueStatsCols);
     }
 
+    public DataFileMeta(
+            String fileName,
+            long fileSize,
+            long rowCount,
+            BinaryRow minKey,
+            BinaryRow maxKey,
+            SimpleStats keyStats,
+            SimpleStats valueStats,
+            long minSequenceNumber,
+            long maxSequenceNumber,
+            long schemaId,
+            int level,
+            List<String> extraFiles,
+            @Nullable Long deleteRowCount,
+            @Nullable byte[] embeddedIndex,
+            @Nullable FileSource fileSource,
+            @Nullable List<String> valueStatsCols) {
+        this(
+                fileName,
+                fileSize,
+                rowCount,
+                minKey,
+                maxKey,
+                keyStats,
+                valueStats,
+                minSequenceNumber,
+                maxSequenceNumber,
+                schemaId,
+                level,
+                extraFiles,
+                
Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(),
+                deleteRowCount,
+                embeddedIndex,
+                fileSource,
+                valueStatsCols);
+    }
+
     public DataFileMeta(
             String fileName,
             long fileSize,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
index ba42f8720..ce0b3b028 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
@@ -23,6 +23,7 @@ import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.fileindex.FileIndexOptions;
 import org.apache.paimon.format.FormatWriterFactory;
 import org.apache.paimon.format.SimpleColStats;
 import org.apache.paimon.format.SimpleStatsExtractor;
@@ -42,9 +43,12 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.function.Function;
 
+import static org.apache.paimon.io.DataFilePathFactory.dataFileToFileIndexPath;
+
 /**
  * A {@link StatsCollectingSingleFileWriter} to write data files containing 
{@link KeyValue}s. Also
  * produces {@link DataFileMeta} after writing a file.
@@ -66,6 +70,7 @@ public class KeyValueDataFileWriter
     private final SimpleStatsConverter valueStatsConverter;
     private final InternalRowSerializer keySerializer;
     private final FileSource fileSource;
+    @Nullable private final DataFileIndexWriter dataFileIndexWriter;
 
     private BinaryRow minKey = null;
     private InternalRow maxKey = null;
@@ -85,7 +90,8 @@ public class KeyValueDataFileWriter
             int level,
             String compression,
             CoreOptions options,
-            FileSource fileSource) {
+            FileSource fileSource,
+            FileIndexOptions fileIndexOptions) {
         super(
                 fileIO,
                 factory,
@@ -107,12 +113,19 @@ public class KeyValueDataFileWriter
         this.valueStatsConverter = new SimpleStatsConverter(valueType, 
options.statsDenseStore());
         this.keySerializer = new InternalRowSerializer(keyType);
         this.fileSource = fileSource;
+        this.dataFileIndexWriter =
+                DataFileIndexWriter.create(
+                        fileIO, dataFileToFileIndexPath(path), valueType, 
fileIndexOptions);
     }
 
     @Override
     public void write(KeyValue kv) throws IOException {
         super.write(kv);
 
+        if (dataFileIndexWriter != null) {
+            dataFileIndexWriter.write(kv.value());
+        }
+
         updateMinKey(kv);
         updateMaxKey(kv);
 
@@ -165,6 +178,11 @@ public class KeyValueDataFileWriter
         Pair<List<String>, SimpleStats> valueStatsPair =
                 valueStatsConverter.toBinary(valFieldStats);
 
+        DataFileIndexWriter.FileIndexResult indexResult =
+                dataFileIndexWriter == null
+                        ? DataFileIndexWriter.EMPTY_RESULT
+                        : dataFileIndexWriter.result();
+
         return new DataFileMeta(
                 path.getName(),
                 fileIO.getFileSize(path),
@@ -177,10 +195,20 @@ public class KeyValueDataFileWriter
                 maxSeqNumber,
                 schemaId,
                 level,
+                indexResult.independentIndexFile() == null
+                        ? Collections.emptyList()
+                        : 
Collections.singletonList(indexResult.independentIndexFile()),
                 deleteRecordCount,
-                // TODO: enable file filter for primary key table (e.g. 
deletion table).
-                null,
+                indexResult.embeddedIndexBytes(),
                 fileSource,
                 valueStatsPair.getKey());
     }
+
+    @Override
+    public void close() throws IOException {
+        if (dataFileIndexWriter != null) {
+            dataFileIndexWriter.close();
+        }
+        super.close();
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
index 922b06ee8..a6fddb432 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
@@ -23,6 +23,7 @@ import org.apache.paimon.KeyValue;
 import org.apache.paimon.KeyValueSerializer;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fileindex.FileIndexOptions;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.format.FormatWriterFactory;
 import org.apache.paimon.format.SimpleStatsExtractor;
@@ -52,6 +53,7 @@ public class KeyValueFileWriterFactory {
     private final WriteFormatContext formatContext;
     private final long suggestedFileSize;
     private final CoreOptions options;
+    private final FileIndexOptions fileIndexOptions;
 
     private KeyValueFileWriterFactory(
             FileIO fileIO,
@@ -68,6 +70,7 @@ public class KeyValueFileWriterFactory {
         this.formatContext = formatContext;
         this.suggestedFileSize = suggestedFileSize;
         this.options = options;
+        this.fileIndexOptions = options.indexColumnsOptions();
     }
 
     public RowType keyType() {
@@ -117,7 +120,8 @@ public class KeyValueFileWriterFactory {
                 level,
                 formatContext.compression(level),
                 options,
-                fileSource);
+                fileSource,
+                fileIndexOptions);
     }
 
     public void deleteFile(String filename, int level) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index 8300bdcfa..b5683fbe0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -21,6 +21,7 @@ package org.apache.paimon.operation;
 import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.CoreOptions.MergeEngine;
 import org.apache.paimon.KeyValueFileStore;
+import org.apache.paimon.fileindex.FileIndexPredicate;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
@@ -31,11 +32,17 @@ import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.stats.SimpleStatsEvolution;
 import org.apache.paimon.stats.SimpleStatsEvolutions;
 import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.SnapshotManager;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.paimon.CoreOptions.MergeEngine.AGGREGATE;
 import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
@@ -54,6 +61,10 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
     private final MergeEngine mergeEngine;
     private final ChangelogProducer changelogProducer;
 
+    private final boolean fileIndexReadEnabled;
+    // just cache.
+    private final Map<Long, Predicate> dataFilterMapping = new HashMap<>();
+
     public KeyValueFileStoreScan(
             ManifestsReader manifestsReader,
             BucketSelectConverter bucketSelectConverter,
@@ -65,7 +76,8 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
             Integer scanManifestParallelism,
             boolean deletionVectorsEnabled,
             MergeEngine mergeEngine,
-            ChangelogProducer changelogProducer) {
+            ChangelogProducer changelogProducer,
+            boolean fileIndexReadEnabled) {
         super(
                 manifestsReader,
                 snapshotManager,
@@ -85,6 +97,7 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
         this.deletionVectorsEnabled = deletionVectorsEnabled;
         this.mergeEngine = mergeEngine;
         this.changelogProducer = changelogProducer;
+        this.fileIndexReadEnabled = fileIndexReadEnabled;
     }
 
     public KeyValueFileStoreScan withKeyFilter(Predicate predicate) {
@@ -118,6 +131,28 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
         return true;
     }
 
+    private boolean filterByFileIndex(@Nullable byte[] embeddedIndexBytes, 
ManifestEntry entry) {
+        if (embeddedIndexBytes == null) {
+            return true;
+        }
+
+        RowType dataRowType = 
scanTableSchema(entry.file().schemaId()).logicalRowType();
+
+        Predicate dataPredicate =
+                dataFilterMapping.computeIfAbsent(
+                        entry.file().schemaId(),
+                        id ->
+                                fieldValueStatsConverters.convertFilter(
+                                        entry.file().schemaId(), valueFilter));
+
+        try (FileIndexPredicate predicate =
+                new FileIndexPredicate(embeddedIndexBytes, dataRowType)) {
+            return predicate.testPredicate(dataPredicate);
+        } catch (IOException e) {
+            throw new RuntimeException("Exception happens while checking 
predicate.", e);
+        }
+    }
+
     private boolean isValueFilterEnabled(ManifestEntry entry) {
         if (valueFilter == null) {
             return false;
@@ -181,7 +216,12 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
                         .getOrCreate(file.schemaId())
                         .evolution(file.valueStats(), file.rowCount(), 
file.valueStatsCols());
         return valueFilter.test(
-                file.rowCount(), result.minValues(), result.maxValues(), 
result.nullCounts());
+                        file.rowCount(),
+                        result.minValues(),
+                        result.maxValues(),
+                        result.nullCounts())
+                && (!fileIndexReadEnabled
+                        || filterByFileIndex(entry.file().embeddedIndex(), 
entry));
     }
 
     private static boolean noOverlapping(List<ManifestEntry> entries) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index 1ecfd6f91..dca86aa61 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -672,10 +672,6 @@ public class PrimaryKeyFileStoreTableTest extends 
FileStoreTableTestBase {
     @Test
     public void testReadFilter() throws Exception {
         FileStoreTable table = createFileStoreTable();
-        if 
(table.coreOptions().fileFormat().getFormatIdentifier().equals("parquet")) {
-            // TODO support parquet reader filter push down
-            return;
-        }
 
         StreamTableWrite write = table.newWrite(commitUser);
         StreamTableCommit commit = table.newCommit(commitUser);
@@ -791,6 +787,81 @@ public class PrimaryKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         innerTestWithShard(table);
     }
 
+    @Test
+    public void testDeletionVectorsWithFileIndexInFile() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(BUCKET, 1);
+                            conf.set(DELETION_VECTORS_ENABLED, true);
+                            conf.set(TARGET_FILE_SIZE, MemorySize.ofBytes(1));
+                            conf.set("file-index.bloom-filter.columns", "b");
+                        });
+
+        StreamTableWrite write =
+                table.newWrite(commitUser).withIOManager(new 
IOManagerImpl(tempDir.toString()));
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(1, 1, 300L));
+        write.write(rowData(1, 2, 400L));
+        write.write(rowData(1, 3, 200L));
+        write.write(rowData(1, 4, 500L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(1, 5, 100L));
+        write.write(rowData(1, 6, 600L));
+        write.write(rowData(1, 7, 400L));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
+        List<Split> splits = 
toSplits(table.newSnapshotReader().read().dataSplits());
+        assertThat(((DataSplit) 
splits.get(0)).dataFiles().size()).isEqualTo(2);
+        TableRead read = table.newRead().withFilter(builder.equal(2, 300L));
+        assertThat(getResult(read, splits, BATCH_ROW_TO_STRING))
+                .hasSameElementsAs(
+                        Arrays.asList(
+                                
"1|1|300|binary|varbinary|mapKey:mapVal|multiset",
+                                
"1|2|400|binary|varbinary|mapKey:mapVal|multiset",
+                                
"1|3|200|binary|varbinary|mapKey:mapVal|multiset",
+                                
"1|4|500|binary|varbinary|mapKey:mapVal|multiset"));
+    }
+
+    @Test
+    public void testDeletionVectorsWithFileIndexInMeta() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(BUCKET, 1);
+                            conf.set(DELETION_VECTORS_ENABLED, true);
+                            conf.set(TARGET_FILE_SIZE, MemorySize.ofBytes(1));
+                            conf.set("file-index.bloom-filter.columns", "b");
+                            conf.set("file-index.bloom-filter.b.items", "20");
+                        });
+
+        StreamTableWrite write =
+                table.newWrite(commitUser).withIOManager(new 
IOManagerImpl(tempDir.toString()));
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(1, 1, 300L));
+        write.write(rowData(1, 2, 400L));
+        write.write(rowData(1, 3, 200L));
+        write.write(rowData(1, 4, 500L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(1, 5, 100L));
+        write.write(rowData(1, 6, 600L));
+        write.write(rowData(1, 7, 400L));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
+        Predicate predicate = builder.equal(2, 300L);
+
+        List<Split> splits =
+                
toSplits(table.newSnapshotReader().withFilter(predicate).read().dataSplits());
+
+        assertThat(((DataSplit) 
splits.get(0)).dataFiles().size()).isEqualTo(1);
+    }
+
     @Test
     public void testWithShardFirstRow() throws Exception {
         FileStoreTable table =

Reply via email to