[ 
https://issues.apache.org/jira/browse/HUDI-7506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated HUDI-7506:
---------------------------------
    Labels: pull-request-available  (was: )

> Compute offsetRanges based on eventsPerPartition allocated in each range 
> -------------------------------------------------------------------------
>
>                 Key: HUDI-7506
>                 URL: https://issues.apache.org/jira/browse/HUDI-7506
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: deltastreamer
>            Reporter: Vinish Reddy
>            Assignee: Vinish Reddy
>            Priority: Critical
>              Labels: pull-request-available
>
> The current logic for computing offset ranges is leading to skews and 
> negative offsets because of the way they are calculated. 
> [https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java#L144]
>  
> Problems faced. 
> 1. We are calculating eventsPerPartition based on available partitions that 
> are not exhausted this can lead to skews where one partition handles only 
> 1-10 messages and the remaining one handles 100K messages, the idea for 
> minPartitions is to increase the parallelism and ensure that each spark task 
> is reading approximately the same number of events. 
> 2. remainingPartitions can become negative when finalRanges exceeds the size 
> of minPartitions. 
> 3. Complicated fork in code when minPartitions > toOffsetsMap, this is not 
> required IMO and the default minPartitions can always fall back 
> toOffsetsMap.size(), this takes care of situations when the partitions 
> increase in kafka as well. 
>  
> New Approach
> 1. Find _eventsPerPartition_ which would be Math.max(1L, actualNumEvents / 
> minPartitions);
> 2. Keep computing offsetRanges unless allocatedEvents < actualNumEvents, 
> compute them in a round-robin manner and keep the upper limit of 
> _eventsPerPartition_ messages for each range.
> 3.  Return all the offsetRanges in the end after sorting them by partition 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to