[ 
https://issues.apache.org/jira/browse/HUDI-8409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17892621#comment-17892621
 ] 

Jonathan Vexler commented on HUDI-8409:
---------------------------------------

Upgrade and backward-compatible reads are both completely broken:

 
{code:java}
hudi_spark_shell -p -v 0.15.0

import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig}
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.spark.sql.SaveMode._

val basePath = "/tmp/test_table_upgrade"

val columns = Seq("timestamp", "partition", "_row_key", "data")
val data1 = Seq((100L, "partA", "rec1", "data0"),
    (100L, "partA", "rec2", "data0"),
    (100L, "partA", "rec3", "data0"));
val inputDF1 = spark.createDataFrame(data1).toDF(columns:_*)
inputDF1.write.format("hudi")
    .option(DataSourceWriteOptions.TABLE_TYPE.key, "MERGE_ON_READ")
    .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "_row_key")
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "partition")
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "timestamp")
    .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
    .option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.DefaultHoodieRecordPayload")
    .mode(Overwrite)
    .save(basePath)

spark.read.format("hudi").load(basePath).show(100,false)
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---------+---------+--------+-----+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                                                       |timestamp|partition|_row_key|data |
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---------+---------+--------+-----+
|20241024142643730  |20241024142643730_0_0|rec3              |partA                 |9a00995b-ca6a-4095-8583-813132063596-0_0-23-32_20241024142643730.parquet|100      |partA    |rec3    |data0|
|20241024142643730  |20241024142643730_0_1|rec1              |partA                 |9a00995b-ca6a-4095-8583-813132063596-0_0-23-32_20241024142643730.parquet|100      |partA    |rec1    |data0|
|20241024142643730  |20241024142643730_0_2|rec2              |partA                 |9a00995b-ca6a-4095-8583-813132063596-0_0-23-32_20241024142643730.parquet|100      |partA    |rec2    |data0|
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---------+---------+--------+-----+


val data2 =  Seq((50L, "partA", "rec1", "data1"),     (100L, "partA", "rec2", 
"data1"),     (200L, "partA", "rec3", "data1"));
val inputDF2 = spark.createDataFrame(data2).toDF(columns:_*)
inputDF2.write.format("hudi").mode(Append).save(basePath)

spark.read.format("hudi").load(basePath).show(100,false)
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---------+---------+--------+-----+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                                                       |timestamp|partition|_row_key|data |
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---------+---------+--------+-----+
|20241024142650408  |20241024142650408_0_1|rec3              |partA                 |9a00995b-ca6a-4095-8583-813132063596-0                                  |200      |partA    |rec3    |data1|
|20241024142643730  |20241024142643730_0_1|rec1              |partA                 |9a00995b-ca6a-4095-8583-813132063596-0_0-23-32_20241024142643730.parquet|100      |partA    |rec1    |data0|
|20241024142650408  |20241024142650408_0_3|rec2              |partA                 |9a00995b-ca6a-4095-8583-813132063596-0                                  |100      |partA    |rec2    |data1|
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---------+---------+--------+-----+


hudi_spark_shell -j  
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig}
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.spark.sql.SaveMode._

val basePath = "/tmp/test_table_upgrade"
spark.read.format("hudi").load(basePath).show(100,false)
org.apache.hudi.exception.HoodieIOException: Failed to fetch 
HoodieCommitMetadata for instant 
([20241024142650408__20241024142651540__deltacommit__COMPLETED])  at 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.lambda$getCommitMetadataStream$4(HoodieActiveTimeline.java:396)
  at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)  
at java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:361)  at 
java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:500) 
 at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:486)  at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)  
at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152)  at 
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)  at 
java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:531)  at 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.getLastCommitMetadataWithValidSchema(HoodieActiveTimeline.java:365)
  at 
org.apache.hudi.common.table.TableSchemaResolver.getLatestCommitMetadataWithValidSchema(TableSchemaResolver.java:419)
  at 
org.apache.hudi.common.table.TableSchemaResolver.getTableSchemaFromLatestCommitMetadata(TableSchemaResolver.java:209)
  at 
org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchemaInternal(TableSchemaResolver.java:183)
  at 
org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchema(TableSchemaResolver.java:136)
  at 
org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchema(TableSchemaResolver.java:125)
  at 
org.apache.hudi.HoodieBaseHadoopFsRelationFactory.$anonfun$x$2$10(HoodieHadoopFsRelationFactory.scala:131)
  at scala.util.Try$.apply(Try.scala:213)  at 
org.apache.hudi.HoodieBaseHadoopFsRelationFactory.$anonfun$x$2$9(HoodieHadoopFsRelationFactory.scala:131)
  at scala.Option.getOrElse(Option.scala:189)  at 
org.apache.hudi.HoodieBaseHadoopFsRelationFactory.x$2$lzycompute(HoodieHadoopFsRelationFactory.scala:131)
  at 
org.apache.hudi.HoodieBaseHadoopFsRelationFactory.x$2(HoodieHadoopFsRelationFactory.scala:108)
  at 
org.apache.hudi.HoodieBaseHadoopFsRelationFactory.tableAvroSchema$lzycompute(HoodieHadoopFsRelationFactory.scala:108)
  at 
