lintingbin opened a new issue, #15225: URL: https://github.com/apache/iceberg/issues/15225
### Apache Iceberg version

main (unreleased)

### Query engine

Flink

### Please describe the bug 🐞

#### Problem Description

When using `DynamicIcebergSink` without explicitly specifying a compression codec, the sink defaults to `gzip` even if the Iceberg table has `write.parquet.compression-codec` set to `zstd` (or any other codec). The table-level compression setting is overridden by the default value from `FlinkWriteConf`.

#### Root Cause

In `DynamicIcebergSink.Builder.build()` ([code](https://github.com/apache/iceberg/blob/main/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java#L370-L382)):

```java
private DynamicIcebergSink build() {
  // ...
  FlinkWriteConf flinkWriteConf = new FlinkWriteConf(writeOptions, readableConfig);
  Map<String, String> writeProperties =
      SinkUtil.writeProperties(flinkWriteConf.dataFileFormat(), flinkWriteConf, null); // table is null
  // ...
}
```

1. `FlinkWriteConf` is created without a table reference, so `tableProperties` in `FlinkConfParser` is an empty map.
2. `SinkUtil.writeProperties()` is called with `null` as the table parameter.
3. `FlinkWriteConf.parquetCompressionCodec()` cannot read table properties, so it returns the default value (`gzip`).
4. In `DynamicWriter`, `commonWriteProperties.putAll()` overrides the table's compression setting:

```java
Map<String, String> tableWriteProperties = Maps.newHashMap(table.properties());
tableWriteProperties.putAll(commonWriteProperties); // default gzip overrides table's zstd
```

#### Expected Behavior

If the user does not explicitly set a compression codec through the sink builder, the table-level `write.parquet.compression-codec` setting should be respected.

#### Related Discussion

This issue was discussed in the comments of PR #13609 (https://github.com/apache/iceberg/pull/13609#issuecomment-3138811026), where @b-rick reported the same problem.
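The precedence problem in step 4 comes down to `Map.putAll` semantics: the last map merged in wins for duplicate keys. The following is a minimal, self-contained sketch (class and variable names are illustrative stand-ins, not the actual Iceberg code) showing how defaults merged last clobber the table's setting:

```java
import java.util.HashMap;
import java.util.Map;

public class PrecedenceDemo {
  public static void main(String[] args) {
    // Stand-in for table.properties(): the table asks for zstd.
    Map<String, String> tableProperties = new HashMap<>();
    tableProperties.put("write.parquet.compression-codec", "zstd");

    // Stand-in for commonWriteProperties built from FlinkWriteConf when no
    // table reference is available: it carries the gzip default.
    Map<String, String> commonWriteProperties = new HashMap<>();
    commonWriteProperties.put("write.parquet.compression-codec", "gzip");

    // Same merge order as DynamicWriter: the map merged last wins.
    Map<String, String> tableWriteProperties = new HashMap<>(tableProperties);
    tableWriteProperties.putAll(commonWriteProperties);

    // prints "gzip" -- the table's zstd setting is lost
    System.out.println(tableWriteProperties.get("write.parquet.compression-codec"));
  }
}
```

Because `putAll` cannot distinguish an explicit user choice from a default, fixing the merge order alone is not enough; the defaults would then override explicit sink options instead.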
@mxm suggested moving the configuration code into the runtime path (e.g., into the writer), where the write properties can be correctly resolved based on user-provided properties, Flink config, table properties, and Iceberg-level defaults.

#### Suggested Solution

One possible fix is to modify `SinkUtil.writeProperties()` (or add a new method) so that compression properties are only set when they are explicitly configured by the user, not when they are merely default values. That way, table-level settings are not overridden by defaults.

Alternatively, as @mxm suggested, the configuration resolution logic could be moved to runtime in `DynamicWriter`, where the actual table is available and `FlinkWriteConf(table, writeOptions, readableConfig)` can be used to properly resolve the configuration hierarchy.
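The runtime-resolution approach @mxm describes can be sketched as a single merge with an explicit precedence order: Iceberg-level defaults lowest, then table properties, then explicit user options highest. The helper below (`resolveWriteProperties` and all names are hypothetical, not an existing Iceberg API) illustrates the ordering under those assumptions:

```java
import java.util.HashMap;
import java.util.Map;

public class ResolutionOrderSketch {
  // Hypothetical helper: resolve write properties at runtime, where the table
  // is available. Later putAll calls win, so the merge order encodes the
  // precedence: defaults < table properties < explicit user options.
  static Map<String, String> resolveWriteProperties(
      Map<String, String> icebergDefaults,
      Map<String, String> tableProperties,
      Map<String, String> explicitUserOptions) {
    Map<String, String> resolved = new HashMap<>(icebergDefaults);
    resolved.putAll(tableProperties);     // table settings override defaults
    resolved.putAll(explicitUserOptions); // explicit options override everything
    return resolved;
  }

  public static void main(String[] args) {
    Map<String, String> defaults = new HashMap<>();
    defaults.put("write.parquet.compression-codec", "gzip"); // default per this issue

    Map<String, String> tableProps = new HashMap<>();
    tableProps.put("write.parquet.compression-codec", "zstd"); // table setting

    // User sets nothing explicitly: the table's zstd wins over the default.
    System.out.println(resolveWriteProperties(defaults, tableProps, new HashMap<>())
        .get("write.parquet.compression-codec")); // prints "zstd"
  }
}
```

The key design point is that only values the user *explicitly* set enter the highest-precedence map, which is exactly the distinction the current build-time `FlinkWriteConf` (created without a table) cannot make.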
