This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
     new 44dfecd  [FLINK-25876] Implement overwrite in FileStoreCommitImpl
44dfecd is described below

commit 44dfecd56695d9575e75b0b63a49aa8211ecde9f
Author: tsreaper <tsreape...@gmail.com>
AuthorDate: Thu Feb 17 13:36:47 2022 +0800

    [FLINK-25876] Implement overwrite in FileStoreCommitImpl

    This closes #16
---
 .../apache/flink/table/store/file/Snapshot.java    |  16 +-
 .../store/file/manifest/ManifestCommittable.java   |  23 +-
 .../manifest/ManifestCommittableSerializer.java    |   6 +-
 .../store/file/manifest/ManifestFileMeta.java      |  29 +-
 .../table/store/file/manifest/ManifestList.java    |   4 -
 .../store/file/operation/FileStoreCommit.java      |   4 +-
 .../store/file/operation/FileStoreCommitImpl.java  | 326 ++++++++++++---------
 .../flink/table/store/file/utils/TypeUtils.java    |  93 ++++++
 .../table/store/file/TestKeyValueGenerator.java    |   9 +
 ...ommitTestBase.java => FileStoreCommitTest.java} | 150 +++++++---
 .../store/file/operation/FileStoreExpireTest.java  |   4 +-
 .../store/file/operation/FileStoreScanTest.java    |   4 +-
 .../store/file/operation/OperationTestUtils.java   |  56 +++-
 .../store/file/operation/TestCommitThread.java     | 113 +++++--
 .../src/test/resources/log4j2-test.xml             |  29 ++
 15 files changed, 613 insertions(+), 253 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
index e02fa0d..9b9085b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
@@ -37,7 +37,7 @@ public class Snapshot {
     private static final String FIELD_ID = "id";
     private static final String FIELD_MANIFEST_LIST = "manifestList";
     private static final String FIELD_COMMIT_USER = "commitUser";
-    private static final String FIELD_COMMIT_DIGEST = "commitDigest";
+    private static final String FIELD_COMMIT_UUID = "commitUuid";
     private static final String FIELD_COMMIT_KIND = "commitKind";
     private static final String FIELD_TIME_MILLIS = "timeMillis";
 
@@ -51,8 +51,8 @@ public class Snapshot {
     private final String commitUser;
 
     // for deduplication
-    @JsonProperty(FIELD_COMMIT_DIGEST)
-    private final String commitDigest;
+    @JsonProperty(FIELD_COMMIT_UUID)
+    private final String commitUuid;
 
     @JsonProperty(FIELD_COMMIT_KIND)
     private final CommitKind commitKind;
@@ -65,13 +65,13 @@ public class Snapshot {
             @JsonProperty(FIELD_ID) long id,
             @JsonProperty(FIELD_MANIFEST_LIST) String manifestList,
             @JsonProperty(FIELD_COMMIT_USER) String commitUser,
-            @JsonProperty(FIELD_COMMIT_DIGEST) String commitDigest,
+            @JsonProperty(FIELD_COMMIT_UUID) String commitUuid,
             @JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind,
             @JsonProperty(FIELD_TIME_MILLIS) long timeMillis) {
         this.id = id;
         this.manifestList = manifestList;
         this.commitUser = commitUser;
-        this.commitDigest = commitDigest;
+        this.commitUuid = commitUuid;
         this.commitKind = commitKind;
         this.timeMillis = timeMillis;
     }
@@ -91,9 +91,9 @@ public class Snapshot {
         return commitUser;
     }
 
-    @JsonGetter(FIELD_COMMIT_DIGEST)
-    public String commitDigest() {
-        return commitDigest;
+    @JsonGetter(FIELD_COMMIT_UUID)
+    public String commitUuid() {
+        return commitUuid;
     }
 
     @JsonGetter(FIELD_COMMIT_KIND)
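The commitDigest → commitUuid rename above replaces content hashing (an MD5 digest of the serialized committable, removed later in this patch) with a random UUID generated when the committable is created; recovery then only has to compare UUIDs of snapshots written by the same commit user. Below is a minimal, self-contained sketch of that dedup idea — FakeSnapshot and FakeCommittable are stand-ins for illustration, not the real Snapshot and ManifestCommittable classes:

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class UuidDedupSketch {

        // stand-ins for Snapshot and ManifestCommittable, for illustration only
        record FakeSnapshot(String commitUser, String commitUuid) {}

        record FakeCommittable(String uuid) {}

        /** Keep only committables whose uuid is not already recorded by this commit user. */
        static List<FakeCommittable> filterCommitted(
                String commitUser,
                List<FakeSnapshot> snapshotsNewestFirst,
                List<FakeCommittable> committables) {
            Map<String, FakeCommittable> uuids = new LinkedHashMap<>();
            for (FakeCommittable c : committables) {
                uuids.put(c.uuid(), c);
            }
            for (FakeSnapshot s : snapshotsNewestFirst) {
                if (!commitUser.equals(s.commitUser())) {
                    continue; // other users' snapshots can never match our uuids
                }
                if (uuids.remove(s.commitUuid()) == null) {
                    // like the early exit in FileStoreCommitImpl: pending committables
                    // are the latest commits by this user, so older snapshots cannot match
                    break;
                }
            }
            return new ArrayList<>(uuids.values());
        }

        public static void main(String[] args) {
            FakeCommittable pending = new FakeCommittable("u1");
            List<FakeSnapshot> snapshots = List.of(new FakeSnapshot("user-a", "u1"));
            // "u1" is already in a snapshot, so nothing is left to commit again
            System.out.println(filterCommitted("user-a", snapshots, List.of(pending)));
        }
    }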
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
index 4fcd763..765fe1a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
@@ -27,26 +27,26 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.UUID;
 
 /** Manifest commit message. */
 public class ManifestCommittable {
 
+    private final String uuid;
+
     private final Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> newFiles;
-
     private final Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> compactBefore;
-
     private final Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> compactAfter;
 
     public ManifestCommittable() {
-        this.newFiles = new HashMap<>();
-        this.compactBefore = new HashMap<>();
-        this.compactAfter = new HashMap<>();
+        this(UUID.randomUUID().toString(), new HashMap<>(), new HashMap<>(), new HashMap<>());
     }
 
     public ManifestCommittable(
+            String uuid,
             Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> newFiles,
             Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> compactBefore,
             Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> compactAfter) {
+        this.uuid = uuid;
         this.newFiles = newFiles;
         this.compactBefore = compactBefore;
         this.compactAfter = compactAfter;
@@ -68,6 +68,10 @@ public class ManifestCommittable {
                 .addAll(files);
     }
 
+    public String uuid() {
+        return uuid;
+    }
+
     public Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> newFiles() {
         return newFiles;
     }
@@ -89,19 +93,22 @@ public class ManifestCommittable {
             return false;
         }
         ManifestCommittable that = (ManifestCommittable) o;
-        return Objects.equals(newFiles, that.newFiles)
+        return Objects.equals(uuid, that.uuid)
+                && Objects.equals(newFiles, that.newFiles)
                 && Objects.equals(compactBefore, that.compactBefore)
                 && Objects.equals(compactAfter, that.compactAfter);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(newFiles, compactBefore, compactAfter);
+        return Objects.hash(uuid, newFiles, compactBefore, compactAfter);
     }
 
     @Override
     public String toString() {
-        return "new files:\n"
+        return "uuid: "
+                + uuid
+                + "\nnew files:\n"
                 + filesToString(newFiles)
                 + "compact before:\n"
                 + filesToString(compactBefore)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
index 65e18b4..81e4e1f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
@@ -55,6 +55,7 @@ public class ManifestCommittableSerializer
     public byte[] serialize(ManifestCommittable obj) throws IOException {
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
+        view.writeUTF(obj.uuid());
         serializeFiles(view, obj.newFiles());
         serializeFiles(view, obj.compactBefore());
         serializeFiles(view, obj.compactAfter());
@@ -105,6 +106,9 @@ public class ManifestCommittableSerializer
     public ManifestCommittable deserialize(int version, byte[] serialized) throws IOException {
         DataInputDeserializer view = new DataInputDeserializer(serialized);
         return new ManifestCommittable(
-                deserializeFiles(view), deserializeFiles(view), deserializeFiles(view));
+                view.readUTF(),
+                deserializeFiles(view),
+                deserializeFiles(view),
+                deserializeFiles(view));
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
index cc883d3..d6deadd 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
@@ -32,6 +32,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 /** Metadata of a manifest file. */
 public class ManifestFileMeta {
@@ -148,23 +149,29 @@ public class ManifestFileMeta {
                 candidate.add(manifest);
                 if (totalSize >= suggestedMetaSize) {
                     // reach suggested file size, perform merging and produce new file
-                    ManifestFileMeta merged;
                     if (candidate.size() == 1) {
-                        merged = candidate.get(0);
+                        result.add(candidate.get(0));
                     } else {
-                        merged = mergeIntoOneFile(candidate, Collections.emptyList(), manifestFile);
-                        newMetas.add(merged);
+                        mergeIntoOneFile(candidate, Collections.emptyList(), manifestFile)
+                                .ifPresent(
+                                        merged -> {
+                                            newMetas.add(merged);
+                                            result.add(merged);
+                                        });
                     }
-                    result.add(merged);
+
                     candidate.clear();
                     totalSize = 0;
                 }
             }
 
             // merge the last bit of metas with entries
-            ManifestFileMeta merged = mergeIntoOneFile(candidate, entries, manifestFile);
-            newMetas.add(merged);
-            result.add(merged);
+            mergeIntoOneFile(candidate, entries, manifestFile)
+                    .ifPresent(
+                            merged -> {
+                                newMetas.add(merged);
+                                result.add(merged);
+                            });
         } catch (Throwable e) {
             // exception occurs, clean up and rethrow
             for (ManifestFileMeta manifest : newMetas) {
@@ -176,14 +183,16 @@ public class ManifestFileMeta {
         return result;
     }
 
-    private static ManifestFileMeta mergeIntoOneFile(
+    private static Optional<ManifestFileMeta> mergeIntoOneFile(
             List<ManifestFileMeta> metas, List<ManifestEntry> entries, ManifestFile manifestFile) {
         Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
         for (ManifestFileMeta manifest : metas) {
             mergeEntries(manifestFile.read(manifest.fileName), map);
         }
         mergeEntries(entries, map);
-        return manifestFile.write(new ArrayList<>(map.values()));
+        return map.isEmpty()
+                ? Optional.empty()
+                : Optional.of(manifestFile.write(new ArrayList<>(map.values())));
    }
 
     private static void mergeEntries(
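With overwrite in the picture, merging manifests can now legitimately produce zero live entries — every ADD cancelled by a DELETE for the same file — which is why mergeIntoOneFile above returns an Optional and why the next change drops the non-empty check from ManifestList.write. A toy illustration of how ADD/DELETE cancellation can empty a merged manifest (simplified on purpose: the real mergeEntries keys on ManifestEntry.Identifier and must also keep DELETEs that cancel entries in older manifest files):

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Optional;

    public class MergeManifestSketch {

        enum Kind { ADD, DELETE }

        record Entry(Kind kind, String fileName) {}

        /** Toy merge: a DELETE cancels the earlier ADD of the same file. */
        static Optional<List<Entry>> merge(List<Entry> entries) {
            Map<String, Entry> map = new LinkedHashMap<>();
            for (Entry e : entries) {
                if (e.kind() == Kind.ADD) {
                    map.put(e.fileName(), e);
                } else {
                    map.remove(e.fileName());
                }
            }
            // mirror the patch: report "nothing to write" instead of writing an empty file
            return map.isEmpty() ? Optional.empty() : Optional.of(new ArrayList<>(map.values()));
        }

        public static void main(String[] args) {
            // an overwrite deletes every file previously added in the partition
            List<Entry> entries =
                    List.of(new Entry(Kind.ADD, "sst-1"), new Entry(Kind.DELETE, "sst-1"));
            System.out.println(merge(entries)); // Optional.empty
        }
    }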
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
index 8a9707b..bde36a9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
@@ -29,7 +29,6 @@ import org.apache.flink.table.store.file.FileFormat;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.util.List;
@@ -71,9 +70,6 @@ public class ManifestList {
      * <p>NOTE: This method is atomic.
      */
     public String write(List<ManifestFileMeta> metas) {
-        Preconditions.checkArgument(
-                metas.size() > 0, "Manifest file metas to write must not be empty.");
-
         Path path = pathFactory.newManifestList();
         try {
             return write(metas, path);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommit.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommit.java
index 912da92..87d1407 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommit.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommit.java
@@ -39,7 +39,9 @@ public interface FileStoreCommit {
      * Overwrite from manifest committable and partition.
      *
      * @param partition A single partition maps each partition key to a partition value. Depending
-     *     on the * user-defined statement, the partition might not include all partition keys.
+     *     on the user-defined statement, the partition might not include all partition keys. Also
+     *     note that this partition does not necessarily equal to the partitions of the newly added
+     *     key-values. This is just the partition to be cleaned up.
      */
     void overwrite(
             Map<String, String> partition,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 7c56acd..cb854ef 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -25,25 +25,24 @@ import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
-import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.manifest.ManifestFile;
 import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
 import org.apache.flink.table.store.file.manifest.ManifestList;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.store.file.utils.RowDataToObjectArrayConverter;
+import org.apache.flink.table.store.file.utils.TypeUtils;
+import org.apache.flink.table.types.logical.RowType;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
-import java.util.Base64;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -72,7 +71,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     private static final Logger LOG = LoggerFactory.getLogger(FileStoreCommitImpl.class);
 
     private final String commitUser;
-    private final ManifestCommittableSerializer committableSerializer;
+    private final RowType partitionType;
+    private final RowDataToObjectArrayConverter partitionObjectConverter;
     private final FileStorePathFactory pathFactory;
     private final ManifestFile manifestFile;
     private final ManifestList manifestList;
@@ -83,14 +83,15 @@ public class FileStoreCommitImpl implements FileStoreCommit {
 
     public FileStoreCommitImpl(
             String commitUser,
-            ManifestCommittableSerializer committableSerializer,
+            RowType partitionType,
             FileStorePathFactory pathFactory,
             ManifestFile.Factory manifestFileFactory,
             ManifestList.Factory manifestListFactory,
             FileStoreScan scan,
             FileStoreOptions fileStoreOptions) {
         this.commitUser = commitUser;
-        this.committableSerializer = committableSerializer;
+        this.partitionType = partitionType;
+        this.partitionObjectConverter = new RowDataToObjectArrayConverter(partitionType);
         this.pathFactory = pathFactory;
         this.manifestFile = manifestFileFactory.create();
         this.manifestList = manifestListFactory.create();
@@ -108,29 +109,24 @@ public class FileStoreCommitImpl implements FileStoreCommit {
 
     @Override
     public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> committableList) {
-        committableList = new ArrayList<>(committableList);
-
-        // filter out commits with no new files
-        committableList.removeIf(committable -> committable.newFiles().isEmpty());
-
         // if there is no previous snapshots then nothing should be filtered
         Long latestSnapshotId = pathFactory.latestSnapshotId();
         if (latestSnapshotId == null) {
             return committableList;
         }
 
-        // check if a committable is already committed by its hash
-        Map<String, ManifestCommittable> digests = new LinkedHashMap<>();
+        // check if a committable is already committed by its uuid
+        Map<String, ManifestCommittable> uuids = new LinkedHashMap<>();
         for (ManifestCommittable committable : committableList) {
-            digests.put(digestManifestCommittable(committable), committable);
+            uuids.put(committable.uuid(), committable);
         }
 
         for (long id = latestSnapshotId; id >= Snapshot.FIRST_SNAPSHOT_ID; id--) {
             Path snapshotPath = pathFactory.toSnapshotPath(id);
             Snapshot snapshot = Snapshot.fromPath(snapshotPath);
             if (commitUser.equals(snapshot.commitUser())) {
-                if (digests.containsKey(snapshot.commitDigest())) {
-                    digests.remove(snapshot.commitDigest());
+                if (uuids.containsKey(snapshot.commitUuid())) {
+                    uuids.remove(snapshot.commitUuid());
                 } else {
                     // early exit, because committableList must be the latest commits by this
                     // commit user
@@ -139,7 +135,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             }
         }
 
-        return new ArrayList<>(digests.values());
+        return new ArrayList<>(uuids.values());
     }
 
     @Override
@@ -148,18 +144,14 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             LOG.debug("Ready to commit\n" + committable.toString());
         }
 
-        String hash = digestManifestCommittable(committable);
-
         List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), ValueKind.ADD);
-        if (!appendChanges.isEmpty()) {
-            tryCommit(appendChanges, hash, Snapshot.CommitKind.APPEND);
-        }
+        tryCommit(appendChanges, committable.uuid(), Snapshot.CommitKind.APPEND, false);
 
         List<ManifestEntry> compactChanges = new ArrayList<>();
         compactChanges.addAll(collectChanges(committable.compactBefore(), ValueKind.DELETE));
         compactChanges.addAll(collectChanges(committable.compactAfter(), ValueKind.ADD));
         if (!compactChanges.isEmpty()) {
-            tryCommit(compactChanges, hash, Snapshot.CommitKind.COMPACT);
+            tryCommit(compactChanges, committable.uuid(), Snapshot.CommitKind.COMPACT, true);
         }
     }
 
@@ -168,21 +160,82 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             Map<String, String> partition,
             ManifestCommittable committable,
             Map<String, String> properties) {
-        throw new UnsupportedOperationException();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                    "Ready to overwrite partition "
+                            + partition.toString()
+                            + "\n"
+                            + committable.toString());
+        }
+
+        List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), ValueKind.ADD);
+        // sanity check, all changes must be done within the given partition
+        Predicate partitionFilter = TypeUtils.partitionMapToPredicate(partition, partitionType);
+        for (ManifestEntry entry : appendChanges) {
+            if (!partitionFilter.test(partitionObjectConverter.convert(entry.partition()))) {
+                throw new IllegalArgumentException(
+                        "Trying to overwrite partition "
+                                + partition.toString()
+                                + ", but the changes in "
+                                + pathFactory.getPartitionString(entry.partition())
+                                + " does not belong to this partition");
+            }
+        }
+        // overwrite new files
+        tryOverwrite(
+                partitionFilter, appendChanges, committable.uuid(), Snapshot.CommitKind.APPEND);
+
+        List<ManifestEntry> compactChanges = new ArrayList<>();
+        compactChanges.addAll(collectChanges(committable.compactBefore(), ValueKind.DELETE));
+        compactChanges.addAll(collectChanges(committable.compactAfter(), ValueKind.ADD));
+        if (!compactChanges.isEmpty()) {
+            tryCommit(compactChanges, committable.uuid(), Snapshot.CommitKind.COMPACT, true);
+        }
     }
 
-    private String digestManifestCommittable(ManifestCommittable committable) {
-        try {
-            return new String(
-                    Base64.getEncoder()
-                            .encode(
-                                    MessageDigest.getInstance("MD5")
-                                            .digest(committableSerializer.serialize(committable))));
-        } catch (NoSuchAlgorithmException e) {
-            throw new RuntimeException("MD5 algorithm not found. This is impossible.", e);
-        } catch (IOException e) {
-            throw new RuntimeException(
-                    "Failed to serialize ManifestCommittable. This is unexpected.", e);
+    private void tryCommit(
+            List<ManifestEntry> changes,
+            String hash,
+            Snapshot.CommitKind commitKind,
+            boolean checkDeletedFiles) {
+        while (true) {
+            Long latestSnapshotId = pathFactory.latestSnapshotId();
+            if (tryCommitOnce(changes, hash, commitKind, latestSnapshotId, checkDeletedFiles)) {
+                break;
+            }
+        }
+    }
+
+    private void tryOverwrite(
+            Predicate partitionFilter,
+            List<ManifestEntry> changes,
+            String hash,
+            Snapshot.CommitKind commitKind) {
+        while (true) {
+            Long latestSnapshotId = pathFactory.latestSnapshotId();
+
+            List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
+            if (latestSnapshotId != null) {
+                List<ManifestEntry> currentEntries =
+                        scan.withSnapshot(latestSnapshotId)
+                                .withPartitionFilter(partitionFilter)
+                                .plan()
+                                .files();
+                for (ManifestEntry entry : currentEntries) {
+                    changesWithOverwrite.add(
+                            new ManifestEntry(
+                                    ValueKind.DELETE,
+                                    entry.partition(),
+                                    entry.bucket(),
+                                    entry.totalBuckets(),
+                                    entry.file()));
+                }
+            }
+            changesWithOverwrite.addAll(changes);
+
+            if (tryCommitOnce(changesWithOverwrite, hash, commitKind, latestSnapshotId, false)) {
+                break;
+            }
         }
     }
 
@@ -209,114 +262,125 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         return changes;
     }
 
-    private void tryCommit(
-            List<ManifestEntry> changes, String hash, Snapshot.CommitKind commitKind) {
-        while (true) {
-            Long latestSnapshotId = pathFactory.latestSnapshotId();
-            long newSnapshotId =
-                    latestSnapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshotId + 1;
-            Path newSnapshotPath = pathFactory.toSnapshotPath(newSnapshotId);
-            Path tmpSnapshotPath = pathFactory.toTmpSnapshotPath(newSnapshotId);
+    private boolean tryCommitOnce(
+            List<ManifestEntry> changes,
+            String hash,
+            Snapshot.CommitKind commitKind,
+            Long latestSnapshotId,
+            boolean checkDeletedFiles) {
+        long newSnapshotId =
+                latestSnapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshotId + 1;
+        Path newSnapshotPath = pathFactory.toSnapshotPath(newSnapshotId);
+        Path tmpSnapshotPath = pathFactory.toTmpSnapshotPath(newSnapshotId);
 
-            Snapshot latestSnapshot = null;
-            if (latestSnapshotId != null) {
-                noConflictsOrFail(latestSnapshotId, changes);
-                latestSnapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(latestSnapshotId));
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Ready to commit changes to snapshot #" + newSnapshotId);
+            for (ManifestEntry entry : changes) {
+                LOG.debug("  * " + entry.toString());
             }
+        }
 
-            Snapshot newSnapshot;
-            String manifestListName = null;
-            List<ManifestFileMeta> oldMetas = new ArrayList<>();
-            List<ManifestFileMeta> newMetas = new ArrayList<>();
-            try {
-                if (latestSnapshot != null) {
-                    // read all previous manifest files
-                    oldMetas.addAll(manifestList.read(latestSnapshot.manifestList()));
+        Snapshot latestSnapshot = null;
+        if (latestSnapshotId != null) {
+            if (checkDeletedFiles) {
+                noConflictsOrFail(latestSnapshotId, changes);
             }
+            latestSnapshot = Snapshot.fromPath(pathFactory.toSnapshotPath(latestSnapshotId));
+        }
 
-                // merge manifest files with changes
-                newMetas.addAll(
-                        ManifestFileMeta.merge(
-                                oldMetas,
-                                changes,
-                                manifestFile,
-                                fileStoreOptions.manifestSuggestedSize.getBytes()));
-                // prepare snapshot file
-                manifestListName = manifestList.write(newMetas);
-                newSnapshot =
-                        new Snapshot(
-                                newSnapshotId,
-                                manifestListName,
-                                commitUser,
-                                hash,
-                                commitKind,
-                                System.currentTimeMillis());
-                FileUtils.writeFileUtf8(tmpSnapshotPath, newSnapshot.toJson());
-            } catch (Throwable e) {
-                // fails when preparing for commit, we should clean up
-                cleanUpTmpSnapshot(tmpSnapshotPath, manifestListName, oldMetas, newMetas);
-                throw new RuntimeException(
-                        String.format(
-                                "Exception occurs when preparing snapshot #%d (path %s) by user %s "
-                                        + "with hash %s and kind %s. Clean up.",
-                                newSnapshotId,
-                                newSnapshotPath.toString(),
-                                commitUser,
-                                hash,
-                                commitKind.name()),
-                        e);
-            }
-
-            boolean success;
-            try {
-                FileSystem fs = tmpSnapshotPath.getFileSystem();
-                // atomic rename
-                if (lock != null) {
-                    success =
-                            lock.runWithLock(
-                                    () ->
-                                            // fs.rename may not returns false if target file
-                                            // already exists, or even not atomic
-                                            // as we're relying on external locking, we can first
-                                            // check if file exist then rename to work around this
-                                            // case
-                                            !fs.exists(newSnapshotPath)
-                                                    && fs.rename(tmpSnapshotPath, newSnapshotPath));
-                } else {
-                    success = fs.rename(tmpSnapshotPath, newSnapshotPath);
-                }
-            } catch (Throwable e) {
-                // exception when performing the atomic rename,
-                // we cannot clean up because we can't determine the success
-                throw new RuntimeException(
-                        String.format(
-                                "Exception occurs when committing snapshot #%d (path %s) by user %s "
-                                        + "with hash %s and kind %s. "
" - + "Cannot clean up because we can't determine the success.", - newSnapshotId, - newSnapshotPath.toString(), - commitUser, - hash, - commitKind.name()), - e); + Snapshot newSnapshot; + String manifestListName = null; + List<ManifestFileMeta> oldMetas = new ArrayList<>(); + List<ManifestFileMeta> newMetas = new ArrayList<>(); + try { + if (latestSnapshot != null) { + // read all previous manifest files + oldMetas.addAll(manifestList.read(latestSnapshot.manifestList())); } + // merge manifest files with changes + newMetas.addAll( + ManifestFileMeta.merge( + oldMetas, + changes, + manifestFile, + fileStoreOptions.manifestSuggestedSize.getBytes())); + // prepare snapshot file + manifestListName = manifestList.write(newMetas); + newSnapshot = + new Snapshot( + newSnapshotId, + manifestListName, + commitUser, + hash, + commitKind, + System.currentTimeMillis()); + FileUtils.writeFileUtf8(tmpSnapshotPath, newSnapshot.toJson()); + } catch (Throwable e) { + // fails when preparing for commit, we should clean up + cleanUpTmpSnapshot(tmpSnapshotPath, manifestListName, oldMetas, newMetas); + throw new RuntimeException( + String.format( + "Exception occurs when preparing snapshot #%d (path %s) by user %s " + + "with hash %s and kind %s. Clean up.", + newSnapshotId, + newSnapshotPath.toString(), + commitUser, + hash, + commitKind.name()), + e); + } - if (success) { - return; + boolean success; + try { + FileSystem fs = tmpSnapshotPath.getFileSystem(); + // atomic rename + if (lock != null) { + success = + lock.runWithLock( + () -> + // fs.rename may not returns false if target file + // already exists, or even not atomic + // as we're relying on external locking, we can first + // check if file exist then rename to work around this + // case + !fs.exists(newSnapshotPath) + && fs.rename(tmpSnapshotPath, newSnapshotPath)); + } else { + success = fs.rename(tmpSnapshotPath, newSnapshotPath); } - - // atomic rename fails, clean up and try again - LOG.warn( + } catch (Throwable e) { + // exception when performing the atomic rename, + // we cannot clean up because we can't determine the success + throw new RuntimeException( String.format( - "Atomic rename failed for snapshot #%d (path %s) by user %s " + "Exception occurs when committing snapshot #%d (path %s) by user %s " + "with hash %s and kind %s. " - + "Clean up and try again.", + + "Cannot clean up because we can't determine the success.", newSnapshotId, newSnapshotPath.toString(), commitUser, hash, - commitKind.name())); - cleanUpTmpSnapshot(tmpSnapshotPath, manifestListName, oldMetas, newMetas); + commitKind.name()), + e); + } + + if (success) { + return true; } + + // atomic rename fails, clean up and try again + LOG.warn( + String.format( + "Atomic rename failed for snapshot #%d (path %s) by user %s " + + "with hash %s and kind %s. 
" + + "Clean up and try again.", + newSnapshotId, + newSnapshotPath.toString(), + commitUser, + hash, + commitKind.name())); + cleanUpTmpSnapshot(tmpSnapshotPath, manifestListName, oldMetas, newMetas); + return false; } private void noConflictsOrFail(long snapshotId, List<ManifestEntry> changes) { diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/TypeUtils.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/TypeUtils.java new file mode 100644 index 0000000..a27542c --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/TypeUtils.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.utils; + +import org.apache.flink.table.data.binary.BinaryStringData; +import org.apache.flink.table.data.binary.BinaryStringDataUtil; +import org.apache.flink.table.store.file.predicate.And; +import org.apache.flink.table.store.file.predicate.Equal; +import org.apache.flink.table.store.file.predicate.Literal; +import org.apache.flink.table.store.file.predicate.Predicate; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import java.util.List; +import java.util.Map; + +/** Utils for parsing among different types. 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/TypeUtils.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/TypeUtils.java
new file mode 100644
index 0000000..a27542c
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/TypeUtils.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.data.binary.BinaryStringDataUtil;
+import org.apache.flink.table.store.file.predicate.And;
+import org.apache.flink.table.store.file.predicate.Equal;
+import org.apache.flink.table.store.file.predicate.Literal;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+import java.util.Map;
+
+/** Utils for parsing among different types. */
+public class TypeUtils {
+
+    public static Predicate partitionMapToPredicate(
+            Map<String, String> partition, RowType partitionType) {
+        List<String> fieldNames = partitionType.getFieldNames();
+        Predicate predicate = null;
+        for (Map.Entry<String, String> entry : partition.entrySet()) {
+            int idx = fieldNames.indexOf(entry.getKey());
+            LogicalType type = partitionType.getTypeAt(idx);
+            Literal literal = new Literal(type, castFromString(entry.getValue(), type));
+            if (predicate == null) {
+                predicate = new Equal(idx, literal);
+            } else {
+                predicate = new And(predicate, new Equal(idx, literal));
+            }
+        }
+        return predicate;
+    }
+
+    private static Object castFromString(String s, LogicalType type) {
+        BinaryStringData str = BinaryStringData.fromString(s);
+        switch (type.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+                return str;
+            case BOOLEAN:
+                return BinaryStringDataUtil.toBoolean(str);
+            case BINARY:
+            case VARBINARY:
+                // this implementation does not match the new behavior of StringToBinaryCastRule,
+                // change this if needed
+                return s.getBytes();
+            case DECIMAL:
+                DecimalType decimalType = (DecimalType) type;
+                return BinaryStringDataUtil.toDecimal(
+                        str, decimalType.getPrecision(), decimalType.getScale());
+            case TINYINT:
+                return BinaryStringDataUtil.toByte(str);
+            case SMALLINT:
+                return BinaryStringDataUtil.toShort(str);
+            case INTEGER:
+                return BinaryStringDataUtil.toInt(str);
+            case BIGINT:
+                return BinaryStringDataUtil.toLong(str);
+            case FLOAT:
+                return BinaryStringDataUtil.toFloat(str);
+            case DOUBLE:
+                return BinaryStringDataUtil.toDouble(str);
+            case DATE:
+                return BinaryStringDataUtil.toDate(str);
+            case TIME_WITHOUT_TIME_ZONE:
+                return BinaryStringDataUtil.toTime(str);
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return BinaryStringDataUtil.toTimestamp(str);
+            default:
+                throw new UnsupportedOperationException("Unsupported type " + type.toString());
+        }
+    }
+}
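TypeUtils.partitionMapToPredicate turns a user-supplied partition spec into an equality predicate over the partition row, casting each string value to the field's logical type and AND-ing the equalities together. A usage sketch, under the assumption that Predicate.test accepts the partition row converted to an Object[] (which is how FileStoreCommitImpl uses it via RowDataToObjectArrayConverter) and that VARCHAR literals are held as BinaryStringData:

    import org.apache.flink.table.data.binary.BinaryStringData;
    import org.apache.flink.table.store.file.predicate.Predicate;
    import org.apache.flink.table.store.file.utils.TypeUtils;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.VarCharType;

    import java.util.HashMap;
    import java.util.Map;

    public class PartitionPredicateExample {

        public static void main(String[] args) {
            // partition schema of the test generator: dt STRING, hr INT
            RowType partitionType =
                    RowType.of(
                            new LogicalType[] {new VarCharType(8), new IntType()},
                            new String[] {"dt", "hr"});

            // "INSERT OVERWRITE ... PARTITION (dt = '20220217')" style spec;
            // hr is deliberately left out, so only dt is constrained
            Map<String, String> partition = new HashMap<>();
            partition.put("dt", "20220217");

            Predicate predicate = TypeUtils.partitionMapToPredicate(partition, partitionType);

            // test against a converted partition row; any hr in dt=20220217 matches
            Object[] row = {BinaryStringData.fromString("20220217"), 13};
            System.out.println(predicate.test(row)); // expected: true
        }
    }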
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
index 3f6ad2f..bb73ea7 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
@@ -34,7 +34,9 @@ import org.apache.flink.table.types.logical.VarCharType;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
 /** Random {@link KeyValue} generator. */
@@ -159,6 +161,13 @@ public class TestKeyValueGenerator {
                 .copy();
     }
 
+    public static Map<String, String> toPartitionMap(BinaryRowData partition) {
+        Map<String, String> map = new HashMap<>();
+        map.put("dt", partition.getString(0).toString());
+        map.put("hr", String.valueOf(partition.getInt(1)));
+        return map;
+    }
+
     public void sort(List<KeyValue> kvs) {
         kvs.sort(
                 (a, b) -> {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
similarity index 57%
rename from flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTestBase.java
rename to flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index 0851ff4..927e211 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -30,8 +30,10 @@ import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
 
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,13 +50,13 @@ import java.util.stream.Collectors;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link FileStoreCommitImpl}. */
-public abstract class FileStoreCommitTestBase {
+public class FileStoreCommitTest {
 
-    private static final Logger LOG = LoggerFactory.getLogger(FileStoreCommitTestBase.class);
+    private static final Logger LOG = LoggerFactory.getLogger(FileStoreCommitTest.class);
 
     private final FileFormat avro =
             FileFormat.fromIdentifier(
-                    FileStoreCommitTestBase.class.getClassLoader(), "avro", new Configuration());
+                    FileStoreCommitTest.class.getClassLoader(), "avro", new Configuration());
 
     private TestKeyValueGenerator gen;
     @TempDir java.nio.file.Path tempDir;
@@ -64,21 +66,26 @@ public abstract class FileStoreCommitTestBase {
         gen = new TestKeyValueGenerator();
         Path root = new Path(tempDir.toString());
         root.getFileSystem().mkdirs(new Path(root + "/snapshot"));
+        // for failure tests
+        FailingAtomicRenameFileSystem.resetFailCounter(100);
+        FailingAtomicRenameFileSystem.setFailPossibility(5000);
     }
 
-    protected abstract String getScheme();
-
-    @RepeatedTest(10)
-    public void testSingleCommitUser() throws Exception {
-        testRandomConcurrentNoConflict(1);
+    @ParameterizedTest
+    @ValueSource(
+            strings = {TestAtomicRenameFileSystem.SCHEME, FailingAtomicRenameFileSystem.SCHEME})
+    public void testSingleCommitUser(String scheme) throws Exception {
+        testRandomConcurrentNoConflict(1, scheme);
     }
 
-    @RepeatedTest(10)
-    public void testManyCommitUsersNoConflict() throws Exception {
-        testRandomConcurrentNoConflict(ThreadLocalRandom.current().nextInt(3) + 2);
+    @ParameterizedTest
+    @ValueSource(
+            strings = {TestAtomicRenameFileSystem.SCHEME, FailingAtomicRenameFileSystem.SCHEME})
+    public void testManyCommitUsersNoConflict(String scheme) throws Exception {
+        testRandomConcurrentNoConflict(ThreadLocalRandom.current().nextInt(3) + 2, scheme);
     }
 
-    protected void testRandomConcurrentNoConflict(int numThreads) throws Exception {
+    protected void testRandomConcurrentNoConflict(int numThreads, String scheme) throws Exception {
         // prepare test data
         Map<BinaryRowData, List<KeyValue>> data =
                 generateData(ThreadLocalRandom.current().nextInt(1000) + 1);
@@ -88,11 +95,6 @@ public abstract class FileStoreCommitTestBase {
                                 .flatMap(Collection::stream)
                                 .collect(Collectors.toList()),
                 "input");
-        Map<BinaryRowData, BinaryRowData> expected =
-                OperationTestUtils.toKvMap(
-                        data.values().stream()
-                                .flatMap(Collection::stream)
-                                .collect(Collectors.toList()));
 
         List<Map<BinaryRowData, List<KeyValue>>> dataPerThread = new ArrayList<>();
         for (int i = 0; i < numThreads; i++) {
@@ -110,15 +112,26 @@ public abstract class FileStoreCommitTestBase {
             TestCommitThread thread =
                     new TestCommitThread(
                             dataPerThread.get(i),
-                            OperationTestUtils.createPathFactory(getScheme(), tempDir.toString()),
+                            OperationTestUtils.createPathFactory(scheme, tempDir.toString()),
                             OperationTestUtils.createPathFactory(
                                     TestAtomicRenameFileSystem.SCHEME, tempDir.toString()));
             thread.start();
             threads.add(thread);
         }
+
+        // calculate expected results
+        Map<BinaryRowData, List<KeyValue>> threadResults = new HashMap<>();
         for (TestCommitThread thread : threads) {
             thread.join();
+            for (Map.Entry<BinaryRowData, List<KeyValue>> entry : thread.getResult().entrySet()) {
+                threadResults.put(entry.getKey(), entry.getValue());
+            }
         }
+        Map<BinaryRowData, BinaryRowData> expected =
+                OperationTestUtils.toKvMap(
+                        threadResults.values().stream()
+                                .flatMap(Collection::stream)
+                                .collect(Collectors.toList()));
 
         // read actual data and compare
         FileStorePathFactory safePathFactory =
@@ -136,6 +149,79 @@ public abstract class FileStoreCommitTestBase {
         assertThat(actual).isEqualTo(expected);
     }
 
+    @Test
+    public void testOverwritePartialCommit() throws Exception {
+        Map<BinaryRowData, List<KeyValue>> data1 =
+                generateData(ThreadLocalRandom.current().nextInt(1000) + 1);
+        logData(
+                () ->
+                        data1.values().stream()
+                                .flatMap(Collection::stream)
+                                .collect(Collectors.toList()),
+                "data1");
+
+        FileStorePathFactory pathFactory =
+                OperationTestUtils.createPathFactory(
+                        TestAtomicRenameFileSystem.SCHEME, tempDir.toString());
+        OperationTestUtils.commitData(
+                data1.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
+                gen::getPartition,
+                kv -> 0,
+                avro,
+                pathFactory);
+
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        String dtToOverwrite =
+                new ArrayList<>(data1.keySet())
+                        .get(random.nextInt(data1.size()))
+                        .getString(0)
+                        .toString();
+        Map<String, String> partitionToOverwrite = new HashMap<>();
+        partitionToOverwrite.put("dt", dtToOverwrite);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("dtToOverwrite " + dtToOverwrite);
+        }
+
+        Map<BinaryRowData, List<KeyValue>> data2 =
+                generateData(ThreadLocalRandom.current().nextInt(1000) + 1);
+        // remove all records not belonging to dtToOverwrite
+        data2.entrySet().removeIf(e -> !dtToOverwrite.equals(e.getKey().getString(0).toString()));
+        logData(
+                () ->
+                        data2.values().stream()
+                                .flatMap(Collection::stream)
+                                .collect(Collectors.toList()),
+                "data2");
+        OperationTestUtils.overwriteData(
+                data2.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
+                gen::getPartition,
+                kv -> 0,
+                avro,
+                pathFactory,
+                partitionToOverwrite);
+
+        List<KeyValue> expectedKvs = new ArrayList<>();
+        for (Map.Entry<BinaryRowData, List<KeyValue>> entry : data1.entrySet()) {
+            if (dtToOverwrite.equals(entry.getKey().getString(0).toString())) {
+                continue;
+            }
+            expectedKvs.addAll(entry.getValue());
+        }
+        data2.values().forEach(expectedKvs::addAll);
+        gen.sort(expectedKvs);
+        Map<BinaryRowData, BinaryRowData> expected = OperationTestUtils.toKvMap(expectedKvs);
+
+        List<KeyValue> actualKvs =
+                OperationTestUtils.readKvsFromSnapshot(
+                        pathFactory.latestSnapshotId(), avro, pathFactory);
+        gen.sort(actualKvs);
+        Map<BinaryRowData, BinaryRowData> actual = OperationTestUtils.toKvMap(actualKvs);
+
+        logData(() -> kvMapToKvList(expected), "expected");
+        logData(() -> kvMapToKvList(actual), "actual");
+        assertThat(actual).isEqualTo(expected);
+    }
+
     private Map<BinaryRowData, List<KeyValue>> generateData(int numRecords) {
         Map<BinaryRowData, List<KeyValue>> data = new HashMap<>();
         for (int i = 0; i < numRecords; i++) {
@@ -162,30 +248,4 @@ public abstract class FileStoreCommitTestBase {
         }
         LOG.debug("========== End of " + name + " ==========");
     }
-
-    /** Tests for {@link FileStoreCommitImpl} with {@link TestAtomicRenameFileSystem}. */
-    public static class WithTestAtomicRenameFileSystem extends FileStoreCommitTestBase {
-
-        @Override
-        protected String getScheme() {
-            return TestAtomicRenameFileSystem.SCHEME;
-        }
-    }
-
-    /** Tests for {@link FileStoreCommitImpl} with {@link FailingAtomicRenameFileSystem}. */
-    public static class WithFailingAtomicRenameFileSystem extends FileStoreCommitTestBase {
-
-        @BeforeEach
-        @Override
-        public void beforeEach() throws IOException {
-            super.beforeEach();
-            FailingAtomicRenameFileSystem.resetFailCounter(100);
-            FailingAtomicRenameFileSystem.setFailPossibility(5000);
-        }
-
-        @Override
-        protected String getScheme() {
-            return FailingAtomicRenameFileSystem.SCHEME;
-        }
-    }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
index 16a7ab1..de8f410 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
@@ -46,7 +46,7 @@ public class FileStoreExpireTest {
 
     private final FileFormat avro =
             FileFormat.fromIdentifier(
-                    FileStoreCommitTestBase.class.getClassLoader(), "avro", new Configuration());
+                    FileStoreCommitTest.class.getClassLoader(), "avro", new Configuration());
 
     private TestKeyValueGenerator gen;
     @TempDir java.nio.file.Path tempDir;
@@ -187,7 +187,7 @@ public class FileStoreExpireTest {
             }
             allData.addAll(data);
             List<Snapshot> snapshots =
-                    OperationTestUtils.writeAndCommitData(
+                    OperationTestUtils.commitData(
                             data, gen::getPartition, kv -> 0, avro, pathFactory);
             for (int j = 0; j < snapshots.size(); j++) {
                 snapshotPositions.add(allData.size());
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
index 8a12007..0d1a76c 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
@@ -53,7 +53,7 @@ public class FileStoreScanTest {
 
     private final FileFormat avro =
             FileFormat.fromIdentifier(
-                    FileStoreCommitTestBase.class.getClassLoader(), "avro", new Configuration());
+                    FileStoreCommitTest.class.getClassLoader(), "avro", new Configuration());
 
     private TestKeyValueGenerator gen;
     @TempDir java.nio.file.Path tempDir;
@@ -199,7 +199,7 @@ public class FileStoreScanTest {
 
     private Snapshot writeData(List<KeyValue> kvs) throws Exception {
         List<Snapshot> snapshots =
-                OperationTestUtils.writeAndCommitData(
+                OperationTestUtils.commitData(
                         kvs, gen::getPartition, this::getBucket, avro, pathFactory);
         return snapshots.get(snapshots.size() - 1);
     }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
index ef93d26..6a62421 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
@@ -28,7 +28,6 @@ import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
-import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.manifest.ManifestFile;
 import org.apache.flink.table.store.file.manifest.ManifestList;
@@ -40,6 +39,7 @@ import org.apache.flink.table.store.file.mergetree.sst.SstFile;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.util.function.QuadFunction;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -51,6 +51,7 @@ import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiConsumer;
 import java.util.function.Function;
 
 /** Utils for operation tests. */
@@ -85,18 +86,13 @@ public class OperationTestUtils {
 
     public static FileStoreCommit createCommit(
             FileFormat fileFormat, FileStorePathFactory pathFactory) {
-        ManifestCommittableSerializer serializer =
-                new ManifestCommittableSerializer(
-                        TestKeyValueGenerator.PARTITION_TYPE,
-                        TestKeyValueGenerator.KEY_TYPE,
-                        TestKeyValueGenerator.ROW_TYPE);
         ManifestFile.Factory testManifestFileFactory =
                 createManifestFileFactory(fileFormat, pathFactory);
         ManifestList.Factory testManifestListFactory =
                 createManifestListFactory(fileFormat, pathFactory);
         return new FileStoreCommitImpl(
                 UUID.randomUUID().toString(),
-                serializer,
+                TestKeyValueGenerator.PARTITION_TYPE,
                 pathFactory,
                 testManifestFileFactory,
                 testManifestListFactory,
@@ -153,13 +149,52 @@ public class OperationTestUtils {
                 TestKeyValueGenerator.PARTITION_TYPE, fileFormat, pathFactory);
     }
 
-    public static List<Snapshot> writeAndCommitData(
+    public static List<Snapshot> commitData(
             List<KeyValue> kvs,
             Function<KeyValue, BinaryRowData> partitionCalculator,
             Function<KeyValue, Integer> bucketCalculator,
             FileFormat fileFormat,
             FileStorePathFactory pathFactory)
             throws Exception {
+        return commitDataImpl(
+                kvs,
+                partitionCalculator,
+                bucketCalculator,
+                fileFormat,
+                pathFactory,
+                FileStoreWrite::createWriter,
+                (commit, committable) -> commit.commit(committable, Collections.emptyMap()));
+    }
+
+    public static List<Snapshot> overwriteData(
+            List<KeyValue> kvs,
+            Function<KeyValue, BinaryRowData> partitionCalculator,
+            Function<KeyValue, Integer> bucketCalculator,
+            FileFormat fileFormat,
+            FileStorePathFactory pathFactory,
+            Map<String, String> partition)
+            throws Exception {
+        return commitDataImpl(
+                kvs,
+                partitionCalculator,
+                bucketCalculator,
+                fileFormat,
+                pathFactory,
+                FileStoreWrite::createEmptyWriter,
+                (commit, committable) ->
+                        commit.overwrite(partition, committable, Collections.emptyMap()));
+    }
+
+    private static List<Snapshot> commitDataImpl(
+            List<KeyValue> kvs,
+            Function<KeyValue, BinaryRowData> partitionCalculator,
+            Function<KeyValue, Integer> bucketCalculator,
+            FileFormat fileFormat,
+            FileStorePathFactory pathFactory,
+            QuadFunction<FileStoreWrite, BinaryRowData, Integer, ExecutorService, RecordWriter>
+                    createWriterFunction,
+            BiConsumer<FileStoreCommit, ManifestCommittable> commitFunction)
+            throws Exception {
         FileStoreWrite write = createWrite(fileFormat, pathFactory);
         Map<BinaryRowData, Map<Integer, RecordWriter>> writers = new HashMap<>();
         for (KeyValue kv : kvs) {
@@ -171,7 +206,8 @@ public class OperationTestUtils {
                                 (b, w) -> {
                                     if (w == null) {
                                         ExecutorService service = Executors.newSingleThreadExecutor();
-                                        return write.createWriter(partition, bucket, service);
+                                        return createWriterFunction.apply(
+                                                write, partition, bucket, service);
                                     } else {
                                         return w;
                                     }
@@ -194,7 +230,7 @@ public class OperationTestUtils {
         if (snapshotIdBeforeCommit == null) {
             snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
         }
-        commit.commit(committable, Collections.emptyMap());
+        commitFunction.accept(commit, committable);
         Long snapshotIdAfterCommit = pathFactory.latestSnapshotId();
         if (snapshotIdAfterCommit == null) {
             snapshotIdAfterCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
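The test helper refactor above funnels commitData and overwriteData through one commitDataImpl, injecting two strategies: how writers are created (createWriter merges with existing files; createEmptyWriter ignores them, since an overwrite deletes them anyway) and how the committable is finally applied. The shape of that pattern in miniature — names and types here are illustrative stand-ins, not the real test API:

    import java.util.function.BiConsumer;
    import java.util.function.BiFunction;

    public class InjectedCommitSketch {

        interface Write {
            String createWriter(String partition); // merges with existing files

            String createEmptyWriter(String partition); // ignores existing files
        }

        interface Commit {
            void commit(String committable);

            void overwrite(String partition, String committable);
        }

        // shared core: writer creation and the finishing call are injected by the caller
        static void commitDataImpl(
                Write write,
                Commit commit,
                String partition,
                BiFunction<Write, String, String> createWriterFunction,
                BiConsumer<Commit, String> commitFunction) {
            String committable = createWriterFunction.apply(write, partition);
            commitFunction.accept(commit, committable);
        }

        static void commitData(Write write, Commit commit, String partition) {
            commitDataImpl(write, commit, partition, Write::createWriter, Commit::commit);
        }

        static void overwriteData(Write write, Commit commit, String partition) {
            commitDataImpl(
                    write,
                    commit,
                    partition,
                    Write::createEmptyWriter,
                    (c, m) -> c.overwrite(partition, m));
        }
    }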
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index 5d96cf5..8033b40 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.FileFormat;
 import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
@@ -44,6 +45,7 @@ public class TestCommitThread extends Thread {
     private static final Logger LOG = LoggerFactory.getLogger(TestCommitThread.class);
 
     private final Map<BinaryRowData, List<KeyValue>> data;
+    private final Map<BinaryRowData, List<KeyValue>> result;
 
     private final Map<BinaryRowData, MergeTreeWriter> writers;
     private final FileStoreWrite write;
@@ -54,46 +56,33 @@ public class TestCommitThread extends Thread {
             FileStorePathFactory testPathFactory,
             FileStorePathFactory safePathFactory) {
         this.data = data;
+        this.result = new HashMap<>();
         this.writers = new HashMap<>();
 
         FileFormat avro =
                 FileFormat.fromIdentifier(
-                        FileStoreCommitTestBase.class.getClassLoader(),
-                        "avro",
-                        new Configuration());
+                        FileStoreCommitTest.class.getClassLoader(), "avro", new Configuration());
         this.write = OperationTestUtils.createWrite(avro, safePathFactory);
         this.commit = OperationTestUtils.createCommit(avro, testPathFactory);
     }
 
+    public Map<BinaryRowData, List<KeyValue>> getResult() {
+        return result;
+    }
+
     @Override
     public void run() {
         while (!data.isEmpty()) {
-            ManifestCommittable committable;
             try {
-                committable = createCommittable();
+                if (ThreadLocalRandom.current().nextInt(5) == 0) {
+                    // 20% probability to overwrite
+                    doOverwrite();
+                } else {
+                    doCommit();
+                }
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
-
-            boolean shouldCheckFilter = false;
-            while (true) {
-                try {
-                    if (shouldCheckFilter) {
-                        if (commit.filterCommitted(Collections.singletonList(committable))
-                                .isEmpty()) {
-                            break;
-                        }
-                    }
-                    commit.commit(committable, Collections.emptyMap());
-                    break;
-                } catch (Throwable e) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.warn("Failed to commit because of exception, try again", e);
-                    }
-                    writers.clear();
-                    shouldCheckFilter = true;
-                }
-            }
         }
 
         for (MergeTreeWriter writer : writers.values()) {
@@ -105,29 +94,87 @@ public class TestCommitThread extends Thread {
         }
     }
 
-    private ManifestCommittable createCommittable() throws Exception {
+    private void doCommit() throws Exception {
         int numWrites = ThreadLocalRandom.current().nextInt(3) + 1;
         for (int i = 0; i < numWrites && !data.isEmpty(); i++) {
             writeData();
         }
-
         ManifestCommittable committable = new ManifestCommittable();
         for (Map.Entry<BinaryRowData, MergeTreeWriter> entry : writers.entrySet()) {
             committable.add(entry.getKey(), 0, entry.getValue().prepareCommit());
         }
-        return committable;
+
+        runWithRetry(committable, () -> commit.commit(committable, Collections.emptyMap()));
+    }
+
+    private void doOverwrite() throws Exception {
+        BinaryRowData partition = overwriteData();
+        ManifestCommittable committable = new ManifestCommittable();
+        committable.add(partition, 0, writers.get(partition).prepareCommit());
+
+        runWithRetry(
+                committable,
+                () ->
+                        commit.overwrite(
+                                TestKeyValueGenerator.toPartitionMap(partition),
+                                committable,
+                                Collections.emptyMap()));
+    }
+
+    private void runWithRetry(ManifestCommittable committable, Runnable runnable) {
+        boolean shouldCheckFilter = false;
+        while (true) {
+            try {
+                if (shouldCheckFilter) {
+                    if (commit.filterCommitted(Collections.singletonList(committable)).isEmpty()) {
+                        break;
+                    }
+                }
+                runnable.run();
+                break;
+            } catch (Throwable e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.warn("Failed to commit because of exception, try again", e);
+                }
+                writers.clear();
+                shouldCheckFilter = true;
+            }
+        }
     }
 
     private void writeData() throws Exception {
         List<KeyValue> changes = new ArrayList<>();
         BinaryRowData partition = pickData(changes);
+        result.compute(partition, (p, l) -> l == null ? new ArrayList<>() : l).addAll(changes);
+
         MergeTreeWriter writer =
-                writers.compute(partition, (k, v) -> v == null ? createWriter(k) : v);
+                writers.compute(partition, (p, w) -> w == null ? createWriter(p, false) : w);
         for (KeyValue kv : changes) {
             writer.write(kv.valueKind(), kv.key(), kv.value());
         }
     }
 
+    private BinaryRowData overwriteData() throws Exception {
+        List<KeyValue> changes = new ArrayList<>();
+        BinaryRowData partition = pickData(changes);
+        List<KeyValue> resultOfPartition =
+                result.compute(partition, (p, l) -> l == null ? new ArrayList<>() : l);
+        resultOfPartition.clear();
+        resultOfPartition.addAll(changes);
+
+        if (writers.containsKey(partition)) {
+            MergeTreeWriter oldWriter = writers.get(partition);
+            oldWriter.close();
+        }
+        MergeTreeWriter writer = createWriter(partition, true);
+        writers.put(partition, writer);
+        for (KeyValue kv : changes) {
+            writer.write(kv.valueKind(), kv.key(), kv.value());
+        }
+
+        return partition;
+    }
+
     private BinaryRowData pickData(List<KeyValue> changes) {
         List<BinaryRowData> keys = new ArrayList<>(data.keySet());
         BinaryRowData partition = keys.get(ThreadLocalRandom.current().nextInt(keys.size()));
@@ -142,7 +189,7 @@ public class TestCommitThread extends Thread {
         return partition;
     }
 
-    private MergeTreeWriter createWriter(BinaryRowData partition) {
+    private MergeTreeWriter createWriter(BinaryRowData partition, boolean empty) {
         ExecutorService service =
                 Executors.newSingleThreadExecutor(
                         r -> {
@@ -150,6 +197,10 @@ public class TestCommitThread extends Thread {
                             t.setName(Thread.currentThread().getName() + "-writer-service-pool");
                             return t;
                         });
-        return (MergeTreeWriter) write.createWriter(partition, 0, service);
+        if (empty) {
+            return (MergeTreeWriter) write.createEmptyWriter(partition, 0, service);
+        } else {
+            return (MergeTreeWriter) write.createWriter(partition, 0, service);
+        }
     }
 }
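runWithRetry above is what makes the failure-injecting file system test meaningful: after an exception the thread cannot know whether its snapshot actually landed, so before retrying it asks filterCommitted whether the committable's uuid is already recorded — exactly the deduplication the uuid change enables. The retry skeleton in isolation, with FakeCommit as an illustrative stand-in for FileStoreCommit:

    import java.util.Collections;
    import java.util.List;

    public class RetryCommitSketch {

        interface FakeCommit {
            // returns the committables that are NOT yet in any snapshot
            List<String> filterCommitted(List<String> uuids);
        }

        static void runWithRetry(FakeCommit commit, String uuid, Runnable action) {
            boolean shouldCheckFilter = false;
            while (true) {
                try {
                    if (shouldCheckFilter
                            && commit.filterCommitted(Collections.singletonList(uuid)).isEmpty()) {
                        return; // the "failed" attempt actually committed; don't double-commit
                    }
                    action.run();
                    return;
                } catch (Throwable t) {
                    // outcome unknown (e.g. rename threw): verify via uuid before retrying
                    shouldCheckFilter = true;
                }
            }
        }

        public static void main(String[] args) {
            FakeCommit commit = uuids -> Collections.emptyList(); // pretend everything landed
            runWithRetry(commit, "u1", () -> System.out.println("commit attempt"));
        }
    }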
diff --git a/flink-table-store-core/src/test/resources/log4j2-test.xml b/flink-table-store-core/src/test/resources/log4j2-test.xml
new file mode 100644
index 0000000..ae98052
--- /dev/null
+++ b/flink-table-store-core/src/test/resources/log4j2-test.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Configuration status="WARN">
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" />
+        </Console>
+    </Appenders>
+    <Loggers>
+        <!-- <Logger name="org.apache.flink.table.store.file.operation" level="DEBUG" /> -->
+    </Loggers>
+</Configuration>
\ No newline at end of file