[ https://issues.apache.org/jira/browse/HUDI-8409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17892621#comment-17892621 ]
Jonathan Vexler commented on HUDI-8409:
---------------------------------------
Upgrade and backwards-compatible read are both completely broken:
{code:java}
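// reporter's local helper script; presumably this launches spark-shell with the Hudi 0.15.0 bundle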
hudi_spark_shell -p -v 0.15.0
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig}
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.spark.sql.SaveMode._
val basePath = "/tmp/test_table_upgrade"
val columns = Seq("timestamp", "partition", "_row_key", "data")
val data1 = Seq((100L, "partA", "rec1", "data0"), (100L, "partA", "rec2", "data0"),
  (100L, "partA", "rec3", "data0"))
val inputDF1 = spark.createDataFrame(data1).toDF(columns:_*)
inputDF1.write.format("hudi")
  .option(DataSourceWriteOptions.TABLE_TYPE.key, "MERGE_ON_READ")
  .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "_row_key")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "partition")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "timestamp")
  .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
  .option("hoodie.datasource.write.payload.class",
    "org.apache.hudi.common.model.DefaultHoodieRecordPayload")
  .mode(Overwrite)
  .save(basePath)
spark.read.format("hudi").load(basePath).show(100,false)
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---------+---------+--------+-----+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                                                        |timestamp|partition|_row_key|data |
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---------+---------+--------+-----+
|20241024142643730  |20241024142643730_0_0|rec3              |partA                 |9a00995b-ca6a-4095-8583-813132063596-0_0-23-32_20241024142643730.parquet|100      |partA    |rec3    |data0|
|20241024142643730  |20241024142643730_0_1|rec1              |partA                 |9a00995b-ca6a-4095-8583-813132063596-0_0-23-32_20241024142643730.parquet|100      |partA    |rec1    |data0|
|20241024142643730  |20241024142643730_0_2|rec2              |partA                 |9a00995b-ca6a-4095-8583-813132063596-0_0-23-32_20241024142643730.parquet|100      |partA    |rec2    |data0|
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---------+---------+--------+-----+
val data2 = Seq((50L, "partA", "rec1", "data1"), (100L, "partA", "rec2", "data1"),
  (200L, "partA", "rec3", "data1"))
val inputDF2 = spark.createDataFrame(data2).toDF(columns:_*)
inputDF2.write.format("hudi").mode(Append).save(basePath)
spark.read.format("hudi").load(basePath).show(100,false)
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---------+---------+--------+-----+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                                                        |timestamp|partition|_row_key|data |
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---------+---------+--------+-----+
|20241024142650408  |20241024142650408_0_1|rec3              |partA                 |9a00995b-ca6a-4095-8583-813132063596-0                                  |200      |partA    |rec3    |data1|
|20241024142643730  |20241024142643730_0_1|rec1              |partA                 |9a00995b-ca6a-4095-8583-813132063596-0_0-23-32_20241024142643730.parquet|100      |partA    |rec1    |data0|
|20241024142650408  |20241024142650408_0_3|rec2              |partA                 |9a00995b-ca6a-4095-8583-813132063596-0                                  |100      |partA    |rec2    |data1|
+-------------------+---------------------+------------------+----------------------+------------------------------------------------------------------------+---------+---------+--------+-----+
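// expected MOR merge semantics so far: rec1 keeps data0 because its update carries a
// lower precombine value (50 < 100) under DefaultHoodieRecordPayload, while rec2 and
// rec3 pick up data1 from log files (hence the bare file group id in _hoodie_file_name)

// now relaunch the shell on the newer (1.x) build; the helper script and its flags are
// the reporter's local setup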
hudi_spark_shell -j
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig}
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.spark.sql.SaveMode._
val basePath = "/tmp/test_table_upgrade"
spark.read.format("hudi").load(basePath).show(100,false)
org.apache.hudi.exception.HoodieIOException: Failed to fetch HoodieCommitMetadata for instant ([20241024142650408__20241024142651540__deltacommit__COMPLETED])
  at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.lambda$getCommitMetadataStream$4(HoodieActiveTimeline.java:396)
  at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
  at java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:361)
  at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:500)
  at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:486)
  at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
  at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152)
  at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
  at java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:531)
  at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.getLastCommitMetadataWithValidSchema(HoodieActiveTimeline.java:365)
  at org.apache.hudi.common.table.TableSchemaResolver.getLatestCommitMetadataWithValidSchema(TableSchemaResolver.java:419)
  at org.apache.hudi.common.table.TableSchemaResolver.getTableSchemaFromLatestCommitMetadata(TableSchemaResolver.java:209)
  at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchemaInternal(TableSchemaResolver.java:183)
  at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchema(TableSchemaResolver.java:136)
  at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchema(TableSchemaResolver.java:125)
  at org.apache.hudi.HoodieBaseHadoopFsRelationFactory.$anonfun$x$2$10(HoodieHadoopFsRelationFactory.scala:131)
  at scala.util.Try$.apply(Try.scala:213)
  at org.apache.hudi.HoodieBaseHadoopFsRelationFactory.$anonfun$x$2$9(HoodieHadoopFsRelationFactory.scala:131)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.hudi.HoodieBaseHadoopFsRelationFactory.x$2$lzycompute(HoodieHadoopFsRelationFactory.scala:131)
  at org.apache.hudi.HoodieBaseHadoopFsRelationFactory.x$2(HoodieHadoopFsRelationFactory.scala:108)
  at org.apache.hudi.HoodieBaseHadoopFsRelationFactory.tableAvroSchema$lzycompute(HoodieHadoopFsRelationFactory.scala:108)
  at org.apache.hudi.HoodieBaseHadoopFsRelationFactory.tableAvroSchema(HoodieHadoopFsRelationFactory.scala:108)
  at org.apache.hudi.HoodieBaseHadoopFsRelationFactory.tableStructSchema$lzycompute(HoodieHadoopFsRelationFactory.scala:147)
  at org.apache.hudi.HoodieBaseHadoopFsRelationFactory.tableStructSchema(HoodieHadoopFsRelationFactory.scala:146)
  at org.apache.hudi.HoodieMergeOnReadSnapshotHadoopFsRelationFactory.<init>(HoodieHadoopFsRelationFactory.scala:234)
  at org.apache.hudi.DefaultSource$.createRelation(DefaultSource.scala:314)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:142)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:80)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
  at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:186)
  ... 49 elided
Caused by: java.io.IOException: unable to read commit metadata for bytes length: 1708
  at org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:508)
  at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.lambda$getCommitMetadataStream$4(HoodieActiveTimeline.java:393)
  ... 83 more
Caused by: org.apache.avro.InvalidAvroMagicException: Not an Avro data file
  at org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:78)
  at org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeAvroMetadata(TimelineMetadataUtils.java:220)
  at org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeCommitMetadata(TimelineMetadataUtils.java:210)
  at org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:505)
  ... 84 more
val columns = Seq("timestamp", "partition", "_row_key", "data")
val data3 = Seq((25L, "partA", "rec1", "data3"), (100L, "partA", "rec2", "data3"),
  (400L, "partA", "rec3", "data3"))
val inputDF3 = spark.createDataFrame(data3).toDF(columns:_*)
inputDF3.write.format("hudi").mode(Append).save(basePath)
24/10/24 14:29:25 WARN ConfigUtils: The configuration key 'hoodie.compaction.record.merger.strategy' has been deprecated and may be removed in the future. Please use the new key 'hoodie.record.merge.strategy.id' instead.
24/10/24 14:29:25 WARN ConfigUtils: The configuration key 'hoodie.compaction.record.merger.strategy' has been deprecated and may be removed in the future. Please use the new key 'hoodie.record.merge.strategy.id' instead.
org.apache.hudi.exception.HoodieIOException: Failed to fetch HoodieCommitMetadata for instant ([20241024142650408__20241024142651540__deltacommit__COMPLETED])
  at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.lambda$getCommitMetadataStream$4(HoodieActiveTimeline.java:396)
  at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
  at java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:361)
  at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:500)
  at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:486)
  at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
  at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152)
  at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
  at java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:531)
  at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.getLastCommitMetadataWithValidSchema(HoodieActiveTimeline.java:365)
  at org.apache.hudi.common.table.TableSchemaResolver.getLatestCommitMetadataWithValidSchema(TableSchemaResolver.java:419)
  at org.apache.hudi.common.table.TableSchemaResolver.getTableSchemaFromLatestCommitMetadata(TableSchemaResolver.java:209)
  at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchemaInternal(TableSchemaResolver.java:183)
  at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchemaFromLatestCommit(TableSchemaResolver.java:285)
  at org.apache.hudi.HoodieSparkSqlWriterInternal.getLatestTableSchema(HoodieSparkSqlWriter.scala:665)
  at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:343)
  at org.apache.hudi.HoodieSparkSqlWriterInternal.$anonfun$write$1(HoodieSparkSqlWriter.scala:190)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
  at org.apache.spark.sql.adapter.BaseSpark3Adapter.sqlExecutionWithNewExecutionId(BaseSpark3Adapter.scala:105)
  at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:212)
  at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:127)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:169)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
  at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
  at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
  at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
  ... 49 elided
Caused by: java.io.IOException: unable to read commit metadata for bytes length: 1708
  at org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:508)
  at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.lambda$getCommitMetadataStream$4(HoodieActiveTimeline.java:393)
  ... 103 more
Caused by: org.apache.avro.InvalidAvroMagicException: Not an Avro data file
  at org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:78)
  at org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeAvroMetadata(TimelineMetadataUtils.java:220)
  at org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeCommitMetadata(TimelineMetadataUtils.java:210)
  at org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:505)
  ... 104 more
spark.read.format("hudi").load(basePath).show(100,false)
// same error as the previous read
{code}
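What the trace suggests about the root cause: the completed deltacommit metadata was written by 0.15.0 as plain JSON, but the new reader hands it straight to Avro's DataFileReader, which rejects it ("Not an Avro data file"). A minimal, Hudi-independent sketch of that mismatch; the file path and JSON payload below are made up for illustration:

{code:java}
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.avro.InvalidAvroMagicException
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

// hypothetical stand-in for a completed deltacommit written by 0.15.0:
// plain JSON, so the bytes lack the Avro container-file magic
val instantFile = new File("/tmp/20241024142650408.deltacommit")
Files.write(instantFile.toPath,
  """{"partitionToWriteStats":{},"operationType":"UPSERT"}""".getBytes(StandardCharsets.UTF_8))

// deserializing it as an Avro container file fails the same way as the trace above
try {
  DataFileReader.openReader(instantFile, new GenericDatumReader[GenericRecord]())
} catch {
  case e: InvalidAvroMagicException =>
    println(s"pre-1.0 instants would need a JSON fallback: $e")
}
{code}

So the fix likely needs the timeline reader (or the upgrade step) to handle both serializations rather than assuming Avro for every completed instant.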
> Ensure that upgrade and downgrade work with merging configs
> -----------------------------------------------------------
>
> Key: HUDI-8409
> URL: https://issues.apache.org/jira/browse/HUDI-8409
> Project: Apache Hudi
> Issue Type: Task
> Reporter: Jonathan Vexler
> Assignee: Jonathan Vexler
> Priority: Blocker
> Fix For: 1.0.0
>
>
> Make sure that upgrade and downgrade work correctly with the merge-related
> configs, such as the payload class and merge mode.
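For context, the merge behavior spans two generations of configuration: the 0.x payload-class key used in the repro above, and the 1.x merge-mode key implied by the 'hoodie.record.merge.strategy.id' deprecation warning in the log. A sketch of the two styles, reusing inputDF1/basePath from the repro; the 1.x key name and value follow current master and should be treated as an assumption:

{code:java}
// 0.x style: merge semantics come from the payload class (as in the repro above)
inputDF1.write.format("hudi")
  .option("hoodie.datasource.write.payload.class",
    "org.apache.hudi.common.model.DefaultHoodieRecordPayload")
  .mode(Append).save(basePath)

// 1.x style: merge semantics come from the record merge mode
// (assumed key/value from current master)
inputDF1.write.format("hudi")
  .option("hoodie.record.merge.mode", "EVENT_TIME_ORDERING")
  .mode(Append).save(basePath)
{code}

The upgrade/downgrade path has to translate between these two consistently, which is what this issue tracks.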