This is an automated email from the ASF dual-hosted git repository. czweng pushed a commit to branch release-0.2 in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/release-0.2 by this push: new bc5646cc [FLINK-29842] Change commitIdentifier in Table Store snapshot to long value bc5646cc is described below commit bc5646cc62bb341954d6320c7f3ed375195e747f Author: tsreaper <tsreape...@gmail.com> AuthorDate: Tue Nov 8 14:11:07 2022 +0800 [FLINK-29842] Change commitIdentifier in Table Store snapshot to long value This closes #358. --- .gitignore | 1 - .../table/store/connector/sink/StoreCommitter.java | 2 +- .../apache/flink/table/store/file/Snapshot.java | 27 ++++- .../store/file/manifest/ManifestCommittable.java | 8 +- .../manifest/ManifestCommittableSerializer.java | 4 +- .../store/file/operation/FileStoreCommitImpl.java | 10 +- .../flink/table/store/table/sink/TableCommit.java | 4 +- .../flink/table/store/file/TestFileStore.java | 10 +- .../ManifestCommittableSerializerTest.java | 3 +- .../store/file/operation/FileStoreCommitTest.java | 4 +- .../store/file/operation/TestCommitThread.java | 11 +- .../store/table/AppendOnlyFileStoreTableTest.java | 8 +- .../ChangelogValueCountFileStoreTableTest.java | 6 +- .../table/ChangelogWithKeyFileStoreTableTest.java | 113 +++++++++++++++++++-- .../table/store/table/FileStoreTableTestBase.java | 24 ++--- .../table/store/table/SchemaEvolutionTest.java | 4 +- .../table/store/table/WritePreemptMemoryTest.java | 2 +- .../table/store/utils/CompatibilityTestUtils.java | 59 +++++++++++ .../compatibility/table-changelog-0.2.1.zip | Bin 0 -> 11903 bytes .../hive/TableStoreHiveStorageHandlerITCase.java | 24 +++-- .../store/mapred/TableStoreRecordReaderTest.java | 6 +- .../table/store/spark/SimpleTableTestHelper.java | 7 +- .../table/store/spark/SimpleTableTestHelper.java | 7 +- 23 files changed, 263 insertions(+), 81 deletions(-) diff --git a/.gitignore b/.gitignore index afd1e953..516ceef5 100644 --- a/.gitignore +++ b/.gitignore @@ -10,7 +10,6 @@ target *.iml *.swp *.jar -*.zip *.log *.pyc .DS_Store diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java index cfe0551b..839a0592 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java @@ -42,7 +42,7 @@ public class StoreCommitter implements Committer { @Override public ManifestCommittable combine(long checkpointId, List<Committable> committables) { - ManifestCommittable fileCommittable = new ManifestCommittable(String.valueOf(checkpointId)); + ManifestCommittable fileCommittable = new ManifestCommittable(checkpointId); for (Committable committable : committables) { switch (committable.kind()) { case FILE: diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java index 72791ccb..39ae7a32 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java @@ -33,7 +33,18 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -/** This file is the entrance to all data committed at some specific time point. */ +/** + * This file is the entrance to all data committed at some specific time point. + * + * <p>Unversioned change list: + * + * <ul> + * <li>Since table store 0.2.2 and table store 0.3, commitIdentifier is changed from a String to a + * long value. For table store < 0.2.2, only Flink connectors have table store sink and they + * use checkpointId as commitIdentifier (which is a long value). Json can automatically + * perform type conversion so there is no compatibility issue. 
+ * </ul> + */ public class Snapshot { public static final long FIRST_SNAPSHOT_ID = 1; @@ -66,9 +77,15 @@ public class Snapshot { @JsonProperty(FIELD_COMMIT_USER) private final String commitUser; - // for deduplication + // Mainly for snapshot deduplication. + // + // If multiple snapshots have the same commitIdentifier, reading from any of these snapshots + // must produce the same table. + // + // If snapshot A has a smaller commitIdentifier than snapshot B, then snapshot A must be + // committed before snapshot B, and thus snapshot A must contain older records than snapshot B. @JsonProperty(FIELD_COMMIT_IDENTIFIER) - private final String commitIdentifier; + private final long commitIdentifier; @JsonProperty(FIELD_COMMIT_KIND) private final CommitKind commitKind; @@ -86,7 +103,7 @@ public class Snapshot { @JsonProperty(FIELD_BASE_MANIFEST_LIST) String baseManifestList, @JsonProperty(FIELD_DELTA_MANIFEST_LIST) String deltaManifestList, @JsonProperty(FIELD_COMMIT_USER) String commitUser, - @JsonProperty(FIELD_COMMIT_IDENTIFIER) String commitIdentifier, + @JsonProperty(FIELD_COMMIT_IDENTIFIER) long commitIdentifier, @JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind, @JsonProperty(FIELD_TIME_MILLIS) long timeMillis, @JsonProperty(FIELD_LOG_OFFSETS) Map<Integer, Long> logOffsets) { @@ -127,7 +144,7 @@ public class Snapshot { } @JsonGetter(FIELD_COMMIT_IDENTIFIER) - public String commitIdentifier() { + public long commitIdentifier() { return commitIdentifier; } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java index 101cc758..6bcb2b0c 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java @@ -31,13 +31,13 @@ import 
java.util.Objects; /** Manifest commit message. */ public class ManifestCommittable { - private final String identifier; + private final long identifier; private final Map<Integer, Long> logOffsets; private final Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> newFiles; private final Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> compactBefore; private final Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> compactAfter; - public ManifestCommittable(String identifier) { + public ManifestCommittable(long identifier) { this.identifier = identifier; this.logOffsets = new HashMap<>(); this.newFiles = new HashMap<>(); @@ -46,7 +46,7 @@ public class ManifestCommittable { } public ManifestCommittable( - String identifier, + long identifier, Map<Integer, Long> logOffsets, Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> newFiles, Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> compactBefore, @@ -83,7 +83,7 @@ public class ManifestCommittable { .addAll(files); } - public String identifier() { + public long identifier() { return identifier; } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java index e3a89a75..a0a9d894 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java @@ -54,7 +54,7 @@ public class ManifestCommittableSerializer public byte[] serialize(ManifestCommittable obj) throws IOException { ByteArrayOutputStream out = new ByteArrayOutputStream(); DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out); - view.writeUTF(obj.identifier()); + view.writeLong(obj.identifier()); serializeOffsets(view, obj.logOffsets()); serializeFiles(view, 
obj.newFiles()); serializeFiles(view, obj.compactBefore()); @@ -124,7 +124,7 @@ public class ManifestCommittableSerializer public ManifestCommittable deserialize(int version, byte[] serialized) throws IOException { DataInputDeserializer view = new DataInputDeserializer(serialized); return new ManifestCommittable( - view.readUTF(), + view.readLong(), deserializeOffsets(view), deserializeFiles(view), deserializeFiles(view), diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java index 71003c14..8103b570 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java @@ -156,7 +156,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { } // check if a committable is already committed by its identifier - Map<String, ManifestCommittable> identifiers = new LinkedHashMap<>(); + Map<Long, ManifestCommittable> identifiers = new LinkedHashMap<>(); for (ManifestCommittable committable : committableList) { identifiers.put(committable.identifier(), committable); } @@ -291,7 +291,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { private void tryCommit( List<ManifestEntry> changes, - String hash, + long identifier, Map<Integer, Long> logOffsets, Snapshot.CommitKind commitKind, Long safeLatestSnapshotId) { @@ -299,7 +299,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { Long latestSnapshotId = snapshotManager.latestSnapshotId(); if (tryCommitOnce( changes, - hash, + identifier, logOffsets, commitKind, latestSnapshotId, @@ -312,7 +312,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { private void tryOverwrite( Predicate partitionFilter, List<ManifestEntry> changes, - String identifier, + 
long identifier, Map<Integer, Long> logOffsets) { while (true) { Long latestSnapshotId = snapshotManager.latestSnapshotId(); @@ -373,7 +373,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { private boolean tryCommitOnce( List<ManifestEntry> changes, - String identifier, + long identifier, Map<Integer, Long> logOffsets, Snapshot.CommitKind commitKind, Long latestSnapshotId, diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java index 1b5cea0b..51a3000d 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java @@ -68,7 +68,7 @@ public class TableCommit implements AutoCloseable { return commit.filterCommitted(committables); } - public void commit(String identifier, List<FileCommittable> fileCommittables) { + public void commit(long identifier, List<FileCommittable> fileCommittables) { ManifestCommittable committable = new ManifestCommittable(identifier); for (FileCommittable fileCommittable : fileCommittables) { committable.addFileCommittable( @@ -96,7 +96,7 @@ public class TableCommit implements AutoCloseable { // create an empty committable // identifier is Long.MAX_VALUE, come from batch job // TODO maybe it can be produced by CommitterOperator - committable = new ManifestCommittable(String.valueOf(Long.MAX_VALUE)); + committable = new ManifestCommittable(Long.MAX_VALUE); } commit.overwrite(overwritePartition, committable, new HashMap<>()); } diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java index 76f6f8bc..54742c0b 100644 --- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java @@ -59,7 +59,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutorService; @@ -85,6 +84,8 @@ public class TestFileStore extends KeyValueFileStore { private final RowDataSerializer valueSerializer; private final String user; + private long commitIdentifier; + public static TestFileStore create( String format, String root, @@ -132,6 +133,8 @@ public class TestFileStore extends KeyValueFileStore { this.keySerializer = new RowDataSerializer(keyType); this.valueSerializer = new RowDataSerializer(valueType); this.user = UUID.randomUUID().toString(); + + this.commitIdentifier = 0L; } public FileStoreCommitImpl newCommit() { @@ -197,7 +200,7 @@ public class TestFileStore extends KeyValueFileStore { Function<KeyValue, BinaryRowData> partitionCalculator, Function<KeyValue, Integer> bucketCalculator, boolean emptyWriter, - String identifier, + Long identifier, BiConsumer<FileStoreCommit, ManifestCommittable> commitFunction) throws Exception { FileStoreWrite<KeyValue> write = newWrite(); @@ -232,8 +235,7 @@ public class TestFileStore extends KeyValueFileStore { FileStoreCommit commit = newCommit(user); ManifestCommittable committable = - new ManifestCommittable( - identifier == null ? String.valueOf(new Random().nextLong()) : identifier); + new ManifestCommittable(identifier == null ? 
commitIdentifier++ : identifier); for (Map.Entry<BinaryRowData, Map<Integer, RecordWriter<KeyValue>>> entryWithPartition : writers.entrySet()) { for (Map.Entry<Integer, RecordWriter<KeyValue>> entryWithBucket : diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java index 9867811d..e51c4bfb 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java @@ -51,8 +51,7 @@ public class ManifestCommittableSerializerTest { } public static ManifestCommittable create() { - ManifestCommittable committable = - new ManifestCommittable(String.valueOf(new Random().nextLong())); + ManifestCommittable committable = new ManifestCommittable(new Random().nextLong()); addAndAssert(committable, row(0), 0); addAndAssert(committable, row(0), 1); addAndAssert(committable, row(1), 0); diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java index 8d29b364..a6970aa6 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java @@ -118,7 +118,7 @@ public class FileStoreCommitTest { FileUtils.deleteOrWarn(firstSnapshotPath); // this test succeeds if this call does not fail store.newCommit(UUID.randomUUID().toString()) - .filterCommitted(Collections.singletonList(new ManifestCommittable("dummy"))); + .filterCommitted(Collections.singletonList(new ManifestCommittable(999))); } 
protected void testRandomConcurrentNoConflict(int numThreads, boolean failing) @@ -356,7 +356,7 @@ public class FileStoreCommitTest { gen::getPartition, kv -> 0, false, - String.valueOf(i), + (long) i, (commit, committable) -> { commit.commit(committable, Collections.emptyMap()); committables.add(committable); diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java index 120131be..10a3d2c2 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java @@ -36,7 +36,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -62,6 +61,8 @@ public class TestCommitThread extends Thread { private final FileStoreWrite<KeyValue> write; private final FileStoreCommit commit; + private long commitIdentifier; + public TestCommitThread( RowType keyType, RowType valueType, @@ -87,6 +88,8 @@ public class TestCommitThread extends Thread { this.write = safeStore.newWrite(); this.commit = testStore.newCommit(UUID.randomUUID().toString()).withCreateEmptyCommit(true); + + this.commitIdentifier = 0; } public List<KeyValue> getResult() { @@ -122,8 +125,7 @@ public class TestCommitThread extends Thread { for (int i = 0; i < numWrites && !data.isEmpty(); i++) { writeData(); } - ManifestCommittable committable = - new ManifestCommittable(String.valueOf(new Random().nextLong())); + ManifestCommittable committable = new ManifestCommittable(commitIdentifier++); for (Map.Entry<BinaryRowData, MergeTreeWriter> entry : writers.entrySet()) { committable.addFileCommittable(entry.getKey(), 0, 
entry.getValue().prepareCommit(true)); } @@ -133,8 +135,7 @@ public class TestCommitThread extends Thread { private void doOverwrite() throws Exception { BinaryRowData partition = overwriteData(); - ManifestCommittable committable = - new ManifestCommittable(String.valueOf(new Random().nextLong())); + ManifestCommittable committable = new ManifestCommittable(commitIdentifier++); committable.addFileCommittable(partition, 0, writers.get(partition).prepareCommit(true)); runWithRetry( diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java index 82f6508e..fc804e87 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java @@ -188,7 +188,7 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase { dataset.add(new HashMap<>(dataPerBucket)); dataPerBucket.clear(); } - commit.commit("0", write.prepareCommit(true)); + commit.commit(0, write.prepareCommit(true)); int partition = random.nextInt(numOfPartition); List<Integer> availableBucket = new ArrayList<>(dataset.get(partition).keySet()); @@ -216,17 +216,17 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase { write.write(rowData(1, 10, 100L)); write.write(rowData(2, 20, 200L)); write.write(rowData(1, 11, 101L)); - commit.commit("0", write.prepareCommit(true)); + commit.commit(0, write.prepareCommit(true)); write.write(rowData(1, 12, 102L)); write.write(rowData(2, 21, 201L)); write.write(rowData(2, 22, 202L)); - commit.commit("1", write.prepareCommit(true)); + commit.commit(1, write.prepareCommit(true)); write.write(rowData(1, 11, 101L)); write.write(rowData(2, 21, 201L)); write.write(rowData(1, 12, 102L)); - commit.commit("2", 
write.prepareCommit(true)); + commit.commit(2, write.prepareCommit(true)); write.close(); } diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java index 8ce299dd..d1cc39ee 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java @@ -155,20 +155,20 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas write.write(rowData(1, 10, 100L)); write.write(rowData(2, 20, 200L)); write.write(rowData(1, 11, 101L)); - commit.commit("0", write.prepareCommit(true)); + commit.commit(0, write.prepareCommit(true)); write.write(rowData(2, 21, 201L)); write.write(rowData(1, 12, 102L)); write.write(rowData(2, 21, 201L)); write.write(rowData(2, 21, 201L)); - commit.commit("1", write.prepareCommit(true)); + commit.commit(1, write.prepareCommit(true)); write.write(rowData(1, 11, 101L)); write.write(rowData(2, 22, 202L)); write.write(rowDataWithKind(RowKind.DELETE, 2, 21, 201L)); write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 100L)); write.write(rowDataWithKind(RowKind.DELETE, 2, 21, 201L)); - commit.commit("2", write.prepareCommit(true)); + commit.commit(2, write.prepareCommit(true)); write.close(); } diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java index 2d143c39..0950fd40 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java +++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java @@ -21,6 +21,7 @@ package org.apache.flink.table.store.table; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.store.CoreOptions; import org.apache.flink.table.store.CoreOptions.ChangelogProducer; +import org.apache.flink.table.store.file.Snapshot; import org.apache.flink.table.store.file.WriteMode; import org.apache.flink.table.store.file.predicate.Predicate; import org.apache.flink.table.store.file.predicate.PredicateBuilder; @@ -29,9 +30,12 @@ import org.apache.flink.table.store.file.schema.TableSchema; import org.apache.flink.table.store.file.schema.UpdateSchema; import org.apache.flink.table.store.table.sink.TableCommit; import org.apache.flink.table.store.table.sink.TableWrite; +import org.apache.flink.table.store.table.source.SnapshotEnumerator; import org.apache.flink.table.store.table.source.Split; import org.apache.flink.table.store.table.source.TableRead; +import org.apache.flink.table.store.utils.CompatibilityTestUtils; import org.apache.flink.types.RowKind; +import org.apache.flink.util.function.FunctionWithException; import org.junit.jupiter.api.Test; @@ -54,9 +58,9 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase { write.write(rowData(1, 10, 200L)); write.write(rowData(1, 10, 100L)); write.write(rowData(1, 11, 101L)); - commit.commit("0", write.prepareCommit(true)); + commit.commit(0, write.prepareCommit(true)); write.write(rowData(1, 11, 55L)); - commit.commit("1", write.prepareCommit(true)); + commit.commit(1, write.prepareCommit(true)); write.close(); List<Split> splits = table.newScan().plan().splits; @@ -174,7 +178,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase { write.write(rowData(1, 10, 101L)); write.write(rowDataWithKind(RowKind.UPDATE_BEFORE, 1, 10, 101L)); write.write(rowDataWithKind(RowKind.UPDATE_AFTER, 1, 10, 102L)); - 
commit.commit("0", write.prepareCommit(true)); + commit.commit(0, write.prepareCommit(true)); write.close(); List<Split> splits = table.newScan().withIncremental(true).plan().splits; @@ -189,6 +193,97 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase { "+U 1|10|102|binary|varbinary")); } + @Test + public void testStreamingChangelogCompatibility021() throws Exception { + // already contains 2 commits + CompatibilityTestUtils.unzip( + "compatibility/table-changelog-0.2.1.zip", tablePath.getPath()); + FileStoreTable table = + createFileStoreTable( + conf -> conf.set(CoreOptions.CHANGELOG_PRODUCER, ChangelogProducer.INPUT)); + + List<List<List<String>>> expected = + Arrays.asList( + // first changelog snapshot + Arrays.asList( + // partition 1 + Arrays.asList( + "+I 1|10|100|binary|varbinary", + "+I 1|20|200|binary|varbinary", + "-D 1|10|100|binary|varbinary", + "+I 1|10|101|binary|varbinary", + "-U 1|10|101|binary|varbinary", + "+U 1|10|102|binary|varbinary"), + // partition 2 + Collections.singletonList("+I 2|10|300|binary|varbinary")), + // second changelog snapshot + Arrays.asList( + // partition 1 + Collections.singletonList("-D 1|20|200|binary|varbinary"), + // partition 2 + Arrays.asList( + "-U 2|10|300|binary|varbinary", + "+U 2|10|301|binary|varbinary", + "+I 2|20|400|binary|varbinary")), + // third changelog snapshot + Arrays.asList( + // partition 1 + Arrays.asList( + "-U 1|10|102|binary|varbinary", + "+U 1|10|103|binary|varbinary", + "+I 1|20|201|binary|varbinary"), + // partition 2 + Collections.singletonList("-D 2|10|301|binary|varbinary"))); + + SnapshotEnumerator enumerator = + new SnapshotEnumerator( + tablePath, + table.newScan().withIncremental(true), + Snapshot.FIRST_SNAPSHOT_ID - 1); + + FunctionWithException<Integer, Void, Exception> assertNextSnapshot = + i -> { + SnapshotEnumerator.EnumeratorResult result = enumerator.call(); + assertThat(result).isNotNull(); + + TableRead read = table.newRead(); + for (int j = 0; 
j < 2; j++) { + assertThat( + getResult( + read, + result.plan.splits, + binaryRow(j + 1), + 0, + CHANGELOG_ROW_TO_STRING)) + .isEqualTo(expected.get(i).get(j)); + } + + return null; + }; + + for (int i = 0; i < 2; i++) { + assertNextSnapshot.apply(i); + } + + // no more changelog + assertThat(enumerator.call()).isNull(); + + // write another commit + TableWrite write = table.newWrite(); + TableCommit commit = table.newCommit("user"); + write.write(rowDataWithKind(RowKind.UPDATE_BEFORE, 1, 10, 102L)); + write.write(rowDataWithKind(RowKind.UPDATE_AFTER, 1, 10, 103L)); + write.write(rowDataWithKind(RowKind.INSERT, 1, 20, 201L)); + write.write(rowDataWithKind(RowKind.DELETE, 2, 10, 301L)); + commit.commit(2, write.prepareCommit(true)); + write.close(); + + assertNextSnapshot.apply(2); + + // no more changelog + assertThat(enumerator.call()).isNull(); + } + private void writeData() throws Exception { FileStoreTable table = createFileStoreTable(); TableWrite write = table.newWrite(); @@ -197,19 +292,19 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase { write.write(rowData(1, 10, 100L)); write.write(rowData(2, 20, 200L)); write.write(rowData(1, 11, 101L)); - commit.commit("0", write.prepareCommit(true)); + commit.commit(0, write.prepareCommit(true)); write.write(rowData(1, 10, 1000L)); write.write(rowData(2, 21, 201L)); write.write(rowData(2, 21, 2001L)); - commit.commit("1", write.prepareCommit(true)); + commit.commit(1, write.prepareCommit(true)); write.write(rowData(1, 11, 1001L)); write.write(rowData(2, 21, 20001L)); write.write(rowData(2, 22, 202L)); write.write(rowDataWithKind(RowKind.DELETE, 1, 11, 1001L)); write.write(rowDataWithKind(RowKind.DELETE, 2, 20, 200L)); - commit.commit("2", write.prepareCommit(true)); + commit.commit(2, write.prepareCommit(true)); write.close(); } @@ -224,15 +319,15 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase { write.write(rowData(1, 10, 100L)); 
write.write(rowData(1, 20, 200L)); - commit.commit("0", write.prepareCommit(true)); + commit.commit(0, write.prepareCommit(true)); write.write(rowData(1, 30, 300L)); write.write(rowData(1, 40, 400L)); - commit.commit("1", write.prepareCommit(true)); + commit.commit(1, write.prepareCommit(true)); write.write(rowData(1, 50, 500L)); write.write(rowData(1, 60, 600L)); - commit.commit("2", write.prepareCommit(true)); + commit.commit(2, write.prepareCommit(true)); PredicateBuilder builder = new PredicateBuilder(ROW_TYPE); List<Split> splits = table.newScan().plan().splits; diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java index 4c100ee9..13defd93 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java @@ -130,7 +130,7 @@ public abstract class FileStoreTableTestBase { TableCommit commit = table.newCommit("user"); write.write(rowData(1, 10, 100L)); write.write(rowData(2, 20, 200L)); - commit.commit("0", write.prepareCommit(true)); + commit.commit(0, write.prepareCommit(true)); write.close(); write = table.newWrite().withOverwrite(true); @@ -138,7 +138,7 @@ public abstract class FileStoreTableTestBase { write.write(rowData(2, 21, 201L)); Map<String, String> overwritePartition = new HashMap<>(); overwritePartition.put("pt", "2"); - commit.withOverwritePartition(overwritePartition).commit("1", write.prepareCommit(true)); + commit.withOverwritePartition(overwritePartition).commit(1, write.prepareCommit(true)); write.close(); List<Split> splits = table.newScan().plan().splits; @@ -164,7 +164,7 @@ public abstract class FileStoreTableTestBase { write.write(rowData(1, 5, 6L)); write.write(rowData(1, 7, 8L)); write.write(rowData(1, 9, 10L)); - 
table.newCommit("user").commit("0", write.prepareCommit(true)); + table.newCommit("user").commit(0, write.prepareCommit(true)); write.close(); List<Split> splits = @@ -185,15 +185,15 @@ public abstract class FileStoreTableTestBase { write.write(rowData(1, 10, 100L)); write.write(rowData(1, 20, 200L)); - commit.commit("0", write.prepareCommit(true)); + commit.commit(0, write.prepareCommit(true)); write.write(rowData(1, 30, 300L)); write.write(rowData(1, 40, 400L)); - commit.commit("1", write.prepareCommit(true)); + commit.commit(1, write.prepareCommit(true)); write.write(rowData(1, 50, 500L)); write.write(rowData(1, 60, 600L)); - commit.commit("2", write.prepareCommit(true)); + commit.commit(2, write.prepareCommit(true)); write.close(); @@ -216,7 +216,7 @@ public abstract class FileStoreTableTestBase { for (int j = 0; j < 1000; j++) { write.write(rowData(1, 10 * i * j, 100L * i * j)); } - commit.commit(String.valueOf(i), write.prepareCommit(false)); + commit.commit(i, write.prepareCommit(false)); } write.write(rowData(1, 40, 400L)); @@ -236,9 +236,9 @@ public abstract class FileStoreTableTestBase { // if remove writer too fast, will see old files, do another compaction // then will be conflicts - commit.commit("4", commit4); - commit.commit("5", commit5); - commit.commit("6", commit6); + commit.commit(4, commit4); + commit.commit(5, commit5); + commit.commit(6, commit6); } else { // commit4 is a compaction commit // do compaction commit5 @@ -247,8 +247,8 @@ public abstract class FileStoreTableTestBase { // wait compaction finish // commit5 should be a compaction commit - commit.commit("4", commit4); - commit.commit("5", commit5); + commit.commit(4, commit4); + commit.commit(5, commit5); } write.close(); diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java index d52df819..f5a2ed5d 100644 --- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java @@ -86,7 +86,7 @@ public class SchemaEvolutionTest { TableWrite write = table.newWrite(); write.write(GenericRowData.of(1, 1L)); write.write(GenericRowData.of(2, 2L)); - table.newCommit("").commit("0", write.prepareCommit(true)); + table.newCommit("").commit(0, write.prepareCommit(true)); write.close(); schemaManager.commitChanges( @@ -96,7 +96,7 @@ public class SchemaEvolutionTest { write = table.newWrite(); write.write(GenericRowData.of(3, 3L, 3L)); write.write(GenericRowData.of(4, 4L, 4L)); - table.newCommit("").commit("1", write.prepareCommit(true)); + table.newCommit("").commit(1, write.prepareCommit(true)); write.close(); // read all diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java index 353ac24e..cab9276c 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java @@ -73,7 +73,7 @@ public class WritePreemptMemoryTest extends FileStoreTableTestBase { expected.add(BATCH_ROW_TO_STRING.apply(row)); } List<FileCommittable> committables = write.prepareCommit(true); - commit.commit("0", committables); + commit.commit(0, committables); write.close(); // read diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/utils/CompatibilityTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/utils/CompatibilityTestUtils.java new file mode 100644 index 00000000..a76f8d72 --- /dev/null +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/utils/CompatibilityTestUtils.java @@ -0,0 +1,59 @@ 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.utils;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+/** Utils for compatibility tests. */
+public class CompatibilityTestUtils {
+
+    private static final int BUFFER_SIZE = 8192;
+
+    private CompatibilityTestUtils() {
+        // utility class, not meant to be instantiated
+    }
+
+    /**
+     * Extracts a zip archive from the test classpath into the given directory.
+     *
+     * @param zipFileName name of the zip resource on the classpath
+     * @param targetDirectory directory to extract into; parent directories of entries are created
+     *     as needed
+     * @throws IOException if the resource cannot be found, an entry would escape {@code
+     *     targetDirectory}, or extraction fails
+     */
+    public static void unzip(String zipFileName, String targetDirectory) throws IOException {
+        InputStream resource =
+                CompatibilityTestUtils.class.getClassLoader().getResourceAsStream(zipFileName);
+        if (resource == null) {
+            // fail with a clear message instead of an NPE inside ZipInputStream
+            throw new IOException("Cannot find resource " + zipFileName + " on classpath");
+        }
+
+        // canonical root used to reject "zip slip" entries (e.g. ../../evil) below
+        String canonicalRoot = new File(targetDirectory).getCanonicalPath() + File.separator;
+        // one shared buffer instead of reallocating per entry
+        byte[] buffer = new byte[BUFFER_SIZE];
+
+        try (ZipInputStream zip = new ZipInputStream(resource)) {
+            ZipEntry entry;
+            while ((entry = zip.getNextEntry()) != null) {
+                File file = new File(targetDirectory, entry.getName());
+                if (!file.getCanonicalPath().startsWith(canonicalRoot)) {
+                    throw new IOException(
+                            "Zip entry escapes target directory: " + entry.getName());
+                }
+
+                if (entry.isDirectory()) {
+                    file.mkdirs();
+                    continue;
+                }
+
+                file.getParentFile().mkdirs();
+                // try-with-resources: the original leaked the stream if write() threw
+                try (BufferedOutputStream out =
+                        new BufferedOutputStream(new FileOutputStream(file))) {
+                    int count;
+                    while ((count = zip.read(buffer)) != -1) {
+                        out.write(buffer, 0, count);
+                    }
+                }
+                zip.closeEntry();
+            }
+        }
+    }
+}
diff --git a/flink-table-store-core/src/test/resources/compatibility/table-changelog-0.2.1.zip b/flink-table-store-core/src/test/resources/compatibility/table-changelog-0.2.1.zip
new file mode 100644
index 00000000..bb202764
Binary files /dev/null and b/flink-table-store-core/src/test/resources/compatibility/table-changelog-0.2.1.zip differ
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
index 1301b5e9..d9ed166a 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
@@ -76,6 +76,8 @@ public class TableStoreHiveStorageHandlerITCase {
 
     private static String engine;
 
+    private long commitIdentifier;
+
     @BeforeClass
     public static void beforeClass() {
         // TODO Currently FlinkEmbeddedHiveRunner
can only be used for one test class, @@ -98,6 +100,8 @@ public class TableStoreHiveStorageHandlerITCase { hiveShell.execute("CREATE DATABASE IF NOT EXISTS test_db"); hiveShell.execute("USE test_db"); + + commitIdentifier = 0; } @After @@ -626,10 +630,10 @@ public class TableStoreHiveStorageHandlerITCase { for (RowData rowData : data) { write.write(rowData); if (ThreadLocalRandom.current().nextInt(5) == 0) { - commit.commit(UUID.randomUUID().toString(), write.prepareCommit(false)); + commit.commit(commitIdentifier++, write.prepareCommit(false)); } } - commit.commit(UUID.randomUUID().toString(), write.prepareCommit(true)); + commit.commit(commitIdentifier++, write.prepareCommit(true)); write.close(); String tableName = "test_table_" + (UUID.randomUUID().toString().substring(0, 4)); @@ -674,7 +678,7 @@ public class TableStoreHiveStorageHandlerITCase { for (GenericRowData rowData : input) { write.write(rowData); } - commit.commit("0", write.prepareCommit(true)); + commit.commit(0, write.prepareCommit(true)); write.close(); hiveShell.execute( @@ -779,17 +783,17 @@ public class TableStoreHiveStorageHandlerITCase { TableWrite write = table.newWrite(); TableCommit commit = table.newCommit("user"); write.write(GenericRowData.of(1)); - commit.commit("0", write.prepareCommit(true)); + commit.commit(0, write.prepareCommit(true)); write.write(GenericRowData.of((Object) null)); - commit.commit("1", write.prepareCommit(true)); + commit.commit(1, write.prepareCommit(true)); write.write(GenericRowData.of(2)); write.write(GenericRowData.of(3)); write.write(GenericRowData.of((Object) null)); - commit.commit("2", write.prepareCommit(true)); + commit.commit(2, write.prepareCommit(true)); write.write(GenericRowData.of(4)); write.write(GenericRowData.of(5)); write.write(GenericRowData.of(6)); - commit.commit("3", write.prepareCommit(true)); + commit.commit(3, write.prepareCommit(true)); write.close(); hiveShell.execute( @@ -875,15 +879,15 @@ public class 
TableStoreHiveStorageHandlerITCase { 375, /* 1971-01-11 */ TimestampData.fromLocalDateTime( LocalDateTime.of(2022, 5, 17, 17, 29, 20)))); - commit.commit("0", write.prepareCommit(true)); + commit.commit(0, write.prepareCommit(true)); write.write(GenericRowData.of(null, null)); - commit.commit("1", write.prepareCommit(true)); + commit.commit(1, write.prepareCommit(true)); write.write(GenericRowData.of(376 /* 1971-01-12 */, null)); write.write( GenericRowData.of( null, TimestampData.fromLocalDateTime(LocalDateTime.of(2022, 6, 18, 8, 30, 0)))); - commit.commit("2", write.prepareCommit(true)); + commit.commit(2, write.prepareCommit(true)); write.close(); hiveShell.execute( diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java index e8106dc3..4b43e90e 100644 --- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java +++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java @@ -78,7 +78,7 @@ public class TableStoreRecordReaderTest { write.write(GenericRowData.of(3L, StringData.fromString("World"))); write.write(GenericRowData.of(1L, StringData.fromString("Hi again"))); write.write(GenericRowData.ofKind(RowKind.DELETE, 2L, StringData.fromString("Hello"))); - commit.commit("0", write.prepareCommit(true)); + commit.commit(0, write.prepareCommit(true)); TableStoreRecordReader reader = read(table, BinaryRowDataUtil.EMPTY_ROW, 0); RowDataContainer container = reader.createValue(); @@ -120,7 +120,7 @@ public class TableStoreRecordReaderTest { write.write(GenericRowData.of(1, StringData.fromString("Hi"))); write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 
StringData.fromString("Hello"))); write.write(GenericRowData.of(1, StringData.fromString("Hi"))); - commit.commit("0", write.prepareCommit(true)); + commit.commit(0, write.prepareCommit(true)); TableStoreRecordReader reader = read(table, BinaryRowDataUtil.EMPTY_ROW, 0); RowDataContainer container = reader.createValue(); @@ -160,7 +160,7 @@ public class TableStoreRecordReaderTest { write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi"))); write.write(GenericRowData.of(2, 20L, StringData.fromString("Hello"))); write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi"))); - commit.commit("0", write.prepareCommit(true)); + commit.commit(0, write.prepareCommit(true)); TableStoreRecordReader reader = read(table, BinaryRowDataUtil.EMPTY_ROW, 0, Arrays.asList("c", "a")); diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java index f31eeaea..1f2e4394 100644 --- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java +++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java @@ -34,7 +34,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.UUID; /** A simple table test helper to write and commit. 
*/ public class SimpleTableTestHelper { @@ -42,6 +41,8 @@ public class SimpleTableTestHelper { private final TableWrite writer; private final TableCommit commit; + private long commitIdentifier; + public SimpleTableTestHelper(Path path, RowType rowType) throws Exception { this(path, rowType, Collections.emptyList(), Collections.emptyList()); } @@ -60,6 +61,8 @@ public class SimpleTableTestHelper { FileStoreTable table = FileStoreTableFactory.create(conf); this.writer = table.newWrite(); this.commit = table.newCommit("user"); + + this.commitIdentifier = 0; } public void write(RowData row) throws Exception { @@ -67,6 +70,6 @@ public class SimpleTableTestHelper { } public void commit() throws Exception { - commit.commit(UUID.randomUUID().toString(), writer.prepareCommit(true)); + commit.commit(commitIdentifier++, writer.prepareCommit(true)); } } diff --git a/flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java b/flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java index 705a961a..8e7f10ef 100644 --- a/flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java +++ b/flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java @@ -33,7 +33,6 @@ import org.apache.flink.table.types.logical.RowType; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.UUID; /** A simple table test helper to write and commit. 
*/ public class SimpleTableTestHelper { @@ -41,6 +40,8 @@ public class SimpleTableTestHelper { private final TableWrite writer; private final TableCommit commit; + private long commitIdentifier; + public SimpleTableTestHelper(Path path, RowType rowType) throws Exception { Map<String, String> options = new HashMap<>(); // orc is shaded, can not find shaded classes in ide @@ -58,6 +59,8 @@ public class SimpleTableTestHelper { FileStoreTable table = FileStoreTableFactory.create(conf); this.writer = table.newWrite(); this.commit = table.newCommit("user"); + + this.commitIdentifier = 0; } public void write(RowData row) throws Exception { @@ -65,6 +68,6 @@ public class SimpleTableTestHelper { } public void commit() throws Exception { - commit.commit(UUID.randomUUID().toString(), writer.prepareCommit(true)); + commit.commit(commitIdentifier++, writer.prepareCommit(true)); } }