[ https://issues.apache.org/jira/browse/HUDI-1021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Balaji Varadarajan updated HUDI-1021: ------------------------------------- Parent: HUDI-1265 (was: HUDI-242) > [Bug] Unable to update bootstrapped table using rows from the written > bootstrapped table > ---------------------------------------------------------------------------------------- > > Key: HUDI-1021 > URL: https://issues.apache.org/jira/browse/HUDI-1021 > Project: Apache Hudi > Issue Type: Sub-task > Components: bootstrap > Reporter: Udit Mehrotra > Assignee: Wenning Ding > Priority: Blocker > Fix For: 0.6.1 > > > Reproduction Steps: > > {code:java} > import spark.implicits._ > import org.apache.hudi.DataSourceWriteOptions > import org.apache.hudi.DataSourceReadOptions > import org.apache.hudi.config.HoodieWriteConfig > import org.apache.hudi.HoodieDataSourceHelpers > import org.apache.hudi.common.model.HoodieTableType > import org.apache.spark.sql.SaveMode > val sourcePath = > "s3://uditme-iad/hudi/tables/events/events_data_partitioned_non_null" > val sourceDf = spark.read.parquet(sourcePath + "/*") > var tableName = "events_data_partitioned_non_null_00" > var tablePath = "s3://emr-users/uditme/hudi/tables/events/" + tableName > sourceDf.write.format("org.apache.hudi") > .option(HoodieWriteConfig.TABLE_NAME, tableName) > .option(DataSourceWriteOptions.OPERATION_OPT_KEY, > DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) > .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, > DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL) > .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id") > .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type") > .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts") > .mode(SaveMode.Overwrite) > .save(tablePath) > val readDf = spark.read.format("org.apache.hudi").load(tablePath + "/*") > val updateDf = readDf.filter($"event_id" === "106") > .withColumn("event_name", lit("udit_event_106")) > > updateDf.write.format("org.apache.hudi") > .option(HoodieWriteConfig.TABLE_NAME, 
tableName) > .option(DataSourceWriteOptions.OPERATION_OPT_KEY, > DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) > .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, > DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL) > .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id") > .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type") > .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts") > .mode(SaveMode.Append) > .save(tablePath) > {code} > > Full Stack trace: > {noformat} > Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting > bucketType UPDATE for partition :0 > at > org.apache.hudi.table.action.commit.BaseCommitActionExecutor.handleUpsertPartition(BaseCommitActionExecutor.java:276) > at > org.apache.hudi.table.action.commit.BaseCommitActionExecutor.lambda$execute$caffe4c4$1(BaseCommitActionExecutor.java:102) > at > org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102) > at > org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:875) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:875) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) > at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359) > at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357) > at > org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1181) > at > org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1155) > at 
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1090) > at > org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1155) > at > org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:881) > at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:308) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) > at org.apache.spark.scheduler.Task.run(Task.scala:123) > at > org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.hudi.exception.HoodieException: > org.apache.hudi.exception.HoodieException: > java.util.concurrent.ExecutionException: java.lang.NullPointerException > at > org.apache.hudi.table.action.commit.MergeHelper.runMerge(MergeHelper.java:134) > at > org.apache.hudi.table.action.commit.CommitActionExecutor.handleUpdateInternal(CommitActionExecutor.java:90) > at > org.apache.hudi.table.action.commit.CommitActionExecutor.handleUpdate(CommitActionExecutor.java:74) > at > org.apache.hudi.table.action.commit.BaseCommitActionExecutor.handleUpsertPartition(BaseCommitActionExecutor.java:269) > ... 
30 more > Caused by: org.apache.hudi.exception.HoodieException: > java.util.concurrent.ExecutionException: java.lang.NullPointerException > at > org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:143) > at > org.apache.hudi.table.action.commit.MergeHelper.runMerge(MergeHelper.java:132) > ... 33 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.NullPointerException > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:141) > ... 34 more > Caused by: java.lang.NullPointerException > at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:222) > at > org.apache.hudi.table.action.commit.MergeHelper$UpdateHandler.consumeOneRecord(MergeHelper.java:159) > at > org.apache.hudi.table.action.commit.MergeHelper$UpdateHandler.consumeOneRecord(MergeHelper.java:149) > at > org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37) > at > org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > ... 3 more > {noformat} > > Here as you can see *updateDf* is being formed by reading rows from the > *bootstrapped hudi table* itself. If however, we form the *updateDf* from the > source data it works fine: > val readDf = spark.read.parquet(sourcePath + "/*") > val updateDf = readDf.filter($"event_id" === "106").withColumn("event_name", > lit("udit_event_106")) > > -- This message was sent by Atlassian Jira (v8.3.4#803005)