[ 
https://issues.apache.org/jira/browse/HUDI-860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152012#comment-17152012
 ] 

sivabalan narayanan commented on HUDI-860:
------------------------------------------

Here is a proposal.

The high-level idea is to avoid doing any record-level operation in the driver. 
 * Run mapPartitions(), one per Hudi partition. 
 ** This mapPartitions() call takes an Iterator<HoodieRecord> and returns an 
Iterator<BucketInfo>.
 ** BucketInfo contains \{partitionPath, fileId, bucketType, totalRecords, 
weight (only for inserts)}.
 ** All buckets for a partition can be populated within the mapPartitions() 
call itself and don't need any global stats. 
 ** I am not getting into the implementation of this mapPartitions() call, 
since it is mostly similar to what we have in UpsertPartitioner. The only 
difference is that most of this calculation is done in the driver today, and 
this proposal moves it to the executors. 
 * Once the mapPartitions() call returns to the driver, we have the BucketInfo 
for all partitions. 
 * We need to generate a few structures to assist the getPartition() call. 
 ** Calculate the total number of buckets.
 ** Iterate through all buckets to do the following:
 *** Assign global bucketNumbers, and populate bucketInfoMap, which maps 
bucketNumber to BucketInfo.
 *** For each update bucket, add an entry to updateBucketFileIdToIndexMapping, 
which maps fileId to bucketNumber.
 *** For each insert bucket, add an entry to perPartitionInsertBucketWeights 
(Map<String, List<Pair<Double, Integer>>>). The key is partitionPath. The value 
is a list of metadata about the partition's insert buckets, i.e. the respective 
weight and bucketNumber.
 *** For each insert bucket, update totalInsertsPerPartition (Map<String, Long>), 
where the key is partitionPath and the value is the total inserts for the 
partition. 
 * getPartition(Object key)
 ** If the record is an update, look up its fileId in 
updateBucketFileIdToIndexMapping and return the bucketNumber.
 ** If the record is an insert, fetch all target buckets from 
perPartitionInsertBucketWeights for the partition the record belongs to. Mod by 
the total inserts for the partition (from totalInsertsPerPartition) and find the 
right bucket based on the weights in perPartitionInsertBucketWeights.
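
To make the driver-side part concrete, here is a rough sketch of the structures 
and the two getPartition() branches described above. It is not Hudi's actual 
code: the class BucketRoutingSketch, the InsertBucket holder (standing in for 
Pair<Double, Integer>), and the split into getPartitionForUpdate() / 
getPartitionForInsert() are hypothetical names for illustration. Using 
String.hashCode() of the record key as the value that gets modded is also an 
assumption, as is treating the weights of a partition's insert buckets as 
fractions that sum to 1.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; names follow the proposal, not Hudi's real classes.
final class BucketRoutingSketch {
    enum BucketType { UPDATE, INSERT }

    // One bucket, as it would be returned by the executor-side mapPartitions() call.
    static final class BucketInfo {
        final String partitionPath;
        final String fileId;
        final BucketType bucketType;
        final long totalRecords;
        final double weight; // only meaningful for insert buckets

        BucketInfo(String partitionPath, String fileId, BucketType bucketType,
                   long totalRecords, double weight) {
            this.partitionPath = partitionPath;
            this.fileId = fileId;
            this.bucketType = bucketType;
            this.totalRecords = totalRecords;
            this.weight = weight;
        }
    }

    // Stand-in for Pair<Double, Integer>: weight and global bucket number.
    static final class InsertBucket {
        final double weight;
        final int bucketNumber;

        InsertBucket(double weight, int bucketNumber) {
            this.weight = weight;
            this.bucketNumber = bucketNumber;
        }
    }

    final Map<Integer, BucketInfo> bucketInfoMap = new HashMap<>();
    final Map<String, Integer> updateBucketFileIdToIndexMapping = new HashMap<>();
    final Map<String, List<InsertBucket>> perPartitionInsertBucketWeights = new HashMap<>();
    final Map<String, Long> totalInsertsPerPartition = new HashMap<>();

    // Driver side: a single pass over all buckets collected from the executors.
    BucketRoutingSketch(List<BucketInfo> buckets) {
        int bucketNumber = 0;
        for (BucketInfo b : buckets) {
            bucketInfoMap.put(bucketNumber, b);
            if (b.bucketType == BucketType.UPDATE) {
                updateBucketFileIdToIndexMapping.put(b.fileId, bucketNumber);
            } else {
                perPartitionInsertBucketWeights
                    .computeIfAbsent(b.partitionPath, k -> new ArrayList<>())
                    .add(new InsertBucket(b.weight, bucketNumber));
                totalInsertsPerPartition.merge(b.partitionPath, b.totalRecords, Long::sum);
            }
            bucketNumber++;
        }
    }

    // Total number of buckets, i.e. the number of output partitions.
    int numPartitions() {
        return bucketInfoMap.size();
    }

    // Update path: direct lookup by fileId (throws if the fileId is unknown).
    int getPartitionForUpdate(String fileId) {
        return updateBucketFileIdToIndexMapping.get(fileId);
    }

    // Insert path: map the key to a fraction in [0, 1) and walk cumulative weights.
    int getPartitionForInsert(String partitionPath, String recordKey) {
        List<InsertBucket> targets = perPartitionInsertBucketWeights.get(partitionPath);
        long totalInserts = Math.max(1L, totalInsertsPerPartition.getOrDefault(partitionPath, 0L));
        // Assumption: the hash of the record key is what gets modded by total inserts.
        double r = 1.0 * Math.floorMod((long) recordKey.hashCode(), totalInserts) / totalInserts;
        double cumulativeWeight = 0.0;
        for (InsertBucket ib : targets) {
            cumulativeWeight += ib.weight;
            if (r <= cumulativeWeight) {
                return ib.bucketNumber;
            }
        }
        return targets.get(0).bucketNumber; // fall back to the first bucket
    }
}
```

In a real Partitioner, getPartition(Object key) would first decide whether the 
record is an update or an insert and then take one of the two paths above. Only 
the per-bucket metadata reaches the driver here, not the records themselves.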

> Ability to do small file handling without need for caching
> ----------------------------------------------------------
>
>                 Key: HUDI-860
>                 URL: https://issues.apache.org/jira/browse/HUDI-860
>             Project: Apache Hudi
>          Issue Type: Sub-task
>          Components: Writer Core
>            Reporter: Vinoth Chandar
>            Assignee: sivabalan narayanan
>            Priority: Blocker
>             Fix For: 0.6.0
>
>
> As of now, in the upsert path:
>  * Hudi builds a workloadProfile to understand total inserts and updates (with 
> location info). 
>  * Following which, small-file info is populated.
>  * Then buckets are populated with the above info. 
>  * These buckets are later used when getPartition(Object key) is invoked in 
> UpsertPartitioner.
> In step 1, to build the global workload profile, we have to run an action on 
> the entire JavaRDD<HoodieRecord> from the driver, and Hudi saves the workload 
> profile as well. 
> For large, write-intensive batch jobs (COW type), caching this incurs 
> additional overhead. So this effort is to see whether we can avoid doing 
> this by some means. 
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
