This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 1c358ef30 [lake] pass table custom table property to lake writer 
(#1521)
1c358ef30 is described below

commit 1c358ef30153420d899993bc08509c31652d39f9
Author: xx789 <[email protected]>
AuthorDate: Wed Aug 13 09:54:32 2025 +0800

    [lake] pass table custom table property to lake writer (#1521)
---
 .../com/alibaba/fluss/lake/writer/WriterInitContext.java     |  9 +++++++++
 .../fluss/flink/tiering/source/TieringSplitReader.java       |  3 ++-
 .../fluss/flink/tiering/source/TieringWriterInitContext.java | 12 +++++++++++-
 .../alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java |  6 ++++++
 4 files changed, 28 insertions(+), 2 deletions(-)

diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/lake/writer/WriterInitContext.java
 
b/fluss-common/src/main/java/com/alibaba/fluss/lake/writer/WriterInitContext.java
index 117b6dcdc..94db67e87 100644
--- 
a/fluss-common/src/main/java/com/alibaba/fluss/lake/writer/WriterInitContext.java
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/lake/writer/WriterInitContext.java
@@ -24,6 +24,8 @@ import com.alibaba.fluss.metadata.TablePath;
 
 import javax.annotation.Nullable;
 
+import java.util.Map;
+
 /**
  * The WriterInitContext interface provides the context needed to create a 
LakeWriter. It includes
  * methods to obtain the table path, table bucket, and an optional partition.
@@ -61,4 +63,11 @@ public interface WriterInitContext {
      * @return the table schema
      */
     Schema schema();
+
+    /**
+     * Returns the table custom properties.
+     *
+     * @return the table custom properties
+     */
+    Map<String, String> customProperties();
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringSplitReader.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringSplitReader.java
index e3b0107a4..bf2cdae97 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringSplitReader.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringSplitReader.java
@@ -309,7 +309,8 @@ public class TieringSplitReader<WriteResult>
                                     currentTablePath,
                                     bucket,
                                     partitionName,
-                                    currentTable.getTableInfo().getSchema()));
+                                    currentTable.getTableInfo().getSchema(),
+                                    
currentTable.getTableInfo().getCustomProperties().toMap()));
             lakeWriters.put(bucket, lakeWriter);
         }
         return lakeWriter;
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringWriterInitContext.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringWriterInitContext.java
index 3a44a5f1d..adbe6d603 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringWriterInitContext.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringWriterInitContext.java
@@ -24,6 +24,8 @@ import com.alibaba.fluss.metadata.TablePath;
 
 import javax.annotation.Nullable;
 
+import java.util.Map;
+
 /** The implementation of {@link WriterInitContext}. */
 public class TieringWriterInitContext implements WriterInitContext {
 
@@ -31,16 +33,19 @@ public class TieringWriterInitContext implements 
WriterInitContext {
     private final TableBucket tableBucket;
     private final Schema schema;
     @Nullable private final String partition;
+    private final Map<String, String> customProperties;
 
     public TieringWriterInitContext(
             TablePath tablePath,
             TableBucket tableBucket,
             @Nullable String partition,
-            Schema schema) {
+            Schema schema,
+            Map<String, String> customProperties) {
         this.tablePath = tablePath;
         this.tableBucket = tableBucket;
         this.partition = partition;
         this.schema = schema;
+        this.customProperties = customProperties;
     }
 
     @Override
@@ -63,4 +68,9 @@ public class TieringWriterInitContext implements 
WriterInitContext {
     public Schema schema() {
         return schema;
     }
+
+    @Override
+    public Map<String, String> customProperties() {
+        return customProperties;
+    }
 }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
index 93fd23a2f..b98f8aceb 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
@@ -716,6 +716,12 @@ class PaimonTieringTest {
                         return partition;
                     }
 
+                    @Override
+                    public Map<String, String> customProperties() {
+                        // don't care about table custom properties for Paimon 
lake writer
+                        return new HashMap<>();
+                    }
+
                     @Override
                     public com.alibaba.fluss.metadata.Schema schema() {
                         throw new UnsupportedOperationException(

Reply via email to