This is an automated email from the ASF dual-hosted git repository. danny0405 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 2f6d24f53d9 [HUDI-6235] Update and Delete statements for Flink (#8749) 2f6d24f53d9 is described below commit 2f6d24f53d9937d44bf208f26d7edcaaa4eda19b Author: Danny Chan <yuzhao....@gmail.com> AuthorDate: Wed May 24 11:06:47 2023 +0800 [HUDI-6235] Update and Delete statements for Flink (#8749) * support UPDATE statement * support DELETE statement * keep backward compatibility for pre 1.17.0 releases * fix the 1.17.0 bundle validation for incorrect flink runtime jar --- .../org/apache/hudi/sink/StreamWriteFunction.java | 1 + .../org/apache/hudi/table/HoodieTableSink.java | 24 +++++++++- .../apache/hudi/util/DataModificationInfos.java | 34 ++++++++++++++ .../apache/hudi/table/ITTestHoodieDataSource.java | 54 ++++++++++++++++++++++ .../test/java/org/apache/hudi/utils/TestData.java | 32 +++++++++++++ .../adapter/SupportsRowLevelDeleteAdapter.java | 33 +++++++++++++ .../adapter/SupportsRowLevelUpdateAdapter.java | 37 +++++++++++++++ .../adapter/SupportsRowLevelDeleteAdapter.java | 33 +++++++++++++ .../adapter/SupportsRowLevelUpdateAdapter.java | 37 +++++++++++++++ .../adapter/SupportsRowLevelDeleteAdapter.java | 33 +++++++++++++ .../adapter/SupportsRowLevelUpdateAdapter.java | 37 +++++++++++++++ .../adapter/SupportsRowLevelDeleteAdapter.java | 33 +++++++++++++ .../adapter/SupportsRowLevelUpdateAdapter.java | 37 +++++++++++++++ .../adapter/SupportsRowLevelDeleteAdapter.java | 42 +++++++++++++++++ .../adapter/SupportsRowLevelUpdateAdapter.java | 45 ++++++++++++++++++ .../base/build_flink1170hive313spark332.sh | 27 +++++++++++ packaging/bundle-validation/ci_run.sh | 4 +- 17 files changed, 540 insertions(+), 3 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index c37b2325ca7..7dc6ea9c0f4 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -186,6 +186,7 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> { this.writeFunction = (records, instantTime) -> this.writeClient.insert(records, instantTime); break; case UPSERT: + case DELETE: // shares the code path with UPSERT this.writeFunction = (records, instantTime) -> this.writeClient.upsert(records, instantTime); break; case INSERT_OVERWRITE: diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index af10b620e69..81b3a6eefd5 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -19,6 +19,8 @@ package org.apache.hudi.table; import org.apache.hudi.adapter.DataStreamSinkProviderAdapter; +import org.apache.hudi.adapter.SupportsRowLevelDeleteAdapter; +import org.apache.hudi.adapter.SupportsRowLevelUpdateAdapter; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.configuration.FlinkOptions; @@ -26,10 +28,12 @@ import org.apache.hudi.configuration.OptionsInference; import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.sink.utils.Pipelines; import org.apache.hudi.util.ChangelogModes; +import org.apache.hudi.util.DataModificationInfos; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.sink.DynamicTableSink; @@ -37,12 +41,18 @@ import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite; import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning; import org.apache.flink.table.types.logical.RowType; +import java.util.List; import java.util.Map; /** * Hoodie table sink. */ -public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite { +public class HoodieTableSink implements + DynamicTableSink, + SupportsPartitioning, + SupportsOverwrite, + SupportsRowLevelDeleteAdapter, + SupportsRowLevelUpdateAdapter { private final Configuration conf; private final ResolvedSchema schema; @@ -148,4 +158,16 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, // if there are explicit partitions, #applyStaticPartition would overwrite the option. this.conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT_OVERWRITE_TABLE.value()); } + + @Override + public RowLevelDeleteInfoAdapter applyRowLevelDelete() { + this.conf.setString(FlinkOptions.OPERATION, WriteOperationType.DELETE.value()); + return DataModificationInfos.DEFAULT_DELETE_INFO; + } + + @Override + public RowLevelUpdateInfoAdapter applyRowLevelUpdate(List<Column> list) { + this.conf.setString(FlinkOptions.OPERATION, WriteOperationType.UPSERT.value()); + return DataModificationInfos.DEFAULT_UPDATE_INFO; + } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataModificationInfos.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataModificationInfos.java new file mode 100644 index 00000000000..cc1e77607bb --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataModificationInfos.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.util; + +import org.apache.hudi.adapter.SupportsRowLevelDeleteAdapter; +import org.apache.hudi.adapter.SupportsRowLevelUpdateAdapter; + +/** + * Utilities for all kinds of data modification infos. + * + * @see SupportsRowLevelUpdateAdapter + * @see SupportsRowLevelDeleteAdapter + */ +public class DataModificationInfos { + public static final SupportsRowLevelDeleteAdapter.RowLevelDeleteInfoAdapter DEFAULT_DELETE_INFO = new SupportsRowLevelDeleteAdapter.RowLevelDeleteInfoAdapter() {}; + + public static final SupportsRowLevelUpdateAdapter.RowLevelUpdateInfoAdapter DEFAULT_UPDATE_INFO = new SupportsRowLevelUpdateAdapter.RowLevelUpdateInfoAdapter() {}; +} diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index 9f2e1e13cf8..9021e88c057 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -1952,6 +1952,60 @@ public class ITTestHoodieDataSource { assertRowsEquals(actualResult, TestData.DATA_SET_INSERT_SEPARATE_PARTITION); } + @ParameterizedTest + @MethodSource("indexAndTableTypeParams") + void testUpdateDelete(String indexType, HoodieTableType tableType) { + TableEnvironment tableEnv = batchTableEnv; + String hoodieTableDDL = sql("t1") + .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) + .option(FlinkOptions.TABLE_TYPE, tableType) + .option(FlinkOptions.INDEX_TYPE, indexType) + .end(); + tableEnv.executeSql(hoodieTableDDL); + + execInsertSql(tableEnv, TestSQL.INSERT_T1); + + // update EQ(IN) + final String update1 = "update t1 set age=18 where uuid in('id1', 'id2')"; + + execInsertSql(tableEnv, update1); + + List<Row> result1 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + List<RowData> expected1 = TestData.update(TestData.DATA_SET_SOURCE_INSERT, 2, 18, 0, 1); + assertRowsEquals(result1, expected1); + + // update GT(>) + final String update2 = "update t1 set age=19 where uuid > 'id5'"; + + execInsertSql(tableEnv, update2); + + List<Row> result2 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + List<RowData> expected2 = TestData.update(expected1, 2, 19, 5, 6, 7); + assertRowsEquals(result2, expected2); + + // delete EQ(=) + final String update3 = "delete from t1 where uuid = 'id1'"; + + execInsertSql(tableEnv, update3); + + List<Row> result3 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + List<RowData> expected3 = TestData.delete(expected2, 0); + assertRowsEquals(result3, expected3); + + // delete LTE(<=) + final String update4 = "delete from t1 where uuid <= 'id5'"; + + execInsertSql(tableEnv, update4); + + List<Row> result4 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + List<RowData> expected4 = TestData.delete(expected3, 0, 1, 2, 3); + assertRowsEquals(result4, expected4); + } + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java index 263b72f489e..c2e505d9304 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java @@ -391,6 +391,38 @@ public class TestData { return inserts; } + /** + * Updates the rows with given value {@code val} at field index {@code idx}. + * All the target rows specified with range {@code targets} would be updated. + * + * <p>NOTE: only INT type is supported. + * + * @param dataset The rows to update + * @param idx The target field index + * @param val The new value + * @param targets The target row numbers to update, the number starts from 0 + * + * @return Copied rows with new values setup. + */ + public static List<RowData> update(List<RowData> dataset, int idx, int val, int... targets) { + List<RowData> copied = dataset.stream().map(TestConfigurations.SERIALIZER::copy).collect(Collectors.toList()); + Arrays.stream(targets).forEach(target -> { + BinaryRowData rowData = (BinaryRowData) copied.get(target); + rowData.setInt(idx, val); + }); + return copied; + } + + /** + * Returns a copy of the given rows excluding the rows at indices {@code targets}. + */ + public static List<RowData> delete(List<RowData> dataset, int... targets) { + Set<Integer> exclude = Arrays.stream(targets).boxed().collect(Collectors.toSet()); + return IntStream.range(0, dataset.size()) + .filter(i -> !exclude.contains(i)) + .mapToObj(i -> TestConfigurations.SERIALIZER.copy(dataset.get(i))).collect(Collectors.toList()); + } + public static List<RowData> filterOddRows(List<RowData> rows) { return filterRowsByIndexPredicate(rows, i -> i % 2 != 0); } diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java new file mode 100644 index 00000000000..cd5c4eb891b --- /dev/null +++ b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.adapter; + +/** + * Adapter clazz for {@code org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}. + */ +public interface SupportsRowLevelDeleteAdapter { + + RowLevelDeleteInfoAdapter applyRowLevelDelete(); + + /** + * Adapter clazz for {@code SupportsRowLevelDelete.RowLevelDeleteInfo}. + */ + interface RowLevelDeleteInfoAdapter { + } +} diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java new file mode 100644 index 00000000000..6a62763ec5b --- /dev/null +++ b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.adapter; + +import org.apache.flink.table.catalog.Column; + +import java.util.List; + +/** + * Adapter clazz for {@code org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate}. + */ +public interface SupportsRowLevelUpdateAdapter { + + RowLevelUpdateInfoAdapter applyRowLevelUpdate(List<Column> updatedColumns); + + /** + * Adapter clazz for {@code SupportsRowLevelUpdate.RowLevelUpdateInfo}. + */ + interface RowLevelUpdateInfoAdapter { + } +} diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java new file mode 100644 index 00000000000..cd5c4eb891b --- /dev/null +++ b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.adapter; + +/** + * Adapter clazz for {@code org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}. + */ +public interface SupportsRowLevelDeleteAdapter { + + RowLevelDeleteInfoAdapter applyRowLevelDelete(); + + /** + * Adapter clazz for {@code SupportsRowLevelDelete.RowLevelDeleteInfo}. + */ + interface RowLevelDeleteInfoAdapter { + } +} diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java new file mode 100644 index 00000000000..6a62763ec5b --- /dev/null +++ b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.adapter; + +import org.apache.flink.table.catalog.Column; + +import java.util.List; + +/** + * Adapter clazz for {@code org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate}. + */ +public interface SupportsRowLevelUpdateAdapter { + + RowLevelUpdateInfoAdapter applyRowLevelUpdate(List<Column> updatedColumns); + + /** + * Adapter clazz for {@code SupportsRowLevelUpdate.RowLevelUpdateInfo}. + */ + interface RowLevelUpdateInfoAdapter { + } +} diff --git a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java new file mode 100644 index 00000000000..cd5c4eb891b --- /dev/null +++ b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.adapter; + +/** + * Adapter clazz for {@code org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}. + */ +public interface SupportsRowLevelDeleteAdapter { + + RowLevelDeleteInfoAdapter applyRowLevelDelete(); + + /** + * Adapter clazz for {@code SupportsRowLevelDelete.RowLevelDeleteInfo}. + */ + interface RowLevelDeleteInfoAdapter { + } +} diff --git a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java new file mode 100644 index 00000000000..6a62763ec5b --- /dev/null +++ b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.adapter; + +import org.apache.flink.table.catalog.Column; + +import java.util.List; + +/** + * Adapter clazz for {@code org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate}. + */ +public interface SupportsRowLevelUpdateAdapter { + + RowLevelUpdateInfoAdapter applyRowLevelUpdate(List<Column> updatedColumns); + + /** + * Adapter clazz for {@code SupportsRowLevelUpdate.RowLevelUpdateInfo}. + */ + interface RowLevelUpdateInfoAdapter { + } +} diff --git a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java new file mode 100644 index 00000000000..cd5c4eb891b --- /dev/null +++ b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.adapter; + +/** + * Adapter clazz for {@code org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}. + */ +public interface SupportsRowLevelDeleteAdapter { + + RowLevelDeleteInfoAdapter applyRowLevelDelete(); + + /** + * Adapter clazz for {@code SupportsRowLevelDelete.RowLevelDeleteInfo}. + */ + interface RowLevelDeleteInfoAdapter { + } +} diff --git a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java new file mode 100644 index 00000000000..6a62763ec5b --- /dev/null +++ b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.adapter; + +import org.apache.flink.table.catalog.Column; + +import java.util.List; + +/** + * Adapter clazz for {@code org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate}. + */ +public interface SupportsRowLevelUpdateAdapter { + + RowLevelUpdateInfoAdapter applyRowLevelUpdate(List<Column> updatedColumns); + + /** + * Adapter clazz for {@code SupportsRowLevelUpdate.RowLevelUpdateInfo}. + */ + interface RowLevelUpdateInfoAdapter { + } +} diff --git a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java new file mode 100644 index 00000000000..de0019d41bd --- /dev/null +++ b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.adapter; + +import org.apache.flink.table.connector.RowLevelModificationScanContext; +import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete; + +import javax.annotation.Nullable; + +/** + * Adapter clazz for {@link org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}. + */ +public interface SupportsRowLevelDeleteAdapter extends SupportsRowLevelDelete { + @Override + default RowLevelDeleteInfo applyRowLevelDelete(@Nullable RowLevelModificationScanContext context) { + return applyRowLevelDelete(); + } + + RowLevelDeleteInfoAdapter applyRowLevelDelete(); + + /** + * Adapter clazz for {@link SupportsRowLevelDelete.RowLevelDeleteInfo}. + */ + interface RowLevelDeleteInfoAdapter extends RowLevelDeleteInfo { + } +} diff --git a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java new file mode 100644 index 00000000000..17c785d4845 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.adapter; + +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.connector.RowLevelModificationScanContext; +import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate; + +import javax.annotation.Nullable; + +import java.util.List; + +/** + * Adapter clazz for {@link org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate}. + */ +public interface SupportsRowLevelUpdateAdapter extends SupportsRowLevelUpdate { + @Override + default RowLevelUpdateInfo applyRowLevelUpdate(List<Column> updatedColumns, @Nullable RowLevelModificationScanContext context) { + return applyRowLevelUpdate(updatedColumns); + } + + RowLevelUpdateInfoAdapter applyRowLevelUpdate(List<Column> updatedColumns); + + /** + * Adapter clazz for {@link SupportsRowLevelUpdate.RowLevelUpdateInfo}. + */ + interface RowLevelUpdateInfoAdapter extends RowLevelUpdateInfo { + } +} diff --git a/packaging/bundle-validation/base/build_flink1170hive313spark332.sh b/packaging/bundle-validation/base/build_flink1170hive313spark332.sh new file mode 100755 index 00000000000..ae4858afcab --- /dev/null +++ b/packaging/bundle-validation/base/build_flink1170hive313spark332.sh @@ -0,0 +1,27 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +docker build \ + --build-arg HIVE_VERSION=3.1.3 \ + --build-arg FLINK_VERSION=1.17.0 \ + --build-arg SPARK_VERSION=3.3.2 \ + --build-arg SPARK_HADOOP_VERSION=3 \ + --build-arg HADOOP_VERSION=3.3.5 \ + -t hudi-ci-bundle-validation-base:flink1170hive313spark332 . +docker image tag hudi-ci-bundle-validation-base:flink1170hive313spark332 apachehudi/hudi-ci-bundle-validation-base:flink1170hive313spark332 diff --git a/packaging/bundle-validation/ci_run.sh b/packaging/bundle-validation/ci_run.sh index 17105d46e5b..42538f7a59f 100755 --- a/packaging/bundle-validation/ci_run.sh +++ b/packaging/bundle-validation/ci_run.sh @@ -88,12 +88,12 @@ elif [[ ${SPARK_RUNTIME} == 'spark3.3.2' ]]; then HADOOP_VERSION=2.7.7 HIVE_VERSION=3.1.3 DERBY_VERSION=10.14.1.0 - FLINK_VERSION=1.15.3 + FLINK_VERSION=1.17.0 SPARK_VERSION=3.3.2 SPARK_HADOOP_VERSION=2 CONFLUENT_VERSION=5.5.12 KAFKA_CONNECT_HDFS_VERSION=10.1.13 - IMAGE_TAG=flink1153hive313spark332 + IMAGE_TAG=flink1170hive313spark332 fi # Copy bundle jars to temp dir for mounting