[ https://issues.apache.org/jira/browse/HUDI-6019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kong Wei resolved HUDI-6019.
----------------------------

> Kafka source support split by count
> -----------------------------------
>
>                 Key: HUDI-6019
>                 URL: https://issues.apache.org/jira/browse/HUDI-6019
>             Project: Apache Hudi
>          Issue Type: New Feature
>          Components: deltastreamer, hudi-utilities
>            Reporter: Kong Wei
>            Assignee: Kong Wei
>            Priority: Major
>              Labels: pull-request-available
>
> For the Kafka source, the default parallelism when pulling data from Kafka 
> is the number of Kafka partitions, so the only way to increase the 
> parallelism (and speed up the pull) is to add more Kafka partitions.
> There are two problematic cases:
>  # Pulling a large amount of data from Kafka (e.g. maxEvents=100000000) with 
> too few Kafka partitions: the pull takes too long and, in the worst case, 
> causes an executor OOM.
>  # Heavy data skew between Kafka partitions: the pull is blocked by the 
> slowest partition.
> To solve these cases, I want to add a parameter 
> {{*hoodie.deltastreamer.source.kafka.per.partition.maxEvents*}} to control 
> the maximum number of events pulled from one Kafka partition by a single 
> task. The default, Long.MAX_VALUE, leaves this feature turned off.
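A minimal sketch of the splitting idea, written in Python for brevity rather than Hudi's Java. It assumes each Kafka partition contributes a half-open offset range [from, until) and cuts every range into consecutive chunks of at most `per_partition_max_events` events, so that one task reads one chunk. `OffsetRange` and `split_by_count` are hypothetical names for illustration, not Hudi's API, and the actual Hudi change may allocate events differently.

```python
from dataclasses import dataclass, replace
from typing import List

# Assumption: mirrors Java's Long.MAX_VALUE, the proposed "feature off" default.
LONG_MAX_VALUE = 2**63 - 1


@dataclass(frozen=True)
class OffsetRange:
    """Hypothetical half-open Kafka offset range [from_offset, until_offset)."""
    topic: str
    partition: int
    from_offset: int
    until_offset: int

    def size(self) -> int:
        return self.until_offset - self.from_offset


def split_by_count(ranges: List[OffsetRange],
                   per_partition_max_events: int = LONG_MAX_VALUE) -> List[OffsetRange]:
    """Cut each range into consecutive chunks of at most per_partition_max_events.

    With the default (Long.MAX_VALUE) every non-empty range stays whole, so the
    number of chunks equals the number of non-empty partitions, as before.
    """
    if per_partition_max_events <= 0:
        raise ValueError("per_partition_max_events must be positive")
    chunks: List[OffsetRange] = []
    for r in ranges:
        start = r.from_offset
        # Empty ranges (from_offset == until_offset) produce no chunks.
        while start < r.until_offset:
            end = min(start + per_partition_max_events, r.until_offset)
            chunks.append(replace(r, from_offset=start, until_offset=end))
            start = end
    return chunks


if __name__ == "__main__":
    # Skewed example: 9,000,000 events in partition 0, 1,000,000 in partition 1.
    skewed = [OffsetRange("events", 0, 0, 9_000_000),
              OffsetRange("events", 1, 0, 1_000_000)]
    print(len(split_by_count(skewed)))             # 2 chunks: feature off
    print(len(split_by_count(skewed, 1_000_000)))  # 10 chunks of 1,000,000 each
```

If each chunk is handed to Spark as its own offset range, it becomes its own task, so a 9,000,000-event partition no longer has to be read by a single executor.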
>  
> For example, given hoodie.deltastreamer.kafka.source.maxEvents=10000000 and 2 
> Kafka partitions:
> the best case is pulling 5000000 events from each Kafka partition, which may 
> take minutes to finish;
> the worse case may be pulling 9000000 events from one partition and 
> 1000000 events from the other, which takes longer to finish due to 
> data skew.
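A back-of-the-envelope comparison of the two cases, under the simplifying assumption that each partition is read by one task at the same per-event rate, so the pull finishes when the task for the largest partition does (record size, broker load and network are ignored). With n_i events read from partition i:

$$
T \propto \max_i n_i: \qquad \max(5\,000\,000,\ 5\,000\,000) = 5\,000\,000 \quad \text{vs.} \quad \max(9\,000\,000,\ 1\,000\,000) = 9\,000\,000
$$

Under that assumption the skewed pull takes about 9/5 = 1.8 times as long as the balanced one, even though both read 10000000 events in total.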
>  
> In this example, if we set 
> {{hoodie.deltastreamer.source.kafka.per.partition.maxEvents=1000000}}, the 
> Kafka source is split into at least 10 parts and each executor pulls at most 
> 1000000 events from Kafka, taking advantage of parallelism.
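The "at least 10 parts" figure is a ceiling bound. Assuming each partition's offset range is cut independently into chunks of at most m events, with n_i events read from partition i, the number of parts k satisfies:

$$
k = \sum_i \left\lceil \frac{n_i}{m} \right\rceil \;\ge\; \left\lceil \frac{\sum_i n_i}{m} \right\rceil = \left\lceil \frac{10\,000\,000}{1\,000\,000} \right\rceil = 10
$$

Equality holds for both distributions in the example (5 + 5 and 9 + 1 chunks); an uneven split such as 9500000 + 500000 events gives 10 + 1 = 11 parts.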
> 3 benefits of this feature:
>  # Avoid a single executor pulling a large amount of data and taking too long 
> ({*}avoid data skew{*})
>  # Avoid a single executor pulling a large amount of data and using too much 
> memory, or even hitting OOM ({*}avoid OOM{*})
>  # Each executor pulls a small amount of data, which makes full use of the 
> available cores to increase concurrency and reduce the time of the 
> pull ({*}increase parallelism{*})
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
