nsivabalan commented on code in PR #9041: URL: https://github.com/apache/hudi/pull/9041#discussion_r1242967907
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java: ########## @@ -310,6 +312,56 @@ public static <R> HoodieData<HoodieRecord<R>> mergeForPartitionUpdates( return Arrays.asList(deleteRecord, getTaggedRecord(merged, Option.empty())).iterator(); } }); - return taggedUpdatingRecords.union(newRecords); + return taggedUpdatingRecords.union(taggedNewRecords); + } + + public static <R> HoodieData<HoodieRecord<R>> tagGlobalLocationBackToRecords( + HoodieData<HoodieRecord<R>> incomingRecords, + HoodiePairData<String, HoodieRecordGlobalLocation> keyAndExistingLocations, + boolean mayContainDuplicateLookup, + boolean shouldUpdatePartitionPath, + HoodieWriteConfig config, + HoodieTable table) { + final HoodieRecordMerger merger = config.getRecordMerger(); + + HoodiePairData<String, HoodieRecord<R>> keyAndIncomingRecords = + incomingRecords.mapToPair(record -> Pair.of(record.getRecordKey(), record)); + + // Pair of incoming record and the global location if meant for merged lookup in later stage + HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>> incomingRecordsAndLocations + = keyAndIncomingRecords.leftOuterJoin(keyAndExistingLocations).values() + .map(v -> { + final HoodieRecord<R> incomingRecord = v.getLeft(); + Option<HoodieRecordGlobalLocation> currentLocOpt = Option.ofNullable(v.getRight().orElse(null)); + if (currentLocOpt.isPresent()) { + HoodieRecordGlobalLocation currentLoc = currentLocOpt.get(); + boolean shouldPerformMergedLookUp = mayContainDuplicateLookup + || !Objects.equals(incomingRecord.getPartitionPath(), currentLoc.getPartitionPath()); + if (shouldUpdatePartitionPath && shouldPerformMergedLookUp) { + return Pair.of(incomingRecord, currentLocOpt); + } else { + // - When update partition path is set to false, + // the incoming record will be tagged to the existing record's partition regardless of being equal or not. 
+ // - When update partition path is set to true, + // the incoming record will be tagged to the existing record's partition + // when partition is not updated and the look-up won't have duplicates (e.g. COW, or using RLI). + return Pair.of((HoodieRecord<R>) getTaggedRecord( Review Comment: got it ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java: ########## @@ -310,6 +312,56 @@ public static <R> HoodieData<HoodieRecord<R>> mergeForPartitionUpdates( return Arrays.asList(deleteRecord, getTaggedRecord(merged, Option.empty())).iterator(); } }); - return taggedUpdatingRecords.union(newRecords); + return taggedUpdatingRecords.union(taggedNewRecords); + } + + public static <R> HoodieData<HoodieRecord<R>> tagGlobalLocationBackToRecords( + HoodieData<HoodieRecord<R>> incomingRecords, + HoodiePairData<String, HoodieRecordGlobalLocation> keyAndExistingLocations, + boolean mayContainDuplicateLookup, + boolean shouldUpdatePartitionPath, + HoodieWriteConfig config, + HoodieTable table) { + final HoodieRecordMerger merger = config.getRecordMerger(); + + HoodiePairData<String, HoodieRecord<R>> keyAndIncomingRecords = + incomingRecords.mapToPair(record -> Pair.of(record.getRecordKey(), record)); + + // Pair of incoming record and the global location if meant for merged lookup in later stage + HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>> incomingRecordsAndLocations + = keyAndIncomingRecords.leftOuterJoin(keyAndExistingLocations).values() + .map(v -> { + final HoodieRecord<R> incomingRecord = v.getLeft(); + Option<HoodieRecordGlobalLocation> currentLocOpt = Option.ofNullable(v.getRight().orElse(null)); + if (currentLocOpt.isPresent()) { + HoodieRecordGlobalLocation currentLoc = currentLocOpt.get(); + boolean shouldPerformMergedLookUp = mayContainDuplicateLookup + || !Objects.equals(incomingRecord.getPartitionPath(), currentLoc.getPartitionPath()); + if (shouldUpdatePartitionPath && shouldPerformMergedLookUp) 
{ + return Pair.of(incomingRecord, currentLocOpt); + } else { + // - When update partition path is set to false, + // the incoming record will be tagged to the existing record's partition regardless of being equal or not. + // - When update partition path is set to true, + // the incoming record will be tagged to the existing record's partition + // when partition is not updated and the look-up won't have duplicates (e.g. COW, or using RLI). + return Pair.of((HoodieRecord<R>) getTaggedRecord( + createNewHoodieRecord(incomingRecord, currentLoc, merger), Option.of(currentLoc)), + Option.empty()); + } + } else { + return Pair.of(getTaggedRecord(incomingRecord, Option.empty()), Option.empty()); + } + }); + return shouldUpdatePartitionPath + ? mergeForPartitionUpdatesIfNeeded(incomingRecordsAndLocations, config, table) Review Comment: I see that for COW, if the partition path does not change, we set an empty location. But for MOR, if the partition path does not change, do you mean that we will still do a merge since there could be duplicates? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org