hudi-bot opened a new issue, #14573:
URL: https://github.com/apache/hudi/issues/14573
As of now, in the upsert path:
* hudi builds a WorkloadProfile to understand total inserts and updates (with location info).
* Following that, small-file info is populated.
* Then buckets are populated with the above info.
* These buckets are later used when getPartition(Object key) is invoked in UpsertPartitioner.

In step 1, to build the global workload profile, we have to run an action on the entire JavaRDD<HoodieRecord> in the driver, and hudi saves the workload profile as well. For large write-intensive batch jobs (COW table types), caching this incurs additional overhead. So this effort is trying to see if we can avoid it by some means.
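For illustration, here is a tiny plain-Java model of what the workload profile computes: per-partition insert and update counts, where a record that already carries a file location counts as an update. This is a hedged sketch, not Hudi's actual WorkloadProfile class; the class and method names are hypothetical, and records are simplified to {partitionPath, fileIdOrNull} pairs.

```java
import java.util.*;

// Illustrative model only: a record with a known fileId is an update,
// a record without one is an insert.
class WorkloadProfileSketch {
    // Input: each record is {partitionPath, fileIdOrNull}.
    // Output: partitionPath -> {insertCount, updateCount}.
    static Map<String, long[]> buildProfile(List<String[]> records) {
        Map<String, long[]> stats = new HashMap<>();
        for (String[] r : records) {
            long[] s = stats.computeIfAbsent(r[0], k -> new long[2]);
            if (r[1] == null) s[0]++; // no location info: insert
            else s[1]++;              // has a target fileId: update
        }
        return stats;
    }
}
```

In Spark terms, computing these counts over the whole JavaRDD<HoodieRecord> is an action, which is why the RDD ends up being cached on the write path.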
## JIRA info
- Link: https://issues.apache.org/jira/browse/HUDI-860
- Type: Improvement
- Epic: https://issues.apache.org/jira/browse/HUDI-1628
---
## Comments
**shivnarayan** (06/Jul/20 13:16): Here is a proposal on the new partitioner.
The high-level idea is to avoid doing any record operation in the driver.
* mapPartitions() with one call per hudi partition to collect all bucket info.
  * This mapPartitions call will take in an Iterator<HoodieRecord> and return an Iterator<BucketInfo>.
  * BucketInfo will contain {partitionPath, fileId, bucketType, totalRecords, weight (only for inserts)}.
  * All buckets for a partition can be populated within each mapPartitions call itself and don't need any global stats.
  * I am not getting into the impl of this mapPartitions call, since it's mostly similar to what we have in UpsertPartitioner; it's just that most of this calculation is currently done in the driver, and we are moving it to the executors with this proposal.
* Once the mapPartitions() call returns to the driver, we have all BucketInfo from all partitions.
* We need to generate a few structures to assist in the getPartition() call:
  * Calculate total buckets.
  * Iterate through all buckets to do the following:
    * Assign global bucket numbers, and populate bucketInfoMap, which maps bucketNumber to BucketInfo.
    * For update buckets, generate updateBucketFileIdToIndexMapping, which maps fileId to bucketNumber.
    * For insert buckets, generate perPartitionInsertBucketWeights (Map<String, List<Pair<Double, Integer>>>). The key is partitionPath; the value is a list of meta info about the insert buckets, i.e. their respective weight and bucketNumber.
    * For insert buckets, generate totalInsertsPerPartition (Map<String, Long>), where the key is partitionPath and the value is totalInserts for the partition.
* getPartition(Object key):
  * If an update, look up updateBucketFileIdToIndexMapping and return the bucketNumber.
  * If an insert, fetch all target buckets from perPartitionInsertBucketWeights for the partition the record belongs to. Mod by total inserts for the partition (from totalInsertsPerPartition) and find the right bucket based on perPartitionInsertBucketWeights.
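The getPartition() side of the proposal above can be sketched in plain Java (no Spark). This is a minimal, self-contained illustration under stated assumptions: the class is hypothetical, the field names mirror the proposal's naming rather than Hudi's actual code, and insert-bucket weights are stored as cumulative weights in [0, 1] so a record's position can be matched to the first bucket that covers it.

```java
import java.util.*;

// Hypothetical sketch of the proposed partitioner's lookup structures.
class ProposedPartitionerSketch {
    // fileId -> global bucketNumber, for update records.
    final Map<String, Integer> updateBucketFileIdToIndexMapping = new HashMap<>();
    // partitionPath -> list of {cumulativeWeight, bucketNumber} for insert buckets.
    final Map<String, List<double[]>> perPartitionInsertBucketWeights = new HashMap<>();
    // partitionPath -> total inserts headed for that partition.
    final Map<String, Long> totalInsertsPerPartition = new HashMap<>();

    int getPartitionForUpdate(String fileId) {
        // Updates go straight to the bucket owning their file.
        return updateBucketFileIdToIndexMapping.get(fileId);
    }

    int getPartitionForInsert(String partitionPath, long recordSeq) {
        long total = totalInsertsPerPartition.get(partitionPath);
        // Mod by total inserts for the partition, normalized to [0, 1).
        double pos = (double) (recordSeq % total) / total;
        List<double[]> buckets = perPartitionInsertBucketWeights.get(partitionPath);
        for (double[] wb : buckets) {
            if (pos < wb[0]) return (int) wb[1]; // first bucket covering this position
        }
        return (int) buckets.get(buckets.size() - 1)[1];
    }
}
```

The weighted lookup spreads inserts across a partition's insert buckets in proportion to each bucket's weight, which is the same effect the existing UpsertPartitioner achieves, just computed from executor-built BucketInfo instead of a driver-built workload profile.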
---
**shivnarayan** (16/Jul/20 11:05): [~vinoth]: did you get a chance to look at this proposal?
---
**vinoth** (17/Jul/20 03:44):
> mapPartitions() with one call per hudi partition to collect all bucket info.
> Once the mapPartitions() call returns to the driver, we have all BucketInfo from all partitions.

This means we cannot avoid caching, right? If you do RDD<HoodieRecord>.mapPartitions().collect() -> List<BucketInfo>, is this what you are referring to?
---
**vinoth** (17/Jul/20 03:47): What I mean is, we need to read the RDD again if we convert Iterator<HoodieRecord> to Iterator<BucketInfo>. We need to keep this within getPartition() and build all the BucketInfo dynamically on the fly.
---
**shivnarayan** (17/Jul/20 03:49): We don't need to build global workload stats. With mapPartitions we will calculate the buckets required for the corresponding hoodie partition and return that info to the driver. The driver will collect such info from all partitions to find the total parallelism and then distribute the work accordingly.
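The driver-side aggregation being described is a simple sum: each partition reports how many buckets it needs, and the driver adds them up to size the shuffle. A plain-Java sketch (hypothetical names; in the real flow the per-partition counts would come back from a Spark mapPartitions over the records):

```java
import java.util.*;

class ParallelismSketch {
    // Driver side: sum per-partition bucket counts to get total parallelism.
    // Input: partitionPath -> number of buckets that partition needs.
    static int totalParallelism(Map<String, Integer> bucketsPerPartition) {
        return bucketsPerPartition.values().stream()
                .mapToInt(Integer::intValue)
                .sum();
    }
}
```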
---
**shivnarayan** (17/Jul/20 03:50): To determine total parallelism, the driver needs to understand how many buckets are required in total, right? Not sure how we can completely delegate the work to an executor and get it done without getting back to the driver.
---
**vinoth** (17/Jul/20 03:58):
> and will return the info to driver. Driver will collect all such info for all partitions to find total parallelism and then distribute the work based.

This part needs a collect(), correct, to return these to the driver? If we do that, then it's all futile.
---
**vinoth** (06/Oct/21 13:47): [~guoyihua] this is a good one to get started on the optimization pieces.
---
**guoyihua** (06/Oct/21 23:59): Cool, I'll take a look.