This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f6d24f53d9 [HUDI-6235] Update and Delete statements for Flink (#8749)
2f6d24f53d9 is described below

commit 2f6d24f53d9937d44bf208f26d7edcaaa4eda19b
Author: Danny Chan <yuzhao....@gmail.com>
AuthorDate: Wed May 24 11:06:47 2023 +0800

    [HUDI-6235] Update and Delete statements for Flink (#8749)
    
    * support UPDATE statement
    * support DELETE statement
    * keep backward compatibility for pre 1.17.0 releases
    * fix the 1.17.0 bundle validation for incorrect flink runtime jar
---
 .../org/apache/hudi/sink/StreamWriteFunction.java  |  1 +
 .../org/apache/hudi/table/HoodieTableSink.java     | 24 +++++++++-
 .../apache/hudi/util/DataModificationInfos.java    | 34 ++++++++++++++
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 54 ++++++++++++++++++++++
 .../test/java/org/apache/hudi/utils/TestData.java  | 32 +++++++++++++
 .../adapter/SupportsRowLevelDeleteAdapter.java     | 33 +++++++++++++
 .../adapter/SupportsRowLevelUpdateAdapter.java     | 37 +++++++++++++++
 .../adapter/SupportsRowLevelDeleteAdapter.java     | 33 +++++++++++++
 .../adapter/SupportsRowLevelUpdateAdapter.java     | 37 +++++++++++++++
 .../adapter/SupportsRowLevelDeleteAdapter.java     | 33 +++++++++++++
 .../adapter/SupportsRowLevelUpdateAdapter.java     | 37 +++++++++++++++
 .../adapter/SupportsRowLevelDeleteAdapter.java     | 33 +++++++++++++
 .../adapter/SupportsRowLevelUpdateAdapter.java     | 37 +++++++++++++++
 .../adapter/SupportsRowLevelDeleteAdapter.java     | 42 +++++++++++++++++
 .../adapter/SupportsRowLevelUpdateAdapter.java     | 45 ++++++++++++++++++
 .../base/build_flink1170hive313spark332.sh         | 27 +++++++++++
 packaging/bundle-validation/ci_run.sh              |  4 +-
 17 files changed, 540 insertions(+), 3 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index c37b2325ca7..7dc6ea9c0f4 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -186,6 +186,7 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
         this.writeFunction = (records, instantTime) -> 
this.writeClient.insert(records, instantTime);
         break;
       case UPSERT:
+      case DELETE: // shares the code path with UPSERT
         this.writeFunction = (records, instantTime) -> 
this.writeClient.upsert(records, instantTime);
         break;
       case INSERT_OVERWRITE:
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index af10b620e69..81b3a6eefd5 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -19,6 +19,8 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.adapter.DataStreamSinkProviderAdapter;
+import org.apache.hudi.adapter.SupportsRowLevelDeleteAdapter;
+import org.apache.hudi.adapter.SupportsRowLevelUpdateAdapter;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -26,10 +28,12 @@ import org.apache.hudi.configuration.OptionsInference;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.sink.utils.Pipelines;
 import org.apache.hudi.util.ChangelogModes;
+import org.apache.hudi.util.DataModificationInfos;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -37,12 +41,18 @@ import 
org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
 import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
 import org.apache.flink.table.types.logical.RowType;
 
+import java.util.List;
 import java.util.Map;
 
 /**
  * Hoodie table sink.
  */
-public class HoodieTableSink implements DynamicTableSink, 
SupportsPartitioning, SupportsOverwrite {
+public class HoodieTableSink implements
+    DynamicTableSink,
+    SupportsPartitioning,
+    SupportsOverwrite,
+    SupportsRowLevelDeleteAdapter,
+    SupportsRowLevelUpdateAdapter {
 
   private final Configuration conf;
   private final ResolvedSchema schema;
@@ -148,4 +158,16 @@ public class HoodieTableSink implements DynamicTableSink, 
SupportsPartitioning,
     // if there are explicit partitions, #applyStaticPartition would overwrite 
the option.
     this.conf.setString(FlinkOptions.OPERATION, 
WriteOperationType.INSERT_OVERWRITE_TABLE.value());
   }
+
+  @Override
+  public RowLevelDeleteInfoAdapter applyRowLevelDelete() {
+    this.conf.setString(FlinkOptions.OPERATION, 
WriteOperationType.DELETE.value());
+    return DataModificationInfos.DEFAULT_DELETE_INFO;
+  }
+
+  @Override
+  public RowLevelUpdateInfoAdapter applyRowLevelUpdate(List<Column> list) {
+    this.conf.setString(FlinkOptions.OPERATION, 
WriteOperationType.UPSERT.value());
+    return DataModificationInfos.DEFAULT_UPDATE_INFO;
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataModificationInfos.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataModificationInfos.java
new file mode 100644
index 00000000000..cc1e77607bb
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataModificationInfos.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.adapter.SupportsRowLevelDeleteAdapter;
+import org.apache.hudi.adapter.SupportsRowLevelUpdateAdapter;
+
+/**
+ * Utilities for all kinds of data modification infos.
+ *
+ * @see SupportsRowLevelUpdateAdapter
+ * @see SupportsRowLevelDeleteAdapter
+ */
+public class DataModificationInfos {
+  public static final SupportsRowLevelDeleteAdapter.RowLevelDeleteInfoAdapter 
DEFAULT_DELETE_INFO = new 
SupportsRowLevelDeleteAdapter.RowLevelDeleteInfoAdapter() {};
+
+  public static final SupportsRowLevelUpdateAdapter.RowLevelUpdateInfoAdapter 
DEFAULT_UPDATE_INFO = new 
SupportsRowLevelUpdateAdapter.RowLevelUpdateInfoAdapter() {};
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 9f2e1e13cf8..9021e88c057 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1952,6 +1952,60 @@ public class ITTestHoodieDataSource {
     assertRowsEquals(actualResult, 
TestData.DATA_SET_INSERT_SEPARATE_PARTITION);
   }
 
+  @ParameterizedTest
+  @MethodSource("indexAndTableTypeParams")
+  void testUpdateDelete(String indexType, HoodieTableType tableType) {
+    TableEnvironment tableEnv = batchTableEnv;
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .option(FlinkOptions.INDEX_TYPE, indexType)
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+
+    execInsertSql(tableEnv, TestSQL.INSERT_T1);
+
+    // update EQ(IN)
+    final String update1 = "update t1 set age=18 where uuid in('id1', 'id2')";
+
+    execInsertSql(tableEnv, update1);
+
+    List<Row> result1 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    List<RowData> expected1 = TestData.update(TestData.DATA_SET_SOURCE_INSERT, 
2, 18, 0, 1);
+    assertRowsEquals(result1, expected1);
+
+    // update GT(>)
+    final String update2 = "update t1 set age=19 where uuid > 'id5'";
+
+    execInsertSql(tableEnv, update2);
+
+    List<Row> result2 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    List<RowData> expected2 = TestData.update(expected1, 2, 19, 5, 6, 7);
+    assertRowsEquals(result2, expected2);
+
+    // delete EQ(=)
+    final String update3 = "delete from t1 where uuid = 'id1'";
+
+    execInsertSql(tableEnv, update3);
+
+    List<Row> result3 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    List<RowData> expected3 = TestData.delete(expected2, 0);
+    assertRowsEquals(result3, expected3);
+
+    // delete LTE(<=)
+    final String update4 = "delete from t1 where uuid <= 'id5'";
+
+    execInsertSql(tableEnv, update4);
+
+    List<Row> result4 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    List<RowData> expected4 = TestData.delete(expected3, 0, 1, 2, 3);
+    assertRowsEquals(result4, expected4);
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 263b72f489e..c2e505d9304 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -391,6 +391,38 @@ public class TestData {
     return inserts;
   }
 
+  /**
+   * Updates the rows with given value {@code val} at field index {@code idx}.
+   * All the target rows specified with range {@code targets} would be updated.
+   *
+   * <p>NOTE: only INT type is supported.
+   *
+   * @param dataset The rows to update
+   * @param idx     The target field index
+   * @param val     The new value
+   * @param targets The target row numbers to update, the number starts from 0
+   *
+   * @return Copied rows with new values setup.
+   */
+  public static List<RowData> update(List<RowData> dataset, int idx, int val, 
int... targets) {
+    List<RowData> copied = 
dataset.stream().map(TestConfigurations.SERIALIZER::copy).collect(Collectors.toList());
+    Arrays.stream(targets).forEach(target -> {
+      BinaryRowData rowData = (BinaryRowData) copied.get(target);
+      rowData.setInt(idx, val);
+    });
+    return copied;
+  }
+
+  /**
+   * Returns a copy of the given rows excluding the rows at indices {@code 
targets}.
+   */
+  public static List<RowData> delete(List<RowData> dataset, int... targets) {
+    Set<Integer> exclude = 
Arrays.stream(targets).boxed().collect(Collectors.toSet());
+    return IntStream.range(0, dataset.size())
+        .filter(i -> !exclude.contains(i))
+        .mapToObj(i -> 
TestConfigurations.SERIALIZER.copy(dataset.get(i))).collect(Collectors.toList());
+  }
+
   public static List<RowData> filterOddRows(List<RowData> rows) {
     return filterRowsByIndexPredicate(rows, i -> i % 2 != 0);
   }
diff --git 
a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
 
b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
new file mode 100644
index 00000000000..cd5c4eb891b
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+/**
+ * Adapter clazz for {@code 
org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}.
+ */
+public interface SupportsRowLevelDeleteAdapter {
+
+  RowLevelDeleteInfoAdapter applyRowLevelDelete();
+
+  /**
+   * Adapter clazz for {@code SupportsRowLevelDelete.RowLevelDeleteInfo}.
+   */
+  interface RowLevelDeleteInfoAdapter {
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
 
b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
new file mode 100644
index 00000000000..6a62763ec5b
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.table.catalog.Column;
+
+import java.util.List;
+
+/**
+ * Adapter clazz for {@code 
org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate}.
+ */
+public interface SupportsRowLevelUpdateAdapter {
+
+  RowLevelUpdateInfoAdapter applyRowLevelUpdate(List<Column> updatedColumns);
+
+  /**
+   * Adapter clazz for {@code SupportsRowLevelUpdate.RowLevelUpdateInfo}.
+   */
+  interface RowLevelUpdateInfoAdapter {
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
 
b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
new file mode 100644
index 00000000000..cd5c4eb891b
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+/**
+ * Adapter clazz for {@code 
org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}.
+ */
+public interface SupportsRowLevelDeleteAdapter {
+
+  RowLevelDeleteInfoAdapter applyRowLevelDelete();
+
+  /**
+   * Adapter clazz for {@code SupportsRowLevelDelete.RowLevelDeleteInfo}.
+   */
+  interface RowLevelDeleteInfoAdapter {
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
 
b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
new file mode 100644
index 00000000000..6a62763ec5b
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.table.catalog.Column;
+
+import java.util.List;
+
+/**
+ * Adapter clazz for {@code 
org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate}.
+ */
+public interface SupportsRowLevelUpdateAdapter {
+
+  RowLevelUpdateInfoAdapter applyRowLevelUpdate(List<Column> updatedColumns);
+
+  /**
+   * Adapter clazz for {@code SupportsRowLevelUpdate.RowLevelUpdateInfo}.
+   */
+  interface RowLevelUpdateInfoAdapter {
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
 
b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
new file mode 100644
index 00000000000..cd5c4eb891b
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+/**
+ * Adapter clazz for {@code 
org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}.
+ */
+public interface SupportsRowLevelDeleteAdapter {
+
+  RowLevelDeleteInfoAdapter applyRowLevelDelete();
+
+  /**
+   * Adapter clazz for {@code SupportsRowLevelDelete.RowLevelDeleteInfo}.
+   */
+  interface RowLevelDeleteInfoAdapter {
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
 
b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
new file mode 100644
index 00000000000..6a62763ec5b
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.table.catalog.Column;
+
+import java.util.List;
+
+/**
+ * Adapter clazz for {@code 
org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate}.
+ */
+public interface SupportsRowLevelUpdateAdapter {
+
+  RowLevelUpdateInfoAdapter applyRowLevelUpdate(List<Column> updatedColumns);
+
+  /**
+   * Adapter clazz for {@code SupportsRowLevelUpdate.RowLevelUpdateInfo}.
+   */
+  interface RowLevelUpdateInfoAdapter {
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
 
b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
new file mode 100644
index 00000000000..cd5c4eb891b
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+/**
+ * Adapter clazz for {@code 
org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}.
+ */
+public interface SupportsRowLevelDeleteAdapter {
+
+  RowLevelDeleteInfoAdapter applyRowLevelDelete();
+
+  /**
+   * Adapter clazz for {@code SupportsRowLevelDelete.RowLevelDeleteInfo}.
+   */
+  interface RowLevelDeleteInfoAdapter {
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
 
b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
new file mode 100644
index 00000000000..6a62763ec5b
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.table.catalog.Column;
+
+import java.util.List;
+
+/**
+ * Adapter clazz for {@code 
org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate}.
+ */
+public interface SupportsRowLevelUpdateAdapter {
+
+  RowLevelUpdateInfoAdapter applyRowLevelUpdate(List<Column> updatedColumns);
+
+  /**
+   * Adapter clazz for {@code SupportsRowLevelUpdate.RowLevelUpdateInfo}.
+   */
+  interface RowLevelUpdateInfoAdapter {
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
 
b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
new file mode 100644
index 00000000000..de0019d41bd
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
+
+import javax.annotation.Nullable;
+
+/**
+ * Adapter clazz for {@link 
org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}.
+ */
+public interface SupportsRowLevelDeleteAdapter extends SupportsRowLevelDelete {
+  @Override
+  default RowLevelDeleteInfo applyRowLevelDelete(@Nullable 
RowLevelModificationScanContext context) {
+    return applyRowLevelDelete();
+  }
+
+  RowLevelDeleteInfoAdapter applyRowLevelDelete();
+
+  /**
+   * Adapter clazz for {@link SupportsRowLevelDelete.RowLevelDeleteInfo}.
+   */
+  interface RowLevelDeleteInfoAdapter extends RowLevelDeleteInfo {
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
 
b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
new file mode 100644
index 00000000000..17c785d4845
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * Adapter clazz for {@link 
org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate}.
+ */
+public interface SupportsRowLevelUpdateAdapter extends SupportsRowLevelUpdate {
+  @Override
+  default RowLevelUpdateInfo applyRowLevelUpdate(List<Column> updatedColumns, 
@Nullable RowLevelModificationScanContext context) {
+    return applyRowLevelUpdate(updatedColumns);
+  }
+
+  RowLevelUpdateInfoAdapter applyRowLevelUpdate(List<Column> updatedColumns);
+
+  /**
+   * Adapter clazz for {@link SupportsRowLevelUpdate.RowLevelUpdateInfo}.
+   */
+  interface RowLevelUpdateInfoAdapter extends RowLevelUpdateInfo {
+  }
+}
diff --git a/packaging/bundle-validation/base/build_flink1170hive313spark332.sh 
b/packaging/bundle-validation/base/build_flink1170hive313spark332.sh
new file mode 100755
index 00000000000..ae4858afcab
--- /dev/null
+++ b/packaging/bundle-validation/base/build_flink1170hive313spark332.sh
@@ -0,0 +1,27 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+docker build \
+ --build-arg HIVE_VERSION=3.1.3 \
+ --build-arg FLINK_VERSION=1.17.0 \
+ --build-arg SPARK_VERSION=3.3.2 \
+ --build-arg SPARK_HADOOP_VERSION=3 \
+ --build-arg HADOOP_VERSION=3.3.5 \
+ -t hudi-ci-bundle-validation-base:flink1170hive313spark332 .
+docker image tag hudi-ci-bundle-validation-base:flink1170hive313spark332 
apachehudi/hudi-ci-bundle-validation-base:flink1170hive313spark332
diff --git a/packaging/bundle-validation/ci_run.sh 
b/packaging/bundle-validation/ci_run.sh
index 17105d46e5b..42538f7a59f 100755
--- a/packaging/bundle-validation/ci_run.sh
+++ b/packaging/bundle-validation/ci_run.sh
@@ -88,12 +88,12 @@ elif [[ ${SPARK_RUNTIME} == 'spark3.3.2' ]]; then
   HADOOP_VERSION=2.7.7
   HIVE_VERSION=3.1.3
   DERBY_VERSION=10.14.1.0
-  FLINK_VERSION=1.15.3
+  FLINK_VERSION=1.17.0
   SPARK_VERSION=3.3.2
   SPARK_HADOOP_VERSION=2
   CONFLUENT_VERSION=5.5.12
   KAFKA_CONNECT_HDFS_VERSION=10.1.13
-  IMAGE_TAG=flink1153hive313spark332
+  IMAGE_TAG=flink1170hive313spark332
 fi
 
 # Copy bundle jars to temp dir for mounting

Reply via email to