This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 9f8d4d0130d  [HUDI-6072] Fix NPE when upsert merger and null map or array (#8432)
9f8d4d0130d is described below

commit 9f8d4d0130dbe78598f24f00e7fa75c13737fc79
Author: Nicolas Paris <nicolas.pa...@adevinta.com>
AuthorDate: Fri Apr 28 07:23:46 2023 +0200

    [HUDI-6072] Fix NPE when upsert merger and null map or array (#8432)

    Co-authored-by: Danny Chan <yuzhao....@gmail.com>
---
 .../apache/spark/sql/HoodieInternalRowUtils.scala  |   5 +-
 .../apache/hudi/functional/TestCOWDataSource.scala | 101 ++++++++++++++-------
 2 files changed, 73 insertions(+), 33 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
index 3ea801177fb..b56b0b1e4ce 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
@@ -188,7 +188,10 @@ object HoodieInternalRowUtils {
             null
           }
 
-          fieldWriters(pos)(fieldUpdater, pos, prevValue)
+          if (prevValue == null)
+            fieldUpdater.setNullAt(pos)
+          else
+            fieldWriters(pos)(fieldUpdater, pos, prevValue)
           pos += 1
         }
       }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index ae1f62b7e61..6b1773807fe 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -60,6 +60,8 @@ import java.sql.{Date, Timestamp}
 import java.util.function.Consumer
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
+import org.junit.jupiter.api.Assertions.assertDoesNotThrow
+import org.junit.jupiter.api.function.Executable
 
 
 /**
@@ -151,22 +153,22 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
   @Test def testInferPartitionBy(): Unit = {
     val (writeOpts, readOpts) = getWriterReaderOpts(HoodieRecordType.AVRO, Map())
 
-      // Insert Operation
-      val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
-      val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+    // Insert Operation
+    val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
 
-      val commonOptsNoPreCombine = Map(
-        "hoodie.insert.shuffle.parallelism" -> "4",
-        "hoodie.upsert.shuffle.parallelism" -> "4",
-        DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
-        HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
-      ) ++ writeOpts
+    val commonOptsNoPreCombine = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
+    ) ++ writeOpts
 
-      inputDF.write.partitionBy("partition").format("hudi")
-        .options(commonOptsNoPreCombine)
-        .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
-        .mode(SaveMode.Overwrite)
-        .save(basePath)
+    inputDF.write.partitionBy("partition").format("hudi")
+      .options(commonOptsNoPreCombine)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
 
     val snapshot0 = spark.read.format("org.apache.hudi").options(readOpts).load(basePath)
     snapshot0.cache()
@@ -195,10 +197,10 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     val records2 = recordsToStrings(dataGen.generateInserts("000", 200)).toList
     val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
     // hard code the value for rider and fare so that we can verify the partitions paths with hudi
-    val toInsertDf = inputDF1.withColumn("fare",lit(100)).withColumn("rider",lit("rider-123"))
-      .union(inputDF2.withColumn("fare",lit(200)).withColumn("rider",lit("rider-456")))
+    val toInsertDf = inputDF1.withColumn("fare", lit(100)).withColumn("rider", lit("rider-123"))
+      .union(inputDF2.withColumn("fare", lit(200)).withColumn("rider", lit("rider-456")))
 
-    toInsertDf.write.partitionBy("fare","rider").format("hudi")
+    toInsertDf.write.partitionBy("fare", "rider").format("hudi")
       .options(commonOptsNoPreCombine)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
       .mode(SaveMode.Overwrite)
@@ -217,7 +219,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     assertEquals(snapshot1.filter("_hoodie_partition_path = '200/rider-456'").count(), 200)
 
     // triggering 2nd batch to ensure table config validation does not fail.
-    toInsertDf.write.partitionBy("fare","rider").format("hudi")
+    toInsertDf.write.partitionBy("fare", "rider").format("hudi")
       .options(commonOptsNoPreCombine)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
      .mode(SaveMode.Append)
@@ -346,7 +348,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     spark.read.format("org.apache.hudi").options(readOpts).load(basePath).count()
   }
 
-  private def writeToHudi(opts: Map[String, String], df: Dataset[Row]) : Unit = {
+  private def writeToHudi(opts: Map[String, String], df: Dataset[Row]): Unit = {
     df.write.format("hudi")
       .options(opts)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
@@ -584,7 +586,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
       .save(basePath)
 
     val metaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath)
-        .setLoadActiveTimelineOnLoad(true).build();
+      .setLoadActiveTimelineOnLoad(true).build();
     val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
       .map(instant => (instant.asInstanceOf[HoodieInstant]).getAction)
     assertEquals(2, commits.size)
@@ -663,7 +665,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
       .save(basePath)
 
     val metaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath)
-        .setLoadActiveTimelineOnLoad(true).build()
+      .setLoadActiveTimelineOnLoad(true).build()
     val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
       .map(instant => (instant.asInstanceOf[HoodieInstant]).getAction)
     assertEquals(2, commits.size)
@@ -722,7 +724,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     assertEquals(7, filterSecondPartitionCount)
 
     val metaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath)
-        .setLoadActiveTimelineOnLoad(true).build()
+      .setLoadActiveTimelineOnLoad(true).build()
     val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
       .map(instant => instant.asInstanceOf[HoodieInstant].getAction)
     assertEquals(3, commits.size)
@@ -777,7 +779,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     assertEquals(7, filterSecondPartitionCount)
 
     val metaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath)
-        .setLoadActiveTimelineOnLoad(true).build()
+      .setLoadActiveTimelineOnLoad(true).build()
     val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
       .map(instant => instant.asInstanceOf[HoodieInstant].getAction)
     assertEquals(2, commits.size)
@@ -795,7 +797,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     val insert2NewKeyCnt = 2
     val totalUniqueKeyToGenerate = insert1Cnt + insert2NewKeyCnt
 
-    val allRecords =  dataGen.generateInserts("001", totalUniqueKeyToGenerate)
+    val allRecords = dataGen.generateInserts("001", totalUniqueKeyToGenerate)
     val inserts1 = allRecords.subList(0, insert1Cnt)
     val inserts2New = dataGen.generateSameKeyInserts("002", allRecords.subList(insert1Cnt, insert1Cnt + insert2NewKeyCnt))
     val inserts2Dup = dataGen.generateSameKeyInserts("002", inserts1.subList(0, insert2DupKeyCnt))
@@ -847,7 +849,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
       Row("22", "lisi", Timestamp.valueOf("1970-01-02 13:31:24"), Date.valueOf("1991-11-08"), BigDecimal.valueOf(2.0), 11, 1),
       Row("33", "zhangsan", Timestamp.valueOf("1970-01-03 13:31:24"), Date.valueOf("1991-11-09"), BigDecimal.valueOf(3.0), 11, 1))
     val rdd = jsc.parallelize(records)
-    val recordsDF = spark.createDataFrame(rdd,  schema)
+    val recordsDF = spark.createDataFrame(rdd, schema)
     recordsDF.write.format("org.apache.hudi")
       .options(writeOpts)
       .mode(SaveMode.Overwrite)
@@ -1392,7 +1394,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     val (writeOpts, _) = getWriterReaderOpts(recordType, getQuickstartWriteConfigs.asScala.toMap)
 
     val dataGenerator = new QuickstartUtils.DataGenerator()
-    val records = convertToStringList(dataGenerator.generateInserts( 10))
+    val records = convertToStringList(dataGenerator.generateInserts(10))
     val recordsRDD = spark.sparkContext.parallelize(records, 2)
     val inputDF = spark.read.json(sparkSession.createDataset(recordsRDD)(Encoders.STRING))
     inputDF.write.format("hudi")
@@ -1411,6 +1413,41 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     assertEquals(false, Metrics.isInitialized(basePath), "Metrics should be shutdown")
   }
 
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
+  def testMapArrayTypeSchemaEvolution(recordType: HoodieRecordType): Unit = {
+    assertDoesNotThrow(
+      new Executable {
+        override def execute(): Unit = {
+          val (writeOpts, _) = getWriterReaderOpts(recordType, getQuickstartWriteConfigs.asScala.toMap)
+
+          val schema1 = StructType(
+            StructField("_row_key", StringType, nullable = false) ::
+              StructField("name", MapType(StringType,
+                ArrayType(StringType, containsNull = false)), nullable = true) ::
+              StructField("timestamp", LongType, nullable = true) ::
+              StructField("partition", LongType, nullable = true) :: Nil)
+          val records = List(Row("1", null, 1L, 1L))
+          val inputDF = spark.createDataFrame(spark.sparkContext.parallelize(records, 2), schema1)
+          inputDF.write.format("org.apache.hudi")
+            .options(commonOpts ++ writeOpts)
+            .mode(SaveMode.Overwrite)
+            .save(basePath)
+
+          val schema2 = StructType(StructField("_row_key", StringType, nullable = false) ::
+            StructField("name", MapType(StringType, ArrayType(StringType,
+              containsNull = true)), nullable = true) ::
+            StructField("timestamp", LongType, nullable = true) ::
+            StructField("partition", LongType, nullable = true) :: Nil)
+          val records2 = List(Row("1", null, 1L, 1L))
+          val inputDF2 = spark.createDataFrame(spark.sparkContext.parallelize(records2, 2), schema2)
+          inputDF2.write.format("org.apache.hudi")
+            .options(commonOpts ++ writeOpts)
+            .mode(SaveMode.Append)
+            .save(basePath)
+        }})
+  }
+
 /**
  * Validates that clustering dag is triggered only once.
  * We leverage spark event listener to validate it.
@@ -1421,7 +1458,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     val sm = new StageEventManager("org.apache.hudi.table.action.commit.BaseCommitActionExecutor.executeClustering")
     spark.sparkContext.addSparkListener(sm)
 
-    var structType : StructType = null
+    var structType: StructType = null
     for (i <- 1 to 2) {
       val records = recordsToStrings(dataGen.generateInserts("%05d".format(i), 100)).toList
       val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
@@ -1429,7 +1466,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
       inputDF.write.format("hudi")
         .options(commonOpts)
         .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
-        .option("hoodie.metadata.enable","false")
+        .option("hoodie.metadata.enable", "false")
         .mode(if (i == 0) SaveMode.Overwrite else SaveMode.Append)
         .save(basePath)
     }
@@ -1444,7 +1481,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
       .option("hoodie.parquet.small.file.limit", "0")
       .option("hoodie.clustering.inline", "true")
      .option("hoodie.clustering.inline.max.commits", "2")
-      .option("hoodie.metadata.enable","false")
+      .option("hoodie.metadata.enable", "false")
       .mode(SaveMode.Append)
       .save(basePath)
 
@@ -1466,8 +1503,8 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
   }
 
   def getWriterReaderOptsLessPartitionPath(recordType: HoodieRecordType,
-                                            opt: Map[String, String] = commonOpts,
-                                            enableFileIndex: Boolean = DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.defaultValue()):
+                                           opt: Map[String, String] = commonOpts,
+                                           enableFileIndex: Boolean = DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.defaultValue()):
   (Map[String, String], Map[String, String]) = {
     val (writeOpts, readOpts) = getWriterReaderOpts(recordType, opt, enableFileIndex)
     (writeOpts.-(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()), readOpts)
@@ -1483,7 +1520,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     }
   }
 
-  /************** Stage Event Listener **************/
+  /** ************ Stage Event Listener ************* */
   class StageEventManager(eventToTrack: String) extends SparkListener() {
 
     var triggerCount = 0
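
A note on the fix for readers skimming the patch: during an upsert merge, HoodieInternalRowUtils rewrites each previous row into the current schema by handing every field value to a compiled per-field writer. A writer for a map or array column dereferences its input, so a null map or array (legal for a nullable field, and exactly what the new testMapArrayTypeSchemaEvolution inserts) triggered the NPE; the patch short-circuits null to fieldUpdater.setNullAt(pos) before the writer runs. The sketch below restates that guard in isolation. FieldUpdater and FieldWriter here are hypothetical stand-ins, not Hudi's actual interfaces; the small diff above is the authoritative change.

// Minimal, self-contained sketch of the null-guard pattern from this commit.
// FieldUpdater and FieldWriter are illustrative stand-ins for Hudi internals.
object NullSafeFieldWriteSketch {

  // Positional setters over a mutable row, mirroring the updater the patch calls.
  trait FieldUpdater {
    def set(pos: Int, value: Any): Unit
    def setNullAt(pos: Int): Unit
  }

  // A per-field writer, e.g. one that rewrites an array into an evolved schema.
  // It dereferences its input, which is why feeding it null used to throw.
  type FieldWriter = (FieldUpdater, Int, Any) => Unit

  // The guarded write: null short-circuits to setNullAt instead of the writer.
  def writeField(writer: FieldWriter, updater: FieldUpdater, pos: Int, prevValue: Any): Unit =
    if (prevValue == null) updater.setNullAt(pos)
    else writer(updater, pos, prevValue)

  def main(args: Array[String]): Unit = {
    val row = new Array[Any](1)
    val updater = new FieldUpdater {
      def set(pos: Int, value: Any): Unit = row(pos) = value
      def setNullAt(pos: Int): Unit = row(pos) = null
    }
    // Like a real array writer, this dereferences its input and would NPE on null.
    val arrayWriter: FieldWriter = (u, pos, v) => u.set(pos, v.asInstanceOf[Seq[String]].toArray)

    writeField(arrayWriter, updater, 0, null)          // guarded: no NPE, row(0) stays null
    writeField(arrayWriter, updater, 0, Seq("a", "b")) // non-null values still hit the writer
    println(row(0).asInstanceOf[Array[String]].mkString(",")) // prints: a,b
  }
}

The new test drives this path through schema evolution: the first write declares the map's array values with containsNull = false, the second with containsNull = true, so the merger must rewrite the earlier records (whose map is null) into the evolved schema, which is where the unguarded writer previously threw.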