This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.14.0-siva-0.14.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 790903712ecd5ee65850673141227698ea0ced26
Author: bhat-vinay <152183592+bhat-vi...@users.noreply.github.com>
AuthorDate: Mon Dec 11 22:08:30 2023 +0530

    [HUDI-7040] Handle dropping of partition columns in BulkInsertDataInternalWriterHelper::write(...) (#10272)
    
    Issue:
    There are two configs which, when set in a certain manner, cause exceptions or assertion failures:
    1. Config to disable populating metadata fields (for each row)
    2. Config to drop partition columns (to save storage space) from a row
    
    With #1 and #2 set, partition paths cannot be deduced using partition columns (as the partition columns are dropped higher up the stack).
    BulkInsertDataInternalWriterHelper::write(...) relied on metadata fields to extract the partition path in such cases,
    but with #1 that is not possible, resulting in asserts/exceptions.
    
    The fix pushes the dropping of partition columns further down the stack, to after the partition path is computed.
    It manipulates the raw 'InternalRow' structure by copying only the relevant fields into a new 'InternalRow'.
    Each row is processed individually to drop the partition columns and copy the remaining fields to a new 'InternalRow'.
    
    Co-authored-by: Vinaykumar Bhat <vi...@onehouse.ai>
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  4 ++
 .../commit/BulkInsertDataInternalWriterHelper.java | 34 ++++++++++++++-
 .../hudi/HoodieDatasetBulkInsertHelper.scala       | 31 +++++---------
 .../BaseDatasetBulkInsertCommitActionExecutor.java |  3 +-
 .../TestHoodieDatasetBulkInsertHelper.java         | 12 +++---
 .../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 48 +++++++++++++++++++++-
 6 files changed, 101 insertions(+), 31 deletions(-)
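
For context before the diff: the sketch below illustrates the per-row approach described in the commit message, assuming the partition path has already been extracted from the original row. It mirrors the logic added to BulkInsertDataInternalWriterHelper::write(...) (including the deprecated scala.collection.JavaConversions usage kept for Scala < 2.12 compatibility); the class and method names (DropPartitionColumnsSketch, dropPartitionColumns) are illustrative only and not part of the commit.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;

import scala.collection.JavaConversions;
import scala.collection.JavaConverters;

final class DropPartitionColumnsSketch {
  // Copies every field except the partition columns into a new InternalRow.
  // Relies on InternalRow::toSeq(schema) preserving the column ordering of the supplied schema.
  static InternalRow dropPartitionColumns(InternalRow row, StructType structType, List<String> partitionCols) {
    Set<Integer> partitionIdx = new HashSet<>();
    for (String col : partitionCols) {
      partitionIdx.add(structType.fieldIndex(col));
    }
    List<Object> cols = JavaConversions.<Object>seqAsJavaList(row.toSeq(structType));
    List<Object> newCols = new ArrayList<>();
    for (int idx = 0; idx < cols.size(); idx++) {
      if (!partitionIdx.contains(idx)) {
        newCols.add(cols.get(idx));
      }
    }
    return InternalRow.fromSeq(JavaConverters.<Object>asScalaIteratorConverter(newCols.iterator()).asScala().toSeq());
  }
}

In the committed change this logic runs only when writeConfig.shouldDropPartitionColumns() returns true; otherwise the original row is written unchanged.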

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 2524d7ef904..0cf1f287976 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1345,6 +1345,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getBoolean(ALLOW_MULTI_WRITE_ON_SAME_INSTANT_ENABLE);
   }
 
+  public boolean shouldDropPartitionColumns() {
+    return getBoolean(HoodieTableConfig.DROP_PARTITION_COLUMNS);
+  }
+
   public String getWriteStatusClassName() {
     return getString(WRITE_STATUS_CLASS_NAME);
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
index 7f6054b2296..0773e8a5a0a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.action.commit;
 
+import org.apache.hudi.HoodieDatasetBulkInsertHelper;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
@@ -38,11 +39,16 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.UUID;
 
+import scala.collection.JavaConversions;
+import scala.collection.JavaConverters;
+
 /**
  * Helper class for HoodieBulkInsertDataInternalWriter used by Spark datasource v2.
  */
@@ -124,7 +130,33 @@ public class BulkInsertDataInternalWriterHelper {
         lastKnownPartitionPath = partitionPath.clone();
       }
 
-      handle.write(row);
+      boolean shouldDropPartitionColumns = writeConfig.shouldDropPartitionColumns();
+      if (shouldDropPartitionColumns) {
+        // Drop the partition columns from the row
+        // Using the deprecated JavaConversions to be compatible with scala versions < 2.12. Once hudi support for scala versions < 2.12 is
+        // stopped, can move this to JavaConverters.seqAsJavaList(...)
+        List<String> partitionCols = JavaConversions.<String>seqAsJavaList(HoodieDatasetBulkInsertHelper.getPartitionPathCols(this.writeConfig));
+        Set<Integer> partitionIdx = new HashSet<Integer>();
+        for (String col : partitionCols) {
+          partitionIdx.add(this.structType.fieldIndex(col));
+        }
+
+        // Relies on InternalRow::toSeq(...) preserving the column ordering based on the supplied schema
+        // Using the deprecated JavaConversions to be compatible with scala versions < 2.12.
+        List<Object> cols = JavaConversions.<Object>seqAsJavaList(row.toSeq(structType));
+        int idx = 0;
+        List<Object> newCols = new ArrayList<Object>();
+        for (Object o : cols) {
+          if (!partitionIdx.contains(idx)) {
+            newCols.add(o);
+          }
+          idx += 1;
+        }
+        InternalRow newRow = InternalRow.fromSeq(JavaConverters.<Object>asScalaIteratorConverter(newCols.iterator()).asScala().toSeq());
+        handle.write(newRow);
+      } else {
+        handle.write(row);
+      }
     } catch (Throwable t) {
       LOG.error("Global error thrown while trying to write records in 
HoodieRowCreateHandle ", t);
       throw t;
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index 12e446d7be6..75ec069946d 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -62,7 +62,6 @@ object HoodieDatasetBulkInsertHelper
   def prepareForBulkInsert(df: DataFrame,
                            config: HoodieWriteConfig,
                            partitioner: BulkInsertPartitioner[Dataset[Row]],
-                           shouldDropPartitionColumns: Boolean,
                            instantTime: String): Dataset[Row] = {
     val populateMetaFields = config.populateMetaFields()
     val schema = df.schema
@@ -128,16 +127,10 @@ object HoodieDatasetBulkInsertHelper
       HoodieUnsafeUtils.createDataFrameFrom(df.sparkSession, prependedQuery)
     }
 
-    val trimmedDF = if (shouldDropPartitionColumns) {
-      dropPartitionColumns(updatedDF, config)
-    } else {
-      updatedDF
-    }
-
     val targetParallelism =
-      deduceShuffleParallelism(trimmedDF, config.getBulkInsertShuffleParallelism)
+      deduceShuffleParallelism(updatedDF, config.getBulkInsertShuffleParallelism)
 
-    partitioner.repartitionRecords(trimmedDF, targetParallelism)
+    partitioner.repartitionRecords(updatedDF, targetParallelism)
   }
 
   /**
@@ -243,21 +236,17 @@ object HoodieDatasetBulkInsertHelper
     }
   }
 
-  private def dropPartitionColumns(df: DataFrame, config: HoodieWriteConfig): DataFrame = {
-    val partitionPathFields = getPartitionPathFields(config).toSet
-    val nestedPartitionPathFields = partitionPathFields.filter(f => f.contains('.'))
-    if (nestedPartitionPathFields.nonEmpty) {
-      logWarning(s"Can not drop nested partition path fields: $nestedPartitionPathFields")
-    }
-
-    val partitionPathCols = (partitionPathFields -- nestedPartitionPathFields).toSeq
-
-    df.drop(partitionPathCols: _*)
-  }
-
   private def getPartitionPathFields(config: HoodieWriteConfig): Seq[String] = {
     val keyGeneratorClassName = config.getString(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME)
     val keyGenerator = ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps)).asInstanceOf[BuiltinKeyGenerator]
     keyGenerator.getPartitionPathFields.asScala
   }
+
+   def getPartitionPathCols(config: HoodieWriteConfig): Seq[String] = {
+    val partitionPathFields = getPartitionPathFields(config).toSet
+    val nestedPartitionPathFields = partitionPathFields.filter(f => f.contains('.'))
+
+    return (partitionPathFields -- nestedPartitionPathFields).toSeq
+  }
+
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
index fb0218137d2..1e20e4ab663 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
@@ -95,8 +95,7 @@ public abstract class BaseDatasetBulkInsertCommitActionExecutor implements Seria
     table = writeClient.initTable(getWriteOperationType(), Option.ofNullable(instantTime));
 
     BulkInsertPartitioner<Dataset<Row>> bulkInsertPartitionerRows = getPartitioner(populateMetaFields, isTablePartitioned);
-    boolean shouldDropPartitionColumns = writeConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS());
-    Dataset<Row> hoodieDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(records, writeConfig, bulkInsertPartitionerRows, shouldDropPartitionColumns, instantTime);
+    Dataset<Row> hoodieDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(records, writeConfig, bulkInsertPartitionerRows, instantTime);
 
     preExecute();
     HoodieWriteMetadata<JavaRDD<WriteStatus>> result = buildHoodieWriteMetadata(doExecute(hoodieDF, bulkInsertPartitionerRows.arePartitionRecordsSorted()));
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
index 8166820cb87..1c21c9a5253 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
@@ -131,7 +131,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieSparkClientTestBase
     List<Row> rows = DataSourceTestUtils.generateRandomRows(10);
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
     Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-        new NonSortPartitionerWithRows(), false, "0000000001");
+        new NonSortPartitionerWithRows(), "0000000001");
     StructType resultSchema = result.schema();
 
     assertEquals(result.count(), 10);
@@ -175,7 +175,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieSparkClientTestBase
         .build();
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
     Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-        new NonSortPartitionerWithRows(), false, "000001111");
+        new NonSortPartitionerWithRows(), "000001111");
     StructType resultSchema = result.schema();
 
     assertEquals(result.count(), 10);
@@ -212,7 +212,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieSparkClientTestBase
     rows.addAll(updates);
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
     Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-        new NonSortPartitionerWithRows(), false, "000001111");
+        new NonSortPartitionerWithRows(), "000001111");
     StructType resultSchema = result.schema();
 
     assertEquals(result.count(), enablePreCombine ? 10 : 15);
@@ -316,7 +316,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieSparkClientTestBase
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
     try {
       Dataset<Row> preparedDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-          new NonSortPartitionerWithRows(), false, "000001111");
+          new NonSortPartitionerWithRows(), "000001111");
       preparedDF.count();
       fail("Should have thrown exception");
     } catch (Exception e) {
@@ -328,7 +328,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieSparkClientTestBase
     dataset = sqlContext.createDataFrame(rows, structType);
     try {
       Dataset<Row> preparedDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-          new NonSortPartitionerWithRows(), false, "000001111");
+          new NonSortPartitionerWithRows(), "000001111");
       preparedDF.count();
       fail("Should have thrown exception");
     } catch (Exception e) {
@@ -340,7 +340,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieSparkClientTestBase
     dataset = sqlContext.createDataFrame(rows, structType);
     try {
       Dataset<Row> preparedDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-          new NonSortPartitionerWithRows(), false, "000001111");
+          new NonSortPartitionerWithRows(), "000001111");
       preparedDF.count();
       fail("Should have thrown exception");
     } catch (Exception e) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 865ca147eb0..38221cc05c7 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{expr, lit}
 import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
 import org.apache.spark.sql.hudi.command.SqlKeyGenerator
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue, fail}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertNull, assertTrue, fail}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.Arguments.arguments
@@ -365,6 +365,52 @@ class TestHoodieSparkSqlWriter {
     testBulkInsertWithSortMode(BulkInsertSortMode.NONE, populateMetaFields)
   }
 
+@Test
+def testBulkInsertForDropPartitionColumn(): Unit = {
+  //create a new table
+  val tableName = "trips_table"
+  val basePath = "file:///tmp/trips_table"
+  val columns = Seq("ts", "uuid", "rider", "driver", "fare", "city")
+  val data =
+    Seq((1695159649087L, "334e26e9-8355-45cc-97c6-c31daf0df330", "rider-A", "driver-K", 19.10, "san_francisco"),
+      (1695091554788L, "e96c4396-3fad-413a-a942-4cb36106d721", "rider-C", "driver-M", 27.70, "san_francisco"),
+      (1695046462179L, "9909a8b1-2d15-4d3d-8ec9-efc48c536a00", "rider-D", "driver-L", 33.90, "san_francisco"),
+      (1695516137016L, "e3cf430c-889d-4015-bc98-59bdce1e530c", "rider-F", "driver-P", 34.15, "sao_paulo"),
+      (1695115999911L, "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa", "rider-J", "driver-T", 17.85, "chennai"));
+
+  var inserts = spark.createDataFrame(data).toDF(columns: _*)
+  inserts.write.format("hudi").
+    option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "city").
+    option(HoodieWriteConfig.TABLE_NAME, tableName).
+    option("hoodie.datasource.write.recordkey.field", "uuid").
+    option("hoodie.datasource.write.precombine.field", "rider").
+    option("hoodie.datasource.write.operation", "bulk_insert").
+    option("hoodie.datasource.write.hive_style_partitioning", "true").
+    option("hoodie.populate.meta.fields", "false").
+    option("hoodie.datasource.write.drop.partition.columns", "true").
+    mode(SaveMode.Overwrite).
+    save(basePath)
+
+  // Ensure the partition column (i.e 'city') can be read back
+  val tripsDF = spark.read.format("hudi").load(basePath)
+  tripsDF.show()
+  tripsDF.select("city").foreach(row => {
+    assertNotNull(row)
+  })
+
+  // Peek into the raw parquet file and ensure partition column is not written to the file
+  val partitions = Seq("city=san_francisco", "city=chennai", "city=sao_paulo")
+  val partitionPaths = new Array[String](3)
+  for (i <- partitionPaths.indices) {
+    partitionPaths(i) = String.format("%s/%s/*", basePath, partitions(i))
+  }
+  val rawFileDf = spark.sqlContext.read.parquet(partitionPaths(0), partitionPaths(1), partitionPaths(2))
+  rawFileDf.show()
+  rawFileDf.select("city").foreach(row => {
+    assertNull(row.get(0))
+  })
+}
+
   /**
    * Test case for disable and enable meta fields.
    */
