bvaradar commented on code in PR #8376:
URL: https://github.com/apache/hudi/pull/8376#discussion_r1165017510


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java:
##########
@@ -63,6 +63,17 @@ public class KafkaSourceConfig extends HoodieConfig {
       .defaultValue(5000000L)
       .withDocumentation("Maximum number of records obtained in each batch.");
 
+  // the documentation is copied from the minPartition definition of kafka structured streaming
+  public static final ConfigProperty<Long> KAFKA_SOURCE_MIN_PARTITIONS = ConfigProperty
+          .key(PREFIX + "minPartitions")
+          .defaultValue(0L)
+          .withDocumentation("Desired minimum number of partitions to read from Kafka. "
+              + "By default, Spark has a 1-1 mapping of topicPartitions to Spark partitions consuming from Kafka. "

Review Comment:
   Replace Spark with Hudi in the config documentation 
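   
   For illustration, a sketch of the reworded documentation (illustrative only; it reuses the ConfigProperty builder calls from the hunk above and applies the word swap, and the remainder of the documentation string, truncated in the hunk, is omitted here):
   
   ```java
   // Sketch: same builder as in the PR, with "Spark" replaced by "Hudi" in the documentation text.
   public static final ConfigProperty<Long> KAFKA_SOURCE_MIN_PARTITIONS = ConfigProperty
       .key(PREFIX + "minPartitions")
       .defaultValue(0L)
       .withDocumentation("Desired minimum number of partitions to read from Kafka. "
           + "By default, Hudi has a 1-1 mapping of topicPartitions to Hudi partitions consuming from Kafka.");
   ```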



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java:
##########
@@ -99,6 +102,19 @@ public static String offsetsToStr(OffsetRange[] ranges) {
       return sb.toString();
     }
 
+    /**
+     * Stringfy the offset ranges, used for logging
+     * Format: topic,0:fromOffset0->toOffset0,1:fromOffset1->toOffset1,...
+     */
+    public static String offsetsStringfy(OffsetRange[] ranges) {

Review Comment:
   Instead of this method, can we simply use Arrays.toString(ranges) in the caller?
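   
   For reference, a minimal self-contained sketch of that alternative (the class name OffsetRangeLoggingExample and the sample topic are made up for illustration; Arrays.toString delegates to OffsetRange#toString for each element):
   
   ```java
   import java.util.Arrays;
   
   import org.apache.kafka.common.TopicPartition;
   import org.apache.spark.streaming.kafka010.OffsetRange;
   
   public class OffsetRangeLoggingExample {
     public static void main(String[] args) {
       OffsetRange[] ranges = new OffsetRange[] {
           OffsetRange.create(new TopicPartition("test-topic", 0), 0L, 100L),
           OffsetRange.create(new TopicPartition("test-topic", 1), 50L, 150L)
       };
       // Arrays.toString calls OffsetRange#toString on each element, which already
       // includes the topic, the partition and the [from -> until] range.
       System.out.println("detail ranges: " + Arrays.toString(ranges));
     }
   }
   ```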



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java:
##########
@@ -148,9 +166,58 @@ public static OffsetRange[] computeOffsetRanges(Map<TopicPartition, Long> fromOf
         }
       }
 
+      LOG.info("before split, range count " + ranges.length + ", detail 
ranges: " + CheckpointUtils.offsetsStringfy(ranges));
+      ranges = CheckpointUtils.splitRangesToMinPartitions(ranges, 
minPartitions);
+      LOG.info("after split, range count " + ranges.length + ", detail ranges: 
" + CheckpointUtils.offsetsStringfy(ranges));
+
       return ranges;
     }
 
+    // the number of the result ranges can be less or more than minPartitions due to rounding
+    public static OffsetRange[] splitRangesToMinPartitions(OffsetRange[] oldRanges, long minPartitions) {
+      if (minPartitions <= 0 || minPartitions <= oldRanges.length) {
+        return oldRanges;
+      }
+
+      List<OffsetRange> newRanges = new ArrayList<>();
+      // split offset ranges to smaller ones.
+      long totalSize = totalNewMessages(oldRanges);
+      for (OffsetRange range : oldRanges) {
+        TopicPartition tp = range.topicPartition();
+        long size = range.count();
+        long parts = Math.max(1, Math.round(1.0 * size / totalSize * minPartitions));
+        long remaining = size;
+        long startOffset = range.fromOffset();
+        for (int part = 0; part < parts; part++) {
+          long thisPartition = remaining / (parts - part);
+          remaining -= thisPartition;
+          long endOffset = Math.min(startOffset + thisPartition, range.untilOffset());
+          OffsetRange offsetRange = OffsetRange.create(tp, startOffset, endOffset);
+          if (offsetRange.count() > 0) {
+            newRanges.add(offsetRange);
+          }
+          startOffset = endOffset;
+        }
+      }
+      return newRanges.toArray(new OffsetRange[0]);
+    }
+
+    /**
+     * Merge ranges by partition, because we need to maintain the checkpoint with one offset range per topic partition.
+     * @param oldRanges to merge
+     * @return ranges merged by partition
+     */
+    public static OffsetRange[] mergeRangesByTp(OffsetRange[] oldRanges) {

Review Comment:
   Instead of Tp, let's use TopicPartition.
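   
   For illustration, one way the renamed helper could look, going by the javadoc above (collapse the split ranges back to a single range per TopicPartition). This is only a sketch of the idea, not the PR's actual implementation:
   
   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   
   import org.apache.kafka.common.TopicPartition;
   import org.apache.spark.streaming.kafka010.OffsetRange;
   
   public class OffsetRangeMergeExample {
   
     /** Collapse split ranges back to one range per TopicPartition (illustrative sketch). */
     public static OffsetRange[] mergeRangesByTopicPartition(OffsetRange[] oldRanges) {
       // Group ranges by TopicPartition, preserving encounter order.
       Map<TopicPartition, List<OffsetRange>> grouped = new LinkedHashMap<>();
       for (OffsetRange range : oldRanges) {
         grouped.computeIfAbsent(range.topicPartition(), tp -> new ArrayList<>()).add(range);
       }
       List<OffsetRange> merged = new ArrayList<>(grouped.size());
       for (Map.Entry<TopicPartition, List<OffsetRange>> entry : grouped.entrySet()) {
         // One merged range per TopicPartition: span from the smallest fromOffset to the largest untilOffset.
         long from = entry.getValue().stream().mapToLong(OffsetRange::fromOffset).min().getAsLong();
         long until = entry.getValue().stream().mapToLong(OffsetRange::untilOffset).max().getAsLong();
         merged.add(OffsetRange.create(entry.getKey(), from, until));
       }
       return merged.toArray(new OffsetRange[0]);
     }
   }
   ```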



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java:
##########
@@ -148,9 +166,58 @@ public static OffsetRange[] computeOffsetRanges(Map<TopicPartition, Long> fromOf
         }

Review Comment:
   Can we have the allocation done without needing to resplit? The allocation algorithm could be:
   
   1. First allocate Spark partitions uniformly across all TPs, such that each Spark partition reads min(events_remaining_in_tp, MAX_EVENTS_PER_BATCH / #tp). If a TP has no remaining data, we can skip it. Let's say there are k Spark partitions allocated this way and "n" events are allocated, so remaining = MAX_EVENTS_PER_BATCH - n.
   2. If min_partitions > k, and there are "x" TPs that still have remaining data after the previous step, allocate 1 Spark partition to each such TP with min(events_remaining_in_tp, remaining/x). Now remaining = remaining - prev_allocated.
   3. Repeat step 2 until remaining == 0 or all TPs are exhausted.
   
   Let me know if this makes sense? A rough sketch of this allocation follows below.
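   
   For concreteness, here is one possible reading of those steps as code (a sketch only; the method name allocateOffsetRanges and the parameters fromOffsets, latestOffsets, maxEventsPerBatch and minPartitions are placeholders, not the PR's actual code):
   
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   import org.apache.kafka.common.TopicPartition;
   import org.apache.spark.streaming.kafka010.OffsetRange;
   
   public class OffsetRangeAllocationSketch {
   
     /**
      * Illustrative sketch of the allocation proposed above (not the PR's implementation).
      * fromOffsets holds the committed offset per TopicPartition, latestOffsets the latest available one.
      */
     public static OffsetRange[] allocateOffsetRanges(Map<TopicPartition, Long> fromOffsets,
                                                      Map<TopicPartition, Long> latestOffsets,
                                                      long maxEventsPerBatch,
                                                      long minPartitions) {
       if (fromOffsets.isEmpty()) {
         return new OffsetRange[0];
       }
       List<OffsetRange> ranges = new ArrayList<>();
       // Tracks how far each TopicPartition has been consumed by this allocation.
       Map<TopicPartition, Long> nextOffset = new HashMap<>(fromOffsets);
   
       // Step 1: one range per TopicPartition, each reading at most MAX_EVENTS_PER_BATCH / #tp events.
       long perTpCap = Math.max(1, maxEventsPerBatch / fromOffsets.size());
       long remaining = maxEventsPerBatch;
       for (TopicPartition tp : fromOffsets.keySet()) {
         long available = latestOffsets.get(tp) - nextOffset.get(tp);
         long take = Math.min(available, perTpCap);
         if (take <= 0) {
           continue; // no remaining data in this TopicPartition
         }
         long from = nextOffset.get(tp);
         ranges.add(OffsetRange.create(tp, from, from + take));
         nextOffset.put(tp, from + take);
         remaining -= take;
       }
   
       // Steps 2 and 3: if more ranges are wanted than step 1 produced, keep redistributing the
       // unused budget across TopicPartitions that still have data, one extra range per such
       // TopicPartition per round, until the budget is spent or everything is drained.
       if (minPartitions > ranges.size()) {
         while (remaining > 0) {
           List<TopicPartition> withData = new ArrayList<>();
           for (TopicPartition tp : fromOffsets.keySet()) {
             if (latestOffsets.get(tp) > nextOffset.get(tp)) {
               withData.add(tp);
             }
           }
           if (withData.isEmpty()) {
             break; // all TopicPartitions exhausted
           }
           long perTpShare = Math.max(1, remaining / withData.size());
           for (TopicPartition tp : withData) {
             if (remaining <= 0) {
               break;
             }
             long available = latestOffsets.get(tp) - nextOffset.get(tp);
             long take = Math.min(Math.min(available, perTpShare), remaining);
             long from = nextOffset.get(tp);
             ranges.add(OffsetRange.create(tp, from, from + take));
             nextOffset.put(tp, from + take);
             remaining -= take;
           }
         }
       }
       return ranges.toArray(new OffsetRange[0]);
     }
   }
   ```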


