This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ada017e7dc [iceberg] support deletion vector in Iceberg compatibility
(#5670)
ada017e7dc is described below
commit ada017e7dc49d17364dc9ee5cbd83ca331d4d44a
Author: LsomeYeah <[email protected]>
AuthorDate: Fri May 30 13:07:28 2025 +0800
[iceberg] support deletion vector in Iceberg compatibility (#5670)
---
.github/workflows/check-licensing.yml | 2 +-
.github/workflows/utitcase-jdk11.yml | 14 +-
.github/workflows/utitcase-spark-3.x.yml | 2 +-
.github/workflows/utitcase.yml | 4 +-
docs/content/migration/iceberg-compatibility.md | 122 ++++++++
.../generated/iceberg_configuration.html | 6 +
.../paimon/iceberg/IcebergCommitCallback.java | 301 ++++++++++++++++++--
.../org/apache/paimon/iceberg/IcebergOptions.java | 8 +
.../iceberg/manifest/IcebergDataFileMeta.java | 88 +++++-
.../manifest/IcebergDataFileMetaSerializer.java | 10 +-
.../iceberg/manifest/IcebergManifestFile.java | 20 +-
.../paimon/iceberg/metadata/IcebergMetadata.java | 6 +-
.../org/apache/paimon/index/IndexFileHandler.java | 5 +-
.../metastore/AddPartitionCommitCallback.java | 6 +-
.../paimon/metastore/TagPreviewCommitCallback.java | 6 +-
.../paimon/operation/FileStoreCommitImpl.java | 2 +-
.../apache/paimon/table/sink/CommitCallback.java | 6 +-
.../paimon/iceberg/IcebergCompatibilityTest.java | 50 ++++
.../apache/paimon/table/sink/TableCommitTest.java | 6 +-
paimon-iceberg/pom.xml | 187 ++++++++++++
.../paimon/core/IcebergDVCompatibilityTest.java | 314 +++++++++++++++++++++
.../apache/paimon/flink/FlinkIcebergITCase.java | 128 +++++++++
.../org/apache/paimon/spark/PaimonCommitTest.scala | 8 +-
pom.xml | 1 +
24 files changed, 1239 insertions(+), 63 deletions(-)
diff --git a/.github/workflows/check-licensing.yml
b/.github/workflows/check-licensing.yml
index 101d331e90..ab72928779 100644
--- a/.github/workflows/check-licensing.yml
+++ b/.github/workflows/check-licensing.yml
@@ -41,7 +41,7 @@ jobs:
run: |
set -o pipefail
- mvn clean deploy ${{ env.MVN_COMMON_OPTIONS }} -DskipTests \
+                mvn clean deploy ${{ env.MVN_COMMON_OPTIONS }} -DskipTests -pl "!paimon-iceberg" \
                -DaltDeploymentRepository=validation_repository::default::file:${{ env.MVN_VALIDATION_DIR }} \
| tee ${{ env.MVN_BUILD_OUTPUT_FILE }}
diff --git a/.github/workflows/utitcase-jdk11.yml
b/.github/workflows/utitcase-jdk11.yml
index e640f0921d..26105a3b01 100644
--- a/.github/workflows/utitcase-jdk11.yml
+++ b/.github/workflows/utitcase-jdk11.yml
@@ -19,21 +19,17 @@
name: UTCase and ITCase Others on JDK 11
on:
- issue_comment:
- types: [created, edited, deleted]
-
- # daily run
- schedule:
- - cron: "0 0 * * *"
+ push:
+ pull_request:
+ paths-ignore:
+ - 'docs/**'
+ - '**/*.md'
env:
JDK_VERSION: 11
jobs:
build:
- if: |
- github.event_name == 'schedule' ||
- (contains(github.event.comment.html_url, '/pull/') &&
contains(github.event.comment.body, '/jdk11'))
runs-on: ubuntu-latest
steps:
diff --git a/.github/workflows/utitcase-spark-3.x.yml
b/.github/workflows/utitcase-spark-3.x.yml
index d06554108a..06ca1d88d0 100644
--- a/.github/workflows/utitcase-spark-3.x.yml
+++ b/.github/workflows/utitcase-spark-3.x.yml
@@ -45,7 +45,7 @@ jobs:
java-version: ${{ env.JDK_VERSION }}
distribution: 'temurin'
- name: Build Spark
- run: mvn -T 2C -B clean install -DskipTests
+ run: mvn -T 2C -B clean install -DskipTests -pl "!paimon-iceberg"
- name: Test Spark
timeout-minutes: 60
run: |
diff --git a/.github/workflows/utitcase.yml b/.github/workflows/utitcase.yml
index 012e6a68c8..1a1f927806 100644
--- a/.github/workflows/utitcase.yml
+++ b/.github/workflows/utitcase.yml
@@ -49,7 +49,7 @@ jobs:
- name: Build Others
run: |
echo "Start compiling modules"
- mvn -T 2C -B clean install -DskipTests -Pflink1,spark3
+        mvn -T 2C -B clean install -DskipTests -Pflink1,spark3 -pl "!paimon-iceberg"
- name: Test Others
timeout-minutes: 60
@@ -58,7 +58,7 @@ jobs:
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
- TEST_MODULES="!paimon-e2e-tests,"
+ TEST_MODULES="!paimon-e2e-tests,!paimon-iceberg,"
for suffix in 3.5 3.4 3.3 3.2 ut; do
TEST_MODULES+="!org.apache.paimon:paimon-spark-${suffix},"
done
diff --git a/docs/content/migration/iceberg-compatibility.md
b/docs/content/migration/iceberg-compatibility.md
index f624b6ff54..12d34c8dc6 100644
--- a/docs/content/migration/iceberg-compatibility.md
+++ b/docs/content/migration/iceberg-compatibility.md
@@ -348,6 +348,128 @@ You can configure the following table option, so that
Paimon is forced to perfor
Note that full compaction is a resource-consuming process, so the value of
this table option should not be too small.
We recommend full compaction to be performed once or twice per hour.
+## Deletion Vector Support
+
+[Deletion vectors]({{< ref "concepts/spec/tableindex#deletion-vectors" >}}) in Paimon record the deleted rows of each data file.
+In deletion-vector mode, Paimon readers can filter out deleted records directly at read time without merging data files.
+Iceberg supports [deletion vectors](https://iceberg.apache.org/spec/?h=deletion#deletion-vectors) since [Version 3](https://iceberg.apache.org/spec/?h=deletion#version-3) of its table spec.
+This means that if Iceberg readers can recognize Paimon's deletion vectors, they can read all of Paimon's data without having to merge data files.
+With Paimon's deletion vectors synchronized to Iceberg, Iceberg readers and Paimon readers see the same data in real time.
+
+
+Paimon constructs Iceberg metadata for its deletion vectors when all of the following conditions are met:
+* `'deletion-vectors.enabled'` and `'deletion-vectors.bitmap64'` are set to true, because only the 64-bit bitmap implementation of Paimon's deletion vectors is compatible with Iceberg.
+* `'metadata.iceberg.format-version'` (default value 2) is set to 3, because Iceberg only supports deletion vectors in format version 3.
+* The Iceberg version is 1.8.0 or later.
+* The JDK version is 11 or later. Iceberg has not supported JDK 8 since version 1.7.0.
+
+Here is an example:
+{{< tabs "deletion-vector-table" >}}
+
+{{< tab "Flink SQL" >}}
+```sql
+-- flink version: 1.20.1
+
+CREATE CATALOG paimon_catalog WITH (
+ 'type' = 'paimon',
+ 'warehouse' = '<path-to-warehouse>'
+);
+
+-- Create a Paimon table with a primary key and enable deletion vectors
+CREATE TABLE paimon_catalog.`default`.T
+(
+ pt INT
+ ,k INT
+ ,v INT
+ ,PRIMARY KEY (pt, k) NOT ENFORCED
+)PARTITIONED BY (pt)
+WITH (
+ 'metadata.iceberg.storage' = 'hadoop-catalog'
+ ,'metadata.iceberg.format-version' = '3'
+ ,'deletion-vectors.enabled' = 'true'
+ ,'deletion-vectors.bitmap64' = 'true'
+);
+
+INSERT INTO paimon_catalog.`default`.T
+VALUES (1, 9, 90), (1, 10, 100), (1, 11, 110), (2, 20, 200)
+;
+
+-- iceberg version: 1.8.1
+CREATE CATALOG iceberg_catalog WITH (
+ 'type' = 'iceberg',
+ 'catalog-type' = 'hadoop',
+ 'warehouse' = '<path-to-warehouse>/iceberg',
+    'cache-enabled' = 'false' -- disable Iceberg catalog caching to see the result immediately
+);
+
+SELECT * FROM iceberg_catalog.`default`.T;
+/*
++------------+------------+------------+
+| pt | k | v |
++------------+------------+------------+
+| 2 | 20 | 200 |
+| 1 | 9 | 90 |
+| 1 | 10 | 100 |
+| 1 | 11 | 110 |
++------------+------------+------------+
+*/
+
+-- insert some data again, this will generate deletion vectors
+INSERT INTO paimon_catalog.`default`.T
+VALUES (1, 10, 101), (2, 20, 201), (1, 12, 121)
+;
+
+-- query the deletion-vector index in Paimon
+SELECT * FROM paimon_catalog.`default`.`T$table_indexes` WHERE index_type = 'DELETION_VECTORS';
+/*
++------------+-----------+-------------------+--------------------------------+------------+------------+--------------------------------+
+|  partition |    bucket |        index_type |                      file_name |  file_size |  row_count |                      dv_ranges |
++------------+-----------+-------------------+--------------------------------+------------+------------+--------------------------------+
+|        {1} |         0 |  DELETION_VECTORS | index-4ae44c5d-2fc6-40b0-9ff0~ |         43 |          1 | [(data-968fdf3a-2f44-41df-89b~ |
++------------+-----------+-------------------+--------------------------------+------------+------------+--------------------------------+
+*/
+
+-- select in Iceberg, the updates were successfully read by Iceberg
+SELECT * FROM iceberg_catalog.`default`.T;
+/*
++------------+------------+------------+
+| pt | k | v |
++------------+------------+------------+
+| 1 | 9 | 90 |
+| 1 | 11 | 110 |
+| 2 | 20 | 201 |
+| 1 | 10 | 101 |
+| 1 | 12 | 121 |
++------------+------------+------------+
+*/
+
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+{{< hint info >}}
+
+Note 1: Upgrade the deletion vector implementation to a 64-bit bitmap if necessary.
+
+{{< /hint >}}
+
+If your Paimon table is already in deletion-vector mode but uses the 32-bit bitmap implementation,
+you need to upgrade its deletion vectors to the 64-bit bitmap before deletion-vector metadata can be synchronized to Iceberg.
+Follow these steps to upgrade to 64-bit deletion vectors (see the sketch after this list):
+1. Stop all writing jobs on your Paimon table.
+2. Perform a [full compaction]({{< ref "maintenance/dedicated-compaction#dedicated-compaction-job" >}}) on your Paimon table.
+3. Run `ALTER TABLE tableName SET ('deletion-vectors.bitmap64' = 'true')` to upgrade to 64-bit deletion vectors.
+4. Restart your writing jobs. If all the conditions mentioned above are met, deletion-vector metadata will be synchronized to Iceberg.
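+
+A minimal Flink SQL sketch of step 3, reusing the placeholder catalog and table names from the example above (step 2 is performed with the dedicated compaction job linked above):
+```sql
+-- upgrade the deletion vector implementation to the 64-bit bitmap
+ALTER TABLE paimon_catalog.`default`.T SET ('deletion-vectors.bitmap64' = 'true');
+```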
+
+{{< hint info >}}
+
+Note 2: Upgrade the Iceberg format version to 3 if necessary.
+
+{{< /hint >}}
+You can upgrade the Iceberg format version from 2 to 3 by setting `'metadata.iceberg.format-version' = '3'`.
+This will recreate the Iceberg metadata from scratch, without using the base metadata.
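+
+For example, a sketch using the same placeholder catalog and table names as above:
+```sql
+-- switch the Iceberg compatibility metadata from format version 2 to 3
+ALTER TABLE paimon_catalog.`default`.T SET ('metadata.iceberg.format-version' = '3');
+```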
+
## Hive Catalog
When creating Paimon table, set `'metadata.iceberg.storage' = 'hive-catalog'`.
diff --git a/docs/layouts/shortcodes/generated/iceberg_configuration.html
b/docs/layouts/shortcodes/generated/iceberg_configuration.html
index 7eef90574c..634f622d71 100644
--- a/docs/layouts/shortcodes/generated/iceberg_configuration.html
+++ b/docs/layouts/shortcodes/generated/iceberg_configuration.html
@@ -50,6 +50,12 @@ under the License.
<td>Boolean</td>
<td>Whether to delete old metadata files after each table
commit</td>
</tr>
+ <tr>
+ <td><h5>metadata.iceberg.format-version</h5></td>
+ <td style="word-wrap: break-word;">2</td>
+ <td>Integer</td>
+            <td>The format version of the Iceberg table, the value can be 2 or 3. Note that only version 3 supports deletion vectors.</td>
+ </tr>
<tr>
<td><h5>metadata.iceberg.glue.skip-archive</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
index dc45fbe4f5..053e6206f1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
@@ -41,8 +41,12 @@ import org.apache.paimon.iceberg.metadata.IcebergRef;
import org.apache.paimon.iceberg.metadata.IcebergSchema;
import org.apache.paimon.iceberg.metadata.IcebergSnapshot;
import org.apache.paimon.iceberg.metadata.IcebergSnapshotSummary;
+import org.apache.paimon.index.DeletionVectorMeta;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.options.Options;
@@ -52,6 +56,7 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.table.source.RawFile;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
@@ -61,6 +66,7 @@ import org.apache.paimon.utils.DataFilePathFactories;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.ManifestReadThreadPool;
import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
@@ -72,6 +78,7 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -83,6 +90,9 @@ import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
/**
* A {@link CommitCallback} to create Iceberg compatible metadata, so Iceberg
readers can read
@@ -95,6 +105,8 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
// see org.apache.iceberg.hadoop.Util
private static final String VERSION_HINT_FILENAME = "version-hint.text";
+ private static final String PUFFIN_FORMAT = "puffin";
+
private final FileStoreTable table;
private final String commitUser;
@@ -104,6 +116,10 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
private final FileStorePathFactory fileStorePathFactory;
private final IcebergManifestFile manifestFile;
private final IcebergManifestList manifestList;
+ private final int formatVersion;
+
+ private final IndexFileHandler indexFileHandler;
+ private final boolean needAddDvToIceberg;
//
-------------------------------------------------------------------------------------
// Public interface
@@ -133,6 +149,17 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
this.fileStorePathFactory = table.store().pathFactory();
this.manifestFile = IcebergManifestFile.create(table, pathFactory);
this.manifestList = IcebergManifestList.create(table, pathFactory);
+
+        this.formatVersion =
+                table.coreOptions().toConfiguration().get(IcebergOptions.FORMAT_VERSION);
+        Preconditions.checkArgument(
+                formatVersion == IcebergMetadata.FORMAT_VERSION_V2
+                        || formatVersion == IcebergMetadata.FORMAT_VERSION_V3,
+                "Unsupported iceberg format version! Only version 2 or version 3 is valid, but current version is %s.",
+                formatVersion);
+
+ this.indexFileHandler = table.store().newIndexFileHandler();
+ this.needAddDvToIceberg = needAddDvToIceberg();
}
public static Path catalogTableMetadataPath(FileStoreTable table) {
@@ -192,23 +219,26 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
public void close() throws Exception {}
@Override
- public void call(List<ManifestEntry> committedEntries, Snapshot snapshot) {
+ public void call(
+ List<ManifestEntry> committedEntries,
+ List<IndexManifestEntry> indexFiles,
+ Snapshot snapshot) {
createMetadata(
- snapshot.id(),
+ snapshot,
(removedFiles, addedFiles) ->
- collectFileChanges(committedEntries, removedFiles,
addedFiles));
+ collectFileChanges(committedEntries, removedFiles,
addedFiles),
+ indexFiles);
}
@Override
public void retry(ManifestCommittable committable) {
SnapshotManager snapshotManager = table.snapshotManager();
- long snapshotId =
+ Snapshot snapshot =
snapshotManager
.findSnapshotsForIdentifiers(
commitUser,
Collections.singletonList(committable.identifier()))
.stream()
- .mapToLong(Snapshot::id)
- .max()
+ .max(Comparator.comparingLong(Snapshot::id))
.orElseThrow(
() ->
new RuntimeException(
@@ -217,13 +247,19 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
+ " and identifier "
+
committable.identifier()
+ ". This is
unexpected."));
+ long snapshotId = snapshot.id();
createMetadata(
- snapshotId,
+ snapshot,
(removedFiles, addedFiles) ->
- collectFileChanges(snapshotId, removedFiles,
addedFiles));
+ collectFileChanges(snapshotId, removedFiles,
addedFiles),
+ indexFileHandler.scan(snapshot, DELETION_VECTORS_INDEX));
}
- private void createMetadata(long snapshotId, FileChangesCollector
fileChangesCollector) {
+ private void createMetadata(
+ Snapshot snapshot,
+ FileChangesCollector fileChangesCollector,
+ List<IndexManifestEntry> indexFiles) {
+ long snapshotId = snapshot.id();
try {
if (snapshotId == Snapshot.FIRST_SNAPSHOT_ID) {
// If Iceberg metadata is stored separately in another
directory, dropping the table
@@ -237,8 +273,19 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
}
Path baseMetadataPath = pathFactory.toMetadataPath(snapshotId - 1);
+
if (table.fileIO().exists(baseMetadataPath)) {
- createMetadataWithBase(fileChangesCollector, snapshotId,
baseMetadataPath);
+ createMetadataWithBase(
+ fileChangesCollector,
+ indexFiles.stream()
+ .filter(
+ index ->
+ index.indexFile()
+ .indexType()
+
.equals(DELETION_VECTORS_INDEX))
+ .collect(Collectors.toList()),
+ snapshot,
+ baseMetadataPath);
} else {
createMetadataWithoutBase(snapshotId);
}
@@ -254,16 +301,31 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
private void createMetadataWithoutBase(long snapshotId) throws IOException
{
SnapshotReader snapshotReader =
table.newSnapshotReader().withSnapshot(snapshotId);
SchemaCache schemaCache = new SchemaCache();
- Iterator<IcebergManifestEntry> entryIterator =
+ List<IcebergManifestEntry> dataFileEntries = new ArrayList<>();
+ List<IcebergManifestEntry> dvFileEntries = new ArrayList<>();
+
+ List<DataSplit> filteredDataSplits =
snapshotReader.read().dataSplits().stream()
.filter(DataSplit::rawConvertible)
- .flatMap(
- s ->
- dataSplitToManifestEntries(s,
snapshotId, schemaCache)
- .stream())
- .iterator();
- List<IcebergManifestFileMeta> manifestFileMetas =
- manifestFile.rollingWrite(entryIterator, snapshotId);
+ .collect(Collectors.toList());
+ for (DataSplit dataSplit : filteredDataSplits) {
+ dataSplitToManifestEntries(
+ dataSplit, snapshotId, schemaCache, dataFileEntries,
dvFileEntries);
+ }
+
+ List<IcebergManifestFileMeta> manifestFileMetas = new ArrayList<>();
+ if (!dataFileEntries.isEmpty()) {
+ manifestFileMetas.addAll(
+ manifestFile.rollingWrite(dataFileEntries.iterator(),
snapshotId));
+ }
+ if (!dvFileEntries.isEmpty()) {
+ manifestFileMetas.addAll(
+ manifestFile.rollingWrite(
+ dvFileEntries.iterator(),
+ snapshotId,
+ IcebergManifestFileMeta.Content.DELETES));
+ }
+
String manifestListFileName =
manifestList.writeWithoutRolling(manifestFileMetas);
int schemaId = (int) table.schema().id();
@@ -289,6 +351,7 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
String tableUuid = UUID.randomUUID().toString();
IcebergMetadata metadata =
new IcebergMetadata(
+ formatVersion,
tableUuid,
table.location().toString(),
snapshotId,
@@ -320,10 +383,14 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
}
}
- private List<IcebergManifestEntry> dataSplitToManifestEntries(
- DataSplit dataSplit, long snapshotId, SchemaCache schemaCache) {
- List<IcebergManifestEntry> result = new ArrayList<>();
+ private void dataSplitToManifestEntries(
+ DataSplit dataSplit,
+ long snapshotId,
+ SchemaCache schemaCache,
+ List<IcebergManifestEntry> dataFileEntries,
+ List<IcebergManifestEntry> dvFileEntries) {
List<RawFile> rawFiles = dataSplit.convertToRawFiles().get();
+
for (int i = 0; i < dataSplit.dataFiles().size(); i++) {
DataFileMeta paimonFileMeta = dataSplit.dataFiles().get(i);
RawFile rawFile = rawFiles.get(i);
@@ -338,15 +405,52 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
schemaCache.get(paimonFileMeta.schemaId()),
paimonFileMeta.valueStats(),
paimonFileMeta.valueStatsCols());
- result.add(
+ dataFileEntries.add(
new IcebergManifestEntry(
IcebergManifestEntry.Status.ADDED,
snapshotId,
snapshotId,
snapshotId,
fileMeta));
+
+ if (needAddDvToIceberg
+ && dataSplit.deletionFiles().isPresent()
+ && dataSplit.deletionFiles().get().get(i) != null) {
+ DeletionFile deletionFile =
dataSplit.deletionFiles().get().get(i);
+
+                // Iceberg will check the cardinality between the deserialized dv and the Iceberg
+                // deletion file, so if deletionFile.cardinality() is null, we should stop
+                // synchronizing all dvs.
+                Preconditions.checkState(
+                        deletionFile.cardinality() != null,
+                        "cardinality in DeletionFile is null, stop generating dv for iceberg. "
+                                + "dataFile path is %s, deletionFile is %s",
+                        rawFile.path(),
+                        deletionFile);
+
+                // We cannot get the file size of the complete DV index file from the DeletionFile,
+                // so we set 'fileSizeInBytes' to -1 (the default in Iceberg).
+ IcebergDataFileMeta deleteFileMeta =
+ IcebergDataFileMeta.createForDeleteFile(
+ IcebergDataFileMeta.Content.POSITION_DELETES,
+ deletionFile.path(),
+ PUFFIN_FORMAT,
+ dataSplit.partition(),
+ deletionFile.cardinality(),
+ -1,
+ rawFile.path(),
+ deletionFile.offset(),
+ deletionFile.length());
+
+ dvFileEntries.add(
+ new IcebergManifestEntry(
+ IcebergManifestEntry.Status.ADDED,
+ snapshotId,
+ snapshotId,
+ snapshotId,
+ deleteFileMeta));
+ }
}
- return result;
}
private List<IcebergPartitionField> getPartitionFields(
@@ -370,12 +474,35 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
//
-------------------------------------------------------------------------------------
private void createMetadataWithBase(
- FileChangesCollector fileChangesCollector, long snapshotId, Path
baseMetadataPath)
+ FileChangesCollector fileChangesCollector,
+ List<IndexManifestEntry> indexFiles,
+ Snapshot snapshot,
+ Path baseMetadataPath)
throws IOException {
+ long snapshotId = snapshot.id();
IcebergMetadata baseMetadata =
IcebergMetadata.fromPath(table.fileIO(), baseMetadataPath);
+
+ if (!isSameFormatVersion(baseMetadata.formatVersion())) {
+ // we need to recreate iceberg metadata if format version changed
+ createMetadataWithoutBase(snapshot.id());
+ return;
+ }
+
List<IcebergManifestFileMeta> baseManifestFileMetas =
manifestList.read(baseMetadata.currentSnapshot().manifestList());
+ // base manifest file for data files
+ List<IcebergManifestFileMeta> baseDataManifestFileMetas =
+ baseManifestFileMetas.stream()
+ .filter(meta -> meta.content() ==
IcebergManifestFileMeta.Content.DATA)
+ .collect(Collectors.toList());
+
+ // base manifest file for deletion vector index files
+ List<IcebergManifestFileMeta> baseDVManifestFileMetas =
+ baseManifestFileMetas.stream()
+ .filter(meta -> meta.content() ==
IcebergManifestFileMeta.Content.DELETES)
+ .collect(Collectors.toList());
+
Map<String, BinaryRow> removedFiles = new LinkedHashMap<>();
Map<String, Pair<BinaryRow, DataFileMeta>> addedFiles = new
LinkedHashMap<>();
boolean isAddOnly = fileChangesCollector.collect(removedFiles,
addedFiles);
@@ -388,13 +515,14 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
// because if a file's level is changed, it will first be removed and
then added.
// In this case, if `baseMetadata` already contains this file, we
should not add a
// duplicate.
- List<IcebergManifestFileMeta> newManifestFileMetas;
+ List<IcebergManifestFileMeta> newDataManifestFileMetas;
IcebergSnapshotSummary snapshotSummary;
if (isAddOnly) {
// Fast case. We don't need to remove files from `baseMetadata`.
We only need to append
// new metadata files.
- newManifestFileMetas = new ArrayList<>(baseManifestFileMetas);
-
newManifestFileMetas.addAll(createNewlyAddedManifestFileMetas(addedFiles,
snapshotId));
+ newDataManifestFileMetas = new
ArrayList<>(baseDataManifestFileMetas);
+ newDataManifestFileMetas.addAll(
+ createNewlyAddedManifestFileMetas(addedFiles, snapshotId));
snapshotSummary = IcebergSnapshotSummary.APPEND;
} else {
Pair<List<IcebergManifestFileMeta>, IcebergSnapshotSummary> result
=
@@ -402,14 +530,32 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
removedFiles,
addedFiles,
modifiedPartitions,
- baseManifestFileMetas,
+ baseDataManifestFileMetas,
snapshotId);
- newManifestFileMetas = result.getLeft();
+ newDataManifestFileMetas = result.getLeft();
snapshotSummary = result.getRight();
}
+
+ List<IcebergManifestFileMeta> newDVManifestFileMetas = new
ArrayList<>();
+ if (needAddDvToIceberg) {
+ if (!indexFiles.isEmpty()) {
+ // reconstruct the dv index
+
newDVManifestFileMetas.addAll(createDvManifestFileMetas(snapshot));
+ } else {
+ // no new dv index, reuse the old one
+ newDVManifestFileMetas.addAll(baseDVManifestFileMetas);
+ }
+ }
+
+ // compact data manifest file if needed
+ newDataManifestFileMetas =
compactMetadataIfNeeded(newDataManifestFileMetas, snapshotId);
+
String manifestListFileName =
manifestList.writeWithoutRolling(
- compactMetadataIfNeeded(newManifestFileMetas,
snapshotId));
+ Stream.concat(
+ newDataManifestFileMetas.stream(),
+ newDVManifestFileMetas.stream())
+ .collect(Collectors.toList()));
// add new schema if needed
SchemaCache schemaCache = new SchemaCache();
@@ -445,6 +591,7 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
IcebergMetadata metadata =
new IcebergMetadata(
+ baseMetadata.formatVersion(),
baseMetadata.tableUuid(),
baseMetadata.location(),
snapshotId,
@@ -532,6 +679,9 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
if (table.primaryKeys().isEmpty()) {
return true;
} else {
+ if (needAddDvToIceberg) {
+ return meta.level() > 0;
+ }
int maxLevel = table.coreOptions().numLevels() - 1;
return meta.level() == maxLevel;
}
@@ -849,6 +999,7 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
IcebergMetadata metadata =
new IcebergMetadata(
+ baseMetadata.formatVersion(),
baseMetadata.tableUuid(),
baseMetadata.location(),
baseMetadata.currentSnapshotId(),
@@ -906,6 +1057,7 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
IcebergMetadata metadata =
new IcebergMetadata(
+ baseMetadata.formatVersion(),
baseMetadata.tableUuid(),
baseMetadata.location(),
baseMetadata.currentSnapshotId(),
@@ -935,10 +1087,99 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
}
}
+ //
-------------------------------------------------------------------------------------
+ // Deletion vectors
+ //
-------------------------------------------------------------------------------------
+
+ private boolean needAddDvToIceberg() {
+ CoreOptions options = table.coreOptions();
+        // There may be dv indexes using bitmap32 in index files even if 'deletion-vectors.bitmap64'
+        // is true, but analyzing all deletion vectors is very costly, so we do not check this
+        // exactly for now.
+ return options.deletionVectorsEnabled()
+ && options.deletionVectorBitmap64()
+ && formatVersion == IcebergMetadata.FORMAT_VERSION_V3;
+ }
+
+ private List<IcebergManifestFileMeta> createDvManifestFileMetas(Snapshot
snapshot) {
+ List<IcebergManifestEntry> icebergDvEntries = new ArrayList<>();
+
+ long snapshotId = snapshot.id();
+ List<IndexManifestEntry> newIndexes =
+ indexFileHandler.scan(snapshot, DELETION_VECTORS_INDEX);
+ if (newIndexes.isEmpty()) {
+ return Collections.emptyList();
+ }
+ for (IndexManifestEntry entry : newIndexes) {
+ IndexFileMeta indexFileMeta = entry.indexFile();
+ LinkedHashMap<String, DeletionVectorMeta> dvMetas =
indexFileMeta.deletionVectorMetas();
+ Path bucketPath =
fileStorePathFactory.bucketPath(entry.partition(), entry.bucket());
+ if (dvMetas != null) {
+ for (DeletionVectorMeta dvMeta : dvMetas.values()) {
+
+                    // Iceberg will check the cardinality between the deserialized dv and the
+                    // Iceberg deletion file, so if dvMeta.cardinality() is null, we should stop
+                    // synchronizing all dvs.
+                    Preconditions.checkState(
+                            dvMeta.cardinality() != null,
+                            "cardinality in DeletionVector is null, stop generating dv for iceberg. "
+                                    + "dataFile path is %s, indexFile path is %s",
+                            new Path(bucketPath, dvMeta.dataFileName()),
+                            indexFileHandler.filePath(indexFileMeta).toString());
+
+ IcebergDataFileMeta deleteFileMeta =
+ IcebergDataFileMeta.createForDeleteFile(
+
IcebergDataFileMeta.Content.POSITION_DELETES,
+
indexFileHandler.filePath(indexFileMeta).toString(),
+ PUFFIN_FORMAT,
+ entry.partition(),
+ dvMeta.cardinality(),
+ indexFileMeta.fileSize(),
+ new Path(bucketPath,
dvMeta.dataFileName()).toString(),
+ (long) dvMeta.offset(),
+ (long) dvMeta.length());
+
+ icebergDvEntries.add(
+ new IcebergManifestEntry(
+ IcebergManifestEntry.Status.ADDED,
+ snapshotId,
+ snapshotId,
+ snapshotId,
+ deleteFileMeta));
+ }
+ }
+ }
+
+ if (icebergDvEntries.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ return manifestFile.rollingWrite(
+ icebergDvEntries.iterator(), snapshotId,
IcebergManifestFileMeta.Content.DELETES);
+ }
+
//
-------------------------------------------------------------------------------------
// Utils
//
-------------------------------------------------------------------------------------
+ private boolean isSameFormatVersion(int baseFormatVersion) {
+ if (baseFormatVersion != formatVersion) {
+            Preconditions.checkArgument(
+                    formatVersion > baseFormatVersion,
+                    "format version in base metadata is %s, and it's bigger than the current format version %s, "
+                            + "this is not allowed!",
+                    baseFormatVersion,
+                    formatVersion);
+
+ LOG.info(
+                    "format version in base metadata is {}, and it's different from the current format version {}. "
+                            + "New metadata will be recreated using format version {}.",
+ baseFormatVersion,
+ formatVersion,
+ formatVersion);
+ return false;
+ }
+ return true;
+ }
+
private class SchemaCache {
SchemaManager schemaManager = new SchemaManager(table.fileIO(),
table.location());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
index e5166b9394..a817364973 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
@@ -44,6 +44,14 @@ public class IcebergOptions {
.withDescription(
"To store Iceberg metadata in a separate directory
or under table location");
+ public static final ConfigOption<Integer> FORMAT_VERSION =
+ ConfigOptions.key("metadata.iceberg.format-version")
+ .intType()
+ .defaultValue(2)
+ .withDescription(
+                            "The format version of the Iceberg table, the value can be 2 or 3. "
+                                    + "Note that only version 3 supports deletion vectors.");
+
public static final ConfigOption<Integer> COMPACT_MIN_FILE_NUM =
ConfigOptions.key("metadata.iceberg.compaction.min.file-num")
.intType()
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java
index 590359aba3..9167497257 100644
---
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java
@@ -83,6 +83,10 @@ public class IcebergDataFileMeta {
private final InternalMap lowerBounds;
private final InternalMap upperBounds;
+ private final String referencedDataFile;
+ private final Long contentOffset;
+ private final Long contentSizeInBytes;
+
// only used for iceberg migrate
private long schemaId = 0;
@@ -96,6 +100,34 @@ public class IcebergDataFileMeta {
InternalMap nullValueCounts,
InternalMap lowerBounds,
InternalMap upperBounds) {
+ this(
+ content,
+ filePath,
+ fileFormat,
+ partition,
+ recordCount,
+ fileSizeInBytes,
+ nullValueCounts,
+ lowerBounds,
+ upperBounds,
+ null,
+ null,
+ null);
+ }
+
+ IcebergDataFileMeta(
+ Content content,
+ String filePath,
+ String fileFormat,
+ BinaryRow partition,
+ long recordCount,
+ long fileSizeInBytes,
+ InternalMap nullValueCounts,
+ InternalMap lowerBounds,
+ InternalMap upperBounds,
+ String referencedDataFile,
+ Long contentOffset,
+ Long contentSizeInBytes) {
this.content = content;
this.filePath = filePath;
this.fileFormat = fileFormat;
@@ -105,6 +137,10 @@ public class IcebergDataFileMeta {
this.nullValueCounts = nullValueCounts;
this.lowerBounds = lowerBounds;
this.upperBounds = upperBounds;
+
+ this.referencedDataFile = referencedDataFile;
+ this.contentOffset = contentOffset;
+ this.contentSizeInBytes = contentSizeInBytes;
}
public static IcebergDataFileMeta create(
@@ -168,6 +204,33 @@ public class IcebergDataFileMeta {
new GenericMap(upperBounds));
}
+ public static IcebergDataFileMeta createForDeleteFile(
+ Content content,
+ String filePath,
+ String fileFormat,
+ BinaryRow partition,
+ long recordCount,
+ long fileSizeInBytes,
+ String referencedDataFile,
+ Long contentOffset,
+ Long contentSizeInBytes) {
+
+ // reference org.apache.iceberg.deletes.BaseDVFileWriter#createDV
+ return new IcebergDataFileMeta(
+ content,
+ filePath,
+ fileFormat,
+ partition,
+ recordCount,
+ fileSizeInBytes,
+ null,
+ null,
+ null,
+ referencedDataFile,
+ contentOffset,
+ contentSizeInBytes);
+ }
+
public Content content() {
return content;
}
@@ -204,6 +267,18 @@ public class IcebergDataFileMeta {
return upperBounds;
}
+ public String referencedDataFile() {
+ return referencedDataFile;
+ }
+
+ public Long contentOffset() {
+ return contentOffset;
+ }
+
+ public Long contentSizeInBytes() {
+ return contentSizeInBytes;
+ }
+
public long schemaId() {
return schemaId;
}
@@ -236,6 +311,9 @@ public class IcebergDataFileMeta {
128,
"upper_bounds",
DataTypes.MAP(DataTypes.INT().notNull(),
DataTypes.BYTES().notNull())));
+ fields.add(new DataField(143, "referenced_data_file",
DataTypes.STRING()));
+ fields.add(new DataField(144, "content_offset", DataTypes.BIGINT()));
+ fields.add(new DataField(145, "content_size_in_bytes",
DataTypes.BIGINT()));
return new RowType(false, fields);
}
@@ -256,7 +334,10 @@ public class IcebergDataFileMeta {
&& Objects.equals(partition, that.partition)
&& Objects.equals(nullValueCounts, that.nullValueCounts)
&& Objects.equals(lowerBounds, that.lowerBounds)
- && Objects.equals(upperBounds, that.upperBounds);
+ && Objects.equals(upperBounds, that.upperBounds)
+ && Objects.equals(referencedDataFile, that.referencedDataFile)
+ && Objects.equals(contentOffset, that.contentOffset)
+ && Objects.equals(contentSizeInBytes, that.contentSizeInBytes);
}
@Override
@@ -270,6 +351,9 @@ public class IcebergDataFileMeta {
fileSizeInBytes,
nullValueCounts,
lowerBounds,
- upperBounds);
+ upperBounds,
+ referencedDataFile,
+ contentOffset,
+ contentSizeInBytes);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMetaSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMetaSerializer.java
index 9b97425936..020b5e1b44 100644
---
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMetaSerializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMetaSerializer.java
@@ -57,7 +57,10 @@ public class IcebergDataFileMetaSerializer extends
ObjectSerializer<IcebergDataF
file.fileSizeInBytes(),
file.nullValueCounts(),
file.lowerBounds(),
- file.upperBounds());
+ file.upperBounds(),
+ BinaryString.fromString(file.referencedDataFile()),
+ file.contentOffset(),
+ file.contentSizeInBytes());
}
@Override
@@ -71,6 +74,9 @@ public class IcebergDataFileMetaSerializer extends
ObjectSerializer<IcebergDataF
row.getLong(5),
nullValueCountsSerializer.copy(row.getMap(6)),
lowerBoundsSerializer.copy(row.getMap(7)),
- upperBoundsSerializer.copy(row.getMap(8)));
+ upperBoundsSerializer.copy(row.getMap(8)),
+ row.isNullAt(9) ? null : row.getString(9).toString(),
+ row.isNullAt(10) ? null : row.getLong(10),
+ row.isNullAt(11) ? null : row.getLong(11));
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
index 6e69602eb8..43ce1016a9 100644
---
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
@@ -161,9 +161,15 @@ public class IcebergManifestFile extends
ObjectsFile<IcebergManifestEntry> {
public List<IcebergManifestFileMeta> rollingWrite(
Iterator<IcebergManifestEntry> entries, long sequenceNumber) {
+
+ return rollingWrite(entries, sequenceNumber, Content.DATA);
+ }
+
+ public List<IcebergManifestFileMeta> rollingWrite(
+ Iterator<IcebergManifestEntry> entries, long sequenceNumber,
Content content) {
RollingFileWriter<IcebergManifestEntry, IcebergManifestFileMeta>
writer =
new RollingFileWriter<>(
- () -> createWriter(sequenceNumber),
targetFileSize.getBytes());
+ () -> createWriter(sequenceNumber, content),
targetFileSize.getBytes());
try {
writer.write(entries);
writer.close();
@@ -174,9 +180,9 @@ public class IcebergManifestFile extends
ObjectsFile<IcebergManifestEntry> {
}
public SingleFileWriter<IcebergManifestEntry, IcebergManifestFileMeta>
createWriter(
- long sequenceNumber) {
+ long sequenceNumber, Content content) {
return new IcebergManifestEntryWriter(
- writerFactory, pathFactory.newPath(), compression,
sequenceNumber);
+ writerFactory, pathFactory.newPath(), compression,
sequenceNumber, content);
}
private class IcebergManifestEntryWriter
@@ -193,11 +199,14 @@ public class IcebergManifestFile extends
ObjectsFile<IcebergManifestEntry> {
private long deletedRowsCount = 0;
private Long minSequenceNumber = null;
+ private final Content content;
+
IcebergManifestEntryWriter(
FormatWriterFactory factory,
Path path,
String fileCompression,
- long sequenceNumber) {
+ long sequenceNumber,
+ Content content) {
super(
IcebergManifestFile.this.fileIO,
factory,
@@ -207,6 +216,7 @@ public class IcebergManifestFile extends
ObjectsFile<IcebergManifestEntry> {
false);
this.partitionStatsCollector = new
SimpleStatsCollector(partitionType);
this.sequenceNumber = sequenceNumber;
+ this.content = content;
}
@Override
@@ -253,7 +263,7 @@ public class IcebergManifestFile extends
ObjectsFile<IcebergManifestEntry> {
path.toString(),
outputBytes,
IcebergPartitionSpec.SPEC_ID,
- Content.DATA,
+ content,
sequenceNumber,
minSequenceNumber != null ? minSequenceNumber :
UNASSIGNED_SEQ,
sequenceNumber,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
index 00c4c3de61..c755e32c7d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
@@ -44,7 +44,8 @@ import java.util.Objects;
@JsonIgnoreProperties(ignoreUnknown = true)
public class IcebergMetadata {
- public static final int CURRENT_FORMAT_VERSION = 2;
+ public static final int FORMAT_VERSION_V2 = 2;
+ public static final int FORMAT_VERSION_V3 = 3;
private static final String FIELD_FORMAT_VERSION = "format-version";
private static final String FIELD_TABLE_UUID = "table-uuid";
@@ -118,6 +119,7 @@ public class IcebergMetadata {
private final Map<String, IcebergRef> refs;
public IcebergMetadata(
+ int formatVersion,
String tableUuid,
String location,
long lastSequenceNumber,
@@ -130,7 +132,7 @@ public class IcebergMetadata {
long currentSnapshotId,
@Nullable Map<String, IcebergRef> refs) {
this(
- CURRENT_FORMAT_VERSION,
+ formatVersion,
tableUuid,
location,
lastSequenceNumber,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index f04b40b2ff..efd081c0bf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -117,7 +117,10 @@ public class IndexFileHandler {
}
public List<IndexManifestEntry> scan(String indexType) {
- Snapshot snapshot = snapshotManager.latestSnapshot();
+ return scan(snapshotManager.latestSnapshot(), indexType);
+ }
+
+ public List<IndexManifestEntry> scan(Snapshot snapshot, String indexType) {
if (snapshot == null) {
return Collections.emptyList();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
index dbd42d5466..cb6edc9d6d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
+++
b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
@@ -21,6 +21,7 @@ package org.apache.paimon.metastore;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.table.PartitionHandler;
@@ -59,7 +60,10 @@ public class AddPartitionCommitCallback implements
CommitCallback {
}
@Override
- public void call(List<ManifestEntry> committedEntries, Snapshot snapshot) {
+ public void call(
+ List<ManifestEntry> committedEntries,
+ List<IndexManifestEntry> indexFiles,
+ Snapshot snapshot) {
Set<BinaryRow> partitions =
committedEntries.stream()
.filter(e -> FileKind.ADD.equals(e.kind()))
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java
b/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java
index 7e2e00dce3..2d9e44c39a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java
+++
b/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java
@@ -19,6 +19,7 @@
package org.apache.paimon.metastore;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.table.sink.CommitCallback;
@@ -39,7 +40,10 @@ public class TagPreviewCommitCallback implements
CommitCallback {
}
@Override
- public void call(List<ManifestEntry> committedEntries, Snapshot snapshot) {
+ public void call(
+ List<ManifestEntry> committedEntries,
+ List<IndexManifestEntry> indexFiles,
+ Snapshot snapshot) {
long currentMillis = System.currentTimeMillis();
Optional<String> tagOptional = tagPreview.extractTag(currentMillis,
snapshot.watermark());
tagOptional.ifPresent(tagCallback::notifyCreation);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 7046d970a4..f85bf6b2c7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -1053,7 +1053,7 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
commitUser,
identifier,
commitKind.name());
- commitCallbacks.forEach(callback -> callback.call(deltaFiles,
newSnapshot));
+ commitCallbacks.forEach(callback -> callback.call(deltaFiles,
indexFiles, newSnapshot));
return new SuccessResult();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java
index c4b606441d..40e615198e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java
@@ -19,6 +19,7 @@
package org.apache.paimon.table.sink;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
@@ -37,7 +38,10 @@ import java.util.List;
*/
public interface CommitCallback extends AutoCloseable {
- void call(List<ManifestEntry> committedEntries, Snapshot snapshot);
+ void call(
+ List<ManifestEntry> committedEntries,
+ List<IndexManifestEntry> indexFiles,
+ Snapshot snapshot);
void retry(ManifestCommittable committable);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
index 7b0105fa85..668894f23d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
@@ -25,10 +25,12 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.DataFormatTestUtil;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericMap;
import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.fs.FileIO;
@@ -41,6 +43,7 @@ import org.apache.paimon.iceberg.metadata.IcebergMetadata;
import org.apache.paimon.iceberg.metadata.IcebergRef;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
@@ -48,6 +51,8 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeRoot;
@@ -67,6 +72,8 @@ import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructLikeSet;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -1159,4 +1166,47 @@ public class IcebergCompatibilityTest {
result.close();
return actual;
}
+
+ private void validateIcebergResult(List<Object[]> expected) throws
Exception {
+ HadoopCatalog icebergCatalog = new HadoopCatalog(new Configuration(),
tempDir.toString());
+ TableIdentifier icebergIdentifier = TableIdentifier.of("mydb.db", "t");
+ org.apache.iceberg.Table icebergTable =
icebergCatalog.loadTable(icebergIdentifier);
+
+ Types.StructType type = icebergTable.schema().asStruct();
+
+ StructLikeSet actualSet = StructLikeSet.create(type);
+ StructLikeSet expectSet = StructLikeSet.create(type);
+
+ try (CloseableIterable<Record> reader =
IcebergGenerics.read(icebergTable).build()) {
+ reader.forEach(actualSet::add);
+ }
+ expectSet.addAll(
+ expected.stream().map(r -> icebergRecord(type,
r)).collect(Collectors.toList()));
+
+ assertThat(actualSet).isEqualTo(expectSet);
+ }
+
+ private org.apache.iceberg.data.GenericRecord icebergRecord(
+ Types.StructType type, Object[] row) {
+ org.apache.iceberg.data.GenericRecord record =
+ org.apache.iceberg.data.GenericRecord.create(type);
+ for (int i = 0; i < row.length; i++) {
+ record.set(i, row[i]);
+ }
+ return record;
+ }
+
+ private List<String> getPaimonResult(FileStoreTable paimonTable) throws
Exception {
+ List<Split> splits =
paimonTable.newReadBuilder().newScan().plan().splits();
+ TableRead read = paimonTable.newReadBuilder().newRead();
+ try (RecordReader<InternalRow> recordReader =
read.createReader(splits)) {
+ List<String> result = new ArrayList<>();
+ recordReader.forEachRemaining(
+ row ->
+ result.add(
+ DataFormatTestUtil.toStringWithRowKind(
+ row, paimonTable.rowType())));
+ return result;
+ }
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
index 227a3b58ee..6dc77799ae 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.options.Options;
@@ -179,7 +180,10 @@ public class TableCommitTest {
}
@Override
- public void call(List<ManifestEntry> entries, Snapshot snapshot) {
+ public void call(
+ List<ManifestEntry> entries,
+ List<IndexManifestEntry> indexFiles,
+ Snapshot snapshot) {
commitCallbackResult.get(testId).add(snapshot.commitIdentifier());
}
diff --git a/paimon-iceberg/pom.xml b/paimon-iceberg/pom.xml
new file mode 100644
index 0000000000..61f0f45ff1
--- /dev/null
+++ b/paimon-iceberg/pom.xml
@@ -0,0 +1,187 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>paimon-parent</artifactId>
+ <groupId>org.apache.paimon</groupId>
+ <version>1.2-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>paimon-iceberg</artifactId>
+ <name>Paimon : Iceberg</name>
+
+ <properties>
+ <target.java.version>11</target.java.version>
+ <iceberg.version>1.8.1</iceberg.version>
+ <flink.version>${paimon-flink-common.flink.version}</flink.version>
+ <iceberg.flink.version>1.19</iceberg.flink.version>
+ </properties>
+
+ <dependencies>
+
+ <!-- paimon test dependency -->
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-bundle</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-flink-common</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-flink-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- hadoop dependency -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs-client</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>jdk.tools</groupId>
+ <artifactId>jdk.tools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ <version>1.21</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- iceberg dependency -->
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-core</artifactId>
+ <version>${iceberg.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-data</artifactId>
+ <version>${iceberg.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>parquet-hadoop</artifactId>
+ <groupId>org.apache.parquet</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-flink-${iceberg.flink.version}</artifactId>
+ <version>${iceberg.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- flink dependency -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-files</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git
a/paimon-iceberg/src/test/java/org/apache/paimon/core/IcebergDVCompatibilityTest.java
b/paimon-iceberg/src/test/java/org/apache/paimon/core/IcebergDVCompatibilityTest.java
new file mode 100644
index 0000000000..e24109371e
--- /dev/null
+++
b/paimon-iceberg/src/test/java/org/apache/paimon/core/IcebergDVCompatibilityTest.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.core;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
+import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.iceberg.IcebergOptions;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for deletion vector (DV) compatibility between Paimon and Iceberg. */
+public class IcebergDVCompatibilityTest {
+ @TempDir java.nio.file.Path tempDir;
+
+ @Test
+ public void testSyncDVWithoutBase() throws Exception {
+ RowType rowType =
+ RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"k", "v"});
+ Map<String, String> customOptions = new HashMap<>();
+ customOptions.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");
+ // must set deletion-vectors.bitmap64 = true
+ customOptions.put(CoreOptions.DELETION_VECTOR_BITMAP64.key(), "true");
+ // must set metadata.iceberg.format-version = 3
+ customOptions.put(IcebergOptions.FORMAT_VERSION.key(), "3");
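+        // Iceberg format version 3 stores deletion vectors as Puffin blobs backed by
+        // 64-bit roaring bitmaps, so Paimon must also write 64-bit DVs for Iceberg to
+        // reference them directly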
+
+ FileStoreTable table =
+ createPaimonTable(
+ rowType,
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ 1,
+ customOptions);
+
+ // disable iceberg metadata storage
+ Map<String, String> options = new HashMap<>();
+ options.put(
+ IcebergOptions.METADATA_ICEBERG_STORAGE.key(),
+ IcebergOptions.StorageType.DISABLED.toString());
+ table = table.copy(options);
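+        // commits below are made while Iceberg metadata sync is off, so the later sync
+        // starts without a base Iceberg snapshot and must expose the DVs from scratch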
+
+ String commitUser = UUID.randomUUID().toString();
+ TableWriteImpl<?> write =
+ table.newWrite(commitUser)
+                        .withIOManager(new IOManagerImpl(tempDir.toString() + "/tmp"));
+ TableCommitImpl commit = table.newCommit(commitUser);
+
+ write.write(GenericRow.of(1, 10));
+ write.write(GenericRow.of(2, 20));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ write.write(GenericRow.ofKind(RowKind.DELETE, 2, 20));
+ commit.commit(2, write.prepareCommit(false, 2));
+
+        // compact to generate dv index
+ write.compact(BinaryRow.EMPTY_ROW, 0, false);
+ commit.commit(3, write.prepareCommit(true, 3));
+
+ assertThat(
+ table.store()
+ .newIndexFileHandler()
+                                .scan(DeletionVectorsIndexFile.DELETION_VECTORS_INDEX)
+ .size())
+ .isGreaterThan(0);
+ // enable iceberg metadata storage and commit changes to iceberg
+ options.put(
+ IcebergOptions.METADATA_ICEBERG_STORAGE.key(),
+ IcebergOptions.StorageType.TABLE_LOCATION.toString());
+ table = table.copy(options);
+ write =
+ table.newWrite(commitUser)
+                        .withIOManager(new IOManagerImpl(tempDir.toString() + "/tmp"));
+ commit = table.newCommit(commitUser).ignoreEmptyCommit(false);
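+        // the empty commit (ignoreEmptyCommit = false) triggers the Iceberg commit
+        // callback, which writes the first Iceberg snapshot for the existing Paimon data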
+ commit.commit(4, write.prepareCommit(false, 4));
+
+ validateIcebergResult(Collections.singletonList(new Object[] {1, 10}));
+
+ write.close();
+ commit.close();
+ }
+
+ @Test
+ public void testSyncDVWithBase() throws Exception {
+ RowType rowType =
+ RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"k", "v"});
+ Map<String, String> customOptions = new HashMap<>();
+ customOptions.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");
+ // must set deletion-vectors.bitmap64 = true
+ customOptions.put(CoreOptions.DELETION_VECTOR_BITMAP64.key(), "true");
+ // must set metadata.iceberg.format-version = 3
+ customOptions.put(IcebergOptions.FORMAT_VERSION.key(), "3");
+
+ FileStoreTable table =
+ createPaimonTable(
+ rowType,
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ 1,
+ customOptions);
+
+ String commitUser = UUID.randomUUID().toString();
+ TableWriteImpl<?> write =
+ table.newWrite(commitUser)
+                        .withIOManager(new IOManagerImpl(tempDir.toString() + "/tmp"));
+ TableCommitImpl commit = table.newCommit(commitUser);
+
+ write.write(GenericRow.of(1, 1));
+ write.write(GenericRow.of(2, 2));
+ write.write(GenericRow.of(3, 3));
+ commit.commit(1, write.prepareCommit(false, 1));
+ validateIcebergResult(
+                Arrays.asList(new Object[] {1, 1}, new Object[] {2, 2}, new Object[] {3, 3}));
+
+ write.write(GenericRow.of(1, 11));
+ write.write(GenericRow.of(4, 4));
+ // compact to generate dv index
+ write.compact(BinaryRow.EMPTY_ROW, 0, false);
+ commit.commit(2, write.prepareCommit(true, 2));
+ validateIcebergResult(
+ Arrays.asList(
+ new Object[] {1, 11},
+ new Object[] {2, 2},
+ new Object[] {3, 3},
+ new Object[] {4, 4}));
+
+ // level-0 file will not be added to iceberg
+ write.write(GenericRow.of(2, 22));
+ write.write(GenericRow.of(5, 5));
+ commit.commit(3, write.prepareCommit(false, 3));
+ validateIcebergResult(
+ Arrays.asList(
+ new Object[] {1, 11},
+ new Object[] {2, 2},
+ new Object[] {3, 3},
+ new Object[] {4, 4}));
+
+ // compact to generate dv index
+ write.compact(BinaryRow.EMPTY_ROW, 0, false);
+ commit.commit(4, write.prepareCommit(true, 4));
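+        // after compaction the new records have left level 0, so they become visible to
+        // Iceberg, with deletion vectors marking the superseded rows in older files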
+ validateIcebergResult(
+ Arrays.asList(
+ new Object[] {1, 11},
+ new Object[] {2, 22},
+ new Object[] {3, 3},
+ new Object[] {4, 4},
+ new Object[] {5, 5}));
+
+ write.close();
+ commit.close();
+ }
+
+ @Test
+ public void testUpgradeFormatVersion() throws Exception {
+ RowType rowType =
+ RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"k", "v"});
+ Map<String, String> customOptions = new HashMap<>();
+ customOptions.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");
+ // must set deletion-vectors.bitmap64 = true
+ customOptions.put(CoreOptions.DELETION_VECTOR_BITMAP64.key(), "true");
+
+ FileStoreTable table =
+ createPaimonTable(
+ rowType,
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ 1,
+ customOptions);
+
+ String commitUser = UUID.randomUUID().toString();
+ TableWriteImpl<?> write =
+ table.newWrite(commitUser)
+                        .withIOManager(new IOManagerImpl(tempDir.toString() + "/tmp"));
+ TableCommitImpl commit = table.newCommit(commitUser);
+
+ write.write(GenericRow.of(1, 1));
+ write.write(GenericRow.of(2, 2));
+ commit.commit(1, write.prepareCommit(false, 1));
+        validateIcebergResult(Arrays.asList(new Object[] {1, 1}, new Object[] {2, 2}));
+
+ // upgrade format version to 3
+ Map<String, String> options = new HashMap<>();
+ options.put(IcebergOptions.FORMAT_VERSION.key(), "3");
+ table = table.copy(options);
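+        // earlier Iceberg metadata was written with a lower format version; the next
+        // commit should upgrade the table to version 3 before deletion vectors are added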
+ write =
+ table.newWrite(commitUser)
+                        .withIOManager(new IOManagerImpl(tempDir.toString() + "/tmp"));
+ commit = table.newCommit(commitUser);
+ write.write(GenericRow.of(1, 11));
+ write.write(GenericRow.of(3, 3));
+ // compact to generate dv index
+ write.compact(BinaryRow.EMPTY_ROW, 0, false);
+ commit.commit(2, write.prepareCommit(true, 2));
+ validateIcebergResult(
+                Arrays.asList(new Object[] {1, 11}, new Object[] {2, 2}, new Object[] {3, 3}));
+
+ write.close();
+ commit.close();
+ }
+
+ private FileStoreTable createPaimonTable(
+ RowType rowType,
+ List<String> partitionKeys,
+ List<String> primaryKeys,
+ int numBuckets,
+ Map<String, String> customOptions)
+ throws Exception {
+ LocalFileIO fileIO = LocalFileIO.create();
+ Path path = new Path(tempDir.toString());
+
+ Options options = new Options(customOptions);
+ options.set(CoreOptions.BUCKET, numBuckets);
+ options.set(
+                IcebergOptions.METADATA_ICEBERG_STORAGE, IcebergOptions.StorageType.TABLE_LOCATION);
+ options.set(CoreOptions.FILE_FORMAT, "avro");
+ options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofKibiBytes(32));
+        options.set(IcebergOptions.COMPACT_MIN_FILE_NUM, 4);
+        options.set(IcebergOptions.COMPACT_MAX_FILE_NUM, 8);
+ options.set(IcebergOptions.METADATA_DELETE_AFTER_COMMIT, true);
+ options.set(IcebergOptions.METADATA_PREVIOUS_VERSIONS_MAX, 1);
+        options.set(CoreOptions.MANIFEST_TARGET_FILE_SIZE, MemorySize.ofKibiBytes(8));
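+        // the small file and manifest sizes above spread data across many files so that
+        // compaction and Iceberg metadata cleanup are exercised during the test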
+ Schema schema =
+                new Schema(rowType.getFields(), partitionKeys, primaryKeys, options.toMap(), "");
+
+        try (FileSystemCatalog paimonCatalog = new FileSystemCatalog(fileIO, path)) {
+ paimonCatalog.createDatabase("mydb", false);
+ Identifier paimonIdentifier = Identifier.create("mydb", "t");
+ paimonCatalog.createTable(paimonIdentifier, schema, false);
+ return (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
+ }
+ }
+
+    private void validateIcebergResult(List<Object[]> expected) throws Exception {
+        HadoopCatalog icebergCatalog = new HadoopCatalog(new Configuration(), tempDir.toString());
+ TableIdentifier icebergIdentifier = TableIdentifier.of("mydb.db", "t");
+        org.apache.iceberg.Table icebergTable = icebergCatalog.loadTable(icebergIdentifier);
+
+ Types.StructType type = icebergTable.schema().asStruct();
+
+ StructLikeSet actualSet = StructLikeSet.create(type);
+ StructLikeSet expectSet = StructLikeSet.create(type);
+
+        try (CloseableIterable<Record> reader = IcebergGenerics.read(icebergTable).build()) {
+ reader.forEach(actualSet::add);
+ }
+ expectSet.addAll(
+                expected.stream().map(r -> icebergRecord(type, r)).collect(Collectors.toList()));
+
+ assertThat(actualSet).isEqualTo(expectSet);
+ }
+
+ private org.apache.iceberg.data.GenericRecord icebergRecord(
+ Types.StructType type, Object[] row) {
+ org.apache.iceberg.data.GenericRecord record =
+ org.apache.iceberg.data.GenericRecord.create(type);
+ for (int i = 0; i < row.length; i++) {
+ record.set(i, row[i]);
+ }
+ return record;
+ }
+}
diff --git a/paimon-iceberg/src/test/java/org/apache/paimon/flink/FlinkIcebergITCase.java b/paimon-iceberg/src/test/java/org/apache/paimon/flink/FlinkIcebergITCase.java
new file mode 100644
index 0000000000..a71ee78d1e
--- /dev/null
+++ b/paimon-iceberg/src/test/java/org/apache/paimon/flink/FlinkIcebergITCase.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.flink.util.AbstractTestBase;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for Paimon Iceberg compatibility with deletion vectors in Flink. */
+public class FlinkIcebergITCase extends AbstractTestBase {
+
+ @ParameterizedTest
+ @ValueSource(strings = {"avro"})
+ public void testDeletionVector(String format) throws Exception {
+ String warehouse = getTempDirPath();
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().parallelism(2).build();
+ tEnv.executeSql(
+ "CREATE CATALOG paimon WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '"
+ + warehouse
+ + "'\n"
+ + ")");
+ tEnv.executeSql(
+ "CREATE TABLE paimon.`default`.T (\n"
+ + " pt INT,\n"
+ + " k INT,\n"
+ + " v INT,\n"
+ + " PRIMARY KEY (pt, k) NOT ENFORCED\n"
+ + ") PARTITIONED BY (pt) WITH (\n"
+ + " 'metadata.iceberg.storage' = 'hadoop-catalog',\n"
+ + " 'metadata.iceberg.format-version' = '3',\n"
+ + " 'deletion-vectors.enabled' = 'true',\n"
+ + " 'deletion-vectors.bitmap64' = 'true',\n"
+ + " 'file.format' = '"
+ + format
+ + "'\n"
+ + ")");
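+        // with 'metadata.iceberg.storage' = 'hadoop-catalog', Paimon mirrors Iceberg
+        // metadata under <warehouse>/iceberg/<db>/<table>, which the Iceberg catalog
+        // created below points at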
+ tEnv.executeSql(
+ "INSERT INTO paimon.`default`.T VALUES "
+ + "(1, 9, 90), "
+ + "(1, 10, 100), "
+ + "(1, 11, 110), "
+ + "(2, 20, 200)")
+ .await();
+
+ tEnv.executeSql(
+ "CREATE CATALOG iceberg WITH (\n"
+ + " 'type' = 'iceberg',\n"
+ + " 'catalog-type' = 'hadoop',\n"
+ + " 'warehouse' = '"
+ + warehouse
+ + "/iceberg',\n"
+ + " 'cache-enabled' = 'false'\n"
+ + ")");
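+        // caching is disabled so that the second read below loads the latest Iceberg
+        // snapshot instead of a stale cached table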
+ assertThat(
+ collect(
+ tEnv.executeSql(
+                                        "SELECT v, k, pt FROM iceberg.`default`.T ORDER BY pt, k")))
+ .containsExactly(
+ Row.of(90, 9, 1),
+ Row.of(100, 10, 1),
+ Row.of(110, 11, 1),
+ Row.of(200, 20, 2));
+
+ tEnv.executeSql(
+ "INSERT INTO paimon.`default`.T VALUES "
+ + "(1, 10, 101), "
+ + "(2, 20, 201), "
+ + "(1, 12, 121)")
+ .await();
+
+        // make sure that deletion vector index files have been generated
+ CloseableIterator<Row> iter =
+ tEnv.executeSql(
+                                "SELECT * FROM paimon.`default`.`T$table_indexes` WHERE index_type='DELETION_VECTORS'")
+ .collect();
+ assertThat(ImmutableList.copyOf(iter).size()).isGreaterThan(0);
+ assertThat(
+ collect(
+ tEnv.executeSql(
+                                        "SELECT v, k, pt FROM iceberg.`default`.T ORDER BY pt, k")))
+ .containsExactly(
+ Row.of(90, 9, 1),
+ Row.of(101, 10, 1),
+ Row.of(110, 11, 1),
+ Row.of(121, 12, 1),
+ Row.of(201, 20, 2));
+ }
+
+ private List<Row> collect(TableResult result) throws Exception {
+ List<Row> rows = new ArrayList<>();
+ try (CloseableIterator<Row> it = result.collect()) {
+ while (it.hasNext()) {
+ rows.add(it.next());
+ }
+ }
+ return rows;
+ }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala
index c93e10ef90..d44164da3c 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala
@@ -19,12 +19,11 @@
package org.apache.paimon.spark
import org.apache.paimon.{CoreOptions, Snapshot}
-import org.apache.paimon.manifest.{ManifestCommittable, ManifestEntry}
+import org.apache.paimon.manifest.{IndexManifestEntry, ManifestCommittable, ManifestEntry}
import org.apache.paimon.table.sink.CommitCallback
import org.junit.jupiter.api.Assertions
-import java.lang
import java.util.List
class PaimonCommitTest extends PaimonSparkTestBase {
@@ -64,7 +63,10 @@ object PaimonCommitTest {
case class CustomCommitCallback(testId: String) extends CommitCallback {
-  override def call(committedEntries: List[ManifestEntry], snapshot: Snapshot): Unit = {
+ override def call(
+ committedEntries: List[ManifestEntry],
+ indexFiles: List[IndexManifestEntry],
+ snapshot: Snapshot): Unit = {
PaimonCommitTest.id = testId
}
diff --git a/pom.xml b/pom.xml
index 28bb674e11..da22b98e8c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,6 +71,7 @@ under the License.
<module>tools/ci/paimon-ci-tools</module>
<module>paimon-hudi</module>
<module>paimon-api</module>
+ <module>paimon-iceberg</module>
</modules>
<properties>