This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new d4f0d7e803 Flink: Add IcebergSinkBuilder interface allowing unification
of most operations on FlinkSink and IcebergSink Builders (#11305)
d4f0d7e803 is described below
commit d4f0d7e8036488924edf9acdc2aed6fc3a3415e1
Author: Arek Burdach <[email protected]>
AuthorDate: Mon Oct 21 06:35:03 2024 +0200
Flink: Add IcebergSinkBuilder interface allowing unification of most
operations on FlinkSink and IcebergSink Builders (#11305)
---
.../org/apache/iceberg/flink/sink/FlinkSink.java | 14 +++-
.../org/apache/iceberg/flink/sink/IcebergSink.java | 14 +++-
.../iceberg/flink/sink/IcebergSinkBuilder.java | 83 ++++++++++++++++++++++
3 files changed, 109 insertions(+), 2 deletions(-)
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index c534314909..e862e88c96 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -134,7 +134,7 @@ public class FlinkSink {
return new Builder().forRowData(input);
}
- public static class Builder {
+ public static class Builder implements IcebergSinkBuilder<Builder> {
private Function<String, DataStream<RowData>> inputCreator = null;
private TableLoader tableLoader;
private Table table;
@@ -179,6 +179,7 @@ public class FlinkSink {
* @param newTable the loaded iceberg table instance.
* @return {@link Builder} to connect the iceberg table.
*/
+ @Override
public Builder table(Table newTable) {
this.table = newTable;
return this;
@@ -192,6 +193,7 @@ public class FlinkSink {
* @param newTableLoader to load iceberg table inside tasks.
* @return {@link Builder} to connect the iceberg table.
*/
+ @Override
public Builder tableLoader(TableLoader newTableLoader) {
this.tableLoader = newTableLoader;
return this;
@@ -210,21 +212,25 @@ public class FlinkSink {
* Set the write properties for Flink sink. View the supported properties
in {@link
* FlinkWriteOptions}
*/
+ @Override
public Builder setAll(Map<String, String> properties) {
writeOptions.putAll(properties);
return this;
}
+ @Override
public Builder tableSchema(TableSchema newTableSchema) {
this.tableSchema = newTableSchema;
return this;
}
+ @Override
public Builder overwrite(boolean newOverwrite) {
writeOptions.put(FlinkWriteOptions.OVERWRITE_MODE.key(),
Boolean.toString(newOverwrite));
return this;
}
+ @Override
public Builder flinkConf(ReadableConfig config) {
this.readableConfig = config;
return this;
@@ -237,6 +243,7 @@ public class FlinkSink {
* @param mode to specify the write distribution mode.
* @return {@link Builder} to connect the iceberg table.
*/
+ @Override
public Builder distributionMode(DistributionMode mode) {
if (mode != null) {
writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(),
mode.modeName());
@@ -306,6 +313,7 @@ public class FlinkSink {
* @param newWriteParallelism the number of parallel iceberg stream writer.
* @return {@link Builder} to connect the iceberg table.
*/
+ @Override
public Builder writeParallelism(int newWriteParallelism) {
writeOptions.put(
FlinkWriteOptions.WRITE_PARALLELISM.key(),
Integer.toString(newWriteParallelism));
@@ -321,6 +329,7 @@ public class FlinkSink {
* @param enabled indicate whether it should transform all
INSERT/UPDATE_AFTER events to UPSERT.
* @return {@link Builder} to connect the iceberg table.
*/
+ @Override
public Builder upsert(boolean enabled) {
writeOptions.put(FlinkWriteOptions.WRITE_UPSERT_ENABLED.key(),
Boolean.toString(enabled));
return this;
@@ -332,6 +341,7 @@ public class FlinkSink {
* @param columns defines the iceberg table's key.
* @return {@link Builder} to connect the iceberg table.
*/
+ @Override
public Builder equalityFieldColumns(List<String> columns) {
this.equalityFieldColumns = columns;
return this;
@@ -376,6 +386,7 @@ public class FlinkSink {
return this;
}
+ @Override
public Builder toBranch(String branch) {
writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch);
return this;
@@ -436,6 +447,7 @@ public class FlinkSink {
*
* @return {@link DataStreamSink} for sink.
*/
+ @Override
public DataStreamSink<Void> append() {
return chainIcebergOperators();
}
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
index d080169544..01be4a2eef 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
@@ -255,7 +255,7 @@ public class IcebergSink
return new WriteResultSerializer();
}
- public static class Builder {
+ public static class Builder implements IcebergSinkBuilder<Builder> {
private TableLoader tableLoader;
private String uidSuffix = "";
private Function<String, DataStream<RowData>> inputCreator = null;
@@ -311,6 +311,7 @@ public class IcebergSink
* @param newTable the loaded iceberg table instance.
* @return {@link IcebergSink.Builder} to connect the iceberg table.
*/
+ @Override
public Builder table(Table newTable) {
this.table = (SerializableTable) SerializableTable.copyOf(newTable);
return this;
@@ -325,6 +326,7 @@ public class IcebergSink
* @param newTableLoader to load iceberg table inside tasks.
* @return {@link Builder} to connect the iceberg table.
*/
+ @Override
public Builder tableLoader(TableLoader newTableLoader) {
this.tableLoader = newTableLoader;
return this;
@@ -347,21 +349,25 @@ public class IcebergSink
* Set the write properties for IcebergSink. View the supported properties
in {@link
* FlinkWriteOptions}
*/
+ @Override
public Builder setAll(Map<String, String> properties) {
writeOptions.putAll(properties);
return this;
}
+ @Override
public Builder tableSchema(TableSchema newTableSchema) {
this.tableSchema = newTableSchema;
return this;
}
+ @Override
public Builder overwrite(boolean newOverwrite) {
writeOptions.put(FlinkWriteOptions.OVERWRITE_MODE.key(),
Boolean.toString(newOverwrite));
return this;
}
+ @Override
public Builder flinkConf(ReadableConfig config) {
this.readableConfig = config;
return this;
@@ -374,6 +380,7 @@ public class IcebergSink
* @param mode to specify the write distribution mode.
* @return {@link IcebergSink.Builder} to connect the iceberg table.
*/
+ @Override
public Builder distributionMode(DistributionMode mode) {
Preconditions.checkArgument(
!DistributionMode.RANGE.equals(mode),
@@ -390,6 +397,7 @@ public class IcebergSink
* @param newWriteParallelism the number of parallel iceberg stream writer.
* @return {@link IcebergSink.Builder} to connect the iceberg table.
*/
+ @Override
public Builder writeParallelism(int newWriteParallelism) {
writeOptions.put(
FlinkWriteOptions.WRITE_PARALLELISM.key(),
Integer.toString(newWriteParallelism));
@@ -405,6 +413,7 @@ public class IcebergSink
* @param enabled indicate whether it should transform all
INSERT/UPDATE_AFTER events to UPSERT.
* @return {@link IcebergSink.Builder} to connect the iceberg table.
*/
+ @Override
public Builder upsert(boolean enabled) {
writeOptions.put(FlinkWriteOptions.WRITE_UPSERT_ENABLED.key(),
Boolean.toString(enabled));
return this;
@@ -416,6 +425,7 @@ public class IcebergSink
* @param columns defines the iceberg table's key.
* @return {@link Builder} to connect the iceberg table.
*/
+ @Override
public Builder equalityFieldColumns(List<String> columns) {
this.equalityFieldColumns = columns;
return this;
@@ -458,6 +468,7 @@ public class IcebergSink
return this;
}
+ @Override
public Builder toBranch(String branch) {
writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch);
return this;
@@ -527,6 +538,7 @@ public class IcebergSink
*
* @return {@link DataStreamSink} for sink.
*/
+ @Override
public DataStreamSink<RowData> append() {
IcebergSink sink = build();
String suffix = defaultSuffix(uidSuffix, table.name());
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkBuilder.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkBuilder.java
new file mode 100644
index 0000000000..f232df5128
--- /dev/null
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkBuilder.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+
+/**
+ * This class exists for the internal purpose of transitioning between the
previous implementation of Flink's
+ * sink ({@link FlinkSink}) and the new implementation based on the Flink
SinkV2 API ({@link
+ * IcebergSink}). After we remove the previous implementation, all occurrences
of this class will
+ * be replaced by direct {@link IcebergSink} usage.
+ */
+@Internal
+interface IcebergSinkBuilder<T extends IcebergSinkBuilder<?>> {
+
+ T tableSchema(TableSchema newTableSchema);
+
+ T tableLoader(TableLoader newTableLoader);
+
+ T equalityFieldColumns(List<String> columns);
+
+ T overwrite(boolean newOverwrite);
+
+ T setAll(Map<String, String> properties);
+
+ T flinkConf(ReadableConfig config);
+
+ T table(Table newTable);
+
+ T writeParallelism(int newWriteParallelism);
+
+ T distributionMode(DistributionMode mode);
+
+ T toBranch(String branch);
+
+ T upsert(boolean enabled);
+
+ DataStreamSink<?> append();
+
+ static IcebergSinkBuilder<?> forRow(
+ DataStream<Row> input, TableSchema tableSchema, boolean useV2Sink) {
+ if (useV2Sink) {
+ return IcebergSink.forRow(input, tableSchema);
+ } else {
+ return FlinkSink.forRow(input, tableSchema);
+ }
+ }
+
+ static IcebergSinkBuilder<?> forRowData(DataStream<RowData> input, boolean
useV2Sink) {
+ if (useV2Sink) {
+ return IcebergSink.forRowData(input);
+ } else {
+ return FlinkSink.forRowData(input);
+ }
+ }
+}