This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d5040b5e9 [core] Enable file index for DV table (#4310)
d5040b5e9 is described below
commit d5040b5e91bfd28ac68376e525b8e9a4b0e8ff5d
Author: YeJunHao <[email protected]>
AuthorDate: Mon Nov 4 10:47:55 2024 +0800
[core] Enable file index for DV table (#4310)
---
.../java/org/apache/paimon/KeyValueFileStore.java | 3 +-
.../java/org/apache/paimon/io/DataFileMeta.java | 37 ++++++++++
.../apache/paimon/io/KeyValueDataFileWriter.java | 34 +++++++++-
.../paimon/io/KeyValueFileWriterFactory.java | 6 +-
.../paimon/operation/KeyValueFileStoreScan.java | 44 +++++++++++-
.../paimon/table/PrimaryKeyFileStoreTableTest.java | 79 ++++++++++++++++++++--
6 files changed, 192 insertions(+), 11 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 2e3aa965f..8f2dbbf5f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -239,7 +239,8 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
options.scanManifestParallelism(),
options.deletionVectorsEnabled(),
options.mergeEngine(),
- options.changelogProducer());
+ options.changelogProducer(),
+ options.fileIndexReadEnabled() && options.deletionVectorsEnabled());
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index bfda80db9..b6cac5ae5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -152,6 +152,43 @@ public class DataFileMeta {
valueStatsCols);
}
+ public DataFileMeta(
+ String fileName,
+ long fileSize,
+ long rowCount,
+ BinaryRow minKey,
+ BinaryRow maxKey,
+ SimpleStats keyStats,
+ SimpleStats valueStats,
+ long minSequenceNumber,
+ long maxSequenceNumber,
+ long schemaId,
+ int level,
+ List<String> extraFiles,
+ @Nullable Long deleteRowCount,
+ @Nullable byte[] embeddedIndex,
+ @Nullable FileSource fileSource,
+ @Nullable List<String> valueStatsCols) {
+ this(
+ fileName,
+ fileSize,
+ rowCount,
+ minKey,
+ maxKey,
+ keyStats,
+ valueStats,
+ minSequenceNumber,
+ maxSequenceNumber,
+ schemaId,
+ level,
+ extraFiles,
+ Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(),
+ deleteRowCount,
+ embeddedIndex,
+ fileSource,
+ valueStatsCols);
+ }
+
public DataFileMeta(
String fileName,
long fileSize,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
index ba42f8720..ce0b3b028 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
@@ -23,6 +23,7 @@ import org.apache.paimon.KeyValue;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.SimpleColStats;
import org.apache.paimon.format.SimpleStatsExtractor;
@@ -42,9 +43,12 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.function.Function;
+import static org.apache.paimon.io.DataFilePathFactory.dataFileToFileIndexPath;
+
/**
* A {@link StatsCollectingSingleFileWriter} to write data files containing
{@link KeyValue}s. Also
* produces {@link DataFileMeta} after writing a file.
@@ -66,6 +70,7 @@ public class KeyValueDataFileWriter
private final SimpleStatsConverter valueStatsConverter;
private final InternalRowSerializer keySerializer;
private final FileSource fileSource;
+ @Nullable private final DataFileIndexWriter dataFileIndexWriter;
private BinaryRow minKey = null;
private InternalRow maxKey = null;
@@ -85,7 +90,8 @@ public class KeyValueDataFileWriter
int level,
String compression,
CoreOptions options,
- FileSource fileSource) {
+ FileSource fileSource,
+ FileIndexOptions fileIndexOptions) {
super(
fileIO,
factory,
@@ -107,12 +113,19 @@ public class KeyValueDataFileWriter
this.valueStatsConverter = new SimpleStatsConverter(valueType,
options.statsDenseStore());
this.keySerializer = new InternalRowSerializer(keyType);
this.fileSource = fileSource;
+ this.dataFileIndexWriter =
+ DataFileIndexWriter.create(
+ fileIO, dataFileToFileIndexPath(path), valueType, fileIndexOptions);
}
@Override
public void write(KeyValue kv) throws IOException {
super.write(kv);
+ if (dataFileIndexWriter != null) {
+ dataFileIndexWriter.write(kv.value());
+ }
+
updateMinKey(kv);
updateMaxKey(kv);
@@ -165,6 +178,11 @@ public class KeyValueDataFileWriter
Pair<List<String>, SimpleStats> valueStatsPair =
valueStatsConverter.toBinary(valFieldStats);
+ DataFileIndexWriter.FileIndexResult indexResult =
+ dataFileIndexWriter == null
+ ? DataFileIndexWriter.EMPTY_RESULT
+ : dataFileIndexWriter.result();
+
return new DataFileMeta(
path.getName(),
fileIO.getFileSize(path),
@@ -177,10 +195,20 @@ public class KeyValueDataFileWriter
maxSeqNumber,
schemaId,
level,
+ indexResult.independentIndexFile() == null
+ ? Collections.emptyList()
+ : Collections.singletonList(indexResult.independentIndexFile()),
deleteRecordCount,
- // TODO: enable file filter for primary key table (e.g. deletion table).
- null,
+ indexResult.embeddedIndexBytes(),
fileSource,
valueStatsPair.getKey());
}
+
+ @Override
+ public void close() throws IOException {
+ if (dataFileIndexWriter != null) {
+ dataFileIndexWriter.close();
+ }
+ super.close();
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
index 922b06ee8..a6fddb432 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
@@ -23,6 +23,7 @@ import org.apache.paimon.KeyValue;
import org.apache.paimon.KeyValueSerializer;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.SimpleStatsExtractor;
@@ -52,6 +53,7 @@ public class KeyValueFileWriterFactory {
private final WriteFormatContext formatContext;
private final long suggestedFileSize;
private final CoreOptions options;
+ private final FileIndexOptions fileIndexOptions;
private KeyValueFileWriterFactory(
FileIO fileIO,
@@ -68,6 +70,7 @@ public class KeyValueFileWriterFactory {
this.formatContext = formatContext;
this.suggestedFileSize = suggestedFileSize;
this.options = options;
+ this.fileIndexOptions = options.indexColumnsOptions();
}
public RowType keyType() {
@@ -117,7 +120,8 @@ public class KeyValueFileWriterFactory {
level,
formatContext.compression(level),
options,
- fileSource);
+ fileSource,
+ fileIndexOptions);
}
public void deleteFile(String filename, int level) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index 8300bdcfa..b5683fbe0 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -21,6 +21,7 @@ package org.apache.paimon.operation;
import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.KeyValueFileStore;
+import org.apache.paimon.fileindex.FileIndexPredicate;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
@@ -31,11 +32,17 @@ import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.SimpleStatsEvolution;
import org.apache.paimon.stats.SimpleStatsEvolutions;
import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SnapshotManager;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import static org.apache.paimon.CoreOptions.MergeEngine.AGGREGATE;
import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
@@ -54,6 +61,10 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
private final MergeEngine mergeEngine;
private final ChangelogProducer changelogProducer;
+ private final boolean fileIndexReadEnabled;
+ // just cache.
+ private final Map<Long, Predicate> dataFilterMapping = new HashMap<>();
+
public KeyValueFileStoreScan(
ManifestsReader manifestsReader,
BucketSelectConverter bucketSelectConverter,
@@ -65,7 +76,8 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
Integer scanManifestParallelism,
boolean deletionVectorsEnabled,
MergeEngine mergeEngine,
- ChangelogProducer changelogProducer) {
+ ChangelogProducer changelogProducer,
+ boolean fileIndexReadEnabled) {
super(
manifestsReader,
snapshotManager,
@@ -85,6 +97,7 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
this.deletionVectorsEnabled = deletionVectorsEnabled;
this.mergeEngine = mergeEngine;
this.changelogProducer = changelogProducer;
+ this.fileIndexReadEnabled = fileIndexReadEnabled;
}
public KeyValueFileStoreScan withKeyFilter(Predicate predicate) {
@@ -118,6 +131,28 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
return true;
}
+ private boolean filterByFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry entry) {
+ if (embeddedIndexBytes == null) {
+ return true;
+ }
+
+ RowType dataRowType = scanTableSchema(entry.file().schemaId()).logicalRowType();
+
+ Predicate dataPredicate =
+ dataFilterMapping.computeIfAbsent(
+ entry.file().schemaId(),
+ id ->
+ fieldValueStatsConverters.convertFilter(
+ entry.file().schemaId(), valueFilter));
+
+ try (FileIndexPredicate predicate =
+ new FileIndexPredicate(embeddedIndexBytes, dataRowType)) {
+ return predicate.testPredicate(dataPredicate);
+ } catch (IOException e) {
+ throw new RuntimeException("Exception happens while checking predicate.", e);
+ }
+ }
+
private boolean isValueFilterEnabled(ManifestEntry entry) {
if (valueFilter == null) {
return false;
@@ -181,7 +216,12 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
.getOrCreate(file.schemaId())
.evolution(file.valueStats(), file.rowCount(),
file.valueStatsCols());
return valueFilter.test(
- file.rowCount(), result.minValues(), result.maxValues(),
result.nullCounts());
+ file.rowCount(),
+ result.minValues(),
+ result.maxValues(),
+ result.nullCounts())
+ && (!fileIndexReadEnabled
+ || filterByFileIndex(entry.file().embeddedIndex(), entry));
}
private static boolean noOverlapping(List<ManifestEntry> entries) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index 1ecfd6f91..dca86aa61 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -672,10 +672,6 @@ public class PrimaryKeyFileStoreTableTest extends
FileStoreTableTestBase {
@Test
public void testReadFilter() throws Exception {
FileStoreTable table = createFileStoreTable();
- if (table.coreOptions().fileFormat().getFormatIdentifier().equals("parquet")) {
- // TODO support parquet reader filter push down
- return;
- }
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
@@ -791,6 +787,81 @@ public class PrimaryKeyFileStoreTableTest extends
FileStoreTableTestBase {
innerTestWithShard(table);
}
+ @Test
+ public void testDeletionVectorsWithFileIndexInFile() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(BUCKET, 1);
+ conf.set(DELETION_VECTORS_ENABLED, true);
+ conf.set(TARGET_FILE_SIZE, MemorySize.ofBytes(1));
+ conf.set("file-index.bloom-filter.columns", "b");
+ });
+
+ StreamTableWrite write =
+ table.newWrite(commitUser).withIOManager(new
IOManagerImpl(tempDir.toString()));
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowData(1, 1, 300L));
+ write.write(rowData(1, 2, 400L));
+ write.write(rowData(1, 3, 200L));
+ write.write(rowData(1, 4, 500L));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ write.write(rowData(1, 5, 100L));
+ write.write(rowData(1, 6, 600L));
+ write.write(rowData(1, 7, 400L));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
+ List<Split> splits =
toSplits(table.newSnapshotReader().read().dataSplits());
+ assertThat(((DataSplit)
splits.get(0)).dataFiles().size()).isEqualTo(2);
+ TableRead read = table.newRead().withFilter(builder.equal(2, 300L));
+ assertThat(getResult(read, splits, BATCH_ROW_TO_STRING))
+ .hasSameElementsAs(
+ Arrays.asList(
+
"1|1|300|binary|varbinary|mapKey:mapVal|multiset",
+
"1|2|400|binary|varbinary|mapKey:mapVal|multiset",
+
"1|3|200|binary|varbinary|mapKey:mapVal|multiset",
+
"1|4|500|binary|varbinary|mapKey:mapVal|multiset"));
+ }
+
+ @Test
+ public void testDeletionVectorsWithFileIndexInMeta() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(BUCKET, 1);
+ conf.set(DELETION_VECTORS_ENABLED, true);
+ conf.set(TARGET_FILE_SIZE, MemorySize.ofBytes(1));
+ conf.set("file-index.bloom-filter.columns", "b");
+ conf.set("file-index.bloom-filter.b.items", "20");
+ });
+
+ StreamTableWrite write =
+ table.newWrite(commitUser).withIOManager(new
IOManagerImpl(tempDir.toString()));
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowData(1, 1, 300L));
+ write.write(rowData(1, 2, 400L));
+ write.write(rowData(1, 3, 200L));
+ write.write(rowData(1, 4, 500L));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ write.write(rowData(1, 5, 100L));
+ write.write(rowData(1, 6, 600L));
+ write.write(rowData(1, 7, 400L));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
+ Predicate predicate = builder.equal(2, 300L);
+
+ List<Split> splits =
+
toSplits(table.newSnapshotReader().withFilter(predicate).read().dataSplits());
+
+ assertThat(((DataSplit)
splits.get(0)).dataFiles().size()).isEqualTo(1);
+ }
+
@Test
public void testWithShardFirstRow() throws Exception {
FileStoreTable table =