jlloh opened a new issue, #9256: URL: https://github.com/apache/hudi/issues/9256
**_Tips before filing an issue_**

- Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)? Yes
- Join the mailing list to engage in conversations and get faster support at dev-subscr...@hudi.apache.org.
- If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.

**Describe the problem you faced**

I have a Flink job (1.16) writing a Hudi COW table with async compaction. The job works like this:

* It uses the Table API to create the table.
* It then does an insert into the table:

```scala
// In Flink Scala
val hudiTable = "my_table_name"
val hudiTableDescriptor = buildHudiTableDescriptor()
logger.info(s"Creating hudi table {} with descriptor: {}", hudiTable, hudiTableDescriptor.toString())
bsTableEnv.createTable(hudiTable, hudiTableDescriptor)
bsTableEnv.fromDataStream(d)
  .select(getOrderedColumnExpr(): _*)
  .insertInto(AppConfig.output.hudiSink.targetTable)
  .execute()
```

The table is registered as an external table in Hive, but Flink is not attached to any Hive catalog. The table is registered in Spark SQL using:

```
spark.sql("create table xxx.yyy using hudi location 's3a://location_flink_is_writing_to'").show()
```

Now I want to try schema evolution and add a column to the table. I am creating a savepoint, adding the column to the Flink Hudi DDL, recreating the table, and then inserting with the new column. On the Spark side, I have tried:

1. Dropping the external table and recreating it after resuming my Flink job from the savepoint with the new column. This still does not register the new column, probably because hoodie.properties is not being updated by Flink.
2. Stopping my job and running `ALTER TABLE ... ADD COLUMNS` from Spark, following https://hudi.apache.org/docs/schema_evolution/, i.e.:

   ```
   spark.sql("alter table xxx.yyy add columns (new_column ARRAY<STRING>)").show(100, False)
   ```

   and then resuming my job from the savepoint with the new column as above.
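For reference, if I read the linked schema evolution page correctly, DDL-based evolution only takes effect when schema-on-read is enabled in the Spark session *before* the DDL runs, so the full sequence the docs seem to intend (table name as above) would be roughly:

```
spark.sql("set hoodie.schema.on.read.enable=true")
spark.sql("alter table xxx.yyy add columns (new_column ARRAY<STRING>)")
```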
With (1), the new column never gets registered. With (2), the column is added in the Hive metastore, but all its values are null, even for new incoming rows.

**To Reproduce**

Steps to reproduce the behavior:

1. Create a Flink job writing data to S3 by creating a Hudi table with the Table API and then doing an insert.
2. Register the table in Hive as an external table.
3. Restart the Flink job from a savepoint, adding the new column to the DDL so the create runs again, then insert.
4. Try to add the column, either by recreating the external table or by doing an `alter table add column` in Spark SQL.
5. Query the table through the Hive table using Spark SQL.
6. All nulls are returned for the new column:

```
spark.sql("""
    SELECT request_id, new_column
    FROM xxx.yyy
    WHERE concat(local_year, local_month, local_day) = '20230721'
      AND new_column IS NOT NULL
    LIMIT 100
""").show(100, False)

+----------+----------+
|request_id|new_column|
+----------+----------+
+----------+----------+
```

**Expected behavior**

I expected to be able to add a column successfully through either method, and for the new column to be populated with data and queryable.

**Environment Description**

* Hudi version : 0.13
* Spark version : 3.3
* Hive version : 2
* Hadoop version :
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker?
(yes/no) : Yes, on Kubernetes

**Additional context**

Spark configuration:

```
conf = pyspark.SparkConf()\
    .set("spark.executor.instances", number_of_executors)\
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")\
    .set("spark.sql.catalogImplementation", "hive")\
    .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")\
    .set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")\
    .set("spark.jars.packages", "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1")\
    .set("spark.hadoop.fs.auth.grabfs.detailed.error.msg", "true")\
    .set("spark.hadoop.fs.s3a.multiobjectdelete.enable", "false")
```

Flink Hudi table configuration:

```
WITH (
  'hoodie.clustering.plan.strategy.daybased.lookback.partitions' = '2',
  'compaction.schedule.enabled' = 'true',
  'clustering.plan.strategy.sort.columns' = 'country_code,card_bin,ip_address,passenger_id,request_id,booking_code,rule_id,version_id',
  'hoodie.datasource.write.reconcile.schema' = 'true',
  'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.InProcessLockProvider',
  'hoodie.schema.on.read.enable' = 'true',
  'clustering.plan.strategy.small.file.limit' = '100',
  'hoodie.parquet.max.file.size' = '26214400',
  'path' = 's3a://location_flink_is_writing_to',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'hoodie.cleaner.hours.retained' = '1',
  'write.merge.max_memory' = '512',
  'hoodie.parquet.small.file.limit' = '13107200',
  'hoodie.cleaner.policy' = 'KEEP_LATEST_BY_HOURS',
  'write.operation' = 'insert',
  'write.insert.cluster' = 'false',
  'table.table' = 'COPY_ON_WRITE',
  'precombine.field' = 'event_datetime_local',
  'write.task.max.size' = '1024',
  'hoodie.clustering.async.max.commits' = '1',
  'hoodie.write.concurrency.mode' = 'optimistic_concurrency_control',
  'clustering.tasks' = '30',
  'connector' = 'hudi',
  'clustering.async.enabled' = 'true',
  'hoodie.cleaner.policy.failed.writes' = 'LAZY',
  'clustering.plan.partition.filter.mode' =
'RECENT_DAYS',
  'hoodie.copyonwrite.record.size.estimate' = '104857600',
  'clustering.schedule.enabled' = 'true',
  'write.sort.memory' = '512'
)
```
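To narrow down whether the restarted Flink writer actually picked up the new column, the table metadata under `.hoodie/` can be inspected directly: `hoodie.properties` records the create schema, and each `<ts>.commit` file records the writer schema of that commit under `extraMetadata.schema`. A minimal stand-alone sketch of that check (the sample file contents below are invented; the real files live under the S3 table path):

```python
import json

def parse_properties(text):
    """Minimal Java-properties parser (ignores escapes and line continuations)."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

# Hypothetical contents of .hoodie/hoodie.properties (trimmed).
sample_props = """
hoodie.table.name=yyy
hoodie.table.type=COPY_ON_WRITE
hoodie.table.create.schema={"type":"record","name":"yyy_record","fields":[{"name":"request_id","type":"string"}]}
"""

props = parse_properties(sample_props)
create_schema = json.loads(props["hoodie.table.create.schema"])
create_fields = [f["name"] for f in create_schema["fields"]]

# Hypothetical contents of the latest .hoodie/<ts>.commit file (trimmed):
# Hudi serializes the writer schema under extraMetadata.schema.
sample_commit = {
    "extraMetadata": {
        "schema": json.dumps({
            "type": "record",
            "name": "yyy_record",
            "fields": [
                {"name": "request_id", "type": "string"},
                {"name": "new_column",
                 "type": ["null", {"type": "array", "items": "string"}],
                 "default": None},
            ],
        })
    }
}

writer_schema = json.loads(sample_commit["extraMetadata"]["schema"])
writer_fields = [f["name"] for f in writer_schema["fields"]]

# If the restarted job picked up the DDL change, new_column should appear in
# the latest commit's schema even while hoodie.properties still has the old one.
print("create schema fields:", create_fields)
print("latest commit fields:", writer_fields)
```

If `new_column` never shows up in the latest commit's writer schema, the problem is on the Flink write path rather than in the Hive/Spark registration.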