This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f757b5258c [core] Support tags in Iceberg compatibility (#5570)
f757b5258c is described below

commit f757b5258cd0f64ed27aeb432240fb5bbb379284
Author: Nick Del Nano <[email protected]>
AuthorDate: Mon May 19 00:22:00 2025 -0600

    [core] Support tags in Iceberg compatibility (#5570)
---
 .../java/org/apache/paimon/AbstractFileStore.java  |  10 +-
 .../src/main/java/org/apache/paimon/FileStore.java |   4 +-
 .../paimon/iceberg/IcebergCommitCallback.java      | 143 +++++++++++++++++-
 .../paimon/iceberg/metadata/IcebergMetadata.java   |  26 +++-
 .../apache/paimon/iceberg/metadata/IcebergRef.java |  94 ++++++++++++
 .../paimon/privilege/PrivilegedFileStore.java      |   8 +-
 .../paimon/table/AbstractFileStoreTable.java       |  10 +-
 .../org/apache/paimon/table/sink/TagCallback.java  |   5 +
 .../org/apache/paimon/tag/TagBatchCreation.java    |  11 +-
 .../java/org/apache/paimon/utils/TagManager.java   |   2 +-
 .../paimon/iceberg/IcebergCompatibilityTest.java   | 166 +++++++++++++++++++++
 .../paimon/tag/SuccessFileTagCallBackTest.java     |   4 +-
 .../flink/iceberg/Flink116IcebergITCase.java       |  10 ++
 .../flink/iceberg/Flink117IcebergITCase.java       |  12 +-
 .../flink/procedure/ExpireTagsProcedure.java       |   2 +-
 .../flink/procedure/ExpireTagsProcedure.java       |   2 +-
 .../org/apache/paimon/flink/sink/FlinkSink.java    |   2 +-
 .../flink/iceberg/FlinkIcebergITCaseBase.java      | 164 ++++++++++++++++++++
 .../AutoTagForSavepointCommitterOperatorTest.java  |   4 +-
 .../spark/procedure/ExpireTagsProcedure.java       |   5 +-
 20 files changed, 650 insertions(+), 34 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 1b0f07efb3..929d760b0b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -450,17 +450,17 @@ abstract class AbstractFileStore<T> implements 
FileStore<T> {
     }
 
     @Override
-    public TagAutoManager newTagCreationManager() {
+    public TagAutoManager newTagCreationManager(FileStoreTable table) {
         return TagAutoManager.create(
                 options,
                 snapshotManager(),
                 newTagManager(),
                 newTagDeletion(),
-                createTagCallbacks());
+                createTagCallbacks(table));
     }
 
     @Override
-    public List<TagCallback> createTagCallbacks() {
+    public List<TagCallback> createTagCallbacks(FileStoreTable table) {
         List<TagCallback> callbacks = new 
ArrayList<>(CallbackUtils.loadTagCallbacks(options));
         String partitionField = options.tagToPartitionField();
 
@@ -473,6 +473,10 @@ abstract class AbstractFileStore<T> implements 
FileStore<T> {
         if (options.tagCreateSuccessFile()) {
             callbacks.add(new SuccessFileTagCallback(fileIO, 
newTagManager().tagDirectory()));
         }
+        if 
(options.toConfiguration().get(IcebergOptions.METADATA_ICEBERG_STORAGE)
+                != IcebergOptions.StorageType.DISABLED) {
+            callbacks.add(new IcebergCommitCallback(table, ""));
+        }
         return callbacks;
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 4d50021b3c..34e04cfabb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -111,13 +111,13 @@ public interface FileStore<T> {
             Duration checkInterval,
             PartitionExpireStrategy expireStrategy);
 
-    TagAutoManager newTagCreationManager();
+    TagAutoManager newTagCreationManager(FileStoreTable table);
 
     ServiceManager newServiceManager();
 
     boolean mergeSchema(RowType rowType, boolean allowExplicitCast);
 
-    List<TagCallback> createTagCallbacks();
+    List<TagCallback> createTagCallbacks(FileStoreTable table);
 
     void setManifestCache(SegmentsCache<Path> manifestCache);
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
index 9808d378f6..dc45fbe4f5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
@@ -37,6 +37,7 @@ import org.apache.paimon.iceberg.metadata.IcebergDataField;
 import org.apache.paimon.iceberg.metadata.IcebergMetadata;
 import org.apache.paimon.iceberg.metadata.IcebergPartitionField;
 import org.apache.paimon.iceberg.metadata.IcebergPartitionSpec;
+import org.apache.paimon.iceberg.metadata.IcebergRef;
 import org.apache.paimon.iceberg.metadata.IcebergSchema;
 import org.apache.paimon.iceberg.metadata.IcebergSnapshot;
 import org.apache.paimon.iceberg.metadata.IcebergSnapshotSummary;
@@ -49,6 +50,7 @@ import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitCallback;
+import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.RawFile;
 import org.apache.paimon.table.source.ScanMode;
@@ -61,6 +63,9 @@ import org.apache.paimon.utils.ManifestReadThreadPool;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -83,7 +88,9 @@ import java.util.stream.Collectors;
  * A {@link CommitCallback} to create Iceberg compatible metadata, so Iceberg 
readers can read
  * Paimon's {@link RawFile}.
  */
-public class IcebergCommitCallback implements CommitCallback {
+public class IcebergCommitCallback implements CommitCallback, TagCallback {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(IcebergCommitCallback.class);
 
     // see org.apache.iceberg.hadoop.Util
     private static final String VERSION_HINT_FILENAME = "version-hint.text";
@@ -272,6 +279,13 @@ public class IcebergCommitCallback implements 
CommitCallback {
                         
pathFactory.toManifestListPath(manifestListFileName).toString(),
                         schemaId);
 
+        Map<String, IcebergRef> icebergTags =
+                table.tagManager().tags().entrySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        entry -> entry.getValue().get(0),
+                                        entry -> new 
IcebergRef(entry.getKey().id())));
+
         String tableUuid = UUID.randomUUID().toString();
         IcebergMetadata metadata =
                 new IcebergMetadata(
@@ -289,7 +303,8 @@ public class IcebergCommitCallback implements 
CommitCallback {
                                         // not sure why, this is a result 
tested by hand
                                         IcebergPartitionField.FIRST_FIELD_ID - 
1),
                         Collections.singletonList(snapshot),
-                        (int) snapshotId);
+                        (int) snapshotId,
+                        icebergTags);
 
         Path metadataPath = pathFactory.toMetadataPath(snapshotId);
         table.fileIO().tryToWriteAtomic(metadataPath, metadata.toJson());
@@ -439,7 +454,8 @@ public class IcebergCommitCallback implements 
CommitCallback {
                         baseMetadata.partitionSpecs(),
                         baseMetadata.lastPartitionId(),
                         snapshots,
-                        (int) snapshotId);
+                        (int) snapshotId,
+                        baseMetadata.refs());
 
         Path metadataPath = pathFactory.toMetadataPath(snapshotId);
         table.fileIO().tryToWriteAtomic(metadataPath, metadata.toJson());
@@ -798,6 +814,127 @@ public class IcebergCommitCallback implements 
CommitCallback {
         }
     }
 
+    @Override
+    public void notifyCreation(String tagName) {
+        throw new UnsupportedOperationException(
+                "IcebergCommitCallback notifyCreation requires a snapshot ID");
+    }
+
+    @Override
+    public void notifyCreation(String tagName, long snapshotId) {
+        try {
+            Snapshot latestSnapshot = table.snapshotManager().latestSnapshot();
+            if (latestSnapshot == null) {
+                LOG.info(
+                        "Latest Iceberg snapshot not found when creating tag 
{} for snapshot {}. Unable to create tag.",
+                        tagName,
+                        snapshotId);
+                return;
+            }
+
+            Path baseMetadataPath = 
pathFactory.toMetadataPath(latestSnapshot.id());
+            if (!table.fileIO().exists(baseMetadataPath)) {
+                LOG.info(
+                        "Iceberg metadata file {} not found when creating tag 
{} for snapshot {}. Unable to create tag.",
+                        baseMetadataPath,
+                        tagName,
+                        snapshotId);
+                return;
+            }
+
+            IcebergMetadata baseMetadata =
+                    IcebergMetadata.fromPath(table.fileIO(), baseMetadataPath);
+
+            baseMetadata.refs().put(tagName, new IcebergRef(snapshotId));
+
+            IcebergMetadata metadata =
+                    new IcebergMetadata(
+                            baseMetadata.tableUuid(),
+                            baseMetadata.location(),
+                            baseMetadata.currentSnapshotId(),
+                            baseMetadata.lastColumnId(),
+                            baseMetadata.schemas(),
+                            baseMetadata.currentSchemaId(),
+                            baseMetadata.partitionSpecs(),
+                            baseMetadata.lastPartitionId(),
+                            baseMetadata.snapshots(),
+                            baseMetadata.currentSnapshotId(),
+                            baseMetadata.refs());
+
+            /*
+            Overwrite the latest metadata file
+            Currently the Paimon table snapshot id value is the same as the 
Iceberg metadata
+            version number. Tag creation overwrites the latest metadata file 
to maintain this.
+            There is no need to update the catalog after overwrite.
+             */
+            table.fileIO().overwriteFileUtf8(baseMetadataPath, 
metadata.toJson());
+            LOG.info(
+                    "Iceberg metadata file {} overwritten to add tag {} for 
snapshot {}.",
+                    baseMetadataPath,
+                    tagName,
+                    snapshotId);
+
+        } catch (IOException e) {
+            throw new UncheckedIOException("Failed to create tag " + tagName, 
e);
+        }
+    }
+
+    @Override
+    public void notifyDeletion(String tagName) {
+        try {
+            Snapshot latestSnapshot = table.snapshotManager().latestSnapshot();
+            if (latestSnapshot == null) {
+                LOG.info(
+                        "Latest Iceberg snapshot not found when deleting tag 
{}. Unable to delete tag.",
+                        tagName);
+                return;
+            }
+
+            Path baseMetadataPath = 
pathFactory.toMetadataPath(latestSnapshot.id());
+            if (!table.fileIO().exists(baseMetadataPath)) {
+                LOG.info(
+                        "Iceberg metadata file {} not found when deleting tag 
{}. Unable to delete tag.",
+                        baseMetadataPath,
+                        tagName);
+                return;
+            }
+
+            IcebergMetadata baseMetadata =
+                    IcebergMetadata.fromPath(table.fileIO(), baseMetadataPath);
+
+            baseMetadata.refs().remove(tagName);
+
+            IcebergMetadata metadata =
+                    new IcebergMetadata(
+                            baseMetadata.tableUuid(),
+                            baseMetadata.location(),
+                            baseMetadata.currentSnapshotId(),
+                            baseMetadata.lastColumnId(),
+                            baseMetadata.schemas(),
+                            baseMetadata.currentSchemaId(),
+                            baseMetadata.partitionSpecs(),
+                            baseMetadata.lastPartitionId(),
+                            baseMetadata.snapshots(),
+                            baseMetadata.currentSnapshotId(),
+                            baseMetadata.refs());
+
+            /*
+            Overwrite the latest metadata file
+            Currently the Paimon table snapshot id value is the same as the 
Iceberg metadata
+            version number. Tag creation overwrites the latest metadata file 
to maintain this.
+            There is no need to update the catalog after overwrite.
+             */
+            table.fileIO().overwriteFileUtf8(baseMetadataPath, 
metadata.toJson());
+            LOG.info(
+                    "Iceberg metadata file {} overwritten to delete tag {}.",
+                    baseMetadataPath,
+                    tagName);
+
+        } catch (IOException e) {
+            throw new UncheckedIOException("Failed to create tag " + tagName, 
e);
+        }
+    }
+
     // 
-------------------------------------------------------------------------------------
     // Utils
     // 
-------------------------------------------------------------------------------------
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
index fbaf806002..00c4c3de61 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
@@ -62,6 +62,7 @@ public class IcebergMetadata {
     private static final String FIELD_SNAPSHOTS = "snapshots";
     private static final String FIELD_CURRENT_SNAPSHOT_ID = 
"current-snapshot-id";
     private static final String FIELD_PROPERTIES = "properties";
+    private static final String FIELD_REFS = "refs";
 
     @JsonProperty(FIELD_FORMAT_VERSION)
     private final int formatVersion;
@@ -112,6 +113,10 @@ public class IcebergMetadata {
     @Nullable
     private final Map<String, String> properties;
 
+    @JsonProperty(FIELD_REFS)
+    @Nullable
+    private final Map<String, IcebergRef> refs;
+
     public IcebergMetadata(
             String tableUuid,
             String location,
@@ -122,7 +127,8 @@ public class IcebergMetadata {
             List<IcebergPartitionSpec> partitionSpecs,
             int lastPartitionId,
             List<IcebergSnapshot> snapshots,
-            long currentSnapshotId) {
+            long currentSnapshotId,
+            @Nullable Map<String, IcebergRef> refs) {
         this(
                 CURRENT_FORMAT_VERSION,
                 tableUuid,
@@ -139,7 +145,8 @@ public class IcebergMetadata {
                 IcebergSortOrder.ORDER_ID,
                 snapshots,
                 currentSnapshotId,
-                new HashMap<>());
+                new HashMap<>(),
+                refs);
     }
 
     @JsonCreator
@@ -159,7 +166,8 @@ public class IcebergMetadata {
             @JsonProperty(FIELD_DEFAULT_SORT_ORDER_ID) int defaultSortOrderId,
             @JsonProperty(FIELD_SNAPSHOTS) List<IcebergSnapshot> snapshots,
             @JsonProperty(FIELD_CURRENT_SNAPSHOT_ID) long currentSnapshotId,
-            @JsonProperty(FIELD_PROPERTIES) @Nullable Map<String, String> 
properties) {
+            @JsonProperty(FIELD_PROPERTIES) @Nullable Map<String, String> 
properties,
+            @JsonProperty(FIELD_REFS) @Nullable Map<String, IcebergRef> refs) {
         this.formatVersion = formatVersion;
         this.tableUuid = tableUuid;
         this.location = location;
@@ -176,6 +184,7 @@ public class IcebergMetadata {
         this.snapshots = snapshots;
         this.currentSnapshotId = currentSnapshotId;
         this.properties = properties;
+        this.refs = refs;
     }
 
     @JsonGetter(FIELD_FORMAT_VERSION)
@@ -258,6 +267,11 @@ public class IcebergMetadata {
         return properties == null ? new HashMap<>() : properties;
     }
 
+    @JsonGetter(FIELD_REFS)
+    public Map<String, IcebergRef> refs() {
+        return refs == null ? new HashMap<>() : refs;
+    }
+
     public IcebergSnapshot currentSnapshot() {
         for (IcebergSnapshot snapshot : snapshots) {
             if (snapshot.snapshotId() == currentSnapshotId) {
@@ -302,7 +316,8 @@ public class IcebergMetadata {
                 sortOrders,
                 defaultSortOrderId,
                 snapshots,
-                currentSnapshotId);
+                currentSnapshotId,
+                refs);
     }
 
     @Override
@@ -329,6 +344,7 @@ public class IcebergMetadata {
                 && Objects.equals(sortOrders, that.sortOrders)
                 && defaultSortOrderId == that.defaultSortOrderId
                 && Objects.equals(snapshots, that.snapshots)
-                && currentSnapshotId == that.currentSnapshotId;
+                && currentSnapshotId == that.currentSnapshotId
+                && Objects.equals(refs, that.refs);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergRef.java 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergRef.java
new file mode 100644
index 0000000000..c808fcd7b7
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergRef.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.metadata;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/**
+ * Iceberg's ref metadata.
+ *
+ * <p>See <a href="https://iceberg.apache.org/spec/#refs">Iceberg spec</a>.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class IcebergRef {
+
+    private static final String FIELD_SNAPSHOT_ID = "snapshot-id";
+    private static final String FIELD_TYPE = "type";
+    private static final String FIELD_MAX_REF_AGE_MS = "max-ref-age-ms";
+
+    @JsonProperty(FIELD_SNAPSHOT_ID)
+    private final long snapshotId;
+
+    @JsonProperty(FIELD_TYPE)
+    private final String type;
+
+    @JsonProperty(FIELD_MAX_REF_AGE_MS)
+    @Nullable
+    private final Long maxRefAgeMs;
+
+    @JsonCreator
+    public IcebergRef(@JsonProperty(FIELD_SNAPSHOT_ID) long snapshotId) {
+        this.snapshotId = snapshotId;
+        this.type = "tag"; // Only type supported is tag
+        this.maxRefAgeMs =
+                Long.MAX_VALUE; // Tags are expired by Paimon, not by Iceberg compatibility. So
+        // this value is set to a default value of Long.MAX_VALUE.
+    }
+
+    @JsonGetter(FIELD_SNAPSHOT_ID)
+    public long snapshotId() {
+        return snapshotId;
+    }
+
+    @JsonGetter(FIELD_TYPE)
+    public String type() {
+        return type;
+    }
+
+    @JsonGetter(FIELD_MAX_REF_AGE_MS)
+    public Long maxRefAgeMs() {
+        return maxRefAgeMs;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof IcebergRef)) {
+            return false;
+        }
+        IcebergRef that = (IcebergRef) o;
+        return snapshotId == that.snapshotId
+                && type.equals(that.type)
+                && maxRefAgeMs == that.maxRefAgeMs;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(snapshotId, type, maxRefAgeMs);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
index ebcac8252c..a6084350ba 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
@@ -202,9 +202,9 @@ public class PrivilegedFileStore<T> implements FileStore<T> 
{
     }
 
     @Override
-    public TagAutoManager newTagCreationManager() {
+    public TagAutoManager newTagCreationManager(FileStoreTable table) {
         privilegeChecker.assertCanInsert(identifier);
-        return wrapped.newTagCreationManager();
+        return wrapped.newTagCreationManager(table);
     }
 
     @Override
@@ -220,8 +220,8 @@ public class PrivilegedFileStore<T> implements FileStore<T> 
{
     }
 
     @Override
-    public List<TagCallback> createTagCallbacks() {
-        return wrapped.createTagCallbacks();
+    public List<TagCallback> createTagCallbacks(FileStoreTable table) {
+        return wrapped.createTagCallbacks(table);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index fdeef2f5f0..d7b0af322d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -448,7 +448,7 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
                 store().newCommit(commitUser, this),
                 newExpireRunnable(),
                 options.writeOnly() ? null : 
store().newPartitionExpire(commitUser, this),
-                options.writeOnly() ? null : store().newTagCreationManager(),
+                options.writeOnly() ? null : 
store().newTagCreationManager(this),
                 CoreOptions.fromMap(options()).consumerExpireTime(),
                 new ConsumerManager(fileIO, path, snapshotManager().branch()),
                 options.snapshotExpireExecutionMode(),
@@ -568,7 +568,11 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
     private void createTag(String tagName, Snapshot fromSnapshot, @Nullable 
Duration timeRetained) {
         tagManager()
                 .createTag(
-                        fromSnapshot, tagName, timeRetained, 
store().createTagCallbacks(), false);
+                        fromSnapshot,
+                        tagName,
+                        timeRetained,
+                        store().createTagCallbacks(this),
+                        false);
     }
 
     @Override
@@ -596,7 +600,7 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
                         tagName,
                         store().newTagDeletion(),
                         snapshotManager(),
-                        store().createTagCallbacks());
+                        store().createTagCallbacks(this));
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TagCallback.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TagCallback.java
index 1d20bb89db..d99451633e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TagCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TagCallback.java
@@ -27,5 +27,10 @@ public interface TagCallback extends AutoCloseable {
 
     void notifyCreation(String tagName);
 
+    // Iceberg tag callbacks require snapshotId
+    default void notifyCreation(String tagName, long snapshotId) {
+        notifyCreation(tagName);
+    }
+
     void notifyDeletion(String tagName);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/tag/TagBatchCreation.java 
b/paimon-core/src/main/java/org/apache/paimon/tag/TagBatchCreation.java
index 8afeaa6216..f788b38ec0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagBatchCreation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagBatchCreation.java
@@ -69,13 +69,13 @@ public class TagBatchCreation {
         try {
             // If the tag already exists, delete the tag
             tagManager.deleteTag(
-                    tagName, tagDeletion, snapshotManager, 
table.store().createTagCallbacks());
+                    tagName, tagDeletion, snapshotManager, 
table.store().createTagCallbacks(table));
             // Create a new tag
             tagManager.createTag(
                     snapshot,
                     tagName,
                     table.coreOptions().tagDefaultTimeRetained(),
-                    table.store().createTagCallbacks(),
+                    table.store().createTagCallbacks(table),
                     false);
         } catch (Exception e) {
             LOG.warn(
@@ -85,7 +85,10 @@ public class TagBatchCreation {
                     e);
             if (tagManager.tagExists(tagName)) {
                 tagManager.deleteTag(
-                        tagName, tagDeletion, snapshotManager, 
table.store().createTagCallbacks());
+                        tagName,
+                        tagDeletion,
+                        snapshotManager,
+                        table.store().createTagCallbacks(table));
             }
         }
         // Expire the tag
@@ -113,7 +116,7 @@ public class TagBatchCreation {
                                     toBeDeleted,
                                     tagDeletion,
                                     snapshotManager,
-                                    table.store().createTagCallbacks());
+                                    table.store().createTagCallbacks(table));
                             tagCount--;
                             if (tagCount == tagNumRetainedMax) {
                                 break;
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index cbebb76ed9..132ea8bb9c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -146,7 +146,7 @@ public class TagManager {
 
         if (callbacks != null) {
             try {
-                callbacks.forEach(callback -> 
callback.notifyCreation(tagName));
+                callbacks.forEach(callback -> callback.notifyCreation(tagName, 
snapshot.id()));
             } finally {
                 for (TagCallback tagCallback : callbacks) {
                     IOUtils.closeQuietly(tagCallback);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
index 964ff3a7a7..8d5f3e997f 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
@@ -38,6 +38,7 @@ import org.apache.paimon.iceberg.manifest.IcebergManifestFile;
 import org.apache.paimon.iceberg.manifest.IcebergManifestFileMeta;
 import org.apache.paimon.iceberg.manifest.IcebergManifestList;
 import org.apache.paimon.iceberg.metadata.IcebergMetadata;
+import org.apache.paimon.iceberg.metadata.IcebergRef;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
@@ -769,6 +770,171 @@ public class IcebergCompatibilityTest {
         }
     }
 
+    /*
+    Create snapshots
+    Create tags
+    Verify tags
+    Delete a tag
+    Verify tags
+     */
+    @Test
+    public void testCreateAndDeleteTags() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT()}, new 
String[] {"k", "v"});
+        FileStoreTable table =
+                createPaimonTable(
+                        rowType, Collections.emptyList(), 
Collections.singletonList("k"), 1);
+
+        String commitUser = UUID.randomUUID().toString();
+        TableWriteImpl<?> write = table.newWrite(commitUser);
+        TableCommitImpl commit = table.newCommit(commitUser);
+
+        write.write(GenericRow.of(1, 10));
+        write.write(GenericRow.of(2, 20));
+        commit.commit(1, write.prepareCommit(false, 1));
+
+        assertThat(getIcebergResult()).containsExactlyInAnyOrder("Record(1, 
10)", "Record(2, 20)");
+
+        write.write(GenericRow.of(3, 30));
+        write.write(GenericRow.of(4, 40));
+        write.compact(BinaryRow.EMPTY_ROW, 0, true);
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        assertThat(getIcebergResult())
+                .containsExactlyInAnyOrder(
+                        "Record(1, 10)", "Record(2, 20)", "Record(3, 30)", 
"Record(4, 40)");
+
+        String tagV1 = "v1";
+        table.createTag(tagV1, 1);
+        String tagV3 = "v3";
+        table.createTag(tagV3, 3);
+
+        long latestSnapshotId = table.snapshotManager().latestSnapshotId();
+        Map<String, IcebergRef> refs = getRefsFromSnapshot(table, 
latestSnapshotId);
+
+        assertThat(refs.size() == 2).isTrue();
+
+        assertThat(refs.get(tagV1).snapshotId() == 1).isTrue();
+        assertThat(refs.get(tagV1).type().equals("tag")).isTrue(); // constant
+        assertThat(refs.get(tagV1).maxRefAgeMs() == Long.MAX_VALUE).isTrue(); // constant
+
+        assertThat(refs.get(tagV3).snapshotId() == latestSnapshotId).isTrue();
+
+        assertThat(
+                        getIcebergResult(
+                                icebergTable ->
+                                        IcebergGenerics.read(icebergTable)
+                                                .useSnapshot(
+                                                        icebergTable.refs().get(tagV1).snapshotId())
+                                                .build(),
+                                Record::toString))
+                .containsExactlyInAnyOrder("Record(1, 10)", "Record(2, 20)");
+        assertThat(
+                        getIcebergResult(
+                                icebergTable ->
+                                        IcebergGenerics.read(icebergTable)
+                                                .useSnapshot(
+                                                        icebergTable.refs().get(tagV3).snapshotId())
+                                                .build(),
+                                Record::toString))
+                .containsExactlyInAnyOrder(
+                        "Record(1, 10)", "Record(2, 20)", "Record(3, 30)", "Record(4, 40)");
+
+        table.deleteTag(tagV1);
+
+        Map<String, IcebergRef> refsAfterDelete = getRefsFromSnapshot(table, latestSnapshotId);
+
+        assertThat(refsAfterDelete.size() == 1).isTrue();
+        assertThat(refsAfterDelete.get(tagV3).snapshotId() == latestSnapshotId).isTrue();
+
+        assertThat(
+                        getIcebergResult(
+                                icebergTable ->
+                                        IcebergGenerics.read(icebergTable)
+                                                .useSnapshot(
+                                                        icebergTable.refs().get(tagV3).snapshotId())
+                                                .build(),
+                                Record::toString))
+                .containsExactlyInAnyOrder(
+                        "Record(1, 10)", "Record(2, 20)", "Record(3, 30)", "Record(4, 40)");
+
+        write.close();
+        commit.close();
+    }
+
+    /*
+    Create a snapshot and tag
+    Delete Iceberg metadata
+    Commit again and verify that Iceberg metadata and tags are created
+     */
+    @Test
+    public void testTagsCreateMetadataWithoutBase() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"k", "v"});
+        FileStoreTable table =
+                createPaimonTable(
+                        rowType, Collections.emptyList(), Collections.singletonList("k"), 1);
+
+        String commitUser = UUID.randomUUID().toString();
+        TableWriteImpl<?> write = table.newWrite(commitUser);
+        TableCommitImpl commit = table.newCommit(commitUser);
+
+        write.write(GenericRow.of(1, 10));
+        write.write(GenericRow.of(2, 20));
+        commit.commit(1, write.prepareCommit(false, 1));
+
+        String tagV1 = "v1";
+        table.createTag(tagV1, 1);
+
+        assertThat(getIcebergResult()).containsExactlyInAnyOrder("Record(1, 10)", "Record(2, 20)");
+
+        write.write(GenericRow.of(3, 30));
+        write.write(GenericRow.of(4, 40));
+        write.compact(BinaryRow.EMPTY_ROW, 0, true);
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        assertThat(getIcebergResult())
+                .containsExactlyInAnyOrder(
+                        "Record(1, 10)", "Record(2, 20)", "Record(3, 30)", "Record(4, 40)");
+
+        Map<String, IcebergRef> refs =
+                getRefsFromSnapshot(table, table.snapshotManager().latestSnapshotId());
+
+        assertThat(refs.size() == 1).isTrue();
+        assertThat(refs.get(tagV1).snapshotId() == 1).isTrue();
+
+        assertThat(
+                        getIcebergResult(
+                                icebergTable ->
+                                        IcebergGenerics.read(icebergTable)
+                                                .useSnapshot(
+                                                        icebergTable.refs().get(tagV1).snapshotId())
+                                                .build(),
+                                Record::toString))
+                .containsExactlyInAnyOrder("Record(1, 10)", "Record(2, 20)");
+
+        // Delete Iceberg metadata so that metadata is created without base
+        table.fileIO().deleteDirectoryQuietly(new Path(table.location(), "metadata"));
+
+        write.write(GenericRow.of(5, 50));
+        write.compact(BinaryRow.EMPTY_ROW, 0, true);
+        commit.commit(4, write.prepareCommit(true, 4));
+
+        Map<String, IcebergRef> refsAfterMetadataDelete =
+                getRefsFromSnapshot(table, table.snapshotManager().latestSnapshotId());
+        assertThat(refsAfterMetadataDelete.size() == 1).isTrue();
+        assertThat(refsAfterMetadataDelete.get(tagV1).snapshotId() == 1).isTrue();
+    }
+
+    private Map<String, IcebergRef> getRefsFromSnapshot(FileStoreTable table, long snapshotId) {
+        return IcebergMetadata.fromPath(
+                        table.fileIO(),
+                        new Path(table.location(), "metadata/v" + snapshotId + ".metadata.json"))
+                .refs();
+    }
+
     // ------------------------------------------------------------------------
     //  Random Tests
     // ------------------------------------------------------------------------
diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/SuccessFileTagCallBackTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/SuccessFileTagCallBackTest.java
index 34dd118818..bb0b119b0d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/tag/SuccessFileTagCallBackTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/tag/SuccessFileTagCallBackTest.java
@@ -57,7 +57,7 @@ public class SuccessFileTagCallBackTest extends PrimaryKeyTableTestBase {
         TagManager tagManager = fileStore.newTagManager();
         SuccessFileTagCallback successFileTagCallback = null;
 
-        List<TagCallback> tagCallbacks = fileStore.createTagCallbacks();
+        List<TagCallback> tagCallbacks = fileStore.createTagCallbacks(table);
         assertThat(tagCallbacks).hasAtLeastOneElementOfType(SuccessFileTagCallback.class);
 
         for (TagCallback tagCallback : tagCallbacks) {
@@ -89,7 +89,7 @@ public class SuccessFileTagCallBackTest extends PrimaryKeyTableTestBase {
         TableCommitImpl commit = table.newCommit(commitUser).ignoreEmptyCommit(false);
         FileStore<?> fileStore = table.store();
         TagManager tagManager = fileStore.newTagManager();
-        List<TagCallback> tagCallbacks = fileStore.createTagCallbacks();
+        List<TagCallback> tagCallbacks = fileStore.createTagCallbacks(table);
         assertThat(tagCallbacks).doesNotHaveAnyElementsOfTypes(SuccessFileTagCallback.class);
 
         commit.commit(new ManifestCommittable(0, utcMills("2023-07-18T12:12:00")));
diff --git a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/iceberg/Flink116IcebergITCase.java b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/iceberg/Flink116IcebergITCase.java
index 3001fefe4b..dc4810a902 100644
--- a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/iceberg/Flink116IcebergITCase.java
+++ b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/iceberg/Flink116IcebergITCase.java
@@ -26,4 +26,14 @@ public class Flink116IcebergITCase extends FlinkIcebergITCaseBase {
         // Flink 1.16 (or maybe Calcite?) will mistakenly cast the result to VARCHAR(5),
         // so we skip this test in Flink 1.16.
     }
+
+    @Override
+    public void testCreateTags(String format) throws Exception {
+        // Flink 1.16 does not support create_tag procedure so we skip this test.
+    }
+
+    @Override
+    public void testDeleteTags(String format) throws Exception {
+        // Flink 1.16 does not support delete_tag procedure so we skip this test.
+    }
 }
diff --git a/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/iceberg/Flink117IcebergITCase.java b/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/iceberg/Flink117IcebergITCase.java
index 3628043bdf..7a6f0271ad 100644
--- a/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/iceberg/Flink117IcebergITCase.java
+++ b/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/iceberg/Flink117IcebergITCase.java
@@ -19,4 +19,14 @@
 package org.apache.paimon.flink.iceberg;
 
 /** IT cases for Paimon Iceberg compatibility in Flink 1.17. */
-public class Flink117IcebergITCase extends FlinkIcebergITCaseBase {}
+public class Flink117IcebergITCase extends FlinkIcebergITCaseBase {
+    @Override
+    public void testCreateTags(String format) throws Exception {
+        // Flink 1.17 does not support create_tag procedure so we skip this test.
+    }
+
+    @Override
+    public void testDeleteTags(String format) throws Exception {
+        // Flink 1.17 does not support delete_tag procedure so we skip this test.
+    }
+}
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java
index 183e39dc66..0d33553b7a 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java
@@ -43,7 +43,7 @@ public class ExpireTagsProcedure extends ProcedureBase {
             throws Catalog.TableNotExistException {
         FileStoreTable fileStoreTable = (FileStoreTable) table(tableId);
         TagTimeExpire tagTimeExpire =
-                fileStoreTable.store().newTagCreationManager().getTagTimeExpire();
+                fileStoreTable.store().newTagCreationManager(fileStoreTable).getTagTimeExpire();
         if (olderThanStr != null) {
             LocalDateTime olderThanTime =
                     DateTimeUtils.parseTimestampData(olderThanStr, 3, TimeZone.getDefault())
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java
index 3d8af1de70..df287219b4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java
@@ -53,7 +53,7 @@ public class ExpireTagsProcedure extends ProcedureBase {
             throws Catalog.TableNotExistException {
         FileStoreTable fileStoreTable = (FileStoreTable) table(tableId);
         TagTimeExpire tagTimeExpire =
-                fileStoreTable.store().newTagCreationManager().getTagTimeExpire();
+                fileStoreTable.store().newTagCreationManager(fileStoreTable).getTagTimeExpire();
         if (olderThanStr != null) {
             LocalDateTime olderThanTime =
                     DateTimeUtils.parseTimestampData(olderThanStr, 3, TimeZone.getDefault())
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index d99dd255e4..ccceaa1a54 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -289,7 +289,7 @@ public abstract class FlinkSink<T> implements Serializable {
                             table::snapshotManager,
                             table::tagManager,
                             () -> table.store().newTagDeletion(),
-                            () -> table.store().createTagCallbacks(),
+                            () -> table.store().createTagCallbacks(table),
                             table.coreOptions().tagDefaultTimeRetained());
         }
         if (conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.BATCH
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/iceberg/FlinkIcebergITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/iceberg/FlinkIcebergITCaseBase.java
index 9202cfb8fe..14ad801607 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/iceberg/FlinkIcebergITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/iceberg/FlinkIcebergITCaseBase.java
@@ -459,6 +459,170 @@ public abstract class FlinkIcebergITCaseBase extends AbstractTestBase {
                         Row.of(2, "pear", 2000L), Row.of(3, "watermelon", 3000L));
     }
 
+    @ParameterizedTest
+    @ValueSource(strings = {"orc", "parquet", "avro"})
+    public void testCreateTags(String format) throws Exception {
+        String warehouse = getTempDirPath();
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().parallelism(2).build();
+        tEnv.executeSql(
+                "CREATE CATALOG paimon WITH (\n"
+                        + "  'type' = 'paimon',\n"
+                        + "  'warehouse' = '"
+                        + warehouse
+                        + "'\n"
+                        + ")");
+        tEnv.executeSql(
+                "CREATE TABLE paimon.`default`.T (\n"
+                        + "  pt INT,\n"
+                        + "  k INT,\n"
+                        + "  v1 INT,\n"
+                        + "  v2 STRING,\n"
+                        + "  PRIMARY KEY (pt, k) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (pt) WITH (\n"
+                        + "  'metadata.iceberg.storage' = 'hadoop-catalog',\n"
+                        // make sure all changes are visible in iceberg metadata
+                        + "  'full-compaction.delta-commits' = '1',\n"
+                        + "  'file.format' = '"
+                        + format
+                        + "'\n"
+                        + ")");
+        tEnv.executeSql(
+                        "INSERT INTO paimon.`default`.T VALUES "
+                                + "(1, 10, 100, 'apple'), "
+                                + "(1, 11, 110, 'banana'), "
+                                + "(2, 20, 200, 'cat'), "
+                                + "(2, 21, 210, 'dog')")
+                .await();
+
+        tEnv.executeSql(
+                        "INSERT INTO paimon.`default`.T VALUES "
+                                + "(1, 10, 101, 'red'), "
+                                + "(1, 12, 121, 'green'), "
+                                + "(2, 20, 201, 'blue'), "
+                                + "(2, 22, 221, 'yellow')")
+                .await();
+
+        tEnv.executeSql(
+                "CREATE CATALOG iceberg WITH (\n"
+                        + "  'type' = 'iceberg',\n"
+                        + "  'catalog-type' = 'hadoop',\n"
+                        + "  'warehouse' = '"
+                        + warehouse
+                        + "/iceberg',\n"
+                        + "  'cache-enabled' = 'false'\n"
+                        + ")");
+
+        tEnv.executeSql("CALL paimon.sys.create_tag('default.T', 'tag1', 1)");
+
+        assertThat(
+                        collect(
+                                tEnv.executeSql(
+                                        "SELECT v1, k, v2, pt FROM iceberg.`default`.T /*+ OPTIONS('tag'='tag1') */ ORDER BY pt, k")))
+                .containsExactly(
+                        Row.of(100, 10, "apple", 1),
+                        Row.of(110, 11, "banana", 1),
+                        Row.of(200, 20, "cat", 2),
+                        Row.of(210, 21, "dog", 2));
+
+        // Snapshot ID 4 due to full-compaction.delta-commits=1
+        tEnv.executeSql("CALL paimon.sys.create_tag('default.T', 'tag2', 4)");
+
+        assertThat(
+                        collect(
+                                tEnv.executeSql(
+                                        "SELECT v1, k, v2, pt FROM iceberg.`default`.T /*+ OPTIONS('tag'='tag2') */ ORDER BY pt, k")))
+                .containsExactly(
+                        Row.of(101, 10, "red", 1),
+                        Row.of(110, 11, "banana", 1),
+                        Row.of(121, 12, "green", 1),
+                        Row.of(201, 20, "blue", 2),
+                        Row.of(210, 21, "dog", 2),
+                        Row.of(221, 22, "yellow", 2));
+
+        assertThat(
+                        collect(
+                                tEnv.executeSql(
+                                        "SELECT name, type, snapshot_id FROM iceberg.`default`.T$refs")))
+                .containsExactlyInAnyOrder(Row.of("tag1", "TAG", 1L), Row.of("tag2", "TAG", 4L));
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"orc", "parquet", "avro"})
+    public void testDeleteTags(String format) throws Exception {
+        String warehouse = getTempDirPath();
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().parallelism(2).build();
+        tEnv.executeSql(
+                "CREATE CATALOG paimon WITH (\n"
+                        + "  'type' = 'paimon',\n"
+                        + "  'warehouse' = '"
+                        + warehouse
+                        + "'\n"
+                        + ")");
+        tEnv.executeSql(
+                "CREATE TABLE paimon.`default`.T (\n"
+                        + "  pt INT,\n"
+                        + "  k INT,\n"
+                        + "  v1 INT,\n"
+                        + "  v2 STRING,\n"
+                        + "  PRIMARY KEY (pt, k) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (pt) WITH (\n"
+                        + "  'metadata.iceberg.storage' = 'hadoop-catalog',\n"
+                        // make sure all changes are visible in iceberg metadata
+                        + "  'full-compaction.delta-commits' = '1',\n"
+                        + "  'file.format' = '"
+                        + format
+                        + "'\n"
+                        + ")");
+        tEnv.executeSql(
+                        "INSERT INTO paimon.`default`.T VALUES "
+                                + "(1, 10, 100, 'apple'), "
+                                + "(1, 11, 110, 'banana'), "
+                                + "(2, 20, 200, 'cat'), "
+                                + "(2, 21, 210, 'dog')")
+                .await();
+
+        tEnv.executeSql(
+                        "INSERT INTO paimon.`default`.T VALUES "
+                                + "(1, 10, 101, 'red'), "
+                                + "(1, 12, 121, 'green'), "
+                                + "(2, 20, 201, 'blue'), "
+                                + "(2, 22, 221, 'yellow')")
+                .await();
+
+        tEnv.executeSql(
+                "CREATE CATALOG iceberg WITH (\n"
+                        + "  'type' = 'iceberg',\n"
+                        + "  'catalog-type' = 'hadoop',\n"
+                        + "  'warehouse' = '"
+                        + warehouse
+                        + "/iceberg',\n"
+                        + "  'cache-enabled' = 'false'\n"
+                        + ")");
+
+        tEnv.executeSql("CALL paimon.sys.create_tag('default.T', 'tag1', 1)");
+        // Snapshot ID 4 due to full-compaction.delta-commits=1
+        tEnv.executeSql("CALL paimon.sys.create_tag('default.T', 'tag2', 4)");
+
+        // Delete tag
+        tEnv.executeSql("CALL paimon.sys.delete_tag('default.T', 'tag2')");
+
+        assertThat(
+                        collect(
+                                tEnv.executeSql(
+                                        "SELECT v1, k, v2, pt FROM iceberg.`default`.T /*+ OPTIONS('tag'='tag1') */ ORDER BY pt, k")))
+                .containsExactly(
+                        Row.of(100, 10, "apple", 1),
+                        Row.of(110, 11, "banana", 1),
+                        Row.of(200, 20, "cat", 2),
+                        Row.of(210, 21, "dog", 2));
+
+        assertThat(
+                        collect(
+                                tEnv.executeSql(
+                                        "SELECT name, type, snapshot_id FROM iceberg.`default`.T$refs")))
+                .containsExactlyInAnyOrder(Row.of("tag1", "TAG", 1L));
+    }
+
     private List<Row> collect(TableResult result) throws Exception {
         List<Row> rows = new ArrayList<>();
         try (CloseableIterator<Row> it = result.collect()) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
index ee930a06fc..d046687f69 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
@@ -210,7 +210,7 @@ public class AutoTagForSavepointCommitterOperatorTest extends CommitterOperatorT
                 table::snapshotManager,
                 table::tagManager,
                 () -> table.store().newTagDeletion(),
-                () -> table.store().createTagCallbacks(),
+                () -> table.store().createTagCallbacks(table),
                 table.store().options().tagDefaultTimeRetained());
     }
 
@@ -228,7 +228,7 @@ public class AutoTagForSavepointCommitterOperatorTest extends CommitterOperatorT
                 table::snapshotManager,
                 table::tagManager,
                 () -> table.store().newTagDeletion(),
-                () -> table.store().createTagCallbacks(),
+                () -> table.store().createTagCallbacks(table),
                 table.store().options().tagDefaultTimeRetained());
     }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireTagsProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireTagsProcedure.java
index d75ca5ee0a..8f61adb56c 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireTagsProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireTagsProcedure.java
@@ -74,7 +74,10 @@ public class ExpireTagsProcedure extends BaseProcedure {
                 table -> {
                     FileStoreTable fileStoreTable = (FileStoreTable) table;
                     TagTimeExpire tagTimeExpire =
-                            fileStoreTable.store().newTagCreationManager().getTagTimeExpire();
+                            fileStoreTable
+                                    .store()
+                                    .newTagCreationManager(fileStoreTable)
+                                    .getTagTimeExpire();
                     if (olderThanStr != null) {
                         LocalDateTime olderThanTime =
                                 DateTimeUtils.parseTimestampData(

Reply via email to