This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new af7be8440f [Core] Custom commit callback support to be initialized with table (#6004)
af7be8440f is described below
commit af7be8440f504edac8ec097f926fe0b88af8301b
Author: yuzelin <[email protected]>
AuthorDate: Fri Aug 1 12:27:36 2025 +0800
[Core] Custom commit callback support to be initialized with table (#6004)
---
.../src/main/java/org/apache/paimon/AbstractFileStore.java | 2 +-
.../main/java/org/apache/paimon/table/sink/CallbackUtils.java | 9 +++++++--
.../main/java/org/apache/paimon/table/sink/CommitCallback.java | 3 +++
3 files changed, 11 insertions(+), 3 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index d998b290dd..11f67a2c10 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -357,7 +357,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
private List<CommitCallback> createCommitCallbacks(String commitUser, FileStoreTable table) {
List<CommitCallback> callbacks =
- new ArrayList<>(CallbackUtils.loadCommitCallbacks(options));
+ new ArrayList<>(CallbackUtils.loadCommitCallbacks(options, table));
if (options.partitionedTableInMetastore() && !schema.partitionKeys().isEmpty()) {
PartitionHandler partitionHandler = catalogEnvironment.partitionHandler();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CallbackUtils.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CallbackUtils.java
index 7d8a0a849c..bec038718c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CallbackUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CallbackUtils.java
@@ -19,6 +19,7 @@
package org.apache.paimon.table.sink;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;
import java.util.ArrayList;
@@ -32,8 +33,12 @@ public class CallbackUtils {
return loadCallbacks(coreOptions.tagCallbacks(), TagCallback.class);
}
- public static List<CommitCallback> loadCommitCallbacks(CoreOptions coreOptions) {
- return loadCallbacks(coreOptions.commitCallbacks(), CommitCallback.class);
+ public static List<CommitCallback> loadCommitCallbacks(
+ CoreOptions coreOptions, FileStoreTable table) {
+ List<CommitCallback> commitCallbacks =
+ loadCallbacks(coreOptions.commitCallbacks(), CommitCallback.class);
+ commitCallbacks.forEach(callback -> callback.setTable(table));
+ return commitCallbacks;
}
@SuppressWarnings("unchecked")
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java
index 40e615198e..22e3ce753e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java
@@ -22,6 +22,7 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.table.FileStoreTable;
import java.util.List;
@@ -44,4 +45,6 @@ public interface CommitCallback extends AutoCloseable {
Snapshot snapshot);
void retry(ManifestCommittable committable);
+
+ default void setTable(FileStoreTable table) {}
}