This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.0 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 65e9ba80ddb6f96c454a2038ed8702693bcc620f Author: yuzelin <[email protected]> AuthorDate: Fri Jan 3 15:29:25 2025 +0800 [core] Refactor TagManager to remove unnecessary tag existence check (#4820) --- .../paimon/utils/SupplierWithIOException.java | 29 ++++++++ .../apache/paimon/operation/OrphanFilesClean.java | 12 +-- .../paimon/table/AbstractFileStoreTable.java | 8 +- .../snapshot/IncrementalTagStartingScanner.java | 33 +++----- .../snapshot/StaticFromTagStartingScanner.java | 2 +- .../table/source/snapshot/TimeTravelUtil.java | 2 +- .../org/apache/paimon/table/system/TagsTable.java | 25 +++---- .../org/apache/paimon/tag/TagAutoCreation.java | 5 +- .../org/apache/paimon/utils/BranchManager.java | 4 +- .../java/org/apache/paimon/utils/TagManager.java | 87 ++++++++++++---------- .../paimon/operation/ExpireSnapshotsTest.java | 3 +- .../apache/paimon/operation/FileDeletionTest.java | 10 +-- .../paimon/table/FileStoreTableTestBase.java | 17 ++--- .../paimon/table/IndexFileExpireTableTest.java | 6 +- .../org/apache/paimon/tag/TagAutoManagerTest.java | 6 +- .../org/apache/paimon/utils/TagManagerTest.java | 11 ++- .../org/apache/paimon/utils/TraceableFileIO.java | 32 ++------ .../ProcedurePositionalArgumentsITCase.java | 2 +- .../apache/paimon/flink/clone/PickFilesUtil.java | 11 +-- .../sink/AutoTagForSavepointCommitterOperator.java | 5 +- .../flink/sink/BatchWriteGeneratorTagOperator.java | 9 +-- .../action/CreateTagFromWatermarkActionITTest.java | 19 +++-- .../paimon/flink/action/ExpireTagsActionTest.java | 2 +- .../paimon/flink/action/ReplaceTagActionTest.java | 12 +-- .../CreateTagFromTimestampProcedureITCase.java | 2 +- .../CreateTagFromWatermarkProcedureITCase.java | 2 +- .../flink/procedure/ExpireTagsProcedureITCase.java | 4 +- .../flink/procedure/ReplaceTagProcedureITCase.java | 2 +- .../sink/BatchWriteGeneratorTagOperatorTest.java | 3 +- .../CreateTagFromTimestampProcedureTest.scala | 2 +- .../spark/procedure/ExpireTagsProcedureTest.scala | 4 +- 31 files changed, 189 insertions(+), 182 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/SupplierWithIOException.java b/paimon-common/src/main/java/org/apache/paimon/utils/SupplierWithIOException.java new file mode 100644 index 0000000000..f5c79817e4 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/utils/SupplierWithIOException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import java.io.IOException; +import java.util.function.Supplier; + +/** A {@link Supplier} with {@link IOException}. */ +@FunctionalInterface +public interface SupplierWithIOException<T> { + + T get() throws IOException; +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index fc2e1200f0..c2b9be4c27 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -37,6 +37,7 @@ import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SerializableConsumer; import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.SupplierWithIOException; import org.apache.paimon.utils.TagManager; import org.slf4j.Logger; @@ -322,13 +323,13 @@ public abstract class OrphanFilesClean implements Serializable { * {@link FileNotFoundException}, return default value. Finally, if retry times reaches the * limits, rethrow the IOException. */ - protected static <T> T retryReadingFiles(ReaderWithIOException<T> reader, T defaultValue) + protected static <T> T retryReadingFiles(SupplierWithIOException<T> reader, T defaultValue) throws IOException { int retryNumber = 0; IOException caught = null; while (retryNumber++ < READ_FILE_RETRY_NUM) { try { - return reader.read(); + return reader.get(); } catch (FileNotFoundException e) { return defaultValue; } catch (IOException e) { @@ -349,13 +350,6 @@ public abstract class OrphanFilesClean implements Serializable { return status.getModificationTime() < olderThanMillis; } - /** A helper functional interface for method {@link #retryReadingFiles}. */ - @FunctionalInterface - protected interface ReaderWithIOException<T> { - - T read() throws IOException; - } - public static SerializableConsumer<Path> createFileCleaner( Catalog catalog, @Nullable Boolean dryRun) { SerializableConsumer<Path> fileCleaner; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 7e008698c4..935469a819 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -541,7 +541,7 @@ abstract class AbstractFileStoreTable implements FileStoreTable { } private Optional<TableSchema> travelToTag(String tagName, Options options) { - return travelToSnapshot(tagManager().taggedSnapshot(tagName), options); + return travelToSnapshot(tagManager().getOrThrow(tagName).trimToSnapshot(), options); } private Optional<TableSchema> travelToSnapshot(long snapshotId, Options options) { @@ -633,7 +633,9 @@ abstract class AbstractFileStoreTable implements FileStoreTable { } private void createTag(String tagName, Snapshot fromSnapshot, @Nullable Duration timeRetained) { - tagManager().createTag(fromSnapshot, tagName, timeRetained, store().createTagCallbacks()); + tagManager() + .createTag( + fromSnapshot, tagName, timeRetained, store().createTagCallbacks(), false); } @Override @@ -689,7 +691,7 @@ abstract class AbstractFileStoreTable implements FileStoreTable { TagManager tagManager = tagManager(); checkArgument(tagManager.tagExists(tagName), "Rollback tag '%s' doesn't exist.", tagName); - Snapshot taggedSnapshot = tagManager.taggedSnapshot(tagName); + Snapshot taggedSnapshot = tagManager.getOrThrow(tagName).trimToSnapshot(); rollbackHelper().cleanLargerThan(taggedSnapshot); try { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java index 2cdf5bff9d..e08ac9f44c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java @@ -25,36 +25,27 @@ import org.apache.paimon.utils.TagManager; /** {@link StartingScanner} for incremental changes by tag. */ public class IncrementalTagStartingScanner extends AbstractStartingScanner { - private final String start; - private final String end; + private final Snapshot start; + private final Snapshot end; public IncrementalTagStartingScanner( - SnapshotManager snapshotManager, String start, String end) { + SnapshotManager snapshotManager, String startTagName, String endTagName) { super(snapshotManager); - this.start = start; - this.end = end; TagManager tagManager = new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath()); - Snapshot startingSnapshot = tagManager.taggedSnapshot(start); - if (startingSnapshot != null) { - this.startingSnapshotId = startingSnapshot.id(); - } - } - - @Override - public Result scan(SnapshotReader reader) { - TagManager tagManager = - new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath()); - Snapshot tag1 = tagManager.taggedSnapshot(start); - Snapshot tag2 = tagManager.taggedSnapshot(end); - - if (tag2.id() <= tag1.id()) { + start = tagManager.getOrThrow(startTagName).trimToSnapshot(); + end = tagManager.getOrThrow(endTagName).trimToSnapshot(); + if (end.id() <= start.id()) { throw new IllegalArgumentException( String.format( "Tag end %s with snapshot id %s should be larger than tag start %s with snapshot id %s", - end, tag2.id(), start, tag1.id())); + endTagName, end.id(), startTagName, start.id())); } + this.startingSnapshotId = start.id(); + } - return StartingScanner.fromPlan(reader.withSnapshot(tag2).readIncrementalDiff(tag1)); + @Override + public Result scan(SnapshotReader reader) { + return StartingScanner.fromPlan(reader.withSnapshot(end).readIncrementalDiff(start)); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java index 4fa070299f..b22e17e9a0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java @@ -43,7 +43,7 @@ public class StaticFromTagStartingScanner extends ReadPlanStartingScanner { public SnapshotReader configure(SnapshotReader snapshotReader) { TagManager tagManager = new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath()); - Snapshot snapshot = tagManager.taggedSnapshot(tagName); + Snapshot snapshot = tagManager.getOrThrow(tagName).trimToSnapshot(); return snapshotReader.withMode(ScanMode.ALL).withSnapshot(snapshot); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java index 4c8b41aa42..5b4ee4e58c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java @@ -122,6 +122,6 @@ public class TimeTravelUtil { String tagName = options.scanTagName(); TagManager tagManager = new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath()); - return tagManager.taggedSnapshot(tagName); + return tagManager.getOrThrow(tagName).trimToSnapshot(); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java index 9aafdb5983..8f28be8af2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java @@ -59,6 +59,7 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; @@ -227,27 +228,25 @@ public class TagsTable implements ReadonlyTable { && ((LeafPredicate) predicate).literals().get(0) instanceof BinaryString && predicate.visit(LeafPredicateExtractor.INSTANCE).get(TAG_NAME) != null) { String equalValue = ((LeafPredicate) predicate).literals().get(0).toString(); - if (tagManager.tagExists(equalValue)) { - predicateMap.put(equalValue, tagManager.tag(equalValue)); - } + tagManager.get(equalValue).ifPresent(tag -> predicateMap.put(equalValue, tag)); } if (predicate instanceof CompoundPredicate) { CompoundPredicate compoundPredicate = (CompoundPredicate) predicate; // optimize for IN filter if ((compoundPredicate.function()) instanceof Or) { + List<String> tagNames = new ArrayList<>(); InPredicateVisitor.extractInElements(predicate, TAG_NAME) .ifPresent( - leafs -> - leafs.forEach( - leaf -> { - String leftName = leaf.toString(); - if (tagManager.tagExists(leftName)) { - predicateMap.put( - leftName, - tagManager.tag(leftName)); - } - })); + e -> + e.stream() + .map(Object::toString) + .forEach(tagNames::add)); + tagNames.forEach( + name -> + tagManager + .get(name) + .ifPresent(value -> predicateMap.put(name, value))); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java index 3989786bd2..5232e89a8b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java @@ -167,9 +167,8 @@ public class TagAutoCreation { } String tagName = periodHandler.timeToTag(thisTag); LOG.info("The tag name is {}.", tagName); - if (!tagManager.tagExists(tagName)) { - tagManager.createTag(snapshot, tagName, defaultTimeRetained, callbacks); - } + // shouldn't throw exception when tag exists + tagManager.createTag(snapshot, tagName, defaultTimeRetained, callbacks, true); nextTag = periodHandler.nextTagTime(thisTag); LOG.info("The next tag time after this is {}.", nextTag); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index 2ea5f542f4..7dfb30e698 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -107,9 +107,7 @@ public class BranchManager { public void createBranch(String branchName, String tagName) { validateBranch(branchName); - checkArgument(tagManager.tagExists(tagName), "Tag name '%s' not exists.", tagName); - - Snapshot snapshot = tagManager.taggedSnapshot(tagName); + Snapshot snapshot = tagManager.getOrThrow(tagName).trimToSnapshot(); try { // Copy the corresponding tag, snapshot and schema files into the branch directory diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index 4019395d8d..4713703bbd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -41,6 +41,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -97,31 +98,34 @@ public class TagManager { /** Create a tag from given snapshot and save it in the storage. */ public void createTag( - Snapshot snapshot, String tagName, Duration timeRetained, List<TagCallback> callbacks) { - checkArgument( - !StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name '%s' is blank.", tagName); - checkArgument(!tagExists(tagName), "Tag name '%s' already exists.", tagName); + Snapshot snapshot, + String tagName, + Duration timeRetained, + List<TagCallback> callbacks, + boolean ignoreIfExists) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name shouldn't be blank."); + if (tagExists(tagName)) { + checkArgument(ignoreIfExists, "Tag '%s' already exists.", tagName); + return; + } createOrReplaceTag(snapshot, tagName, timeRetained, callbacks); } /** Replace a tag from given snapshot and save it in the storage. */ public void replaceTag(Snapshot snapshot, String tagName, Duration timeRetained) { - checkArgument( - !StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name '%s' is blank.", tagName); - checkArgument(tagExists(tagName), "Tag name '%s' does not exist.", tagName); + checkArgument(!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name shouldn't be blank."); + checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName); createOrReplaceTag(snapshot, tagName, timeRetained, null); } - public void createOrReplaceTag( + private void createOrReplaceTag( Snapshot snapshot, String tagName, @Nullable Duration timeRetained, @Nullable List<TagCallback> callbacks) { - // When timeRetained is not defined, please do not write the tagCreatorTime field, - // as this will cause older versions (<= 0.7) of readers to be unable to read this - // tag. - // When timeRetained is defined, it is fine, because timeRetained is the new - // feature. + // When timeRetained is not defined, please do not write the tagCreatorTime field, as this + // will cause older versions (<= 0.7) of readers to be unable to read this tag. + // When timeRetained is defined, it is fine, because timeRetained is the new feature. String content = timeRetained != null ? Tag.fromSnapshotAndTagTtl(snapshot, timeRetained, LocalDateTime.now()) @@ -152,17 +156,17 @@ public class TagManager { } public void renameTag(String tagName, String targetTagName) { + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(tagName), + "Original tag name shouldn't be blank."); + checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName); + + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(targetTagName), + "New tag name shouldn't be blank."); + checkArgument(!tagExists(targetTagName), "Tag '%s' already exists.", tagName); + try { - if (!tagExists(tagName)) { - throw new RuntimeException( - String.format("The specified tag name [%s] does not exist.", tagName)); - } - if (tagExists(targetTagName)) { - throw new RuntimeException( - String.format( - "The specified target tag name [%s] existed, please set a non-existent tag name.", - targetTagName)); - } fileIO.rename(tagPath(tagName), tagPath(targetTagName)); } catch (IOException e) { throw new RuntimeException(e); @@ -172,7 +176,7 @@ public class TagManager { /** Make sure the tagNames are ALL tags of one snapshot. */ public void deleteAllTagsOfOneSnapshot( List<String> tagNames, TagDeletion tagDeletion, SnapshotManager snapshotManager) { - Snapshot taggedSnapshot = taggedSnapshot(tagNames.get(0)); + Snapshot taggedSnapshot = getOrThrow(tagNames.get(0)).trimToSnapshot(); List<Snapshot> taggedSnapshots; // skip file deletion if snapshot exists @@ -188,19 +192,20 @@ public class TagManager { doClean(taggedSnapshot, taggedSnapshots, snapshotManager, tagDeletion); } + /** Ignore errors if the tag doesn't exist. */ public void deleteTag( String tagName, TagDeletion tagDeletion, SnapshotManager snapshotManager, List<TagCallback> callbacks) { - checkArgument( - !StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name '%s' is blank.", tagName); - if (!tagExists(tagName)) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name shouldn't be blank."); + Optional<Tag> tag = get(tagName); + if (!tag.isPresent()) { LOG.warn("Tag '{}' doesn't exist.", tagName); return; } - Snapshot taggedSnapshot = taggedSnapshot(tagName); + Snapshot taggedSnapshot = tag.get().trimToSnapshot(); List<Snapshot> taggedSnapshots; // skip file deletion if snapshot exists @@ -303,10 +308,21 @@ public class TagManager { } } - /** Get the tagged snapshot by name. */ - public Snapshot taggedSnapshot(String tagName) { - checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName); - return Tag.fromPath(fileIO, tagPath(tagName)).trimToSnapshot(); + /** Return the tag or Optional.empty() if the tag file not found. */ + public Optional<Tag> get(String tagName) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name shouldn't be blank."); + try { + return Optional.of(Tag.tryFromPath(fileIO, tagPath(tagName))); + } catch (FileNotFoundException e) { + return Optional.empty(); + } + } + + /** Return the tag or throw exception indicating the tag not found. */ + public Tag getOrThrow(String tagName) { + return get(tagName) + .orElseThrow( + () -> new IllegalArgumentException("Tag '" + tagName + "' doesn't exist.")); } public long tagCount() { @@ -410,12 +426,7 @@ public class TagManager { } throw new RuntimeException( String.format( - "Didn't find tag with snapshot id '%s'.This is unexpected.", + "Didn't find tag with snapshot id '%s'. This is unexpected.", taggedSnapshot.id())); } - - /** Read tag for tagName. */ - public Tag tag(String tagName) { - return Tag.fromPath(fileIO, tagPath(tagName)); - } } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java index abff820b2c..5811fc3721 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java @@ -151,7 +151,8 @@ public class ExpireSnapshotsTest { snapshot, "tag" + id, store.options().tagDefaultTimeRetained(), - Collections.emptyList()); + Collections.emptyList(), + false); } // randomly expire snapshots diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java index 3a5ee93daa..45b53ddba4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java @@ -313,7 +313,7 @@ public class FileDeletionTest { // check manifests ManifestList manifestList = store.manifestListFactory().create(); for (String tagName : Arrays.asList("tag1", "tag2")) { - Snapshot snapshot = tagManager.taggedSnapshot(tagName); + Snapshot snapshot = tagManager.getOrThrow(tagName); List<Path> manifestFilePaths = manifestList.readDataManifests(snapshot).stream() .map(ManifestFileMeta::fileName) @@ -367,7 +367,7 @@ public class FileDeletionTest { FileStorePathFactory pathFactory = store.pathFactory(); assertPathExists(fileIO, pathFactory.bucketPath(partition, 0)); - Snapshot tag1 = tagManager.taggedSnapshot("tag1"); + Snapshot tag1 = tagManager.getOrThrow("tag1"); ManifestList manifestList = store.manifestListFactory().create(); List<Path> manifestFilePaths = manifestList.readDataManifests(tag1).stream() @@ -519,8 +519,8 @@ public class FileDeletionTest { assertPathNotExists(fileIO, pathFactory.bucketPath(partition, 1)); // check manifests - Snapshot tag1 = tagManager.taggedSnapshot("tag1"); - Snapshot tag3 = tagManager.taggedSnapshot("tag3"); + Snapshot tag1 = tagManager.getOrThrow("tag1"); + Snapshot tag3 = tagManager.getOrThrow("tag3"); List<ManifestFileMeta> existing = manifestList.readDataManifests(tag1); existing.addAll(manifestList.readDataManifests(tag3)); for (ManifestFileMeta manifestFileMeta : snapshot2Data) { @@ -805,6 +805,6 @@ public class FileDeletionTest { } private void createTag(Snapshot snapshot, String tagName, Duration timeRetained) { - tagManager.createTag(snapshot, tagName, timeRetained, Collections.emptyList()); + tagManager.createTag(snapshot, tagName, timeRetained, Collections.emptyList(), false); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index 75e284a68c..ecb42d7669 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -1080,7 +1080,7 @@ public abstract class FileStoreTableTestBase { assertThat(tagManager.tagExists("test-tag")).isTrue(); // verify that test-tag is equal to snapshot 2 - Snapshot tagged = tagManager.taggedSnapshot("test-tag"); + Snapshot tagged = tagManager.getOrThrow("test-tag").trimToSnapshot(); Snapshot snapshot2 = table.snapshotManager().snapshot(2); assertThat(tagged.equals(snapshot2)).isTrue(); } @@ -1103,7 +1103,7 @@ public abstract class FileStoreTableTestBase { TagManager tagManager = new TagManager(new TraceableFileIO(), tablePath); assertThat(tagManager.tagExists("test-tag")).isTrue(); // verify that test-tag is equal to snapshot 1 - Snapshot tagged = tagManager.taggedSnapshot("test-tag"); + Snapshot tagged = tagManager.getOrThrow("test-tag").trimToSnapshot(); Snapshot snapshot1 = table.snapshotManager().snapshot(1); assertThat(tagged.equals(snapshot1)).isTrue(); // snapshot 2 @@ -1116,7 +1116,7 @@ public abstract class FileStoreTableTestBase { // verify that tag file exist assertThat(tagManager.tagExists("test-tag-2")).isTrue(); // verify that test-tag is equal to snapshot 1 - Snapshot tag2 = tagManager.taggedSnapshot("test-tag-2"); + Snapshot tag2 = tagManager.getOrThrow("test-tag-2").trimToSnapshot(); assertThat(tag2.equals(snapshot1)).isTrue(); } } @@ -1138,9 +1138,9 @@ public abstract class FileStoreTableTestBase { assertThat(tagManager.tagExists("test-tag")).isTrue(); // Create again failed if tag existed Assertions.assertThatThrownBy(() -> table.createTag("test-tag", 1)) - .hasMessageContaining("Tag name 'test-tag' already exists."); + .hasMessageContaining("Tag 'test-tag' already exists."); Assertions.assertThatThrownBy(() -> table.createTag("test-tag", 2)) - .hasMessageContaining("Tag name 'test-tag' already exists."); + .hasMessageContaining("Tag 'test-tag' already exists."); } } @@ -1165,7 +1165,7 @@ public abstract class FileStoreTableTestBase { assertThat(tagManager.tagExists("test-tag")).isTrue(); // verify that test-tag is equal to snapshot 2 - Snapshot tagged = tagManager.taggedSnapshot("test-tag"); + Snapshot tagged = tagManager.getOrThrow("test-tag").trimToSnapshot(); Snapshot snapshot2 = table.snapshotManager().snapshot(2); assertThat(tagged.equals(snapshot2)).isTrue(); @@ -1220,7 +1220,7 @@ public abstract class FileStoreTableTestBase { assertThatThrownBy(() -> table.createBranch("branch-1", "tag1")) .satisfies( anyCauseMatches( - IllegalArgumentException.class, "Tag name 'tag1' not exists.")); + IllegalArgumentException.class, "Tag 'tag1' doesn't exist.")); assertThatThrownBy(() -> table.createBranch("branch0", "test-tag")) .satisfies( @@ -1409,8 +1409,7 @@ public abstract class FileStoreTableTestBase { assertThatThrownBy(() -> table.createTag("", 1)) .satisfies( anyCauseMatches( - IllegalArgumentException.class, - String.format("Tag name '%s' is blank", ""))); + IllegalArgumentException.class, "Tag name shouldn't be blank")); } @Test diff --git a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java index 4ad634a433..3040e11175 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java @@ -107,8 +107,8 @@ public class IndexFileExpireTableTest extends PrimaryKeyTableTestBase { assertThat(indexManifestSize()).isEqualTo(3); TagManager tagManager = new TagManager(LocalFileIO.create(), table.location()); - checkIndexFiles(tagManager.taggedSnapshot("tag3")); - checkIndexFiles(tagManager.taggedSnapshot("tag5")); + checkIndexFiles(tagManager.getOrThrow("tag3")); + checkIndexFiles(tagManager.getOrThrow("tag5")); } @Test @@ -135,7 +135,7 @@ public class IndexFileExpireTableTest extends PrimaryKeyTableTestBase { TagManager tagManager = new TagManager(LocalFileIO.create(), table.location()); checkIndexFiles(7); - checkIndexFiles(tagManager.taggedSnapshot("tag5")); + checkIndexFiles(tagManager.getOrThrow("tag5")); assertThat(indexFileSize()).isEqualTo(4); assertThat(indexManifestSize()).isEqualTo(2); } diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java index 1bebbe5fb5..4dfa802f81 100644 --- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java @@ -392,7 +392,8 @@ public class TagAutoManagerTest extends PrimaryKeyTableTestBase { snapshot1, "non-auto-create-tag-shoule-expire", Duration.ofMillis(500), - Collections.emptyList()); + Collections.emptyList(), + false); Snapshot snapshot2 = new Snapshot( @@ -416,7 +417,8 @@ public class TagAutoManagerTest extends PrimaryKeyTableTestBase { snapshot2, "non-auto-create-tag-shoule-not-expire", Duration.ofDays(1), - Collections.emptyList()); + Collections.emptyList(), + false); // test expire old tag by time-retained Thread.sleep(1000); diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/TagManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/TagManagerTest.java index 3e702b9b2c..abe9ee0b8a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/TagManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/TagManagerTest.java @@ -91,9 +91,10 @@ public class TagManagerTest { snapshotManager.snapshot(1), "tag", store.options().tagDefaultTimeRetained(), - Collections.emptyList()); + Collections.emptyList(), + false); assertThat(tagManager.tagExists("tag")).isTrue(); - Snapshot snapshot = tagManager.taggedSnapshot("tag"); + Snapshot snapshot = tagManager.getOrThrow("tag").trimToSnapshot(); String snapshotJson = snapshot.toJson(); Assertions.assertTrue( !snapshotJson.contains("tagCreateTime") @@ -119,7 +120,11 @@ public class TagManagerTest { commitData(store, commitIdentifier++, writers); tagManager.createTag( - snapshotManager.snapshot(1), "tag", Duration.ofDays(1), Collections.emptyList()); + snapshotManager.snapshot(1), + "tag", + Duration.ofDays(1), + Collections.emptyList(), + false); assertThat(tagManager.tagExists("tag")).isTrue(); List<Pair<Tag, String>> tags = tagManager.tagObjects(); Assertions.assertEquals(1, tags.size()); diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/TraceableFileIO.java b/paimon-core/src/test/java/org/apache/paimon/utils/TraceableFileIO.java index eb616a9ab2..ef811bb712 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/TraceableFileIO.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/TraceableFileIO.java @@ -37,7 +37,6 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Predicate; -import java.util.function.Supplier; import java.util.stream.Collectors; import static org.apache.paimon.utils.Preconditions.checkNotNull; @@ -62,15 +61,7 @@ public class TraceableFileIO implements FileIO { @Override public PositionOutputStream newOutputStream(Path f, boolean overwrite) throws IOException { - return createOutputStream( - f, - () -> { - try { - return originalFs.newOutputStream(f, overwrite); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); + return createOutputStream(f, () -> originalFs.newOutputStream(f, overwrite)); } @Override @@ -85,30 +76,22 @@ public class TraceableFileIO implements FileIO { @Override public SeekableInputStream newInputStream(Path f) throws IOException { - return createInputStream( - f, - () -> { - try { - return originalFs.newInputStream(f); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); + return createInputStream(f, () -> originalFs.newInputStream(f)); } private PositionOutputStream createOutputStream( - Path f, Supplier<PositionOutputStream> streamOpener) throws IOException { + Path f, SupplierWithIOException<PositionOutputStream> streamOpener) throws IOException { - final Supplier<OutStream> wrappedStreamOpener = + final SupplierWithIOException<OutStream> wrappedStreamOpener = () -> new OutStream(ThreadUtils.currentStackString(), f, streamOpener.get(), this); return createStream(wrappedStreamOpener, OPEN_OUTPUT_STREAMS); } private SeekableInputStream createInputStream( - Path f, Supplier<SeekableInputStream> streamOpener) throws IOException { + Path f, SupplierWithIOException<SeekableInputStream> streamOpener) throws IOException { - final Supplier<InStream> wrappedStreamOpener = + final SupplierWithIOException<InStream> wrappedStreamOpener = () -> new InStream(ThreadUtils.currentStackString(), f, streamOpener.get(), this); return createStream(wrappedStreamOpener, OPEN_INPUT_STREAMS); @@ -144,7 +127,8 @@ public class TraceableFileIO implements FileIO { return originalFs.exists(f); } - private <T> T createStream(final Supplier<T> streamOpener, final HashSet<T> openStreams) + private <T> T createStream( + final SupplierWithIOException<T> streamOpener, final HashSet<T> openStreams) throws IOException { // open the stream outside the lock. final T out = streamOpener.get(); diff --git a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java index f79d6fb716..e4d1738667 100644 --- a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java +++ b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java @@ -540,7 +540,7 @@ public class ProcedurePositionalArgumentsITCase extends CatalogITCaseBase { assertThat(paimonTable("T").snapshotManager().snapshotCount()).isEqualTo(2L); Assertions.assertThatThrownBy(() -> sql("CALL sys.replace_tag('default.T', 'test_tag')")) - .hasMessageContaining("Tag name 'test_tag' does not exist."); + .hasMessageContaining("Tag 'test_tag' doesn't exist."); sql("CALL sys.create_tag('default.T', 'test_tag')"); assertThat(sql("select tag_name,snapshot_id,time_retained from `T$tags`")) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java index 9de974d047..c36a6cd186 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java @@ -31,6 +31,7 @@ import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.SupplierWithIOException; import javax.annotation.Nullable; @@ -176,12 +177,12 @@ public class PickFilesUtil { } @Nullable - private static <T> T retryReadingFiles(ReaderWithIOException<T> reader) throws IOException { + private static <T> T retryReadingFiles(SupplierWithIOException<T> reader) throws IOException { int retryNumber = 0; IOException caught = null; while (retryNumber++ < READ_FILE_RETRY_NUM) { try { - return reader.read(); + return reader.get(); } catch (FileNotFoundException e) { return null; } catch (IOException e) { @@ -197,10 +198,4 @@ public class PickFilesUtil { throw caught; } - - /** A helper functional interface for method {@link #retryReadingFiles}. */ - @FunctionalInterface - interface ReaderWithIOException<T> { - T read() throws IOException; - } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java index 0822f04612..66d9781207 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java @@ -164,9 +164,8 @@ public class AutoTagForSavepointCommitterOperator<CommitT, GlobalCommitT> commitOperator.getCommitUser(), identifiers); for (Snapshot snapshot : snapshotForTags) { String tagName = SAVEPOINT_TAG_PREFIX + snapshot.commitIdentifier(); - if (!tagManager.tagExists(tagName)) { - tagManager.createTag(snapshot, tagName, tagTimeRetained, callbacks); - } + // shouldn't throw exception when tag exists + tagManager.createTag(snapshot, tagName, tagTimeRetained, callbacks, true); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java index 1cbcc4b226..160765716e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java @@ -106,16 +106,15 @@ public class BatchWriteGeneratorTagOperator<CommitT, GlobalCommitT> + localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")); try { // If the tag already exists, delete the tag - if (tagManager.tagExists(tagName)) { - tagManager.deleteTag( - tagName, tagDeletion, snapshotManager, table.store().createTagCallbacks()); - } + tagManager.deleteTag( + tagName, tagDeletion, snapshotManager, table.store().createTagCallbacks()); // Create a new tag tagManager.createTag( snapshot, tagName, table.coreOptions().tagDefaultTimeRetained(), - table.store().createTagCallbacks()); + table.store().createTagCallbacks(), + false); // Expire the tag expireTag(); } catch (Exception e) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CreateTagFromWatermarkActionITTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CreateTagFromWatermarkActionITTest.java index 1198eb47f4..ee42d2e177 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CreateTagFromWatermarkActionITTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CreateTagFromWatermarkActionITTest.java @@ -80,8 +80,8 @@ public class CreateTagFromWatermarkActionITTest extends ActionITCaseBase { Long.toString(watermark2 - 1)) .run(); assertThat(table.tagManager().tagExists("tag2")).isTrue(); - assertThat(table.tagManager().taggedSnapshot("tag2").watermark()).isEqualTo(watermark2); - assertThat(table.tagManager().taggedSnapshot("tag2").timeMillis()).isEqualTo(commitTime2); + assertThat(table.tagManager().getOrThrow("tag2").watermark()).isEqualTo(watermark2); + assertThat(table.tagManager().getOrThrow("tag2").timeMillis()).isEqualTo(commitTime2); createAction( CreateTagFromWatermarkAction.class, @@ -98,8 +98,8 @@ public class CreateTagFromWatermarkActionITTest extends ActionITCaseBase { Long.toString(watermark2 + 1)) .run(); assertThat(table.tagManager().tagExists("tag3")).isTrue(); - assertThat(table.tagManager().taggedSnapshot("tag3").watermark()).isEqualTo(watermark3); - assertThat(table.tagManager().taggedSnapshot("tag3").timeMillis()).isEqualTo(commitTime3); + assertThat(table.tagManager().getOrThrow("tag3").watermark()).isEqualTo(watermark3); + assertThat(table.tagManager().getOrThrow("tag3").timeMillis()).isEqualTo(commitTime3); } @Test @@ -131,7 +131,7 @@ public class CreateTagFromWatermarkActionITTest extends ActionITCaseBase { assertThat(table.snapshotManager().snapshotExists(1)).isFalse(); - Snapshot tagSnapshot1 = table.tagManager().taggedSnapshot("tag1"); + Snapshot tagSnapshot1 = table.tagManager().getOrThrow("tag1"); long tagsCommitTime = tagSnapshot1.timeMillis(); long tagsWatermark = tagSnapshot1.watermark(); @@ -158,9 +158,8 @@ public class CreateTagFromWatermarkActionITTest extends ActionITCaseBase { Long.toString(tagsWatermark - 1)) .run(); assertThat(table.tagManager().tagExists("tag2")).isTrue(); - assertThat(table.tagManager().taggedSnapshot("tag2").watermark()).isEqualTo(tagsWatermark); - assertThat(table.tagManager().taggedSnapshot("tag2").timeMillis()) - .isEqualTo(tagsCommitTime); + assertThat(table.tagManager().getOrThrow("tag2").watermark()).isEqualTo(tagsWatermark); + assertThat(table.tagManager().getOrThrow("tag2").timeMillis()).isEqualTo(tagsCommitTime); createAction( CreateTagFromWatermarkAction.class, @@ -177,7 +176,7 @@ public class CreateTagFromWatermarkActionITTest extends ActionITCaseBase { Long.toString(watermark2 - 1)) .run(); assertThat(table.tagManager().tagExists("tag3")).isTrue(); - assertThat(table.tagManager().taggedSnapshot("tag3").watermark()).isEqualTo(watermark2); - assertThat(table.tagManager().taggedSnapshot("tag3").timeMillis()).isEqualTo(commitTime2); + assertThat(table.tagManager().getOrThrow("tag3").watermark()).isEqualTo(watermark2); + assertThat(table.tagManager().getOrThrow("tag3").timeMillis()).isEqualTo(commitTime2); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java index 7e204ca884..23f2f0261d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java @@ -92,7 +92,7 @@ public class ExpireTagsActionTest extends ActionITCaseBase { assertThat(table.tagManager().tagExists("tag-5")).isFalse(); // tag-3 as the base older_than time - LocalDateTime olderThanTime = table.tagManager().tag("tag-3").getTagCreateTime(); + LocalDateTime olderThanTime = table.tagManager().getOrThrow("tag-3").getTagCreateTime(); java.sql.Timestamp timestamp = new java.sql.Timestamp(Timestamp.fromLocalDateTime(olderThanTime).getMillisecond()); createAction( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ReplaceTagActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ReplaceTagActionTest.java index 00b43b9e11..8b14afdd7d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ReplaceTagActionTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ReplaceTagActionTest.java @@ -55,12 +55,12 @@ public class ReplaceTagActionTest extends ActionITCaseBase { () -> bEnv.executeSql( "CALL sys.replace_tag(`table` => 'default.T', tag => 'test_tag')")) - .hasMessageContaining("Tag name 'test_tag' does not exist."); + .hasMessageContaining("Tag 'test_tag' doesn't exist."); bEnv.executeSql("CALL sys.create_tag(`table` => 'default.T', tag => 'test_tag')"); assertThat(tagManager.tagExists("test_tag")).isEqualTo(true); - assertThat(tagManager.tag("test_tag").trimToSnapshot().id()).isEqualTo(2); - assertThat(tagManager.tag("test_tag").getTagTimeRetained()).isEqualTo(null); + assertThat(tagManager.getOrThrow("test_tag").id()).isEqualTo(2); + assertThat(tagManager.getOrThrow("test_tag").getTagTimeRetained()).isEqualTo(null); // replace tag with new time_retained createAction( @@ -77,7 +77,7 @@ public class ReplaceTagActionTest extends ActionITCaseBase { "--time_retained", "1 d") .run(); - assertThat(tagManager.tag("test_tag").getTagTimeRetained().toHours()).isEqualTo(24); + assertThat(tagManager.getOrThrow("test_tag").getTagTimeRetained().toHours()).isEqualTo(24); // replace tag with new snapshot and time_retained createAction( @@ -96,7 +96,7 @@ public class ReplaceTagActionTest extends ActionITCaseBase { "--time_retained", "2 d") .run(); - assertThat(tagManager.tag("test_tag").trimToSnapshot().id()).isEqualTo(1); - assertThat(tagManager.tag("test_tag").getTagTimeRetained().toHours()).isEqualTo(48); + assertThat(tagManager.getOrThrow("test_tag").id()).isEqualTo(1); + assertThat(tagManager.getOrThrow("test_tag").getTagTimeRetained().toHours()).isEqualTo(48); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromTimestampProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromTimestampProcedureITCase.java index 9e7aeb5ded..c2d2bcc10f 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromTimestampProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromTimestampProcedureITCase.java @@ -133,7 +133,7 @@ public class CreateTagFromTimestampProcedureITCase extends CatalogITCaseBase { FileStoreTable table = paimonTable("T"); long earliestCommitTime = table.snapshotManager().earliestSnapshot().timeMillis(); - long tagSnapshotCommitTime = table.tagManager().taggedSnapshot("tag1").timeMillis(); + long tagSnapshotCommitTime = table.tagManager().getOrThrow("tag1").timeMillis(); assertThat(tagSnapshotCommitTime < earliestCommitTime).isTrue(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromWatermarkProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromWatermarkProcedureITCase.java index 8e659e75c8..9255aa9563 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromWatermarkProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromWatermarkProcedureITCase.java @@ -147,7 +147,7 @@ public class CreateTagFromWatermarkProcedureITCase extends CatalogITCaseBase { assertThat(table.snapshotManager().snapshotExists(1)).isFalse(); - Snapshot tagSnapshot1 = table.tagManager().taggedSnapshot("tag1"); + Snapshot tagSnapshot1 = table.tagManager().getOrThrow("tag1"); long tagsCommitTime = tagSnapshot1.timeMillis(); long tagsWatermark = tagSnapshot1.watermark(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireTagsProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireTagsProcedureITCase.java index 4a89531b22..90e5bc6702 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireTagsProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireTagsProcedureITCase.java @@ -102,7 +102,7 @@ public class ExpireTagsProcedureITCase extends CatalogITCaseBase { // tag-2 as the base older_than time. // tag-1 expired by its file creation time. - LocalDateTime olderThanTime1 = table.tagManager().tag("tag-2").getTagCreateTime(); + LocalDateTime olderThanTime1 = table.tagManager().getOrThrow("tag-2").getTagCreateTime(); java.sql.Timestamp timestamp1 = new java.sql.Timestamp( Timestamp.fromLocalDateTime(olderThanTime1).getMillisecond()); @@ -119,7 +119,7 @@ public class ExpireTagsProcedureITCase extends CatalogITCaseBase { // tag-4 as the base older_than time. // tag-2,tag-3,tag-5 expired, tag-5 reached its tagTimeRetained. - LocalDateTime olderThanTime2 = table.tagManager().tag("tag-4").getTagCreateTime(); + LocalDateTime olderThanTime2 = table.tagManager().getOrThrow("tag-4").getTagCreateTime(); java.sql.Timestamp timestamp2 = new java.sql.Timestamp( Timestamp.fromLocalDateTime(olderThanTime2).getMillisecond()); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ReplaceTagProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ReplaceTagProcedureITCase.java index 8a4eb791a6..4ee4287b7f 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ReplaceTagProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ReplaceTagProcedureITCase.java @@ -44,7 +44,7 @@ public class ReplaceTagProcedureITCase extends CatalogITCaseBase { () -> sql( "CALL sys.replace_tag(`table` => 'default.T', tag => 'test_tag')")) - .hasMessageContaining("Tag name 'test_tag' does not exist."); + .hasMessageContaining("Tag 'test_tag' doesn't exist."); sql("CALL sys.create_tag(`table` => 'default.T', tag => 'test_tag')"); assertThat(sql("select tag_name,snapshot_id,time_retained from `T$tags`")) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java index 68162832ea..10292b3a3f 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java @@ -97,7 +97,8 @@ public class BatchWriteGeneratorTagOperatorTest extends CommitterOperatorTest { // Get tagName from tagManager. String tagName = tagManager.allTagNames().get(0); // The tag is consistent with the latest snapshot - assertThat(tagManager.taggedSnapshot(tagName)).isEqualTo(snapshotManager.latestSnapshot()); + assertThat(tagManager.getOrThrow(tagName).trimToSnapshot()) + .isEqualTo(snapshotManager.latestSnapshot()); // test tag expiration table.createTag("many-tags-test1"); diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala index 1da05843dc..301b769288 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala @@ -143,7 +143,7 @@ class CreateTagFromTimestampProcedureTest extends PaimonSparkTestBase with Strea val table = loadTable("T") val latestCommitTime = table.snapshotManager.latestSnapshot().timeMillis - val tagsCommitTime = table.tagManager().taggedSnapshot("test_tag").timeMillis + val tagsCommitTime = table.tagManager().getOrThrow("test_tag").timeMillis assert(latestCommitTime > tagsCommitTime) // make snapshot 1 expire. diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireTagsProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireTagsProcedureTest.scala index 65c0f2b9a2..1ac9709c87 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireTagsProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireTagsProcedureTest.scala @@ -97,7 +97,7 @@ class ExpireTagsProcedureTest extends PaimonSparkTestBase { // tag-2 as the base older_than time. // tag-1 expired by its file creation time. - val olderThanTime1 = table.tagManager().tag("tag-2").getTagCreateTime + val olderThanTime1 = table.tagManager().getOrThrow("tag-2").getTagCreateTime val timestamp1 = new java.sql.Timestamp(Timestamp.fromLocalDateTime(olderThanTime1).getMillisecond) checkAnswer( @@ -112,7 +112,7 @@ class ExpireTagsProcedureTest extends PaimonSparkTestBase { // tag-4 as the base older_than time. // tag-2,tag-3,tag-5 expired, tag-5 reached its tagTimeRetained. - val olderThanTime2 = table.tagManager().tag("tag-4").getTagCreateTime + val olderThanTime2 = table.tagManager().getOrThrow("tag-4").getTagCreateTime val timestamp2 = new java.sql.Timestamp(Timestamp.fromLocalDateTime(olderThanTime2).getMillisecond) checkAnswer(
