This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new ff6cf54ed Flink: Support write options in FlinkSink builder (#3998)
ff6cf54ed is described below

commit ff6cf54eda99c741bd810d0addddba37f76e05b1
Author: liliwei <[email protected]>
AuthorDate: Mon Jul 4 04:09:43 2022 +0800

    Flink: Support write options in FlinkSink builder (#3998)
---
 docs/flink-getting-started.md                      |  21 +++
 .../org/apache/iceberg/flink/FlinkConfParser.java  | 201 +++++++++++++++++++++
 .../org/apache/iceberg/flink/FlinkWriteConf.java   | 105 +++++++++++
 .../apache/iceberg/flink/FlinkWriteOptions.java    |  57 ++++++
 .../org/apache/iceberg/flink/sink/FlinkSink.java   | 100 +++++-----
 .../flink/sink/RowDataTaskWriterFactory.java       |   4 +-
 .../iceberg/flink/sink/TestFlinkIcebergSink.java   |  51 ++++++
 .../flink/sink/TestIcebergStreamWriter.java        |  10 +-
 8 files changed, 488 insertions(+), 61 deletions(-)

diff --git a/docs/flink-getting-started.md b/docs/flink-getting-started.md
index b3af43a82..c34dfe857 100644
--- a/docs/flink-getting-started.md
+++ b/docs/flink-getting-started.md
@@ -551,6 +551,27 @@ FlinkSink.forRowData(input)
 env.execute("Test Iceberg DataStream");
 ```
 
+## Write options
+
+Flink write options are passed when configuring the FlinkSink, like this:
+
+```
+FlinkSink.Builder builder = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+    .table(table)
+    .tableLoader(tableLoader)
+    .set("write-format", "orc")
+    .set(FlinkWriteOptions.OVERWRITE_MODE, "true");
+```
+
+| Flink option           | Default                       | Description                                                                                                 |
+|------------------------|-------------------------------|-------------------------------------------------------------------------------------------------------------|
+| write-format           | Table write.format.default    | File format to use for this write operation; parquet, avro, or orc                                          |
+| target-file-size-bytes | As per table property         | Overrides this table's write.target-file-size-bytes                                                         |
+| upsert-enabled         | Table write.upsert.enabled    | Overrides this table's write.upsert.enabled                                                                 |
+| overwrite-enabled      | false                         | Overwrite the table's data; overwrite mode shouldn't be enabled when configuring to use UPSERT data stream. |
+| distribution-mode      | Table write.distribution-mode | Overrides this table's write.distribution-mode                                                              |
+
+
 ## Inspecting tables.
 
 Iceberg does not support inspecting table in flink sql now, we need to use 
[iceberg's Java API](../api) to read iceberg's meta data to get those table 
information.
diff --git 
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
new file mode 100644
index 000000000..e984f6875
--- /dev/null
+++ 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+class FlinkConfParser {
+
+  private final Map<String, String> tableProperties;
+  private final Map<String, String> options;
+  private final ReadableConfig readableConfig;
+
+  FlinkConfParser(Table table, Map<String, String> options, ReadableConfig 
readableConfig) {
+    this.tableProperties = table.properties();
+    this.options = options;
+    this.readableConfig = readableConfig;
+  }
+
+  public BooleanConfParser booleanConf() {
+    return new BooleanConfParser();
+  }
+
+  public IntConfParser intConf() {
+    return new IntConfParser();
+  }
+
+  public LongConfParser longConf() {
+    return new LongConfParser();
+  }
+
+  public StringConfParser stringConf() {
+    return new StringConfParser();
+  }
+
+  class BooleanConfParser extends ConfParser<BooleanConfParser, Boolean> {
+    private Boolean defaultValue;
+
+    @Override
+    protected BooleanConfParser self() {
+      return this;
+    }
+
+    public BooleanConfParser defaultValue(boolean value) {
+      this.defaultValue = value;
+      return self();
+    }
+
+    public BooleanConfParser defaultValue(String value) {
+      this.defaultValue = Boolean.parseBoolean(value);
+      return self();
+    }
+
+    public boolean parse() {
+      Preconditions.checkArgument(defaultValue != null, "Default value cannot 
be null");
+      return parse(Boolean::parseBoolean, defaultValue);
+    }
+  }
+
+  class IntConfParser extends ConfParser<IntConfParser, Integer> {
+    private Integer defaultValue;
+
+    @Override
+    protected IntConfParser self() {
+      return this;
+    }
+
+    public IntConfParser defaultValue(int value) {
+      this.defaultValue = value;
+      return self();
+    }
+
+    public int parse() {
+      Preconditions.checkArgument(defaultValue != null, "Default value cannot 
be null");
+      return parse(Integer::parseInt, defaultValue);
+    }
+
+    public Integer parseOptional() {
+      return parse(Integer::parseInt, null);
+    }
+  }
+
+  class LongConfParser extends ConfParser<LongConfParser, Long> {
+    private Long defaultValue;
+
+    @Override
+    protected LongConfParser self() {
+      return this;
+    }
+
+    public LongConfParser defaultValue(long value) {
+      this.defaultValue = value;
+      return self();
+    }
+
+    public long parse() {
+      Preconditions.checkArgument(defaultValue != null, "Default value cannot 
be null");
+      return parse(Long::parseLong, defaultValue);
+    }
+
+    public Long parseOptional() {
+      return parse(Long::parseLong, null);
+    }
+  }
+
+  class StringConfParser extends ConfParser<StringConfParser, String> {
+    private String defaultValue;
+
+    @Override
+    protected StringConfParser self() {
+      return this;
+    }
+
+    public StringConfParser defaultValue(String value) {
+      this.defaultValue = value;
+      return self();
+    }
+
+    public String parse() {
+      Preconditions.checkArgument(defaultValue != null, "Default value cannot 
be null");
+      return parse(Function.identity(), defaultValue);
+    }
+
+    public String parseOptional() {
+      return parse(Function.identity(), null);
+    }
+  }
+
+  abstract class ConfParser<ThisT, T> {
+    private final List<String> optionNames = Lists.newArrayList();
+    private String tablePropertyName;
+    private ConfigOption<T> configOption;
+
+    protected abstract ThisT self();
+
+    public ThisT option(String name) {
+      this.optionNames.add(name);
+      return self();
+    }
+
+    public ThisT flinkConfig(ConfigOption<T> newConfigOption) {
+      this.configOption = newConfigOption;
+      return self();
+    }
+
+    public ThisT tableProperty(String name) {
+      this.tablePropertyName = name;
+      return self();
+    }
+
+    protected T parse(Function<String, T> conversion, T defaultValue) {
+      if (!optionNames.isEmpty()) {
+        for (String optionName : optionNames) {
+          String optionValue = options.get(optionName);
+          if (optionValue != null) {
+            return conversion.apply(optionValue);
+          }
+        }
+      }
+
+      if (configOption != null) {
+        T propertyValue = readableConfig.get(configOption);
+        if (propertyValue != null) {
+          return propertyValue;
+        }
+      }
+
+      if (tablePropertyName != null) {
+        String propertyValue = tableProperties.get(tablePropertyName);
+        if (propertyValue != null) {
+          return conversion.apply(propertyValue);
+        }
+      }
+
+      return defaultValue;
+    }
+  }
+}
diff --git 
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
new file mode 100644
index 000000000..ddb5f18c5
--- /dev/null
+++ 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+
+/**
+ * A class for common Iceberg configs for Flink writes.
+ * <p>
+ * If a config is set at multiple levels, the following order of precedence is 
used (top to bottom):
+ * <ol>
+ *   <li>Write options</li>
+ *   <li>flink ReadableConfig</li>
+ *   <li>Table metadata</li>
+ * </ol>
+ * The most specific value is set in write options and takes precedence over all other configs.
+ * If no write option is provided, this class checks the flink configuration for any overrides.
+ * If no applicable value is found in the flink configuration, this class uses the table metadata.
+ * <p>
+ * Note this class is NOT meant to be serialized.
+ */
+public class FlinkWriteConf {
+
+  private final FlinkConfParser confParser;
+
+  public FlinkWriteConf(Table table, Map<String, String> writeOptions, 
ReadableConfig readableConfig) {
+    this.confParser = new FlinkConfParser(table, writeOptions, readableConfig);
+  }
+
+  public boolean overwriteMode() {
+    return confParser.booleanConf()
+        .option(FlinkWriteOptions.OVERWRITE_MODE.key())
+        .flinkConfig(FlinkWriteOptions.OVERWRITE_MODE)
+        .defaultValue(FlinkWriteOptions.OVERWRITE_MODE.defaultValue())
+        .parse();
+  }
+
+  public boolean upsertMode() {
+    return confParser.booleanConf()
+        .option(FlinkWriteOptions.WRITE_UPSERT_ENABLED.key())
+        .flinkConfig(FlinkWriteOptions.WRITE_UPSERT_ENABLED)
+        .tableProperty(TableProperties.UPSERT_ENABLED)
+        .defaultValue(TableProperties.UPSERT_ENABLED_DEFAULT)
+        .parse();
+  }
+
+  public FileFormat dataFileFormat() {
+    String valueAsString = confParser.stringConf()
+        .option(FlinkWriteOptions.WRITE_FORMAT.key())
+        .flinkConfig(FlinkWriteOptions.WRITE_FORMAT)
+        .tableProperty(TableProperties.DEFAULT_FILE_FORMAT)
+        .defaultValue(TableProperties.DEFAULT_FILE_FORMAT_DEFAULT)
+        .parse();
+    return FileFormat.valueOf(valueAsString.toUpperCase(Locale.ENGLISH));
+  }
+
+  public long targetDataFileSize() {
+    return confParser.longConf()
+        .option(FlinkWriteOptions.TARGET_FILE_SIZE_BYTES.key())
+        .flinkConfig(FlinkWriteOptions.TARGET_FILE_SIZE_BYTES)
+        .tableProperty(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES)
+        .defaultValue(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT)
+        .parse();
+  }
+
+  public DistributionMode distributionMode() {
+    String modeName = confParser.stringConf()
+        .option(FlinkWriteOptions.DISTRIBUTION_MODE.key())
+        .flinkConfig(FlinkWriteOptions.DISTRIBUTION_MODE)
+        .tableProperty(TableProperties.WRITE_DISTRIBUTION_MODE)
+        .defaultValue(TableProperties.WRITE_DISTRIBUTION_MODE_NONE)
+        .parse();
+    return DistributionMode.fromName(modeName);
+  }
+
+  public int workerPoolSize() {
+    return confParser.intConf()
+        .flinkConfig(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE)
+        
.defaultValue(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue())
+        .parse();
+  }
+}
diff --git 
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
new file mode 100644
index 000000000..d0dc9c7fd
--- /dev/null
+++ 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * Flink sink write options
+ */
+public class FlinkWriteOptions {
+
+  private FlinkWriteOptions() {
+  }
+
+  // File format for write operations (default: Table write.format.default)
+  public static final ConfigOption<String> WRITE_FORMAT =
+      ConfigOptions.key("write-format")
+          .stringType().noDefaultValue();
+
+  // Overrides this table's write.target-file-size-bytes
+  public static final ConfigOption<Long> TARGET_FILE_SIZE_BYTES =
+      ConfigOptions.key("target-file-size-bytes")
+          .longType().noDefaultValue();
+
+  // Overrides this table's write.upsert.enabled
+  public static final ConfigOption<Boolean> WRITE_UPSERT_ENABLED =
+      ConfigOptions.key("upsert-enabled")
+          .booleanType().noDefaultValue();
+
+  public static final ConfigOption<Boolean> OVERWRITE_MODE =
+      ConfigOptions.key("overwrite-enabled")
+          .booleanType().defaultValue(false);
+
+  // Overrides the table's write.distribution-mode
+  public static final ConfigOption<String> DISTRIBUTION_MODE =
+      ConfigOptions.key("distribution-mode")
+          .stringType().noDefaultValue();
+
+}
diff --git 
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 72f0580cf..56fafab06 100644
--- 
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -22,7 +22,6 @@ package org.apache.iceberg.flink.sink;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
@@ -43,14 +42,14 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DistributionMode;
-import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.flink.FlinkConfigOptions;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.FlinkWriteConf;
+import org.apache.iceberg.flink.FlinkWriteOptions;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
 import org.apache.iceberg.io.WriteResult;
@@ -60,18 +59,10 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.util.PropertyUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
-import static org.apache.iceberg.TableProperties.UPSERT_ENABLED;
-import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
-import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_NONE;
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
-import static 
org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
 
 public class FlinkSink {
   private static final Logger LOG = LoggerFactory.getLogger(FlinkSink.class);
@@ -132,14 +123,13 @@ public class FlinkSink {
     private TableLoader tableLoader;
     private Table table;
     private TableSchema tableSchema;
-    private boolean overwrite = false;
-    private DistributionMode distributionMode = null;
     private Integer writeParallelism = null;
-    private boolean upsert = false;
     private List<String> equalityFieldColumns = null;
     private String uidPrefix = null;
     private final Map<String, String> snapshotProperties = Maps.newHashMap();
     private ReadableConfig readableConfig = new Configuration();
+    private final Map<String, String> writeOptions = Maps.newHashMap();
+    private FlinkWriteConf flinkWriteConf = null;
 
     private Builder() {
     }
@@ -191,13 +181,31 @@ public class FlinkSink {
       return this;
     }
 
+    /**
+     * Set the write properties for Flink sink.
+     * View the supported properties in {@link FlinkWriteOptions}
+     */
+    public Builder set(String property, String value) {
+      writeOptions.put(property, value);
+      return this;
+    }
+
+    /**
+     * Set the write properties for Flink sink.
+     * View the supported properties in {@link FlinkWriteOptions}
+     */
+    public Builder setAll(Map<String, String> properties) {
+      writeOptions.putAll(properties);
+      return this;
+    }
+
     public Builder tableSchema(TableSchema newTableSchema) {
       this.tableSchema = newTableSchema;
       return this;
     }
 
     public Builder overwrite(boolean newOverwrite) {
-      this.overwrite = newOverwrite;
+      writeOptions.put(FlinkWriteOptions.OVERWRITE_MODE.key(), 
Boolean.toString(newOverwrite));
       return this;
     }
 
@@ -216,7 +224,9 @@ public class FlinkSink {
     public Builder distributionMode(DistributionMode mode) {
       Preconditions.checkArgument(!DistributionMode.RANGE.equals(mode),
           "Flink does not support 'range' write distribution mode now.");
-      this.distributionMode = mode;
+      if (mode != null) {
+        writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), 
mode.modeName());
+      }
       return this;
     }
 
@@ -241,7 +251,7 @@ public class FlinkSink {
      * @return {@link Builder} to connect the iceberg table.
      */
     public Builder upsert(boolean enabled) {
-      this.upsert = enabled;
+      writeOptions.put(FlinkWriteOptions.WRITE_UPSERT_ENABLED.key(), 
Boolean.toString(enabled));
       return this;
     }
 
@@ -308,6 +318,8 @@ public class FlinkSink {
         }
       }
 
+      flinkWriteConf = new FlinkWriteConf(table, writeOptions, readableConfig);
+
       // Find out the equality field id list based on the user-provided 
equality field column names.
       List<Integer> equalityFieldIds = checkAndGetEqualityFieldIds();
 
@@ -316,7 +328,7 @@ public class FlinkSink {
 
       // Distribute the records from input data stream based on the 
write.distribution-mode and equality fields.
       DataStream<RowData> distributeStream = distributeDataStream(
-          rowDataInput, table.properties(), equalityFieldIds, table.spec(), 
table.schema(), flinkRowType);
+          rowDataInput, equalityFieldIds, table.spec(), table.schema(), 
flinkRowType);
 
       // Add parallel writers that append rows to files
       SingleOutputStreamOperator<WriteResult> writerStream = 
appendWriter(distributeStream, flinkRowType,
@@ -379,8 +391,8 @@ public class FlinkSink {
 
     private SingleOutputStreamOperator<Void> 
appendCommitter(SingleOutputStreamOperator<WriteResult> writerStream) {
       IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(
-          tableLoader, overwrite, snapshotProperties,
-          
readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE));
+          tableLoader, flinkWriteConf.overwriteMode(), snapshotProperties,
+          flinkWriteConf.workerPoolSize());
       SingleOutputStreamOperator<Void> committerStream = writerStream
           .transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, 
filesCommitter)
           .setParallelism(1)
@@ -393,14 +405,9 @@ public class FlinkSink {
 
     private SingleOutputStreamOperator<WriteResult> 
appendWriter(DataStream<RowData> input, RowType flinkRowType,
                                                                  List<Integer> 
equalityFieldIds) {
-
-      // Fallback to use upsert mode parsed from table properties if don't 
specify in job level.
-      boolean upsertMode = upsert || 
PropertyUtil.propertyAsBoolean(table.properties(),
-          UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT);
-
       // Validate the equality fields and partition fields if we enable the 
upsert mode.
-      if (upsertMode) {
-        Preconditions.checkState(!overwrite,
+      if (flinkWriteConf.upsertMode()) {
+        Preconditions.checkState(!flinkWriteConf.overwriteMode(),
             "OVERWRITE mode shouldn't be enable when configuring to use UPSERT 
data stream.");
         Preconditions.checkState(!equalityFieldIds.isEmpty(),
             "Equality field columns shouldn't be empty when configuring to use 
UPSERT data stream.");
@@ -413,7 +420,8 @@ public class FlinkSink {
         }
       }
 
-      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, 
flinkRowType, equalityFieldIds, upsertMode);
+      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, 
flinkWriteConf,
+          flinkRowType, equalityFieldIds);
 
       int parallelism = writeParallelism == null ? input.getParallelism() : 
writeParallelism;
       SingleOutputStreamOperator<WriteResult> writerStream = input
@@ -426,22 +434,11 @@ public class FlinkSink {
     }
 
     private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
-                                                     Map<String, String> 
properties,
                                                      List<Integer> 
equalityFieldIds,
                                                      PartitionSpec 
partitionSpec,
                                                      Schema iSchema,
                                                      RowType flinkRowType) {
-      DistributionMode writeMode;
-      if (distributionMode == null) {
-        // Fallback to use distribution mode parsed from table properties if 
don't specify in job level.
-        String modeName = PropertyUtil.propertyAsString(properties,
-            WRITE_DISTRIBUTION_MODE,
-            WRITE_DISTRIBUTION_MODE_NONE);
-
-        writeMode = DistributionMode.fromName(modeName);
-      } else {
-        writeMode = distributionMode;
-      }
+      DistributionMode writeMode = flinkWriteConf.distributionMode();
 
       LOG.info("Write distribution mode is '{}'", writeMode.modeName());
       switch (writeMode) {
@@ -511,30 +508,15 @@ public class FlinkSink {
   }
 
   static IcebergStreamWriter<RowData> createStreamWriter(Table table,
+                                                         FlinkWriteConf 
flinkWriteConf,
                                                          RowType flinkRowType,
-                                                         List<Integer> 
equalityFieldIds,
-                                                         boolean upsert) {
+                                                         List<Integer> 
equalityFieldIds) {
     Preconditions.checkArgument(table != null, "Iceberg table should't be 
null");
-    Map<String, String> props = table.properties();
-    long targetFileSize = getTargetFileSizeBytes(props);
-    FileFormat fileFormat = getFileFormat(props);
 
     Table serializableTable = SerializableTable.copyOf(table);
     TaskWriterFactory<RowData> taskWriterFactory = new 
RowDataTaskWriterFactory(
-        serializableTable, flinkRowType, targetFileSize,
-        fileFormat, equalityFieldIds, upsert);
-
+        serializableTable, flinkRowType, flinkWriteConf.targetDataFileSize(),
+        flinkWriteConf.dataFileFormat(), equalityFieldIds, 
flinkWriteConf.upsertMode());
     return new IcebergStreamWriter<>(table.name(), taskWriterFactory);
   }
-
-  private static FileFormat getFileFormat(Map<String, String> properties) {
-    String formatString = properties.getOrDefault(DEFAULT_FILE_FORMAT, 
DEFAULT_FILE_FORMAT_DEFAULT);
-    return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
-  }
-
-  private static long getTargetFileSizeBytes(Map<String, String> properties) {
-    return PropertyUtil.propertyAsLong(properties,
-        WRITE_TARGET_FILE_SIZE_BYTES,
-        WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
-  }
 }
diff --git 
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index 514430502..f6ee976e6 100644
--- 
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -85,7 +85,9 @@ public class RowDataTaskWriterFactory implements 
TaskWriterFactory<RowData> {
 
   @Override
   public void initialize(int taskId, int attemptId) {
-    this.outputFileFactory = OutputFileFactory.builderFor(table, taskId, 
attemptId).build();
+    this.outputFileFactory = OutputFileFactory.builderFor(table, taskId, 
attemptId)
+        .format(format)
+        .build();
   }
 
   @Override
diff --git 
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
 
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
index 70f87f270..263c2fc03 100644
--- 
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
+++ 
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
@@ -40,6 +40,7 @@ import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.FlinkWriteOptions;
 import org.apache.iceberg.flink.MiniClusterResource;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TableLoader;
@@ -47,6 +48,7 @@ import org.apache.iceberg.flink.source.BoundedTestSource;
 import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -325,4 +327,53 @@ public class TestFlinkIcebergSink {
     Assert.assertEquals("rightTable", 
rightTable.currentSnapshot().summary().get("direction"));
   }
 
+  @Test
+  public void testOverrideWriteConfigWithUnknownDistributionMode() {
+    Map<String, String> newProps = Maps.newHashMap();
+    newProps.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), "UNRECOGNIZED");
+
+    List<Row> rows = createRows("");
+    DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), 
ROW_TYPE_INFO);
+
+    FlinkSink.Builder builder = FlinkSink.forRow(dataStream, 
SimpleDataUtil.FLINK_SCHEMA)
+        .table(table)
+        .tableLoader(tableLoader)
+        .writeParallelism(parallelism)
+        .setAll(newProps);
+
+    AssertHelpers.assertThrows("Should fail with invalid distribution mode.",
+        IllegalArgumentException.class, "No enum constant 
org.apache.iceberg.DistributionMode.UNRECOGNIZED",
+        () -> {
+          builder.append();
+
+          // Execute the program.
+          env.execute("Test Iceberg DataStream.");
+          return null;
+        });
+  }
+
+  @Test
+  public void testOverrideWriteConfigWithUnknownFileFormat() {
+    Map<String, String> newProps = Maps.newHashMap();
+    newProps.put(FlinkWriteOptions.WRITE_FORMAT.key(), "UNRECOGNIZED");
+
+    List<Row> rows = createRows("");
+    DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), 
ROW_TYPE_INFO);
+
+    FlinkSink.Builder builder = FlinkSink.forRow(dataStream, 
SimpleDataUtil.FLINK_SCHEMA)
+        .table(table)
+        .tableLoader(tableLoader)
+        .writeParallelism(parallelism)
+        .setAll(newProps);
+
+    AssertHelpers.assertThrows("Should fail with invalid file format.",
+        IllegalArgumentException.class, "No enum constant 
org.apache.iceberg.FileFormat.UNRECOGNIZED",
+        () -> {
+          builder.append();
+
+          // Execute the program.
+          env.execute("Test Iceberg DataStream.");
+          return null;
+        });
+  }
 }
diff --git 
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
 
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
index 433c6a1ec..ae2171238 100644
--- 
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
+++ 
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
@@ -47,12 +47,14 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.FlinkWriteConf;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Types;
 import org.assertj.core.api.Assertions;
@@ -333,7 +335,13 @@ public class TestIcebergStreamWriter {
   private OneInputStreamOperatorTestHarness<RowData, WriteResult> 
createIcebergStreamWriter(
       Table icebergTable, TableSchema flinkSchema) throws Exception {
     RowType flinkRowType = FlinkSink.toFlinkRowType(icebergTable.schema(), 
flinkSchema);
-    IcebergStreamWriter<RowData> streamWriter = 
FlinkSink.createStreamWriter(icebergTable, flinkRowType, null, false);
+    FlinkWriteConf flinkWriteConfig =
+        new FlinkWriteConf(icebergTable, Maps.newHashMap(), new 
org.apache.flink.configuration.Configuration());
+
+    IcebergStreamWriter<RowData> streamWriter = FlinkSink.createStreamWriter(
+        icebergTable,
+        flinkWriteConfig,
+        flinkRowType, null);
     OneInputStreamOperatorTestHarness<RowData, WriteResult> harness = new 
OneInputStreamOperatorTestHarness<>(
         streamWriter, 1, 1, 0);
 

Reply via email to