This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new d4f0d7e803 Flink: Add IcebergSinkBuilder interface allowed unification 
of most of operations on FlinkSink and IcebergSink Builders (#11305)
d4f0d7e803 is described below

commit d4f0d7e8036488924edf9acdc2aed6fc3a3415e1
Author: Arek Burdach <[email protected]>
AuthorDate: Mon Oct 21 06:35:03 2024 +0200

    Flink: Add IcebergSinkBuilder interface allowed unification of most of 
operations on FlinkSink and IcebergSink Builders (#11305)
---
 .../org/apache/iceberg/flink/sink/FlinkSink.java   | 14 +++-
 .../org/apache/iceberg/flink/sink/IcebergSink.java | 14 +++-
 .../iceberg/flink/sink/IcebergSinkBuilder.java     | 83 ++++++++++++++++++++++
 3 files changed, 109 insertions(+), 2 deletions(-)

diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index c534314909..e862e88c96 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -134,7 +134,7 @@ public class FlinkSink {
     return new Builder().forRowData(input);
   }
 
-  public static class Builder {
+  public static class Builder implements IcebergSinkBuilder<Builder> {
     private Function<String, DataStream<RowData>> inputCreator = null;
     private TableLoader tableLoader;
     private Table table;
@@ -179,6 +179,7 @@ public class FlinkSink {
      * @param newTable the loaded iceberg table instance.
      * @return {@link Builder} to connect the iceberg table.
      */
+    @Override
     public Builder table(Table newTable) {
       this.table = newTable;
       return this;
@@ -192,6 +193,7 @@ public class FlinkSink {
      * @param newTableLoader to load iceberg table inside tasks.
      * @return {@link Builder} to connect the iceberg table.
      */
+    @Override
     public Builder tableLoader(TableLoader newTableLoader) {
       this.tableLoader = newTableLoader;
       return this;
@@ -210,21 +212,25 @@ public class FlinkSink {
      * Set the write properties for Flink sink. View the supported properties 
in {@link
      * FlinkWriteOptions}
      */
+    @Override
     public Builder setAll(Map<String, String> properties) {
       writeOptions.putAll(properties);
       return this;
     }
 
+    @Override
     public Builder tableSchema(TableSchema newTableSchema) {
       this.tableSchema = newTableSchema;
       return this;
     }
 
+    @Override
     public Builder overwrite(boolean newOverwrite) {
       writeOptions.put(FlinkWriteOptions.OVERWRITE_MODE.key(), 
Boolean.toString(newOverwrite));
       return this;
     }
 
+    @Override
     public Builder flinkConf(ReadableConfig config) {
       this.readableConfig = config;
       return this;
@@ -237,6 +243,7 @@ public class FlinkSink {
      * @param mode to specify the write distribution mode.
      * @return {@link Builder} to connect the iceberg table.
      */
+    @Override
     public Builder distributionMode(DistributionMode mode) {
       if (mode != null) {
         writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), 
mode.modeName());
@@ -306,6 +313,7 @@ public class FlinkSink {
      * @param newWriteParallelism the number of parallel iceberg stream writer.
      * @return {@link Builder} to connect the iceberg table.
      */
+    @Override
     public Builder writeParallelism(int newWriteParallelism) {
       writeOptions.put(
           FlinkWriteOptions.WRITE_PARALLELISM.key(), 
Integer.toString(newWriteParallelism));
@@ -321,6 +329,7 @@ public class FlinkSink {
      * @param enabled indicate whether it should transform all 
INSERT/UPDATE_AFTER events to UPSERT.
      * @return {@link Builder} to connect the iceberg table.
      */
+    @Override
     public Builder upsert(boolean enabled) {
       writeOptions.put(FlinkWriteOptions.WRITE_UPSERT_ENABLED.key(), 
Boolean.toString(enabled));
       return this;
@@ -332,6 +341,7 @@ public class FlinkSink {
      * @param columns defines the iceberg table's key.
      * @return {@link Builder} to connect the iceberg table.
      */
+    @Override
     public Builder equalityFieldColumns(List<String> columns) {
       this.equalityFieldColumns = columns;
       return this;
@@ -376,6 +386,7 @@ public class FlinkSink {
       return this;
     }
 
+    @Override
     public Builder toBranch(String branch) {
       writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch);
       return this;
@@ -436,6 +447,7 @@ public class FlinkSink {
      *
      * @return {@link DataStreamSink} for sink.
      */
+    @Override
     public DataStreamSink<Void> append() {
       return chainIcebergOperators();
     }
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
index d080169544..01be4a2eef 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
@@ -255,7 +255,7 @@ public class IcebergSink
     return new WriteResultSerializer();
   }
 
-  public static class Builder {
+  public static class Builder implements IcebergSinkBuilder<Builder> {
     private TableLoader tableLoader;
     private String uidSuffix = "";
     private Function<String, DataStream<RowData>> inputCreator = null;
@@ -311,6 +311,7 @@ public class IcebergSink
      * @param newTable the loaded iceberg table instance.
      * @return {@link IcebergSink.Builder} to connect the iceberg table.
      */
+    @Override
     public Builder table(Table newTable) {
       this.table = (SerializableTable) SerializableTable.copyOf(newTable);
       return this;
@@ -325,6 +326,7 @@ public class IcebergSink
      * @param newTableLoader to load iceberg table inside tasks.
      * @return {@link Builder} to connect the iceberg table.
      */
+    @Override
     public Builder tableLoader(TableLoader newTableLoader) {
       this.tableLoader = newTableLoader;
       return this;
@@ -347,21 +349,25 @@ public class IcebergSink
      * Set the write properties for IcebergSink. View the supported properties 
in {@link
      * FlinkWriteOptions}
      */
+    @Override
     public Builder setAll(Map<String, String> properties) {
       writeOptions.putAll(properties);
       return this;
     }
 
+    @Override
     public Builder tableSchema(TableSchema newTableSchema) {
       this.tableSchema = newTableSchema;
       return this;
     }
 
+    @Override
     public Builder overwrite(boolean newOverwrite) {
       writeOptions.put(FlinkWriteOptions.OVERWRITE_MODE.key(), 
Boolean.toString(newOverwrite));
       return this;
     }
 
+    @Override
     public Builder flinkConf(ReadableConfig config) {
       this.readableConfig = config;
       return this;
@@ -374,6 +380,7 @@ public class IcebergSink
      * @param mode to specify the write distribution mode.
      * @return {@link IcebergSink.Builder} to connect the iceberg table.
      */
+    @Override
     public Builder distributionMode(DistributionMode mode) {
       Preconditions.checkArgument(
           !DistributionMode.RANGE.equals(mode),
@@ -390,6 +397,7 @@ public class IcebergSink
      * @param newWriteParallelism the number of parallel iceberg stream writer.
      * @return {@link IcebergSink.Builder} to connect the iceberg table.
      */
+    @Override
     public Builder writeParallelism(int newWriteParallelism) {
       writeOptions.put(
           FlinkWriteOptions.WRITE_PARALLELISM.key(), 
Integer.toString(newWriteParallelism));
@@ -405,6 +413,7 @@ public class IcebergSink
      * @param enabled indicate whether it should transform all 
INSERT/UPDATE_AFTER events to UPSERT.
      * @return {@link IcebergSink.Builder} to connect the iceberg table.
      */
+    @Override
     public Builder upsert(boolean enabled) {
       writeOptions.put(FlinkWriteOptions.WRITE_UPSERT_ENABLED.key(), 
Boolean.toString(enabled));
       return this;
@@ -416,6 +425,7 @@ public class IcebergSink
      * @param columns defines the iceberg table's key.
      * @return {@link Builder} to connect the iceberg table.
      */
+    @Override
     public Builder equalityFieldColumns(List<String> columns) {
       this.equalityFieldColumns = columns;
       return this;
@@ -458,6 +468,7 @@ public class IcebergSink
       return this;
     }
 
+    @Override
     public Builder toBranch(String branch) {
       writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch);
       return this;
@@ -527,6 +538,7 @@ public class IcebergSink
      *
      * @return {@link DataStreamSink} for sink.
      */
+    @Override
     public DataStreamSink<RowData> append() {
       IcebergSink sink = build();
       String suffix = defaultSuffix(uidSuffix, table.name());
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkBuilder.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkBuilder.java
new file mode 100644
index 0000000000..f232df5128
--- /dev/null
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkBuilder.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+
+/**
+ * This class is for internal purpose of transition between the previous 
implementation of Flink's
+ * sink ({@link FlinkSink}) and the new one implementation based on Flink 
SinkV2 API ({@link
+ * IcebergSink}). After we remove the previous implementation, all occurrences 
of this class would
+ * be replaced by direct {@link IcebergSink} usage.
+ */
+@Internal
+interface IcebergSinkBuilder<T extends IcebergSinkBuilder<?>> {
+
+  T tableSchema(TableSchema newTableSchema);
+
+  T tableLoader(TableLoader newTableLoader);
+
+  T equalityFieldColumns(List<String> columns);
+
+  T overwrite(boolean newOverwrite);
+
+  T setAll(Map<String, String> properties);
+
+  T flinkConf(ReadableConfig config);
+
+  T table(Table newTable);
+
+  T writeParallelism(int newWriteParallelism);
+
+  T distributionMode(DistributionMode mode);
+
+  T toBranch(String branch);
+
+  T upsert(boolean enabled);
+
+  DataStreamSink<?> append();
+
+  static IcebergSinkBuilder<?> forRow(
+      DataStream<Row> input, TableSchema tableSchema, boolean useV2Sink) {
+    if (useV2Sink) {
+      return IcebergSink.forRow(input, tableSchema);
+    } else {
+      return FlinkSink.forRow(input, tableSchema);
+    }
+  }
+
+  static IcebergSinkBuilder<?> forRowData(DataStream<RowData> input, boolean 
useV2Sink) {
+    if (useV2Sink) {
+      return IcebergSink.forRowData(input);
+    } else {
+      return FlinkSink.forRowData(input);
+    }
+  }
+}

Reply via email to