This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 1c358ef30 [lake] pass table custom table property to lake writer (#1521)
1c358ef30 is described below
commit 1c358ef30153420d899993bc08509c31652d39f9
Author: xx789 <[email protected]>
AuthorDate: Wed Aug 13 09:54:32 2025 +0800
[lake] pass table custom table property to lake writer (#1521)
---
.../com/alibaba/fluss/lake/writer/WriterInitContext.java | 9 +++++++++
.../fluss/flink/tiering/source/TieringSplitReader.java | 3 ++-
.../fluss/flink/tiering/source/TieringWriterInitContext.java | 12 +++++++++++-
.../alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java | 6 ++++++
4 files changed, 28 insertions(+), 2 deletions(-)
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/lake/writer/WriterInitContext.java b/fluss-common/src/main/java/com/alibaba/fluss/lake/writer/WriterInitContext.java
index 117b6dcdc..94db67e87 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/lake/writer/WriterInitContext.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/lake/writer/WriterInitContext.java
@@ -24,6 +24,8 @@ import com.alibaba.fluss.metadata.TablePath;
import javax.annotation.Nullable;
+import java.util.Map;
+
/**
* The WriterInitContext interface provides the context needed to create a LakeWriter. It includes
* methods to obtain the table path, table bucket, and an optional partition.
@@ -61,4 +63,11 @@ public interface WriterInitContext {
* @return the table schema
*/
Schema schema();
+
+ /**
+ * Returns the table custom properties.
+ *
+ * @return the table custom properties
+ */
+ Map<String, String> customProperties();
}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringSplitReader.java
index e3b0107a4..bf2cdae97 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringSplitReader.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringSplitReader.java
@@ -309,7 +309,8 @@ public class TieringSplitReader<WriteResult>
currentTablePath,
bucket,
partitionName,
- currentTable.getTableInfo().getSchema()));
+ currentTable.getTableInfo().getSchema(),
+ currentTable.getTableInfo().getCustomProperties().toMap()));
lakeWriters.put(bucket, lakeWriter);
}
return lakeWriter;
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringWriterInitContext.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringWriterInitContext.java
index 3a44a5f1d..adbe6d603 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringWriterInitContext.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringWriterInitContext.java
@@ -24,6 +24,8 @@ import com.alibaba.fluss.metadata.TablePath;
import javax.annotation.Nullable;
+import java.util.Map;
+
/** The implementation of {@link WriterInitContext}. */
public class TieringWriterInitContext implements WriterInitContext {
@@ -31,16 +33,19 @@ public class TieringWriterInitContext implements WriterInitContext {
private final TableBucket tableBucket;
private final Schema schema;
@Nullable private final String partition;
+ private final Map<String, String> customProperties;
public TieringWriterInitContext(
TablePath tablePath,
TableBucket tableBucket,
@Nullable String partition,
- Schema schema) {
+ Schema schema,
+ Map<String, String> customProperties) {
this.tablePath = tablePath;
this.tableBucket = tableBucket;
this.partition = partition;
this.schema = schema;
+ this.customProperties = customProperties;
}
@Override
@@ -63,4 +68,9 @@ public class TieringWriterInitContext implements WriterInitContext {
public Schema schema() {
return schema;
}
+
+ @Override
+ public Map<String, String> customProperties() {
+ return customProperties;
+ }
}
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
index 93fd23a2f..b98f8aceb 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
@@ -716,6 +716,12 @@ class PaimonTieringTest {
return partition;
}
+ @Override
+ public Map<String, String> customProperties() {
+ // don't care about table custom properties for Paimon lake writer
+ return new HashMap<>();
+ }
+
@Override
public com.alibaba.fluss.metadata.Schema schema() {
throw new UnsupportedOperationException(