bvaradar commented on code in PR #8376: URL: https://github.com/apache/hudi/pull/8376#discussion_r1165017510
########## hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java: ##########
@@ -63,6 +63,17 @@ public class KafkaSourceConfig extends HoodieConfig {
       .defaultValue(5000000L)
       .withDocumentation("Maximum number of records obtained in each batch.");

+  // the documentation is copied from the minPartition definition of kafka structured streaming
+  public static final ConfigProperty<Long> KAFKA_SOURCE_MIN_PARTITIONS = ConfigProperty
+      .key(PREFIX + "minPartitions")
+      .defaultValue(0L)
+      .withDocumentation("Desired minimum number of partitions to read from Kafka. "
+          + "By default, Spark has a 1-1 mapping of topicPartitions to Spark partitions consuming from Kafka. "

Review Comment:
   Replace Spark with Hudi in the config documentation.

########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java: ##########
@@ -99,6 +102,19 @@ public static String offsetsToStr(OffsetRange[] ranges) {
     return sb.toString();
   }

+  /**
+   * Stringfy the offset ranges, used for logging
+   * Format: topic,0:fromOffset0->toOffset0,1:fromOffset1->toOffset1,...
+   */
+  public static String offsetsStringfy(OffsetRange[] ranges) {

Review Comment:
   Instead of this method, can we simply use Arrays.toString(ranges) in the caller?

########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java: ##########
@@ -148,9 +166,58 @@ public static OffsetRange[] computeOffsetRanges(Map<TopicPartition, Long> fromOf
       }
     }

+    LOG.info("before split, range count " + ranges.length + ", detail ranges: " + CheckpointUtils.offsetsStringfy(ranges));
+    ranges = CheckpointUtils.splitRangesToMinPartitions(ranges, minPartitions);
+    LOG.info("after split, range count " + ranges.length + ", detail ranges: " + CheckpointUtils.offsetsStringfy(ranges));
+
     return ranges;
   }

+  // the number of the result ranges can be less or more than minPartitions due to rounding
+  public static OffsetRange[] splitRangesToMinPartitions(OffsetRange[] oldRanges, long minPartitions) {
+    if (minPartitions <= 0 || minPartitions <= oldRanges.length) {
+      return oldRanges;
+    }
+
+    List<OffsetRange> newRanges = new ArrayList<>();
+    // split offset ranges to smaller ones.
+    long totalSize = totalNewMessages(oldRanges);
+    for (OffsetRange range : oldRanges) {
+      TopicPartition tp = range.topicPartition();
+      long size = range.count();
+      long parts = Math.max(1, Math.round(1.0 * size / totalSize * minPartitions));
+      long remaining = size;
+      long startOffset = range.fromOffset();
+      for (int part = 0; part < parts; part++) {
+        long thisPartition = remaining / (parts - part);
+        remaining -= thisPartition;
+        long endOffset = Math.min(startOffset + thisPartition, range.untilOffset());
+        OffsetRange offsetRange = OffsetRange.create(tp, startOffset, endOffset);
+        if (offsetRange.count() > 0) {
+          newRanges.add(offsetRange);
+        }
+        startOffset = endOffset;
+      }
+    }
+    return newRanges.toArray(new OffsetRange[0]);
+  }
+
+  /**
+   * Merge ranges by partition, because we need to maintain the checkpoint with one offset range per topic partition.
+   * @param oldRanges to merge
+   * @return ranges merged by partition
+   */
+  public static OffsetRange[] mergeRangesByTp(OffsetRange[] oldRanges) {

Review Comment:
   Instead of Tp, let's use TopicPartition.
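   For illustration only, a minimal sketch of what the renamed merge helper and the `Arrays.toString` logging could look like. The class name `OffsetRangeMergeSketch`, the method name `mergeRangesByTopicPartition`, the `logRanges` helper, and the min/max merging logic are assumptions for discussion, not the PR's implementation; it only reuses the `OffsetRange`/`TopicPartition` types already used by `KafkaOffsetGen`:

   ```java
   import java.util.Arrays;
   import java.util.LinkedHashMap;
   import java.util.Map;

   import org.apache.kafka.common.TopicPartition;
   import org.apache.spark.streaming.kafka010.OffsetRange;

   public class OffsetRangeMergeSketch {

     // Collapse multiple split ranges of the same topic partition back into one range,
     // so the checkpoint keeps a single offset range per topic partition.
     public static OffsetRange[] mergeRangesByTopicPartition(OffsetRange[] oldRanges) {
       Map<TopicPartition, OffsetRange> merged = new LinkedHashMap<>();
       for (OffsetRange range : oldRanges) {
         merged.merge(range.topicPartition(), range, (a, b) ->
             OffsetRange.create(a.topicPartition(),
                 Math.min(a.fromOffset(), b.fromOffset()),
                 Math.max(a.untilOffset(), b.untilOffset())));
       }
       return merged.values().toArray(new OffsetRange[0]);
     }

     // The logging suggestion above: Arrays.toString(ranges) instead of a custom stringify helper.
     public static void logRanges(OffsetRange[] ranges) {
       System.out.println("ranges: " + Arrays.toString(ranges));
     }
   }
   ```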
########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java: ##########
@@ -148,9 +166,58 @@ public static OffsetRange[] computeOffsetRanges(Map<TopicPartition, Long> fromOf
       }

Review Comment:
   Can we have the allocation done without needing to resplit? The allocation algorithm can be:

   1. First allocate Spark partitions uniformly across all TPs, such that each Spark partition reads min(events_remaining_in_tp, MAX_EVENTS_PER_BATCH/#tp). If there is no remaining data in a TP, we can skip it. Let's say there are k Spark partitions allocated this way, and "n" events are allocated, so remaining = MAX_EVENTS_PER_BATCH - n.
   2. If min_partitions > k, and there are "x" TPs where each TP still has remaining data (after the previous step's allocation), allocate 1 Spark partition to each such TP with min(events_remaining_in_tp, remaining/x). Now remaining = remaining - prev_allocated.
   3. Repeat step 2 until remaining == 0 or all topics are exhausted.

   Let me know if this makes sense?
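   A minimal sketch of one reading of this proposed allocation, for discussion only. The class and method names (`MinPartitionAllocationSketch`, `allocate`), the per-TP cap in step 1, and the exact stopping condition are assumptions, not part of the PR; it reuses the `OffsetRange`/`TopicPartition` types already used by `KafkaOffsetGen`:

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   import org.apache.kafka.common.TopicPartition;
   import org.apache.spark.streaming.kafka010.OffsetRange;

   public class MinPartitionAllocationSketch {

     // Single-pass allocation: produce ranges already sized to the event budget,
     // so no split/merge pass is needed afterwards.
     public static OffsetRange[] allocate(Map<TopicPartition, Long> fromOffsets,
                                          Map<TopicPartition, Long> toOffsets,
                                          long maxEvents,
                                          long minPartitions) {
       List<OffsetRange> ranges = new ArrayList<>();
       // Next offset still to be consumed per topic partition.
       Map<TopicPartition, Long> nextOffset = new HashMap<>(fromOffsets);

       // Step 1: one range per non-empty TP, each capped at maxEvents / #tp
       // (here #tp counts every TP in toOffsets, an assumption of this sketch).
       long perTpCap = Math.max(1, maxEvents / Math.max(1, toOffsets.size()));
       long remaining = maxEvents;
       for (Map.Entry<TopicPartition, Long> e : toOffsets.entrySet()) {
         TopicPartition tp = e.getKey();
         long from = nextOffset.getOrDefault(tp, e.getValue());
         long available = e.getValue() - from;
         if (available <= 0) {
           continue; // nothing new in this TP, skip it
         }
         long take = Math.min(available, Math.min(perTpCap, remaining));
         if (take <= 0) {
           break; // budget exhausted
         }
         ranges.add(OffsetRange.create(tp, from, from + take));
         nextOffset.put(tp, from + take);
         remaining -= take;
       }

       // Steps 2 & 3 (one interpretation): while budget remains and we have fewer
       // ranges than min_partitions, add one more range per TP that still has data,
       // sized at min(events_remaining_in_tp, remaining / x).
       while (remaining > 0 && ranges.size() < minPartitions) {
         List<TopicPartition> withData = new ArrayList<>();
         for (Map.Entry<TopicPartition, Long> e : toOffsets.entrySet()) {
           if (e.getValue() - nextOffset.getOrDefault(e.getKey(), e.getValue()) > 0) {
             withData.add(e.getKey());
           }
         }
         if (withData.isEmpty()) {
           break; // all TPs exhausted
         }
         long perTp = Math.max(1, remaining / withData.size());
         for (TopicPartition tp : withData) {
           long from = nextOffset.get(tp);
           long take = Math.min(toOffsets.get(tp) - from, Math.min(perTp, remaining));
           if (take <= 0) {
             continue;
           }
           ranges.add(OffsetRange.create(tp, from, from + take));
           nextOffset.put(tp, from + take);
           remaining -= take;
           if (remaining == 0) {
             break;
           }
         }
       }
       return ranges.toArray(new OffsetRange[0]);
     }
   }
   ```

   Under this reading, checkpointing would still collapse the result to one offset range per TopicPartition, as discussed above.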