luffyd opened a new issue #1866:
URL: https://github.com/apache/hudi/issues/1866


   **_Tips before filing an issue_**
   
   - Have you gone through our 
[FAQs](https://cwiki.apache.org/confluence/display/HUDI/FAQ)? yes
   
   - Join the mailing list to engage in conversations and get faster support at 
dev-subscr...@hudi.apache.org.
   
   - If you have triaged this as a bug, then file an 
[issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
   
   **Describe the problem you faced**
   
   I have a simple `while(true)` loop that keeps committing data to a Hudi MOR table. I was able to add more than 1000 commits successfully, but I noticed the table size kept growing continuously and no clean jobs were ever run.
   
   I used the hudi-cli command `cleans show` to confirm this.
   
   I then ran `cleans run` and it cleaned a lot of data: the table size dropped from 25TB to 500GB.
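   
   For reference, this is roughly the hudi-cli session I used to confirm and work around the problem (the base path here is illustrative):
   
   ```
   connect --path s3://<bucket>/<table-base-path>
   cleans show    # listed no cleans at all
   cleans run     # manually triggered cleaning; table went from 25TB to 500GB
   ```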
   
   **To Reproduce**
   
   
   Steps to reproduce the behavior:
   
   Code snippet to run:
   ```
   val startTime = System.currentTimeMillis()
   
   val parallelism = options.getOrElse("parallelism",
     Math.max(2, upsertCount / 100000).toString).toInt
   println("parallelism", parallelism)
   
   inputDF
     .write
     .format("org.apache.hudi")
     .option(HoodieWriteConfig.TABLE_NAME, options.getOrElse("tableName", "facestest"))
     .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
     .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partitionKey")
     .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "nodeId")
     .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
     .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
     .option("hoodie.upsert.shuffle.parallelism", parallelism) // not the ideal way, but works
     .option("hoodie.bulkinsert.shuffle.parallelism", parallelism) // not the ideal way, but works
     .mode(SaveMode.Append)
     .save(getHudiPath(spark))
   
   val endTime = System.currentTimeMillis()
   val diff = endTime - startTime
   timings = diff :: timings
   CloudWatchWriter.addTimeMetric("faceUpsertTimeForLoop_" + run, diff, spark.sparkContext.isLocal)
   ```
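   
   Note that the writer never sets any cleaner-related options, so I expect it to fall back to the defaults (auto-clean on, KEEP_LATEST_COMMITS, 10 commits retained). In case the defaults are somehow the problem, here is a sketch of setting them explicitly (the keys are standard Hudi cleaner configs; the values shown are the documented defaults):
   
   ```
   // sketch: same writer as above, with the cleaner configs spelled out explicitly
   // (keys are standard Hudi cleaner configs; values are the documented defaults)
   inputDF
     .write
     .format("org.apache.hudi")
     .option(HoodieWriteConfig.TABLE_NAME, options.getOrElse("tableName", "facestest"))
     .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
     .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partitionKey")
     .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "nodeId")
     .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
     .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
     .option("hoodie.clean.automatic", "true")                // clean inline after each commit
     .option("hoodie.cleaner.policy", "KEEP_LATEST_COMMITS")  // default cleaning policy
     .option("hoodie.cleaner.commits.retained", "10")         // default retention
     .mode(SaveMode.Append)
     .save(getHudiPath(spark))
   ```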
   
   **Expected behavior**
   
   Clean jobs should have run automatically as the commits accumulated.
   
   **Environment Description**
   
   * Hudi version : 0.5.3
   
   * Spark version : 2.4
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   I added some logging and used hudi-cli to rectify the situation.
   
   As noted above, running `cleans run` cleaned a lot of data; the table size dropped from 25TB to 500GB.
   
   I am guessing this line resolves to false, so cleaning was never triggered:
   
   https://github.com/apache/hudi/blob/master/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java#L354
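   
   If I read that code correctly, with KEEP_LATEST_COMMITS the planner only finds an "earliest commit to retain" once the number of completed commits exceeds `hoodie.cleaner.commits.retained`. A paraphrase of my understanding (not the actual CleanPlanner source):
   
   ```
   // paraphrase of how I read the retention check (not the actual Hudi code):
   // nothing is cleaned until there are more completed commits than
   // hoodie.cleaner.commits.retained (default 10)
   val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, getHudiPath(spark), true)
   val commitsRetained = 10
   val completedCommits = metaClient.getCommitTimeline.filterCompletedInstants.countInstants()
   if (completedCommits <= commitsRetained) {
     // matches the log: "No earliest commit to retain. No need to scan partitions !!"
     println(s"cleaner would skip: only $completedCommits completed commits")
   }
   ```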
   
   **Stacktrace**
   
   ***Hudi Trace logs***
   ```
   20/07/21 23:09:10 WARN IncrementalTimelineSyncFileSystemView: Incremental Sync of timeline is turned off or deemed unsafe. Will revert to full syncing
   20/07/21 23:09:10 INFO FileSystemViewHandler: TimeTakenMillis[Total=2647, Refresh=2646, handle=1, Check=0], Success=true, Query=basepath=s3%3A%2F%2Fchelan-dev-mock-faces%2FTestFacesUpserForLoop%2Feight&lastinstantts=20200721225935&timelinehash=44494fd19bd1c8ab3d67910abfd906fe922a8c118ec2279044f586278833480b, Host=ip-10-0-1-147.us-west-2.compute.internal:41753, synced=true
   20/07/21 23:09:10 INFO CleanPlanner: No earliest commit to retain. No need to scan partitions !!
   20/07/21 23:09:10 INFO CleanActionExecutor: Nothing to clean here. It is already clean
   ```
   
   ***Some custom logging in the application***
   ```
   val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, getHudiPath(spark), true)
   println("metaClient.getActiveTimeline().countInstants()",
     metaClient.getActiveTimeline().countInstants())
   println("metaClient.getCommitTimeline.filterCompletedInstants.countInstants()",
     metaClient.getCommitTimeline.filterCompletedInstants.countInstants())
   println("metaClient.getCommitTimeline.filterCompletedAndCompactionInstants.countInstants()",
     metaClient.getCommitTimeline.filterCompletedAndCompactionInstants().countInstants())
   ```
   
   Results for the above:
   ```
   (metaClient.getActiveTimeline().countInstants(),33)
   (metaClient.getCommitTimeline.filterCompletedInstants.countInstants(),4)
   (metaClient.getCommitTimeline.filterCompletedAndCompactionInstants.countInstants(),4)
   ```
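   
   Only 4 of the 33 active instants are completed commits, so I suspect the rest are delta commits and pending compactions that the commit timeline does not count toward retention. A quick follow-up check I could add (a sketch; I have not verified all of these timeline methods on 0.5.3):
   
   ```
   // follow-up diagnostics (sketch; method availability on 0.5.3 not verified)
   println("pending compactions",
     metaClient.getActiveTimeline.filterPendingCompactionTimeline().countInstants())
   println("completed delta commits",
     metaClient.getActiveTimeline.getDeltaCommitTimeline.filterCompletedInstants.countInstants())
   ```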
    
   
   

