Davis-Zhang-Onehouse commented on code in PR #13622:
URL: https://github.com/apache/hudi/pull/13622#discussion_r2234118429


##########
hudi-common/src/main/java/org/apache/hudi/common/util/HoodieDataUtils.java:
##########
@@ -42,14 +45,53 @@ public class HoodieDataUtils {
    * @return a Map containing the de-duplicated key-value pairs
    */
   public static <K, V> Map<K, V> dedupeAndCollectAsMap(HoodiePairData<K, V> pairData) {
-    // Deduplicate locally before shuffling to reduce data movement
+    // Map each pair to Option<Pair> to handle null keys uniformly
     // If there are multiple entries sharing the same key, use the incoming one
-    return pairData.reduceByKey((existing, incoming) -> incoming, pairData.deduceNumPartitions())
-            .collectAsList()
-            .stream()
-            .collect(Collectors.toMap(
-                    Pair::getKey,
-                    Pair::getValue
-            ));
+    return pairData.mapToPair(pair -> 
+        Pair.of(
+            Option.ofNullable(pair.getKey()), 
+            pair.getValue()
+        ))
+        .reduceByKey((existing, incoming) -> incoming, pairData.deduceNumPartitions())
+        .collectAsList()
+        .stream()
+        .collect(HashMap::new,
+            (map, pair) -> {
+              K key = pair.getKey().orElse(null);
+              map.put(key, pair.getValue());
+            },
+            HashMap::putAll);
   }
-} 
\ No newline at end of file
+
+  /**
+   * Collects results of the pair data into a {@link Map<K, Set<V>>} where values with the same key
+   * are grouped into a set.
+   *
+   * @param pairData Hoodie Pair Data to be collected
+   * @param <K> type of the key
+   * @param <V> type of the value
+   * @return a Map containing keys mapped to sets of values
+   */
+  public static <K, V> Map<K, Set<V>> collectPairDataAsMap(HoodiePairData<K, V> pairData) {
+    // Map each pair to Option<Pair> to handle null keys uniformly
+    // If there are multiple entries sharing the same key, combine them into a set
+    return pairData.mapToPair(pair -> 
+        Pair.of(
+            Option.ofNullable(pair.getKey()), 
+            Collections.singleton(pair.getValue())
+        ))
+        .reduceByKey((set1, set2) -> {
+          Set<V> combined = new HashSet<>(set1);
+          combined.addAll(set2);
+          return combined;
+        }, pairData.deduceNumPartitions())
+        .collectAsList()
+        .stream()
+        .collect(HashMap::new,
+            (map, pair) -> {
+              K key = pair.getKey().orElse(null);

Review Comment:
   No, the use case requires that a null value be processed like any other value.
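
   To illustrate the point, here is a minimal plain-Java sketch of the same
   wrap-then-unwrap pattern, assuming the null in question is a null key in
   the pair data. It does not use Hudi: `java.util.Optional` stands in for
   Hudi's `Option`, `Collectors.groupingBy` stands in for `reduceByKey`, and
   the class and method names (`NullKeyGrouping`, `groupAllowingNullKey`) are
   hypothetical. `groupingBy` rejects a raw null key, so the key is wrapped
   first and unwrapped again when filling a `HashMap`, which permits one.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class NullKeyGrouping {

  // Hypothetical helper, not part of Hudi: groups values by key and
  // treats a null key as an ordinary key instead of dropping or rejecting it.
  public static <K, V> Map<K, Set<V>> groupAllowingNullKey(List<Map.Entry<K, V>> pairs) {
    // Wrap each key in Optional so the grouping step never sees a raw null key.
    Map<Optional<K>, Set<V>> grouped = pairs.stream()
        .collect(Collectors.groupingBy(
            e -> Optional.ofNullable(e.getKey()),
            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));

    // Unwrap into a HashMap, which accepts a null key.
    Map<K, Set<V>> result = new HashMap<>();
    for (Map.Entry<Optional<K>, Set<V>> e : grouped.entrySet()) {
      result.put(e.getKey().orElse(null), e.getValue());
    }
    return result;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> pairs = Arrays.asList(
        new SimpleEntry<String, Integer>("a", 1),
        new SimpleEntry<String, Integer>(null, 2),
        new SimpleEntry<String, Integer>("a", 3),
        new SimpleEntry<String, Integer>(null, 4));

    Map<String, Set<Integer>> result = groupAllowingNullKey(pairs);
    System.out.println("values for \"a\": " + result.get("a"));
    System.out.println("values for null: " + result.get(null));
  }
}
```

   In this sketch the entries with a null key end up grouped under the null
   key of the returned map, exactly like the entries keyed by "a".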



