jlloh opened a new issue, #9256:
URL: https://github.com/apache/hudi/issues/9256

   **Describe the problem you faced**
   
   I have a Flink 1.16 job that writes to a Hudi COW table with async compaction. The job:
   * creates the table using the Table API
   * then inserts into that table
   ```scala
   // In Flink Scala
       val hudiTable = "my_table_name"
       val hudiTableDescriptor = buildHudiTableDescriptor()
       logger.info(s"Creating hudi table {} with descriptor: {}", hudiTable, hudiTableDescriptor.toString())
       bsTableEnv.createTable(hudiTable, hudiTableDescriptor)
       bsTableEnv.fromDataStream(d).select(getOrderedColumnExpr(): _*).insertInto(AppConfig.output.hudiSink.targetTable).execute()
   
   ```
   
   The table is registered as an external table in Hive, but Flink is not attached to any Hive catalog. The table was registered through Spark SQL using:
   ```
   spark.sql("create table xxx.yyy using hudi location 's3a://location_flink_is_writing_to' ").show()
   ```
   
   I now want to try schema evolution by adding a column to the table. To do this, I take a savepoint, add the column to the Flink Hudi DDL, recreate the table, and then insert with the new column.
   
   On the Spark side, I have tried:
   1. Dropping the external table and recreating it after resuming my Flink job from the savepoint with the new column (it still doesn't register the new column, probably because hoodie.properties is not being updated by Flink)
   2. Stopping my job and running `ALTER TABLE ... ADD COLUMNS` from Spark, following https://hudi.apache.org/docs/schema_evolution/, i.e.:
   ```
   spark.sql("alter table xxx.yyy add columns (new_column ARRAY<STRING>)").show(100, False)
   ``` 
   and then resuming my job from the savepoint with the new column added to the DDL, as above.
   
   (1) does not register the new column.
   (2) adds the column to the Hive metastore, but all of its values are still null, even for new incoming rows.
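
The two Spark-side attempts above can be written down as plain SQL strings, which makes it easier to state exactly what was run. This is only a sketch: the helper names `recreate_statements` and `add_columns_statement` are hypothetical, `xxx.yyy` and the S3 location are the placeholders used in this report, and in a real session each string would be passed to `spark.sql(...)`.

```python
from typing import List, Tuple


def recreate_statements(table: str, location: str) -> List[str]:
    """Attempt (1): drop the external table, then register it again."""
    return [
        f"DROP TABLE IF EXISTS {table}",
        f"CREATE TABLE {table} USING hudi LOCATION '{location}'",
    ]


def add_columns_statement(table: str, columns: List[Tuple[str, str]]) -> str:
    """Attempt (2): add one or more (name, type) columns in place."""
    cols = ", ".join(f"{name} {dtype}" for name, dtype in columns)
    return f"ALTER TABLE {table} ADD COLUMNS ({cols})"


# Placeholder values taken from this report.
TABLE = "xxx.yyy"
LOCATION = "s3a://location_flink_is_writing_to"

for stmt in recreate_statements(TABLE, LOCATION):
    print(stmt)
print(add_columns_statement(TABLE, [("new_column", "ARRAY<STRING>")]))
```

Keeping the statements as data also makes it simple to log which of the two paths was taken before the Flink job was resumed.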
   
   
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Create a Flink job that writes data to S3 by creating a Hudi table with the Table API and then inserting into it
   2. Register the table in Hive as an external table
   3. Restart the Flink job from a savepoint with the new column added to the DDL, so that the create and insert run again
   4. Try to add the column by either recreating the external table or running `alter table add columns` in Spark SQL
   5. Query the table with Spark SQL through the Hive table
   6. All nulls are returned for the new column
   ```
   spark.sql("""
   SELECT request_id, new_column
     from xxx.yyy
     where  concat(local_year,local_month, local_day) = '20230721'
       and new_column is not null
       limit 100
   """).show(100, False)
   
   +----------+----------+
   |request_id|new_column|
   +----------+----------+
   +----------+----------+
   ```
   
   **Expected behavior**
   
   I expected to be able to add a column successfully through either method, and for the column to be populated with data and queryable.
   
   **Environment Description**
   
   * Hudi version : 0.13
   
   * Spark version : 3.3
   
   * Hive version : 2
   
   * Hadoop version:
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : Yes, on Kubernetes
   
   
   **Additional context**
   
   Spark configuration:
   ```
   conf = pyspark.SparkConf()\
       .set("spark.executor.instances", number_of_executors)\
       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")\
       .set("spark.sql.catalogImplementation", "hive")\
       .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")\
       .set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")\
       .set("spark.jars.packages", "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1")\
       .set("spark.hadoop.fs.auth.grabfs.detailed.error.msg", "true")\
       .set("spark.hadoop.fs.s3a.multiobjectdelete.enable", "false")
   ```
   
   Flink Hudi table configuration:
   ```
   WITH (
     'hoodie.clustering.plan.strategy.daybased.lookback.partitions' = '2',
     'compaction.schedule.enabled' = 'true',
     'clustering.plan.strategy.sort.columns' = 'country_code,card_bin,ip_address,passenger_id,request_id,booking_code,rule_id,version_id',
     'hoodie.datasource.write.reconcile.schema' = 'true',
     'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.InProcessLockProvider',
     'hoodie.schema.on.read.enable' = 'true',
     'clustering.plan.strategy.small.file.limit' = '100',
     'hoodie.parquet.max.file.size' = '26214400',
     'path' = 's3a://location_flink_is_writing_to',
     'hoodie.datasource.write.hive_style_partitioning' = 'true',
     'hoodie.cleaner.hours.retained' = '1',
     'write.merge.max_memory' = '512',
     'hoodie.parquet.small.file.limit' = '13107200',
     'hoodie.cleaner.policy' = 'KEEP_LATEST_BY_HOURS',
     'write.operation' = 'insert',
     'write.insert.cluster' = 'false',
     'table.table' = 'COPY_ON_WRITE',
     'precombine.field' = 'event_datetime_local',
     'write.task.max.size' = '1024',
     'hoodie.clustering.async.max.commits' = '1',
     'hoodie.write.concurrency.mode' = 'optimistic_concurrency_control',
     'clustering.tasks' = '30',
     'connector' = 'hudi',
     'clustering.async.enabled' = 'true',
     'hoodie.cleaner.policy.failed.writes' = 'LAZY',
     'clustering.plan.partition.filter.mode' = 'RECENT_DAYS',
     'hoodie.copyonwrite.record.size.estimate' = '104857600',
     'clustering.schedule.enabled' = 'true',
     'write.sort.memory' = '512'
   )
   ```
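
When comparing the writer options before and after the savepoint restart, it can help to load the `WITH (...)` clause into a dictionary and pull out the schema-related keys. The following is a minimal sketch, not part of the job itself: `parse_with_options` and `schema_settings` are hypothetical helper names, and the regular expression assumes single-quoted keys and values as in the listing above.

```python
import re
from typing import Dict, Optional

# Matches 'key' = 'value' pairs; whitespace (including newlines) may surround '='.
OPTION_RE = re.compile(r"'([^']+)'\s*=\s*'([^']*)'")

# Schema-related options that appear in the configuration in this report.
SCHEMA_KEYS = (
    "hoodie.schema.on.read.enable",
    "hoodie.datasource.write.reconcile.schema",
)


def parse_with_options(clause: str) -> Dict[str, str]:
    """Turn the text of a Flink SQL WITH (...) clause into a dict."""
    return dict(OPTION_RE.findall(clause))


def schema_settings(options: Dict[str, str]) -> Dict[str, Optional[str]]:
    """Return the schema-related options, with None for any that are unset."""
    return {key: options.get(key) for key in SCHEMA_KEYS}


# A shortened excerpt of the configuration above, for illustration.
EXAMPLE = """
WITH (
  'hoodie.datasource.write.reconcile.schema' = 'true',
  'hoodie.schema.on.read.enable' = 'true',
  'write.operation' = 'insert',
  'connector' = 'hudi'
)
"""

print(schema_settings(parse_with_options(EXAMPLE)))
```

Running the same helper on the DDL used before and after adding the column gives a quick way to confirm that only the column list changed, not the options.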
   
   

