This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 44dfecd  [FLINK-25876] Implement overwrite in FileStoreCommitImpl
44dfecd is described below

commit 44dfecd56695d9575e75b0b63a49aa8211ecde9f
Author: tsreaper <tsreape...@gmail.com>
AuthorDate: Thu Feb 17 13:36:47 2022 +0800

    [FLINK-25876] Implement overwrite in FileStoreCommitImpl
    
    This closes #16
---
 .../apache/flink/table/store/file/Snapshot.java    |  16 +-
 .../store/file/manifest/ManifestCommittable.java   |  23 +-
 .../manifest/ManifestCommittableSerializer.java    |   6 +-
 .../store/file/manifest/ManifestFileMeta.java      |  29 +-
 .../table/store/file/manifest/ManifestList.java    |   4 -
 .../store/file/operation/FileStoreCommit.java      |   4 +-
 .../store/file/operation/FileStoreCommitImpl.java  | 326 ++++++++++++---------
 .../flink/table/store/file/utils/TypeUtils.java    |  93 ++++++
 .../table/store/file/TestKeyValueGenerator.java    |   9 +
 ...ommitTestBase.java => FileStoreCommitTest.java} | 150 +++++++---
 .../store/file/operation/FileStoreExpireTest.java  |   4 +-
 .../store/file/operation/FileStoreScanTest.java    |   4 +-
 .../store/file/operation/OperationTestUtils.java   |  56 +++-
 .../store/file/operation/TestCommitThread.java     | 113 +++++--
 .../src/test/resources/log4j2-test.xml             |  29 ++
 15 files changed, 613 insertions(+), 253 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
index e02fa0d..9b9085b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
@@ -37,7 +37,7 @@ public class Snapshot {
     private static final String FIELD_ID = "id";
     private static final String FIELD_MANIFEST_LIST = "manifestList";
     private static final String FIELD_COMMIT_USER = "commitUser";
-    private static final String FIELD_COMMIT_DIGEST = "commitDigest";
+    private static final String FIELD_COMMIT_UUID = "commitUuid";
     private static final String FIELD_COMMIT_KIND = "commitKind";
     private static final String FIELD_TIME_MILLIS = "timeMillis";
 
@@ -51,8 +51,8 @@ public class Snapshot {
     private final String commitUser;
 
     // for deduplication
-    @JsonProperty(FIELD_COMMIT_DIGEST)
-    private final String commitDigest;
+    @JsonProperty(FIELD_COMMIT_UUID)
+    private final String commitUuid;
 
     @JsonProperty(FIELD_COMMIT_KIND)
     private final CommitKind commitKind;
@@ -65,13 +65,13 @@ public class Snapshot {
             @JsonProperty(FIELD_ID) long id,
             @JsonProperty(FIELD_MANIFEST_LIST) String manifestList,
             @JsonProperty(FIELD_COMMIT_USER) String commitUser,
-            @JsonProperty(FIELD_COMMIT_DIGEST) String commitDigest,
+            @JsonProperty(FIELD_COMMIT_UUID) String commitUuid,
             @JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind,
             @JsonProperty(FIELD_TIME_MILLIS) long timeMillis) {
         this.id = id;
         this.manifestList = manifestList;
         this.commitUser = commitUser;
-        this.commitDigest = commitDigest;
+        this.commitUuid = commitUuid;
         this.commitKind = commitKind;
         this.timeMillis = timeMillis;
     }
@@ -91,9 +91,9 @@ public class Snapshot {
         return commitUser;
     }
 
-    @JsonGetter(FIELD_COMMIT_DIGEST)
-    public String commitDigest() {
-        return commitDigest;
+    @JsonGetter(FIELD_COMMIT_UUID)
+    public String commitUuid() {
+        return commitUuid;
     }
 
     @JsonGetter(FIELD_COMMIT_KIND)
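
For reference, a snapshot file written after this change carries the committable's UUID in place of the old MD5-based digest. It would look roughly like the following sketch (all values made up, pretty-printing assumed):

    {
      "id" : 1,
      "manifestList" : "manifest-list-0",
      "commitUser" : "d3c9b8a2-6f1e-4c3b-9a7d-5e2f8c4b1a90",
      "commitUuid" : "7f4e2d1c-8b6a-4e5f-9c3d-2a1b0e9f8d7c",
      "commitKind" : "APPEND",
      "timeMillis" : 1645076207000
    }
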
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
index 4fcd763..765fe1a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
@@ -27,26 +27,26 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.UUID;
 
 /** Manifest commit message. */
 public class ManifestCommittable {
 
+    private final String uuid;
     private final Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> newFiles;
-
     private final Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> compactBefore;
-
     private final Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> compactAfter;
 
     public ManifestCommittable() {
-        this.newFiles = new HashMap<>();
-        this.compactBefore = new HashMap<>();
-        this.compactAfter = new HashMap<>();
+        this(UUID.randomUUID().toString(), new HashMap<>(), new HashMap<>(), new HashMap<>());
     }
 
     public ManifestCommittable(
+            String uuid,
             Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> newFiles,
             Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> compactBefore,
             Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> compactAfter) {
+        this.uuid = uuid;
         this.newFiles = newFiles;
         this.compactBefore = compactBefore;
         this.compactAfter = compactAfter;
@@ -68,6 +68,10 @@ public class ManifestCommittable {
                 .addAll(files);
     }
 
+    public String uuid() {
+        return uuid;
+    }
+
     public Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> newFiles() {
         return newFiles;
     }
@@ -89,19 +93,22 @@ public class ManifestCommittable {
             return false;
         }
         ManifestCommittable that = (ManifestCommittable) o;
-        return Objects.equals(newFiles, that.newFiles)
+        return Objects.equals(uuid, that.uuid)
+                && Objects.equals(newFiles, that.newFiles)
                 && Objects.equals(compactBefore, that.compactBefore)
                 && Objects.equals(compactAfter, that.compactAfter);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(newFiles, compactBefore, compactAfter);
+        return Objects.hash(uuid, newFiles, compactBefore, compactAfter);
     }
 
     @Override
     public String toString() {
-        return "new files:\n"
+        return "uuid: "
+                + uuid
+                + "\nnew files:\n"
                 + filesToString(newFiles)
                 + "compact before:\n"
                 + filesToString(compactBefore)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
index 65e18b4..81e4e1f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
@@ -55,6 +55,7 @@ public class ManifestCommittableSerializer
     public byte[] serialize(ManifestCommittable obj) throws IOException {
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
+        view.writeUTF(obj.uuid());
         serializeFiles(view, obj.newFiles());
         serializeFiles(view, obj.compactBefore());
         serializeFiles(view, obj.compactAfter());
@@ -105,6 +106,9 @@ public class ManifestCommittableSerializer
     public ManifestCommittable deserialize(int version, byte[] serialized) throws IOException {
         DataInputDeserializer view = new DataInputDeserializer(serialized);
         return new ManifestCommittable(
-                deserializeFiles(view), deserializeFiles(view), deserializeFiles(view));
+                view.readUTF(),
+                deserializeFiles(view),
+                deserializeFiles(view),
+                deserializeFiles(view));
     }
 }
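
The uuid is written before the three file maps and read back first on deserialization. A minimal round-trip sketch, assuming a ManifestCommittableSerializer instance named serializer and a version value matching the one used to serialize:

    ManifestCommittable committable = new ManifestCommittable();
    byte[] bytes = serializer.serialize(committable);
    ManifestCommittable restored = serializer.deserialize(version, bytes);
    // restored.uuid() equals committable.uuid(), which is what lets a
    // restarted job recognize this committable in filterCommitted
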
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
index cc883d3..d6deadd 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
@@ -32,6 +32,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 /** Metadata of a manifest file. */
 public class ManifestFileMeta {
@@ -148,23 +149,29 @@ public class ManifestFileMeta {
                 candidate.add(manifest);
                 if (totalSize >= suggestedMetaSize) {
                     // reach suggested file size, perform merging and produce new file
-                    ManifestFileMeta merged;
                     if (candidate.size() == 1) {
-                        merged = candidate.get(0);
+                        result.add(candidate.get(0));
                     } else {
-                        merged = mergeIntoOneFile(candidate, Collections.emptyList(), manifestFile);
-                        newMetas.add(merged);
+                        mergeIntoOneFile(candidate, Collections.emptyList(), manifestFile)
+                                .ifPresent(
+                                        merged -> {
+                                            newMetas.add(merged);
+                                            result.add(merged);
+                                        });
                     }
-                    result.add(merged);
+
                     candidate.clear();
                     totalSize = 0;
                 }
             }
 
             // merge the last bit of metas with entries
-            ManifestFileMeta merged = mergeIntoOneFile(candidate, entries, manifestFile);
-            newMetas.add(merged);
-            result.add(merged);
+            mergeIntoOneFile(candidate, entries, manifestFile)
+                    .ifPresent(
+                            merged -> {
+                                newMetas.add(merged);
+                                result.add(merged);
+                            });
         } catch (Throwable e) {
             // exception occurs, clean up and rethrow
             for (ManifestFileMeta manifest : newMetas) {
@@ -176,14 +183,16 @@ public class ManifestFileMeta {
         return result;
     }
 
-    private static ManifestFileMeta mergeIntoOneFile(
+    private static Optional<ManifestFileMeta> mergeIntoOneFile(
             List<ManifestFileMeta> metas, List<ManifestEntry> entries, ManifestFile manifestFile) {
         Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
         for (ManifestFileMeta manifest : metas) {
             mergeEntries(manifestFile.read(manifest.fileName), map);
         }
         mergeEntries(entries, map);
-        return manifestFile.write(new ArrayList<>(map.values()));
+        return map.isEmpty()
+                ? Optional.empty()
+                : Optional.of(manifestFile.write(new ArrayList<>(map.values())));
     }
 
     private static void mergeEntries(
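
Returning Optional here matters for overwrite: when every ADD entry in the candidate manifests is cancelled by a matching DELETE (which is exactly what overwriting a whole partition produces), the merged entry map ends up empty and no manifest file should be written; this is also why the non-empty check in ManifestList.write below is dropped. A rough sketch of the calling pattern:

    // an Optional.empty() result simply contributes no new manifest file
    mergeIntoOneFile(candidate, entries, manifestFile).ifPresent(result::add);
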
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
index 8a9707b..bde36a9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
@@ -29,7 +29,6 @@ import org.apache.flink.table.store.file.FileFormat;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.util.List;
@@ -71,9 +70,6 @@ public class ManifestList {
      * <p>NOTE: This method is atomic.
      */
     public String write(List<ManifestFileMeta> metas) {
-        Preconditions.checkArgument(
-                metas.size() > 0, "Manifest file metas to write must not be empty.");
-
         Path path = pathFactory.newManifestList();
         try {
             return write(metas, path);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommit.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommit.java
index 912da92..87d1407 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommit.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommit.java
@@ -39,7 +39,9 @@ public interface FileStoreCommit {
      * Overwrite from manifest committable and partition.
      *
      * @param partition A single partition maps each partition key to a partition value. Depending
-     *     on the * user-defined statement, the partition might not include all partition keys.
+     *     on the user-defined statement, the partition might not include all partition keys. Also
+     *     note that this partition does not necessarily equal the partitions of the newly added
+     *     key-values. This is just the partition to be cleaned up.
      */
     void overwrite(
             Map<String, String> partition,
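
Because the map may name only some of the partition keys, an overwrite can target a coarser granularity than the physical partitions. A hedged usage sketch against the test schema's (dt, hr) partitioning, with commit and committable assumed to exist:

    // overwrite everything under dt=20220217, across all hr sub-partitions
    Map<String, String> partition = new HashMap<>();
    partition.put("dt", "20220217");
    commit.overwrite(partition, committable, Collections.emptyMap());
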
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 7c56acd..cb854ef 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -25,25 +25,24 @@ import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
-import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.manifest.ManifestFile;
 import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
 import org.apache.flink.table.store.file.manifest.ManifestList;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.store.file.utils.RowDataToObjectArrayConverter;
+import org.apache.flink.table.store.file.utils.TypeUtils;
+import org.apache.flink.table.types.logical.RowType;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
-import java.util.Base64;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -72,7 +71,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     private static final Logger LOG = LoggerFactory.getLogger(FileStoreCommitImpl.class);
 
     private final String commitUser;
-    private final ManifestCommittableSerializer committableSerializer;
+    private final RowType partitionType;
+    private final RowDataToObjectArrayConverter partitionObjectConverter;
     private final FileStorePathFactory pathFactory;
     private final ManifestFile manifestFile;
     private final ManifestList manifestList;
@@ -83,14 +83,15 @@ public class FileStoreCommitImpl implements FileStoreCommit {
 
     public FileStoreCommitImpl(
             String commitUser,
-            ManifestCommittableSerializer committableSerializer,
+            RowType partitionType,
             FileStorePathFactory pathFactory,
             ManifestFile.Factory manifestFileFactory,
             ManifestList.Factory manifestListFactory,
             FileStoreScan scan,
             FileStoreOptions fileStoreOptions) {
         this.commitUser = commitUser;
-        this.committableSerializer = committableSerializer;
+        this.partitionType = partitionType;
+        this.partitionObjectConverter = new RowDataToObjectArrayConverter(partitionType);
         this.pathFactory = pathFactory;
         this.manifestFile = manifestFileFactory.create();
         this.manifestList = manifestListFactory.create();
@@ -108,29 +109,24 @@ public class FileStoreCommitImpl implements FileStoreCommit {
 
     @Override
     public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> committableList) {
-        committableList = new ArrayList<>(committableList);
-
-        // filter out commits with no new files
-        committableList.removeIf(committable -> committable.newFiles().isEmpty());
-
         // if there is no previous snapshots then nothing should be filtered
         Long latestSnapshotId = pathFactory.latestSnapshotId();
         if (latestSnapshotId == null) {
             return committableList;
         }
 
-        // check if a committable is already committed by its hash
-        Map<String, ManifestCommittable> digests = new LinkedHashMap<>();
+        // check if a committable is already committed by its uuid
+        Map<String, ManifestCommittable> uuids = new LinkedHashMap<>();
         for (ManifestCommittable committable : committableList) {
-            digests.put(digestManifestCommittable(committable), committable);
+            uuids.put(committable.uuid(), committable);
         }
 
         for (long id = latestSnapshotId; id >= Snapshot.FIRST_SNAPSHOT_ID; id--) {
             Path snapshotPath = pathFactory.toSnapshotPath(id);
             Snapshot snapshot = Snapshot.fromPath(snapshotPath);
             if (commitUser.equals(snapshot.commitUser())) {
-                if (digests.containsKey(snapshot.commitDigest())) {
-                    digests.remove(snapshot.commitDigest());
+                if (uuids.containsKey(snapshot.commitUuid())) {
+                    uuids.remove(snapshot.commitUuid());
                 } else {
                     // early exit, because committableList must be the latest commits by this
                     // commit user
@@ -139,7 +135,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             }
         }
 
-        return new ArrayList<>(digests.values());
+        return new ArrayList<>(uuids.values());
     }
 
     @Override
@@ -148,18 +144,14 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             LOG.debug("Ready to commit\n" + committable.toString());
         }
 
-        String hash = digestManifestCommittable(committable);
-
         List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), ValueKind.ADD);
-        if (!appendChanges.isEmpty()) {
-            tryCommit(appendChanges, hash, Snapshot.CommitKind.APPEND);
-        }
+        tryCommit(appendChanges, committable.uuid(), Snapshot.CommitKind.APPEND, false);
 
         List<ManifestEntry> compactChanges = new ArrayList<>();
         compactChanges.addAll(collectChanges(committable.compactBefore(), ValueKind.DELETE));
         compactChanges.addAll(collectChanges(committable.compactAfter(), ValueKind.ADD));
         if (!compactChanges.isEmpty()) {
-            tryCommit(compactChanges, hash, Snapshot.CommitKind.COMPACT);
+            tryCommit(compactChanges, committable.uuid(), Snapshot.CommitKind.COMPACT, true);
         }
     }
 
@@ -168,21 +160,82 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             Map<String, String> partition,
             ManifestCommittable committable,
             Map<String, String> properties) {
-        throw new UnsupportedOperationException();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                    "Ready to overwrite partition "
+                            + partition.toString()
+                            + "\n"
+                            + committable.toString());
+        }
+
+        List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), ValueKind.ADD);
+        // sanity check, all changes must be done within the given partition
+        Predicate partitionFilter = TypeUtils.partitionMapToPredicate(partition, partitionType);
+        for (ManifestEntry entry : appendChanges) {
+            if (!partitionFilter.test(partitionObjectConverter.convert(entry.partition()))) {
+                throw new IllegalArgumentException(
+                        "Trying to overwrite partition "
+                                + partition.toString()
+                                + ", but the changes in "
+                                + pathFactory.getPartitionString(entry.partition())
+                                + " do not belong to this partition");
+            }
+        }
+        // overwrite new files
+        tryOverwrite(
+                partitionFilter, appendChanges, committable.uuid(), Snapshot.CommitKind.APPEND);
+
+        List<ManifestEntry> compactChanges = new ArrayList<>();
+        compactChanges.addAll(collectChanges(committable.compactBefore(), ValueKind.DELETE));
+        compactChanges.addAll(collectChanges(committable.compactAfter(), ValueKind.ADD));
+        if (!compactChanges.isEmpty()) {
+            tryCommit(compactChanges, committable.uuid(), Snapshot.CommitKind.COMPACT, true);
+        }
     }
 
-    private String digestManifestCommittable(ManifestCommittable committable) {
-        try {
-            return new String(
-                    Base64.getEncoder()
-                            .encode(
-                                    MessageDigest.getInstance("MD5")
-                                            .digest(committableSerializer.serialize(committable))));
-        } catch (NoSuchAlgorithmException e) {
-            throw new RuntimeException("MD5 algorithm not found. This is impossible.", e);
-        } catch (IOException e) {
-            throw new RuntimeException(
-                    "Failed to serialize ManifestCommittable. This is unexpected.", e);
+    private void tryCommit(
+            List<ManifestEntry> changes,
+            String hash,
+            Snapshot.CommitKind commitKind,
+            boolean checkDeletedFiles) {
+        while (true) {
+            Long latestSnapshotId = pathFactory.latestSnapshotId();
+            if (tryCommitOnce(changes, hash, commitKind, latestSnapshotId, checkDeletedFiles)) {
+                break;
+            }
+        }
+    }
+
+    private void tryOverwrite(
+            Predicate partitionFilter,
+            List<ManifestEntry> changes,
+            String hash,
+            Snapshot.CommitKind commitKind) {
+        while (true) {
+            Long latestSnapshotId = pathFactory.latestSnapshotId();
+
+            List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
+            if (latestSnapshotId != null) {
+                List<ManifestEntry> currentEntries =
+                        scan.withSnapshot(latestSnapshotId)
+                                .withPartitionFilter(partitionFilter)
+                                .plan()
+                                .files();
+                for (ManifestEntry entry : currentEntries) {
+                    changesWithOverwrite.add(
+                            new ManifestEntry(
+                                    ValueKind.DELETE,
+                                    entry.partition(),
+                                    entry.bucket(),
+                                    entry.totalBuckets(),
+                                    entry.file()));
+                }
+            }
+            changesWithOverwrite.addAll(changes);
+
+            if (tryCommitOnce(changesWithOverwrite, hash, commitKind, latestSnapshotId, false)) {
+                break;
+            }
         }
     }
 
@@ -209,114 +262,125 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         return changes;
     }
 
-    private void tryCommit(
-            List<ManifestEntry> changes, String hash, Snapshot.CommitKind commitKind) {
-        while (true) {
-            Long latestSnapshotId = pathFactory.latestSnapshotId();
-            long newSnapshotId =
-                    latestSnapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshotId + 1;
-            Path newSnapshotPath = pathFactory.toSnapshotPath(newSnapshotId);
-            Path tmpSnapshotPath = pathFactory.toTmpSnapshotPath(newSnapshotId);
+    private boolean tryCommitOnce(
+            List<ManifestEntry> changes,
+            String hash,
+            Snapshot.CommitKind commitKind,
+            Long latestSnapshotId,
+            boolean checkDeletedFiles) {
+        long newSnapshotId =
+                latestSnapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshotId + 1;
+        Path newSnapshotPath = pathFactory.toSnapshotPath(newSnapshotId);
+        Path tmpSnapshotPath = pathFactory.toTmpSnapshotPath(newSnapshotId);
 
-            Snapshot latestSnapshot = null;
-            if (latestSnapshotId != null) {
-                noConflictsOrFail(latestSnapshotId, changes);
-                latestSnapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(latestSnapshotId));
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Ready to commit changes to snapshot #" + newSnapshotId);
+            for (ManifestEntry entry : changes) {
+                LOG.debug("  * " + entry.toString());
             }
+        }
 
-            Snapshot newSnapshot;
-            String manifestListName = null;
-            List<ManifestFileMeta> oldMetas = new ArrayList<>();
-            List<ManifestFileMeta> newMetas = new ArrayList<>();
-            try {
-                if (latestSnapshot != null) {
-                    // read all previous manifest files
-                    oldMetas.addAll(manifestList.read(latestSnapshot.manifestList()));
-                }
-                // merge manifest files with changes
-                newMetas.addAll(
-                        ManifestFileMeta.merge(
-                                oldMetas,
-                                changes,
-                                manifestFile,
-                                fileStoreOptions.manifestSuggestedSize.getBytes()));
-                // prepare snapshot file
-                manifestListName = manifestList.write(newMetas);
-                newSnapshot =
-                        new Snapshot(
-                                newSnapshotId,
-                                manifestListName,
-                                commitUser,
-                                hash,
-                                commitKind,
-                                System.currentTimeMillis());
-                FileUtils.writeFileUtf8(tmpSnapshotPath, newSnapshot.toJson());
-            } catch (Throwable e) {
-                // fails when preparing for commit, we should clean up
-                cleanUpTmpSnapshot(tmpSnapshotPath, manifestListName, oldMetas, newMetas);
-                throw new RuntimeException(
-                        String.format(
-                                "Exception occurs when preparing snapshot #%d (path %s) by user %s "
-                                        + "with hash %s and kind %s. Clean up.",
-                                newSnapshotId,
-                                newSnapshotPath.toString(),
-                                commitUser,
-                                hash,
-                                commitKind.name()),
-                        e);
+        Snapshot latestSnapshot = null;
+        if (latestSnapshotId != null) {
+            if (checkDeletedFiles) {
+                noConflictsOrFail(latestSnapshotId, changes);
             }
+            latestSnapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(latestSnapshotId));
+        }
 
-            boolean success;
-            try {
-                FileSystem fs = tmpSnapshotPath.getFileSystem();
-                // atomic rename
-                if (lock != null) {
-                    success =
-                            lock.runWithLock(
-                                    () ->
-                                            // fs.rename may not returns false if target file
-                                            // already exists, or even not atomic
-                                            // as we're relying on external locking, we can first
-                                            // check if file exist then rename to work around this
-                                            // case
-                                            !fs.exists(newSnapshotPath)
-                                                    && fs.rename(tmpSnapshotPath, newSnapshotPath));
-                } else {
-                    success = fs.rename(tmpSnapshotPath, newSnapshotPath);
-                }
-            } catch (Throwable e) {
-                // exception when performing the atomic rename,
-                // we cannot clean up because we can't determine the success
-                throw new RuntimeException(
-                        String.format(
-                                "Exception occurs when committing snapshot #%d (path %s) by user %s "
-                                        + "with hash %s and kind %s. "
-                                        + "Cannot clean up because we can't determine the success.",
-                                newSnapshotId,
-                                newSnapshotPath.toString(),
-                                commitUser,
-                                hash,
-                                commitKind.name()),
-                        e);
+        Snapshot newSnapshot;
+        String manifestListName = null;
+        List<ManifestFileMeta> oldMetas = new ArrayList<>();
+        List<ManifestFileMeta> newMetas = new ArrayList<>();
+        try {
+            if (latestSnapshot != null) {
+                // read all previous manifest files
+                oldMetas.addAll(manifestList.read(latestSnapshot.manifestList()));
             }
+            // merge manifest files with changes
+            newMetas.addAll(
+                    ManifestFileMeta.merge(
+                            oldMetas,
+                            changes,
+                            manifestFile,
+                            fileStoreOptions.manifestSuggestedSize.getBytes()));
+            // prepare snapshot file
+            manifestListName = manifestList.write(newMetas);
+            newSnapshot =
+                    new Snapshot(
+                            newSnapshotId,
+                            manifestListName,
+                            commitUser,
+                            hash,
+                            commitKind,
+                            System.currentTimeMillis());
+            FileUtils.writeFileUtf8(tmpSnapshotPath, newSnapshot.toJson());
+        } catch (Throwable e) {
+            // fails when preparing for commit, we should clean up
+            cleanUpTmpSnapshot(tmpSnapshotPath, manifestListName, oldMetas, newMetas);
+            throw new RuntimeException(
+                    String.format(
+                            "Exception occurs when preparing snapshot #%d (path %s) by user %s "
+                                    + "with hash %s and kind %s. Clean up.",
+                            newSnapshotId,
+                            newSnapshotPath.toString(),
+                            commitUser,
+                            hash,
+                            commitKind.name()),
+                    e);
+        }
 
-            if (success) {
-                return;
+        boolean success;
+        try {
+            FileSystem fs = tmpSnapshotPath.getFileSystem();
+            // atomic rename
+            if (lock != null) {
+                success =
+                        lock.runWithLock(
+                                () ->
+                                        // fs.rename may not return false if target file
+                                        // already exists, or may even not be atomic;
+                                        // as we're relying on external locking, we can first
+                                        // check if the file exists then rename to work around
+                                        // this case
+                                        !fs.exists(newSnapshotPath)
+                                                && fs.rename(tmpSnapshotPath, newSnapshotPath));
+            } else {
+                success = fs.rename(tmpSnapshotPath, newSnapshotPath);
             }
-
-            // atomic rename fails, clean up and try again
-            LOG.warn(
+        } catch (Throwable e) {
+            // exception when performing the atomic rename,
+            // we cannot clean up because we can't determine the success
+            throw new RuntimeException(
                     String.format(
-                            "Atomic rename failed for snapshot #%d (path %s) by user %s "
+                            "Exception occurs when committing snapshot #%d (path %s) by user %s "
                                     + "with hash %s and kind %s. "
-                                    + "Clean up and try again.",
+                                    + "Cannot clean up because we can't determine the success.",
                             newSnapshotId,
                             newSnapshotPath.toString(),
                             commitUser,
                             hash,
-                            commitKind.name()));
-            cleanUpTmpSnapshot(tmpSnapshotPath, manifestListName, oldMetas, newMetas);
+                            commitKind.name()),
+                    e);
+        }
+
+        if (success) {
+            return true;
         }
+
+        // atomic rename fails, clean up and try again
+        LOG.warn(
+                String.format(
+                        "Atomic rename failed for snapshot #%d (path %s) by user %s "
+                                + "with hash %s and kind %s. "
+                                + "Clean up and try again.",
+                        newSnapshotId,
+                        newSnapshotPath.toString(),
+                        commitUser,
+                        hash,
+                        commitKind.name()));
+        cleanUpTmpSnapshot(tmpSnapshotPath, manifestListName, oldMetas, newMetas);
+        return false;
     }
 
     private void noConflictsOrFail(long snapshotId, List<ManifestEntry> changes) {
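
With the UUID recorded per snapshot, a restarted writer no longer re-hashes serialized bytes; it asks which of its replayed committables already landed in a snapshot and commits only the remainder. A minimal sketch of that recovery flow, assuming restoredCommittables was rebuilt from checkpoint state:

    List<ManifestCommittable> uncommitted = commit.filterCommitted(restoredCommittables);
    for (ManifestCommittable committable : uncommitted) {
        commit.commit(committable, Collections.emptyMap());
    }
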
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/TypeUtils.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/TypeUtils.java
new file mode 100644
index 0000000..a27542c
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/TypeUtils.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.data.binary.BinaryStringDataUtil;
+import org.apache.flink.table.store.file.predicate.And;
+import org.apache.flink.table.store.file.predicate.Equal;
+import org.apache.flink.table.store.file.predicate.Literal;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+import java.util.Map;
+
+/** Utils for parsing among different types. */
+public class TypeUtils {
+
+    public static Predicate partitionMapToPredicate(
+            Map<String, String> partition, RowType partitionType) {
+        List<String> fieldNames = partitionType.getFieldNames();
+        Predicate predicate = null;
+        for (Map.Entry<String, String> entry : partition.entrySet()) {
+            int idx = fieldNames.indexOf(entry.getKey());
+            LogicalType type = partitionType.getTypeAt(idx);
+            Literal literal = new Literal(type, castFromString(entry.getValue(), type));
+            if (predicate == null) {
+                predicate = new Equal(idx, literal);
+            } else {
+                predicate = new And(predicate, new Equal(idx, literal));
+            }
+        }
+        return predicate;
+    }
+
+    private static Object castFromString(String s, LogicalType type) {
+        BinaryStringData str = BinaryStringData.fromString(s);
+        switch (type.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+                return str;
+            case BOOLEAN:
+                return BinaryStringDataUtil.toBoolean(str);
+            case BINARY:
+            case VARBINARY:
+                // this implementation does not match the new behavior of StringToBinaryCastRule,
+                // change this if needed
+                return s.getBytes();
+            case DECIMAL:
+                DecimalType decimalType = (DecimalType) type;
+                return BinaryStringDataUtil.toDecimal(
+                        str, decimalType.getPrecision(), decimalType.getScale());
+            case TINYINT:
+                return BinaryStringDataUtil.toByte(str);
+            case SMALLINT:
+                return BinaryStringDataUtil.toShort(str);
+            case INTEGER:
+                return BinaryStringDataUtil.toInt(str);
+            case BIGINT:
+                return BinaryStringDataUtil.toLong(str);
+            case FLOAT:
+                return BinaryStringDataUtil.toFloat(str);
+            case DOUBLE:
+                return BinaryStringDataUtil.toDouble(str);
+            case DATE:
+                return BinaryStringDataUtil.toDate(str);
+            case TIME_WITHOUT_TIME_ZONE:
+                return BinaryStringDataUtil.toTime(str);
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return BinaryStringDataUtil.toTimestamp(str);
+            default:
+                throw new UnsupportedOperationException("Unsupported type " + type.toString());
+        }
+    }
+}
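
A sketch of how the overwrite path in FileStoreCommitImpl uses this utility: build a predicate from a possibly partial partition spec, then test converted partition rows against it. The row type below mirrors the (dt, hr) test schema and is illustrative only:

    RowType partitionType =
            RowType.of(
                    new LogicalType[] {new VarCharType(8), new IntType()},
                    new String[] {"dt", "hr"});
    Map<String, String> spec = new HashMap<>();
    spec.put("dt", "20220217");
    Predicate filter = TypeUtils.partitionMapToPredicate(spec, partitionType);
    // the predicate accepts any (dt, hr) row with dt equal to "20220217";
    // hr is unconstrained because it is absent from the spec
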
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
index 3f6ad2f..bb73ea7 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
@@ -34,7 +34,9 @@ import org.apache.flink.table.types.logical.VarCharType;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
 /** Random {@link KeyValue} generator. */
@@ -159,6 +161,13 @@ public class TestKeyValueGenerator {
                 .copy();
     }
 
+    public static Map<String, String> toPartitionMap(BinaryRowData partition) {
+        Map<String, String> map = new HashMap<>();
+        map.put("dt", partition.getString(0).toString());
+        map.put("hr", String.valueOf(partition.getInt(1)));
+        return map;
+    }
+
     public void sort(List<KeyValue> kvs) {
         kvs.sort(
                 (a, b) -> {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
similarity index 57%
rename from flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTestBase.java
rename to flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index 0851ff4..927e211 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -30,8 +30,10 @@ import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
 
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,13 +50,13 @@ import java.util.stream.Collectors;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link FileStoreCommitImpl}. */
-public abstract class FileStoreCommitTestBase {
+public class FileStoreCommitTest {
 
-    private static final Logger LOG = LoggerFactory.getLogger(FileStoreCommitTestBase.class);
+    private static final Logger LOG = LoggerFactory.getLogger(FileStoreCommitTest.class);
 
     private final FileFormat avro =
             FileFormat.fromIdentifier(
-                    FileStoreCommitTestBase.class.getClassLoader(), "avro", new Configuration());
+                    FileStoreCommitTest.class.getClassLoader(), "avro", new Configuration());
 
     private TestKeyValueGenerator gen;
     @TempDir java.nio.file.Path tempDir;
@@ -64,21 +66,26 @@ public abstract class FileStoreCommitTestBase {
         gen = new TestKeyValueGenerator();
         Path root = new Path(tempDir.toString());
         root.getFileSystem().mkdirs(new Path(root + "/snapshot"));
+        // for failure tests
+        FailingAtomicRenameFileSystem.resetFailCounter(100);
+        FailingAtomicRenameFileSystem.setFailPossibility(5000);
     }
 
-    protected abstract String getScheme();
-
-    @RepeatedTest(10)
-    public void testSingleCommitUser() throws Exception {
-        testRandomConcurrentNoConflict(1);
+    @ParameterizedTest
+    @ValueSource(
+            strings = {TestAtomicRenameFileSystem.SCHEME, FailingAtomicRenameFileSystem.SCHEME})
+    public void testSingleCommitUser(String scheme) throws Exception {
+        testRandomConcurrentNoConflict(1, scheme);
     }
 
-    @RepeatedTest(10)
-    public void testManyCommitUsersNoConflict() throws Exception {
-        testRandomConcurrentNoConflict(ThreadLocalRandom.current().nextInt(3) + 2);
+    @ParameterizedTest
+    @ValueSource(
+            strings = {TestAtomicRenameFileSystem.SCHEME, FailingAtomicRenameFileSystem.SCHEME})
+    public void testManyCommitUsersNoConflict(String scheme) throws Exception {
+        testRandomConcurrentNoConflict(ThreadLocalRandom.current().nextInt(3) + 2, scheme);
     }
 
-    protected void testRandomConcurrentNoConflict(int numThreads) throws Exception {
+    protected void testRandomConcurrentNoConflict(int numThreads, String scheme) throws Exception {
         // prepare test data
         Map<BinaryRowData, List<KeyValue>> data =
                 generateData(ThreadLocalRandom.current().nextInt(1000) + 1);
@@ -88,11 +95,6 @@ public abstract class FileStoreCommitTestBase {
                                 .flatMap(Collection::stream)
                                 .collect(Collectors.toList()),
                 "input");
-        Map<BinaryRowData, BinaryRowData> expected =
-                OperationTestUtils.toKvMap(
-                        data.values().stream()
-                                .flatMap(Collection::stream)
-                                .collect(Collectors.toList()));
 
         List<Map<BinaryRowData, List<KeyValue>>> dataPerThread = new ArrayList<>();
         for (int i = 0; i < numThreads; i++) {
@@ -110,15 +112,26 @@ public abstract class FileStoreCommitTestBase {
             TestCommitThread thread =
                     new TestCommitThread(
                             dataPerThread.get(i),
-                            OperationTestUtils.createPathFactory(getScheme(), tempDir.toString()),
+                            OperationTestUtils.createPathFactory(scheme, tempDir.toString()),
                             OperationTestUtils.createPathFactory(
                                     TestAtomicRenameFileSystem.SCHEME, tempDir.toString()));
             thread.start();
             threads.add(thread);
         }
+
+        // calculate expected results
+        Map<BinaryRowData, List<KeyValue>> threadResults = new HashMap<>();
         for (TestCommitThread thread : threads) {
             thread.join();
+            for (Map.Entry<BinaryRowData, List<KeyValue>> entry : thread.getResult().entrySet()) {
+                threadResults.put(entry.getKey(), entry.getValue());
+            }
         }
+        Map<BinaryRowData, BinaryRowData> expected =
+                OperationTestUtils.toKvMap(
+                        threadResults.values().stream()
+                                .flatMap(Collection::stream)
+                                .collect(Collectors.toList()));
 
         // read actual data and compare
         FileStorePathFactory safePathFactory =
@@ -136,6 +149,79 @@ public abstract class FileStoreCommitTestBase {
         assertThat(actual).isEqualTo(expected);
     }
 
+    @Test
+    public void testOverwritePartialCommit() throws Exception {
+        Map<BinaryRowData, List<KeyValue>> data1 =
+                generateData(ThreadLocalRandom.current().nextInt(1000) + 1);
+        logData(
+                () ->
+                        data1.values().stream()
+                                .flatMap(Collection::stream)
+                                .collect(Collectors.toList()),
+                "data1");
+
+        FileStorePathFactory pathFactory =
+                OperationTestUtils.createPathFactory(
+                        TestAtomicRenameFileSystem.SCHEME, tempDir.toString());
+        OperationTestUtils.commitData(
+                data1.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
+                gen::getPartition,
+                kv -> 0,
+                avro,
+                pathFactory);
+
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        String dtToOverwrite =
+                new ArrayList<>(data1.keySet())
+                        .get(random.nextInt(data1.size()))
+                        .getString(0)
+                        .toString();
+        Map<String, String> partitionToOverwrite = new HashMap<>();
+        partitionToOverwrite.put("dt", dtToOverwrite);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("dtToOverwrite " + dtToOverwrite);
+        }
+
+        Map<BinaryRowData, List<KeyValue>> data2 =
+                generateData(ThreadLocalRandom.current().nextInt(1000) + 1);
+        // remove all records not belonging to dtToOverwrite
+        data2.entrySet().removeIf(e -> !dtToOverwrite.equals(e.getKey().getString(0).toString()));
+        logData(
+                () ->
+                        data2.values().stream()
+                                .flatMap(Collection::stream)
+                                .collect(Collectors.toList()),
+                "data2");
+        OperationTestUtils.overwriteData(
+                data2.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
+                gen::getPartition,
+                kv -> 0,
+                avro,
+                pathFactory,
+                partitionToOverwrite);
+
+        List<KeyValue> expectedKvs = new ArrayList<>();
+        for (Map.Entry<BinaryRowData, List<KeyValue>> entry : data1.entrySet()) {
+            if (dtToOverwrite.equals(entry.getKey().getString(0).toString())) {
+                continue;
+            }
+            expectedKvs.addAll(entry.getValue());
+        }
+        data2.values().forEach(expectedKvs::addAll);
+        gen.sort(expectedKvs);
+        Map<BinaryRowData, BinaryRowData> expected = OperationTestUtils.toKvMap(expectedKvs);
+
+        List<KeyValue> actualKvs =
+                OperationTestUtils.readKvsFromSnapshot(
+                        pathFactory.latestSnapshotId(), avro, pathFactory);
+        gen.sort(actualKvs);
+        Map<BinaryRowData, BinaryRowData> actual = OperationTestUtils.toKvMap(actualKvs);
+
+        logData(() -> kvMapToKvList(expected), "expected");
+        logData(() -> kvMapToKvList(actual), "actual");
+        assertThat(actual).isEqualTo(expected);
+    }
+
     private Map<BinaryRowData, List<KeyValue>> generateData(int numRecords) {
         Map<BinaryRowData, List<KeyValue>> data = new HashMap<>();
         for (int i = 0; i < numRecords; i++) {
@@ -162,30 +248,4 @@ public abstract class FileStoreCommitTestBase {
         }
         LOG.debug("========== End of " + name + " ==========");
     }
-
-    /** Tests for {@link FileStoreCommitImpl} with {@link TestAtomicRenameFileSystem}. */
-    public static class WithTestAtomicRenameFileSystem extends FileStoreCommitTestBase {
-
-        @Override
-        protected String getScheme() {
-            return TestAtomicRenameFileSystem.SCHEME;
-        }
-    }
-
-    /** Tests for {@link FileStoreCommitImpl} with {@link FailingAtomicRenameFileSystem}. */
-    public static class WithFailingAtomicRenameFileSystem extends FileStoreCommitTestBase {
-
-        @BeforeEach
-        @Override
-        public void beforeEach() throws IOException {
-            super.beforeEach();
-            FailingAtomicRenameFileSystem.resetFailCounter(100);
-            FailingAtomicRenameFileSystem.setFailPossibility(5000);
-        }
-
-        @Override
-        protected String getScheme() {
-            return FailingAtomicRenameFileSystem.SCHEME;
-        }
-    }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
index 16a7ab1..de8f410 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
@@ -46,7 +46,7 @@ public class FileStoreExpireTest {
 
     private final FileFormat avro =
             FileFormat.fromIdentifier(
-                    FileStoreCommitTestBase.class.getClassLoader(), "avro", new Configuration());
+                    FileStoreCommitTest.class.getClassLoader(), "avro", new Configuration());
 
     private TestKeyValueGenerator gen;
     @TempDir java.nio.file.Path tempDir;
@@ -187,7 +187,7 @@ public class FileStoreExpireTest {
             }
             allData.addAll(data);
             List<Snapshot> snapshots =
-                    OperationTestUtils.writeAndCommitData(
+                    OperationTestUtils.commitData(
                             data, gen::getPartition, kv -> 0, avro, pathFactory);
             for (int j = 0; j < snapshots.size(); j++) {
                 snapshotPositions.add(allData.size());
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
index 8a12007..0d1a76c 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
@@ -53,7 +53,7 @@ public class FileStoreScanTest {
 
     private final FileFormat avro =
             FileFormat.fromIdentifier(
-                    FileStoreCommitTestBase.class.getClassLoader(), "avro", new Configuration());
+                    FileStoreCommitTest.class.getClassLoader(), "avro", new Configuration());
 
     private TestKeyValueGenerator gen;
     @TempDir java.nio.file.Path tempDir;
@@ -199,7 +199,7 @@ public class FileStoreScanTest {
 
     private Snapshot writeData(List<KeyValue> kvs) throws Exception {
         List<Snapshot> snapshots =
-                OperationTestUtils.writeAndCommitData(
+                OperationTestUtils.commitData(
                         kvs, gen::getPartition, this::getBucket, avro, pathFactory);
         return snapshots.get(snapshots.size() - 1);
     }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
index ef93d26..6a62421 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
@@ -28,7 +28,6 @@ import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
-import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.manifest.ManifestFile;
 import org.apache.flink.table.store.file.manifest.ManifestList;
@@ -40,6 +39,7 @@ import org.apache.flink.table.store.file.mergetree.sst.SstFile;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.util.function.QuadFunction;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -51,6 +51,7 @@ import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiConsumer;
 import java.util.function.Function;
 
 /** Utils for operation tests. */
@@ -85,18 +86,13 @@ public class OperationTestUtils {
 
     public static FileStoreCommit createCommit(
             FileFormat fileFormat, FileStorePathFactory pathFactory) {
-        ManifestCommittableSerializer serializer =
-                new ManifestCommittableSerializer(
-                        TestKeyValueGenerator.PARTITION_TYPE,
-                        TestKeyValueGenerator.KEY_TYPE,
-                        TestKeyValueGenerator.ROW_TYPE);
         ManifestFile.Factory testManifestFileFactory =
                 createManifestFileFactory(fileFormat, pathFactory);
         ManifestList.Factory testManifestListFactory =
                 createManifestListFactory(fileFormat, pathFactory);
         return new FileStoreCommitImpl(
                 UUID.randomUUID().toString(),
-                serializer,
+                TestKeyValueGenerator.PARTITION_TYPE,
                 pathFactory,
                 testManifestFileFactory,
                 testManifestListFactory,
@@ -153,13 +149,52 @@ public class OperationTestUtils {
                 TestKeyValueGenerator.PARTITION_TYPE, fileFormat, pathFactory);
     }
 
-    public static List<Snapshot> writeAndCommitData(
+    public static List<Snapshot> commitData(
             List<KeyValue> kvs,
             Function<KeyValue, BinaryRowData> partitionCalculator,
             Function<KeyValue, Integer> bucketCalculator,
             FileFormat fileFormat,
             FileStorePathFactory pathFactory)
             throws Exception {
+        return commitDataImpl(
+                kvs,
+                partitionCalculator,
+                bucketCalculator,
+                fileFormat,
+                pathFactory,
+                FileStoreWrite::createWriter,
+                (commit, committable) -> commit.commit(committable, Collections.emptyMap()));
+    }
+
+    public static List<Snapshot> overwriteData(
+            List<KeyValue> kvs,
+            Function<KeyValue, BinaryRowData> partitionCalculator,
+            Function<KeyValue, Integer> bucketCalculator,
+            FileFormat fileFormat,
+            FileStorePathFactory pathFactory,
+            Map<String, String> partition)
+            throws Exception {
+        return commitDataImpl(
+                kvs,
+                partitionCalculator,
+                bucketCalculator,
+                fileFormat,
+                pathFactory,
+                FileStoreWrite::createEmptyWriter,
+                (commit, committable) ->
+                        commit.overwrite(partition, committable, Collections.emptyMap()));
+    }
+
+    private static List<Snapshot> commitDataImpl(
+            List<KeyValue> kvs,
+            Function<KeyValue, BinaryRowData> partitionCalculator,
+            Function<KeyValue, Integer> bucketCalculator,
+            FileFormat fileFormat,
+            FileStorePathFactory pathFactory,
+            QuadFunction<FileStoreWrite, BinaryRowData, Integer, ExecutorService, RecordWriter>
+                    createWriterFunction,
+            BiConsumer<FileStoreCommit, ManifestCommittable> commitFunction)
+            throws Exception {
         FileStoreWrite write = createWrite(fileFormat, pathFactory);
         Map<BinaryRowData, Map<Integer, RecordWriter>> writers = new HashMap<>();
         for (KeyValue kv : kvs) {
@@ -171,7 +206,8 @@ public class OperationTestUtils {
                             (b, w) -> {
                                 if (w == null) {
                                     ExecutorService service = Executors.newSingleThreadExecutor();
-                                    return write.createWriter(partition, bucket, service);
+                                    return createWriterFunction.apply(
+                                            write, partition, bucket, service);
                                 } else {
                                     return w;
                                 }
@@ -194,7 +230,7 @@ public class OperationTestUtils {
         if (snapshotIdBeforeCommit == null) {
             snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
         }
-        commit.commit(committable, Collections.emptyMap());
+        commitFunction.accept(commit, committable);
         Long snapshotIdAfterCommit = pathFactory.latestSnapshotId();
         if (snapshotIdAfterCommit == null) {
             snapshotIdAfterCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
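
    Note on the refactor above: commitData and overwriteData are now thin
    wrappers around commitDataImpl, differing only in which writer factory
    and which commit call they pass in. A minimal usage sketch, assuming a
    TestKeyValueGenerator gen, an avro FileFormat and a pathFactory set up
    as in FileStoreScanTest; gen.next() and the fixed bucket function
    kv -> 0 are illustrative assumptions, not part of this commit:

        // Append a batch of records; commitData goes through
        // FileStoreCommit.commit and produces APPEND snapshots.
        List<KeyValue> kvs = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            kvs.add(gen.next()); // assumed generator method
        }
        List<Snapshot> appended =
                OperationTestUtils.commitData(
                        kvs, gen::getPartition, kv -> 0, avro, pathFactory);

        // Replace one partition wholesale; overwriteData uses empty writers
        // plus FileStoreCommit.overwrite, so records committed earlier to
        // that partition are no longer visible afterwards.
        BinaryRowData part = gen.getPartition(kvs.get(0));
        List<Snapshot> overwritten =
                OperationTestUtils.overwriteData(
                        kvs.subList(0, 10),
                        gen::getPartition,
                        kv -> 0,
                        avro,
                        pathFactory,
                        TestKeyValueGenerator.toPartitionMap(part));
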
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index 5d96cf5..8033b40 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.FileFormat;
 import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
@@ -44,6 +45,7 @@ public class TestCommitThread extends Thread {
     private static final Logger LOG = LoggerFactory.getLogger(TestCommitThread.class);
 
     private final Map<BinaryRowData, List<KeyValue>> data;
+    private final Map<BinaryRowData, List<KeyValue>> result;
     private final Map<BinaryRowData, MergeTreeWriter> writers;
 
     private final FileStoreWrite write;
@@ -54,46 +56,33 @@ public class TestCommitThread extends Thread {
             FileStorePathFactory testPathFactory,
             FileStorePathFactory safePathFactory) {
         this.data = data;
+        this.result = new HashMap<>();
         this.writers = new HashMap<>();
 
         FileFormat avro =
                 FileFormat.fromIdentifier(
-                        FileStoreCommitTestBase.class.getClassLoader(),
-                        "avro",
-                        new Configuration());
+                        FileStoreCommitTest.class.getClassLoader(), "avro", new Configuration());
         this.write = OperationTestUtils.createWrite(avro, safePathFactory);
         this.commit = OperationTestUtils.createCommit(avro, testPathFactory);
     }
 
+    public Map<BinaryRowData, List<KeyValue>> getResult() {
+        return result;
+    }
+
     @Override
     public void run() {
         while (!data.isEmpty()) {
-            ManifestCommittable committable;
             try {
-                committable = createCommittable();
+                if (ThreadLocalRandom.current().nextInt(5) == 0) {
+                    // 20% probability to overwrite
+                    doOverwrite();
+                } else {
+                    doCommit();
+                }
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
-
-            boolean shouldCheckFilter = false;
-            while (true) {
-                try {
-                    if (shouldCheckFilter) {
-                        if (commit.filterCommitted(Collections.singletonList(committable))
-                                .isEmpty()) {
-                            break;
-                        }
-                    }
-                    commit.commit(committable, Collections.emptyMap());
-                    break;
-                } catch (Throwable e) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.warn("Failed to commit because of exception, try again", e);
-                    }
-                    writers.clear();
-                    shouldCheckFilter = true;
-                }
-            }
         }
 
         for (MergeTreeWriter writer : writers.values()) {
@@ -105,29 +94,87 @@ public class TestCommitThread extends Thread {
         }
     }
 
-    private ManifestCommittable createCommittable() throws Exception {
+    private void doCommit() throws Exception {
         int numWrites = ThreadLocalRandom.current().nextInt(3) + 1;
         for (int i = 0; i < numWrites && !data.isEmpty(); i++) {
             writeData();
         }
-
         ManifestCommittable committable = new ManifestCommittable();
         for (Map.Entry<BinaryRowData, MergeTreeWriter> entry : writers.entrySet()) {
             committable.add(entry.getKey(), 0, entry.getValue().prepareCommit());
         }
-        return committable;
+
+        runWithRetry(committable, () -> commit.commit(committable, Collections.emptyMap()));
+    }
+
+    private void doOverwrite() throws Exception {
+        BinaryRowData partition = overwriteData();
+        ManifestCommittable committable = new ManifestCommittable();
+        committable.add(partition, 0, writers.get(partition).prepareCommit());
+
+        runWithRetry(
+                committable,
+                () ->
+                        commit.overwrite(
+                                TestKeyValueGenerator.toPartitionMap(partition),
+                                committable,
+                                Collections.emptyMap()));
+    }
+
+    private void runWithRetry(ManifestCommittable committable, Runnable runnable) {
+        boolean shouldCheckFilter = false;
+        while (true) {
+            try {
+                if (shouldCheckFilter) {
+                    if (commit.filterCommitted(Collections.singletonList(committable)).isEmpty()) {
+                        break;
+                    }
+                }
+                runnable.run();
+                break;
+            } catch (Throwable e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.warn("Failed to commit because of exception, try again", e);
+                }
+                writers.clear();
+                shouldCheckFilter = true;
+            }
+        }
     }
 
     private void writeData() throws Exception {
         List<KeyValue> changes = new ArrayList<>();
         BinaryRowData partition = pickData(changes);
+        result.compute(partition, (p, l) -> l == null ? new ArrayList<>() : l).addAll(changes);
+
         MergeTreeWriter writer =
-                writers.compute(partition, (k, v) -> v == null ? createWriter(k) : v);
+                writers.compute(partition, (p, w) -> w == null ? createWriter(p, false) : w);
         for (KeyValue kv : changes) {
             writer.write(kv.valueKind(), kv.key(), kv.value());
         }
     }
 
+    private BinaryRowData overwriteData() throws Exception {
+        List<KeyValue> changes = new ArrayList<>();
+        BinaryRowData partition = pickData(changes);
+        List<KeyValue> resultOfPartition =
+                result.compute(partition, (p, l) -> l == null ? new ArrayList<>() : l);
+        resultOfPartition.clear();
+        resultOfPartition.addAll(changes);
+
+        if (writers.containsKey(partition)) {
+            MergeTreeWriter oldWriter = writers.get(partition);
+            oldWriter.close();
+        }
+        MergeTreeWriter writer = createWriter(partition, true);
+        writers.put(partition, writer);
+        for (KeyValue kv : changes) {
+            writer.write(kv.valueKind(), kv.key(), kv.value());
+        }
+
+        return partition;
+    }
+
     private BinaryRowData pickData(List<KeyValue> changes) {
         List<BinaryRowData> keys = new ArrayList<>(data.keySet());
         BinaryRowData partition = keys.get(ThreadLocalRandom.current().nextInt(keys.size()));
@@ -142,7 +189,7 @@ public class TestCommitThread extends Thread {
         return partition;
     }
 
-    private MergeTreeWriter createWriter(BinaryRowData partition) {
+    private MergeTreeWriter createWriter(BinaryRowData partition, boolean empty) {
         ExecutorService service =
                 Executors.newSingleThreadExecutor(
                         r -> {
@@ -150,6 +197,10 @@ public class TestCommitThread extends Thread {
                             t.setName(Thread.currentThread().getName() + "-writer-service-pool");
                             return t;
                         });
-        return (MergeTreeWriter) write.createWriter(partition, 0, service);
+        if (empty) {
+            return (MergeTreeWriter) write.createEmptyWriter(partition, 0, service);
+        } else {
+            return (MergeTreeWriter) write.createWriter(partition, 0, service);
+        }
     }
 }
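
    The getResult() map added above records, per partition, the key-values
    each thread expects to survive; doOverwrite clears a partition's
    expectation before recording the fresh batch. A sketch of how a test
    harness might combine these expectations after all threads finish; the
    harness shape (dataPerThread and the two path factories) is an
    assumption, and it presumes threads work on disjoint partitions so
    overwrites cannot race across threads:

        // Run several committers against the same store, then merge each
        // thread's surviving records into the expected per-partition state.
        List<TestCommitThread> threads = new ArrayList<>();
        for (Map<BinaryRowData, List<KeyValue>> data : dataPerThread) {
            threads.add(new TestCommitThread(data, testPathFactory, safePathFactory));
        }
        threads.forEach(Thread::start);
        for (TestCommitThread thread : threads) {
            thread.join(); // may throw InterruptedException
        }
        Map<BinaryRowData, List<KeyValue>> expected = new HashMap<>();
        for (TestCommitThread thread : threads) {
            thread.getResult()
                    .forEach(
                            (part, kvs) ->
                                    expected.computeIfAbsent(part, p -> new ArrayList<>())
                                            .addAll(kvs));
        }
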
diff --git a/flink-table-store-core/src/test/resources/log4j2-test.xml b/flink-table-store-core/src/test/resources/log4j2-test.xml
new file mode 100644
index 0000000..ae98052
--- /dev/null
+++ b/flink-table-store-core/src/test/resources/log4j2-test.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Configuration status="WARN">
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" />
+        </Console>
+    </Appenders>
+    <Loggers>
+        <!-- <Logger name="org.apache.flink.table.store.file.operation" level="DEBUG" /> -->
+    </Loggers>
+</Configuration>
\ No newline at end of file
