lintingbin opened a new issue, #15225:
URL: https://github.com/apache/iceberg/issues/15225

   ### Apache Iceberg version
   
   main (unreleased)
   
   ### Query engine
   
   Flink
   
   ### Please describe the bug 🐞
   
   #### Problem Description
   
   When using `DynamicIcebergSink` without explicitly specifying a compression 
codec, the sink defaults to `gzip` even if the Iceberg table has 
`write.parquet.compression-codec` set to `zstd` (or any other codec).
   
   The table-level compression setting is being overridden by the default value 
from `FlinkWriteConf`.
   
   #### Root Cause
   
   In `DynamicIcebergSink.Builder.build()` 
([code](https://github.com/apache/iceberg/blob/main/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java#L370-L382)):
   
   ```java
   private DynamicIcebergSink build() {
     // ...
     FlinkWriteConf flinkWriteConf = new FlinkWriteConf(writeOptions, readableConfig);
     Map<String, String> writeProperties =
         SinkUtil.writeProperties(flinkWriteConf.dataFileFormat(), flinkWriteConf, null); // table is null
     // ...
   }
   ```
   
   1. `FlinkWriteConf` is created without a table reference, so 
`tableProperties` in `FlinkConfParser` is an empty map
   2. `SinkUtil.writeProperties()` is called with `null` as the table parameter
   3. `FlinkWriteConf.parquetCompressionCodec()` cannot read table properties, 
so it returns the default value (`gzip`)
   4. In `DynamicWriter`, `commonWriteProperties.putAll()` overrides the 
table's compression setting:
   
   ```java
   Map<String, String> tableWriteProperties = Maps.newHashMap(table.properties());
   tableWriteProperties.putAll(commonWriteProperties); // default gzip overrides table's zstd
   ```
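   The override mechanics can be reproduced outside Iceberg with plain maps. Below is a standalone sketch (`OverrideDemo` is a hypothetical class, not Iceberg code): `commonWriteProperties` carries the defaulted `gzip`, and `putAll` clobbers the table's `zstd`.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   public class OverrideDemo {
     static final String CODEC_KEY = "write.parquet.compression-codec";
   
     // Same merge pattern as DynamicWriter: table properties first,
     // then common write properties layered on top.
     static Map<String, String> merge(Map<String, String> tableProps,
                                      Map<String, String> commonWriteProperties) {
       Map<String, String> result = new HashMap<>(tableProps);
       result.putAll(commonWriteProperties); // common props win, including defaults
       return result;
     }
   
     public static void main(String[] args) {
       Map<String, String> tableProps = new HashMap<>();
       tableProps.put(CODEC_KEY, "zstd"); // set on the table by the user
   
       Map<String, String> common = new HashMap<>();
       common.put(CODEC_KEY, "gzip"); // defaulted because FlinkWriteConf saw no table
   
       System.out.println(merge(tableProps, common).get(CODEC_KEY)); // prints gzip
     }
   }
   ```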
   
   #### Expected Behavior
   
   If the user doesn't explicitly set a compression codec through the sink 
builder, the table-level `write.parquet.compression-codec` setting should be 
respected.
   
   #### Related Discussion
   
   This issue was discussed in the comments of PR #13609 
(https://github.com/apache/iceberg/pull/13609#issuecomment-3138811026), where 
@b-rick reported the same problem. @mxm suggested moving configuration code 
into the runtime path (e.g., into the writer) where the write properties can be 
correctly resolved based on user-provided properties, Flink config, table 
properties, and Iceberg-level defaults.
   
   #### Suggested Solution
   
   One possible fix is to modify `SinkUtil.writeProperties()` or create a new 
method that only sets compression properties when they are explicitly 
configured by the user (not when they are just default values). This way, 
table-level settings won't be overridden by defaults.
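   A minimal sketch of this option (the class and method names are illustrative assumptions, not the actual `SinkUtil` API): copy the codec into the common write properties only when the user explicitly configured it, so the defaulted value never enters the map and the table property survives the later `putAll`.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   public class ExplicitOnlyProps {
     static final String CODEC_KEY = "write.parquet.compression-codec";
   
     // Build the common write properties from only explicitly-set options;
     // an absent key lets the table property (or Iceberg default) win downstream.
     static Map<String, String> explicitWriteProperties(Map<String, String> userWriteOptions) {
       Map<String, String> props = new HashMap<>();
       if (userWriteOptions.containsKey(CODEC_KEY)) {
         props.put(CODEC_KEY, userWriteOptions.get(CODEC_KEY));
       }
       return props;
     }
   }
   ```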
   
   Alternatively, as @mxm suggested, the configuration resolution logic could 
be moved to runtime in `DynamicWriter`, where the actual table is available and 
`FlinkWriteConf(table, writeOptions, readableConfig)` can be used to properly 
resolve the configuration hierarchy.
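   A sketch of that resolution order (illustrative names, not the real `FlinkConfParser` API), assuming the precedence: user-provided write options > Flink config > table properties > a supplied default.
   
   ```java
   import java.util.Map;
   import java.util.Optional;
   
   public class RuntimeResolve {
     static final String CODEC_KEY = "write.parquet.compression-codec";
   
     // Resolve the codec at runtime, where the actual table is available,
     // walking the hierarchy from most to least specific.
     static String resolveCodec(Map<String, String> userOptions,
                                Map<String, String> flinkConfig,
                                Map<String, String> tableProps,
                                String defaultCodec) {
       return Optional.ofNullable(userOptions.get(CODEC_KEY))
           .or(() -> Optional.ofNullable(flinkConfig.get(CODEC_KEY)))
           .or(() -> Optional.ofNullable(tableProps.get(CODEC_KEY)))
           .orElse(defaultCodec);
     }
   }
   ```
   
   With this ordering, a table-level `zstd` is only overridden when the user or the Flink config actually sets a codec, never by the fallback default.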


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
