This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/release-0.2 by this push:
     new bc5646cc [FLINK-29842] Change commitIdentifier in Table Store snapshot to long value
bc5646cc is described below

commit bc5646cc62bb341954d6320c7f3ed375195e747f
Author: tsreaper <tsreape...@gmail.com>
AuthorDate: Tue Nov 8 14:11:07 2022 +0800

    [FLINK-29842] Change commitIdentifier in Table Store snapshot to long value
    
    This closes #358.
---
 .gitignore                                         |   1 -
 .../table/store/connector/sink/StoreCommitter.java |   2 +-
 .../apache/flink/table/store/file/Snapshot.java    |  27 ++++-
 .../store/file/manifest/ManifestCommittable.java   |   8 +-
 .../manifest/ManifestCommittableSerializer.java    |   4 +-
 .../store/file/operation/FileStoreCommitImpl.java  |  10 +-
 .../flink/table/store/table/sink/TableCommit.java  |   4 +-
 .../flink/table/store/file/TestFileStore.java      |  10 +-
 .../ManifestCommittableSerializerTest.java         |   3 +-
 .../store/file/operation/FileStoreCommitTest.java  |   4 +-
 .../store/file/operation/TestCommitThread.java     |  11 +-
 .../store/table/AppendOnlyFileStoreTableTest.java  |   8 +-
 .../ChangelogValueCountFileStoreTableTest.java     |   6 +-
 .../table/ChangelogWithKeyFileStoreTableTest.java  | 113 +++++++++++++++++++--
 .../table/store/table/FileStoreTableTestBase.java  |  24 ++---
 .../table/store/table/SchemaEvolutionTest.java     |   4 +-
 .../table/store/table/WritePreemptMemoryTest.java  |   2 +-
 .../table/store/utils/CompatibilityTestUtils.java  |  59 +++++++++++
 .../compatibility/table-changelog-0.2.1.zip        | Bin 0 -> 11903 bytes
 .../hive/TableStoreHiveStorageHandlerITCase.java   |  24 +++--
 .../store/mapred/TableStoreRecordReaderTest.java   |   6 +-
 .../table/store/spark/SimpleTableTestHelper.java   |   7 +-
 .../table/store/spark/SimpleTableTestHelper.java   |   7 +-
 23 files changed, 263 insertions(+), 81 deletions(-)
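
The compatibility note added to Snapshot.java below leans on JSON type conversion: snapshots written before this change store commitIdentifier as a JSON string (the Flink sink used String.valueOf(checkpointId)), while the field is now declared as a long. A minimal sketch of that coercion with Jackson's defaults, using a hypothetical LegacySnapshotStub class that is not part of this commit:

    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class CommitIdentifierCoercionDemo {

        // Hypothetical stub holding only the field whose type changed in this commit.
        static class LegacySnapshotStub {
            @JsonProperty("commitIdentifier")
            public long commitIdentifier;
        }

        public static void main(String[] args) throws Exception {
            // An older snapshot file stores the identifier as a JSON string, e.g. "5".
            String legacyJson = "{\"commitIdentifier\": \"5\"}";

            // Jackson coerces the JSON string into the long field by default,
            // so older snapshot files stay readable after this change.
            LegacySnapshotStub snapshot =
                    new ObjectMapper().readValue(legacyJson, LegacySnapshotStub.class);
            System.out.println(snapshot.commitIdentifier); // prints 5
        }
    }

Under that default coercion, an old snapshot file deserializes into the new schema without a separate migration step, which is what the Javadoc below asserts.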

diff --git a/.gitignore b/.gitignore
index afd1e953..516ceef5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,7 +10,6 @@ target
 *.iml
 *.swp
 *.jar
-*.zip
 *.log
 *.pyc
 .DS_Store
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java
index cfe0551b..839a0592 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java
@@ -42,7 +42,7 @@ public class StoreCommitter implements Committer {
 
     @Override
     public ManifestCommittable combine(long checkpointId, List<Committable> committables) {
-        ManifestCommittable fileCommittable = new ManifestCommittable(String.valueOf(checkpointId));
+        ManifestCommittable fileCommittable = new ManifestCommittable(checkpointId);
         for (Committable committable : committables) {
             switch (committable.kind()) {
                 case FILE:
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
index 72791ccb..39ae7a32 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
@@ -33,7 +33,18 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-/** This file is the entrance to all data committed at some specific time point. */
+/**
+ * This file is the entrance to all data committed at some specific time point.
+ *
+ * <p>Unversioned change list:
+ *
+ * <ul>
+ *   <li>Since table store 0.2.2 and table store 0.3, commitIdentifier is changed from a String to a
+ *       long value. For table store < 0.2.2, only Flink connectors have table store sink and they
+ *       use checkpointId as commitIdentifier (which is a long value). Json can automatically
+ *       perform type conversion so there is no compatibility issue.
+ * </ul>
+ */
 public class Snapshot {
 
     public static final long FIRST_SNAPSHOT_ID = 1;
@@ -66,9 +77,15 @@ public class Snapshot {
     @JsonProperty(FIELD_COMMIT_USER)
     private final String commitUser;
 
-    // for deduplication
+    // Mainly for snapshot deduplication.
+    //
+    // If multiple snapshots have the same commitIdentifier, reading from any of these snapshots
+    // must produce the same table.
+    //
+    // If snapshot A has a smaller commitIdentifier than snapshot B, then snapshot A must be
+    // committed before snapshot B, and thus snapshot A must contain older records than snapshot B.
     @JsonProperty(FIELD_COMMIT_IDENTIFIER)
-    private final String commitIdentifier;
+    private final long commitIdentifier;
 
     @JsonProperty(FIELD_COMMIT_KIND)
     private final CommitKind commitKind;
@@ -86,7 +103,7 @@ public class Snapshot {
             @JsonProperty(FIELD_BASE_MANIFEST_LIST) String baseManifestList,
             @JsonProperty(FIELD_DELTA_MANIFEST_LIST) String deltaManifestList,
             @JsonProperty(FIELD_COMMIT_USER) String commitUser,
-            @JsonProperty(FIELD_COMMIT_IDENTIFIER) String commitIdentifier,
+            @JsonProperty(FIELD_COMMIT_IDENTIFIER) long commitIdentifier,
             @JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind,
             @JsonProperty(FIELD_TIME_MILLIS) long timeMillis,
             @JsonProperty(FIELD_LOG_OFFSETS) Map<Integer, Long> logOffsets) {
@@ -127,7 +144,7 @@ public class Snapshot {
     }
 
     @JsonGetter(FIELD_COMMIT_IDENTIFIER)
-    public String commitIdentifier() {
+    public long commitIdentifier() {
         return commitIdentifier;
     }
 
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
index 101cc758..6bcb2b0c 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
@@ -31,13 +31,13 @@ import java.util.Objects;
 /** Manifest commit message. */
 public class ManifestCommittable {
 
-    private final String identifier;
+    private final long identifier;
     private final Map<Integer, Long> logOffsets;
     private final Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> newFiles;
     private final Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> compactBefore;
     private final Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> compactAfter;
 
-    public ManifestCommittable(String identifier) {
+    public ManifestCommittable(long identifier) {
         this.identifier = identifier;
         this.logOffsets = new HashMap<>();
         this.newFiles = new HashMap<>();
@@ -46,7 +46,7 @@ public class ManifestCommittable {
     }
 
     public ManifestCommittable(
-            String identifier,
+            long identifier,
             Map<Integer, Long> logOffsets,
             Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> newFiles,
             Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> compactBefore,
@@ -83,7 +83,7 @@ public class ManifestCommittable {
                 .addAll(files);
     }
 
-    public String identifier() {
+    public long identifier() {
         return identifier;
     }
 
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
index e3a89a75..a0a9d894 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
@@ -54,7 +54,7 @@ public class ManifestCommittableSerializer
     public byte[] serialize(ManifestCommittable obj) throws IOException {
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
-        view.writeUTF(obj.identifier());
+        view.writeLong(obj.identifier());
         serializeOffsets(view, obj.logOffsets());
         serializeFiles(view, obj.newFiles());
         serializeFiles(view, obj.compactBefore());
@@ -124,7 +124,7 @@ public class ManifestCommittableSerializer
     public ManifestCommittable deserialize(int version, byte[] serialized) throws IOException {
         DataInputDeserializer view = new DataInputDeserializer(serialized);
         return new ManifestCommittable(
-                view.readUTF(),
+                view.readLong(),
                 deserializeOffsets(view),
                 deserializeFiles(view),
                 deserializeFiles(view),
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 71003c14..8103b570 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -156,7 +156,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
         }
 
         // check if a committable is already committed by its identifier
-        Map<String, ManifestCommittable> identifiers = new LinkedHashMap<>();
+        Map<Long, ManifestCommittable> identifiers = new LinkedHashMap<>();
         for (ManifestCommittable committable : committableList) {
             identifiers.put(committable.identifier(), committable);
         }
@@ -291,7 +291,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
 
     private void tryCommit(
             List<ManifestEntry> changes,
-            String hash,
+            long identifier,
             Map<Integer, Long> logOffsets,
             Snapshot.CommitKind commitKind,
             Long safeLatestSnapshotId) {
@@ -299,7 +299,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             Long latestSnapshotId = snapshotManager.latestSnapshotId();
             if (tryCommitOnce(
                     changes,
-                    hash,
+                    identifier,
                     logOffsets,
                     commitKind,
                     latestSnapshotId,
@@ -312,7 +312,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
     private void tryOverwrite(
             Predicate partitionFilter,
             List<ManifestEntry> changes,
-            String identifier,
+            long identifier,
             Map<Integer, Long> logOffsets) {
         while (true) {
             Long latestSnapshotId = snapshotManager.latestSnapshotId();
@@ -373,7 +373,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
 
     private boolean tryCommitOnce(
             List<ManifestEntry> changes,
-            String identifier,
+            long identifier,
             Map<Integer, Long> logOffsets,
             Snapshot.CommitKind commitKind,
             Long latestSnapshotId,
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
index 1b5cea0b..51a3000d 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
@@ -68,7 +68,7 @@ public class TableCommit implements AutoCloseable {
         return commit.filterCommitted(committables);
     }
 
-    public void commit(String identifier, List<FileCommittable> fileCommittables) {
+    public void commit(long identifier, List<FileCommittable> fileCommittables) {
         ManifestCommittable committable = new ManifestCommittable(identifier);
         for (FileCommittable fileCommittable : fileCommittables) {
             committable.addFileCommittable(
@@ -96,7 +96,7 @@ public class TableCommit implements AutoCloseable {
                 // create an empty committable
                 // identifier is Long.MAX_VALUE, come from batch job
                 // TODO maybe it can be produced by CommitterOperator
-                committable = new ManifestCommittable(String.valueOf(Long.MAX_VALUE));
+                committable = new ManifestCommittable(Long.MAX_VALUE);
             }
             commit.overwrite(overwritePartition, committable, new HashMap<>());
         }
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 76f6f8bc..54742c0b 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -59,7 +59,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
@@ -85,6 +84,8 @@ public class TestFileStore extends KeyValueFileStore {
     private final RowDataSerializer valueSerializer;
     private final String user;
 
+    private long commitIdentifier;
+
     public static TestFileStore create(
             String format,
             String root,
@@ -132,6 +133,8 @@ public class TestFileStore extends KeyValueFileStore {
         this.keySerializer = new RowDataSerializer(keyType);
         this.valueSerializer = new RowDataSerializer(valueType);
         this.user = UUID.randomUUID().toString();
+
+        this.commitIdentifier = 0L;
     }
 
     public FileStoreCommitImpl newCommit() {
@@ -197,7 +200,7 @@ public class TestFileStore extends KeyValueFileStore {
             Function<KeyValue, BinaryRowData> partitionCalculator,
             Function<KeyValue, Integer> bucketCalculator,
             boolean emptyWriter,
-            String identifier,
+            Long identifier,
             BiConsumer<FileStoreCommit, ManifestCommittable> commitFunction)
             throws Exception {
         FileStoreWrite<KeyValue> write = newWrite();
@@ -232,8 +235,7 @@ public class TestFileStore extends KeyValueFileStore {
 
         FileStoreCommit commit = newCommit(user);
         ManifestCommittable committable =
-                new ManifestCommittable(
-                        identifier == null ? String.valueOf(new Random().nextLong()) : identifier);
+                new ManifestCommittable(identifier == null ? commitIdentifier++ : identifier);
         for (Map.Entry<BinaryRowData, Map<Integer, RecordWriter<KeyValue>>> entryWithPartition :
                 writers.entrySet()) {
             for (Map.Entry<Integer, RecordWriter<KeyValue>> entryWithBucket :
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
index 9867811d..e51c4bfb 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
@@ -51,8 +51,7 @@ public class ManifestCommittableSerializerTest {
     }
 
     public static ManifestCommittable create() {
-        ManifestCommittable committable =
-                new ManifestCommittable(String.valueOf(new Random().nextLong()));
+        ManifestCommittable committable = new ManifestCommittable(new Random().nextLong());
         addAndAssert(committable, row(0), 0);
         addAndAssert(committable, row(0), 1);
         addAndAssert(committable, row(1), 0);
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index 8d29b364..a6970aa6 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -118,7 +118,7 @@ public class FileStoreCommitTest {
         FileUtils.deleteOrWarn(firstSnapshotPath);
         // this test succeeds if this call does not fail
         store.newCommit(UUID.randomUUID().toString())
-                .filterCommitted(Collections.singletonList(new ManifestCommittable("dummy")));
+                .filterCommitted(Collections.singletonList(new ManifestCommittable(999)));
     }
 
     protected void testRandomConcurrentNoConflict(int numThreads, boolean failing)
@@ -356,7 +356,7 @@ public class FileStoreCommitTest {
                     gen::getPartition,
                     kv -> 0,
                     false,
-                    String.valueOf(i),
+                    (long) i,
                     (commit, committable) -> {
                         commit.commit(committable, Collections.emptyMap());
                         committables.add(committable);
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index 120131be..10a3d2c2 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -36,7 +36,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -62,6 +61,8 @@ public class TestCommitThread extends Thread {
     private final FileStoreWrite<KeyValue> write;
     private final FileStoreCommit commit;
 
+    private long commitIdentifier;
+
     public TestCommitThread(
             RowType keyType,
             RowType valueType,
@@ -87,6 +88,8 @@ public class TestCommitThread extends Thread {
 
         this.write = safeStore.newWrite();
         this.commit = testStore.newCommit(UUID.randomUUID().toString()).withCreateEmptyCommit(true);
+
+        this.commitIdentifier = 0;
     }
 
     public List<KeyValue> getResult() {
@@ -122,8 +125,7 @@ public class TestCommitThread extends Thread {
         for (int i = 0; i < numWrites && !data.isEmpty(); i++) {
             writeData();
         }
-        ManifestCommittable committable =
-                new ManifestCommittable(String.valueOf(new Random().nextLong()));
+        ManifestCommittable committable = new ManifestCommittable(commitIdentifier++);
         for (Map.Entry<BinaryRowData, MergeTreeWriter> entry : writers.entrySet()) {
             committable.addFileCommittable(entry.getKey(), 0, entry.getValue().prepareCommit(true));
         }
@@ -133,8 +135,7 @@ public class TestCommitThread extends Thread {
 
     private void doOverwrite() throws Exception {
         BinaryRowData partition = overwriteData();
-        ManifestCommittable committable =
-                new ManifestCommittable(String.valueOf(new Random().nextLong()));
+        ManifestCommittable committable = new ManifestCommittable(commitIdentifier++);
         committable.addFileCommittable(partition, 0, writers.get(partition).prepareCommit(true));
 
         runWithRetry(
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 82f6508e..fc804e87 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -188,7 +188,7 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
             dataset.add(new HashMap<>(dataPerBucket));
             dataPerBucket.clear();
         }
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
 
         int partition = random.nextInt(numOfPartition);
         List<Integer> availableBucket = new ArrayList<>(dataset.get(partition).keySet());
@@ -216,17 +216,17 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
         write.write(rowData(1, 10, 100L));
         write.write(rowData(2, 20, 200L));
         write.write(rowData(1, 11, 101L));
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
 
         write.write(rowData(1, 12, 102L));
         write.write(rowData(2, 21, 201L));
         write.write(rowData(2, 22, 202L));
-        commit.commit("1", write.prepareCommit(true));
+        commit.commit(1, write.prepareCommit(true));
 
         write.write(rowData(1, 11, 101L));
         write.write(rowData(2, 21, 201L));
         write.write(rowData(1, 12, 102L));
-        commit.commit("2", write.prepareCommit(true));
+        commit.commit(2, write.prepareCommit(true));
 
         write.close();
     }
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index 8ce299dd..d1cc39ee 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -155,20 +155,20 @@ public class ChangelogValueCountFileStoreTableTest 
extends FileStoreTableTestBas
         write.write(rowData(1, 10, 100L));
         write.write(rowData(2, 20, 200L));
         write.write(rowData(1, 11, 101L));
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
 
         write.write(rowData(2, 21, 201L));
         write.write(rowData(1, 12, 102L));
         write.write(rowData(2, 21, 201L));
         write.write(rowData(2, 21, 201L));
-        commit.commit("1", write.prepareCommit(true));
+        commit.commit(1, write.prepareCommit(true));
 
         write.write(rowData(1, 11, 101L));
         write.write(rowData(2, 22, 202L));
         write.write(rowDataWithKind(RowKind.DELETE, 2, 21, 201L));
         write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 100L));
         write.write(rowDataWithKind(RowKind.DELETE, 2, 21, 201L));
-        commit.commit("2", write.prepareCommit(true));
+        commit.commit(2, write.prepareCommit(true));
 
         write.close();
     }
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 2d143c39..0950fd40 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.table;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.CoreOptions.ChangelogProducer;
+import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
@@ -29,9 +30,12 @@ import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
 import org.apache.flink.table.store.table.sink.TableCommit;
 import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.SnapshotEnumerator;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.store.utils.CompatibilityTestUtils;
 import org.apache.flink.types.RowKind;
+import org.apache.flink.util.function.FunctionWithException;
 
 import org.junit.jupiter.api.Test;
 
@@ -54,9 +58,9 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         write.write(rowData(1, 10, 200L));
         write.write(rowData(1, 10, 100L));
         write.write(rowData(1, 11, 101L));
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
         write.write(rowData(1, 11, 55L));
-        commit.commit("1", write.prepareCommit(true));
+        commit.commit(1, write.prepareCommit(true));
         write.close();
 
         List<Split> splits = table.newScan().plan().splits;
@@ -174,7 +178,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         write.write(rowData(1, 10, 101L));
         write.write(rowDataWithKind(RowKind.UPDATE_BEFORE, 1, 10, 101L));
         write.write(rowDataWithKind(RowKind.UPDATE_AFTER, 1, 10, 102L));
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
         write.close();
 
         List<Split> splits = table.newScan().withIncremental(true).plan().splits;
@@ -189,6 +193,97 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
                                 "+U 1|10|102|binary|varbinary"));
     }
 
+    @Test
+    public void testStreamingChangelogCompatibility021() throws Exception {
+        // already contains 2 commits
+        CompatibilityTestUtils.unzip(
+                "compatibility/table-changelog-0.2.1.zip", tablePath.getPath());
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> conf.set(CoreOptions.CHANGELOG_PRODUCER, ChangelogProducer.INPUT));
+
+        List<List<List<String>>> expected =
+                Arrays.asList(
+                        // first changelog snapshot
+                        Arrays.asList(
+                                // partition 1
+                                Arrays.asList(
+                                        "+I 1|10|100|binary|varbinary",
+                                        "+I 1|20|200|binary|varbinary",
+                                        "-D 1|10|100|binary|varbinary",
+                                        "+I 1|10|101|binary|varbinary",
+                                        "-U 1|10|101|binary|varbinary",
+                                        "+U 1|10|102|binary|varbinary"),
+                                // partition 2
+                                Collections.singletonList("+I 2|10|300|binary|varbinary")),
+                        // second changelog snapshot
+                        Arrays.asList(
+                                // partition 1
+                                Collections.singletonList("-D 1|20|200|binary|varbinary"),
+                                // partition 2
+                                Arrays.asList(
+                                        "-U 2|10|300|binary|varbinary",
+                                        "+U 2|10|301|binary|varbinary",
+                                        "+I 2|20|400|binary|varbinary")),
+                        // third changelog snapshot
+                        Arrays.asList(
+                                // partition 1
+                                Arrays.asList(
+                                        "-U 1|10|102|binary|varbinary",
+                                        "+U 1|10|103|binary|varbinary",
+                                        "+I 1|20|201|binary|varbinary"),
+                                // partition 2
+                                Collections.singletonList("-D 2|10|301|binary|varbinary")));
+
+        SnapshotEnumerator enumerator =
+                new SnapshotEnumerator(
+                        tablePath,
+                        table.newScan().withIncremental(true),
+                        Snapshot.FIRST_SNAPSHOT_ID - 1);
+
+        FunctionWithException<Integer, Void, Exception> assertNextSnapshot =
+                i -> {
+                    SnapshotEnumerator.EnumeratorResult result = enumerator.call();
+                    assertThat(result).isNotNull();
+
+                    TableRead read = table.newRead();
+                    for (int j = 0; j < 2; j++) {
+                        assertThat(
+                                        getResult(
+                                                read,
+                                                result.plan.splits,
+                                                binaryRow(j + 1),
+                                                0,
+                                                CHANGELOG_ROW_TO_STRING))
+                                .isEqualTo(expected.get(i).get(j));
+                    }
+
+                    return null;
+                };
+
+        for (int i = 0; i < 2; i++) {
+            assertNextSnapshot.apply(i);
+        }
+
+        // no more changelog
+        assertThat(enumerator.call()).isNull();
+
+        // write another commit
+        TableWrite write = table.newWrite();
+        TableCommit commit = table.newCommit("user");
+        write.write(rowDataWithKind(RowKind.UPDATE_BEFORE, 1, 10, 102L));
+        write.write(rowDataWithKind(RowKind.UPDATE_AFTER, 1, 10, 103L));
+        write.write(rowDataWithKind(RowKind.INSERT, 1, 20, 201L));
+        write.write(rowDataWithKind(RowKind.DELETE, 2, 10, 301L));
+        commit.commit(2, write.prepareCommit(true));
+        write.close();
+
+        assertNextSnapshot.apply(2);
+
+        // no more changelog
+        assertThat(enumerator.call()).isNull();
+    }
+
     private void writeData() throws Exception {
         FileStoreTable table = createFileStoreTable();
         TableWrite write = table.newWrite();
@@ -197,19 +292,19 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         write.write(rowData(1, 10, 100L));
         write.write(rowData(2, 20, 200L));
         write.write(rowData(1, 11, 101L));
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
 
         write.write(rowData(1, 10, 1000L));
         write.write(rowData(2, 21, 201L));
         write.write(rowData(2, 21, 2001L));
-        commit.commit("1", write.prepareCommit(true));
+        commit.commit(1, write.prepareCommit(true));
 
         write.write(rowData(1, 11, 1001L));
         write.write(rowData(2, 21, 20001L));
         write.write(rowData(2, 22, 202L));
         write.write(rowDataWithKind(RowKind.DELETE, 1, 11, 1001L));
         write.write(rowDataWithKind(RowKind.DELETE, 2, 20, 200L));
-        commit.commit("2", write.prepareCommit(true));
+        commit.commit(2, write.prepareCommit(true));
 
         write.close();
     }
@@ -224,15 +319,15 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
 
         write.write(rowData(1, 10, 100L));
         write.write(rowData(1, 20, 200L));
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
 
         write.write(rowData(1, 30, 300L));
         write.write(rowData(1, 40, 400L));
-        commit.commit("1", write.prepareCommit(true));
+        commit.commit(1, write.prepareCommit(true));
 
         write.write(rowData(1, 50, 500L));
         write.write(rowData(1, 60, 600L));
-        commit.commit("2", write.prepareCommit(true));
+        commit.commit(2, write.prepareCommit(true));
 
         PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
         List<Split> splits = table.newScan().plan().splits;
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index 4c100ee9..13defd93 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -130,7 +130,7 @@ public abstract class FileStoreTableTestBase {
         TableCommit commit = table.newCommit("user");
         write.write(rowData(1, 10, 100L));
         write.write(rowData(2, 20, 200L));
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
         write.close();
 
         write = table.newWrite().withOverwrite(true);
@@ -138,7 +138,7 @@ public abstract class FileStoreTableTestBase {
         write.write(rowData(2, 21, 201L));
         Map<String, String> overwritePartition = new HashMap<>();
         overwritePartition.put("pt", "2");
-        commit.withOverwritePartition(overwritePartition).commit("1", write.prepareCommit(true));
+        commit.withOverwritePartition(overwritePartition).commit(1, write.prepareCommit(true));
         write.close();
 
         List<Split> splits = table.newScan().plan().splits;
@@ -164,7 +164,7 @@ public abstract class FileStoreTableTestBase {
         write.write(rowData(1, 5, 6L));
         write.write(rowData(1, 7, 8L));
         write.write(rowData(1, 9, 10L));
-        table.newCommit("user").commit("0", write.prepareCommit(true));
+        table.newCommit("user").commit(0, write.prepareCommit(true));
         write.close();
 
         List<Split> splits =
@@ -185,15 +185,15 @@ public abstract class FileStoreTableTestBase {
 
         write.write(rowData(1, 10, 100L));
         write.write(rowData(1, 20, 200L));
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
 
         write.write(rowData(1, 30, 300L));
         write.write(rowData(1, 40, 400L));
-        commit.commit("1", write.prepareCommit(true));
+        commit.commit(1, write.prepareCommit(true));
 
         write.write(rowData(1, 50, 500L));
         write.write(rowData(1, 60, 600L));
-        commit.commit("2", write.prepareCommit(true));
+        commit.commit(2, write.prepareCommit(true));
 
         write.close();
 
@@ -216,7 +216,7 @@ public abstract class FileStoreTableTestBase {
             for (int j = 0; j < 1000; j++) {
                 write.write(rowData(1, 10 * i * j, 100L * i * j));
             }
-            commit.commit(String.valueOf(i), write.prepareCommit(false));
+            commit.commit(i, write.prepareCommit(false));
         }
 
         write.write(rowData(1, 40, 400L));
@@ -236,9 +236,9 @@ public abstract class FileStoreTableTestBase {
             // if remove writer too fast, will see old files, do another compaction
             // then will be conflicts
 
-            commit.commit("4", commit4);
-            commit.commit("5", commit5);
-            commit.commit("6", commit6);
+            commit.commit(4, commit4);
+            commit.commit(5, commit5);
+            commit.commit(6, commit6);
         } else {
             // commit4 is a compaction commit
             // do compaction commit5
@@ -247,8 +247,8 @@ public abstract class FileStoreTableTestBase {
             // wait compaction finish
             // commit5 should be a compaction commit
 
-            commit.commit("4", commit4);
-            commit.commit("5", commit5);
+            commit.commit(4, commit4);
+            commit.commit(5, commit5);
         }
 
         write.close();
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
index d52df819..f5a2ed5d 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
@@ -86,7 +86,7 @@ public class SchemaEvolutionTest {
         TableWrite write = table.newWrite();
         write.write(GenericRowData.of(1, 1L));
         write.write(GenericRowData.of(2, 2L));
-        table.newCommit("").commit("0", write.prepareCommit(true));
+        table.newCommit("").commit(0, write.prepareCommit(true));
         write.close();
 
         schemaManager.commitChanges(
@@ -96,7 +96,7 @@ public class SchemaEvolutionTest {
         write = table.newWrite();
         write.write(GenericRowData.of(3, 3L, 3L));
         write.write(GenericRowData.of(4, 4L, 4L));
-        table.newCommit("").commit("1", write.prepareCommit(true));
+        table.newCommit("").commit(1, write.prepareCommit(true));
         write.close();
 
         // read all
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
index 353ac24e..cab9276c 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
@@ -73,7 +73,7 @@ public class WritePreemptMemoryTest extends 
FileStoreTableTestBase {
             expected.add(BATCH_ROW_TO_STRING.apply(row));
         }
         List<FileCommittable> committables = write.prepareCommit(true);
-        commit.commit("0", committables);
+        commit.commit(0, committables);
         write.close();
 
         // read
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/utils/CompatibilityTestUtils.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/utils/CompatibilityTestUtils.java
new file mode 100644
index 00000000..a76f8d72
--- /dev/null
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/utils/CompatibilityTestUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.utils;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+/** Utils for compatibility tests. */
+public class CompatibilityTestUtils {
+
+    private static final int BUFFER_SIZE = 8192;
+
+    public static void unzip(String zipFileName, String targetDirectory) throws IOException {
+        try (ZipInputStream zip =
+                new ZipInputStream(
+                        CompatibilityTestUtils.class
+                                .getClassLoader()
+                                .getResourceAsStream(zipFileName))) {
+            ZipEntry entry;
+            while ((entry = zip.getNextEntry()) != null) {
+                File file = new File(targetDirectory, entry.getName());
+
+                if (entry.isDirectory()) {
+                    file.mkdirs();
+                    continue;
+                }
+
+                byte[] buffer = new byte[BUFFER_SIZE];
+                file.getParentFile().mkdirs();
+                BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(file));
+                int count;
+                while ((count = zip.read(buffer)) != -1) {
+                    out.write(buffer, 0, count);
+                }
+                out.close();
+            }
+        }
+    }
+}
diff --git 
a/flink-table-store-core/src/test/resources/compatibility/table-changelog-0.2.1.zip
 
b/flink-table-store-core/src/test/resources/compatibility/table-changelog-0.2.1.zip
new file mode 100644
index 00000000..bb202764
Binary files /dev/null and b/flink-table-store-core/src/test/resources/compatibility/table-changelog-0.2.1.zip differ
diff --git 
a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
 
b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
index 1301b5e9..d9ed166a 100644
--- 
a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
+++ 
b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
@@ -76,6 +76,8 @@ public class TableStoreHiveStorageHandlerITCase {
 
     private static String engine;
 
+    private long commitIdentifier;
+
     @BeforeClass
     public static void beforeClass() {
         // TODO Currently FlinkEmbeddedHiveRunner can only be used for one test class,
@@ -98,6 +100,8 @@ public class TableStoreHiveStorageHandlerITCase {
 
         hiveShell.execute("CREATE DATABASE IF NOT EXISTS test_db");
         hiveShell.execute("USE test_db");
+
+        commitIdentifier = 0;
     }
 
     @After
@@ -626,10 +630,10 @@ public class TableStoreHiveStorageHandlerITCase {
         for (RowData rowData : data) {
             write.write(rowData);
             if (ThreadLocalRandom.current().nextInt(5) == 0) {
-                commit.commit(UUID.randomUUID().toString(), write.prepareCommit(false));
+                commit.commit(commitIdentifier++, write.prepareCommit(false));
             }
         }
-        commit.commit(UUID.randomUUID().toString(), write.prepareCommit(true));
+        commit.commit(commitIdentifier++, write.prepareCommit(true));
         write.close();
 
         String tableName = "test_table_" + (UUID.randomUUID().toString().substring(0, 4));
@@ -674,7 +678,7 @@ public class TableStoreHiveStorageHandlerITCase {
         for (GenericRowData rowData : input) {
             write.write(rowData);
         }
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
         write.close();
 
         hiveShell.execute(
@@ -779,17 +783,17 @@ public class TableStoreHiveStorageHandlerITCase {
         TableWrite write = table.newWrite();
         TableCommit commit = table.newCommit("user");
         write.write(GenericRowData.of(1));
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
         write.write(GenericRowData.of((Object) null));
-        commit.commit("1", write.prepareCommit(true));
+        commit.commit(1, write.prepareCommit(true));
         write.write(GenericRowData.of(2));
         write.write(GenericRowData.of(3));
         write.write(GenericRowData.of((Object) null));
-        commit.commit("2", write.prepareCommit(true));
+        commit.commit(2, write.prepareCommit(true));
         write.write(GenericRowData.of(4));
         write.write(GenericRowData.of(5));
         write.write(GenericRowData.of(6));
-        commit.commit("3", write.prepareCommit(true));
+        commit.commit(3, write.prepareCommit(true));
         write.close();
 
         hiveShell.execute(
@@ -875,15 +879,15 @@ public class TableStoreHiveStorageHandlerITCase {
                         375, /* 1971-01-11 */
                         TimestampData.fromLocalDateTime(
                                 LocalDateTime.of(2022, 5, 17, 17, 29, 20))));
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
         write.write(GenericRowData.of(null, null));
-        commit.commit("1", write.prepareCommit(true));
+        commit.commit(1, write.prepareCommit(true));
         write.write(GenericRowData.of(376 /* 1971-01-12 */, null));
         write.write(
                 GenericRowData.of(
                         null,
                         TimestampData.fromLocalDateTime(LocalDateTime.of(2022, 6, 18, 8, 30, 0))));
-        commit.commit("2", write.prepareCommit(true));
+        commit.commit(2, write.prepareCommit(true));
         write.close();
 
         hiveShell.execute(
diff --git 
a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
 
b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
index e8106dc3..4b43e90e 100644
--- 
a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
+++ 
b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
@@ -78,7 +78,7 @@ public class TableStoreRecordReaderTest {
         write.write(GenericRowData.of(3L, StringData.fromString("World")));
         write.write(GenericRowData.of(1L, StringData.fromString("Hi again")));
         write.write(GenericRowData.ofKind(RowKind.DELETE, 2L, StringData.fromString("Hello")));
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
 
         TableStoreRecordReader reader = read(table, BinaryRowDataUtil.EMPTY_ROW, 0);
         RowDataContainer container = reader.createValue();
@@ -120,7 +120,7 @@ public class TableStoreRecordReaderTest {
         write.write(GenericRowData.of(1, StringData.fromString("Hi")));
         write.write(GenericRowData.ofKind(RowKind.DELETE, 2, StringData.fromString("Hello")));
         write.write(GenericRowData.of(1, StringData.fromString("Hi")));
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
 
         TableStoreRecordReader reader = read(table, BinaryRowDataUtil.EMPTY_ROW, 0);
         RowDataContainer container = reader.createValue();
@@ -160,7 +160,7 @@ public class TableStoreRecordReaderTest {
         write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
         write.write(GenericRowData.of(2, 20L, StringData.fromString("Hello")));
         write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
 
         TableStoreRecordReader reader =
                 read(table, BinaryRowDataUtil.EMPTY_ROW, 0, Arrays.asList("c", "a"));
diff --git 
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
 
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
index f31eeaea..1f2e4394 100644
--- 
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
+++ 
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
@@ -34,7 +34,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 
 /** A simple table test helper to write and commit. */
 public class SimpleTableTestHelper {
@@ -42,6 +41,8 @@ public class SimpleTableTestHelper {
     private final TableWrite writer;
     private final TableCommit commit;
 
+    private long commitIdentifier;
+
     public SimpleTableTestHelper(Path path, RowType rowType) throws Exception {
         this(path, rowType, Collections.emptyList(), Collections.emptyList());
     }
@@ -60,6 +61,8 @@ public class SimpleTableTestHelper {
         FileStoreTable table = FileStoreTableFactory.create(conf);
         this.writer = table.newWrite();
         this.commit = table.newCommit("user");
+
+        this.commitIdentifier = 0;
     }
 
     public void write(RowData row) throws Exception {
@@ -67,6 +70,6 @@ public class SimpleTableTestHelper {
     }
 
     public void commit() throws Exception {
-        commit.commit(UUID.randomUUID().toString(), writer.prepareCommit(true));
+        commit.commit(commitIdentifier++, writer.prepareCommit(true));
     }
 }
diff --git 
a/flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
 
b/flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
index 705a961a..8e7f10ef 100644
--- 
a/flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
+++ 
b/flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
@@ -33,7 +33,6 @@ import org.apache.flink.table.types.logical.RowType;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.UUID;
 
 /** A simple table test helper to write and commit. */
 public class SimpleTableTestHelper {
@@ -41,6 +40,8 @@ public class SimpleTableTestHelper {
     private final TableWrite writer;
     private final TableCommit commit;
 
+    private long commitIdentifier;
+
     public SimpleTableTestHelper(Path path, RowType rowType) throws Exception {
         Map<String, String> options = new HashMap<>();
         // orc is shaded, can not find shaded classes in ide
@@ -58,6 +59,8 @@ public class SimpleTableTestHelper {
         FileStoreTable table = FileStoreTableFactory.create(conf);
         this.writer = table.newWrite();
         this.commit = table.newCommit("user");
+
+        this.commitIdentifier = 0;
     }
 
     public void write(RowData row) throws Exception {
@@ -65,6 +68,6 @@ public class SimpleTableTestHelper {
     }
 
     public void commit() throws Exception {
-        commit.commit(UUID.randomUUID().toString(), writer.prepareCommit(true));
+        commit.commit(commitIdentifier++, writer.prepareCommit(true));
     }
 }
