vinothchandar commented on code in PR #13489:
URL: https://github.com/apache/hudi/pull/13489#discussion_r2202159530


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java:
##########
@@ -269,4 +271,58 @@ public Configuration hadoopConfiguration() {
   public <T> JavaRDD<T> emptyRDD() {
     return javaSparkContext.emptyRDD();
   }
+
+  @Override
+  public <S extends Comparable<S>, V extends Comparable<V>, R> HoodieData<R> processValuesOfTheSameShards(
+      HoodiePairData<S, V> data, SerializableFunction<Iterator<V>, Iterator<R>> func, List<S> shardIndices, boolean preservesPartitioning) {
+    HoodiePairData<S, V> repartitionedData = rangeBasedRepartitionForEachKey(
+        data, shardIndices, 0.1, 100000, System.nanoTime());
+    return repartitionedData.values().mapPartitions(func, preservesPartitioning);
+  }
+
+  /**
+   * Performs range-based repartitioning of data based on key distribution to optimize partition sizes.
+   *
+   * <p>This method achieves efficient data distribution by:</p>
+   * <ol>
+   *   <li><strong>Sampling:</strong> Samples a fraction of data for each key to understand the distribution
+   *       without processing the entire dataset</li>
+   *   <li><strong>Range Analysis:</strong> Analyzes the sampled data to determine optimal partition boundaries
+   *       that ensure each partition contains a balanced number of keys</li>
+   *   <li><strong>Repartitioning:</strong> Redistributes the original data across partitions based on the
+   *       computed range boundaries, ensuring keys within the same range are co-located</li>
+   *   <li><strong>Sorting:</strong> Sorts data within each partition for efficient processing</li>
+   * </ol>
+   *
+   * <p>The method is particularly useful for balancing the workload across partitions to improve parallel processing.</p>
+   *
+   * @param data The input data as key-value pairs, with keys of type S and values of type V
+   * @param shardIndices The set of shard indices; it must cover all possible keys of the given data
+   * @param sampleFraction The fraction of data to sample for each key (between 0 and 1).
+   *                       A higher fraction provides better distribution analysis but increases sampling overhead.
+   *                       It should typically be smaller than 0.05 for large datasets.
+   * @param maxKeyPerBucket The maximum number of keys allowed per partition, to prevent partition skew
+   * @param seed The random seed for reproducible sampling results
+   * @param <S> Type of the key in the input data (must be Comparable)
+   * @param <V> Type of the value in the input data (must be Comparable)
+   * @return A repartitioned and sorted HoodiePairData with optimized key distribution across partitions
+   * @throws IllegalArgumentException if sampleFraction is not between 0 and 1
+   */
+  public <S extends Comparable<S>, V extends Comparable<V>> HoodiePairData<S, V> rangeBasedRepartitionForEachKey(

Review Comment:
   review this fully in the next iteration
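
   For reviewers unfamiliar with the scheme, the "Range Analysis" step the Javadoc describes (derive balanced boundaries from a sorted sample, then route each key to the range that contains it) can be sketched in plain Java, without Spark. All names here (`RangeBoundarySketch`, `computeBoundaries`, `partitionFor`) are hypothetical illustrations, not part of the Hudi codebase:

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   import java.util.stream.Collectors;
   import java.util.stream.IntStream;

   // Hypothetical sketch of range-based partitioning: boundaries are chosen so
   // that each range covers roughly the same number of sampled keys.
   public class RangeBoundarySketch {

     // Pick (numPartitions - 1) split points, evenly spaced through the sorted sample.
     static <S extends Comparable<S>> List<S> computeBoundaries(List<S> sampledKeys, int numPartitions) {
       List<S> sorted = new ArrayList<>(sampledKeys);
       Collections.sort(sorted);
       List<S> boundaries = new ArrayList<>();
       for (int i = 1; i < numPartitions; i++) {
         boundaries.add(sorted.get(i * sorted.size() / numPartitions));
       }
       return boundaries;
     }

     // Route a key to the first range whose upper boundary exceeds it,
     // so keys within the same range are co-located in one partition.
     static <S extends Comparable<S>> int partitionFor(S key, List<S> boundaries) {
       int i = 0;
       while (i < boundaries.size() && key.compareTo(boundaries.get(i)) >= 0) {
         i++;
       }
       return i;
     }

     public static void main(String[] args) {
       // A uniform sample of 100 keys split into 4 ranges.
       List<Integer> sample = IntStream.range(0, 100).boxed().collect(Collectors.toList());
       List<Integer> bounds = computeBoundaries(sample, 4);
       System.out.println(bounds);                  // boundaries: [25, 50, 75]
       System.out.println(partitionFor(10, bounds)); // 0
       System.out.println(partitionFor(99, bounds)); // 3
     }
   }
   ```

   The real method additionally samples per key (rather than over the whole dataset) and caps keys per bucket via `maxKeyPerBucket`, which this sketch omits.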



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
