[ https://issues.apache.org/jira/browse/HUDI-7506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated HUDI-7506:
---------------------------------
    Labels: pull-request-available  (was: )

> Compute offsetRanges based on eventsPerPartition allocated in each range
> -------------------------------------------------------------------------
>
>                 Key: HUDI-7506
>                 URL: https://issues.apache.org/jira/browse/HUDI-7506
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: deltastreamer
>            Reporter: Vinish Reddy
>            Assignee: Vinish Reddy
>            Priority: Critical
>              Labels: pull-request-available
>
> The current logic for computing offset ranges leads to skews and negative
> offsets because of the way the ranges are calculated.
> [https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java#L144]
>
> Problems faced:
> 1. We calculate eventsPerPartition based on the available partitions that
> are not yet exhausted. This can lead to skews where one partition handles
> only 1-10 messages and the remaining one handles 100K messages. The idea
> behind minPartitions is to increase parallelism and ensure that each Spark
> task reads approximately the same number of events.
> 2. remainingPartitions can become negative when the size of finalRanges
> exceeds minPartitions.
> 3. The code forks in a complicated way when minPartitions >
> toOffsetsMap.size(). This is not required IMO: the default minPartitions
> can always fall back to toOffsetsMap.size(), which also takes care of
> situations where the number of Kafka partitions increases.
>
> New Approach:
> 1. Find _eventsPerPartition_, which would be
> Math.max(1L, actualNumEvents / minPartitions).
> 2. Keep computing offsetRanges while allocatedEvents < actualNumEvents,
> computing them in a round-robin manner and keeping an upper limit of
> _eventsPerPartition_ messages for each range.
> 3. Return all the offsetRanges at the end, sorted by partition.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
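The three "New Approach" steps can be sketched in Java roughly as follows. This is a hypothetical, self-contained illustration, not the code from the linked pull request or from KafkaOffsetGen: the names OffsetRangeSketch, Range, computeRanges and sourceLimit are assumptions made here, partitions are plain ints rather than Kafka TopicPartitions, and Range only stands in for Spark's OffsetRange. It also assumes actualNumEvents is the smaller of the total number of new events and the source limit, and that fromOffsets and toOffsets share the same partition keys.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the HUDI-7506 "New Approach"; not the real KafkaOffsetGen code.
final class OffsetRangeSketch {

  /** Hypothetical stand-in for an offset range [fromOffset, untilOffset) on one partition. */
  static final class Range {
    final int partition;
    final long fromOffset;
    final long untilOffset;

    Range(int partition, long fromOffset, long untilOffset) {
      this.partition = partition;
      this.fromOffset = fromOffset;
      this.untilOffset = untilOffset;
    }

    long count() {
      return untilOffset - fromOffset;
    }

    @Override
    public String toString() {
      return "p" + partition + "[" + fromOffset + "," + untilOffset + ")";
    }
  }

  // Assumes fromOffsets and toOffsets contain the same partition keys.
  static List<Range> computeRanges(Map<Integer, Long> fromOffsets, Map<Integer, Long> toOffsets,
                                   long sourceLimit, int minPartitions) {
    // Per-partition read cursor; TreeMap iterates in ascending partition order.
    TreeMap<Integer, Long> cursor = new TreeMap<>(fromOffsets);
    long totalAvailable = 0;
    for (Map.Entry<Integer, Long> e : cursor.entrySet()) {
      totalAvailable += Math.max(0L, toOffsets.get(e.getKey()) - e.getValue());
    }
    long actualNumEvents = Math.min(totalAvailable, sourceLimit);

    // Step 1: cap per range, never below 1 (minPartitions guarded against 0 as an assumption).
    long eventsPerPartition = Math.max(1L, actualNumEvents / Math.max(1, minPartitions));

    // Step 2: round-robin over partitions while allocatedEvents < actualNumEvents.
    List<Range> ranges = new ArrayList<>();
    long allocatedEvents = 0;
    while (allocatedEvents < actualNumEvents) {
      for (Map.Entry<Integer, Long> e : cursor.entrySet()) {
        if (allocatedEvents >= actualNumEvents) {
          break;
        }
        int partition = e.getKey();
        long from = e.getValue();
        long remainingInPartition = toOffsets.get(partition) - from;
        if (remainingInPartition <= 0) {
          continue; // this partition is exhausted
        }
        // Each range holds at most eventsPerPartition events.
        long take = Math.min(eventsPerPartition,
            Math.min(remainingInPartition, actualNumEvents - allocatedEvents));
        ranges.add(new Range(partition, from, from + take));
        e.setValue(from + take); // advance the cursor (not a structural change)
        allocatedEvents += take;
      }
    }

    // Step 3: sort by partition, then by start offset.
    ranges.sort(Comparator.comparingInt((Range r) -> r.partition)
        .thenComparingLong(r -> r.fromOffset));
    return ranges;
  }

  public static void main(String[] args) {
    // Skewed backlog: partition 0 has 100 new events, partition 1 has only 5.
    List<Range> ranges = computeRanges(
        Map.of(0, 0L, 1, 0L), Map.of(0, 100L, 1, 5L), Long.MAX_VALUE, 4);
    System.out.println(ranges);
  }
}
```

With 105 new events and minPartitions = 4, eventsPerPartition is 26, and main prints [p0[0,26), p0[26,52), p0[52,78), p0[78,100), p1[0,5)]. Every range holds at most 26 events, so Spark tasks stay roughly balanced even though partition 1 has very little data, and no "remaining partitions" counter is needed that could go negative.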