lokeshj1703 commented on code in PR #13622:
URL: https://github.com/apache/hudi/pull/13622#discussion_r2234066766


##########
hudi-common/src/main/java/org/apache/hudi/common/util/HoodieDataUtils.java:
##########
@@ -42,14 +45,53 @@ public class HoodieDataUtils {
    * @return a Map containing the de-duplicated key-value pairs
    */
   public static <K, V> Map<K, V> dedupeAndCollectAsMap(HoodiePairData<K, V> 
pairData) {
-    // Deduplicate locally before shuffling to reduce data movement
+    // Map each pair to Option<Pair> to handle null keys uniformly
     // If there are multiple entries sharing the same key, use the incoming one
-    return pairData.reduceByKey((existing, incoming) -> incoming, 
pairData.deduceNumPartitions())
-            .collectAsList()
-            .stream()
-            .collect(Collectors.toMap(
-                    Pair::getKey,
-                    Pair::getValue
-            ));
+    return pairData.mapToPair(pair -> 
+        Pair.of(
+            Option.ofNullable(pair.getKey()), 
+            pair.getValue()
+        ))
+        .reduceByKey((existing, incoming) -> incoming, 
pairData.deduceNumPartitions())
+        .collectAsList()
+        .stream()
+        .collect(HashMap::new,
+            (map, pair) -> {
+              K key = pair.getKey().orElse(null);

Review Comment:
   Should we do null handling for key?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -447,7 +447,7 @@ public abstract HoodiePairData<String, 
HoodieRecord<HoodieMetadataPayload>> getR
    * @param partitionName The partition name where the secondary index records 
are stored
    * @return A collection of pairs where each key is a secondary key and the 
value is a set of record keys that are indexed by that secondary key
    */
-  public abstract HoodiePairData<String, Set<String>> 
getSecondaryIndexRecords(HoodieData<String> keys, String partitionName);
+  public abstract HoodiePairData<String, String> 
getSecondaryIndexRecords(HoodieData<String> keys, String partitionName);

Review Comment:
   We can update the javadoc for the method



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -831,49 +828,23 @@ public HoodiePairData<String, Set<String>> 
getSecondaryIndexRecords(HoodieData<S
     }
   }
 
-  private HoodiePairData<String, Set<String>> 
getSecondaryIndexRecordsV1(HoodieData<String> keys, String partitionName) {
+  private HoodiePairData<String, String> 
getSecondaryIndexRecordsV1(HoodieData<String> keys, String partitionName) {
     if (keys.isEmpty()) {
       return HoodieListPairData.eager(Collections.emptyList());
     }
 
-    Map<String, Set<String>> res = getRecordsByKeyPrefixes(keys, 
partitionName, false, SecondaryIndexKeyUtils::escapeSpecialChars)
-            .map(record -> {
-              if (!record.getData().isDeleted()) {

Review Comment:
   Why do we need to remove this check?



##########
hudi-common/src/main/java/org/apache/hudi/common/util/HoodieDataUtils.java:
##########
@@ -42,14 +45,53 @@ public class HoodieDataUtils {
    * @return a Map containing the de-duplicated key-value pairs
    */
   public static <K, V> Map<K, V> dedupeAndCollectAsMap(HoodiePairData<K, V> 
pairData) {
-    // Deduplicate locally before shuffling to reduce data movement
+    // Map each pair to Option<Pair> to handle null keys uniformly

Review Comment:
   NIT: mapping to Option<Pair.Key>, V



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -831,49 +828,23 @@ public HoodiePairData<String, Set<String>> 
getSecondaryIndexRecords(HoodieData<S
     }
   }
 
-  private HoodiePairData<String, Set<String>> 
getSecondaryIndexRecordsV1(HoodieData<String> keys, String partitionName) {
+  private HoodiePairData<String, String> 
getSecondaryIndexRecordsV1(HoodieData<String> keys, String partitionName) {
     if (keys.isEmpty()) {
       return HoodieListPairData.eager(Collections.emptyList());
     }
 
-    Map<String, Set<String>> res = getRecordsByKeyPrefixes(keys, 
partitionName, false, SecondaryIndexKeyUtils::escapeSpecialChars)
-            .map(record -> {
-              if (!record.getData().isDeleted()) {
-                return 
SecondaryIndexKeyUtils.getSecondaryKeyRecordKeyPair(record.getRecordKey());
-              }
-              return null;
-            })
-            .filter(Objects::nonNull)
-            .collectAsList()
-            .stream()
-            .collect(HashMap::new,
-                    (map, pair) -> map.computeIfAbsent(pair.getKey(), k -> new 
HashSet<>()).add(pair.getValue()),
-                    (map1, map2) -> map2.forEach((k, v) -> 
map1.computeIfAbsent(k, key -> new HashSet<>()).addAll(v)));
-
-
-    return HoodieListPairData.eager(
-            res.entrySet()
-                    .stream()
-                    .collect(Collectors.toMap(
-                            Map.Entry::getKey,
-                            entry -> 
Collections.singletonList(entry.getValue())
-                    ))
-    );
-  }
-
-  private HoodiePairData<String, Set<String>> 
getSecondaryIndexRecordsV2(HoodieData<String> secondaryKeys, String 
partitionName) {
+    return getRecordsByKeyPrefixes(keys, partitionName, false, 
SecondaryIndexKeyUtils::escapeSpecialChars)
+        .mapToPair(hoodieRecord -> 
SecondaryIndexKeyUtils.getSecondaryKeyRecordKeyPair(hoodieRecord.getRecordKey()));
+  }
+
+  private HoodiePairData<String, String> 
getSecondaryIndexRecordsV2(HoodieData<String> secondaryKeys, String 
partitionName) {

Review Comment:
   From what I checked this API is not getting called from v2 APIs. Its called 
only from v1 API and could be redundant?



##########
hudi-common/src/main/java/org/apache/hudi/common/util/HoodieDataUtils.java:
##########
@@ -42,14 +45,53 @@ public class HoodieDataUtils {
    * @return a Map containing the de-duplicated key-value pairs
    */
   public static <K, V> Map<K, V> dedupeAndCollectAsMap(HoodiePairData<K, V> 
pairData) {
-    // Deduplicate locally before shuffling to reduce data movement
+    // Map each pair to Option<Pair> to handle null keys uniformly
     // If there are multiple entries sharing the same key, use the incoming one
-    return pairData.reduceByKey((existing, incoming) -> incoming, 
pairData.deduceNumPartitions())
-            .collectAsList()
-            .stream()
-            .collect(Collectors.toMap(
-                    Pair::getKey,
-                    Pair::getValue
-            ));
+    return pairData.mapToPair(pair -> 
+        Pair.of(
+            Option.ofNullable(pair.getKey()), 
+            pair.getValue()
+        ))
+        .reduceByKey((existing, incoming) -> incoming, 
pairData.deduceNumPartitions())
+        .collectAsList()
+        .stream()
+        .collect(HashMap::new,
+            (map, pair) -> {
+              K key = pair.getKey().orElse(null);
+              map.put(key, pair.getValue());
+            },
+            HashMap::putAll);
   }
-} 
\ No newline at end of file
+
+  /**
+   * Collects results of the pair data into a {@link Map<K, Set<V>>} where 
values with the same key
+   * are grouped into a set.
+   *
+   * @param pairData Hoodie Pair Data to be collected
+   * @param <K> type of the key
+   * @param <V> type of the value
+   * @return a Map containing keys mapped to sets of values
+   */
+  public static <K, V> Map<K, Set<V>> collectPairDataAsMap(HoodiePairData<K, 
V> pairData) {
+    // Map each pair to Option<Pair> to handle null keys uniformly
+    // If there are multiple entries sharing the same key, combine them into a 
set
+    return pairData.mapToPair(pair -> 
+        Pair.of(
+            Option.ofNullable(pair.getKey()), 
+            Collections.singleton(pair.getValue())
+        ))
+        .reduceByKey((set1, set2) -> {
+          Set<V> combined = new HashSet<>(set1);
+          combined.addAll(set2);
+          return combined;
+        }, pairData.deduceNumPartitions())
+        .collectAsList()
+        .stream()
+        .collect(HashMap::new,
+            (map, pair) -> {
+              K key = pair.getKey().orElse(null);

Review Comment:
   Should we throw exception if key is null?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to