This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch release-0.12.2-shadow in repository https://gitbox.apache.org/repos/asf/hudi.git
commit e346f18162426646e12d40958051af49f081a30b Author: Sagar Sumit <sagarsumi...@gmail.com> AuthorDate: Tue Dec 13 10:22:10 2022 +0530 [HUDI-5351] Handle populateMetaFields when repartitioning in sort partitioner (#7411) --- .../MultipleSparkJobExecutionStrategy.java | 6 +- .../BulkInsertInternalPartitionerFactory.java | 20 +++--- ...lkInsertInternalPartitionerWithRowsFactory.java | 19 ++--- .../bulkinsert/GlobalSortPartitioner.java | 14 ++++ .../bulkinsert/GlobalSortPartitionerWithRows.java | 14 ++++ ...PartitionPathRepartitionAndSortPartitioner.java | 12 +++- ...nPathRepartitionAndSortPartitionerWithRows.java | 12 +++- .../PartitionPathRepartitionPartitioner.java | 12 +++- ...artitionPathRepartitionPartitionerWithRows.java | 12 +++- .../PartitionSortPartitionerWithRows.java | 14 ++++ .../bulkinsert/RDDPartitionSortPartitioner.java | 14 ++++ .../TestBulkInsertInternalPartitioner.java | 83 ++++++++++++++-------- .../TestBulkInsertInternalPartitionerForRows.java | 69 ++++++++++++------ .../org/apache/hudi/HoodieSparkSqlWriter.scala | 5 +- 14 files changed, 223 insertions(+), 83 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java index e37eb4316d0..275f954781e 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java @@ -206,10 +206,8 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa throw new UnsupportedOperationException(String.format("Layout optimization strategy '%s' is not supported", layoutOptStrategy)); } }).orElse(isRowPartitioner - ? 
BulkInsertInternalPartitionerWithRowsFactory.get( - getWriteConfig().getBulkInsertSortMode(), getHoodieTable().isPartitioned(), true) - : BulkInsertInternalPartitionerFactory.get( - getWriteConfig().getBulkInsertSortMode(), getHoodieTable().isPartitioned(), true)); + ? BulkInsertInternalPartitionerWithRowsFactory.get(getWriteConfig(), getHoodieTable().isPartitioned(), true) + : BulkInsertInternalPartitionerFactory.get(getHoodieTable(), getWriteConfig(), true)); } /** diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java index 900d2729f10..c854d72c5f9 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java @@ -30,32 +30,30 @@ import org.apache.hudi.table.HoodieTable; public abstract class BulkInsertInternalPartitionerFactory { public static BulkInsertPartitioner get(HoodieTable table, HoodieWriteConfig config) { - return get(config.getBulkInsertSortMode(), table.isPartitioned(), false); + return get(table, config, false); } public static BulkInsertPartitioner get( HoodieTable table, HoodieWriteConfig config, boolean enforceNumOutputPartitions) { - return get(config.getBulkInsertSortMode(), table.isPartitioned(), enforceNumOutputPartitions); + return get(config, table.isPartitioned(), enforceNumOutputPartitions); } - public static BulkInsertPartitioner get(BulkInsertSortMode sortMode, boolean isTablePartitioned) { - return get(sortMode, isTablePartitioned, false); - } - - public static BulkInsertPartitioner get(BulkInsertSortMode sortMode, + public static BulkInsertPartitioner get(HoodieWriteConfig config, boolean isTablePartitioned, boolean 
enforceNumOutputPartitions) { + BulkInsertSortMode sortMode = config.getBulkInsertSortMode(); + switch (sortMode) { case NONE: return new NonSortPartitioner(enforceNumOutputPartitions); case GLOBAL_SORT: - return new GlobalSortPartitioner(); + return new GlobalSortPartitioner(config); case PARTITION_SORT: - return new RDDPartitionSortPartitioner(); + return new RDDPartitionSortPartitioner(config); case PARTITION_PATH_REPARTITION: - return new PartitionPathRepartitionPartitioner(isTablePartitioned); + return new PartitionPathRepartitionPartitioner(isTablePartitioned, config); case PARTITION_PATH_REPARTITION_AND_SORT: - return new PartitionPathRepartitionAndSortPartitioner(isTablePartitioned); + return new PartitionPathRepartitionAndSortPartitioner(isTablePartitioned, config); default: throw new HoodieException("The bulk insert sort mode \"" + sortMode.name() + "\" is not supported."); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java index 218eae0dc94..07995e50d6a 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java @@ -18,6 +18,7 @@ package org.apache.hudi.execution.bulkinsert; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.spark.sql.Dataset; @@ -29,24 +30,26 @@ import org.apache.spark.sql.Row; */ public abstract class BulkInsertInternalPartitionerWithRowsFactory { - public static BulkInsertPartitioner<Dataset<Row>> get(BulkInsertSortMode sortMode, + public static BulkInsertPartitioner<Dataset<Row>> get(HoodieWriteConfig config, boolean 
isTablePartitioned) { - return get(sortMode, isTablePartitioned, false); + return get(config, isTablePartitioned, false); } - public static BulkInsertPartitioner<Dataset<Row>> get( - BulkInsertSortMode sortMode, boolean isTablePartitioned, boolean enforceNumOutputPartitions) { + public static BulkInsertPartitioner<Dataset<Row>> get(HoodieWriteConfig config, + boolean isTablePartitioned, + boolean enforceNumOutputPartitions) { + BulkInsertSortMode sortMode = config.getBulkInsertSortMode(); switch (sortMode) { case NONE: return new NonSortPartitionerWithRows(enforceNumOutputPartitions); case GLOBAL_SORT: - return new GlobalSortPartitionerWithRows(); + return new GlobalSortPartitionerWithRows(config); case PARTITION_SORT: - return new PartitionSortPartitionerWithRows(); + return new PartitionSortPartitionerWithRows(config); case PARTITION_PATH_REPARTITION: - return new PartitionPathRepartitionPartitionerWithRows(isTablePartitioned); + return new PartitionPathRepartitionPartitionerWithRows(isTablePartitioned, config); case PARTITION_PATH_REPARTITION_AND_SORT: - return new PartitionPathRepartitionAndSortPartitionerWithRows(isTablePartitioned); + return new PartitionPathRepartitionAndSortPartitionerWithRows(isTablePartitioned, config); default: throw new UnsupportedOperationException("The bulk insert sort mode \"" + sortMode.name() + "\" is not supported."); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitioner.java index a184c009a1b..e10d23743da 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitioner.java @@ -20,10 +20,14 @@ package org.apache.hudi.execution.bulkinsert; import org.apache.hudi.common.model.HoodieRecord; import 
org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.spark.api.java.JavaRDD; +import static org.apache.hudi.execution.bulkinsert.BulkInsertSortMode.GLOBAL_SORT; + /** * A built-in partitioner that does global sorting for the input records across partitions * after repartition for bulk insert operation, corresponding to the @@ -34,9 +38,19 @@ import org.apache.spark.api.java.JavaRDD; public class GlobalSortPartitioner<T extends HoodieRecordPayload> implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> { + private final boolean shouldPopulateMetaFields; + + public GlobalSortPartitioner(HoodieWriteConfig config) { + this.shouldPopulateMetaFields = config.populateMetaFields(); + } + @Override public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) { + if (!shouldPopulateMetaFields) { + throw new HoodieException(GLOBAL_SORT.name() + " mode requires meta-fields to be enabled"); + } + // Now, sort the records and line them up nicely for loading. return records.sortBy(record -> { // Let's use "partitionPath + key" as the sort key. 
Spark, will ensure diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitionerWithRows.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitionerWithRows.java index 24bcc0aff0d..1a8e87a3823 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitionerWithRows.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitionerWithRows.java @@ -19,19 +19,33 @@ package org.apache.hudi.execution.bulkinsert; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.functions; +import static org.apache.hudi.execution.bulkinsert.BulkInsertSortMode.GLOBAL_SORT; + /** * A built-in partitioner that does global sorting for the input Rows across partitions after repartition for bulk insert operation, corresponding to the {@code BulkInsertSortMode.GLOBAL_SORT} mode. */ public class GlobalSortPartitionerWithRows implements BulkInsertPartitioner<Dataset<Row>> { + private final boolean shouldPopulateMetaFields; + + public GlobalSortPartitionerWithRows(HoodieWriteConfig config) { + this.shouldPopulateMetaFields = config.populateMetaFields(); + } + @Override public Dataset<Row> repartitionRecords(Dataset<Row> rows, int outputSparkPartitions) { + if (!shouldPopulateMetaFields) { + throw new HoodieException(GLOBAL_SORT.name() + " mode requires meta-fields to be enabled"); + } + // Now, sort the records and line them up nicely for loading. // Let's use "partitionPath + key" as the sort key. 
return rows.sort(functions.col(HoodieRecord.PARTITION_PATH_METADATA_FIELD), functions.col(HoodieRecord.RECORD_KEY_METADATA_FIELD)) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionAndSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionAndSortPartitioner.java index e8e1e2072f5..a47f1f9df43 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionAndSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionAndSortPartitioner.java @@ -21,12 +21,16 @@ package org.apache.hudi.execution.bulkinsert; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.spark.api.java.JavaRDD; import scala.Tuple2; +import static org.apache.hudi.execution.bulkinsert.BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT; + /** * A built-in partitioner that does the following for input records for bulk insert operation * <p> @@ -45,14 +49,20 @@ public class PartitionPathRepartitionAndSortPartitioner<T extends HoodieRecordPa implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> { private final boolean isTablePartitioned; + private final boolean shouldPopulateMetaFields; - public PartitionPathRepartitionAndSortPartitioner(boolean isTablePartitioned) { + public PartitionPathRepartitionAndSortPartitioner(boolean isTablePartitioned, HoodieWriteConfig config) { this.isTablePartitioned = isTablePartitioned; + this.shouldPopulateMetaFields = config.populateMetaFields(); } @Override public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) { + if 
(!shouldPopulateMetaFields) { + throw new HoodieException(PARTITION_PATH_REPARTITION_AND_SORT.name() + " mode requires meta-fields to be enabled"); + } + if (isTablePartitioned) { PartitionPathRDDPartitioner partitioner = new PartitionPathRDDPartitioner( (partitionPath) -> (String) partitionPath, outputSparkPartitions); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionAndSortPartitionerWithRows.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionAndSortPartitionerWithRows.java index cf3ff1acfa4..ff505624913 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionAndSortPartitionerWithRows.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionAndSortPartitionerWithRows.java @@ -20,12 +20,16 @@ package org.apache.hudi.execution.bulkinsert; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.spark.sql.Column; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; +import static org.apache.hudi.execution.bulkinsert.BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT; + /** * A built-in partitioner that does the following for input rows for bulk insert operation * <p> @@ -41,13 +45,19 @@ import org.apache.spark.sql.Row; public class PartitionPathRepartitionAndSortPartitionerWithRows implements BulkInsertPartitioner<Dataset<Row>> { private final boolean isTablePartitioned; + private final boolean shouldPopulateMetaFields; - public PartitionPathRepartitionAndSortPartitionerWithRows(boolean isTablePartitioned) { + public PartitionPathRepartitionAndSortPartitionerWithRows(boolean isTablePartitioned, HoodieWriteConfig config) { 
this.isTablePartitioned = isTablePartitioned; + this.shouldPopulateMetaFields = config.populateMetaFields(); } @Override public Dataset<Row> repartitionRecords(Dataset<Row> rows, int outputSparkPartitions) { + if (!shouldPopulateMetaFields) { + throw new HoodieException(PARTITION_PATH_REPARTITION_AND_SORT.name() + " mode requires meta-fields to be enabled"); + } + if (isTablePartitioned) { return rows.repartition(outputSparkPartitions, new Column(HoodieRecord.PARTITION_PATH_METADATA_FIELD)) .sortWithinPartitions(new Column(HoodieRecord.PARTITION_PATH_METADATA_FIELD)); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionPartitioner.java index 5931b565757..393e47a61e3 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionPartitioner.java @@ -21,12 +21,16 @@ package org.apache.hudi.execution.bulkinsert; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.spark.api.java.JavaRDD; import scala.Tuple2; +import static org.apache.hudi.execution.bulkinsert.BulkInsertSortMode.PARTITION_PATH_REPARTITION; + /** * A built-in partitioner that does the following for input records for bulk insert operation * <p> @@ -44,14 +48,20 @@ public class PartitionPathRepartitionPartitioner<T extends HoodieRecordPayload> implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> { private final boolean isTablePartitioned; + private final boolean shouldPopulateMetaFields; - public 
PartitionPathRepartitionPartitioner(boolean isTablePartitioned) { + public PartitionPathRepartitionPartitioner(boolean isTablePartitioned, HoodieWriteConfig config) { this.isTablePartitioned = isTablePartitioned; + this.shouldPopulateMetaFields = config.populateMetaFields(); } @Override public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) { + if (!shouldPopulateMetaFields) { + throw new HoodieException(PARTITION_PATH_REPARTITION.name() + " mode requires meta-fields to be enabled"); + } + if (isTablePartitioned) { PartitionPathRDDPartitioner partitioner = new PartitionPathRDDPartitioner( (partitionPath) -> (String) partitionPath, outputSparkPartitions); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionPartitionerWithRows.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionPartitionerWithRows.java index 62d9edbca84..966648d579c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionPartitionerWithRows.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionPartitionerWithRows.java @@ -20,12 +20,16 @@ package org.apache.hudi.execution.bulkinsert; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.spark.sql.Column; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; +import static org.apache.hudi.execution.bulkinsert.BulkInsertSortMode.PARTITION_PATH_REPARTITION; + /** * A built-in partitioner that does the following for input rows for bulk insert operation * <p> @@ -40,13 +44,19 @@ import org.apache.spark.sql.Row; public class PartitionPathRepartitionPartitionerWithRows 
implements BulkInsertPartitioner<Dataset<Row>> { private final boolean isTablePartitioned; + private final boolean shouldPopulateMetaFields; - public PartitionPathRepartitionPartitionerWithRows(boolean isTablePartitioned) { + public PartitionPathRepartitionPartitionerWithRows(boolean isTablePartitioned, HoodieWriteConfig config) { this.isTablePartitioned = isTablePartitioned; + this.shouldPopulateMetaFields = config.populateMetaFields(); } @Override public Dataset<Row> repartitionRecords(Dataset<Row> rows, int outputSparkPartitions) { + if (!shouldPopulateMetaFields) { + throw new HoodieException(PARTITION_PATH_REPARTITION.name() + " mode requires meta-fields to be enabled"); + } + if (isTablePartitioned) { return rows.repartition(outputSparkPartitions, new Column(HoodieRecord.PARTITION_PATH_METADATA_FIELD)); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionSortPartitionerWithRows.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionSortPartitionerWithRows.java index b669c338f86..c5c21f14a8e 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionSortPartitionerWithRows.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionSortPartitionerWithRows.java @@ -19,18 +19,32 @@ package org.apache.hudi.execution.bulkinsert; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; +import static org.apache.hudi.execution.bulkinsert.BulkInsertSortMode.PARTITION_SORT; + /** * A built-in partitioner that does local sorting for each spark partitions after coalesce for bulk insert operation, corresponding to the {@code BulkInsertSortMode.PARTITION_SORT} mode. 
*/ public class PartitionSortPartitionerWithRows implements BulkInsertPartitioner<Dataset<Row>> { + private final boolean shouldPopulateMetaFields; + + public PartitionSortPartitionerWithRows(HoodieWriteConfig config) { + this.shouldPopulateMetaFields = config.populateMetaFields(); + } + @Override public Dataset<Row> repartitionRecords(Dataset<Row> rows, int outputSparkPartitions) { + if (!shouldPopulateMetaFields) { + throw new HoodieException(PARTITION_SORT.name() + " mode requires meta-fields to be enabled"); + } + return rows.coalesce(outputSparkPartitions).sortWithinPartitions(HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieRecord.RECORD_KEY_METADATA_FIELD); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionSortPartitioner.java index 9526ad58564..b5dc83cc2fa 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionSortPartitioner.java @@ -20,6 +20,8 @@ package org.apache.hudi.execution.bulkinsert; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.spark.api.java.JavaRDD; @@ -30,6 +32,8 @@ import java.util.List; import scala.Tuple2; +import static org.apache.hudi.execution.bulkinsert.BulkInsertSortMode.PARTITION_SORT; + /** * A built-in partitioner that does local sorting for each RDD partition * after coalesce for bulk insert operation, corresponding to the @@ -40,9 +44,19 @@ import scala.Tuple2; public class RDDPartitionSortPartitioner<T extends HoodieRecordPayload> implements 
BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> { + private final boolean shouldPopulateMetaFields; + + public RDDPartitionSortPartitioner(HoodieWriteConfig config) { + this.shouldPopulateMetaFields = config.populateMetaFields(); + } + @Override public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) { + if (!shouldPopulateMetaFields) { + throw new HoodieException(PARTITION_SORT.name() + " mode requires meta-fields to be enabled"); + } + return records.coalesce(outputSparkPartitions) .mapToPair(record -> new Tuple2<>( diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java index 7bc64b54457..11420c180eb 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.testutils.HoodieClientTestBase; @@ -48,6 +49,7 @@ import java.util.stream.Stream; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase implements Serializable { private static final Comparator<HoodieRecord<? 
extends HoodieRecordPayload>> KEY_COMPARATOR = @@ -83,16 +85,21 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase impl // boolean isTablePartitioned, // boolean enforceNumOutputPartitions, // boolean isGloballySorted, - // boolean isLocallySorted + // boolean isLocallySorted, + // boolean populateMetaFields Object[][] data = new Object[][] { - {BulkInsertSortMode.GLOBAL_SORT, true, true, true, true}, - {BulkInsertSortMode.PARTITION_SORT, true, true, false, true}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false, false}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, true, false, false}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true, false, false}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, true, false, false}, - {BulkInsertSortMode.NONE, true, true, false, false}, - {BulkInsertSortMode.NONE, true, false, false, false} + {BulkInsertSortMode.GLOBAL_SORT, true, true, true, true, true}, + {BulkInsertSortMode.PARTITION_SORT, true, true, false, true, true}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false, false, true}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, true, false, false, true}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true, false, false, true}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, true, false, false, true}, + {BulkInsertSortMode.NONE, true, true, false, false, true}, + {BulkInsertSortMode.NONE, true, false, false, false, true}, + {BulkInsertSortMode.GLOBAL_SORT, true, true, true, true, false}, + {BulkInsertSortMode.PARTITION_SORT, true, true, false, true, false}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false, false, false}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true, false, false, false} }; return Stream.of(data).map(Arguments::of); } @@ -109,7 +116,8 @@ public class TestBulkInsertInternalPartitioner extends 
HoodieClientTestBase impl boolean enforceNumOutputPartitions, boolean isGloballySorted, boolean isLocallySorted, - Map<String, Long> expectedPartitionNumRecords) { + Map<String, Long> expectedPartitionNumRecords, + boolean populateMetaFields) { testBulkInsertInternalPartitioner( partitioner, records, @@ -117,7 +125,8 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase impl isGloballySorted, isLocallySorted, expectedPartitionNumRecords, - Option.empty()); + Option.empty(), + populateMetaFields); } private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner, @@ -126,8 +135,13 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase impl boolean isGloballySorted, boolean isLocallySorted, Map<String, Long> expectedPartitionNumRecords, - Option<Comparator<HoodieRecord<? extends HoodieRecordPayload>>> comparator) { + Option<Comparator<HoodieRecord<? extends HoodieRecordPayload>>> comparator, + boolean populateMetaFields) { int numPartitions = 2; + if (!populateMetaFields) { + assertThrows(HoodieException.class, () -> partitioner.repartitionRecords(records, numPartitions)); + return; + } JavaRDD<HoodieRecord<? extends HoodieRecordPayload>> actualRecords = (JavaRDD<HoodieRecord<? 
extends HoodieRecordPayload>>) partitioner.repartitionRecords(records, numPartitions); assertEquals( @@ -163,25 +177,35 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase impl boolean isTablePartitioned, boolean enforceNumOutputPartitions, boolean isGloballySorted, - boolean isLocallySorted) { + boolean isLocallySorted, + boolean populateMetaFields) { JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc); JavaRDD<HoodieRecord> records2 = generateTripleTestRecordsForBulkInsert(jsc); + + HoodieWriteConfig config = HoodieWriteConfig + .newBuilder() + .withPath("/") + .withSchema(TRIP_EXAMPLE_SCHEMA) + .withBulkInsertSortMode(sortMode.name()) + .withPopulateMetaFields(populateMetaFields) + .build(); + testBulkInsertInternalPartitioner( - BulkInsertInternalPartitionerFactory.get( - sortMode, isTablePartitioned, enforceNumOutputPartitions), + BulkInsertInternalPartitionerFactory.get(config, isTablePartitioned, enforceNumOutputPartitions), records1, enforceNumOutputPartitions, isGloballySorted, isLocallySorted, - generateExpectedPartitionNumRecords(records1)); + generateExpectedPartitionNumRecords(records1), + populateMetaFields); testBulkInsertInternalPartitioner( - BulkInsertInternalPartitionerFactory.get( - sortMode, isTablePartitioned, enforceNumOutputPartitions), + BulkInsertInternalPartitionerFactory.get(config, isTablePartitioned, enforceNumOutputPartitions), records2, enforceNumOutputPartitions, isGloballySorted, isLocallySorted, - generateExpectedPartitionNumRecords(records2)); + generateExpectedPartitionNumRecords(records2), + populateMetaFields); } @Test @@ -193,22 +217,21 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase impl JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc); JavaRDD<HoodieRecord> records2 = generateTripleTestRecordsForBulkInsert(jsc); testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, 
HoodieTestDataGenerator.AVRO_SCHEMA, false), - records1, true, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator)); + records1, true, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator), true); testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, false), - records2, true, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator)); + records2, true, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator), true); HoodieWriteConfig config = HoodieWriteConfig - .newBuilder() - .withPath("/") - .withSchema(TRIP_EXAMPLE_SCHEMA) - .withUserDefinedBulkInsertPartitionerClass(RDDCustomColumnsSortPartitioner.class.getName()) - .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString) - .build(); + .newBuilder() + .withPath("/") + .withSchema(TRIP_EXAMPLE_SCHEMA) + .withUserDefinedBulkInsertPartitionerClass(RDDCustomColumnsSortPartitioner.class.getName()) + .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString) + .build(); testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(config), - records1, true, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator)); + records1, true, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator), true); testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(config), - records2, true, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator)); - + records2, true, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator), true); } private Comparator<HoodieRecord<? 
extends HoodieRecordPayload>> getCustomColumnComparator(Schema schema, String[] sortColumns) { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java index de827f7a450..6332e00ba67 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.testutils.HoodieClientTestHarness; import org.apache.hudi.testutils.SparkDatasetTestUtils; @@ -44,7 +45,9 @@ import java.util.List; import java.util.Map; import java.util.stream.Stream; +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; /** * Unit tests {@link BulkInsertPartitioner}s with Rows. 
@@ -53,6 +56,7 @@ public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHa private static final Comparator<Row> KEY_COMPARATOR = Comparator.comparing(o -> (o.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD) + "+" + o.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD))); + @BeforeEach public void setUp() throws Exception { initSparkContexts("TestBulkInsertInternalPartitionerForRows"); @@ -71,16 +75,21 @@ public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHa // boolean isTablePartitioned, // boolean enforceNumOutputPartitions, // boolean isGloballySorted, - // boolean isLocallySorted + // boolean isLocallySorted, + // boolean populateMetaFields Object[][] data = new Object[][] { - {BulkInsertSortMode.GLOBAL_SORT, true, true, true, true}, - {BulkInsertSortMode.PARTITION_SORT, true, true, false, true}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false, false}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, true, false, false}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true, false, false}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, true, false, false}, - {BulkInsertSortMode.NONE, true, true, false, false}, - {BulkInsertSortMode.NONE, true, false, false, false} + {BulkInsertSortMode.GLOBAL_SORT, true, true, true, true, true}, + {BulkInsertSortMode.PARTITION_SORT, true, true, false, true, true}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false, false, true}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, true, false, false, true}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true, false, false, true}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, true, false, false, true}, + {BulkInsertSortMode.NONE, true, true, false, false, true}, + {BulkInsertSortMode.NONE, true, false, false, false, true}, + {BulkInsertSortMode.GLOBAL_SORT, true, true, true, true, false}, + 
{BulkInsertSortMode.PARTITION_SORT, true, true, false, true, false}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false, false, false}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true, false, false, false}, }; return Stream.of(data).map(Arguments::of); } @@ -91,28 +100,37 @@ public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHa boolean isTablePartitioned, boolean enforceNumOutputPartitions, boolean isGloballySorted, - boolean isLocallySorted) - throws Exception { + boolean isLocallySorted, + boolean populateMetaFields) { Dataset<Row> records1 = generateTestRecords(); Dataset<Row> records2 = generateTestRecords(); + + HoodieWriteConfig config = HoodieWriteConfig + .newBuilder() + .withPath("/") + .withSchema(TRIP_EXAMPLE_SCHEMA) + .withBulkInsertSortMode(sortMode.name()) + .withPopulateMetaFields(populateMetaFields) + .build(); + testBulkInsertInternalPartitioner( - BulkInsertInternalPartitionerWithRowsFactory.get( - sortMode, isTablePartitioned, enforceNumOutputPartitions), + BulkInsertInternalPartitionerWithRowsFactory.get(config, isTablePartitioned, enforceNumOutputPartitions), records1, enforceNumOutputPartitions, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records1), - Option.empty()); + Option.empty(), + populateMetaFields); testBulkInsertInternalPartitioner( - BulkInsertInternalPartitionerWithRowsFactory.get( - sortMode, isTablePartitioned, enforceNumOutputPartitions), + BulkInsertInternalPartitionerWithRowsFactory.get(config, isTablePartitioned, enforceNumOutputPartitions), records2, enforceNumOutputPartitions, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records2), - Option.empty()); + Option.empty(), + populateMetaFields); } @Test @@ -124,9 +142,9 @@ public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHa Comparator<Row> comparator = getCustomColumnComparator(sortColumns);
testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(sortColumns), - records1, true, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator)); + records1, true, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator), true); testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(sortColumns), - records2, true, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator)); + records2, true, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator), true); HoodieWriteConfig config = HoodieWriteConfig .newBuilder() @@ -135,9 +153,9 @@ public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHa .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString) .build(); testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(config), - records1, true, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator)); + records1, true, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator), true); testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(config), - records2, true, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator)); + records2, true, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator), true); } private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner, @@ -146,8 +164,13 @@ public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHa boolean isGloballySorted, boolean isLocallySorted, Map<String, Long> expectedPartitionNumRecords, - Option<Comparator<Row>> comparator) { + Option<Comparator<Row>> comparator, + boolean populateMetaFields) { int numPartitions = 2; + if (!populateMetaFields) { + assertThrows(HoodieException.class, () -> partitioner.repartitionRecords(rows, numPartitions)); + return; + } Dataset<Row> actualRecords = 
(Dataset<Row>) partitioner.repartitionRecords(rows, numPartitions); assertEquals( enforceNumOutputPartitions ? numPartitions : rows.rdd().getNumPartitions(), @@ -198,7 +221,7 @@ public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHa private void verifyRowsAscendingOrder(List<Row> records, Option<Comparator<Row>> comparator) { List<Row> expectedRecords = new ArrayList<>(records); - Collections.sort(expectedRecords,comparator.orElse(KEY_COMPARATOR)); + Collections.sort(expectedRecords, comparator.orElse(KEY_COMPARATOR)); assertEquals(expectedRecords, records); } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 003dbd35e11..d8f9ab8435b 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -639,8 +639,7 @@ object HoodieSparkSqlWriter { if (userDefinedBulkInsertPartitionerOpt.isPresent) { userDefinedBulkInsertPartitionerOpt.get } else { - BulkInsertInternalPartitionerWithRowsFactory.get( - writeConfig.getBulkInsertSortMode, isTablePartitioned) + BulkInsertInternalPartitionerWithRowsFactory.get(writeConfig, isTablePartitioned) } } else { // Sort modes are not yet supported when meta fields are disabled @@ -886,7 +885,7 @@ object HoodieSparkSqlWriter { } private def mergeParamsAndGetHoodieConfig(optParams: Map[String, String], - tableConfig: HoodieTableConfig, mode: SaveMode): (Map[String, String], HoodieConfig) = { + tableConfig: HoodieTableConfig, mode: SaveMode): (Map[String, String], HoodieConfig) = { val translatedOptions = DataSourceWriteOptions.translateSqlOptions(optParams) val mergedParams = mutable.Map.empty ++ HoodieWriterUtils.parametersWithWriteDefaults(translatedOptions) if 
(!mergedParams.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)