This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4b7875c853 [core] Fix the commit kind when performing row-level changes on non-pk table (#6025)
4b7875c853 is described below
commit 4b7875c853a6f02f0a2752ad8d388138a7ae7105
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Sep 19 15:39:01 2025 +0800
[core] Fix the commit kind when performing row-level changes on non-pk table (#6025)
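
    In short: on a non-pk table, row-level DELETE/UPDATE/MERGE used to be
    committed as APPEND even though it deletes existing data files, which also
    skipped the conflict check. This commit classifies such commits as OVERWRITE
    instead. A condensed sketch of the new decision, paraphrasing the
    FileStoreCommitImpl hunks below (all names are taken from the diff;
    ConflictCheck is assumed to be the surrounding class's helper type, and the
    remaining plumbing is omitted):

        // The append phase counts as a row-level change if it deletes any data
        // file or writes a deletion-vectors index file.
        boolean hasDelete = false;
        for (SimpleFileEntry entry : appendSimpleEntries) {
            if (entry.kind().equals(FileKind.DELETE)) {
                hasDelete = true;
            }
        }
        for (IndexManifestEntry entry : appendIndexFiles) {
            if (entry.indexFile().indexType().equals(DELETION_VECTORS_INDEX)) {
                hasDelete = true;
            }
        }
        // Row-level changes are committed as OVERWRITE with a mandatory
        // conflict check; pure appends keep the cheap APPEND path without one.
        Snapshot.CommitKind commitKind =
                hasDelete ? Snapshot.CommitKind.OVERWRITE : Snapshot.CommitKind.APPEND;
        ConflictCheck conflictCheck = hasDelete ? mustConflictCheck() : noConflictCheck();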
---
.../src/main/java/org/apache/paimon/Snapshot.java | 7 +-
.../apache/paimon/operation/FileStoreCommit.java | 2 +-
.../paimon/operation/FileStoreCommitImpl.java | 181 ++++++++++++---------
.../apache/paimon/table/sink/TableCommitImpl.java | 3 +-
.../test/java/org/apache/paimon/TestFileStore.java | 2 +-
.../apache/paimon/operation/FileDeletionTest.java | 4 +-
.../apache/paimon/operation/TestCommitThread.java | 2 +-
.../paimon/spark/sql/MergeIntoTableTest.scala | 2 +
.../paimon/spark/sql/MergeIntoTableTest.scala | 2 +
.../paimon/spark/sql/MergeIntoTableTest.scala | 2 +
.../paimon/spark/sql/MergeIntoTableTest.scala | 2 +
.../paimon/spark/sql/MergeIntoTableTest.scala | 2 +
.../paimon/spark/sql/DeleteFromTableTestBase.scala | 38 ++++-
.../paimon/spark/sql/MergeIntoTableTestBase.scala | 91 ++++++++++-
.../paimon/spark/sql/UpdateTableTestBase.scala | 38 ++++-
15 files changed, 287 insertions(+), 91 deletions(-)
diff --git a/paimon-api/src/main/java/org/apache/paimon/Snapshot.java b/paimon-api/src/main/java/org/apache/paimon/Snapshot.java
index 31defa3bfb..a318a72e40 100644
--- a/paimon-api/src/main/java/org/apache/paimon/Snapshot.java
+++ b/paimon-api/src/main/java/org/apache/paimon/Snapshot.java
@@ -496,13 +496,16 @@ public class Snapshot implements Serializable {
/** Type of changes in this snapshot. */
public enum CommitKind {
- /** Changes flushed from the mem table. */
+ /** New data files are appended to the table and no data file is deleted. */
APPEND,
/** Changes by compacting existing data files. */
COMPACT,
- /** Changes that clear up the whole partition and then add new records. */
+ /**
+ * New data files are added to overwrite existing data files or just delete existing data
+ * files.
+ */
OVERWRITE,
/** Collect statistics. */
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index 4156ce0a83..6a00db6f0e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -50,7 +50,7 @@ public interface FileStoreCommit extends AutoCloseable {
* note that this partition does not necessarily equal to the partitions of the newly added
* key-values. This is just the partition to be cleaned up.
*/
- int overwrite(
+ int overwritePartition(
Map<String, String> partition,
ManifestCommittable committable,
Map<String, String> properties);
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index a11339fe7d..300cc9c926 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -25,7 +25,6 @@ import org.apache.paimon.catalog.SnapshotCommit;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.manifest.FileEntry;
@@ -88,7 +87,6 @@ import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
-import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
import static org.apache.paimon.manifest.ManifestEntry.recordCount;
import static org.apache.paimon.manifest.ManifestEntry.recordCountAdd;
import static org.apache.paimon.manifest.ManifestEntry.recordCountDelete;
@@ -296,24 +294,24 @@ public class FileStoreCommitImpl implements FileStoreCommit {
List<ManifestEntry> appendTableFiles = new ArrayList<>();
List<ManifestEntry> appendChangelog = new ArrayList<>();
+ List<IndexManifestEntry> appendIndexFiles = new ArrayList<>();
List<ManifestEntry> compactTableFiles = new ArrayList<>();
List<ManifestEntry> compactChangelog = new ArrayList<>();
- List<IndexManifestEntry> appendHashIndexFiles = new ArrayList<>();
- List<IndexManifestEntry> compactDvIndexFiles = new ArrayList<>();
+ List<IndexManifestEntry> compactIndexFiles = new ArrayList<>();
collectChanges(
committable.fileCommittables(),
appendTableFiles,
appendChangelog,
+ appendIndexFiles,
compactTableFiles,
compactChangelog,
- appendHashIndexFiles,
- compactDvIndexFiles);
+ compactIndexFiles);
try {
List<SimpleFileEntry> appendSimpleEntries = SimpleFileEntry.from(appendTableFiles);
if (!ignoreEmptyCommit
|| !appendTableFiles.isEmpty()
|| !appendChangelog.isEmpty()
- || !appendHashIndexFiles.isEmpty()) {
+ || !appendIndexFiles.isEmpty()) {
// Optimization for common path.
// Step 1:
// Read manifest entries from changed partitions here and check for conflicts.
@@ -322,6 +320,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
// This optimization is mainly used to decrease the number of times we read from
// files.
latestSnapshot = snapshotManager.latestSnapshot();
+ boolean hasDelete = hasDelete(appendSimpleEntries, appendIndexFiles);
+ Snapshot.CommitKind commitKind =
+ hasDelete ? Snapshot.CommitKind.OVERWRITE : Snapshot.CommitKind.APPEND;
+
if (latestSnapshot != null && checkAppendFiles) {
// it is possible that some partitions only have compact changes,
// so we need to contain all changes
@@ -332,7 +334,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
latestSnapshot.commitUser(),
baseEntries,
appendSimpleEntries,
- Snapshot.CommitKind.APPEND);
+ commitKind);
safeLatestSnapshotId = latestSnapshot.id();
}
@@ -340,20 +342,20 @@ public class FileStoreCommitImpl implements FileStoreCommit {
tryCommit(
appendTableFiles,
appendChangelog,
- appendHashIndexFiles,
+ appendIndexFiles,
committable.identifier(),
committable.watermark(),
committable.logOffsets(),
committable.properties(),
- Snapshot.CommitKind.APPEND,
- noConflictCheck(),
+ commitKind,
+ hasDelete ? mustConflictCheck() : noConflictCheck(),
null);
generatedSnapshot += 1;
}
if (!compactTableFiles.isEmpty()
|| !compactChangelog.isEmpty()
- || !compactDvIndexFiles.isEmpty()) {
+ || !compactIndexFiles.isEmpty()) {
// Optimization for common path.
// Step 2:
// Add appendChanges to the manifest entries read above and check for conflicts.
@@ -376,7 +378,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
tryCommit(
compactTableFiles,
compactChangelog,
- compactDvIndexFiles,
+ compactIndexFiles,
committable.identifier(),
committable.watermark(),
committable.logOffsets(),
@@ -426,8 +428,23 @@ public class FileStoreCommitImpl implements FileStoreCommit {
commitMetrics.reportCommit(commitStats);
}
+ private boolean hasDelete(
+ List<SimpleFileEntry> appendSimpleEntries, List<IndexManifestEntry> appendIndexFiles) {
+ for (SimpleFileEntry appendSimpleEntry : appendSimpleEntries) {
+ if (appendSimpleEntry.kind().equals(FileKind.DELETE)) {
+ return true;
+ }
+ }
+ for (IndexManifestEntry appendIndexFile : appendIndexFiles) {
+ if (appendIndexFile.indexFile().indexType().equals(DELETION_VECTORS_INDEX)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
@Override
- public int overwrite(
+ public int overwritePartition(
Map<String, String> partition,
ManifestCommittable committable,
Map<String, String> properties) {
@@ -448,18 +465,18 @@ public class FileStoreCommitImpl implements FileStoreCommit {
int attempts = 0;
List<ManifestEntry> appendTableFiles = new ArrayList<>();
List<ManifestEntry> appendChangelog = new ArrayList<>();
+ List<IndexManifestEntry> appendIndexFiles = new ArrayList<>();
List<ManifestEntry> compactTableFiles = new ArrayList<>();
List<ManifestEntry> compactChangelog = new ArrayList<>();
- List<IndexManifestEntry> appendHashIndexFiles = new ArrayList<>();
- List<IndexManifestEntry> compactDvIndexFiles = new ArrayList<>();
+ List<IndexManifestEntry> compactIndexFiles = new ArrayList<>();
collectChanges(
committable.fileCommittables(),
appendTableFiles,
appendChangelog,
+ appendIndexFiles,
compactTableFiles,
compactChangelog,
- appendHashIndexFiles,
- compactDvIndexFiles);
+ compactIndexFiles);
if (!appendChangelog.isEmpty() || !compactChangelog.isEmpty()) {
StringBuilder warnMessage =
@@ -493,7 +510,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
partitionFilter =
PartitionPredicate.fromMultiple(partitionType, partitions);
}
} else {
- // partition may be partial partition fields, so here must to use predicate way.
+ // partition may be partial partition fields, so here must use predicate way.
Predicate partitionPredicate =
createPartitionPredicate(partition, partitionType, partitionDefaultName);
partitionFilter =
@@ -516,10 +533,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
// overwrite new files
if (!skipOverwrite) {
attempts +=
- tryOverwrite(
+ tryOverwritePartition(
partitionFilter,
appendTableFiles,
- appendHashIndexFiles,
+ appendIndexFiles,
committable.identifier(),
committable.watermark(),
committable.logOffsets(),
@@ -527,12 +544,12 @@ public class FileStoreCommitImpl implements FileStoreCommit {
generatedSnapshot += 1;
}
- if (!compactTableFiles.isEmpty() || !compactDvIndexFiles.isEmpty()) {
+ if (!compactTableFiles.isEmpty() || !compactIndexFiles.isEmpty()) {
attempts +=
tryCommit(
compactTableFiles,
emptyList(),
- compactDvIndexFiles,
+ compactIndexFiles,
committable.identifier(),
committable.watermark(),
committable.logOffsets(),
@@ -590,7 +607,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
partitionFilter = PartitionPredicate.fromPredicate(partitionType, predicate);
}
- tryOverwrite(
+ tryOverwritePartition(
partitionFilter,
emptyList(),
emptyList(),
@@ -602,7 +619,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
@Override
public void truncateTable(long commitIdentifier) {
- tryOverwrite(
+ tryOverwritePartition(
null,
emptyList(),
emptyList(),
@@ -666,10 +683,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
List<CommitMessage> commitMessages,
List<ManifestEntry> appendTableFiles,
List<ManifestEntry> appendChangelog,
+ List<IndexManifestEntry> appendIndexFiles,
List<ManifestEntry> compactTableFiles,
List<ManifestEntry> compactChangelog,
- List<IndexManifestEntry> appendHashIndexFiles,
- List<IndexManifestEntry> compactDvIndexFiles) {
+ List<IndexManifestEntry> compactIndexFiles) {
for (CommitMessage message : commitMessages) {
CommitMessageImpl commitMessage = (CommitMessageImpl) message;
commitMessage
@@ -687,6 +704,29 @@ public class FileStoreCommitImpl implements FileStoreCommit {
.newFilesIncrement()
.changelogFiles()
.forEach(m -> appendChangelog.add(makeEntry(FileKind.ADD, commitMessage, m)));
+ commitMessage
+ .newFilesIncrement()
+ .newIndexFiles()
+ .forEach(
+ m ->
+ appendIndexFiles.add(
+ new IndexManifestEntry(
+ FileKind.ADD,
+ commitMessage.partition(),
+ commitMessage.bucket(),
+ m)));
+ commitMessage
+ .newFilesIncrement()
+ .deletedIndexFiles()
+ .forEach(
+ m ->
+ appendIndexFiles.add(
+ new IndexManifestEntry(
+ FileKind.DELETE,
+ commitMessage.partition(),
+ commitMessage.bucket(),
+ m)));
+
commitMessage
.compactIncrement()
.compactBefore()
@@ -702,53 +742,28 @@ public class FileStoreCommitImpl implements FileStoreCommit {
.compactIncrement()
.changelogFiles()
.forEach(m -> compactChangelog.add(makeEntry(FileKind.ADD, commitMessage, m)));
-
- // todo: split them
- List<IndexFileMeta> newIndexFiles =
- new ArrayList<>(commitMessage.newFilesIncrement().newIndexFiles());
- newIndexFiles.addAll(commitMessage.compactIncrement().newIndexFiles());
- newIndexFiles.forEach(
- f -> {
- switch (f.indexType()) {
- case HASH_INDEX:
- appendHashIndexFiles.add(
- new IndexManifestEntry(
- FileKind.ADD,
- commitMessage.partition(),
- commitMessage.bucket(),
- f));
- break;
- case DELETION_VECTORS_INDEX:
- compactDvIndexFiles.add(
- new IndexManifestEntry(
- FileKind.ADD,
- commitMessage.partition(),
- commitMessage.bucket(),
- f));
- break;
- default:
- throw new RuntimeException("Unknown index
type: " + f.indexType());
- }
- });
-
- // todo: split them
- List<IndexFileMeta> deletedIndexFiles =
- new ArrayList<>(commitMessage.newFilesIncrement().deletedIndexFiles());
- deletedIndexFiles.addAll(commitMessage.compactIncrement().deletedIndexFiles());
- deletedIndexFiles.forEach(
- f -> {
- if (f.indexType().equals(DELETION_VECTORS_INDEX)) {
- compactDvIndexFiles.add(
- new IndexManifestEntry(
- FileKind.DELETE,
- commitMessage.partition(),
- commitMessage.bucket(),
- f));
- } else {
- throw new RuntimeException(
- "This index type is not supported to
delete: " + f.indexType());
- }
- });
+ commitMessage
+ .compactIncrement()
+ .newIndexFiles()
+ .forEach(
+ m ->
+ compactIndexFiles.add(
+ new IndexManifestEntry(
+ FileKind.ADD,
+ commitMessage.partition(),
+ commitMessage.bucket(),
+ m)));
+ commitMessage
+ .compactIncrement()
+ .deletedIndexFiles()
+ .forEach(
+ m ->
+ compactIndexFiles.add(
+ new IndexManifestEntry(
+ FileKind.DELETE,
+ commitMessage.partition(),
+ commitMessage.bucket(),
+ m)));
}
if (!commitMessages.isEmpty()) {
List<String> msg = new ArrayList<>();
@@ -758,17 +773,17 @@ public class FileStoreCommitImpl implements FileStoreCommit {
if (!appendChangelog.isEmpty()) {
msg.add(appendChangelog.size() + " append Changelogs");
}
+ if (!appendIndexFiles.isEmpty()) {
+ msg.add(appendIndexFiles.size() + " append index files");
+ }
if (!compactTableFiles.isEmpty()) {
msg.add(compactTableFiles.size() + " compact table files");
}
if (!compactChangelog.isEmpty()) {
msg.add(compactChangelog.size() + " compact Changelogs");
}
- if (!appendHashIndexFiles.isEmpty()) {
- msg.add(appendHashIndexFiles.size() + " append hash index files");
- }
- if (!compactDvIndexFiles.isEmpty()) {
- msg.add(compactDvIndexFiles.size() + " compact dv index files");
+ if (!compactIndexFiles.isEmpty()) {
+ msg.add(compactIndexFiles.size() + " compact index files");
}
LOG.info("Finished collecting changes, including: {}",
String.join(", ", msg));
}
@@ -836,7 +851,13 @@ public class FileStoreCommitImpl implements FileStoreCommit {
return retryCount + 1;
}
- private int tryOverwrite(
+ /**
+ * Try to overwrite partition.
+ *
+ * @param partitionFilter Partition filter indicating which partitions to overwrite, if {@code
+ * null}, overwrites the entire table.
+ */
+ private int tryOverwritePartition(
@Nullable PartitionPredicate partitionFilter,
List<ManifestEntry> changes,
List<IndexManifestEntry> indexFiles,
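
    For callers, the overwrite rename is mechanical; only the method name
    changes. A minimal usage sketch of the renamed API, mirroring the test
    updates below (the partition spec and commit identifier are illustrative):

        // Overwrite everything under partition dt=0401; the empty map carries
        // no extra commit properties.
        Map<String, String> partitionSpec = new HashMap<>();
        partitionSpec.put("dt", "0401");
        commit.overwritePartition(
                partitionSpec, new ManifestCommittable(commitIdentifier++), Collections.emptyMap());

        // Per the new javadoc above, a null partition filter in
        // tryOverwritePartition means the whole table, which is how
        // truncateTable() is implemented.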
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index e549bd2e2c..7f19c7443c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -248,7 +248,8 @@ public class TableCommitImpl implements InnerTableCommit {
committable = new ManifestCommittable(Long.MAX_VALUE);
}
int newSnapshots =
- commit.overwrite(overwritePartition, committable, Collections.emptyMap());
+ commit.overwritePartition(
+ overwritePartition, committable, Collections.emptyMap());
maintain(
committable.identifier(),
maintainExecutor,
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index dc1702c264..aa405060f4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -263,7 +263,7 @@ public class TestFileStore extends KeyValueFileStore {
null,
Collections.emptyList(),
(commit, committable) ->
- commit.overwrite(partition, committable, Collections.emptyMap());
+ commit.overwritePartition(partition, committable, Collections.emptyMap());
}
public Snapshot dropPartitions(List<Map<String, String>> partitions) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index 8e794fe749..0bc6e041ab 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -154,13 +154,13 @@ public class FileDeletionTest {
FileStoreCommitImpl commit = store.newCommit();
Map<String, String> partitionSpec = new HashMap<>();
partitionSpec.put("dt", "0401");
- commit.overwrite(
+ commit.overwritePartition(
partitionSpec, new ManifestCommittable(commitIdentifier++), Collections.emptyMap());
// step 3: generate snapshot 3 by cleaning partition dt=0402/hr=10
partitionSpec.put("dt", "0402");
partitionSpec.put("hr", "8");
- commit.overwrite(
+ commit.overwritePartition(
partitionSpec, new ManifestCommittable(commitIdentifier++), Collections.emptyMap());
commit.close();
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
index 2f409a9176..cdd2d8fba1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
@@ -169,7 +169,7 @@ public class TestCommitThread extends Thread {
runWithRetry(
committable,
() ->
- commit.overwrite(
+ commit.overwritePartition(
TestKeyValueGenerator.toPartitionMap(partition, MULTI_PARTITIONED),
committable,
Collections.emptyMap()));
diff --git a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
index f1f0d8c065..5ab343c41c 100644
--- a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
+++ b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
@@ -32,8 +32,10 @@ class MergeIntoPrimaryKeyNonBucketTableTest
class MergeIntoAppendBucketedTableTest
extends MergeIntoTableTestBase
+ with MergeIntoAppendTableTest
with PaimonAppendBucketedTableTest {}
class MergeIntoAppendNonBucketedTableTest
extends MergeIntoTableTestBase
+ with MergeIntoAppendTableTest
with PaimonAppendNonBucketTableTest {}
diff --git a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
index f1f0d8c065..5ab343c41c 100644
--- a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
+++ b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
@@ -32,8 +32,10 @@ class MergeIntoPrimaryKeyNonBucketTableTest
class MergeIntoAppendBucketedTableTest
extends MergeIntoTableTestBase
+ with MergeIntoAppendTableTest
with PaimonAppendBucketedTableTest {}
class MergeIntoAppendNonBucketedTableTest
extends MergeIntoTableTestBase
+ with MergeIntoAppendTableTest
with PaimonAppendNonBucketTableTest {}
diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
index e1cfe3a396..b9a85b147e 100644
--- a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
+++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
@@ -34,10 +34,12 @@ class MergeIntoPrimaryKeyNonBucketTableTest
class MergeIntoAppendBucketedTableTest
extends MergeIntoTableTestBase
+ with MergeIntoAppendTableTest
with MergeIntoNotMatchedBySourceTest
with PaimonAppendBucketedTableTest {}
class MergeIntoAppendNonBucketedTableTest
extends MergeIntoTableTestBase
+ with MergeIntoAppendTableTest
with MergeIntoNotMatchedBySourceTest
with PaimonAppendNonBucketTableTest {}
diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
index e1cfe3a396..b9a85b147e 100644
--- a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
+++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
@@ -34,10 +34,12 @@ class MergeIntoPrimaryKeyNonBucketTableTest
class MergeIntoAppendBucketedTableTest
extends MergeIntoTableTestBase
+ with MergeIntoAppendTableTest
with MergeIntoNotMatchedBySourceTest
with PaimonAppendBucketedTableTest {}
class MergeIntoAppendNonBucketedTableTest
extends MergeIntoTableTestBase
+ with MergeIntoAppendTableTest
with MergeIntoNotMatchedBySourceTest
with PaimonAppendNonBucketTableTest {}
diff --git a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
index e1cfe3a396..b9a85b147e 100644
--- a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
+++ b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
@@ -34,10 +34,12 @@ class MergeIntoPrimaryKeyNonBucketTableTest
class MergeIntoAppendBucketedTableTest
extends MergeIntoTableTestBase
+ with MergeIntoAppendTableTest
with MergeIntoNotMatchedBySourceTest
with PaimonAppendBucketedTableTest {}
class MergeIntoAppendNonBucketedTableTest
extends MergeIntoTableTestBase
+ with MergeIntoAppendTableTest
with MergeIntoNotMatchedBySourceTest
with PaimonAppendNonBucketTableTest {}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
index 6b38c1f5f0..ea05d94dd3 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
@@ -18,7 +18,7 @@
package org.apache.paimon.spark.sql
-import org.apache.paimon.CoreOptions
+import org.apache.paimon.{CoreOptions, Snapshot}
import org.apache.paimon.CoreOptions.MergeEngine
import org.apache.paimon.spark.PaimonSparkTestBase
@@ -435,4 +435,40 @@ abstract class DeleteFromTableTestBase extends PaimonSparkTestBase {
}
}
}
+
+ test("Paimon delete: non pk table commit kind") {
+ for (dvEnabled <- Seq(true, false)) {
+ withTable("t") {
+ sql(
+ s"CREATE TABLE t (id INT, data INT) TBLPROPERTIES
('deletion-vectors.enabled' = '$dvEnabled')")
+ sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM
range(1, 4)")
+
+ sql("DELETE FROM t WHERE id = 1")
+ checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(Row(2, 2), Row(3,
3)))
+ val table = loadTable("t")
+ var latestSnapshot = table.latestSnapshot().get()
+ assert(latestSnapshot.id == 2)
+ assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.OVERWRITE))
+
+ sql("DELETE FROM t WHERE id = 2")
+ checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(Row(3, 3)))
+ latestSnapshot = table.latestSnapshot().get()
+ assert(latestSnapshot.id == 3)
+ assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.OVERWRITE))
+ }
+ }
+ }
+
+ test("Paimon delete: pk dv table commit kind") {
+ withTable("t") {
+ sql(
+ s"CREATE TABLE t (id INT, data INT) TBLPROPERTIES
('deletion-vectors.enabled' = 'true', 'primary-key' = 'id')")
+ sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM
range(1, 4)")
+ sql("DELETE FROM t WHERE id = 1")
+ val table = loadTable("t")
+ val latestSnapshot = table.latestSnapshot().get()
+ assert(latestSnapshot.id == 4)
+ assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.COMPACT))
+ }
+ }
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
index 4564e1646c..c867aed9f0 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
@@ -18,10 +18,15 @@
package org.apache.paimon.spark.sql
-import org.apache.paimon.spark.{PaimonPrimaryKeyTable, PaimonSparkTestBase, PaimonTableTest}
+import org.apache.paimon.Snapshot
+import org.apache.paimon.spark.{PaimonAppendTable, PaimonPrimaryKeyTable, PaimonSparkTestBase, PaimonTableTest}
import org.apache.spark.sql.Row
+import scala.concurrent.{Await, Future}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.DurationInt
+
abstract class MergeIntoTableTestBase extends PaimonSparkTestBase with PaimonTableTest {
import testImplicits._
@@ -714,3 +719,87 @@ trait MergeIntoPrimaryKeyTableTest extends PaimonSparkTestBase with PaimonPrimar
}
}
}
+
+trait MergeIntoAppendTableTest extends PaimonSparkTestBase with PaimonAppendTable {
+
+ test("Paimon MergeInto: non pk table commit kind") {
+ withTable("s", "t") {
+ createTable("s", "id INT, b INT, c INT", Seq("id"))
+ sql("INSERT INTO s VALUES (1, 1, 1)")
+
+ createTable("t", "id INT, b INT, c INT", Seq("id"))
+ sql("INSERT INTO t VALUES (2, 2, 2)")
+
+ sql("""
+ |MERGE INTO t
+ |USING s
+ |ON t.id = s.id
+ |WHEN NOT MATCHED THEN
+ |INSERT (id, b, c) VALUES (s.id, s.b, s.c);
+ |""".stripMargin)
+
+ val table = loadTable("t")
+ var latestSnapshot = table.latestSnapshot().get()
+ assert(latestSnapshot.id == 2)
+ // no old data is deleted, so the commit kind is APPEND
+ assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.APPEND))
+
+ sql("INSERT INTO s VALUES (2, 22, 22)")
+ sql("""
+ |MERGE INTO t
+ |USING s
+ |ON t.id = s.id
+ |WHEN MATCHED THEN
+ |UPDATE SET id = s.id, b = s.b, c = s.c
+ |""".stripMargin)
+ latestSnapshot = table.latestSnapshot().get()
+ assert(latestSnapshot.id == 3)
+ // existing data is updated, so the commit kind is OVERWRITE
+ assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.OVERWRITE))
+ }
+ }
+
+ test("Paimon MergeInto: concurrent merge and compact") {
+ withTable("s", "t") {
+ sql("CREATE TABLE s (id INT, b INT, c INT)")
+ sql("INSERT INTO s VALUES (1, 1, 1)")
+
+ sql("CREATE TABLE t (id INT, b INT, c INT)")
+ sql("INSERT INTO t VALUES (1, 1, 1)")
+
+ val mergeInto = Future {
+ for (_ <- 1 to 10) {
+ try {
+ sql("""
+ |MERGE INTO t
+ |USING s
+ |ON t.id = s.id
+ |WHEN MATCHED THEN
+ |UPDATE SET t.id = s.id, t.b = s.b + t.b, t.c = s.c + t.c
+ |""".stripMargin)
+ } catch {
+ case a: Throwable =>
+ assert(
+ a.getMessage.contains("Conflicts during commits") ||
a.getMessage.contains(
+ "Missing file"))
+ }
+ checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1)))
+ }
+ }
+
+ val compact = Future {
+ for (_ <- 1 to 10) {
+ try {
+ sql("CALL sys.compact(table => 't', order_strategy => 'order',
order_by => 'id')")
+ } catch {
+ case a: Throwable => assert(a.getMessage.contains("Conflicts during commits"))
+ }
+ checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1)))
+ }
+ }
+
+ Await.result(mergeInto, 60.seconds)
+ Await.result(compact, 60.seconds)
+ }
+ }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala
index 5cbefe9faa..097d2c4e14 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala
@@ -18,7 +18,7 @@
package org.apache.paimon.spark.sql
-import org.apache.paimon.CoreOptions
+import org.apache.paimon.{CoreOptions, Snapshot}
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.paimon.spark.catalyst.analysis.Update
@@ -357,4 +357,40 @@ abstract class UpdateTableTestBase extends PaimonSparkTestBase {
sql("UPDATE T SET c = 'b' WHERE id = 1")
checkAnswer(sql("SELECT * FROM T"), Seq(Row(1, "s", "b")))
}
+
+ test("Paimon update: non pk table commit kind") {
+ for (dvEnabled <- Seq(true, false)) {
+ withTable("t") {
+ sql(
+ s"CREATE TABLE t (id INT, data INT) TBLPROPERTIES
('deletion-vectors.enabled' = '$dvEnabled')")
+ sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM
range(1, 4)")
+
+ sql("UPDATE t SET data = 111 WHERE id = 1")
+ checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(Row(1, 111),
Row(2, 2), Row(3, 3)))
+ val table = loadTable("t")
+ var latestSnapshot = table.latestSnapshot().get()
+ assert(latestSnapshot.id == 2)
+ assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.OVERWRITE))
+
+ sql("UPDATE t SET data = 222 WHERE id = 2")
+ checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(Row(1, 111),
Row(2, 222), Row(3, 3)))
+ latestSnapshot = table.latestSnapshot().get()
+ assert(latestSnapshot.id == 3)
+ assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.OVERWRITE))
+ }
+ }
+ }
+
+ test("Paimon update: pk dv table commit kind") {
+ withTable("t") {
+ sql(
+ s"CREATE TABLE t (id INT, data INT) TBLPROPERTIES
('deletion-vectors.enabled' = 'true', 'primary-key' = 'id')")
+ sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM
range(1, 4)")
+ sql("UPDATE t SET data = 111 WHERE id = 1")
+ val table = loadTable("t")
+ val latestSnapshot = table.latestSnapshot().get()
+ assert(latestSnapshot.id == 4)
+ assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.COMPACT))
+ }
+ }
}