hudi-bot opened a new issue, #14573:
URL: https://github.com/apache/hudi/issues/14573

   As of now, the upsert path works as follows:
    * hudi builds a workloadProfile to understand the total inserts and 
updates (with location info)
    * Following that, small-file info is populated
    * Then buckets are populated with the above info. 
    * These buckets are later used when getPartition(Object key) is invoked in 
UpsertPartitioner.
   
   In step 1, to build the global workload profile, we have to run an action on 
the entire JavaRDD<HoodieRecord> from the driver, and hudi persists the 
workload profile as well. 
   
   For large, write-intensive batch jobs (COW tables), caching this incurs 
additional overhead. So, this effort is trying to see if we can avoid doing 
this by some means. 
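
   The driver-side profiling step can be sketched roughly as below. This is a 
minimal, self-contained sketch in plain Java: the `HoodieRecordStub` and 
`PartitionStat` types, and the in-memory list standing in for the 
`JavaRDD<HoodieRecord>`, are hypothetical simplifications, not Hudi's actual 
classes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for Hudi's types: in the real upsert
// path the profile is built from a JavaRDD<HoodieRecord> via a Spark action.
public class WorkloadProfileSketch {

    // currentFileId == null means the record is untagged, i.e. an insert.
    record HoodieRecordStub(String partitionPath, String currentFileId) {}

    record PartitionStat(long numInserts, long numUpdates) {}

    // A driver-side pass over every record -- the expensive action (plus
    // caching of the input) that this issue is trying to avoid.
    static Map<String, PartitionStat> buildWorkloadProfile(List<HoodieRecordStub> records) {
        Map<String, long[]> counts = new HashMap<>();
        for (HoodieRecordStub r : records) {
            long[] c = counts.computeIfAbsent(r.partitionPath(), k -> new long[2]);
            if (r.currentFileId() == null) c[0]++; else c[1]++;  // insert vs update
        }
        Map<String, PartitionStat> profile = new HashMap<>();
        counts.forEach((p, c) -> profile.put(p, new PartitionStat(c[0], c[1])));
        return profile;
    }
}
```

   With this per-partition profile in hand, the partitioner can then size the 
small-file and insert buckets; the cost being questioned here is the full pass 
over the records needed to produce it.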
   
   ## JIRA info
   
   - Link: https://issues.apache.org/jira/browse/HUDI-860
   - Type: Improvement
   - Epic: https://issues.apache.org/jira/browse/HUDI-1628
   
   
   ---
   
   
   ## Comments
   
   06/Jul/20 13:16;shivnarayan;Here is a proposal on the New Partitioner.
   
   The high-level idea is to avoid doing any per-record operation in the driver. 
    * mapPartitions() with one task per hudi partition to collect all bucket infos.
    ** This mapPartitions call will take in an Iterator<HoodieRecord> and return 
an Iterator<BucketInfo>
    ** BucketInfo will contain {partitionPath, fileId, bucketType, 
totalRecords, weight (only for inserts)}
    ** All buckets for a partition can be populated within each 
mapPartitions call itself and don't need any global stats. 
    ** I am not getting into the implementation of this mapPartitions call since 
it is mostly similar to what we have in UpsertPartitioner; the difference is 
that most of this calculation is currently done in the driver, and with this 
proposal we move it to the executors. 
    * Once the mapPartitions() call returns to the driver, we have all the 
BucketInfo from all partitions. 
    * We need to generate a few structures to assist the getPartition() call. 
    ** Calculate total buckets.
    ** Iterate through all buckets to do the following:
    *** Assign global bucketNumbers, and populate bucketInfoMap, which maps 
bucketNumber to BucketInfo.
    *** For each update bucket, add an entry to updateBucketFileIdToIndexMapping, 
which maps fileId to bucketNumber.
    *** For insert buckets, generate perPartitionInsertBucketWeights 
(Map<String, List<Pair<Double, Integer>>>). The key is the partitionPath; the 
value is a list of meta info about the insert buckets, i.e. their respective 
weight and bucketNumber.
    *** For insert buckets, generate totalInsertsPerPartition (Map<String, 
Long>), where the key is the partitionPath and the value is the total inserts 
for that partition. 
    * getPartition(Object key)
    ** If it is an update, look up updateBucketFileIdToIndexMapping and return 
the bucketNumber.
    ** If it is an insert, fetch all target buckets from 
perPartitionInsertBucketWeights for the partition the record belongs to, mod by 
the total inserts for that partition (from totalInsertsPerPartition), and find 
the right bucket based on perPartitionInsertBucketWeights;;;
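
   The driver-side bookkeeping and the two getPartition() paths in the proposal 
can be sketched roughly as below. This is a hedged, self-contained sketch: the 
field names (bucketInfoMap, updateBucketFileIdToIndexMapping, 
perPartitionInsertBucketWeights, totalInsertsPerPartition) come from the 
proposal, but the surrounding class, the insert sequence number standing in for 
"mod by total inserts", and the assumption that insert-bucket weights sum to 
1.0 per partition are illustrative.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of the proposed partitioner's driver-side state and lookup.
// BucketInfo fields mirror the proposal; everything else is illustrative.
public class ProposedPartitionerSketch {

    enum BucketType { UPDATE, INSERT }

    record BucketInfo(String partitionPath, String fileId, BucketType bucketType,
                      long totalRecords, double weight /* inserts only */) {}

    final Map<Integer, BucketInfo> bucketInfoMap = new HashMap<>();
    final Map<String, Integer> updateBucketFileIdToIndexMapping = new HashMap<>();
    // partitionPath -> cumulative (weight, bucketNumber) pairs for insert buckets
    final Map<String, List<Map.Entry<Double, Integer>>> perPartitionInsertBucketWeights = new HashMap<>();
    final Map<String, Long> totalInsertsPerPartition = new HashMap<>();

    // Built once on the driver from the BucketInfo returned by mapPartitions().
    ProposedPartitionerSketch(List<BucketInfo> allBuckets) {
        int bucketNumber = 0;                          // assign global bucket numbers
        Map<String, Double> cumWeight = new HashMap<>();
        for (BucketInfo b : allBuckets) {
            bucketInfoMap.put(bucketNumber, b);
            if (b.bucketType() == BucketType.UPDATE) {
                updateBucketFileIdToIndexMapping.put(b.fileId(), bucketNumber);
            } else {
                double cum = cumWeight.merge(b.partitionPath(), b.weight(), Double::sum);
                perPartitionInsertBucketWeights
                    .computeIfAbsent(b.partitionPath(), k -> new ArrayList<>())
                    .add(Map.entry(cum, bucketNumber));
                totalInsertsPerPartition.merge(b.partitionPath(), b.totalRecords(), Long::sum);
            }
        }
        bucketNumber++;
    }

    // getPartition for an update: direct lookup by fileId.
    int getPartitionForUpdate(String fileId) {
        return updateBucketFileIdToIndexMapping.get(fileId);
    }

    // getPartition for an insert: spread the i-th insert of a partition across
    // that partition's insert buckets in proportion to their weights.
    int getPartitionForInsert(String partitionPath, long insertSeqNo) {
        long total = totalInsertsPerPartition.get(partitionPath);
        double r = (double) (insertSeqNo % total) / total;   // in [0, 1)
        for (Map.Entry<Double, Integer> e : perPartitionInsertBucketWeights.get(partitionPath)) {
            if (r < e.getKey()) return e.getValue();         // assumes weights sum to 1.0
        }
        throw new IllegalStateException("insert bucket weights must sum to 1.0 for " + partitionPath);
    }
}
```

   For example, with one update bucket (fileId "f1") and two insert buckets of 
weight 0.6 and 0.4 in the same partition, updates to "f1" always route to 
bucket 0, while inserts are split roughly 60/40 across buckets 1 and 2.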
   
   ---
   
   16/Jul/20 11:05;shivnarayan;[~vinoth]: did you get a chance to look at this 
proposal?;;;
   
   ---
   
   17/Jul/20 03:44;vinoth;>MapPartitions() with one per hudi partition to 
collect all bucket infos.
   >Once the mapPartition() call returns to driver, we have all bucketInfo from 
all partitions. 
   
   This means we cannot avoid caching, right? If you do 
RDD<HoodieRecord>.mapPartitions().collect() -> List<BucketInfo>, is this what 
you are referring to?;;;
   
   ---
   
   17/Jul/20 03:47;vinoth;What I mean is, we need to read the RDD again if we 
convert Iterator<HoodieRecord> to Iterator<BucketInfo>. We need to keep this 
within getPartition() and build all the BucketInfo dynamically on the 
fly.;;;
   
   ---
   
   17/Jul/20 03:49;shivnarayan;We don't need to build global workload stats. 
Each mapPartitions task will calculate the buckets required for the 
corresponding hoodie partition and will return the info to the driver. The 
driver will collect all such info from all partitions to find the total 
parallelism and then distribute the work accordingly.;;;
   
   ---
   
   17/Jul/20 03:50;shivnarayan;To determine total parallelism, the driver needs 
to understand how many buckets are required in total, right? Not sure how we 
can completely delegate the work to executors and get it done without getting 
back to the driver. 
   
    ;;;
   
   ---
   
   17/Jul/20 03:58;vinoth;>and will return the info to driver. Driver will 
collect all such info for all partitions to find total parallelism and then 
distribute the work based.  
   This part needs a collect(), correct, to return these to the driver? If we 
do this, then it's all futile.;;;
   
   ---
   
   06/Oct/21 13:47;vinoth;[~guoyihua] this is a good one to get started on the 
optimization pieces
   
    ;;;
   
   ---
   
   06/Oct/21 23:59;guoyihua;Cool, I'll take a look.;;;


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]