This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.14.0-siva-0.14.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 790903712ecd5ee65850673141227698ea0ced26
Author: bhat-vinay <152183592+bhat-vi...@users.noreply.github.com>
AuthorDate: Mon Dec 11 22:08:30 2023 +0530

    [HUDI-7040] Handle dropping of partition columns in BulkInsertDataInternalWriterHelper::write(...) (#10272)

    Issue: There are two configs which, when set in a certain manner, throw exceptions or assertion failures:
    1. Configs to disable populating metadata fields (for each row)
    2. Configs to drop partition columns (to save storage space) from a row

    With #1 and #2, partition paths cannot be deduced using partition columns
    (as the partition columns are dropped higher up the stack).
    BulkInsertDataInternalWriterHelper::write(...) relied on metadata fields to
    extract the partition path in such cases, but with #1 that is not possible,
    resulting in asserts/exceptions.

    The fix is to push the dropping of partition columns down the stack, to
    after the partition path is computed. The fix manipulates the raw
    'InternalRow' structure: each row is processed individually, dropping the
    partition columns and copying only the relevant fields into a new
    'InternalRow'.

    Co-authored-by: Vinaykumar Bhat <vi...@onehouse.ai>
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  4 ++
 .../commit/BulkInsertDataInternalWriterHelper.java | 34 ++++++++++++++-
 .../hudi/HoodieDatasetBulkInsertHelper.scala       | 31 +++++---------
 .../BaseDatasetBulkInsertCommitActionExecutor.java |  3 +-
 .../TestHoodieDatasetBulkInsertHelper.java         | 12 +++---
 .../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 48 +++++++++++++++++++++-
 6 files changed, 101 insertions(+), 31 deletions(-)
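For context, a minimal writer configuration of the shape that triggers the issue
(a sketch only: 'df' stands for any Dataset<Row> with a 'city' column, and the
table name and path are illustrative, mirroring the new test at the end of this
patch):

    // Sketch: bulk insert with meta fields disabled (#1) and partition
    // columns dropped (#2): the combination that used to assert or throw.
    df.write().format("hudi")
        .option("hoodie.table.name", "trips_table")
        .option("hoodie.datasource.write.partitionpath.field", "city")
        .option("hoodie.datasource.write.operation", "bulk_insert")
        .option("hoodie.populate.meta.fields", "false")                    // config #1
        .option("hoodie.datasource.write.drop.partition.columns", "true")  // config #2
        .mode(org.apache.spark.sql.SaveMode.Overwrite)
        .save("file:///tmp/trips_table");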
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 2524d7ef904..0cf1f287976 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1345,6 +1345,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getBoolean(ALLOW_MULTI_WRITE_ON_SAME_INSTANT_ENABLE);
   }
 
+  public boolean shouldDropPartitionColumns() {
+    return getBoolean(HoodieTableConfig.DROP_PARTITION_COLUMNS);
+  }
+
   public String getWriteStatusClassName() {
     return getString(WRITE_STATUS_CLASS_NAME);
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
index 7f6054b2296..0773e8a5a0a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
@@ -18,6 +18,7 @@
 package org.apache.hudi.table.action.commit;
 
+import org.apache.hudi.HoodieDatasetBulkInsertHelper;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
@@ -38,11 +39,16 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.UUID;
 
+import scala.collection.JavaConversions;
+import scala.collection.JavaConverters;
+
 /**
  * Helper class for HoodieBulkInsertDataInternalWriter used by Spark datasource v2.
  */
@@ -124,7 +130,33 @@ public class BulkInsertDataInternalWriterHelper {
         lastKnownPartitionPath = partitionPath.clone();
       }
 
-      handle.write(row);
+      boolean shouldDropPartitionColumns = writeConfig.shouldDropPartitionColumns();
+      if (shouldDropPartitionColumns) {
+        // Drop the partition columns from the row
+        // Using the deprecated JavaConversions to be compatible with scala versions < 2.12. Once hudi support for scala versions < 2.12 is
+        // stopped, can move this to JavaConverters.seqAsJavaList(...)
+        List<String> partitionCols = JavaConversions.<String>seqAsJavaList(HoodieDatasetBulkInsertHelper.getPartitionPathCols(this.writeConfig));
+        Set<Integer> partitionIdx = new HashSet<Integer>();
+        for (String col : partitionCols) {
+          partitionIdx.add(this.structType.fieldIndex(col));
+        }
+
+        // Relies on InternalRow::toSeq(...) preserving the column ordering based on the supplied schema
+        // Using the deprecated JavaConversions to be compatible with scala versions < 2.12.
+        List<Object> cols = JavaConversions.<Object>seqAsJavaList(row.toSeq(structType));
+        int idx = 0;
+        List<Object> newCols = new ArrayList<Object>();
+        for (Object o : cols) {
+          if (!partitionIdx.contains(idx)) {
+            newCols.add(o);
+          }
+          idx += 1;
+        }
+        InternalRow newRow = InternalRow.fromSeq(JavaConverters.<Object>asScalaIteratorConverter(newCols.iterator()).asScala().toSeq());
+        handle.write(newRow);
+      } else {
+        handle.write(row);
+      }
     } catch (Throwable t) {
       LOG.error("Global error thrown while trying to write records in HoodieRowCreateHandle ", t);
       throw t;
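For readability, the per-row trimming inlined in the hunk above, restated as a
standalone sketch (the class and method names below are hypothetical, not part
of the patch; the InternalRow and scala-collection calls are the same ones the
patch uses):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;

    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.types.StructType;

    import scala.collection.JavaConversions;
    import scala.collection.JavaConverters;

    class PartitionColumnTrimmer {
      // Copies every field of 'row' except those at the partition-column
      // ordinals into a new InternalRow. Relies on InternalRow::toSeq(...)
      // preserving the column ordering of the supplied schema.
      static InternalRow dropPartitionFields(InternalRow row, StructType structType, Set<Integer> partitionIdx) {
        List<Object> cols = JavaConversions.<Object>seqAsJavaList(row.toSeq(structType));
        List<Object> newCols = new ArrayList<>();
        for (int idx = 0; idx < cols.size(); idx++) {
          if (!partitionIdx.contains(idx)) {
            newCols.add(cols.get(idx));
          }
        }
        return InternalRow.fromSeq(JavaConverters.<Object>asScalaIteratorConverter(newCols.iterator()).asScala().toSeq());
      }
    }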
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index 12e446d7be6..75ec069946d 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -62,7 +62,6 @@ object HoodieDatasetBulkInsertHelper
   def prepareForBulkInsert(df: DataFrame,
                            config: HoodieWriteConfig,
                            partitioner: BulkInsertPartitioner[Dataset[Row]],
-                           shouldDropPartitionColumns: Boolean,
                            instantTime: String): Dataset[Row] = {
     val populateMetaFields = config.populateMetaFields()
     val schema = df.schema
@@ -128,16 +127,10 @@ object HoodieDatasetBulkInsertHelper
       HoodieUnsafeUtils.createDataFrameFrom(df.sparkSession, prependedQuery)
     }
 
-    val trimmedDF = if (shouldDropPartitionColumns) {
-      dropPartitionColumns(updatedDF, config)
-    } else {
-      updatedDF
-    }
-
     val targetParallelism =
-      deduceShuffleParallelism(trimmedDF, config.getBulkInsertShuffleParallelism)
+      deduceShuffleParallelism(updatedDF, config.getBulkInsertShuffleParallelism)
 
-    partitioner.repartitionRecords(trimmedDF, targetParallelism)
+    partitioner.repartitionRecords(updatedDF, targetParallelism)
   }
 
   /**
@@ -243,21 +236,17 @@ object HoodieDatasetBulkInsertHelper
     }
   }
 
-  private def dropPartitionColumns(df: DataFrame, config: HoodieWriteConfig): DataFrame = {
-    val partitionPathFields = getPartitionPathFields(config).toSet
-    val nestedPartitionPathFields = partitionPathFields.filter(f => f.contains('.'))
-    if (nestedPartitionPathFields.nonEmpty) {
-      logWarning(s"Can not drop nested partition path fields: $nestedPartitionPathFields")
-    }
-
-    val partitionPathCols = (partitionPathFields -- nestedPartitionPathFields).toSeq
-
-    df.drop(partitionPathCols: _*)
-  }
-
   private def getPartitionPathFields(config: HoodieWriteConfig): Seq[String] = {
     val keyGeneratorClassName = config.getString(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME)
     val keyGenerator = ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps)).asInstanceOf[BuiltinKeyGenerator]
     keyGenerator.getPartitionPathFields.asScala
   }
+
+  def getPartitionPathCols(config: HoodieWriteConfig): Seq[String] = {
+    val partitionPathFields = getPartitionPathFields(config).toSet
+    val nestedPartitionPathFields = partitionPathFields.filter(f => f.contains('.'))
+
+    return (partitionPathFields -- nestedPartitionPathFields).toSeq
+  }
+
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
index fb0218137d2..1e20e4ab663 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
@@ -95,8 +95,7 @@ public abstract class BaseDatasetBulkInsertCommitActionExecutor implements Seria
     table = writeClient.initTable(getWriteOperationType(), Option.ofNullable(instantTime));
 
     BulkInsertPartitioner<Dataset<Row>> bulkInsertPartitionerRows = getPartitioner(populateMetaFields, isTablePartitioned);
-    boolean shouldDropPartitionColumns = writeConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS());
-    Dataset<Row> hoodieDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(records, writeConfig, bulkInsertPartitionerRows, shouldDropPartitionColumns, instantTime);
+    Dataset<Row> hoodieDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(records, writeConfig, bulkInsertPartitionerRows, instantTime);
 
     preExecute();
     HoodieWriteMetadata<JavaRDD<WriteStatus>> result = buildHoodieWriteMetadata(doExecute(hoodieDF, bulkInsertPartitionerRows.arePartitionRecordsSorted()));
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
index 8166820cb87..1c21c9a5253 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
@@ -131,7 +131,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieSparkClientTestBase
     List<Row> rows = DataSourceTestUtils.generateRandomRows(10);
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
     Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-        new NonSortPartitionerWithRows(), false, "0000000001");
+        new NonSortPartitionerWithRows(), "0000000001");
     StructType resultSchema = result.schema();
 
     assertEquals(result.count(), 10);
@@ -175,7 +175,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieSparkClientTestBase
         .build();
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
     Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-        new NonSortPartitionerWithRows(), false, "000001111");
+        new NonSortPartitionerWithRows(), "000001111");
     StructType resultSchema = result.schema();
 
     assertEquals(result.count(), 10);
@@ -212,7 +212,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieSparkClientTestBase
     rows.addAll(updates);
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
     Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-        new NonSortPartitionerWithRows(), false, "000001111");
+        new NonSortPartitionerWithRows(), "000001111");
     StructType resultSchema = result.schema();
 
     assertEquals(result.count(), enablePreCombine ? 10 : 15);
@@ -316,7 +316,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieSparkClientTestBase
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
     try {
       Dataset<Row> preparedDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-          new NonSortPartitionerWithRows(), false, "000001111");
+          new NonSortPartitionerWithRows(), "000001111");
       preparedDF.count();
       fail("Should have thrown exception");
     } catch (Exception e) {
@@ -328,7 +328,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieSparkClientTestBase
     dataset = sqlContext.createDataFrame(rows, structType);
     try {
       Dataset<Row> preparedDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-          new NonSortPartitionerWithRows(), false, "000001111");
+          new NonSortPartitionerWithRows(), "000001111");
       preparedDF.count();
       fail("Should have thrown exception");
     } catch (Exception e) {
@@ -340,7 +340,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieSparkClientTestBase
     dataset = sqlContext.createDataFrame(rows, structType);
    try {
       Dataset<Row> preparedDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-          new NonSortPartitionerWithRows(), false, "000001111");
+          new NonSortPartitionerWithRows(), "000001111");
       preparedDF.count();
       fail("Should have thrown exception");
     } catch (Exception e) {
"9909a8b1-2d15-4d3d-8ec9-efc48c536a00", "rider-D", "driver-L", 33.90, "san_francisco"), + (1695516137016L, "e3cf430c-889d-4015-bc98-59bdce1e530c", "rider-F", "driver-P", 34.15, "sao_paulo"), + (1695115999911L, "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa", "rider-J", "driver-T", 17.85, "chennai")); + + var inserts = spark.createDataFrame(data).toDF(columns: _*) + inserts.write.format("hudi"). + option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "city"). + option(HoodieWriteConfig.TABLE_NAME, tableName). + option("hoodie.datasource.write.recordkey.field", "uuid"). + option("hoodie.datasource.write.precombine.field", "rider"). + option("hoodie.datasource.write.operation", "bulk_insert"). + option("hoodie.datasource.write.hive_style_partitioning", "true"). + option("hoodie.populate.meta.fields", "false"). + option("hoodie.datasource.write.drop.partition.columns", "true"). + mode(SaveMode.Overwrite). + save(basePath) + + // Ensure the partition column (i.e 'city') can be read back + val tripsDF = spark.read.format("hudi").load(basePath) + tripsDF.show() + tripsDF.select("city").foreach(row => { + assertNotNull(row) + }) + + // Peek into the raw parquet file and ensure partition column is not written to the file + val partitions = Seq("city=san_francisco", "city=chennai", "city=sao_paulo") + val partitionPaths = new Array[String](3) + for (i <- partitionPaths.indices) { + partitionPaths(i) = String.format("%s/%s/*", basePath, partitions(i)) + } + val rawFileDf = spark.sqlContext.read.parquet(partitionPaths(0), partitionPaths(1), partitionPaths(2)) + rawFileDf.show() + rawFileDf.select("city").foreach(row => { + assertNull(row.get(0)) + }) +} + /** * Test case for disable and enable meta fields. */