Hi vinoth,


Thanks for reviewing the initial design :)
I know there are many problems at present(e.g shuffling, parallelism issue). We 
can discussed the practicability of the idea first.


> ExternalSpillableMap itself was not the issue right, the serialization was
Right, the new design will not have this issue, because will not use it at all.


> This map is also used on the query side
Right, the proposal aims to improve the merge performance of cow table.


> HoodieWriteClient.java#L546 We cannot collect() the recordRDD at all ... OOM 
> driver
Here, in order to get the Map<fileId, partition>, had executed distinct() 
before collect(), the result is very small.
Also, it can be implemented in FileSystemViewManager, and lazy loading also ok.


> Doesn't this move the problem to tuning spark simply?
there are two serious performance problems in the old merge logic.
1, when upsert many records, it will serialize record to disk, then deserialize 
it when merge old record
2, only single thread comsume the old record one by one, then handle the merge 
process, it is much less efficient.   


> doing a sort based merge repartitionAndSortWithinPartitions
Trying to understand your point :) 


Compare to old version, may there are serveral improvements
1. use spark built-in operators, it's easier to understand.
2. during my testing, the upsert performance doubled.
3. if possible, we can write data in batch by using Dataframe in the futher.


[1] 
https://github.com/BigDataArtisans/incubator-hudi/blob/new-cow-merge/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java


Best,
Lamber-Ken









At 2020-02-29 01:40:36, "Vinoth Chandar" <vin...@apache.org> wrote:
>Does n't this move the problem to tuning spark simply? the
>ExternalSpillableMap itself was not the issue right, the serialization
>was.  This map is also used on the query side btw, where we need something
>like that.
>
>I took a pass at the code. I think we are shuffling data again for the
>reduceByKey step in this approach? For MOR, note that this is unnecessary
>since we simply log the. records and there is no merge. This approach might
>have a better parallelism of merging when that's costly.. But ultimately,
>our write parallelism is limited by number of affected files right?  So its
>not clear to me, that this would be a win always..
>
>On the code itself,
>https://github.com/BigDataArtisans/incubator-hudi/blob/new-cow-merge/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java#L546
> We cannot collect() the recordRDD at all.. It will OOM the driver .. :)
>
>Orthogonally, one thing we think of is : doing a sort based merge.. i.e
>repartitionAndSortWithinPartitions()  the input records to mergehandle, and
>if the file is also sorted on disk (its not today), then we can do a
>merge_sort like algorithm to perform the merge.. We can probably write code
>to bear one time sorting costs... This will eliminate the need for memory
>for merging altogether..
>
>On Wed, Feb 26, 2020 at 10:11 PM lamberken <lamber...@163.com> wrote:
>
>>
>>
>> hi, vinoth
>>
>>
>> > What do you mean by spark built in operators
>> We may can not depency on ExternalSpillableMap again when upsert to cow
>> table.
>>
>>
>> > Are you suggesting that we perform the merging in sql
>> No, just only use spark built-in operators like mapToPair, reduceByKey etc
>>
>>
>> Details has been described in this article[1], also finished draft
>> implementation and test.
>> mainly modified HoodieWriteClient#upsertRecordsInternal method.
>>
>>
>> [1]
>> https://docs.google.com/document/d/1-EHHfemtwtX2rSySaPMjeOAUkg5xfqJCKLAETZHa7Qw/edit?usp=sharing
>> [2]
>> https://github.com/BigDataArtisans/incubator-hudi/blob/new-cow-merge/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
>>
>>
>>
>> At 2020-02-27 13:45:57, "Vinoth Chandar" <vin...@apache.org> wrote:
>> >Hi lamber-ken,
>> >
>> >Thanks for this. I am not quite following the proposal. What do you mean
>> by
>> >spark built in operators? Dont we use the RDD based spark operations.
>> >
>> >Are you suggesting that we perform the merging in sql? Not following.
>> >Please clarify.
>> >
>> >On Wed, Feb 26, 2020 at 10:08 AM lamberken <lamber...@163.com> wrote:
>> >
>> >>
>> >>
>> >> Hi guys,
>> >>
>> >>
>> >> Motivation
>> >> Impove the merge performance for cow table when upsert, handle merge
>> >> operation by using spark built-in operators.
>> >>
>> >>
>> >> Background
>> >> When do a upsert operation, for each bucket, hudi needs to put new input
>> >> elements to memory cache map, and will
>> >> need an external map that spills content to disk when there is
>> >> insufficient space for it to grow.
>> >>
>> >>
>> >> There are several performance issuses:
>> >> 1. We may need an external disk map, serialize / deserialize records
>> >> 2. Only single thread do the I/O operation when check
>> >> 3. Can't take advantage of built-in spark operators
>> >>
>> >>
>> >> Based on above, reworked the merge logic and done draft test.
>> >> If you are also interested in this, please go ahead with this doc[1],
>> any
>> >> suggestion are welcome. :)
>> >>
>> >>
>> >>
>> >>
>> >> Thanks,
>> >> Lamber-Ken
>> >>
>> >>
>> >> [1]
>> >>
>> https://docs.google.com/document/d/1-EHHfemtwtX2rSySaPMjeOAUkg5xfqJCKLAETZHa7Qw/edit?usp=sharing
>> >>
>> >>
>>

Reply via email to