hudi-bot opened a new issue, #16547:
URL: https://github.com/apache/hudi/issues/16547

   ## JIRA info
   
   - Link: https://issues.apache.org/jira/browse/HUDI-7975
   - Type: Improvement
   - Fix version(s):
     - 0.16.0
     - 1.1.0
   
   
   ---
   
   
   ## Comments
   
   16/Sep/24 22:45;shivnarayan;If we characterize the different workloads, they fall into the categories below.
   
   1. pure bulk inserts
   2. bulk inserts + clustering
   3. bulk insert once, then inserts
   4. bulk insert once, then upserts
   5. inserts
   6. upserts
   
   
   - We want to add some marker to the timeline recording that cleaning has been taken care of up to instant X.
   Instead, why not make an intelligent guess based on the last few commits?
   
   Say someone configured num_commits-based cleaning and set the config to, say, 25.
   We can check the last 50 commits (2x the num_commits config value).
   - If all of the latest 50 are bulk inserts with no other operation types, we can assume it is a purely bulk-insert pipeline and skip clean scheduling only.
   - If we find any write operation other than bulk insert, we do regular clean planning.
   - If the total number of active entries in the timeline is < 50, we again trigger regular clean scheduling.
   
   The 2x factor is mainly to account for "bulk insert + clustering": file groups replaced by clustering are not immediately available to be cleaned up; we can only clean them after 25 commits (in this context). So we consider the last 2x (or 2x + 5) commits to determine whether we really need to trigger a clean schedule.
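   As a rough sketch, the lookback check described above might look like this (the class, method, and enum names are illustrative only, not actual Hudi APIs):

```java
import java.util.List;

// Sketch of the proposed heuristic: skip clean scheduling only when the
// last 2x (num_commits) completed commits are all bulk inserts and the
// active timeline has at least that many entries.
public class CleanScheduleHeuristic {

    enum WriteOperation { BULK_INSERT, INSERT, UPSERT, REPLACE_COMMIT }

    static boolean shouldScheduleClean(List<WriteOperation> activeTimeline,
                                       int numCommitsToRetain) {
        int lookback = 2 * numCommitsToRetain;  // optionally 2x + 5
        if (activeTimeline.size() < lookback) {
            return true;  // not enough history: fall back to regular planning
        }
        List<WriteOperation> recent =
            activeTimeline.subList(activeTimeline.size() - lookback, activeTimeline.size());
        // Any non-bulk-insert operation (insert, upsert, clustering/replacecommit)
        // means files may be eligible for cleaning.
        return recent.stream().anyMatch(op -> op != WriteOperation.BULK_INSERT);
    }
}
```

   Note the check only inspects operation types of recent instants, so it avoids any per-file or per-partition work when it decides to skip.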
   
   The above logic plays out as follows for the six scenarios:
   1: For the first 50 commits, clean planning kicks in as usual. After that, no clean scheduling is triggered.
   2: For the first 50 commits, clean planning kicks in as usual, and an actual clean may then be seen for the clustering commit. Once there is a clean in the timeline, the incremental cleaner will hold on to that boundary. Depending on the cadence of clustering, cleans will be added to the timeline.
   3, 4, 5, 6: Regular cleans will happen.
   
    ;;;
   
   ---
   
   18/Sep/24 00:13;krishen;Thanks for sharing this example. I had a concern about sudden "large" cleans for a dataset with an insert + bulk-insert workload.
   As an example, assume that num_commits for clean is 25 and num_commits for archival is 75, and due to delayed clean + archival the active timeline is in the following state.
   (By the way, I am assuming that in the above proposal HUDI will do clean planning as usual if it sees at least one replacecommit/clustering instant, since that check would be inexpensive to add.)
   
    
   {code:java}
   ECTR = C1
   [c1.commit, c2.commit . . . c10.commit, . . . c99.commit]
   where c1 to c10 are inserts, and c11 - c99 are bulk inserts{code}
   If we apply this proposal to 0.x, my understanding is that the following can happen:
    # Clean sees that the last 50-55 instants (2 * num_commits_clean + 5) are all bulk inserts, and doesn't schedule a clean
    # Archival then runs and archives c1 to c24, since there is no replacecommit/inflight/savepoint blocking it
   
    
   
   Now timeline is in following state
   {code:java}
   ECTR = C1 
   [c25 . . c99.commit] 
   where c25 - c99 are bulk inserts{code}
   The dataset is not inconsistent, but it has partitions with old file versions.
   
    
   
   Now let's say time passes and 50 more commit instants get added to the timeline, but one of the new instants is an insert instead of a bulk insert:
   {code:java}
   ECTR = C1 
   [c25 . . c99.commit, c100.commit . . . c124.commit, . . . c149.commit] 
   Where c124 is an insert but all other new instants are bulk inserts{code}
   The next time clean runs, it will find that c124 was an insert that has left to-be-cleaned files in the dataset (since c124 is in the lookback window of 50-55 instants) and will correctly schedule a clean targeting all instants before c125. Because the current ECTR (earliest commit to retain) c1 is no longer in the active timeline, though, it will do a "full scan" clean: it reads all partitions in the dataset and determines which are subject to cleaning. The final list of partitions to process includes not only the partitions affected by c124, but also the partitions affected by c1 to c10.
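   An illustrative model of why losing the ECTR boundary forces the expensive path (this is a sketch of the mental model above, not Hudi's actual clean planner; all names are hypothetical): in incremental mode, the planner scopes partitions to those written at or after the last clean's ECTR, while a missing or archived ECTR means every partition must be examined.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Sketch: incremental clean scoping vs. full-scan fallback.
public class CleanScopeSketch {

    static Set<String> partitionsToClean(Map<String, List<String>> partitionsByInstant,
                                         Set<String> allPartitions,
                                         String lastCleanEctr,
                                         Set<String> activeInstants) {
        if (lastCleanEctr == null || !activeInstants.contains(lastCleanEctr)) {
            // ECTR missing or archived out of the active timeline: full scan,
            // every partition in the dataset has to be read and evaluated.
            return allPartitions;
        }
        // Incremental: only partitions written by instants at or after the ECTR.
        return partitionsByInstant.entrySet().stream()
            .filter(e -> e.getKey().compareTo(lastCleanEctr) >= 0)
            .flatMap(e -> e.getValue().stream())
            .collect(Collectors.toSet());
    }
}
```

   In the example timeline above, c1 has been archived, so the `activeInstants` check fails and the scope degrades from "partitions touched by c124" to the whole dataset.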
   
   The concern I had is that this occasional "larger" clean may take up a lot of the writer's time, causing spikes in (resource x time) that may be unpredictable from the user's perspective. (From personal experience with 0.10 I've seen it cause OOM issues, but for simplicity's sake I'll assume many of those are already fixed in later Hudi versions, so I won't raise that as a concern here.) And the longer this clean scheduling is deferred, the more partitions/files potentially have to be processed and cleaned by the next full-scan clean. The reason I am focusing on the full-scan clean is that, AFAIK, it cannot be "split" across multiple clean instants (each with a "bound" of files/partitions to process). This makes orchestration a bit trickier in my opinion, as you may now have to have your ingestion writer block out time/resources for these occasional large cleans, and make sure the timeouts/resources you set for the clean writer are enough to account for this case.
   
   My understanding based on our offline discussion is that there are two issues we would like to resolve with this proposal:
    - The issue I brought up of an occasional expensive full-scan clean on an insert-only dataset whenever there's a surge of non-insert instants
    - The optimization you highlighted, where HUDI clean planning should reduce latency for cleans on insert-only low-latency workloads by not having to read every single .commit metadata file (1 I/O + deserialization call per file in the active write timeline)
   
   Just thinking out loud, from here we could explore two avenues:
    - Continue with your proposal here, but also look into optimizations for the "full scan" clean, such as allowing a user to schedule & execute multiple "partial" cleans that each have a bound. This way a large clean backlog can be worked through gradually over the course of multiple jobs (so if a low-latency ingestion writer doesn't have many resources, after each write commit it can do a smaller clean that makes some progress).
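   A minimal sketch of the bounded-clean idea (hypothetical planner helper, not an existing Hudi API): split the backlog into fixed-size chunks, each of which could be scheduled and executed as its own clean instant.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: turn one large clean backlog into a series of bounded "partial"
// cleans, so each job only processes at most `bound` partitions.
public class PartialCleanSketch {

    static List<List<String>> planBoundedCleans(List<String> backlog, int bound) {
        List<List<String>> cleans = new ArrayList<>();
        for (int i = 0; i < backlog.size(); i += bound) {
            cleans.add(new ArrayList<>(backlog.subList(i, Math.min(i + bound, backlog.size()))));
        }
        return cleans;
    }
}
```

   A low-resource ingestion writer could then run one chunk after each commit instead of blocking out time for a single large clean.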
   
    - Store a subset (or rather a subsequence) of all instants in the active timeline that have updated/replaced a file group (replacecommit, update, small-file handling, etc.) in some file. This file would be updated on every write or clean, so it does not need to be computed by reading all instants on the timeline. The idea is that archival can use it to efficiently block on the earliest instant that hasn't been cleaned (lessening the archival -> clean dependency that you were concerned about during our offline discussion), and the clean planner can efficiently see whether or not it should schedule a clean. Since neither table service would then need to read the metadata file of every insert-only commit in the timeline, this should be feasible for a low-latency ingestion writer.
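   The second avenue might be sketched as below (entirely hypothetical names; Hudi has no such class): an incrementally maintained index of file-group-mutating instants that both archival and clean planning can consult without scanning commit metadata.

```java
import java.util.Optional;
import java.util.TreeSet;

// Sketch: persisted set of instants that updated/replaced file groups,
// updated on every write and clean rather than recomputed from the timeline.
public class MutatingInstantIndex {
    private final TreeSet<String> mutatingInstants = new TreeSet<>();

    // Called on every completed write; no timeline scan needed.
    void onCommit(String instantTime, boolean mutatedFileGroups) {
        if (mutatedFileGroups) {
            mutatingInstants.add(instantTime);
        }
    }

    // Called after a clean completes for everything before the given instant.
    void onClean(String cleanedUpTo) {
        mutatingInstants.headSet(cleanedUpTo).clear();
    }

    // Archival would block on the earliest uncleaned mutating instant, if any.
    Optional<String> earliestUncleanedMutatingInstant() {
        return mutatingInstants.isEmpty() ? Optional.<String>empty()
                                          : Optional.of(mutatingInstants.first());
    }

    // Clean planning can cheaply decide whether scheduling is needed at all.
    boolean shouldScheduleClean() {
        return !mutatingInstants.isEmpty();
    }
}
```

   An insert-only pipeline would leave this index empty, so both table services would skip their expensive work outright.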
   
    ;;;
   
   ---
   
   15/Oct/24 03:29;yihua;Deferring this task to Hudi 1.1 as we are still 
discussing the right approach.;;;


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
