This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new ff6cf54ed Flink: Support write options in FlinkSink builder (#3998)
ff6cf54ed is described below
commit ff6cf54eda99c741bd810d0addddba37f76e05b1
Author: liliwei <[email protected]>
AuthorDate: Mon Jul 4 04:09:43 2022 +0800
Flink: Support write options in FlinkSink builder (#3998)
---
docs/flink-getting-started.md | 21 +++
.../org/apache/iceberg/flink/FlinkConfParser.java | 201 +++++++++++++++++++++
.../org/apache/iceberg/flink/FlinkWriteConf.java | 105 +++++++++++
.../apache/iceberg/flink/FlinkWriteOptions.java | 57 ++++++
.../org/apache/iceberg/flink/sink/FlinkSink.java | 100 +++++-----
.../flink/sink/RowDataTaskWriterFactory.java | 4 +-
.../iceberg/flink/sink/TestFlinkIcebergSink.java | 51 ++++++
.../flink/sink/TestIcebergStreamWriter.java | 10 +-
8 files changed, 488 insertions(+), 61 deletions(-)
diff --git a/docs/flink-getting-started.md b/docs/flink-getting-started.md
index b3af43a82..c34dfe857 100644
--- a/docs/flink-getting-started.md
+++ b/docs/flink-getting-started.md
@@ -551,6 +551,27 @@ FlinkSink.forRowData(input)
env.execute("Test Iceberg DataStream");
```
+## Write options
+
+Flink write options are passed when configuring the FlinkSink, like this:
+
+```
+FlinkSink.Builder builder = FlinkSink.forRow(dataStream,
SimpleDataUtil.FLINK_SCHEMA)
+ .table(table)
+ .tableLoader(tableLoader)
+ .set("write-format", "orc")
+ .set(FlinkWriteOptions.OVERWRITE_MODE, "true");
+```
+
+| Flink option | Default | Description
|
+|------------------------| --------------------------
|------------------------------------------------------------------------------------------------------------|
+| write-format | Table write.format.default | File format to use for
this write operation; parquet, avro, or orc
|
+| target-file-size-bytes | As per table property | Overrides this table's
write.target-file-size-bytes
|
+| upsert-enabled | Table write.upsert.enabled | Overrides this table's
write.upsert.enabled
|
+| overwrite-enabled | false | Overwrite the table's data; overwrite mode
shouldn't be enabled when configuring to use UPSERT data stream. |
+| distribution-mode | Table write.distribution-mode | Overrides this
table's write.distribution-mode
|
+
+
## Inspecting tables.
Iceberg does not support inspecting tables in Flink SQL yet; we need to use
[iceberg's Java API](../api) to read Iceberg's metadata to get that table
information.
diff --git
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
new file mode 100644
index 000000000..e984f6875
--- /dev/null
+++
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+class FlinkConfParser {
+
+ private final Map<String, String> tableProperties;
+ private final Map<String, String> options;
+ private final ReadableConfig readableConfig;
+
+ FlinkConfParser(Table table, Map<String, String> options, ReadableConfig
readableConfig) {
+ this.tableProperties = table.properties();
+ this.options = options;
+ this.readableConfig = readableConfig;
+ }
+
+ public BooleanConfParser booleanConf() {
+ return new BooleanConfParser();
+ }
+
+ public IntConfParser intConf() {
+ return new IntConfParser();
+ }
+
+ public LongConfParser longConf() {
+ return new LongConfParser();
+ }
+
+ public StringConfParser stringConf() {
+ return new StringConfParser();
+ }
+
+ class BooleanConfParser extends ConfParser<BooleanConfParser, Boolean> {
+ private Boolean defaultValue;
+
+ @Override
+ protected BooleanConfParser self() {
+ return this;
+ }
+
+ public BooleanConfParser defaultValue(boolean value) {
+ this.defaultValue = value;
+ return self();
+ }
+
+ public BooleanConfParser defaultValue(String value) {
+ this.defaultValue = Boolean.parseBoolean(value);
+ return self();
+ }
+
+ public boolean parse() {
+ Preconditions.checkArgument(defaultValue != null, "Default value cannot
be null");
+ return parse(Boolean::parseBoolean, defaultValue);
+ }
+ }
+
+ class IntConfParser extends ConfParser<IntConfParser, Integer> {
+ private Integer defaultValue;
+
+ @Override
+ protected IntConfParser self() {
+ return this;
+ }
+
+ public IntConfParser defaultValue(int value) {
+ this.defaultValue = value;
+ return self();
+ }
+
+ public int parse() {
+ Preconditions.checkArgument(defaultValue != null, "Default value cannot
be null");
+ return parse(Integer::parseInt, defaultValue);
+ }
+
+ public Integer parseOptional() {
+ return parse(Integer::parseInt, null);
+ }
+ }
+
+ class LongConfParser extends ConfParser<LongConfParser, Long> {
+ private Long defaultValue;
+
+ @Override
+ protected LongConfParser self() {
+ return this;
+ }
+
+ public LongConfParser defaultValue(long value) {
+ this.defaultValue = value;
+ return self();
+ }
+
+ public long parse() {
+ Preconditions.checkArgument(defaultValue != null, "Default value cannot
be null");
+ return parse(Long::parseLong, defaultValue);
+ }
+
+ public Long parseOptional() {
+ return parse(Long::parseLong, null);
+ }
+ }
+
+ class StringConfParser extends ConfParser<StringConfParser, String> {
+ private String defaultValue;
+
+ @Override
+ protected StringConfParser self() {
+ return this;
+ }
+
+ public StringConfParser defaultValue(String value) {
+ this.defaultValue = value;
+ return self();
+ }
+
+ public String parse() {
+ Preconditions.checkArgument(defaultValue != null, "Default value cannot
be null");
+ return parse(Function.identity(), defaultValue);
+ }
+
+ public String parseOptional() {
+ return parse(Function.identity(), null);
+ }
+ }
+
+ abstract class ConfParser<ThisT, T> {
+ private final List<String> optionNames = Lists.newArrayList();
+ private String tablePropertyName;
+ private ConfigOption<T> configOption;
+
+ protected abstract ThisT self();
+
+ public ThisT option(String name) {
+ this.optionNames.add(name);
+ return self();
+ }
+
+ public ThisT flinkConfig(ConfigOption<T> newConfigOption) {
+ this.configOption = newConfigOption;
+ return self();
+ }
+
+ public ThisT tableProperty(String name) {
+ this.tablePropertyName = name;
+ return self();
+ }
+
+ protected T parse(Function<String, T> conversion, T defaultValue) {
+ if (!optionNames.isEmpty()) {
+ for (String optionName : optionNames) {
+ String optionValue = options.get(optionName);
+ if (optionValue != null) {
+ return conversion.apply(optionValue);
+ }
+ }
+ }
+
+ if (configOption != null) {
+ T propertyValue = readableConfig.get(configOption);
+ if (propertyValue != null) {
+ return propertyValue;
+ }
+ }
+
+ if (tablePropertyName != null) {
+ String propertyValue = tableProperties.get(tablePropertyName);
+ if (propertyValue != null) {
+ return conversion.apply(propertyValue);
+ }
+ }
+
+ return defaultValue;
+ }
+ }
+}
diff --git
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
new file mode 100644
index 000000000..ddb5f18c5
--- /dev/null
+++
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+
/**
 * A class for common Iceberg configs for Flink writes.
 * <p>
 * If a config is set at multiple levels, the following order of precedence is used (top to bottom):
 * <ol>
 *   <li>Write options</li>
 *   <li>flink ReadableConfig</li>
 *   <li>Table metadata</li>
 * </ol>
 * The most specific value is set in write options and takes precedence over all other configs.
 * If no write option is provided, this class checks the flink configuration for any overrides.
 * If no applicable value is found in the write options, this class uses the table metadata.
 * <p>
 * Note this class is NOT meant to be serialized.
 */
public class FlinkWriteConf {

  // Delegates all lookups; encodes the precedence described above.
  private final FlinkConfParser confParser;

  /**
   * @param table the target Iceberg table whose properties act as the fallback level
   * @param writeOptions per-sink write options (highest precedence)
   * @param readableConfig the Flink job configuration (middle precedence)
   */
  public FlinkWriteConf(Table table, Map<String, String> writeOptions, ReadableConfig readableConfig) {
    this.confParser = new FlinkConfParser(table, writeOptions, readableConfig);
  }

  /**
   * Whether the write replaces existing table data. Defaults to the
   * {@code overwrite-enabled} option default (false); no table-property fallback.
   */
  public boolean overwriteMode() {
    return confParser.booleanConf()
        .option(FlinkWriteOptions.OVERWRITE_MODE.key())
        .flinkConfig(FlinkWriteOptions.OVERWRITE_MODE)
        .defaultValue(FlinkWriteOptions.OVERWRITE_MODE.defaultValue())
        .parse();
  }

  /** Whether UPSERT mode is enabled; falls back to table property {@code write.upsert.enabled}. */
  public boolean upsertMode() {
    return confParser.booleanConf()
        .option(FlinkWriteOptions.WRITE_UPSERT_ENABLED.key())
        .flinkConfig(FlinkWriteOptions.WRITE_UPSERT_ENABLED)
        .tableProperty(TableProperties.UPSERT_ENABLED)
        .defaultValue(TableProperties.UPSERT_ENABLED_DEFAULT)
        .parse();
  }

  /**
   * File format for data files written by this sink; falls back to table
   * property {@code write.format.default}. Parsing is case-insensitive
   * (upper-cased with {@link Locale#ENGLISH} before the enum lookup).
   *
   * @throws IllegalArgumentException if the configured name is not a valid {@link FileFormat}
   */
  public FileFormat dataFileFormat() {
    String valueAsString = confParser.stringConf()
        .option(FlinkWriteOptions.WRITE_FORMAT.key())
        .flinkConfig(FlinkWriteOptions.WRITE_FORMAT)
        .tableProperty(TableProperties.DEFAULT_FILE_FORMAT)
        .defaultValue(TableProperties.DEFAULT_FILE_FORMAT_DEFAULT)
        .parse();
    return FileFormat.valueOf(valueAsString.toUpperCase(Locale.ENGLISH));
  }

  /**
   * Target data file size in bytes; falls back to table property
   * {@code write.target-file-size-bytes}.
   */
  public long targetDataFileSize() {
    return confParser.longConf()
        .option(FlinkWriteOptions.TARGET_FILE_SIZE_BYTES.key())
        .flinkConfig(FlinkWriteOptions.TARGET_FILE_SIZE_BYTES)
        .tableProperty(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES)
        .defaultValue(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT)
        .parse();
  }

  /**
   * Distribution mode for shuffling rows before the writers; falls back to
   * table property {@code write.distribution-mode}, defaulting to "none".
   *
   * @throws IllegalArgumentException if the configured name is not a valid {@link DistributionMode}
   */
  public DistributionMode distributionMode() {
    String modeName = confParser.stringConf()
        .option(FlinkWriteOptions.DISTRIBUTION_MODE.key())
        .flinkConfig(FlinkWriteOptions.DISTRIBUTION_MODE)
        .tableProperty(TableProperties.WRITE_DISTRIBUTION_MODE)
        .defaultValue(TableProperties.WRITE_DISTRIBUTION_MODE_NONE)
        .parse();
    return DistributionMode.fromName(modeName);
  }

  /**
   * Size of the worker pool used by the committer. Read only from the Flink
   * configuration (no per-sink write option or table-property fallback).
   */
  public int workerPoolSize() {
    return confParser.intConf()
        .flinkConfig(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE)
        .defaultValue(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue())
        .parse();
  }
}
diff --git
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
new file mode 100644
index 000000000..d0dc9c7fd
--- /dev/null
+++
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
/**
 * Flink sink write options. These keys can be set per sink via
 * {@code FlinkSink.Builder#set(String, String)} and take precedence over the
 * Flink configuration and table properties.
 */
public class FlinkWriteOptions {

  // Utility holder of config keys; never instantiated.
  private FlinkWriteOptions() {
  }

  // File format for write operations (default: table's write.format.default)
  public static final ConfigOption<String> WRITE_FORMAT =
      ConfigOptions.key("write-format")
          .stringType().noDefaultValue();

  // Overrides this table's write.target-file-size-bytes
  public static final ConfigOption<Long> TARGET_FILE_SIZE_BYTES =
      ConfigOptions.key("target-file-size-bytes")
          .longType().noDefaultValue();

  // Overrides this table's write.upsert.enabled
  public static final ConfigOption<Boolean> WRITE_UPSERT_ENABLED =
      ConfigOptions.key("upsert-enabled")
          .booleanType().noDefaultValue();

  // Whether to overwrite existing table data; must not be combined with
  // UPSERT mode (default: false, and no table-property fallback)
  public static final ConfigOption<Boolean> OVERWRITE_MODE =
      ConfigOptions.key("overwrite-enabled")
          .booleanType().defaultValue(false);

  // Overrides the table's write.distribution-mode
  public static final ConfigOption<String> DISTRIBUTION_MODE =
      ConfigOptions.key("distribution-mode")
          .stringType().noDefaultValue();

}
diff --git
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 72f0580cf..56fafab06 100644
---
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -22,7 +22,6 @@ package org.apache.iceberg.flink.sink;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
@@ -43,14 +42,14 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DistributionMode;
-import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
-import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.FlinkWriteConf;
+import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.WriteResult;
@@ -60,18 +59,10 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
-import static org.apache.iceberg.TableProperties.UPSERT_ENABLED;
-import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
-import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_NONE;
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
-import static
org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
public class FlinkSink {
private static final Logger LOG = LoggerFactory.getLogger(FlinkSink.class);
@@ -132,14 +123,13 @@ public class FlinkSink {
private TableLoader tableLoader;
private Table table;
private TableSchema tableSchema;
- private boolean overwrite = false;
- private DistributionMode distributionMode = null;
private Integer writeParallelism = null;
- private boolean upsert = false;
private List<String> equalityFieldColumns = null;
private String uidPrefix = null;
private final Map<String, String> snapshotProperties = Maps.newHashMap();
private ReadableConfig readableConfig = new Configuration();
+ private final Map<String, String> writeOptions = Maps.newHashMap();
+ private FlinkWriteConf flinkWriteConf = null;
private Builder() {
}
@@ -191,13 +181,31 @@ public class FlinkSink {
return this;
}
+ /**
+ * Set the write properties for Flink sink.
+ * View the supported properties in {@link FlinkWriteOptions}
+ */
+ public Builder set(String property, String value) {
+ writeOptions.put(property, value);
+ return this;
+ }
+
+ /**
+ * Set the write properties for Flink sink.
+ * View the supported properties in {@link FlinkWriteOptions}
+ */
+ public Builder setAll(Map<String, String> properties) {
+ writeOptions.putAll(properties);
+ return this;
+ }
+
public Builder tableSchema(TableSchema newTableSchema) {
this.tableSchema = newTableSchema;
return this;
}
public Builder overwrite(boolean newOverwrite) {
- this.overwrite = newOverwrite;
+ writeOptions.put(FlinkWriteOptions.OVERWRITE_MODE.key(),
Boolean.toString(newOverwrite));
return this;
}
@@ -216,7 +224,9 @@ public class FlinkSink {
public Builder distributionMode(DistributionMode mode) {
Preconditions.checkArgument(!DistributionMode.RANGE.equals(mode),
"Flink does not support 'range' write distribution mode now.");
- this.distributionMode = mode;
+ if (mode != null) {
+ writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(),
mode.modeName());
+ }
return this;
}
@@ -241,7 +251,7 @@ public class FlinkSink {
* @return {@link Builder} to connect the iceberg table.
*/
public Builder upsert(boolean enabled) {
- this.upsert = enabled;
+ writeOptions.put(FlinkWriteOptions.WRITE_UPSERT_ENABLED.key(),
Boolean.toString(enabled));
return this;
}
@@ -308,6 +318,8 @@ public class FlinkSink {
}
}
+ flinkWriteConf = new FlinkWriteConf(table, writeOptions, readableConfig);
+
// Find out the equality field id list based on the user-provided
equality field column names.
List<Integer> equalityFieldIds = checkAndGetEqualityFieldIds();
@@ -316,7 +328,7 @@ public class FlinkSink {
// Distribute the records from input data stream based on the
write.distribution-mode and equality fields.
DataStream<RowData> distributeStream = distributeDataStream(
- rowDataInput, table.properties(), equalityFieldIds, table.spec(),
table.schema(), flinkRowType);
+ rowDataInput, equalityFieldIds, table.spec(), table.schema(),
flinkRowType);
// Add parallel writers that append rows to files
SingleOutputStreamOperator<WriteResult> writerStream =
appendWriter(distributeStream, flinkRowType,
@@ -379,8 +391,8 @@ public class FlinkSink {
private SingleOutputStreamOperator<Void>
appendCommitter(SingleOutputStreamOperator<WriteResult> writerStream) {
IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(
- tableLoader, overwrite, snapshotProperties,
-
readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE));
+ tableLoader, flinkWriteConf.overwriteMode(), snapshotProperties,
+ flinkWriteConf.workerPoolSize());
SingleOutputStreamOperator<Void> committerStream = writerStream
.transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID,
filesCommitter)
.setParallelism(1)
@@ -393,14 +405,9 @@ public class FlinkSink {
private SingleOutputStreamOperator<WriteResult>
appendWriter(DataStream<RowData> input, RowType flinkRowType,
List<Integer>
equalityFieldIds) {
-
- // Fallback to use upsert mode parsed from table properties if don't
specify in job level.
- boolean upsertMode = upsert ||
PropertyUtil.propertyAsBoolean(table.properties(),
- UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT);
-
// Validate the equality fields and partition fields if we enable the
upsert mode.
- if (upsertMode) {
- Preconditions.checkState(!overwrite,
+ if (flinkWriteConf.upsertMode()) {
+ Preconditions.checkState(!flinkWriteConf.overwriteMode(),
"OVERWRITE mode shouldn't be enable when configuring to use UPSERT
data stream.");
Preconditions.checkState(!equalityFieldIds.isEmpty(),
"Equality field columns shouldn't be empty when configuring to use
UPSERT data stream.");
@@ -413,7 +420,8 @@ public class FlinkSink {
}
}
- IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table,
flinkRowType, equalityFieldIds, upsertMode);
+ IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table,
flinkWriteConf,
+ flinkRowType, equalityFieldIds);
int parallelism = writeParallelism == null ? input.getParallelism() :
writeParallelism;
SingleOutputStreamOperator<WriteResult> writerStream = input
@@ -426,22 +434,11 @@ public class FlinkSink {
}
private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
- Map<String, String>
properties,
List<Integer>
equalityFieldIds,
PartitionSpec
partitionSpec,
Schema iSchema,
RowType flinkRowType) {
- DistributionMode writeMode;
- if (distributionMode == null) {
- // Fallback to use distribution mode parsed from table properties if
don't specify in job level.
- String modeName = PropertyUtil.propertyAsString(properties,
- WRITE_DISTRIBUTION_MODE,
- WRITE_DISTRIBUTION_MODE_NONE);
-
- writeMode = DistributionMode.fromName(modeName);
- } else {
- writeMode = distributionMode;
- }
+ DistributionMode writeMode = flinkWriteConf.distributionMode();
LOG.info("Write distribution mode is '{}'", writeMode.modeName());
switch (writeMode) {
@@ -511,30 +508,15 @@ public class FlinkSink {
}
static IcebergStreamWriter<RowData> createStreamWriter(Table table,
+ FlinkWriteConf
flinkWriteConf,
RowType flinkRowType,
- List<Integer>
equalityFieldIds,
- boolean upsert) {
+ List<Integer>
equalityFieldIds) {
Preconditions.checkArgument(table != null, "Iceberg table should't be
null");
- Map<String, String> props = table.properties();
- long targetFileSize = getTargetFileSizeBytes(props);
- FileFormat fileFormat = getFileFormat(props);
Table serializableTable = SerializableTable.copyOf(table);
TaskWriterFactory<RowData> taskWriterFactory = new
RowDataTaskWriterFactory(
- serializableTable, flinkRowType, targetFileSize,
- fileFormat, equalityFieldIds, upsert);
-
+ serializableTable, flinkRowType, flinkWriteConf.targetDataFileSize(),
+ flinkWriteConf.dataFileFormat(), equalityFieldIds,
flinkWriteConf.upsertMode());
return new IcebergStreamWriter<>(table.name(), taskWriterFactory);
}
-
- private static FileFormat getFileFormat(Map<String, String> properties) {
- String formatString = properties.getOrDefault(DEFAULT_FILE_FORMAT,
DEFAULT_FILE_FORMAT_DEFAULT);
- return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
- }
-
- private static long getTargetFileSizeBytes(Map<String, String> properties) {
- return PropertyUtil.propertyAsLong(properties,
- WRITE_TARGET_FILE_SIZE_BYTES,
- WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
- }
}
diff --git
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index 514430502..f6ee976e6 100644
---
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -85,7 +85,9 @@ public class RowDataTaskWriterFactory implements
TaskWriterFactory<RowData> {
@Override
public void initialize(int taskId, int attemptId) {
- this.outputFileFactory = OutputFileFactory.builderFor(table, taskId,
attemptId).build();
+ this.outputFileFactory = OutputFileFactory.builderFor(table, taskId,
attemptId)
+ .format(format)
+ .build();
}
@Override
diff --git
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
index 70f87f270..263c2fc03 100644
---
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
+++
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
@@ -40,6 +40,7 @@ import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
@@ -47,6 +48,7 @@ import org.apache.iceberg.flink.source.BoundedTestSource;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
@@ -325,4 +327,53 @@ public class TestFlinkIcebergSink {
Assert.assertEquals("rightTable",
rightTable.currentSnapshot().summary().get("direction"));
}
+ @Test
+ public void testOverrideWriteConfigWithUnknownDistributionMode() {
+ Map<String, String> newProps = Maps.newHashMap();
+ newProps.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), "UNRECOGNIZED");
+
+ List<Row> rows = createRows("");
+ DataStream<Row> dataStream = env.addSource(createBoundedSource(rows),
ROW_TYPE_INFO);
+
+ FlinkSink.Builder builder = FlinkSink.forRow(dataStream,
SimpleDataUtil.FLINK_SCHEMA)
+ .table(table)
+ .tableLoader(tableLoader)
+ .writeParallelism(parallelism)
+ .setAll(newProps);
+
+ AssertHelpers.assertThrows("Should fail with invalid distribution mode.",
+ IllegalArgumentException.class, "No enum constant
org.apache.iceberg.DistributionMode.UNRECOGNIZED",
+ () -> {
+ builder.append();
+
+ // Execute the program.
+ env.execute("Test Iceberg DataStream.");
+ return null;
+ });
+ }
+
+ @Test
+ public void testOverrideWriteConfigWithUnknownFileFormat() {
+ Map<String, String> newProps = Maps.newHashMap();
+ newProps.put(FlinkWriteOptions.WRITE_FORMAT.key(), "UNRECOGNIZED");
+
+ List<Row> rows = createRows("");
+ DataStream<Row> dataStream = env.addSource(createBoundedSource(rows),
ROW_TYPE_INFO);
+
+ FlinkSink.Builder builder = FlinkSink.forRow(dataStream,
SimpleDataUtil.FLINK_SCHEMA)
+ .table(table)
+ .tableLoader(tableLoader)
+ .writeParallelism(parallelism)
+ .setAll(newProps);
+
+ AssertHelpers.assertThrows("Should fail with invalid file format.",
+ IllegalArgumentException.class, "No enum constant
org.apache.iceberg.FileFormat.UNRECOGNIZED",
+ () -> {
+ builder.append();
+
+ // Execute the program.
+ env.execute("Test Iceberg DataStream.");
+ return null;
+ });
+ }
}
diff --git
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
index 433c6a1ec..ae2171238 100644
---
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
+++
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
@@ -47,12 +47,14 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.FlinkWriteConf;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
@@ -333,7 +335,13 @@ public class TestIcebergStreamWriter {
private OneInputStreamOperatorTestHarness<RowData, WriteResult>
createIcebergStreamWriter(
Table icebergTable, TableSchema flinkSchema) throws Exception {
RowType flinkRowType = FlinkSink.toFlinkRowType(icebergTable.schema(),
flinkSchema);
- IcebergStreamWriter<RowData> streamWriter =
FlinkSink.createStreamWriter(icebergTable, flinkRowType, null, false);
+ FlinkWriteConf flinkWriteConfig =
+ new FlinkWriteConf(icebergTable, Maps.newHashMap(), new
org.apache.flink.configuration.Configuration());
+
+ IcebergStreamWriter<RowData> streamWriter = FlinkSink.createStreamWriter(
+ icebergTable,
+ flinkWriteConfig,
+ flinkRowType, null);
OneInputStreamOperatorTestHarness<RowData, WriteResult> harness = new
OneInputStreamOperatorTestHarness<>(
streamWriter, 1, 1, 0);