BBency opened a new issue, #9094:
URL: https://github.com/apache/hudi/issues/9094

   **Problem Description**
   
   We have a MOR table which is partitioned by yearmonth(yyyyMM). We would like 
to trigger async clustering after doing the compaction at the end of the day so 
that we can stitch together small files into larger files. Async clustering for 
the table is failing. Below are the different approaches I tried and the error 
messages I got.
   
   **Hudi Config Used**
   
   ```
   "hoodie.table.name" -> hudiTableName,
   "hoodie.datasource.write.keygenerator.class" -> 
"org.apache.hudi.keygen.ComplexKeyGenerator",
   "hoodie.datasource.write.precombine.field" -> preCombineKey,
   "hoodie.datasource.write.recordkey.field" -> recordKey,
   "hoodie.datasource.write.operation" -> writeOperation,
   "hoodie.datasource.write.row.writer.enable" -> "true",
   "hoodie.datasource.write.reconcile.schema" -> "true",
   "hoodie.datasource.write.partitionpath.field" -> partitionColumnName,
   "hoodie.datasource.write.hive_style_partitioning" -> "true",
   "hoodie.bulkinsert.sort.mode" -> "GLOBAL_SORT",
   "hoodie.datasource.hive_sync.enable" -> "true",
   "hoodie.datasource.hive_sync.table" -> hudiTableName,
   "hoodie.datasource.hive_sync.database" -> databaseName,
   "hoodie.datasource.hive_sync.partition_fields" -> partitionColumnName,
   "hoodie.datasource.hive_sync.partition_extractor_class" -> 
"org.apache.hudi.hive.MultiPartKeysValueExtractor",
   "hoodie.datasource.hive_sync.use_jdbc" -> "false",
   "hoodie.combine.before.upsert" -> "true",
   "hoodie.index.type" -> "BLOOM",
   "spark.hadoop.parquet.avro.write-old-list-structure" -> "false"
   "hoodie.datasource.write.table.type" -> "MERGE_ON_READ"
   "hoodie.compact.inline" -> "false",
   "hoodie.compact.schedule.inline" -> "true",
   "hoodie.compact.inline.trigger.strategy" -> "NUM_COMMITS",
   "hoodie.compact.inline.max.delta.commits" -> "5",
   "hoodie.cleaner.policy" -> "KEEP_LATEST_COMMITS",
   "hoodie.cleaner.commits.retained" -> "3",
   "hoodie.clustering.async.enabled" -> "true",
   "hoodie.clustering.async.max.commits" -> "2",
   "hoodie.clustering.execution.strategy.class" -> 
"org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy",
   "hoodie.clustering.plan.strategy.sort.columns" -> recordKey,
   "hoodie.clustering.plan.strategy.small.file.limit" -> "67108864",
   "hoodie.clustering.plan.strategy.target.file.max.bytes" -> "134217728",
   "hoodie.clustering.plan.strategy.max.bytes.per.group" -> "2147483648",
   "hoodie.clustering.plan.strategy.max.num.groups" -> "150",
   "hoodie.clustering.preserve.commit.metadata" -> "true"
   ```
   
   **Approaches Tried**
   
   1. Triggered a clustering job with running mode as scheduleAndExecute
   **Code Used**        
           
   ```     val hudiClusterConfig = new HoodieClusteringJob.Config
        hudiClusterConfig.basePath = <table-path>
        hudiClusterConfig.tableName = <table-name>
        hudiClusterConfig.runningMode = "scheduleAndExecute"
        hudiClusterConfig.retryLastFailedClusteringJob = true
        val configList: util.List[String] = new util.ArrayList()
        configList.add("hoodie.clustering.async.enabled=true")
        configList.add("hoodie.clustering.async.max.commits=2") 
configList.add("hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy")
        
configList.add("hoodie.clustering.plan.strategy.sort.columns=<sort-columns>")
        
configList.add("hoodie.clustering.plan.strategy.small.file.limit=67108864")
        
configList.add("hoodie.clustering.plan.strategy.target.file.max.bytes=134217728")
        
configList.add("hoodie.clustering.plan.strategy.max.bytes.per.group=2147483648")
        configList.add("hoodie.clustering.plan.strategy.max.num.groups=150")
        configList.add("hoodie.clustering.preserve.commit.metadata=true")
        hudiClusterConfig.configs = configList
        val hudiClusterJob = new HoodieClusteringJob(jsc, hudiClusterConfig)
        val clusterStatus = hudiClusterJob.cluster(1)
        println(clusterStatus)
    ```
   
    **Stacktrace**
   
   ShuffleMapStage 87 (sortBy at RDDCustomColumnsSortPartitioner.java:64) 