org.apache.hudi.HoodieBaseHadoopFsRelationFactory.tableAvroSchema(HoodieHadoopFsRelationFactory.scala:108)
  at 
org.apache.hudi.HoodieBaseHadoopFsRelationFactory.tableStructSchema$lzycompute(HoodieHadoopFsRelationFactory.scala:147)
  at 
org.apache.hudi.HoodieBaseHadoopFsRelationFactory.tableStructSchema(HoodieHadoopFsRelationFactory.scala:146)
  at 
org.apache.hudi.HoodieMergeOnReadSnapshotHadoopFsRelationFactory.<init>(HoodieHadoopFsRelationFactory.scala:234)
  at org.apache.hudi.DefaultSource$.createRelation(DefaultSource.scala:314)  at 
org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:142)  at 
org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:80)  at 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
  at 
org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)  
at 
org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211) 
 at scala.Option.getOrElse(Option.scala:189)  at 
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)  at 
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:186)  ... 49 elided
Caused by: java.io.IOException: unable to read commit metadata for bytes length: 1708  at 
org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:508)
  at 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.lambda$getCommitMetadataStream$4(HoodieActiveTimeline.java:393)
  ... 83 more
Caused by: org.apache.avro.InvalidAvroMagicException: Not an Avro data file  at 
org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:78)  at 
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeAvroMetadata(TimelineMetadataUtils.java:220)
  at 
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeCommitMetadata(TimelineMetadataUtils.java:210)
  at 
org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:505)
  ... 84 more


val columns = Seq("timestamp", "partition", "_row_key", "data")
val data3 = Seq((25L, "partA", "rec1", "data3"),
    (100L, "partA", "rec2", "data3"),
    (400L, "partA", "rec3", "data3"));
val inputDF3 = spark.createDataFrame(data3).toDF(columns:_*)
inputDF3.write.format("hudi").mode(Append).save(basePath)
24/10/24 14:29:25 WARN ConfigUtils: The configuration key 'hoodie.compaction.record.merger.strategy' has been deprecated and may be removed in the future. Please use the new key 'hoodie.record.merge.strategy.id' instead.
24/10/24 14:29:25 WARN ConfigUtils: The configuration key 'hoodie.compaction.record.merger.strategy' has been deprecated and may be removed in the future. Please use the new key 'hoodie.record.merge.strategy.id' instead.
org.apache.hudi.exception.HoodieIOException: Failed to fetch HoodieCommitMetadata for instant ([20241024142650408__20241024142651540__deltacommit__COMPLETED])  at 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.lambda$getCommitMetadataStream$4(HoodieActiveTimeline.java:396)
  at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)  
at java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:361)  at 
java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:500) 
 at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:486)  at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)  
at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152)  at 
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)  at 
java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:531)  at 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.getLastCommitMetadataWithValidSchema(HoodieActiveTimeline.java:365)
  at 
org.apache.hudi.common.table.TableSchemaResolver.getLatestCommitMetadataWithValidSchema(TableSchemaResolver.java:419)
  at 
org.apache.hudi.common.table.TableSchemaResolver.getTableSchemaFromLatestCommitMetadata(TableSchemaResolver.java:209)
  at 
org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchemaInternal(TableSchemaResolver.java:183)
  at 
org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchemaFromLatestCommit(TableSchemaResolver.java:285)
  at 
org.apache.hudi.HoodieSparkSqlWriterInternal.getLatestTableSchema(HoodieSparkSqlWriter.scala:665)
  at 
org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:343)
  at 
org.apache.hudi.HoodieSparkSqlWriterInternal.$anonfun$write$1(HoodieSparkSqlWriter.scala:190)
  at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
  at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
  at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)  at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
  at 
org.apache.spark.sql.adapter.BaseSpark3Adapter.sqlExecutionWithNewExecutionId(BaseSpark3Adapter.scala:105)
  at 
org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:212)
  at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:127)  at 
org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:169)  at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
  at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
  at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
  at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
  at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
  at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
  at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
  at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)  at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
  at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
  at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
  at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)  
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
  at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
  at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)  
at 
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
  at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
  at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
  at 
org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859) 
 at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)  
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361) 
 at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)  ... 49 elided
Caused by: java.io.IOException: unable to read commit metadata for bytes length: 1708  at 
org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:508)
  at 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.lambda$getCommitMetadataStream$4(HoodieActiveTimeline.java:393)
  ... 103 more
Caused by: org.apache.avro.InvalidAvroMagicException: Not an Avro data file  at 
org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:78)  at 
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeAvroMetadata(TimelineMetadataUtils.java:220)
  at 
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeCommitMetadata(TimelineMetadataUtils.java:210)
  at 
org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:505)
  ... 104 more

spark.read.format("hudi").load(basePath).show(100,false)
same error as the previous read

 {code}

> Ensure that upgrade and downgrade work with merging configs
> -----------------------------------------------------------
>
>                 Key: HUDI-8409
>                 URL: https://issues.apache.org/jira/browse/HUDI-8409
>             Project: Apache Hudi
>          Issue Type: Task
>            Reporter: Jonathan Vexler
>            Assignee: Jonathan Vexler
>            Priority: Blocker
>             Fix For: 1.0.0
>
>
> Make sure that we are upgrading and downgrading correctly with the merge 
> configs such as payload class and merge mode



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to