This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f8d4d0130d [HUDI-6072] Fix NPE when upsert merger and null map or array (#8432)
9f8d4d0130d is described below

commit 9f8d4d0130dbe78598f24f00e7fa75c13737fc79
Author: Nicolas Paris <nicolas.pa...@adevinta.com>
AuthorDate: Fri Apr 28 07:23:46 2023 +0200

    [HUDI-6072] Fix NPE when upsert merger and null map or array (#8432)
    
    Co-authored-by: Danny Chan <yuzhao....@gmail.com>
---
 .../apache/spark/sql/HoodieInternalRowUtils.scala  |   5 +-
 .../apache/hudi/functional/TestCOWDataSource.scala | 101 ++++++++++++++-------
 2 files changed, 73 insertions(+), 33 deletions(-)
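
The functional change is the guarded field-writer call in HoodieInternalRowUtils shown in the first hunk below: when the previous value of a field is null (for example a null map or array column), the row updater now records an explicit null via setNullAt instead of handing null to the per-type writer, which appears to be where the reported NPE came from. As a rough, standalone sketch of that null-guard pattern against Spark's InternalRow API (the object and method names NullSafeFieldCopy and copyRow are illustrative stand-ins, not Hudi's actual internals):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
    import org.apache.spark.sql.types._
    import org.apache.spark.unsafe.types.UTF8String

    // Illustrative stand-in for the guarded copy loop in the patch below.
    object NullSafeFieldCopy {

      // Copy each field from `source` into the mutable `target` row. A null source
      // value is written with setNullAt instead of being passed to the field writer,
      // mirroring the `if (prevValue == null) fieldUpdater.setNullAt(pos)` guard.
      def copyRow(schema: StructType, source: InternalRow, target: SpecificInternalRow): Unit = {
        var pos = 0
        while (pos < schema.length) {
          val prevValue = if (source.isNullAt(pos)) null else source.get(pos, schema(pos).dataType)
          if (prevValue == null) {
            target.setNullAt(pos)         // before the fix, this case still reached the writer
          } else {
            target.update(pos, prevValue) // stand-in for fieldWriters(pos)(fieldUpdater, pos, prevValue)
          }
          pos += 1
        }
      }

      def main(args: Array[String]): Unit = {
        val schema = StructType(Seq(
          StructField("_row_key", StringType, nullable = false),
          StructField("name", MapType(StringType, ArrayType(StringType)), nullable = true)))
        val source = InternalRow(UTF8String.fromString("1"), null) // null map value, as in the new test
        val target = new SpecificInternalRow(schema.fields.map(_.dataType).toSeq)
        copyRow(schema, source, target)
        assert(target.isNullAt(1)) // the null map is carried over instead of failing
      }
    }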

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
index 3ea801177fb..b56b0b1e4ce 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
@@ -188,7 +188,10 @@ object HoodieInternalRowUtils {
           null
         }
 
-        fieldWriters(pos)(fieldUpdater, pos, prevValue)
+        if(prevValue == null)
+          fieldUpdater.setNullAt(pos)
+        else
+          fieldWriters(pos)(fieldUpdater, pos, prevValue)
         pos += 1
       }
     }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index ae1f62b7e61..6b1773807fe 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -60,6 +60,8 @@ import java.sql.{Date, Timestamp}
 import java.util.function.Consumer
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
+import org.junit.jupiter.api.Assertions.assertDoesNotThrow
+import org.junit.jupiter.api.function.Executable
 
 
 /**
@@ -151,22 +153,22 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
   @Test
   def testInferPartitionBy(): Unit = {
     val (writeOpts, readOpts) = getWriterReaderOpts(HoodieRecordType.AVRO, Map())
-      // Insert Operation
-      val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
-      val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+    // Insert Operation
+    val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
 
-      val commonOptsNoPreCombine = Map(
-        "hoodie.insert.shuffle.parallelism" -> "4",
-        "hoodie.upsert.shuffle.parallelism" -> "4",
-        DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
-        HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
-      ) ++ writeOpts
+    val commonOptsNoPreCombine = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
+    ) ++ writeOpts
 
-      inputDF.write.partitionBy("partition").format("hudi")
-        .options(commonOptsNoPreCombine)
-        .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
-        .mode(SaveMode.Overwrite)
-        .save(basePath)
+    inputDF.write.partitionBy("partition").format("hudi")
+      .options(commonOptsNoPreCombine)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
 
     val snapshot0 = spark.read.format("org.apache.hudi").options(readOpts).load(basePath)
     snapshot0.cache()
@@ -195,10 +197,10 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     val records2 = recordsToStrings(dataGen.generateInserts("000", 200)).toList
     val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
     // hard code the value for rider and fare so that we can verify the partitions paths with hudi
-    val toInsertDf = inputDF1.withColumn("fare",lit(100)).withColumn("rider",lit("rider-123"))
-      .union(inputDF2.withColumn("fare",lit(200)).withColumn("rider",lit("rider-456")))
+    val toInsertDf = inputDF1.withColumn("fare", lit(100)).withColumn("rider", lit("rider-123"))
+      .union(inputDF2.withColumn("fare", lit(200)).withColumn("rider", lit("rider-456")))
 
-    toInsertDf.write.partitionBy("fare","rider").format("hudi")
+    toInsertDf.write.partitionBy("fare", "rider").format("hudi")
       .options(commonOptsNoPreCombine)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
       .mode(SaveMode.Overwrite)
@@ -217,7 +219,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     assertEquals(snapshot1.filter("_hoodie_partition_path = '200/rider-456'").count(), 200)
 
     // triggering 2nd batch to ensure table config validation does not fail.
-    toInsertDf.write.partitionBy("fare","rider").format("hudi")
+    toInsertDf.write.partitionBy("fare", "rider").format("hudi")
       .options(commonOptsNoPreCombine)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
       .mode(SaveMode.Append)
@@ -346,7 +348,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     spark.read.format("org.apache.hudi").options(readOpts).load(basePath).count()
   }
 
-  private def writeToHudi(opts: Map[String, String], df: Dataset[Row]) : Unit = {
+  private def writeToHudi(opts: Map[String, String], df: Dataset[Row]): Unit = {
     df.write.format("hudi")
       .options(opts)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
@@ -584,7 +586,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
       .save(basePath)
 
     val metaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath)
-    .setLoadActiveTimelineOnLoad(true).build();
+      .setLoadActiveTimelineOnLoad(true).build();
     val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
       .map(instant => (instant.asInstanceOf[HoodieInstant]).getAction)
     assertEquals(2, commits.size)
@@ -663,7 +665,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
       .save(basePath)
 
     val metaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath)
-    .setLoadActiveTimelineOnLoad(true).build()
+      .setLoadActiveTimelineOnLoad(true).build()
     val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
       .map(instant => (instant.asInstanceOf[HoodieInstant]).getAction)
     assertEquals(2, commits.size)
@@ -722,7 +724,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     assertEquals(7, filterSecondPartitionCount)
 
     val metaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath)
-    .setLoadActiveTimelineOnLoad(true).build()
+      .setLoadActiveTimelineOnLoad(true).build()
     val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
       .map(instant => instant.asInstanceOf[HoodieInstant].getAction)
     assertEquals(3, commits.size)
@@ -777,7 +779,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     assertEquals(7, filterSecondPartitionCount)
 
     val metaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath)
-    .setLoadActiveTimelineOnLoad(true).build()
+      .setLoadActiveTimelineOnLoad(true).build()
     val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
       .map(instant => instant.asInstanceOf[HoodieInstant].getAction)
     assertEquals(2, commits.size)
@@ -795,7 +797,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     val insert2NewKeyCnt = 2
 
     val totalUniqueKeyToGenerate = insert1Cnt + insert2NewKeyCnt
-    val allRecords =  dataGen.generateInserts("001", totalUniqueKeyToGenerate)
+    val allRecords = dataGen.generateInserts("001", totalUniqueKeyToGenerate)
     val inserts1 = allRecords.subList(0, insert1Cnt)
     val inserts2New = dataGen.generateSameKeyInserts("002", allRecords.subList(insert1Cnt, insert1Cnt + insert2NewKeyCnt))
     val inserts2Dup = dataGen.generateSameKeyInserts("002", inserts1.subList(0, insert2DupKeyCnt))
@@ -847,7 +849,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
       Row("22", "lisi", Timestamp.valueOf("1970-01-02 13:31:24"), 
Date.valueOf("1991-11-08"), BigDecimal.valueOf(2.0), 11, 1),
       Row("33", "zhangsan", Timestamp.valueOf("1970-01-03 13:31:24"), 
Date.valueOf("1991-11-09"), BigDecimal.valueOf(3.0), 11, 1))
     val rdd = jsc.parallelize(records)
-    val  recordsDF = spark.createDataFrame(rdd, schema)
+    val recordsDF = spark.createDataFrame(rdd, schema)
     recordsDF.write.format("org.apache.hudi")
       .options(writeOpts)
       .mode(SaveMode.Overwrite)
@@ -1392,7 +1394,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     val (writeOpts, _) = getWriterReaderOpts(recordType, getQuickstartWriteConfigs.asScala.toMap)
 
     val dataGenerator = new QuickstartUtils.DataGenerator()
-    val records = convertToStringList(dataGenerator.generateInserts( 10))
+    val records = convertToStringList(dataGenerator.generateInserts(10))
     val recordsRDD = spark.sparkContext.parallelize(records, 2)
     val inputDF = spark.read.json(sparkSession.createDataset(recordsRDD)(Encoders.STRING))
     inputDF.write.format("hudi")
@@ -1411,6 +1413,41 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     assertEquals(false, Metrics.isInitialized(basePath), "Metrics should be shutdown")
   }
 
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
+  def testMapArrayTypeSchemaEvolution(recordType: HoodieRecordType): Unit = {
+    assertDoesNotThrow(
+      new Executable {
+        override def execute(): Unit = {
+    val (writeOpts, _) = getWriterReaderOpts(recordType, getQuickstartWriteConfigs.asScala.toMap)
+
+    val schema1 = StructType(
+      StructField("_row_key", StringType, nullable = false) ::
+        StructField("name", MapType(StringType,
+          ArrayType(StringType, containsNull = false)), nullable = true) ::
+        StructField("timestamp", LongType, nullable = true) ::
+        StructField("partition", LongType, nullable = true) :: Nil)
+    val records = List(Row("1", null, 1L, 1L))
+    val inputDF = spark.createDataFrame(spark.sparkContext.parallelize(records, 2), schema1)
+    inputDF.write.format("org.apache.hudi")
+      .options(commonOpts ++ writeOpts)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val schema2 = StructType(StructField("_row_key", StringType, nullable = false) ::
+      StructField("name", MapType(StringType, ArrayType(StringType,
+        containsNull = true)), nullable = true) ::
+      StructField("timestamp", LongType, nullable = true) ::
+      StructField("partition", LongType, nullable = true) :: Nil)
+    val records2 = List(Row("1", null, 1L, 1L))
+    val inputDF2 = spark.createDataFrame(spark.sparkContext.parallelize(records2, 2), schema2)
+    inputDF2.write.format("org.apache.hudi")
+      .options(commonOpts ++ writeOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    }})
+  }
+
   /**
    * Validates that clustering dag is triggered only once.
    * We leverage spark event listener to validate it.
@@ -1421,7 +1458,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     val sm = new StageEventManager("org.apache.hudi.table.action.commit.BaseCommitActionExecutor.executeClustering")
     spark.sparkContext.addSparkListener(sm)
 
-    var structType : StructType = null
+    var structType: StructType = null
     for (i <- 1 to 2) {
       val records = recordsToStrings(dataGen.generateInserts("%05d".format(i), 100)).toList
       val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
@@ -1429,7 +1466,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
       inputDF.write.format("hudi")
         .options(commonOpts)
         .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
-        .option("hoodie.metadata.enable","false")
+        .option("hoodie.metadata.enable", "false")
         .mode(if (i == 0) SaveMode.Overwrite else SaveMode.Append)
         .save(basePath)
     }
@@ -1444,7 +1481,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
       .option("hoodie.parquet.small.file.limit", "0")
       .option("hoodie.clustering.inline", "true")
       .option("hoodie.clustering.inline.max.commits", "2")
-      .option("hoodie.metadata.enable","false")
+      .option("hoodie.metadata.enable", "false")
       .mode(SaveMode.Append)
       .save(basePath)
 
@@ -1466,8 +1503,8 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
   }
 
   def getWriterReaderOptsLessPartitionPath(recordType: HoodieRecordType,
-                          opt: Map[String, String] = commonOpts,
-                          enableFileIndex: Boolean = DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.defaultValue()):
+                                           opt: Map[String, String] = commonOpts,
+                                           enableFileIndex: Boolean = DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.defaultValue()):
   (Map[String, String], Map[String, String]) = {
     val (writeOpts, readOpts) = getWriterReaderOpts(recordType, opt, enableFileIndex)
     (writeOpts.-(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()), readOpts)
@@ -1483,7 +1520,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     }
   }
 
-  /************** Stage Event Listener **************/
+  /** ************ Stage Event Listener ************* */
   class StageEventManager(eventToTrack: String) extends SparkListener() {
     var triggerCount = 0
 

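The new test, testMapArrayTypeSchemaEvolution, covers the user-facing scenario: upserting rows whose map-of-arrays column is null while the schema evolves (containsNull flipping from false to true between writes). A condensed, standalone sketch of that scenario outside the test harness follows; the base path and the quickstart-style option map (record key, precombine, partition path, table name) are assumptions for illustration, not the exact configuration used by the test.

    import org.apache.spark.sql.{Row, SaveMode, SparkSession}
    import org.apache.spark.sql.types._

    object NullMapArrayUpsertRepro {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("hudi-null-map-array").getOrCreate()
        val basePath = "file:///tmp/hudi_null_map_array" // assumed scratch location

        // Assumed quickstart-style write options; adjust field and table names as needed.
        val hudiOpts = Map(
          "hoodie.table.name" -> "hoodie_test",
          "hoodie.datasource.write.recordkey.field" -> "_row_key",
          "hoodie.datasource.write.precombine.field" -> "timestamp",
          "hoodie.datasource.write.partitionpath.field" -> "partition")

        // First write: map-of-arrays column whose arrays disallow nulls; the value itself is null.
        val schema1 = StructType(
          StructField("_row_key", StringType, nullable = false) ::
            StructField("name", MapType(StringType, ArrayType(StringType, containsNull = false)), nullable = true) ::
            StructField("timestamp", LongType, nullable = true) ::
            StructField("partition", LongType, nullable = true) :: Nil)
        val df1 = spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row("1", null, 1L, 1L)), 2), schema1)
        df1.write.format("hudi").options(hudiOpts).mode(SaveMode.Overwrite).save(basePath)

        // Second write: same key, evolved schema (containsNull = true), value still null.
        // Before this fix, merging the incoming null map with the stored record raised an NPE.
        val schema2 = schema1.copy(fields = schema1.fields.map {
          case f if f.name == "name" =>
            f.copy(dataType = MapType(StringType, ArrayType(StringType, containsNull = true)))
          case f => f
        })
        val df2 = spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row("1", null, 1L, 1L)), 2), schema2)
        df2.write.format("hudi").options(hudiOpts).mode(SaveMode.Append).save(basePath)

        spark.stop()
      }
    }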