failed in 1.098 s due to Job aborted due to stage failure: task 0.0 in stage 
28.0 (TID 367) had a not serializable result: 
org.apache.avro.generic.GenericData$Record
   Serialization stack:
        - object not serializable (class: 
org.apache.avro.generic.GenericData$Record, value:
        
   2. Used the procedure run_clustering to schedule and trigger clustering. We 
found that the replacecommit created through the procedure run had lesser data 
compared to what it was created when scheduled from the code in approach 1
   **Code Used**
        
   ```query_run_clustering = f"call run_clustering(path => '{path}')"
       spark_df_run_clustering = spark.sql(query_run_clustering)
       spark_df_run_clustering.show()
   ```
   
   **Stacktrace**
   
   An error occurred while calling o97.sql.
        : org.apache.hudi.exception.HoodieClusteringException: Clustering 
failed to write to 
files:c94cb139-70cf-4195-ad87-c56527ab5ccf-0,bc2c65f1-39fc-4879-ba83-5003fc9757b0-0,7e699100-39a3-46f7-ac7d-42e9cfaad2e1-0,a6076357-8a7f-4ae1-b6ec-2dd509d9818e-0,9a6752a4-1bcb-4dfb-ad82-80877d07cbdc-0,e5573f8c-c5bc-45b4-a670-1bcd9257726d-0,b00372f1-bd6d-4e46-9add-0ceca84f005a-0,6eb6bc42-b086-4aa0-a899-0b0ff602b7bf-0,35a06cda-57df-457f-aa8c-4792fd52cf33-0,78c75d85-ab08-4e97-9127-6b350d07e8f8-0,18ed0a15-9d42-495b-a43c-140b08dbc852-0,e2f5f9da-0717-4b8e-95b3-09639f2fc4a9-0,700a07e2-2114-4d50-9673-0e3dc885da55-0,1836db85-1320-4ff8-8aea-fc5dbbe267c7-0,b6c0eb8a-fd1e-40e6-bc8c-3e3b6180d916-0,225b791e-ac7b-4a6d-a295-e547c3e6a558-0,e567f6fb-bf27-496a-9c67-d26a5824870e-0,7a40f1c3-c3f5-433f-9cb8-5773de8d9557-0,b4f336b9-6669-4510-a2eb-c300fdae2320-0,1f4ef584-c199-449a-ba82-19b79531432e-0,b3b06f51-32e5-4a94-9ffe-035c08ae7f50-0,debcc1fc-8a67-4a0b-8691-d28b96c0403a-0,c40a0b32-8394-4c0c-8d41-a58e247e44c9-0,942b69
 
c8-a292-4ba6-86a6-9c3e344a9cd6-0,80f06951-1497-4cca-861e-22addd451ddb-0,2eb68890-154a-4963-90fd-47a1a32dceaf-0,5f05cffc-7a4b-4817-8e3e-14905fd81b9b-0,1acba9bf-1ef8-40e8-8a1d-7a54ebc6387e-0,008fd3cc-987b-4855-8125-b5d0529a26a1-0,dfaf9d4c-f23e-49d4-98df-078622fb9383-0
                at 
org.apache.hudi.client.SparkRDDWriteClient.completeClustering(SparkRDDWriteClient.java:381)
   
   Would appreciate any help/ inputs
   
   **Expected behavior**
   
   Clustering should stitch together the smaller files
   
   **Environment Description**
   
   * Platform: AWS Glue v4.0
   
   * Hudi version : 0.12.1
   
   * Spark version : 3.3
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to