This is an automated email from the ASF dual-hosted git repository.

akudinkin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new e3b95e88a76 [HUDI-5678] Fix `deduceShuffleParallelism` in row-writing Bulk Insert helper (#7818)
e3b95e88a76 is described below

commit e3b95e88a7645b3a03bf109671e69e354079fe5d
Author: Jon Vexler <jbvex...@gmail.com>
AuthorDate: Thu Feb 2 11:50:48 2023 -0500

    [HUDI-5678] Fix `deduceShuffleParallelism` in row-writing Bulk Insert helper (#7818)

    `deduceShuffleParallelism` returns 0 in some situations, which should never occur.
---
 .../hudi/HoodieDatasetBulkInsertHelper.scala       |  4 +-
 .../org/apache/spark/sql/HoodieUnsafeUtils.scala   | 20 ++++++-
 .../apache/spark/sql/hudi/TestInsertTable.scala    | 66 ++++++++++++++++++++++
 3 files changed, 85 insertions(+), 5 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index 7e235993e33..a6488b07b51 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -34,7 +34,7 @@ import org.apache.hudi.util.JFunction.toJavaSerializableFunctionUnchecked
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, getNestedInternalRowValue}
-import org.apache.spark.sql.HoodieUnsafeUtils.getOutputPartitioning
+import org.apache.spark.sql.HoodieUnsafeUtils.getNumPartitions
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.Project
@@ -45,7 +45,7 @@ import org.apache.spark.unsafe.types.UTF8String
 import scala.collection.JavaConverters.{asScalaBufferConverter, seqAsJavaListConverter}

 object HoodieDatasetBulkInsertHelper
-  extends ParallelismHelper[DataFrame](toJavaSerializableFunctionUnchecked(df => getOutputPartitioning(df).numPartitions)) with Logging {
+  extends ParallelismHelper[DataFrame](toJavaSerializableFunctionUnchecked(df => getNumPartitions(df))) with Logging {

   /**
    * Prepares [[DataFrame]] for bulk-insert into Hudi table, taking following steps:
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
index dfd416b6f52..ee22f714c9c 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
@@ -22,7 +22,8 @@ import org.apache.hudi.HoodieUnsafeRDD
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.MutablePair

@@ -38,8 +39,21 @@ object HoodieUnsafeUtils {
    *       but instead will just execute Spark resolution, optimization and actual execution planning stages
    *       returning instance of [[SparkPlan]] ready for execution
    */
-  def getOutputPartitioning(df: DataFrame): Partitioning =
-    df.queryExecution.executedPlan.outputPartitioning
+  def getNumPartitions(df: DataFrame): Int = {
+    // NOTE: In general, we'd rely on the [[outputPartitioning]] of the executable [[SparkPlan]] to determine
+    //       the number of partitions the plan is going to be executed with.
+    //       However, in the case of [[LogicalRDD]] the plan's output-partitioning will be stubbed as [[UnknownPartitioning]],
+    //       and therefore we fall back to determining the number of partitions by looking at the RDD itself
+    df.queryExecution.logical match {
+      case LogicalRDD(_, rdd, outputPartitioning, _, _) =>
+        outputPartitioning match {
+          case _: UnknownPartitioning => rdd.getNumPartitions
+          case _ => outputPartitioning.numPartitions
+        }
+
+      case _ => df.queryExecution.executedPlan.outputPartitioning.numPartitions
+    }
+  }

   /**
    * Creates [[DataFrame]] from provided [[plan]]
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index b092a68e20d..b33deebdf72 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -26,9 +26,11 @@ import org.apache.hudi.common.model.WriteOperationType
 import org.apache.spark.sql.hudi.command.HoodieSparkValidateDuplicateKeyRecordMerger
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieDuplicateKeyException
+import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode
 import org.apache.hudi.keygen.ComplexKeyGenerator
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.getLastCommitMetadata
+import org.scalatest.Inspectors.forAll

 import java.io.File

@@ -1059,4 +1061,68 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
       )
     }
   }
+
+  /**
+   * This test is to make sure that bulk insert doesn't create a large number of tiny files if
+   * hoodie.bulkinsert.user.defined.partitioner.sort.columns doesn't start with the partition columns
+   *
+   * NOTE: Additionally, this test serves as a smoke test making sure that all of the bulk-insert
+   *       modes work
+   */
+  test(s"Test Bulk Insert with all sort-modes") {
+    withTempDir { basePath =>
+      BulkInsertSortMode.values().foreach { sortMode =>
+        val tableName = generateTableName
+        // Remove these with [HUDI-5419]
+        spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation")
+        spark.sessionState.conf.unsetConf("hoodie.datasource.write.insert.drop.duplicates")
+        spark.sessionState.conf.unsetConf("hoodie.merge.allow.duplicate.on.inserts")
+        spark.sessionState.conf.unsetConf("hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled")
+        // Default parallelism is 200, which means that with global sort each record will end up in a different Spark
+        // partition, so 9 files would be created. Setting parallelism to 3 so that each Spark partition will contain one Hudi partition.
+        val parallelism = if (sortMode.name.equals(BulkInsertSortMode.GLOBAL_SORT.name())) {
+          "hoodie.bulkinsert.shuffle.parallelism = 3,"
+        } else {
+          ""
+        }
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  dt string
+             |) using hudi
+             | tblproperties (
+             |  primaryKey = 'id',
+             |  preCombineField = 'name',
+             |  type = 'cow',
+             |  $parallelism
+             |  hoodie.bulkinsert.sort.mode = '${sortMode.name}'
+             | )
+             | partitioned by (dt)
+             | location '${basePath.getCanonicalPath}/$tableName'
+             """.stripMargin)
+
+        spark.sql("set hoodie.sql.bulk.insert.enable = true")
+        spark.sql("set hoodie.sql.insert.mode = non-strict")
+
+        spark.sql(
+          s"""insert into $tableName values
+             |(5, 'a', 35, '2021-05-21'),
+             |(1, 'a', 31, '2021-01-21'),
+             |(3, 'a', 33, '2021-03-21'),
+             |(4, 'b', 16, '2021-05-21'),
+             |(2, 'b', 18, '2021-01-21'),
+             |(6, 'b', 17, '2021-03-21'),
+             |(8, 'a', 21, '2021-05-21'),
+             |(9, 'a', 22, '2021-01-21'),
+             |(7, 'a', 23, '2021-03-21')
+             |""".stripMargin)
+
+        // TODO re-enable
+        //assertResult(3)(spark.sql(s"select distinct _hoodie_file_name from $tableName").count())
+      }
+    }
+  }
 }
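Context for reviewers: below is a minimal, self-contained sketch (not part of this commit) of the failure mode the patch fixes. It assumes a local SparkSession; the `PartitioningProbe` object name, the master URL, and the 8-partition RDD are illustrative choices, and the exact physical plan shape can vary across Spark versions.

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical probe, for illustration only
object PartitioningProbe extends App {
  val spark = SparkSession.builder()
    .master("local[4]")
    .appName("partitioning-probe")
    .getOrCreate()

  val schema = StructType(Seq(StructField("id", IntegerType)))
  // createDataFrame(RDD[Row], schema) plans the DataFrame as a LogicalRDD
  val rdd = spark.sparkContext.parallelize((1 to 100).map(i => Row(i)), numSlices = 8)
  val df = spark.createDataFrame(rdd, schema)

  // Old code path: the executed plan's partitioning may be stubbed as
  // UnknownPartitioning, whose numPartitions is 0, the value that leaked
  // into deduceShuffleParallelism
  val fromPlan = df.queryExecution.executedPlan.outputPartitioning.numPartitions

  // Fallback introduced by the fix: ask the backing RDD directly (8 here)
  val fromRdd = df.rdd.getNumPartitions

  println(s"plan reports $fromPlan partition(s), RDD reports $fromRdd")
  spark.stop()
}

The committed `getNumPartitions` guards exactly this case: it pattern-matches `LogicalRDD` in the logical plan and consults the RDD only when the partitioning is `UnknownPartitioning`, so `deduceShuffleParallelism` can no longer deduce a parallelism of 0.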