vinothchandar commented on code in PR #13489:
URL: https://github.com/apache/hudi/pull/13489#discussion_r2202159530
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java:
##########
@@ -269,4 +271,58 @@ public Configuration hadoopConfiguration() {
public <T> JavaRDD<T> emptyRDD() {
return javaSparkContext.emptyRDD();
}
+
+ @Override
+ public <S extends Comparable<S>, V extends Comparable<V>, R> HoodieData<R>
processValuesOfTheSameShards(
+      HoodiePairData<S, V> data, SerializableFunction<Iterator<V>, Iterator<R>> func, List<S> shardIndices, boolean preservesPartitioning) {
+ HoodiePairData<S, V> repartitionedData = rangeBasedRepartitionForEachKey(
+ data, shardIndices, 0.1, 100000, System.nanoTime());
+    return repartitionedData.values().mapPartitions(func, preservesPartitioning);
+ }
+
+ /**
+   * Performs range-based repartitioning of data based on key distribution to optimize partition sizes.
+ *
+ * <p>This method achieves efficient data distribution by:</p>
+ * <ol>
+ * <li><strong>Sampling:</strong> Samples a fraction of data for each key
to understand the distribution
+ * without processing the entire dataset</li>
+ * <li><strong>Range Analysis:</strong> Analyzes the sampled data to
determine optimal partition boundaries
+ * that ensure each partition contains a balanced number of keys</li>
+ * <li><strong>Repartitioning:</strong> Redistributes the original data
across partitions based on the
+ * computed range boundaries, ensuring keys within the same range are
co-located</li>
+ * <li><strong>Sorting:</strong> Sorts data within each partition for
efficient processing</li>
+ * </ol>
+ *
+   * <p>This method is particularly useful for balancing workload across partitions to achieve better parallel processing.</p>
+ *
+   * @param data The input data as key-value pairs where keys are of type S and values are of type V
+   * @param shardIndices The list must cover all possible keys of the given data
+ * @param sampleFraction The fraction of data to sample for each key
(between 0 and 1).
+ * A higher fraction provides better distribution
analysis but increases sampling overhead.
+ * It typically should be smaller than 0.05 for large
datasets.
+ * @param maxKeyPerBucket The maximum number of keys allowed per partition
to prevent partition skew
+ * @param seed The random seed for reproducible sampling results
+   * @param <S> Type of the key in the input data (must be Comparable)
+   * @param <V> Type of the value in the input data (must be Comparable)
+ * @return A repartitioned and sorted HoodiePairData with optimized key
distribution across partitions
+ * @throws IllegalArgumentException if sampleFraction is not between 0 and 1
+ */
+  public <S extends Comparable<S>, V extends Comparable<V>> HoodiePairData<S, V> rangeBasedRepartitionForEachKey(
Review Comment:
review this fully in the next iteration
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]