[ https://issues.apache.org/jira/browse/HUDI-1489 ]
Vinoth Chandar closed HUDI-1489.
--------------------------------
    Resolution: Fixed

> Not able to read after updating bootstrap table with written table
> ------------------------------------------------------------------
>
>                 Key: HUDI-1489
>                 URL: https://issues.apache.org/jira/browse/HUDI-1489
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Wenning Ding
>            Assignee: Wenning Ding
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.7.0
>
> After updating a bootstrapped Hudi table, reading the latest version of the table fails.
>
> h3. Reproduction steps
> {code:java}
> import org.apache.hudi.DataSourceWriteOptions
> import org.apache.hudi.common.model.HoodieTableType
> import org.apache.hudi.config.HoodieBootstrapConfig
> import org.apache.hudi.config.HoodieWriteConfig
> import org.apache.spark.sql.SaveMode
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions.{col, lit} // needed by the filter/withColumn calls below
>
> val bucketName = "wenningd-dev"
> val tableName = "hudi_bootstrap_test_cow_5c1a5147_888e_4b638bef8"
> val recordKeyName = "event_id"
> val partitionKeyName = "event_type"
> val precombineKeyName = "event_time"
> val verificationRecordKey = "4"
> val verificationColumn = "event_name"
> val originalVerificationValue = "event_d"
> val updatedVerificationValue = "event_test"
> val sourceTableLocation = "s3://wenningd-dev/hudi/test-data/source_table/"
> val tableType = HoodieTableType.COPY_ON_WRITE.name()
> val verificationSqlQuery = "select " + verificationColumn + " from " + tableName +
>   " where " + recordKeyName + " = '" + verificationRecordKey + "'"
> val tablePath = "s3://" + bucketName + "/hudi/tables/" + tableName
> val loadTablePath = tablePath + "/*/*"
>
> // Bootstrap the table from the source location and sync it with Hive
> val df = spark.emptyDataFrame
> df.write
>   .format("hudi")
>   .option(HoodieWriteConfig.TABLE_NAME, tableName)
>   .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, sourceTableLocation)
>   .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, "org.apache.hudi.keygen.SimpleKeyGenerator")
>   .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
>   .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType)
>   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, recordKeyName)
>   .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
>   .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
>   .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, partitionKeyName)
>   .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
>   .mode(SaveMode.Overwrite)
>   .save(tablePath)
>
> // Verify the bootstrap with a Spark SQL query
> val result0 = spark.sql(verificationSqlQuery)
> if (result0.count != 1 || !result0.collect.mkString.contains(originalVerificationValue)) {
>   // TestFailureException comes from the surrounding test harness
>   throw new TestFailureException("Create table verification failed!")
> }
>
> // Update a single record and upsert it back into the table
> val df3 = spark.read.format("org.apache.hudi").load(loadTablePath)
> val df4 = df3.filter(col(recordKeyName) === verificationRecordKey)
> val df5 = df4.withColumn(verificationColumn, lit(updatedVerificationValue))
> df5.write.format("org.apache.hudi")
>   .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, tableType)
>   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, recordKeyName)
>   .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, partitionKeyName)
>   .option(HoodieWriteConfig.TABLE_NAME, tableName)
>   .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
>   .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, precombineKeyName)
>   .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
>   .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
>   .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, partitionKeyName)
>   .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
>   .mode(SaveMode.Append)
>   .save(tablePath)
>
> // Reading the table back now fails
> val result1 = spark.sql(verificationSqlQuery)
> val df6 = spark.read.format("org.apache.hudi").load(loadTablePath)
> df6.show
> {code}
> {{df6.show}} fails with:
> {code:java}
> Driver stacktrace:
>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2043)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2031)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2030)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2030)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
>   at scala.Option.foreach(Option.scala:257)
>   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:967)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2264)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2213)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2202)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:778)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
>   at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:407)
>   at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
>   at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3395)
>   at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2552)
>   at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2552)
>   at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
>   at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$executeQuery$1(SQLExecution.scala:83)
>   at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1$$anonfun$apply$1.apply(SQLExecution.scala:94)
>   at org.apache.spark.sql.execution.QueryExecutionMetrics$.withMetrics(QueryExecutionMetrics.scala:141)
>   at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$withMetrics(SQLExecution.scala:178)
>   at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:93)
>   at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:200)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:92)
>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
>   at org.apache.spark.sql.Dataset.head(Dataset.scala:2552)
>   at org.apache.spark.sql.Dataset.take(Dataset.scala:2766)
>   at org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)
>   at org.apache.spark.sql.Dataset.showString(Dataset.scala:292)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:753)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:712)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:721)
>   ... 49 elided
> Caused by: java.lang.NullPointerException
>   at org.apache.spark.sql.execution.vectorized.WritableColumnVector.arrayData(WritableColumnVector.java:637)
>   at org.apache.spark.sql.execution.vectorized.WritableColumnVector.getUTF8String(WritableColumnVector.java:378)
>   at org.apache.spark.sql.execution.vectorized.MutableColumnarRow.getUTF8String(MutableColumnarRow.java:135)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:585)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:297)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:289)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
> {code}
> The root cause: {{requiredColumns}} in [buildScan()|https://github.com/apache/hudi/blob/release-0.6.0/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala#L72] does not follow the same order as the schema file.
> For example, when I selected all the columns, I printed {{requiredColumns}}:
> {{20/10/13 22:57:59 WARN HoodieBootstrapRelation: wenningd = required columns: _hoodie_commit_time _hoodie_record_key _hoodie_partition_path event_type event_id event_guests event_time _hoodie_commit_seqno _hoodie_file_name event_name}}
> Note that the metadata columns are not all at the front ({{_hoodie_commit_seqno}} and {{_hoodie_file_name}} come after the data columns). The problem is that [regularReadFunction|https://github.com/apache/hudi/blob/release-0.6.0/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala#L127] reads rows using the schema {{requiredSkeletonSchema.fields ++ requiredDataSchema.fields}}, which always places the skeleton (metadata) columns first. Since the required columns do not follow the schema-file order, there is a schema mismatch between [requiredSchema|https://github.com/apache/hudi/blob/release-0.6.0/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala#L132] and {{requiredColumns}}, which surfaces as the NullPointerException above.
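> A minimal sketch of the kind of reordering that avoids the mismatch (illustrative only; the helper name and the {{fullSchema}} parameter are hypothetical, not the actual patch): sort the columns Spark requests into the table schema's field order before splitting them into skeleton and data schemas, so the projection matches {{requiredSkeletonSchema.fields ++ requiredDataSchema.fields}}.
> {code:java}
> import org.apache.spark.sql.types.StructType
>
> // Hypothetical helper, not the actual HUDI-1489 fix. `fullSchema` stands for
> // the table schema read from the schema file, whose fields list the _hoodie_*
> // skeleton columns before the data columns.
> def orderColumnsLikeSchema(requiredColumns: Array[String],
>                            fullSchema: StructType): Array[String] = {
>   // Map each schema field name to its position in the schema file.
>   val position = fullSchema.fieldNames.zipWithIndex.toMap
>   // Columns missing from the schema (none are expected) sort last rather than throwing.
>   requiredColumns.sortBy(c => position.getOrElse(c, Int.MaxValue))
> }
> {code}
> Sorted this way, the logged column list above would begin with all five {{_hoodie_*}} metadata columns followed by the data columns in schema order, so splitting it into skeleton vs. data fields reproduces exactly the row layout that {{regularReadFunction}} assumes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)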