This is an automated email from the ASF dual-hosted git repository. satish pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new e177466 [HUDI-1350] Support Partition level delete API in HUDI (#2254) e177466 is described below commit e177466fd266ebf3a8d371ce1bf2ecf3bdfe90ed Author: lw0090 <lw309637...@gmail.com> AuthorDate: Tue Dec 29 07:01:06 2020 +0800 [HUDI-1350] Support Partition level delete API in HUDI (#2254) * [HUDI-1350] Support Partition level delete API in HUDI * [HUDI-1350] Support Partition level delete API in HUDI base InsertOverwriteCommitAction * [HUDI-1350] Support Partition level delete API in HUDI base InsertOverwriteCommitAction --- .../java/org/apache/hudi/table/HoodieTable.java | 9 ++ .../hudi/table/HoodieFlinkCopyOnWriteTable.java | 5 + .../hudi/table/HoodieJavaCopyOnWriteTable.java | 5 + .../apache/hudi/client/SparkRDDWriteClient.java | 7 ++ .../hudi/table/HoodieSparkCopyOnWriteTable.java | 6 ++ .../SparkDeletePartitionCommitActionExecutor.java | 68 +++++++++++++ .../SparkInsertOverwriteCommitActionExecutor.java | 2 +- ...rkInsertOverwriteTableCommitActionExecutor.java | 6 -- .../TestHoodieClientOnCopyOnWriteStorage.java | 105 ++++++++++++++++++++- .../hudi/testutils/HoodieClientTestUtils.java | 27 ++++-- .../hudi/common/model/WriteOperationType.java | 4 + 11 files changed, 228 insertions(+), 16 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 36cd89a..6b7a7d2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -159,6 +159,15 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem public abstract HoodieWriteMetadata<O> delete(HoodieEngineContext context, String instantTime, K keys); /** + * Deletes all data of partitions. + * @param context HoodieEngineContext + * @param instantTime Instant Time for the action + * @param partitions {@link List} of partition to be deleted + * @return HoodieWriteMetadata + */ + public abstract HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions); + + /** * Upserts the given prepared records into the Hoodie table, at the supplied instantTime. * <p> * This implementation requires that the input records are already tagged, and de-duped if needed. diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java index 3c4d7fb..d0cb8de 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java @@ -85,6 +85,11 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends } @Override + public HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) { + throw new HoodieNotSupportedException("DeletePartitions is not supported yet"); + } + + @Override public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(HoodieEngineContext context, String instantTime, List<HoodieRecord<T>> preppedRecords) { return new FlinkUpsertPreppedCommitActionExecutor<>(context, config, this, instantTime, preppedRecords).execute(); } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java index 7f65889..ddc995a 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java @@ -85,6 +85,11 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends H } @Override + public HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) { + throw new HoodieNotSupportedException("Delete partitions is not supported yet"); + } + + @Override public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(HoodieEngineContext context, String instantTime, List<HoodieRecord<T>> preppedRecords) { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 18f5309..f7e7690 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -245,6 +245,13 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends return postWrite(result, instantTime, table); } + public HoodieWriteResult deletePartitions(List<String> partitions, String instantTime) { + HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table = getTableAndInitCtx(WriteOperationType.DELETE_PARTITION, instantTime); + setOperationType(WriteOperationType.DELETE_PARTITION); + HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.deletePartitions(context,instantTime, partitions); + return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds()); + } + @Override protected JavaRDD<WriteStatus> postWrite(HoodieWriteMetadata<JavaRDD<WriteStatus>> result, String instantTime, diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java index 71085a2..357b5ce 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java @@ -50,6 +50,7 @@ import org.apache.hudi.table.action.cluster.SparkClusteringPlanActionExecutor; import org.apache.hudi.table.action.commit.SparkBulkInsertCommitActionExecutor; import org.apache.hudi.table.action.commit.SparkBulkInsertPreppedCommitActionExecutor; import org.apache.hudi.table.action.commit.SparkDeleteCommitActionExecutor; +import org.apache.hudi.table.action.commit.SparkDeletePartitionCommitActionExecutor; import org.apache.hudi.table.action.commit.SparkInsertCommitActionExecutor; import org.apache.hudi.table.action.commit.SparkInsertOverwriteCommitActionExecutor; import org.apache.hudi.table.action.commit.SparkInsertOverwriteTableCommitActionExecutor; @@ -109,6 +110,11 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends } @Override + public HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) { + return new SparkDeletePartitionCommitActionExecutor(context, config, this, instantTime, partitions).execute(); + } + + @Override public HoodieWriteMetadata<JavaRDD<WriteStatus>> upsertPrepped(HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> preppedRecords) { return new SparkUpsertPreppedCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, preppedRecords).execute(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java new file mode 100644 index 0000000..ea1ef51 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.WorkloadProfile; +import org.apache.hudi.table.WorkloadStat; +import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import scala.Tuple2; + +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class SparkDeletePartitionCommitActionExecutor<T extends HoodieRecordPayload<T>> + extends SparkInsertOverwriteCommitActionExecutor<T> { + + private List<String> partitions; + public SparkDeletePartitionCommitActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, HoodieTable table, + String instantTime, List<String> partitions) { + super(context, config, table, instantTime,null, WriteOperationType.DELETE_PARTITION); + this.partitions = partitions; + } + + @Override + public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() { + JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context); + HoodieTimer timer = new HoodieTimer().startTimer(); + Map<String, List<String>> partitionToReplaceFileIds = jsc.parallelize(partitions, partitions.size()).distinct() + .mapToPair(partitionPath -> new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap(); + HoodieWriteMetadata result = new HoodieWriteMetadata(); + result.setPartitionToReplaceFileIds(partitionToReplaceFileIds); + result.setIndexUpdateDuration(Duration.ofMillis(timer.endTimer())); + + result.setWriteStatuses(jsc.emptyRDD()); + this.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())), instantTime); + this.commitOnAutoCommit(result); + return result; + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java index 1e38220..c5d3c76 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java @@ -77,7 +77,7 @@ public class SparkInsertOverwriteCommitActionExecutor<T extends HoodieRecordPayl new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap(); } - private List<String> getAllExistingFileIds(String partitionPath) { + protected List<String> getAllExistingFileIds(String partitionPath) { // because new commit is not complete. it is safe to mark all existing file Ids as old files return table.getSliceView().getLatestFileSlices(partitionPath).map(fg -> fg.getFileId()).distinct().collect(Collectors.toList()); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java index e349657..c014515 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java @@ -36,7 +36,6 @@ import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; public class SparkInsertOverwriteTableCommitActionExecutor<T extends HoodieRecordPayload<T>> extends SparkInsertOverwriteCommitActionExecutor<T> { @@ -47,11 +46,6 @@ public class SparkInsertOverwriteTableCommitActionExecutor<T extends HoodieRecor super(context, config, table, instantTime, inputRecordsRDD, WriteOperationType.INSERT_OVERWRITE_TABLE); } - protected List<String> getAllExistingFileIds(String partitionPath) { - return table.getSliceView().getLatestFileSlices(partitionPath) - .map(fg -> fg.getFileId()).distinct().collect(Collectors.toList()); - } - @Override protected Map<String, List<String>> getPartitionToReplacedFileIds(JavaRDD<WriteStatus> writeStatuses) { Map<String, List<String>> partitionToExistingFileIds = new HashMap<>(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java index c201efd..e86cb2d 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java @@ -1103,7 +1103,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { * Test scenario of writing similar number file groups in partition. */ @Test - public void testInsertOverwritePartitionHandlinWithSimilarNumberOfRecords() throws Exception { + public void testInsertOverwritePartitionHandlingWithSimilarNumberOfRecords() throws Exception { verifyInsertOverwritePartitionHandling(3000, 3000); } @@ -1144,6 +1144,109 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { } /** + * Test scenario of writing fewer file groups for first partition than second an third partition. + */ + @Test + public void verifyDeletePartitionsHandlingWithFewerRecordsFirstPartition() throws Exception { + verifyDeletePartitionsHandling(1000, 3000, 3000); + } + + /** + * Test scenario of writing similar number file groups in partition. + */ + @Test + public void verifyDeletePartitionsHandlingWithSimilarNumberOfRecords() throws Exception { + verifyDeletePartitionsHandling(3000, 3000, 3000); + } + + /** + * Test scenario of writing more file groups for first partition than second an third partition. + */ + @Test + public void verifyDeletePartitionsHandlingHandlingWithFewerRecordsSecondThirdPartition() throws Exception { + verifyDeletePartitionsHandling(3000, 1000, 1000); + } + + private Set<String> insertPartitionRecordsWithCommit(SparkRDDWriteClient client, int recordsCount, String commitTime1, String partitionPath) { + client.startCommitWithTime(commitTime1); + List<HoodieRecord> inserts1 = dataGen.generateInsertsForPartition(commitTime1, recordsCount, partitionPath); + JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 2); + List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime1).collect(); + assertNoWriteErrors(statuses); + Set<String> batchBuckets = statuses.stream().map(s -> s.getFileId()).collect(Collectors.toSet()); + verifyRecordsWritten(commitTime1, inserts1, statuses); + return batchBuckets; + } + + private Set<String> deletePartitionWithCommit(SparkRDDWriteClient client, String commitTime, List<String> deletePartitionPath) { + client.startCommitWithTime(commitTime, HoodieTimeline.REPLACE_COMMIT_ACTION); + HoodieWriteResult writeResult = client.deletePartitions(deletePartitionPath, commitTime); + Set<String> deletePartitionReplaceFileIds = + writeResult.getPartitionToReplaceFileIds().entrySet() + .stream().flatMap(entry -> entry.getValue().stream()).collect(Collectors.toSet()); + return deletePartitionReplaceFileIds; + } + + /** + * 1) Do write1 (upsert) with 'batch1RecordsCount' number of records for first partition. + * 2) Do write2 (upsert) with 'batch2RecordsCount' number of records for second partition. + * 3) Do write3 (upsert) with 'batch3RecordsCount' number of records for third partition. + * 4) delete first partition and check result. + * 5) delete second and third partition and check result. + * + */ + private void verifyDeletePartitionsHandling(int batch1RecordsCount, int batch2RecordsCount, int batch3RecordsCount) throws Exception { + HoodieWriteConfig config = getSmallInsertWriteConfig(2000, false); + SparkRDDWriteClient client = getHoodieWriteClient(config, false); + dataGen = new HoodieTestDataGenerator(); + + // Do Inserts for DEFAULT_FIRST_PARTITION_PATH + String commitTime1 = "001"; + Set<String> batch1Buckets = + this.insertPartitionRecordsWithCommit(client, batch1RecordsCount, commitTime1, DEFAULT_FIRST_PARTITION_PATH); + + // Do Inserts for DEFAULT_SECOND_PARTITION_PATH + String commitTime2 = "002"; + Set<String> batch2Buckets = + this.insertPartitionRecordsWithCommit(client, batch2RecordsCount, commitTime2, DEFAULT_SECOND_PARTITION_PATH); + + // Do Inserts for DEFAULT_THIRD_PARTITION_PATH + String commitTime3 = "003"; + Set<String> batch3Buckets = + this.insertPartitionRecordsWithCommit(client, batch3RecordsCount, commitTime3, DEFAULT_THIRD_PARTITION_PATH); + + // delete DEFAULT_FIRST_PARTITION_PATH + String commitTime4 = "004"; + Set<String> deletePartitionReplaceFileIds1 = + deletePartitionWithCommit(client, commitTime4, Arrays.asList(DEFAULT_FIRST_PARTITION_PATH)); + assertEquals(batch1Buckets, deletePartitionReplaceFileIds1); + List<HoodieBaseFile> baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs, + String.format("%s/%s/*", basePath, DEFAULT_FIRST_PARTITION_PATH)); + assertEquals(0, baseFiles.size()); + baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs, + String.format("%s/%s/*", basePath, DEFAULT_SECOND_PARTITION_PATH)); + assertTrue(baseFiles.size() > 0); + baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs, + String.format("%s/%s/*", basePath, DEFAULT_THIRD_PARTITION_PATH)); + assertTrue(baseFiles.size() > 0); + + // delete DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH + String commitTime5 = "005"; + Set<String> deletePartitionReplaceFileIds2 = + deletePartitionWithCommit(client, commitTime5, Arrays.asList(DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH)); + Set<String> expectedFileId = new HashSet<>(); + expectedFileId.addAll(batch2Buckets); + expectedFileId.addAll(batch3Buckets); + assertEquals(expectedFileId, deletePartitionReplaceFileIds2); + + baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs, + String.format("%s/%s/*", basePath, DEFAULT_FIRST_PARTITION_PATH), + String.format("%s/%s/*", basePath, DEFAULT_SECOND_PARTITION_PATH), + String.format("%s/%s/*", basePath, DEFAULT_THIRD_PARTITION_PATH)); + assertEquals(0, baseFiles.size()); + } + + /** * Verify data in parquet files matches expected records and commit time. */ private void verifyRecordsWritten(String commitTime, List<HoodieRecord> expectedRecords, List<WriteStatus> allStatus) { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java index 307e068..c91b51b 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java @@ -147,6 +147,22 @@ public class HoodieClientTestUtils { } } + public static List<HoodieBaseFile> getLatestBaseFiles(String basePath, FileSystem fs, + String... paths) { + List<HoodieBaseFile> latestFiles = new ArrayList<>(); + try { + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true); + for (String path : paths) { + BaseFileOnlyView fileSystemView = new HoodieTableFileSystemView(metaClient, + metaClient.getCommitsTimeline().filterCompletedInstants(), fs.globStatus(new Path(path))); + latestFiles.addAll(fileSystemView.getLatestBaseFiles().collect(Collectors.toList())); + } + } catch (Exception e) { + throw new HoodieException("Error reading hoodie table as a dataframe", e); + } + return latestFiles; + } + /** * Reads the paths under the a hoodie table out as a DataFrame. */ @@ -154,14 +170,9 @@ public class HoodieClientTestUtils { String... paths) { List<String> filteredPaths = new ArrayList<>(); try { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true); - for (String path : paths) { - BaseFileOnlyView fileSystemView = new HoodieTableFileSystemView(metaClient, - metaClient.getCommitsTimeline().filterCompletedInstants(), fs.globStatus(new Path(path))); - List<HoodieBaseFile> latestFiles = fileSystemView.getLatestBaseFiles().collect(Collectors.toList()); - for (HoodieBaseFile file : latestFiles) { - filteredPaths.add(file.getPath()); - } + List<HoodieBaseFile> latestFiles = getLatestBaseFiles(basePath, fs, paths); + for (HoodieBaseFile file : latestFiles) { + filteredPaths.add(file.getPath()); } return sqlContext.read().parquet(filteredPaths.toArray(new String[filteredPaths.size()])); } catch (Exception e) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java index 39f0f62..f237156 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java @@ -42,6 +42,8 @@ public enum WriteOperationType { INSERT_OVERWRITE("insert_overwrite"), // cluster CLUSTER("cluster"), + // delete partition + DELETE_PARTITION("delete_partition"), // insert overwrite with dynamic partitioning INSERT_OVERWRITE_TABLE("insert_overwrite_table"), // used for old version @@ -74,6 +76,8 @@ public enum WriteOperationType { return DELETE; case "insert_overwrite": return INSERT_OVERWRITE; + case "delete_partition": + return DELETE_PARTITION; case "insert_overwrite_table": return INSERT_OVERWRITE_TABLE; case "cluster":