This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 3b1f7593ccdd14979125040b5b74ef1125e59008
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Sep 19 15:39:01 2025 +0800

    [core] Fix the commit kind when performing row-level changes on non-pk table (#6025)
---
 .../src/main/java/org/apache/paimon/Snapshot.java  |   7 +-
 .../apache/paimon/operation/FileStoreCommit.java   |   2 +-
 .../paimon/operation/FileStoreCommitImpl.java      | 181 ++++++++++++---------
 .../apache/paimon/table/sink/TableCommitImpl.java  |   3 +-
 .../test/java/org/apache/paimon/TestFileStore.java |   2 +-
 .../apache/paimon/operation/FileDeletionTest.java  |   4 +-
 .../apache/paimon/operation/TestCommitThread.java  |   2 +-
 .../paimon/spark/sql/MergeIntoTableTest.scala      |   2 +
 .../paimon/spark/sql/MergeIntoTableTest.scala      |   2 +
 .../paimon/spark/sql/MergeIntoTableTest.scala      |   2 +
 .../paimon/spark/sql/MergeIntoTableTest.scala      |   2 +
 .../paimon/spark/sql/MergeIntoTableTest.scala      |   2 +
 .../paimon/spark/sql/DeleteFromTableTestBase.scala |  38 ++++-
 .../paimon/spark/sql/MergeIntoTableTestBase.scala  |  91 ++++++++++-
 .../paimon/spark/sql/UpdateTableTestBase.scala     |  38 ++++-
 15 files changed, 287 insertions(+), 91 deletions(-)

diff --git a/paimon-api/src/main/java/org/apache/paimon/Snapshot.java b/paimon-api/src/main/java/org/apache/paimon/Snapshot.java
index 31defa3bfb..a318a72e40 100644
--- a/paimon-api/src/main/java/org/apache/paimon/Snapshot.java
+++ b/paimon-api/src/main/java/org/apache/paimon/Snapshot.java
@@ -496,13 +496,16 @@ public class Snapshot implements Serializable {
 
     /** Type of changes in this snapshot. */
     public enum CommitKind {
-        /** Changes flushed from the mem table. */
+        /** New data files are appended to the table and no data file is deleted. */
         APPEND,
 
         /** Changes by compacting existing data files. */
         COMPACT,
 
-        /** Changes that clear up the whole partition and then add new records. */
+        /**
+         * New data files are added to overwrite existing data files or just delete existing data
+         * files.
+         */
         OVERWRITE,
 
         /** Collect statistics. */
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index 4156ce0a83..6a00db6f0e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -50,7 +50,7 @@ public interface FileStoreCommit extends AutoCloseable {
      * note that this partition does not necessarily equal to the partitions of the newly added
      * key-values. This is just the partition to be cleaned up.
     */
-    int overwrite(
+    int overwritePartition(
             Map<String, String> partition,
             ManifestCommittable committable,
             Map<String, String> properties);
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index a11339fe7d..300cc9c926 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -25,7 +25,6 @@ import org.apache.paimon.catalog.SnapshotCommit;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.manifest.FileEntry;
@@ -88,7 +87,6 @@ import java.util.stream.Collectors;
 
 import static java.util.Collections.emptyList;
 import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
-import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
 import static org.apache.paimon.manifest.ManifestEntry.recordCount;
 import static org.apache.paimon.manifest.ManifestEntry.recordCountAdd;
 import static org.apache.paimon.manifest.ManifestEntry.recordCountDelete;
@@ -296,24 +294,24 @@ public class FileStoreCommitImpl implements FileStoreCommit {
 
         List<ManifestEntry> appendTableFiles = new ArrayList<>();
         List<ManifestEntry> appendChangelog = new ArrayList<>();
+        List<IndexManifestEntry> appendIndexFiles = new ArrayList<>();
         List<ManifestEntry> compactTableFiles = new ArrayList<>();
         List<ManifestEntry> compactChangelog = new ArrayList<>();
-        List<IndexManifestEntry> appendHashIndexFiles = new ArrayList<>();
-        List<IndexManifestEntry> compactDvIndexFiles = new ArrayList<>();
+        List<IndexManifestEntry> compactIndexFiles = new ArrayList<>();
         collectChanges(
                 committable.fileCommittables(),
                 appendTableFiles,
                 appendChangelog,
+                appendIndexFiles,
                 compactTableFiles,
                 compactChangelog,
-                appendHashIndexFiles,
-                compactDvIndexFiles);
+                compactIndexFiles);
         try {
             List<SimpleFileEntry> appendSimpleEntries = SimpleFileEntry.from(appendTableFiles);
             if (!ignoreEmptyCommit
                    || !appendTableFiles.isEmpty()
                    || !appendChangelog.isEmpty()
-                    || !appendHashIndexFiles.isEmpty()) {
+                    || !appendIndexFiles.isEmpty()) {
                 // Optimization for common path.
                 // Step 1:
                 // Read manifest entries from changed partitions here and check for conflicts.
@@ -322,6 +320,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                 // This optimization is mainly used to decrease the number of times we read from
                 // files.
                 latestSnapshot = snapshotManager.latestSnapshot();
+                boolean hasDelete = hasDelete(appendSimpleEntries, appendIndexFiles);
+                Snapshot.CommitKind commitKind =
+                        hasDelete ? Snapshot.CommitKind.OVERWRITE : Snapshot.CommitKind.APPEND;
+
                 if (latestSnapshot != null && checkAppendFiles) {
                     // it is possible that some partitions only have compact changes,
                     // so we need to contain all changes
@@ -332,7 +334,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                             latestSnapshot.commitUser(),
                             baseEntries,
                             appendSimpleEntries,
-                            Snapshot.CommitKind.APPEND);
+                            commitKind);
                     safeLatestSnapshotId = latestSnapshot.id();
                 }
 
@@ -340,20 +342,20 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                 tryCommit(
                         appendTableFiles,
                         appendChangelog,
-                        appendHashIndexFiles,
+                        appendIndexFiles,
                         committable.identifier(),
                         committable.watermark(),
                         committable.logOffsets(),
                         committable.properties(),
-                        Snapshot.CommitKind.APPEND,
-                        noConflictCheck(),
+                        commitKind,
+                        hasDelete ? mustConflictCheck() : noConflictCheck(),
                         null);
                 generatedSnapshot += 1;
             }
 
             if (!compactTableFiles.isEmpty()
                    || !compactChangelog.isEmpty()
-                    || !compactDvIndexFiles.isEmpty()) {
+                    || !compactIndexFiles.isEmpty()) {
                 // Optimization for common path.
                 // Step 2:
                 // Add appendChanges to the manifest entries read above and check for conflicts.
@@ -376,7 +378,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                 tryCommit(
                         compactTableFiles,
                         compactChangelog,
-                        compactDvIndexFiles,
+                        compactIndexFiles,
                         committable.identifier(),
                         committable.watermark(),
                         committable.logOffsets(),
@@ -426,8 +428,23 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         commitMetrics.reportCommit(commitStats);
     }
 
+    private boolean hasDelete(
+            List<SimpleFileEntry> appendSimpleEntries, List<IndexManifestEntry> appendIndexFiles) {
+        for (SimpleFileEntry appendSimpleEntry : appendSimpleEntries) {
+            if (appendSimpleEntry.kind().equals(FileKind.DELETE)) {
+                return true;
+            }
+        }
+        for (IndexManifestEntry appendIndexFile : appendIndexFiles) {
+            if (appendIndexFile.indexFile().indexType().equals(DELETION_VECTORS_INDEX)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     @Override
-    public int overwrite(
+    public int overwritePartition(
             Map<String, String> partition,
             ManifestCommittable committable,
             Map<String, String> properties) {
@@ -448,18 +465,18 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         int attempts = 0;
         List<ManifestEntry> appendTableFiles = new ArrayList<>();
         List<ManifestEntry> appendChangelog = new ArrayList<>();
+        List<IndexManifestEntry> appendIndexFiles = new ArrayList<>();
        List<ManifestEntry> compactTableFiles = new ArrayList<>();
        List<ManifestEntry> compactChangelog = new ArrayList<>();
-        List<IndexManifestEntry> appendHashIndexFiles = new ArrayList<>();
-        List<IndexManifestEntry> compactDvIndexFiles = new ArrayList<>();
+        List<IndexManifestEntry> compactIndexFiles = new ArrayList<>();
         collectChanges(
                 committable.fileCommittables(),
                 appendTableFiles,
                 appendChangelog,
+                appendIndexFiles,
                 compactTableFiles,
                 compactChangelog,
-                appendHashIndexFiles,
-                compactDvIndexFiles);
+                compactIndexFiles);
 
         if (!appendChangelog.isEmpty() || !compactChangelog.isEmpty()) {
             StringBuilder warnMessage =
@@ -493,7 +510,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                 partitionFilter = PartitionPredicate.fromMultiple(partitionType, partitions);
             }
         } else {
-            // partition may be partial partition fields, so here must to use predicate way.
+            // partition may be partial partition fields, so here must use predicate way.
             Predicate partitionPredicate =
                     createPartitionPredicate(partition, partitionType, partitionDefaultName);
             partitionFilter =
@@ -516,10 +533,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         // overwrite new files
         if (!skipOverwrite) {
             attempts +=
-                    tryOverwrite(
+                    tryOverwritePartition(
                             partitionFilter,
                             appendTableFiles,
-                            appendHashIndexFiles,
+                            appendIndexFiles,
                             committable.identifier(),
                             committable.watermark(),
                             committable.logOffsets(),
@@ -527,12 +544,12 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             generatedSnapshot += 1;
         }
 
-        if (!compactTableFiles.isEmpty() || !compactDvIndexFiles.isEmpty()) {
+        if (!compactTableFiles.isEmpty() || !compactIndexFiles.isEmpty()) {
             attempts +=
                     tryCommit(
                             compactTableFiles,
                             emptyList(),
-                            compactDvIndexFiles,
+                            compactIndexFiles,
                             committable.identifier(),
                             committable.watermark(),
                             committable.logOffsets(),
@@ -590,7 +607,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             partitionFilter = PartitionPredicate.fromPredicate(partitionType, predicate);
         }
 
-        tryOverwrite(
+        tryOverwritePartition(
                 partitionFilter,
                 emptyList(),
                 emptyList(),
@@ -602,7 +619,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
 
     @Override
     public void truncateTable(long commitIdentifier) {
-        tryOverwrite(
+        tryOverwritePartition(
                 null,
                 emptyList(),
                 emptyList(),
@@ -666,10 +683,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             List<CommitMessage> commitMessages,
             List<ManifestEntry> appendTableFiles,
             List<ManifestEntry> appendChangelog,
+            List<IndexManifestEntry> appendIndexFiles,
             List<ManifestEntry> compactTableFiles,
             List<ManifestEntry> compactChangelog,
-            List<IndexManifestEntry> appendHashIndexFiles,
-            List<IndexManifestEntry> compactDvIndexFiles) {
+            List<IndexManifestEntry> compactIndexFiles) {
         for (CommitMessage message : commitMessages) {
             CommitMessageImpl commitMessage = (CommitMessageImpl) message;
             commitMessage
@@ -687,6 +704,29 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                     .newFilesIncrement()
                     .changelogFiles()
                     .forEach(m -> appendChangelog.add(makeEntry(FileKind.ADD, commitMessage, m)));
+            commitMessage
+                    .newFilesIncrement()
+                    .newIndexFiles()
+                    .forEach(
+                            m ->
+                                    appendIndexFiles.add(
+                                            new IndexManifestEntry(
+                                                    FileKind.ADD,
+                                                    commitMessage.partition(),
+                                                    commitMessage.bucket(),
+                                                    m)));
+            commitMessage
+                    .newFilesIncrement()
+                    .deletedIndexFiles()
+                    .forEach(
+                            m ->
+                                    appendIndexFiles.add(
+                                            new IndexManifestEntry(
+                                                    FileKind.DELETE,
+                                                    commitMessage.partition(),
+                                                    commitMessage.bucket(),
+                                                    m)));
+
             commitMessage
                     .compactIncrement()
                     .compactBefore()
@@ -702,53 +742,28 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                     .compactIncrement()
                     .changelogFiles()
                     .forEach(m -> compactChangelog.add(makeEntry(FileKind.ADD, commitMessage, m)));
-
-            // todo: split them
-            List<IndexFileMeta> newIndexFiles =
-                    new ArrayList<>(commitMessage.newFilesIncrement().newIndexFiles());
-            newIndexFiles.addAll(commitMessage.compactIncrement().newIndexFiles());
-            newIndexFiles.forEach(
-                    f -> {
-                        switch (f.indexType()) {
-                            case HASH_INDEX:
-                                appendHashIndexFiles.add(
-                                        new IndexManifestEntry(
-                                                FileKind.ADD,
-                                                commitMessage.partition(),
-                                                commitMessage.bucket(),
-                                                f));
-                                break;
-                            case DELETION_VECTORS_INDEX:
-                                compactDvIndexFiles.add(
-                                        new IndexManifestEntry(
-                                                FileKind.ADD,
-                                                commitMessage.partition(),
-                                                commitMessage.bucket(),
-                                                f));
-                                break;
-                            default:
-                                throw new RuntimeException("Unknown index type: " + f.indexType());
-                        }
-                    });
-
-            // todo: split them
-            List<IndexFileMeta> deletedIndexFiles =
-                    new ArrayList<>(commitMessage.newFilesIncrement().deletedIndexFiles());
-            deletedIndexFiles.addAll(commitMessage.compactIncrement().deletedIndexFiles());
-            deletedIndexFiles.forEach(
-                    f -> {
-                        if (f.indexType().equals(DELETION_VECTORS_INDEX)) {
-                            compactDvIndexFiles.add(
-                                    new IndexManifestEntry(
-                                            FileKind.DELETE,
-                                            commitMessage.partition(),
-                                            commitMessage.bucket(),
-                                            f));
-                        } else {
-                            throw new RuntimeException(
-                                    "This index type is not supported to delete: " + f.indexType());
-                        }
-                    });
+            commitMessage
+                    .compactIncrement()
+                    .newIndexFiles()
+                    .forEach(
+                            m ->
+                                    compactIndexFiles.add(
+                                            new IndexManifestEntry(
+                                                    FileKind.ADD,
+                                                    commitMessage.partition(),
+                                                    commitMessage.bucket(),
+                                                    m)));
+            commitMessage
+                    .compactIncrement()
+                    .deletedIndexFiles()
+                    .forEach(
+                            m ->
+                                    compactIndexFiles.add(
+                                            new IndexManifestEntry(
+                                                    FileKind.DELETE,
+                                                    commitMessage.partition(),
+                                                    commitMessage.bucket(),
+                                                    m)));
         }
         if (!commitMessages.isEmpty()) {
             List<String> msg = new ArrayList<>();
@@ -758,17 +773,17 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             if (!appendChangelog.isEmpty()) {
                 msg.add(appendChangelog.size() + " append Changelogs");
             }
+            if (!appendIndexFiles.isEmpty()) {
+                msg.add(appendIndexFiles.size() + " append index files");
+            }
             if (!compactTableFiles.isEmpty()) {
                 msg.add(compactTableFiles.size() + " compact table files");
             }
             if (!compactChangelog.isEmpty()) {
                 msg.add(compactChangelog.size() + " compact Changelogs");
             }
-            if (!appendHashIndexFiles.isEmpty()) {
-                msg.add(appendHashIndexFiles.size() + " append hash index files");
-            }
-            if (!compactDvIndexFiles.isEmpty()) {
-                msg.add(compactDvIndexFiles.size() + " compact dv index files");
+            if (!compactIndexFiles.isEmpty()) {
+                msg.add(compactIndexFiles.size() + " compact index files");
             }
             LOG.info("Finished collecting changes, including: {}", String.join(", ", msg));
         }
@@ -836,7 +851,13 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         return retryCount + 1;
     }
 
-    private int tryOverwrite(
+    /**
+     * Try to overwrite partition.
+     *
+     * @param partitionFilter Partition filter indicating which partitions to overwrite, if {@code
+     *     null}, overwrites the entire table.
+     */
+    private int tryOverwritePartition(
             @Nullable PartitionPredicate partitionFilter,
             List<ManifestEntry> changes,
             List<IndexManifestEntry> indexFiles,
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index e549bd2e2c..7f19c7443c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -248,7 +248,8 @@ public class TableCommitImpl implements InnerTableCommit {
             committable = new ManifestCommittable(Long.MAX_VALUE);
         }
         int newSnapshots =
-                commit.overwrite(overwritePartition, committable, Collections.emptyMap());
+                commit.overwritePartition(
+                        overwritePartition, committable, Collections.emptyMap());
         maintain(
                 committable.identifier(),
                 maintainExecutor,
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index dc1702c264..aa405060f4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -263,7 +263,7 @@ public class TestFileStore extends KeyValueFileStore {
                 null,
                 Collections.emptyList(),
                 (commit, committable) ->
-                        commit.overwrite(partition, committable, Collections.emptyMap()));
+                        commit.overwritePartition(partition, committable, Collections.emptyMap()));
     }
 
     public Snapshot dropPartitions(List<Map<String, String>> partitions) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index 8e794fe749..0bc6e041ab 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -154,13 +154,13 @@ public class FileDeletionTest {
         FileStoreCommitImpl commit = store.newCommit();
         Map<String, String> partitionSpec = new HashMap<>();
         partitionSpec.put("dt", "0401");
-        commit.overwrite(
+        commit.overwritePartition(
                 partitionSpec, new ManifestCommittable(commitIdentifier++), Collections.emptyMap());
 
         // step 3: generate snapshot 3 by cleaning partition dt=0402/hr=10
         partitionSpec.put("dt", "0402");
         partitionSpec.put("hr", "8");
-        commit.overwrite(
+        commit.overwritePartition(
                 partitionSpec, new ManifestCommittable(commitIdentifier++), Collections.emptyMap());
         commit.close();
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
index 2f409a9176..cdd2d8fba1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
@@ -169,7 +169,7 @@ public class TestCommitThread extends Thread {
         runWithRetry(
                 committable,
                 () ->
-                        commit.overwrite(
+                        commit.overwritePartition(
                                 TestKeyValueGenerator.toPartitionMap(partition, MULTI_PARTITIONED),
                                 committable,
                                 Collections.emptyMap()));
diff --git a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
index f1f0d8c065..5ab343c41c 100644
--- a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
+++ b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
@@ -32,8 +32,10 @@ class MergeIntoPrimaryKeyNonBucketTableTest
 
 class MergeIntoAppendBucketedTableTest
     extends MergeIntoTableTestBase
+    with MergeIntoAppendTableTest
     with PaimonAppendBucketedTableTest {}
 
 class MergeIntoAppendNonBucketedTableTest
     extends MergeIntoTableTestBase
+    with MergeIntoAppendTableTest
     with PaimonAppendNonBucketTableTest {}
diff --git a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
index f1f0d8c065..5ab343c41c 100644
--- a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
+++ b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
@@ -32,8 +32,10 @@ class MergeIntoPrimaryKeyNonBucketTableTest
 
 class MergeIntoAppendBucketedTableTest
     extends MergeIntoTableTestBase
+    with MergeIntoAppendTableTest
     with PaimonAppendBucketedTableTest {}
 
 class MergeIntoAppendNonBucketedTableTest
     extends MergeIntoTableTestBase
+    with MergeIntoAppendTableTest
     with PaimonAppendNonBucketTableTest {}
diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
index e1cfe3a396..b9a85b147e 100644
--- a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
+++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
@@ -34,10 +34,12 @@ class MergeIntoPrimaryKeyNonBucketTableTest
 
 class MergeIntoAppendBucketedTableTest
     extends MergeIntoTableTestBase
+    with MergeIntoAppendTableTest
     with MergeIntoNotMatchedBySourceTest
     with PaimonAppendBucketedTableTest {}
 
 class MergeIntoAppendNonBucketedTableTest
     extends MergeIntoTableTestBase
+    with MergeIntoAppendTableTest
     with MergeIntoNotMatchedBySourceTest
     with PaimonAppendNonBucketTableTest {}
diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
index e1cfe3a396..b9a85b147e 100644
--- a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
+++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
@@ -34,10 +34,12 @@ class MergeIntoPrimaryKeyNonBucketTableTest
 
 class MergeIntoAppendBucketedTableTest
     extends MergeIntoTableTestBase
+    with MergeIntoAppendTableTest
     with MergeIntoNotMatchedBySourceTest
     with PaimonAppendBucketedTableTest {}
 
 class MergeIntoAppendNonBucketedTableTest
     extends MergeIntoTableTestBase
+    with MergeIntoAppendTableTest
     with MergeIntoNotMatchedBySourceTest
     with PaimonAppendNonBucketTableTest {}
diff --git a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
index e1cfe3a396..b9a85b147e 100644
--- a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
+++ b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
@@ -34,10 +34,12 @@ class MergeIntoPrimaryKeyNonBucketTableTest
 
 class MergeIntoAppendBucketedTableTest
     extends MergeIntoTableTestBase
+    with MergeIntoAppendTableTest
     with MergeIntoNotMatchedBySourceTest
     with PaimonAppendBucketedTableTest {}
 
 class MergeIntoAppendNonBucketedTableTest
     extends MergeIntoTableTestBase
+    with MergeIntoAppendTableTest
     with MergeIntoNotMatchedBySourceTest
     with PaimonAppendNonBucketTableTest {}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
index 6b38c1f5f0..ea05d94dd3 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.spark.sql
 
-import org.apache.paimon.CoreOptions
+import org.apache.paimon.{CoreOptions, Snapshot}
 import org.apache.paimon.CoreOptions.MergeEngine
 import org.apache.paimon.spark.PaimonSparkTestBase
 
@@ -435,4 +435,40 @@ abstract class DeleteFromTableTestBase extends PaimonSparkTestBase {
       }
     }
   }
+
+  test("Paimon delete: non pk table commit kind") {
+    for (dvEnabled <- Seq(true, false)) {
+      withTable("t") {
+        sql(
+          s"CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('deletion-vectors.enabled' = '$dvEnabled')")
+        sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM range(1, 4)")
+
+        sql("DELETE FROM t WHERE id = 1")
+        checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(Row(2, 2), Row(3, 3)))
+        val table = loadTable("t")
+        var latestSnapshot = table.latestSnapshot().get()
+        assert(latestSnapshot.id == 2)
+        assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.OVERWRITE))
+
+        sql("DELETE FROM t WHERE id = 2")
+        checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(Row(3, 3)))
+        latestSnapshot = table.latestSnapshot().get()
+        assert(latestSnapshot.id == 3)
+        assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.OVERWRITE))
+      }
+    }
+  }
+
+  test("Paimon delete: pk dv table commit kind") {
+    withTable("t") {
+      sql(
+        s"CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('deletion-vectors.enabled' = 'true', 'primary-key' = 'id')")
+      sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM range(1, 4)")
+      sql("DELETE FROM t WHERE id = 1")
+      val table = loadTable("t")
+      val latestSnapshot = table.latestSnapshot().get()
+      assert(latestSnapshot.id == 4)
+      assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.COMPACT))
+    }
+  }
 }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
index 4564e1646c..c867aed9f0 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
@@ -18,10 +18,15 @@
 
 package org.apache.paimon.spark.sql
 
-import org.apache.paimon.spark.{PaimonPrimaryKeyTable, PaimonSparkTestBase, PaimonTableTest}
+import org.apache.paimon.Snapshot
+import org.apache.paimon.spark.{PaimonAppendTable, PaimonPrimaryKeyTable, PaimonSparkTestBase, PaimonTableTest}
 
 import org.apache.spark.sql.Row
 
+import scala.concurrent.{Await, Future}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.DurationInt
+
 abstract class MergeIntoTableTestBase extends PaimonSparkTestBase with PaimonTableTest {
 
   import testImplicits._
@@ -714,3 +719,87 @@ trait MergeIntoPrimaryKeyTableTest extends PaimonSparkTestBase with PaimonPrimar
     }
   }
 }
+
+trait MergeIntoAppendTableTest extends PaimonSparkTestBase with PaimonAppendTable {
+
+  test("Paimon MergeInto: non pk table commit kind") {
+    withTable("s", "t") {
+      createTable("s", "id INT, b INT, c INT", Seq("id"))
+      sql("INSERT INTO s VALUES (1, 1, 1)")
+
+      createTable("t", "id INT, b INT, c INT", Seq("id"))
+      sql("INSERT INTO t VALUES (2, 2, 2)")
+
+      sql("""
+            |MERGE INTO t
+            |USING s
+            |ON t.id = s.id
+            |WHEN NOT MATCHED THEN
+            |INSERT (id, b, c) VALUES (s.id, s.b, s.c);
+            |""".stripMargin)
+
+      val table = loadTable("t")
+      var latestSnapshot = table.latestSnapshot().get()
+      assert(latestSnapshot.id == 2)
+      // no old data is deleted, so the commit kind is APPEND
+      assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.APPEND))
+
+      sql("INSERT INTO s VALUES (2, 22, 22)")
+      sql("""
+            |MERGE INTO t
+            |USING s
+            |ON t.id = s.id
+            |WHEN MATCHED THEN
+            |UPDATE SET id = s.id, b = s.b, c = s.c
+            |""".stripMargin)
+      latestSnapshot = table.latestSnapshot().get()
+      assert(latestSnapshot.id == 3)
+      // new data is updated, so the commit kind is OVERWRITE
+      assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.OVERWRITE))
+    }
+  }
+
+  test("Paimon MergeInto: concurrent merge and compact") {
+    withTable("s", "t") {
+      sql("CREATE TABLE s (id INT, b INT, c INT)")
+      sql("INSERT INTO s VALUES (1, 1, 1)")
+
+      sql("CREATE TABLE t (id INT, b INT, c INT)")
+      sql("INSERT INTO t VALUES (1, 1, 1)")
+
+      val mergeInto = Future {
+        for (_ <- 1 to 10) {
+          try {
+            sql("""
+                  |MERGE INTO t
+                  |USING s
+                  |ON t.id = s.id
+                  |WHEN MATCHED THEN
+                  |UPDATE SET t.id = s.id, t.b = s.b + t.b, t.c = s.c + t.c
+                  |""".stripMargin)
+          } catch {
+            case a: Throwable =>
+              assert(
+                a.getMessage.contains("Conflicts during commits") || a.getMessage.contains(
+                  "Missing file"))
+          }
+          checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1)))
+        }
+      }
+
+      val compact = Future {
+        for (_ <- 1 to 10) {
+          try {
+            sql("CALL sys.compact(table => 't', order_strategy => 'order', order_by => 'id')")
+          } catch {
+            case a: Throwable => assert(a.getMessage.contains("Conflicts during commits"))
+          }
+          checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1)))
+        }
+      }
+
+      Await.result(mergeInto, 60.seconds)
+      Await.result(compact, 60.seconds)
+    }
+  }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala
index 5cbefe9faa..097d2c4e14 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.spark.sql
 
-import org.apache.paimon.CoreOptions
+import org.apache.paimon.{CoreOptions, Snapshot}
 import org.apache.paimon.spark.PaimonSparkTestBase
 import org.apache.paimon.spark.catalyst.analysis.Update
 
@@ -357,4 +357,40 @@ abstract class UpdateTableTestBase extends PaimonSparkTestBase {
     sql("UPDATE T SET c = 'b' WHERE id = 1")
     checkAnswer(sql("SELECT * FROM T"), Seq(Row(1, "s", "b")))
   }
+
+  test("Paimon update: non pk table commit kind") {
+    for (dvEnabled <- Seq(true, false)) {
+      withTable("t") {
+        sql(
+          s"CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('deletion-vectors.enabled' = '$dvEnabled')")
+        sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM range(1, 4)")
+
+        sql("UPDATE t SET data = 111 WHERE id = 1")
+        checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(Row(1, 111), Row(2, 2), Row(3, 3)))
+        val table = loadTable("t")
+        var latestSnapshot = table.latestSnapshot().get()
+        assert(latestSnapshot.id == 2)
+        assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.OVERWRITE))
+
+        sql("UPDATE t SET data = 222 WHERE id = 2")
+        checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(Row(1, 111), Row(2, 222), Row(3, 3)))
+        latestSnapshot = table.latestSnapshot().get()
+        assert(latestSnapshot.id == 3)
+        assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.OVERWRITE))
+      }
+    }
+  }
+
+  test("Paimon update: pk dv table commit kind") {
+    withTable("t") {
+      sql(
+        s"CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('deletion-vectors.enabled' = 'true', 'primary-key' = 'id')")
+      sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM range(1, 4)")
+      sql("UPDATE t SET data = 111 WHERE id = 1")
+      val table = loadTable("t")
+      val latestSnapshot = table.latestSnapshot().get()
+      assert(latestSnapshot.id == 4)
+      assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.COMPACT))
+    }
+  }
 }
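
For reviewers skimming the patch: the change keys the snapshot's commit kind off whether the commit deletes anything. On a non-pk (append) table, a row-level DELETE/UPDATE/MERGE that drops data files or writes deletion-vector index files now records its snapshot as OVERWRITE and goes through mustConflictCheck() instead of noConflictCheck(), while a pure insert still records APPEND. A minimal sketch of the new behavior in the style of the tests above (table name and values are illustrative, not from the patch):

    sql("CREATE TABLE t (id INT, data INT)")    // non-pk append table
    sql("INSERT INTO t VALUES (1, 1), (2, 2)")  // snapshot 1: CommitKind.APPEND
    sql("UPDATE t SET data = 22 WHERE id = 2")  // snapshot 2: CommitKind.OVERWRITE (before this fix: APPEND)
    sql("DELETE FROM t WHERE id = 1")           // snapshot 3: CommitKind.OVERWRITE

Primary-key tables with deletion vectors are unaffected; as the tests assert, their deletion-vector maintenance still lands as a COMPACT snapshot.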
