This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a6e463329 [core] Fix that some tag creations haven't handle tag callbacks (#2550)
a6e463329 is described below

commit a6e463329131cbece788e9dfdc9ff25d09ca7535
Author: yuzelin <[email protected]>
AuthorDate: Thu Dec 21 13:27:17 2023 +0800

    [core] Fix that some tag creations haven't handle tag callbacks (#2550)
---
 .../java/org/apache/paimon/AbstractFileStore.java  | 31 ++++++++-
 .../org/apache/paimon/AppendOnlyFileStore.java     |  6 +-
 .../src/main/java/org/apache/paimon/FileStore.java |  4 ++
 .../java/org/apache/paimon/KeyValueFileStore.java  |  6 +-
 .../paimon/table/AbstractFileStoreTable.java       | 76 ++--------------------
 .../paimon/table/AppendOnlyFileStoreTable.java     |  3 +-
 .../paimon/table/PrimaryKeyFileStoreTable.java     |  3 +-
 .../apache/paimon/table/sink/CallbackUtils.java    | 75 +++++++++++++++++++++
 .../org/apache/paimon/tag/TagAutoCreation.java     | 15 +++--
 .../java/org/apache/paimon/utils/TagManager.java   | 11 +++-
 .../test/java/org/apache/paimon/TestFileStore.java |  5 +-
 .../apache/paimon/operation/FileDeletionTest.java  | 28 ++++----
 .../operation/UncleanedFileStoreExpireTest.java    |  2 +-
 .../sink/AutoTagForSavepointCommitterOperator.java | 18 +++--
 .../flink/sink/BatchWriteGeneratorTagOperator.java |  2 +-
 .../org/apache/paimon/flink/sink/FlinkSink.java    |  3 +-
 .../AutoTagForSavepointCommitterOperatorTest.java  |  6 +-
 17 files changed, 187 insertions(+), 107 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index cf6809b95..c5a976823 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -25,6 +25,8 @@ import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.manifest.IndexManifestFile;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.metastore.AddPartitionTagCallback;
+import org.apache.paimon.metastore.MetastoreClient;
 import org.apache.paimon.operation.FileStoreCommitImpl;
 import org.apache.paimon.operation.FileStoreExpireImpl;
 import org.apache.paimon.operation.PartitionExpire;
@@ -32,6 +34,9 @@ import org.apache.paimon.operation.SnapshotDeletion;
 import org.apache.paimon.operation.TagDeletion;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.sink.CallbackUtils;
