[ https://issues.apache.org/jira/browse/HUDI-6019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kong Wei resolved HUDI-6019.
----------------------------

Kafka source support split by count
-----------------------------------

Key: HUDI-6019
URL: https://issues.apache.org/jira/browse/HUDI-6019
Project: Apache Hudi
Issue Type: New Feature
Components: deltastreamer, hudi-utilities
Reporter: Kong Wei
Assignee: Kong Wei
Priority: Major
Labels: pull-request-available

For the Kafka source, the default parallelism when pulling data from Kafka is the number of Kafka partitions. The only way to increase that parallelism (to speed up the pull) is to add more Kafka partitions.
This falls short in the following cases:
# Pulling a large amount of data from Kafka (e.g. maxEvents=100000000) when the topic has too few Kafka partitions: the pull takes too much time and can even cause an executor OOM.
# Heavy data skew between Kafka partitions: the pull is blocked by the slowest partition.

To solve these cases, I want to add a parameter {{*hoodie.deltastreamer.source.kafka.per.partition.maxEvents*}} that caps the number of events a single task pulls from one Kafka partition. The default, Long.MAX_VALUE, leaves this feature turned off.

For example, given hoodie.deltastreamer.kafka.source.maxEvents=10000000 and 2 Kafka partitions, the best case is pulling 5000000 events from each partition, which may take minutes to finish. The worst case may be pulling 9000000 events from one partition and 1000000 events from the other, which takes longer to finish because of the data skew.
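To make the skew argument concrete, here is a minimal sketch under a simplifying assumption that is not part of the original issue: every task pulls events at the same rate, so the pull finishes when the most heavily loaded task finishes. The class and method names (`SkewMakespan`, `largestTaskLoad`) are hypothetical; the event counts are the ones from the example above, with one task per Kafka partition and no splitting.

```java
import java.util.Arrays;

public class SkewMakespan {
    // Hypothetical model: all tasks pull at the same rate, so the overall pull
    // time is proportional to the event count of the busiest task.
    static long largestTaskLoad(long[] eventsPerTask) {
        return Arrays.stream(eventsPerTask).max().orElse(0L);
    }

    public static void main(String[] args) {
        // One task per Kafka partition (feature off), maxEvents = 10,000,000 in total.
        long[] bestCase = {5_000_000L, 5_000_000L};
        long[] skewedCase = {9_000_000L, 1_000_000L};
        System.out.println("best case, busiest task:   " + largestTaskLoad(bestCase));
        System.out.println("skewed case, busiest task: " + largestTaskLoad(skewedCase));
    }
}
```

Under that assumption the skewed case takes roughly 1.8 times as long as the balanced one (9,000,000 vs 5,000,000 events on the busiest task), even though both pull 10,000,000 events in total.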
In this example, we set {{hoodie.deltastreamer.source.kafka.per.partition.maxEvents=1000000}}. The Kafka source is then split into at least 10 parts, and each executor pulls at most 1000000 events from Kafka, which takes advantage of parallelism.

3 benefits of this feature:
# Avoid a single executor pulling a large amount of data and taking too long ({*}avoid data skew{*})
# Avoid a single executor pulling a large amount of data and using too much memory, or even going OOM ({*}avoid OOM{*})
# Each executor pulls only a small amount of data, so the pull can make full use of the available cores to improve concurrency and reduce its total time ({*}increase parallelism{*})
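The split-by-count idea in the example can be sketched as follows. This is a minimal illustration, not Hudi's actual implementation: `CountSplitter`, `Range`, `splitByCount` and the topic name `events` are hypothetical. It assumes each Kafka partition's pull is described by a half-open offset range [fromOffset, untilOffset) and that offsets are contiguous, so `untilOffset - fromOffset` equals the number of events (this does not hold exactly for compacted topics or topics with transaction markers). Each range is cut into consecutive chunks of at most the per-partition limit, and each chunk would become its own task.

```java
import java.util.ArrayList;
import java.util.List;

public class CountSplitter {
    // Hypothetical stand-in for a Kafka offset range: [fromOffset, untilOffset).
    record Range(String topic, int partition, long fromOffset, long untilOffset) {
        long count() { return untilOffset - fromOffset; }
    }

    // Cuts every range into consecutive chunks of at most maxEventsPerSplit events.
    static List<Range> splitByCount(List<Range> ranges, long maxEventsPerSplit) {
        if (maxEventsPerSplit <= 0) {
            throw new IllegalArgumentException("maxEventsPerSplit must be positive");
        }
        List<Range> result = new ArrayList<>();
        for (Range r : ranges) {
            long start = r.fromOffset();
            while (start < r.untilOffset()) {
                // Taking the min of the limit and the remaining events keeps the last
                // chunk inside the range and avoids overflow when the limit is Long.MAX_VALUE.
                long end = start + Math.min(maxEventsPerSplit, r.untilOffset() - start);
                result.add(new Range(r.topic(), r.partition(), start, end));
                start = end;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // The skewed case from the example: 9,000,000 + 1,000,000 events, limit 1,000,000.
        List<Range> skewed = List.of(
            new Range("events", 0, 0L, 9_000_000L),
            new Range("events", 1, 0L, 1_000_000L));
        List<Range> splits = splitByCount(skewed, 1_000_000L);
        long largest = splits.stream().mapToLong(Range::count).max().orElse(0L);
        System.out.println(splits.size() + " splits, largest = " + largest);
    }
}
```

With these inputs the sketch yields 10 splits of 1,000,000 events each. With a less even distribution the count can exceed 10, because each partition's remainder becomes its own chunk: 9,500,000 + 500,000 events give 11 splits, which matches the "at least 10 parts" wording. With the limit left at Long.MAX_VALUE every range stays whole, i.e. one task per Kafka partition as before.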