This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f757b5258c [core] Support tags in Iceberg compatibility (#5570)
f757b5258c is described below
commit f757b5258cd0f64ed27aeb432240fb5bbb379284
Author: Nick Del Nano <[email protected]>
AuthorDate: Mon May 19 00:22:00 2025 -0600
[core] Support tags in Iceberg compatibility (#5570)
---
.../java/org/apache/paimon/AbstractFileStore.java | 10 +-
.../src/main/java/org/apache/paimon/FileStore.java | 4 +-
.../paimon/iceberg/IcebergCommitCallback.java | 143 +++++++++++++++++-
.../paimon/iceberg/metadata/IcebergMetadata.java | 26 +++-
.../apache/paimon/iceberg/metadata/IcebergRef.java | 94 ++++++++++++
.../paimon/privilege/PrivilegedFileStore.java | 8 +-
.../paimon/table/AbstractFileStoreTable.java | 10 +-
.../org/apache/paimon/table/sink/TagCallback.java | 5 +
.../org/apache/paimon/tag/TagBatchCreation.java | 11 +-
.../java/org/apache/paimon/utils/TagManager.java | 2 +-
.../paimon/iceberg/IcebergCompatibilityTest.java | 166 +++++++++++++++++++++
.../paimon/tag/SuccessFileTagCallBackTest.java | 4 +-
.../flink/iceberg/Flink116IcebergITCase.java | 10 ++
.../flink/iceberg/Flink117IcebergITCase.java | 12 +-
.../flink/procedure/ExpireTagsProcedure.java | 2 +-
.../flink/procedure/ExpireTagsProcedure.java | 2 +-
.../org/apache/paimon/flink/sink/FlinkSink.java | 2 +-
.../flink/iceberg/FlinkIcebergITCaseBase.java | 164 ++++++++++++++++++++
.../AutoTagForSavepointCommitterOperatorTest.java | 4 +-
.../spark/procedure/ExpireTagsProcedure.java | 5 +-
20 files changed, 650 insertions(+), 34 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 1b0f07efb3..929d760b0b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -450,17 +450,17 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
}
@Override
- public TagAutoManager newTagCreationManager() {
+ public TagAutoManager newTagCreationManager(FileStoreTable table) {
return TagAutoManager.create(
options,
snapshotManager(),
newTagManager(),
newTagDeletion(),
- createTagCallbacks());
+ createTagCallbacks(table));
}
@Override
- public List<TagCallback> createTagCallbacks() {
+ public List<TagCallback> createTagCallbacks(FileStoreTable table) {
List<TagCallback> callbacks = new
ArrayList<>(CallbackUtils.loadTagCallbacks(options));
String partitionField = options.tagToPartitionField();
@@ -473,6 +473,10 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
if (options.tagCreateSuccessFile()) {
callbacks.add(new SuccessFileTagCallback(fileIO,
newTagManager().tagDirectory()));
}
+ if
(options.toConfiguration().get(IcebergOptions.METADATA_ICEBERG_STORAGE)
+ != IcebergOptions.StorageType.DISABLED) {
+ callbacks.add(new IcebergCommitCallback(table, ""));
+ }
return callbacks;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 4d50021b3c..34e04cfabb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -111,13 +111,13 @@ public interface FileStore<T> {
Duration checkInterval,
PartitionExpireStrategy expireStrategy);
- TagAutoManager newTagCreationManager();
+ TagAutoManager newTagCreationManager(FileStoreTable table);
ServiceManager newServiceManager();
boolean mergeSchema(RowType rowType, boolean allowExplicitCast);
- List<TagCallback> createTagCallbacks();
+ List<TagCallback> createTagCallbacks(FileStoreTable table);
void setManifestCache(SegmentsCache<Path> manifestCache);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
index 9808d378f6..dc45fbe4f5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
@@ -37,6 +37,7 @@ import org.apache.paimon.iceberg.metadata.IcebergDataField;
import org.apache.paimon.iceberg.metadata.IcebergMetadata;
import org.apache.paimon.iceberg.metadata.IcebergPartitionField;
import org.apache.paimon.iceberg.metadata.IcebergPartitionSpec;
+import org.apache.paimon.iceberg.metadata.IcebergRef;
import org.apache.paimon.iceberg.metadata.IcebergSchema;
import org.apache.paimon.iceberg.metadata.IcebergSnapshot;
import org.apache.paimon.iceberg.metadata.IcebergSnapshotSummary;
@@ -49,6 +50,7 @@ import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitCallback;
+import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.RawFile;
import org.apache.paimon.table.source.ScanMode;
@@ -61,6 +63,9 @@ import org.apache.paimon.utils.ManifestReadThreadPool;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nullable;
import java.io.IOException;
@@ -83,7 +88,9 @@ import java.util.stream.Collectors;
* A {@link CommitCallback} to create Iceberg compatible metadata, so Iceberg
readers can read
* Paimon's {@link RawFile}.
*/
-public class IcebergCommitCallback implements CommitCallback {
+public class IcebergCommitCallback implements CommitCallback, TagCallback {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(IcebergCommitCallback.class);
// see org.apache.iceberg.hadoop.Util
private static final String VERSION_HINT_FILENAME = "version-hint.text";
@@ -272,6 +279,13 @@ public class IcebergCommitCallback implements
CommitCallback {
pathFactory.toManifestListPath(manifestListFileName).toString(),
schemaId);
+ Map<String, IcebergRef> icebergTags =
+ table.tagManager().tags().entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ entry -> entry.getValue().get(0),
+ entry -> new
IcebergRef(entry.getKey().id())));
+
String tableUuid = UUID.randomUUID().toString();
IcebergMetadata metadata =
new IcebergMetadata(
@@ -289,7 +303,8 @@ public class IcebergCommitCallback implements
CommitCallback {
// not sure why, this is a result
tested by hand
IcebergPartitionField.FIRST_FIELD_ID -
1),
Collections.singletonList(snapshot),
- (int) snapshotId);
+ (int) snapshotId,
+ icebergTags);
Path metadataPath = pathFactory.toMetadataPath(snapshotId);
table.fileIO().tryToWriteAtomic(metadataPath, metadata.toJson());
@@ -439,7 +454,8 @@ public class IcebergCommitCallback implements
CommitCallback {
baseMetadata.partitionSpecs(),
baseMetadata.lastPartitionId(),
snapshots,
- (int) snapshotId);
+ (int) snapshotId,
+ baseMetadata.refs());
Path metadataPath = pathFactory.toMetadataPath(snapshotId);
table.fileIO().tryToWriteAtomic(metadataPath, metadata.toJson());
@@ -798,6 +814,127 @@ public class IcebergCommitCallback implements
CommitCallback {
}
}
+ @Override
+ public void notifyCreation(String tagName) {
+ throw new UnsupportedOperationException(
+ "IcebergCommitCallback notifyCreation requires a snapshot ID");
+ }
+
+ @Override
+ public void notifyCreation(String tagName, long snapshotId) {
+ try {
+ Snapshot latestSnapshot = table.snapshotManager().latestSnapshot();
+ if (latestSnapshot == null) {
+ LOG.info(
+ "Latest Iceberg snapshot not found when creating tag
{} for snapshot {}. Unable to create tag.",
+ tagName,
+ snapshotId);
+ return;
+ }
+
+ Path baseMetadataPath =
pathFactory.toMetadataPath(latestSnapshot.id());
+ if (!table.fileIO().exists(baseMetadataPath)) {
+ LOG.info(
+ "Iceberg metadata file {} not found when creating tag
{} for snapshot {}. Unable to create tag.",
+ baseMetadataPath,
+ tagName,
+ snapshotId);
+ return;
+ }
+
+ IcebergMetadata baseMetadata =
+ IcebergMetadata.fromPath(table.fileIO(), baseMetadataPath);
+
+ baseMetadata.refs().put(tagName, new IcebergRef(snapshotId));
+
+ IcebergMetadata metadata =
+ new IcebergMetadata(
+ baseMetadata.tableUuid(),
+ baseMetadata.location(),
+ baseMetadata.currentSnapshotId(),
+ baseMetadata.lastColumnId(),
+ baseMetadata.schemas(),
+ baseMetadata.currentSchemaId(),
+ baseMetadata.partitionSpecs(),
+ baseMetadata.lastPartitionId(),
+ baseMetadata.snapshots(),
+ baseMetadata.currentSnapshotId(),
+ baseMetadata.refs());
+
+ /*
+ Overwrite the latest metadata file
+ Currently the Paimon table snapshot id value is the same as the
Iceberg metadata
+ version number. Tag creation overwrites the latest metadata file
to maintain this.
+ There is no need to update the catalog after overwrite.
+ */
+ table.fileIO().overwriteFileUtf8(baseMetadataPath,
metadata.toJson());
+ LOG.info(
+ "Iceberg metadata file {} overwritten to add tag {} for
snapshot {}.",
+ baseMetadataPath,
+ tagName,
+ snapshotId);
+
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to create tag " + tagName,
e);
+ }
+ }
+
+ @Override
+ public void notifyDeletion(String tagName) {
+ try {
+ Snapshot latestSnapshot = table.snapshotManager().latestSnapshot();
+ if (latestSnapshot == null) {
+ LOG.info(
+ "Latest Iceberg snapshot not found when deleting tag
{}. Unable to delete tag.",
+ tagName);
+ return;
+ }
+
+ Path baseMetadataPath =
pathFactory.toMetadataPath(latestSnapshot.id());
+ if (!table.fileIO().exists(baseMetadataPath)) {
+ LOG.info(
+ "Iceberg metadata file {} not found when deleting tag
{}. Unable to delete tag.",
+ baseMetadataPath,
+ tagName);
+ return;
+ }
+
+ IcebergMetadata baseMetadata =
+ IcebergMetadata.fromPath(table.fileIO(), baseMetadataPath);
+
+ baseMetadata.refs().remove(tagName);
+
+ IcebergMetadata metadata =
+ new IcebergMetadata(
+ baseMetadata.tableUuid(),
+ baseMetadata.location(),
+ baseMetadata.currentSnapshotId(),
+ baseMetadata.lastColumnId(),
+ baseMetadata.schemas(),
+ baseMetadata.currentSchemaId(),
+ baseMetadata.partitionSpecs(),
+ baseMetadata.lastPartitionId(),
+ baseMetadata.snapshots(),
+ baseMetadata.currentSnapshotId(),
+ baseMetadata.refs());
+
+            /*
+            Overwrite the latest metadata file
+            Currently the Paimon table snapshot id value is the same as the
Iceberg metadata
+            version number. Tag deletion overwrites the latest metadata file
to maintain this.
+            There is no need to update the catalog after overwrite.
+            */
+ table.fileIO().overwriteFileUtf8(baseMetadataPath,
metadata.toJson());
+ LOG.info(
+ "Iceberg metadata file {} overwritten to delete tag {}.",
+ baseMetadataPath,
+ tagName);
+
+        } catch (IOException e) {
+            throw new UncheckedIOException("Failed to delete tag " + tagName,
e);
+        }
+ }
+
//
-------------------------------------------------------------------------------------
// Utils
//
-------------------------------------------------------------------------------------
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
index fbaf806002..00c4c3de61 100644
---
a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
@@ -62,6 +62,7 @@ public class IcebergMetadata {
private static final String FIELD_SNAPSHOTS = "snapshots";
private static final String FIELD_CURRENT_SNAPSHOT_ID =
"current-snapshot-id";
private static final String FIELD_PROPERTIES = "properties";
+ private static final String FIELD_REFS = "refs";
@JsonProperty(FIELD_FORMAT_VERSION)
private final int formatVersion;
@@ -112,6 +113,10 @@ public class IcebergMetadata {
@Nullable
private final Map<String, String> properties;
+ @JsonProperty(FIELD_REFS)
+ @Nullable
+ private final Map<String, IcebergRef> refs;
+
public IcebergMetadata(
String tableUuid,
String location,
@@ -122,7 +127,8 @@ public class IcebergMetadata {
List<IcebergPartitionSpec> partitionSpecs,
int lastPartitionId,
List<IcebergSnapshot> snapshots,
- long currentSnapshotId) {
+ long currentSnapshotId,
+ @Nullable Map<String, IcebergRef> refs) {
this(
CURRENT_FORMAT_VERSION,
tableUuid,
@@ -139,7 +145,8 @@ public class IcebergMetadata {
IcebergSortOrder.ORDER_ID,
snapshots,
currentSnapshotId,
- new HashMap<>());
+ new HashMap<>(),
+ refs);
}
@JsonCreator
@@ -159,7 +166,8 @@ public class IcebergMetadata {
@JsonProperty(FIELD_DEFAULT_SORT_ORDER_ID) int defaultSortOrderId,
@JsonProperty(FIELD_SNAPSHOTS) List<IcebergSnapshot> snapshots,
@JsonProperty(FIELD_CURRENT_SNAPSHOT_ID) long currentSnapshotId,
- @JsonProperty(FIELD_PROPERTIES) @Nullable Map<String, String>
properties) {
+ @JsonProperty(FIELD_PROPERTIES) @Nullable Map<String, String>
properties,
+ @JsonProperty(FIELD_REFS) @Nullable Map<String, IcebergRef> refs) {
this.formatVersion = formatVersion;
this.tableUuid = tableUuid;
this.location = location;
@@ -176,6 +184,7 @@ public class IcebergMetadata {
this.snapshots = snapshots;
this.currentSnapshotId = currentSnapshotId;
this.properties = properties;
+ this.refs = refs;
}
@JsonGetter(FIELD_FORMAT_VERSION)
@@ -258,6 +267,11 @@ public class IcebergMetadata {
return properties == null ? new HashMap<>() : properties;
}
+ @JsonGetter(FIELD_REFS)
+ public Map<String, IcebergRef> refs() {
+ return refs == null ? new HashMap<>() : refs;
+ }
+
public IcebergSnapshot currentSnapshot() {
for (IcebergSnapshot snapshot : snapshots) {
if (snapshot.snapshotId() == currentSnapshotId) {
@@ -302,7 +316,8 @@ public class IcebergMetadata {
sortOrders,
defaultSortOrderId,
snapshots,
- currentSnapshotId);
+ currentSnapshotId,
+ refs);
}
@Override
@@ -329,6 +344,7 @@ public class IcebergMetadata {
&& Objects.equals(sortOrders, that.sortOrders)
&& defaultSortOrderId == that.defaultSortOrderId
&& Objects.equals(snapshots, that.snapshots)
- && currentSnapshotId == that.currentSnapshotId;
+ && currentSnapshotId == that.currentSnapshotId
+ && Objects.equals(refs, that.refs);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergRef.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergRef.java
new file mode 100644
index 0000000000..c808fcd7b7
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergRef.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.metadata;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/**
+ * Iceberg's ref metadata.
+ *
+ * <p>See <a href="https://iceberg.apache.org/spec/#refs">Iceberg spec</a>.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class IcebergRef {
+
+ private static final String FIELD_SNAPSHOT_ID = "snapshot-id";
+ private static final String FIELD_TYPE = "type";
+ private static final String FIELD_MAX_REF_AGE_MS = "max-ref-age-ms";
+
+ @JsonProperty(FIELD_SNAPSHOT_ID)
+ private final long snapshotId;
+
+ @JsonProperty(FIELD_TYPE)
+ private final String type;
+
+ @JsonProperty(FIELD_MAX_REF_AGE_MS)
+ @Nullable
+ private final Long maxRefAgeMs;
+
+ @JsonCreator
+ public IcebergRef(@JsonProperty(FIELD_SNAPSHOT_ID) long snapshotId) {
+ this.snapshotId = snapshotId;
+ this.type = "tag"; // Only type supported is tag
+ this.maxRefAgeMs =
+ Long.MAX_VALUE; // Tags are expired by Paimon, not by Iceberg
compatibility. So
+ // this value is set to a default value of Long.MAX_VALUE.
+ }
+
+ @JsonGetter(FIELD_SNAPSHOT_ID)
+ public long snapshotId() {
+ return snapshotId;
+ }
+
+ @JsonGetter(FIELD_TYPE)
+ public String type() {
+ return type;
+ }
+
+ @JsonGetter(FIELD_MAX_REF_AGE_MS)
+ public Long maxRefAgeMs() {
+ return maxRefAgeMs;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof IcebergRef)) {
+ return false;
+ }
+ IcebergRef that = (IcebergRef) o;
+        return snapshotId == that.snapshotId
+                && type.equals(that.type)
+                && Objects.equals(maxRefAgeMs, that.maxRefAgeMs);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(snapshotId, type, maxRefAgeMs);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
index ebcac8252c..a6084350ba 100644
---
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
+++
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
@@ -202,9 +202,9 @@ public class PrivilegedFileStore<T> implements FileStore<T>
{
}
@Override
- public TagAutoManager newTagCreationManager() {
+ public TagAutoManager newTagCreationManager(FileStoreTable table) {
privilegeChecker.assertCanInsert(identifier);
- return wrapped.newTagCreationManager();
+ return wrapped.newTagCreationManager(table);
}
@Override
@@ -220,8 +220,8 @@ public class PrivilegedFileStore<T> implements FileStore<T>
{
}
@Override
- public List<TagCallback> createTagCallbacks() {
- return wrapped.createTagCallbacks();
+ public List<TagCallback> createTagCallbacks(FileStoreTable table) {
+ return wrapped.createTagCallbacks(table);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index fdeef2f5f0..d7b0af322d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -448,7 +448,7 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
store().newCommit(commitUser, this),
newExpireRunnable(),
options.writeOnly() ? null :
store().newPartitionExpire(commitUser, this),
- options.writeOnly() ? null : store().newTagCreationManager(),
+ options.writeOnly() ? null :
store().newTagCreationManager(this),
CoreOptions.fromMap(options()).consumerExpireTime(),
new ConsumerManager(fileIO, path, snapshotManager().branch()),
options.snapshotExpireExecutionMode(),
@@ -568,7 +568,11 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
private void createTag(String tagName, Snapshot fromSnapshot, @Nullable
Duration timeRetained) {
tagManager()
.createTag(
- fromSnapshot, tagName, timeRetained,
store().createTagCallbacks(), false);
+ fromSnapshot,
+ tagName,
+ timeRetained,
+ store().createTagCallbacks(this),
+ false);
}
@Override
@@ -596,7 +600,7 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
tagName,
store().newTagDeletion(),
snapshotManager(),
- store().createTagCallbacks());
+ store().createTagCallbacks(this));
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TagCallback.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TagCallback.java
index 1d20bb89db..d99451633e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TagCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TagCallback.java
@@ -27,5 +27,10 @@ public interface TagCallback extends AutoCloseable {
void notifyCreation(String tagName);
+ // Iceberg tag callbacks require snapshotId
+ default void notifyCreation(String tagName, long snapshotId) {
+ notifyCreation(tagName);
+ }
+
void notifyDeletion(String tagName);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/tag/TagBatchCreation.java
b/paimon-core/src/main/java/org/apache/paimon/tag/TagBatchCreation.java
index 8afeaa6216..f788b38ec0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagBatchCreation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagBatchCreation.java
@@ -69,13 +69,13 @@ public class TagBatchCreation {
try {
// If the tag already exists, delete the tag
tagManager.deleteTag(
- tagName, tagDeletion, snapshotManager,
table.store().createTagCallbacks());
+ tagName, tagDeletion, snapshotManager,
table.store().createTagCallbacks(table));
// Create a new tag
tagManager.createTag(
snapshot,
tagName,
table.coreOptions().tagDefaultTimeRetained(),
- table.store().createTagCallbacks(),
+ table.store().createTagCallbacks(table),
false);
} catch (Exception e) {
LOG.warn(
@@ -85,7 +85,10 @@ public class TagBatchCreation {
e);
if (tagManager.tagExists(tagName)) {
tagManager.deleteTag(
- tagName, tagDeletion, snapshotManager,
table.store().createTagCallbacks());
+ tagName,
+ tagDeletion,
+ snapshotManager,
+ table.store().createTagCallbacks(table));
}
}
// Expire the tag
@@ -113,7 +116,7 @@ public class TagBatchCreation {
toBeDeleted,
tagDeletion,
snapshotManager,
- table.store().createTagCallbacks());
+ table.store().createTagCallbacks(table));
tagCount--;
if (tagCount == tagNumRetainedMax) {
break;
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index cbebb76ed9..132ea8bb9c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -146,7 +146,7 @@ public class TagManager {
if (callbacks != null) {
try {
- callbacks.forEach(callback ->
callback.notifyCreation(tagName));
+ callbacks.forEach(callback -> callback.notifyCreation(tagName,
snapshot.id()));
} finally {
for (TagCallback tagCallback : callbacks) {
IOUtils.closeQuietly(tagCallback);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
index 964ff3a7a7..8d5f3e997f 100644
---
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
@@ -38,6 +38,7 @@ import org.apache.paimon.iceberg.manifest.IcebergManifestFile;
import org.apache.paimon.iceberg.manifest.IcebergManifestFileMeta;
import org.apache.paimon.iceberg.manifest.IcebergManifestList;
import org.apache.paimon.iceberg.metadata.IcebergMetadata;
+import org.apache.paimon.iceberg.metadata.IcebergRef;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
@@ -769,6 +770,171 @@ public class IcebergCompatibilityTest {
}
}
+ /*
+ Create snapshots
+ Create tags
+ Verify tags
+ Delete a tag
+ Verify tags
+ */
+ @Test
+ public void testCreateAndDeleteTags() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.INT()}, new
String[] {"k", "v"});
+ FileStoreTable table =
+ createPaimonTable(
+ rowType, Collections.emptyList(),
Collections.singletonList("k"), 1);
+
+ String commitUser = UUID.randomUUID().toString();
+ TableWriteImpl<?> write = table.newWrite(commitUser);
+ TableCommitImpl commit = table.newCommit(commitUser);
+
+ write.write(GenericRow.of(1, 10));
+ write.write(GenericRow.of(2, 20));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ assertThat(getIcebergResult()).containsExactlyInAnyOrder("Record(1,
10)", "Record(2, 20)");
+
+ write.write(GenericRow.of(3, 30));
+ write.write(GenericRow.of(4, 40));
+ write.compact(BinaryRow.EMPTY_ROW, 0, true);
+ commit.commit(2, write.prepareCommit(true, 2));
+
+ assertThat(getIcebergResult())
+ .containsExactlyInAnyOrder(
+ "Record(1, 10)", "Record(2, 20)", "Record(3, 30)",
"Record(4, 40)");
+
+ String tagV1 = "v1";
+ table.createTag(tagV1, 1);
+ String tagV3 = "v3";
+ table.createTag(tagV3, 3);
+
+ long latestSnapshotId = table.snapshotManager().latestSnapshotId();
+ Map<String, IcebergRef> refs = getRefsFromSnapshot(table,
latestSnapshotId);
+
+ assertThat(refs.size() == 2).isTrue();
+
+ assertThat(refs.get(tagV1).snapshotId() == 1).isTrue();
+ assertThat(refs.get(tagV1).type().equals("tag")).isTrue(); // constant
+ assertThat(refs.get(tagV1).maxRefAgeMs() == Long.MAX_VALUE).isTrue();
// constant
+
+ assertThat(refs.get(tagV3).snapshotId() == latestSnapshotId).isTrue();
+
+ assertThat(
+ getIcebergResult(
+ icebergTable ->
+ IcebergGenerics.read(icebergTable)
+ .useSnapshot(
+
icebergTable.refs().get(tagV1).snapshotId())
+ .build(),
+ Record::toString))
+ .containsExactlyInAnyOrder("Record(1, 10)", "Record(2, 20)");
+ assertThat(
+ getIcebergResult(
+ icebergTable ->
+ IcebergGenerics.read(icebergTable)
+ .useSnapshot(
+
icebergTable.refs().get(tagV3).snapshotId())
+ .build(),
+ Record::toString))
+ .containsExactlyInAnyOrder(
+ "Record(1, 10)", "Record(2, 20)", "Record(3, 30)",
"Record(4, 40)");
+
+ table.deleteTag(tagV1);
+
+ Map<String, IcebergRef> refsAfterDelete = getRefsFromSnapshot(table,
latestSnapshotId);
+
+ assertThat(refsAfterDelete.size() == 1).isTrue();
+ assertThat(refsAfterDelete.get(tagV3).snapshotId() ==
latestSnapshotId).isTrue();
+
+ assertThat(
+ getIcebergResult(
+ icebergTable ->
+ IcebergGenerics.read(icebergTable)
+ .useSnapshot(
+
icebergTable.refs().get(tagV3).snapshotId())
+ .build(),
+ Record::toString))
+ .containsExactlyInAnyOrder(
+ "Record(1, 10)", "Record(2, 20)", "Record(3, 30)",
"Record(4, 40)");
+
+ write.close();
+ commit.close();
+ }
+
+ /*
+ Create a snapshot and tag
+ Delete Iceberg metadata
+ Commit again and verify that Iceberg metadata and tags are created
+ */
+ @Test
+ public void testTagsCreateMetadataWithoutBase() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.INT()}, new
String[] {"k", "v"});
+ FileStoreTable table =
+ createPaimonTable(
+ rowType, Collections.emptyList(),
Collections.singletonList("k"), 1);
+
+ String commitUser = UUID.randomUUID().toString();
+ TableWriteImpl<?> write = table.newWrite(commitUser);
+ TableCommitImpl commit = table.newCommit(commitUser);
+
+ write.write(GenericRow.of(1, 10));
+ write.write(GenericRow.of(2, 20));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ String tagV1 = "v1";
+ table.createTag(tagV1, 1);
+
+ assertThat(getIcebergResult()).containsExactlyInAnyOrder("Record(1,
10)", "Record(2, 20)");
+
+ write.write(GenericRow.of(3, 30));
+ write.write(GenericRow.of(4, 40));
+ write.compact(BinaryRow.EMPTY_ROW, 0, true);
+ commit.commit(2, write.prepareCommit(true, 2));
+
+ assertThat(getIcebergResult())
+ .containsExactlyInAnyOrder(
+ "Record(1, 10)", "Record(2, 20)", "Record(3, 30)",
"Record(4, 40)");
+
+ Map<String, IcebergRef> refs =
+ getRefsFromSnapshot(table,
table.snapshotManager().latestSnapshotId());
+
+ assertThat(refs.size() == 1).isTrue();
+ assertThat(refs.get(tagV1).snapshotId() == 1).isTrue();
+
+ assertThat(
+ getIcebergResult(
+ icebergTable ->
+ IcebergGenerics.read(icebergTable)
+ .useSnapshot(
+
icebergTable.refs().get(tagV1).snapshotId())
+ .build(),
+ Record::toString))
+ .containsExactlyInAnyOrder("Record(1, 10)", "Record(2, 20)");
+
+ // Delete Iceberg metadata so that metadata is created without base
+ table.fileIO().deleteDirectoryQuietly(new Path(table.location(),
"metadata"));
+
+ write.write(GenericRow.of(5, 50));
+ write.compact(BinaryRow.EMPTY_ROW, 0, true);
+ commit.commit(4, write.prepareCommit(true, 4));
+
+ Map<String, IcebergRef> refsAfterMetadataDelete =
+ getRefsFromSnapshot(table,
table.snapshotManager().latestSnapshotId());
+ assertThat(refsAfterMetadataDelete.size() == 1).isTrue();
+ assertThat(refsAfterMetadataDelete.get(tagV1).snapshotId() ==
1).isTrue();
+ }
+
+ private Map<String, IcebergRef> getRefsFromSnapshot(FileStoreTable table,
long snapshotId) {
+ return IcebergMetadata.fromPath(
+ table.fileIO(),
+ new Path(table.location(), "metadata/v" + snapshotId +
".metadata.json"))
+ .refs();
+ }
+
// ------------------------------------------------------------------------
// Random Tests
// ------------------------------------------------------------------------
diff --git
a/paimon-core/src/test/java/org/apache/paimon/tag/SuccessFileTagCallBackTest.java
b/paimon-core/src/test/java/org/apache/paimon/tag/SuccessFileTagCallBackTest.java
index 34dd118818..bb0b119b0d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/tag/SuccessFileTagCallBackTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/tag/SuccessFileTagCallBackTest.java
@@ -57,7 +57,7 @@ public class SuccessFileTagCallBackTest extends
PrimaryKeyTableTestBase {
TagManager tagManager = fileStore.newTagManager();
SuccessFileTagCallback successFileTagCallback = null;
- List<TagCallback> tagCallbacks = fileStore.createTagCallbacks();
+ List<TagCallback> tagCallbacks = fileStore.createTagCallbacks(table);
assertThat(tagCallbacks).hasAtLeastOneElementOfType(SuccessFileTagCallback.class);
for (TagCallback tagCallback : tagCallbacks) {
@@ -89,7 +89,7 @@ public class SuccessFileTagCallBackTest extends
PrimaryKeyTableTestBase {
TableCommitImpl commit =
table.newCommit(commitUser).ignoreEmptyCommit(false);
FileStore<?> fileStore = table.store();
TagManager tagManager = fileStore.newTagManager();
- List<TagCallback> tagCallbacks = fileStore.createTagCallbacks();
+ List<TagCallback> tagCallbacks = fileStore.createTagCallbacks(table);
assertThat(tagCallbacks).doesNotHaveAnyElementsOfTypes(SuccessFileTagCallback.class);
commit.commit(new ManifestCommittable(0,
utcMills("2023-07-18T12:12:00")));
diff --git
a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/iceberg/Flink116IcebergITCase.java
b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/iceberg/Flink116IcebergITCase.java
index 3001fefe4b..dc4810a902 100644
---
a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/iceberg/Flink116IcebergITCase.java
+++
b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/iceberg/Flink116IcebergITCase.java
@@ -26,4 +26,14 @@ public class Flink116IcebergITCase extends
FlinkIcebergITCaseBase {
// Flink 1.16 (or maybe Calcite?) will mistakenly cast the result to
VARCHAR(5),
// so we skip this test in Flink 1.16.
}
+
+ @Override
+ public void testCreateTags(String format) throws Exception {
+ // Flink 1.16 does not support create_tag procedure so we skip this
test.
+ }
+
+ @Override
+ public void testDeleteTags(String format) throws Exception {
+ // Flink 1.16 does not support delete_tag procedure so we skip this
test.
+ }
}
diff --git
a/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/iceberg/Flink117IcebergITCase.java
b/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/iceberg/Flink117IcebergITCase.java
index 3628043bdf..7a6f0271ad 100644
---
a/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/iceberg/Flink117IcebergITCase.java
+++
b/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/iceberg/Flink117IcebergITCase.java
@@ -19,4 +19,14 @@
package org.apache.paimon.flink.iceberg;
/** IT cases for Paimon Iceberg compatibility in Flink 1.17. */
-public class Flink117IcebergITCase extends FlinkIcebergITCaseBase {}
+public class Flink117IcebergITCase extends FlinkIcebergITCaseBase {
+ @Override
+ public void testCreateTags(String format) throws Exception {
+ // Flink 1.17 does not support create_tag procedure so we skip this
test.
+ }
+
+ @Override
+ public void testDeleteTags(String format) throws Exception {
+ // Flink 1.17 does not support delete_tag procedure so we skip this
test.
+ }
+}
diff --git
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java
index 183e39dc66..0d33553b7a 100644
---
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java
+++
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java
@@ -43,7 +43,7 @@ public class ExpireTagsProcedure extends ProcedureBase {
throws Catalog.TableNotExistException {
FileStoreTable fileStoreTable = (FileStoreTable) table(tableId);
TagTimeExpire tagTimeExpire =
-
fileStoreTable.store().newTagCreationManager().getTagTimeExpire();
+
fileStoreTable.store().newTagCreationManager(fileStoreTable).getTagTimeExpire();
if (olderThanStr != null) {
LocalDateTime olderThanTime =
DateTimeUtils.parseTimestampData(olderThanStr, 3,
TimeZone.getDefault())
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java
index 3d8af1de70..df287219b4 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java
@@ -53,7 +53,7 @@ public class ExpireTagsProcedure extends ProcedureBase {
throws Catalog.TableNotExistException {
FileStoreTable fileStoreTable = (FileStoreTable) table(tableId);
TagTimeExpire tagTimeExpire =
-
fileStoreTable.store().newTagCreationManager().getTagTimeExpire();
+
fileStoreTable.store().newTagCreationManager(fileStoreTable).getTagTimeExpire();
if (olderThanStr != null) {
LocalDateTime olderThanTime =
DateTimeUtils.parseTimestampData(olderThanStr, 3,
TimeZone.getDefault())
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index d99dd255e4..ccceaa1a54 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -289,7 +289,7 @@ public abstract class FlinkSink<T> implements Serializable {
table::snapshotManager,
table::tagManager,
() -> table.store().newTagDeletion(),
- () -> table.store().createTagCallbacks(),
+ () -> table.store().createTagCallbacks(table),
table.coreOptions().tagDefaultTimeRetained());
}
if (conf.get(ExecutionOptions.RUNTIME_MODE) ==
RuntimeExecutionMode.BATCH
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/iceberg/FlinkIcebergITCaseBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/iceberg/FlinkIcebergITCaseBase.java
index 9202cfb8fe..14ad801607 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/iceberg/FlinkIcebergITCaseBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/iceberg/FlinkIcebergITCaseBase.java
@@ -459,6 +459,170 @@ public abstract class FlinkIcebergITCaseBase extends
AbstractTestBase {
Row.of(2, "pear", 2000L), Row.of(3, "watermelon",
3000L));
}
+ @ParameterizedTest
+ @ValueSource(strings = {"orc", "parquet", "avro"})
+ public void testCreateTags(String format) throws Exception {
+ String warehouse = getTempDirPath();
+ TableEnvironment tEnv =
tableEnvironmentBuilder().batchMode().parallelism(2).build();
+ tEnv.executeSql(
+ "CREATE CATALOG paimon WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '"
+ + warehouse
+ + "'\n"
+ + ")");
+ tEnv.executeSql(
+ "CREATE TABLE paimon.`default`.T (\n"
+ + " pt INT,\n"
+ + " k INT,\n"
+ + " v1 INT,\n"
+ + " v2 STRING,\n"
+ + " PRIMARY KEY (pt, k) NOT ENFORCED\n"
+ + ") PARTITIONED BY (pt) WITH (\n"
+ + " 'metadata.iceberg.storage' = 'hadoop-catalog',\n"
+ // make sure all changes are visible in iceberg
metadata
+ + " 'full-compaction.delta-commits' = '1',\n"
+ + " 'file.format' = '"
+ + format
+ + "'\n"
+ + ")");
+ tEnv.executeSql(
+ "INSERT INTO paimon.`default`.T VALUES "
+ + "(1, 10, 100, 'apple'), "
+ + "(1, 11, 110, 'banana'), "
+ + "(2, 20, 200, 'cat'), "
+ + "(2, 21, 210, 'dog')")
+ .await();
+
+ tEnv.executeSql(
+ "INSERT INTO paimon.`default`.T VALUES "
+ + "(1, 10, 101, 'red'), "
+ + "(1, 12, 121, 'green'), "
+ + "(2, 20, 201, 'blue'), "
+ + "(2, 22, 221, 'yellow')")
+ .await();
+
+ tEnv.executeSql(
+ "CREATE CATALOG iceberg WITH (\n"
+ + " 'type' = 'iceberg',\n"
+ + " 'catalog-type' = 'hadoop',\n"
+ + " 'warehouse' = '"
+ + warehouse
+ + "/iceberg',\n"
+ + " 'cache-enabled' = 'false'\n"
+ + ")");
+
+ tEnv.executeSql("CALL paimon.sys.create_tag('default.T', 'tag1', 1)");
+
+ assertThat(
+ collect(
+ tEnv.executeSql(
+ "SELECT v1, k, v2, pt FROM
iceberg.`default`.T /*+ OPTIONS('tag'='tag1') */ ORDER BY pt, k")))
+ .containsExactly(
+ Row.of(100, 10, "apple", 1),
+ Row.of(110, 11, "banana", 1),
+ Row.of(200, 20, "cat", 2),
+ Row.of(210, 21, "dog", 2));
+
+ // Snapshot ID 4 due to full-compaction.delta-commits=1
+ tEnv.executeSql("CALL paimon.sys.create_tag('default.T', 'tag2', 4)");
+
+ assertThat(
+ collect(
+ tEnv.executeSql(
+ "SELECT v1, k, v2, pt FROM
iceberg.`default`.T /*+ OPTIONS('tag'='tag2') */ ORDER BY pt, k")))
+ .containsExactly(
+ Row.of(101, 10, "red", 1),
+ Row.of(110, 11, "banana", 1),
+ Row.of(121, 12, "green", 1),
+ Row.of(201, 20, "blue", 2),
+ Row.of(210, 21, "dog", 2),
+ Row.of(221, 22, "yellow", 2));
+
+ assertThat(
+ collect(
+ tEnv.executeSql(
+ "SELECT name, type, snapshot_id FROM
iceberg.`default`.T$refs")))
+ .containsExactlyInAnyOrder(Row.of("tag1", "TAG", 1L),
Row.of("tag2", "TAG", 4L));
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"orc", "parquet", "avro"})
+ public void testDeleteTags(String format) throws Exception {
+ String warehouse = getTempDirPath();
+ TableEnvironment tEnv =
tableEnvironmentBuilder().batchMode().parallelism(2).build();
+ tEnv.executeSql(
+ "CREATE CATALOG paimon WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '"
+ + warehouse
+ + "'\n"
+ + ")");
+ tEnv.executeSql(
+ "CREATE TABLE paimon.`default`.T (\n"
+ + " pt INT,\n"
+ + " k INT,\n"
+ + " v1 INT,\n"
+ + " v2 STRING,\n"
+ + " PRIMARY KEY (pt, k) NOT ENFORCED\n"
+ + ") PARTITIONED BY (pt) WITH (\n"
+ + " 'metadata.iceberg.storage' = 'hadoop-catalog',\n"
+ // make sure all changes are visible in iceberg
metadata
+ + " 'full-compaction.delta-commits' = '1',\n"
+ + " 'file.format' = '"
+ + format
+ + "'\n"
+ + ")");
+ tEnv.executeSql(
+ "INSERT INTO paimon.`default`.T VALUES "
+ + "(1, 10, 100, 'apple'), "
+ + "(1, 11, 110, 'banana'), "
+ + "(2, 20, 200, 'cat'), "
+ + "(2, 21, 210, 'dog')")
+ .await();
+
+ tEnv.executeSql(
+ "INSERT INTO paimon.`default`.T VALUES "
+ + "(1, 10, 101, 'red'), "
+ + "(1, 12, 121, 'green'), "
+ + "(2, 20, 201, 'blue'), "
+ + "(2, 22, 221, 'yellow')")
+ .await();
+
+ tEnv.executeSql(
+ "CREATE CATALOG iceberg WITH (\n"
+ + " 'type' = 'iceberg',\n"
+ + " 'catalog-type' = 'hadoop',\n"
+ + " 'warehouse' = '"
+ + warehouse
+ + "/iceberg',\n"
+ + " 'cache-enabled' = 'false'\n"
+ + ")");
+
+ tEnv.executeSql("CALL paimon.sys.create_tag('default.T', 'tag1', 1)");
+ // Snapshot ID 4 due to full-compaction.delta-commits=1
+ tEnv.executeSql("CALL paimon.sys.create_tag('default.T', 'tag2', 4)");
+
+ // Delete tag
+ tEnv.executeSql("CALL paimon.sys.delete_tag('default.T', 'tag2')");
+
+ assertThat(
+ collect(
+ tEnv.executeSql(
+ "SELECT v1, k, v2, pt FROM
iceberg.`default`.T /*+ OPTIONS('tag'='tag1') */ ORDER BY pt, k")))
+ .containsExactly(
+ Row.of(100, 10, "apple", 1),
+ Row.of(110, 11, "banana", 1),
+ Row.of(200, 20, "cat", 2),
+ Row.of(210, 21, "dog", 2));
+
+ assertThat(
+ collect(
+ tEnv.executeSql(
+ "SELECT name, type, snapshot_id FROM
iceberg.`default`.T$refs")))
+ .containsExactlyInAnyOrder(Row.of("tag1", "TAG", 1L));
+ }
+
private List<Row> collect(TableResult result) throws Exception {
List<Row> rows = new ArrayList<>();
try (CloseableIterator<Row> it = result.collect()) {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
index ee930a06fc..d046687f69 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
@@ -210,7 +210,7 @@ public class AutoTagForSavepointCommitterOperatorTest
extends CommitterOperatorT
table::snapshotManager,
table::tagManager,
() -> table.store().newTagDeletion(),
- () -> table.store().createTagCallbacks(),
+ () -> table.store().createTagCallbacks(table),
table.store().options().tagDefaultTimeRetained());
}
@@ -228,7 +228,7 @@ public class AutoTagForSavepointCommitterOperatorTest
extends CommitterOperatorT
table::snapshotManager,
table::tagManager,
() -> table.store().newTagDeletion(),
- () -> table.store().createTagCallbacks(),
+ () -> table.store().createTagCallbacks(table),
table.store().options().tagDefaultTimeRetained());
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireTagsProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireTagsProcedure.java
index d75ca5ee0a..8f61adb56c 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireTagsProcedure.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireTagsProcedure.java
@@ -74,7 +74,10 @@ public class ExpireTagsProcedure extends BaseProcedure {
table -> {
FileStoreTable fileStoreTable = (FileStoreTable) table;
TagTimeExpire tagTimeExpire =
-
fileStoreTable.store().newTagCreationManager().getTagTimeExpire();
+ fileStoreTable
+ .store()
+ .newTagCreationManager(fileStoreTable)
+ .getTagTimeExpire();
if (olderThanStr != null) {
LocalDateTime olderThanTime =
DateTimeUtils.parseTimestampData(