This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 9844638c4 feat: make FanoutWriter writer configurable (#1962)
9844638c4 is described below

commit 9844638c48e2d4b123c9df4f89330cfd1e8bfcba
Author: Alan Tang <[email protected]>
AuthorDate: Tue Jan 6 10:43:11 2026 +0800

    feat: make FanoutWriter writer configurable (#1962)
    
    ## Which issue does this PR close?
    
    - Closes #1834.
    
    ## What changes are included in this PR?
    
    
    - Follow-up on #1872.
    
    ## Are these changes tested?
    
    ---------
    
    Signed-off-by: StandingMan <[email protected]>
---
 crates/iceberg/src/spec/table_properties.rs        | 12 +++++++++++
 .../datafusion/src/physical_plan/write.rs          | 24 ++++++++++++++++++++--
 2 files changed, 34 insertions(+), 2 deletions(-)

diff --git a/crates/iceberg/src/spec/table_properties.rs 
b/crates/iceberg/src/spec/table_properties.rs
index 497545601..413604f51 100644
--- a/crates/iceberg/src/spec/table_properties.rs
+++ b/crates/iceberg/src/spec/table_properties.rs
@@ -49,6 +49,8 @@ pub struct TableProperties {
     pub write_format_default: String,
     /// The target file size for files.
     pub write_target_file_size_bytes: usize,
+    /// Whether to use `FanoutWriter` for partitioned tables.
+    pub write_datafusion_fanout_enabled: bool,
 }
 
 impl TableProperties {
@@ -137,6 +139,11 @@ impl TableProperties {
     pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = 
"write.target-file-size-bytes";
     /// Default target file size
     pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 
1024 * 1024; // 512 MB
+    /// Whether to use `FanoutWriter` for partitioned tables (handles unsorted 
data).
+    /// If false, uses `ClusteredWriter` (requires sorted data, more memory 
efficient).
+    pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED: &str = 
"write.datafusion.fanout.enabled";
+    /// Default value for fanout writer enabled
+    pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT: bool = true;
 }
 
 impl TryFrom<&HashMap<String, String>> for TableProperties {
@@ -175,6 +182,11 @@ impl TryFrom<&HashMap<String, String>> for TableProperties 
{
                 TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES,
                 TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
             )?,
+            write_datafusion_fanout_enabled: parse_property(
+                props,
+                TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED,
+                
TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT,
+            )?,
         })
     }
 }
diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs 
b/crates/integrations/datafusion/src/physical_plan/write.rs
index 9eb53c235..fdfddf877 100644
--- a/crates/integrations/datafusion/src/physical_plan/write.rs
+++ b/crates/integrations/datafusion/src/physical_plan/write.rs
@@ -266,8 +266,28 @@ impl ExecutionPlan for IcebergWriteExec {
         let data_file_writer_builder = 
DataFileWriterBuilder::new(rolling_writer_builder);
 
         // Create TaskWriter
-        // TODO: Make fanout_enabled configurable via table properties
-        let fanout_enabled = true;
+        let fanout_enabled = self
+            .table
+            .metadata()
+            .properties()
+            .get(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED)
+            .map(|value| {
+                value
+                    .parse::<bool>()
+                    .map_err(|e| {
+                        Error::new(
+                            ErrorKind::DataInvalid,
+                            format!(
+                                "Invalid value for {}, expected 'true' or 
'false'",
+                                
TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED
+                            ),
+                        )
+                        .with_source(e)
+                    })
+                    .map_err(to_datafusion_error)
+            })
+            .transpose()?
+            
.unwrap_or(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT);
         let schema = self.table.metadata().current_schema().clone();
         let partition_spec = 
self.table.metadata().default_partition_spec().clone();
         let task_writer = TaskWriter::try_new(

Reply via email to