+import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.tag.TagAutoCreation;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
@@ -42,7 +47,9 @@ import org.apache.paimon.utils.TagManager;
 import javax.annotation.Nullable;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.List;
 
 /**
  * Base {@link FileStore} implementation.
@@ -56,6 +63,7 @@ public abstract class AbstractFileStore<T> implements FileStore<T> {
     protected final long schemaId;
     protected final CoreOptions options;
     protected final RowType partitionType;
+    private final CatalogEnvironment catalogEnvironment;
 
     @Nullable private final SegmentsCache<String> writeManifestCache;
 
@@ -64,12 +72,14 @@ public abstract class AbstractFileStore<T> implements FileStore<T> {
             SchemaManager schemaManager,
             long schemaId,
             CoreOptions options,
-            RowType partitionType) {
+            RowType partitionType,
+            CatalogEnvironment catalogEnvironment) {
         this.fileIO = fileIO;
         this.schemaManager = schemaManager;
         this.schemaId = schemaId;
         this.options = options;
         this.partitionType = partitionType;
+        this.catalogEnvironment = catalogEnvironment;
         MemorySize writeManifestCache = options.writeManifestCache();
         this.writeManifestCache =
                 writeManifestCache.getBytes() == 0
@@ -229,6 +239,23 @@ public abstract class AbstractFileStore<T> implements FileStore<T> {
     @Nullable
     public TagAutoCreation newTagCreationManager() {
         return TagAutoCreation.create(
-                options, snapshotManager(), newTagManager(), newTagDeletion());
+                options,
+                snapshotManager(),
+                newTagManager(),
+                newTagDeletion(),
+                createTagCallbacks());
+    }
+
+    @Override
+    public List<TagCallback> createTagCallbacks() {
+        List<TagCallback> callbacks = new ArrayList<>(CallbackUtils.loadTagCallbacks(options));
+        String partitionField = options.tagToPartitionField();
+        MetastoreClient.Factory metastoreClientFactory =
+                catalogEnvironment.metastoreClientFactory();
+        if (partitionField != null && metastoreClientFactory != null) {
+            callbacks.add(
+                    new AddPartitionTagCallback(metastoreClientFactory.create(), partitionField));
+        }
+        return callbacks;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index a36d02bd8..ec1e7cb58 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -29,6 +29,7 @@ import org.apache.paimon.operation.ScanBucketFilter;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.types.RowType;
 
 import java.util.Comparator;
@@ -53,8 +54,9 @@ public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {
             RowType partitionType,
             RowType bucketKeyType,
             RowType rowType,
-            String tableName) {
-        super(fileIO, schemaManager, schemaId, options, partitionType);
+            String tableName,
+            CatalogEnvironment catalogEnvironment) {
+        super(fileIO, schemaManager, schemaId, options, partitionType, catalogEnvironment);
         this.bucketKeyType = bucketKeyType;
         this.rowType = rowType;
         this.tableName = tableName;
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index e67cf9f1e..f044102dc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -31,6 +31,7 @@ import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.operation.SnapshotDeletion;
 import org.apache.paimon.operation.TagDeletion;
 import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.tag.TagAutoCreation;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
@@ -40,6 +41,7 @@ import org.apache.paimon.utils.TagManager;
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
+import java.util.List;
 
 /**
  * File store interface.
@@ -89,4 +91,6 @@ public interface FileStore<T> extends Serializable {
     TagAutoCreation newTagCreationManager();
 
     boolean mergeSchema(RowType rowType, boolean allowExplicitCast);
+
+    List<TagCallback> createTagCallbacks();
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 98b88323f..43aadcbfd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -34,6 +34,7 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.KeyComparatorSupplier;
@@ -79,8 +80,9 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
             RowType valueType,
             KeyValueFieldsExtractor keyValueFieldsExtractor,
             MergeFunctionFactory<KeyValue> mfFactory,
-            String tableName) {
-        super(fileIO, schemaManager, schemaId, options, partitionType);
+            String tableName,
+            CatalogEnvironment catalogEnvironment) {
+        super(fileIO, schemaManager, schemaId, options, partitionType, catalogEnvironment);
         this.crossPartitionUpdate = crossPartitionUpdate;
         this.bucketKeyType = bucketKeyType;
         this.keyType = keyType;
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 14bc66316..41f0b447f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -34,12 +34,12 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.SchemaValidation;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.CallbackUtils;
 import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.table.sink.DynamicBucketRowKeyExtractor;
 import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
 import org.apache.paimon.table.sink.RowKeyExtractor;
 import org.apache.paimon.table.sink.TableCommitImpl;
-import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.table.sink.UnawareBucketRowKeyExtractor;
 import org.apache.paimon.table.source.InnerStreamTableScan;
 import org.apache.paimon.table.source.InnerStreamTableScanImpl;
@@ -50,15 +50,12 @@ import 
org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
 import 
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
 import org.apache.paimon.tag.TagPreview;
-import org.apache.paimon.utils.IOUtils;
-import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -284,7 +281,8 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
     }
 
     private List<CommitCallback> createCommitCallbacks() {
-        List<CommitCallback> callbacks = new 
ArrayList<>(loadCommitCallbacks());
+        List<CommitCallback> callbacks =
+                new ArrayList<>(CallbackUtils.loadCommitCallbacks(coreOptions()));
         CoreOptions options = coreOptions();
         MetastoreClient.Factory metastoreClientFactory =
                 catalogEnvironment.metastoreClientFactory();
@@ -308,62 +306,6 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
         return callbacks;
     }
 
-    private List<TagCallback> createTagCallbacks() {
-        List<TagCallback> callbacks = new ArrayList<>(loadTagCallbacks());
-        String partitionField = coreOptions().tagToPartitionField();
-        MetastoreClient.Factory metastoreClientFactory =
-                catalogEnvironment.metastoreClientFactory();
-        if (partitionField != null && metastoreClientFactory != null) {
-            callbacks.add(
-                    new 
AddPartitionTagCallback(metastoreClientFactory.create(), partitionField));
-        }
-        return callbacks;
-    }
-
-    private List<TagCallback> loadTagCallbacks() {
-        return loadCallbacks(coreOptions().tagCallbacks(), TagCallback.class);
-    }
-
-    private List<CommitCallback> loadCommitCallbacks() {
-        return loadCallbacks(coreOptions().commitCallbacks(), 
CommitCallback.class);
-    }
-
-    @SuppressWarnings("unchecked")
-    private <T> List<T> loadCallbacks(Map<String, String> clazzParamMaps, 
Class<T> expectClass) {
-        List<T> result = new ArrayList<>();
-
-        for (Map.Entry<String, String> classParamEntry : 
clazzParamMaps.entrySet()) {
-            String className = classParamEntry.getKey();
-            String param = classParamEntry.getValue();
-
-            Class<?> clazz;
-            try {
-                clazz = Class.forName(className, true, 
this.getClass().getClassLoader());
-            } catch (ClassNotFoundException e) {
-                throw new RuntimeException(e);
-            }
-
-            Preconditions.checkArgument(
-                    expectClass.isAssignableFrom(clazz),
-                    "Class " + clazz + " must implement " + expectClass);
-
-            try {
-                if (param == null) {
-                    result.add((T) clazz.newInstance());
-                } else {
-                    result.add((T) 
clazz.getConstructor(String.class).newInstance(param));
-                }
-            } catch (Exception e) {
-                throw new RuntimeException(
-                        "Failed to initialize commit callback "
-                                + className
-                                + (param == null ? "" : " with param " + 
param),
-                        e);
-            }
-        }
-        return result;
-    }
-
     private Optional<TableSchema> tryTimeTravel(Options options) {
         CoreOptions coreOptions = new CoreOptions(options);
 
@@ -419,17 +361,7 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
                 fromSnapshotId);
 
         Snapshot snapshot = snapshotManager.snapshot(fromSnapshotId);
-        tagManager().createTag(snapshot, tagName);
-
-        List<TagCallback> callbacks = Collections.emptyList();
-        try {
-            callbacks = createTagCallbacks();
-            callbacks.forEach(callback -> callback.notifyCreation(tagName));
-        } finally {
-            for (TagCallback tagCallback : callbacks) {
-                IOUtils.closeQuietly(tagCallback);
-            }
-        }
+        tagManager().createTag(snapshot, tagName, store().createTagCallbacks());
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index dc865d93c..5f2c479e8 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -81,7 +81,8 @@ public class AppendOnlyFileStoreTable extends 
AbstractFileStoreTable {
                             tableSchema.logicalPartitionType(),
                             tableSchema.logicalBucketKeyType(),
                             tableSchema.logicalRowType(),
-                            name());
+                            name(),
+                            catalogEnvironment);
         }
         return lazyStore;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 84a586066..fc4db704d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -109,7 +109,8 @@ public class PrimaryKeyFileStoreTable extends 
AbstractFileStoreTable {
                             rowType,
                             extractor,
                             mfFactory,
-                            name());
+                            name(),
+                            catalogEnvironment);
         }
         return lazyStore;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CallbackUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CallbackUtils.java
new file mode 100644
index 000000000..7d8a0a849
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CallbackUtils.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.utils.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/** Utils to load callbacks. */
+public class CallbackUtils {
+
+    public static List<TagCallback> loadTagCallbacks(CoreOptions coreOptions) {
+        return loadCallbacks(coreOptions.tagCallbacks(), TagCallback.class);
+    }
+
+    public static List<CommitCallback> loadCommitCallbacks(CoreOptions coreOptions) {
+        return loadCallbacks(coreOptions.commitCallbacks(), CommitCallback.class);
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> List<T> loadCallbacks(
+            Map<String, String> clazzParamMaps, Class<T> expectClass) {
+        List<T> result = new ArrayList<>();
+
+        for (Map.Entry<String, String> classParamEntry : clazzParamMaps.entrySet()) {
+            String className = classParamEntry.getKey();
+            String param = classParamEntry.getValue();
+
+            Class<?> clazz;
+            try {
+                clazz = Class.forName(className, true, CallbackUtils.class.getClassLoader());
+            } catch (ClassNotFoundException e) {
+                throw new RuntimeException(e);
+            }
+
+            Preconditions.checkArgument(
+                    expectClass.isAssignableFrom(clazz),
+                    "Class " + clazz + " must implement " + expectClass);
+
+            try {
+                if (param == null) {
+                    result.add((T) clazz.newInstance());
+                } else {
+                    result.add((T) clazz.getConstructor(String.class).newInstance(param));
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(
+                        "Failed to initialize commit callback "
+                                + className
+                                + (param == null ? "" : " with param " + param),
+                        e);
+            }
+        }
+        return result;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java 
b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
index ba7069bd8..3b15d4891 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
@@ -21,6 +21,7 @@ package org.apache.paimon.tag;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.operation.TagDeletion;
+import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.tag.TagTimeExtractor.ProcessTimeExtractor;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
@@ -29,6 +30,7 @@ import javax.annotation.Nullable;
 
 import java.time.Duration;
 import java.time.LocalDateTime;
+import java.util.List;
 import java.util.Optional;
 import java.util.SortedMap;
 
@@ -45,6 +47,7 @@ public class TagAutoCreation {
     private final TagPeriodHandler periodHandler;
     private final Duration delay;
     private final Integer numRetainedMax;
+    private final List<TagCallback> callbacks;
 
     private LocalDateTime nextTag;
     private long nextSnapshot;
@@ -56,7 +59,8 @@ public class TagAutoCreation {
             TagTimeExtractor timeExtractor,
             TagPeriodHandler periodHandler,
             Duration delay,
-            Integer numRetainedMax) {
+            Integer numRetainedMax,
+            List<TagCallback> callbacks) {
         this.snapshotManager = snapshotManager;
         this.tagManager = tagManager;
         this.tagDeletion = tagDeletion;
@@ -64,6 +68,7 @@ public class TagAutoCreation {
         this.periodHandler = periodHandler;
         this.delay = delay;
         this.numRetainedMax = numRetainedMax;
+        this.callbacks = callbacks;
 
         this.periodHandler.validateDelay(delay);
 
@@ -127,7 +132,7 @@ public class TagAutoCreation {
                 || isAfterOrEqual(time.minus(delay), 
periodHandler.nextTagTime(nextTag))) {
             LocalDateTime thisTag = periodHandler.normalizeToPreviousTag(time);
             String tagName = periodHandler.timeToTag(thisTag);
-            tagManager.createTag(snapshot, tagName);
+            tagManager.createTag(snapshot, tagName, callbacks);
             nextTag = periodHandler.nextTagTime(thisTag);
 
             if (numRetainedMax != null) {
@@ -156,7 +161,8 @@ public class TagAutoCreation {
             CoreOptions options,
             SnapshotManager snapshotManager,
             TagManager tagManager,
-            TagDeletion tagDeletion) {
+            TagDeletion tagDeletion,
+            List<TagCallback> callbacks) {
         TagTimeExtractor extractor = 
TagTimeExtractor.createForAutoTag(options);
         if (extractor == null) {
             return null;
@@ -168,6 +174,7 @@ public class TagAutoCreation {
                 extractor,
                 TagPeriodHandler.create(options),
                 options.tagCreationDelay(),
-                options.tagNumRetainedMax());
+                options.tagNumRetainedMax(),
+                callbacks);
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index bd526ee2b..a83e201d5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -24,6 +24,7 @@ import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.operation.TagDeletion;
+import org.apache.paimon.table.sink.TagCallback;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,7 +67,7 @@ public class TagManager {
     }
 
     /** Create a tag from given snapshot and save it in the storage. */
-    public void createTag(Snapshot snapshot, String tagName) {
+    public void createTag(Snapshot snapshot, String tagName, List<TagCallback> callbacks) {
         checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is 
blank.", tagName);
         checkArgument(!tagExists(tagName), "Tag name '%s' already exists.", 
tagName);
         checkArgument(
@@ -85,6 +86,14 @@ public class TagManager {
                             tagName, newTagPath),
                     e);
         }
+
+        try {
+            callbacks.forEach(callback -> callback.notifyCreation(tagName));
+        } finally {
+            for (TagCallback tagCallback : callbacks) {
+                IOUtils.closeQuietly(tagCallback);
+            }
+        }
     }
 
     public void deleteTag(
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index fbf058f7f..6bd62a54b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -39,11 +39,13 @@ import org.apache.paimon.operation.FileStoreCommitImpl;
 import org.apache.paimon.operation.FileStoreExpireImpl;
 import org.apache.paimon.operation.FileStoreRead;
 import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.operation.Lock;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ScanMode;
@@ -113,7 +115,8 @@ public class TestFileStore extends KeyValueFileStore {
                 valueType,
                 keyValueFieldsExtractor,
                 mfFactory,
-                (new Path(root)).getName());
+                (new Path(root)).getName(),
+                new CatalogEnvironment(Lock.emptyFactory(), null, null));
         this.root = root;
         this.fileIO = FileIOFinder.find(new Path(root));
         this.keySerializer = new InternalRowSerializer(keyType);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index d150ca457..97708bfc2 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -76,11 +76,13 @@ public class FileDeletionTest {
 
     private long commitIdentifier;
     private String root;
+    private TagManager tagManager;
 
     @BeforeEach
     public void setup() throws Exception {
         commitIdentifier = 0L;
         root = tempDir.toString();
+        tagManager = null;
     }
 
     /**
@@ -221,7 +223,7 @@ public class FileDeletionTest {
     @Test
     public void testExpireWithExistingTags() throws Exception {
         TestFileStore store = 
createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED, 4);
-        TagManager tagManager = new TagManager(fileIO, store.options().path());
+        tagManager = new TagManager(fileIO, store.options().path());
         SnapshotManager snapshotManager = store.snapshotManager();
         TestKeyValueGenerator gen =
                 new 
TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
@@ -237,7 +239,7 @@ public class FileDeletionTest {
 
         // step 2: commit -A (by clean bucket 0) and create tag1
         cleanBucket(store, gen.getPartition(gen.next()), 0);
-        tagManager.createTag(snapshotManager.snapshot(2), "tag1");
+        createTag(snapshotManager.snapshot(2), "tag1");
         assertThat(tagManager.tagExists("tag1")).isTrue();
 
         // step 3: commit C to bucket 2
@@ -248,7 +250,7 @@ public class FileDeletionTest {
 
         // step 4: commit -B (by clean bucket 1) and create tag2
         cleanBucket(store, partition, 1);
-        tagManager.createTag(snapshotManager.snapshot(4), "tag2");
+        createTag(snapshotManager.snapshot(4), "tag2");
         assertThat(tagManager.tagExists("tag2")).isTrue();
 
         // step 5: commit D to bucket 3
@@ -299,7 +301,7 @@ public class FileDeletionTest {
     @Test
     public void testExpireWithUpgradeAndTags() throws Exception {
         TestFileStore store = 
createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
-        TagManager tagManager = new TagManager(fileIO, store.options().path());
+        tagManager = new TagManager(fileIO, store.options().path());
         SnapshotManager snapshotManager = store.snapshotManager();
         TestKeyValueGenerator gen =
                 new 
TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
@@ -328,7 +330,7 @@ public class FileDeletionTest {
         // snapshot 3: commit -A (by clean bucket 0)
         cleanBucket(store, gen.getPartition(gen.next()), 0);
 
-        tagManager.createTag(snapshotManager.snapshot(1), "tag1");
+        createTag(snapshotManager.snapshot(1), "tag1");
         store.newExpire(1, 1, Long.MAX_VALUE).expire();
 
         // check data file and manifests
@@ -353,7 +355,7 @@ public class FileDeletionTest {
     @Test
     public void testDeleteTagWithSnapshot() throws Exception {
         TestFileStore store = 
createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED, 3);
-        TagManager tagManager = new TagManager(fileIO, store.options().path());
+        tagManager = new TagManager(fileIO, store.options().path());
         SnapshotManager snapshotManager = store.snapshotManager();
         TestKeyValueGenerator gen =
                 new 
TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
@@ -385,7 +387,7 @@ public class FileDeletionTest {
                 Arrays.asList(snapshot1.baseManifestList(), 
snapshot1.deltaManifestList());
 
         // create tag1
-        tagManager.createTag(snapshot1, "tag1");
+        createTag(snapshot1, "tag1");
 
         // expire snapshot 1, 2
         store.newExpire(1, 1, Long.MAX_VALUE).expire();
@@ -426,7 +428,7 @@ public class FileDeletionTest {
     @Test
     public void testDeleteTagWithOtherTag() throws Exception {
         TestFileStore store = 
createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED, 3);
-        TagManager tagManager = new TagManager(fileIO, store.options().path());
+        tagManager = new TagManager(fileIO, store.options().path());
         SnapshotManager snapshotManager = store.snapshotManager();
         TestKeyValueGenerator gen =
                 new 
TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
@@ -459,9 +461,9 @@ public class FileDeletionTest {
                 Arrays.asList(snapshot2.baseManifestList(), 
snapshot2.deltaManifestList());
 
         // create tags
-        tagManager.createTag(snapshotManager.snapshot(1), "tag1");
-        tagManager.createTag(snapshotManager.snapshot(2), "tag2");
-        tagManager.createTag(snapshotManager.snapshot(4), "tag3");
+        createTag(snapshotManager.snapshot(1), "tag1");
+        createTag(snapshotManager.snapshot(2), "tag2");
+        createTag(snapshotManager.snapshot(4), "tag3");
 
         // expire snapshot 1, 2, 3, 4
         store.newExpire(1, 1, Long.MAX_VALUE).expire();
@@ -703,4 +705,8 @@ public class FileDeletionTest {
                         store.snapshotManager().latestSnapshot(),
                         null);
     }
+
+    private void createTag(Snapshot snapshot, String tagName) {
+        tagManager.createTag(snapshot, tagName, Collections.emptyList());
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
index ecaf5ae8e..a0794168f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java
@@ -97,7 +97,7 @@ public class UncleanedFileStoreExpireTest extends FileStoreExpireTestBase {
         // create tags for each snapshot
         for (int id = 1; id <= latestSnapshotId; id++) {
             Snapshot snapshot = snapshotManager.snapshot(id);
-            tagManager.createTag(snapshot, "tag" + id);
+            tagManager.createTag(snapshot, "tag" + id, Collections.emptyList());
         }
 
         // randomly expire snapshots
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
index 02c5b9606..da3425e9b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.operation.TagDeletion;
+import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.utils.SerializableSupplier;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
@@ -71,13 +72,17 @@ public class AutoTagForSavepointCommitterOperator<CommitT, GlobalCommitT>
 
     private final SerializableSupplier<TagDeletion> tagDeletionFactory;
 
+    private final SerializableSupplier<List<TagCallback>> callbacksSupplier;
+
     private final NavigableSet<Long> identifiersForTags;
 
-    protected SnapshotManager snapshotManager;
+    private transient SnapshotManager snapshotManager;
+
+    private transient TagManager tagManager;
 
-    protected TagManager tagManager;
+    private transient TagDeletion tagDeletion;
 
-    protected TagDeletion tagDeletion;
+    private transient List<TagCallback> callbacks;
 
     private transient ListState<Long> identifiersForTagsState;
 
@@ -85,11 +90,13 @@ public class AutoTagForSavepointCommitterOperator<CommitT, GlobalCommitT>
             CommitterOperator<CommitT, GlobalCommitT> commitOperator,
             SerializableSupplier<SnapshotManager> snapshotManagerFactory,
             SerializableSupplier<TagManager> tagManagerFactory,
-            SerializableSupplier<TagDeletion> tagDeletionFactory) {
+            SerializableSupplier<TagDeletion> tagDeletionFactory,
+            SerializableSupplier<List<TagCallback>> callbacksSupplier) {
         this.commitOperator = commitOperator;
         this.tagManagerFactory = tagManagerFactory;
         this.snapshotManagerFactory = snapshotManagerFactory;
         this.tagDeletionFactory = tagDeletionFactory;
+        this.callbacksSupplier = callbacksSupplier;
         this.identifiersForTags = new TreeSet<>();
     }
 
@@ -102,6 +109,7 @@ public class AutoTagForSavepointCommitterOperator<CommitT, GlobalCommitT>
             snapshotManager = snapshotManagerFactory.get();
             tagManager = tagManagerFactory.get();
             tagDeletion = tagDeletionFactory.get();
+            callbacks = callbacksSupplier.get();
 
             identifiersForTagsState =
                     commitOperator
@@ -159,7 +167,7 @@ public class AutoTagForSavepointCommitterOperator<CommitT, GlobalCommitT>
         for (Snapshot snapshot : snapshotForTags) {
             String tagName = SAVEPOINT_TAG_PREFIX + snapshot.commitIdentifier();
             if (!tagManager.tagExists(tagName)) {
-                tagManager.createTag(snapshot, tagName);
+                tagManager.createTag(snapshot, tagName, callbacks);
             }
         }
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
index dff75ff29..36ae32d15 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
@@ -117,7 +117,7 @@ public class BatchWriteGeneratorTagOperator<CommitT, GlobalCommitT>
                 tagManager.deleteTag(tagName, tagDeletion, snapshotManager);
             }
             // Create a new tag
-            tagManager.createTag(snapshot, tagName);
+            tagManager.createTag(snapshot, tagName, table.store().createTagCallbacks());
             // Expire the tag
             expireTag();
         } catch (Exception e) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 480d41ae8..7d3cecdbf 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -202,7 +202,8 @@ public abstract class FlinkSink<T> implements Serializable {
                             (CommitterOperator<Committable, ManifestCommittable>) committerOperator,
                             table::snapshotManager,
                             table::tagManager,
-                            () -> table.store().newTagDeletion());
+                            () -> table.store().newTagDeletion(),
+                            () -> table.store().createTagCallbacks());
         }
         if (conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.BATCH
                 && table.coreOptions().tagCreationMode() == TagCreationMode.BATCH) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
index 60839805f..880e052c7 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java
@@ -206,7 +206,8 @@ public class AutoTagForSavepointCommitterOperatorTest extends CommitterOperatorT
                         super.createCommitterOperator(table, commitUser, committableStateManager),
                 table::snapshotManager,
                 table::tagManager,
-                () -> table.store().newTagDeletion());
+                () -> table.store().newTagDeletion(),
+                () -> table.store().createTagCallbacks());
     }
 
     @Override
@@ -221,6 +222,7 @@ public class AutoTagForSavepointCommitterOperatorTest extends CommitterOperatorT
                                 table, commitUser, committableStateManager, initializeFunction),
                 table::snapshotManager,
                 table::tagManager,
-                () -> table.store().newTagDeletion());
+                () -> table.store().newTagDeletion(),
+                () -> table.store().createTagCallbacks());
     }
 }


Reply via email to