[GitHub] [hudi] danny0405 commented on a diff in pull request #9117: [HUDI-6437] Fixing/optimizing record updates to RLI

2023-07-03 Thread via GitHub


danny0405 commented on code in PR #9117:
URL: https://github.com/apache/hudi/pull/9117#discussion_r1251402017


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##
@@ -257,6 +257,10 @@ private Option prepareRecord(HoodieRecord hoodieRecord) {
 recordsWritten++;
   } else {
 finalRecordOpt = Option.empty();
+// Clear the new location as the record was deleted
+hoodieRecord.unseal();
+hoodieRecord.clearNewLocation();
+hoodieRecord.seal();

Review Comment:
   Yeah, we should also update the record locations for MOR logs.
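For readers unfamiliar with the unseal / clear / seal sequence in this hunk, here is a minimal sketch of the pattern. It rests on assumptions: `SketchRecord` and `DeleteLocationSketch.markDeleted` are hypothetical names, and the class only imitates a sealable record with an optional new location. It is not Hudi's `HoodieRecord`, and the exception type it throws on a sealed write is illustrative.

```java
import java.util.Optional;

/**
 * Hypothetical, simplified record carrying an optional "new location" (a file id)
 * that can be sealed against mutation. Not Hudi's HoodieRecord.
 */
class SketchRecord {
  private final String recordKey;
  private String newFileId;      // null means "no new location"
  private boolean sealed = false;

  SketchRecord(String recordKey, String newFileId) {
    this.recordKey = recordKey;
    this.newFileId = newFileId;
  }

  void seal() { sealed = true; }

  void unseal() { sealed = false; }

  // Any mutation on a sealed record is rejected.
  private void checkState() {
    if (sealed) {
      throw new IllegalStateException("Not allowed to modify after sealed");
    }
  }

  void clearNewLocation() {
    checkState();
    newFileId = null;
  }

  Optional<String> getNewLocation() { return Optional.ofNullable(newFileId); }

  String getRecordKey() { return recordKey; }
}

class DeleteLocationSketch {
  // Mirrors the diff: when the record turns out to be a delete, temporarily unseal it,
  // drop its new location so an index update will not point the key at the file just
  // written, then seal it again.
  static void markDeleted(SketchRecord record) {
    record.unseal();
    record.clearNewLocation();
    record.seal();
  }
}
```

After `markDeleted`, `getNewLocation()` is empty and the record is sealed again, so a later accidental mutation still fails fast.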



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] danny0405 commented on a diff in pull request #9117: [HUDI-6437] Fixing/optimizing record updates to RLI

2023-07-03 Thread via GitHub


danny0405 commented on code in PR #9117:
URL: https://github.com/apache/hudi/pull/9117#discussion_r1251401570


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##
@@ -1161,25 +1165,36 @@ protected boolean validateTimelineBeforeSchedulingCompaction(Option inFl
* @param writeStatuses {@code WriteStatus} from the write operation
*/
   private HoodieData getRecordIndexUpdates(HoodieData writeStatuses) {
-// 1. List
-// 2. Reduce by key: accept keys only when new location is not
-return writeStatuses.map(writeStatus -> writeStatus.getWrittenRecordDelegates().stream()
-.map(recordDelegate -> Pair.of(recordDelegate.getRecordKey(), recordDelegate)))
-.flatMapToPair(Stream::iterator)
-.reduceByKey((recordDelegate1, recordDelegate2) -> {
-  if (recordDelegate1.getRecordKey().equals(recordDelegate2.getRecordKey())) {
-if (recordDelegate1.getNewLocation().isPresent() && recordDelegate1.getNewLocation().get().getFileId() != null) {
-  return recordDelegate1;
-} else if (recordDelegate2.getNewLocation().isPresent() && recordDelegate2.getNewLocation().get().getFileId() != null) {
-  return recordDelegate2;
+HoodiePairData recordKeyDelegatePairs = null;
+// if update partition path is true, chances that we might get two records (1 delete in older partition and 1 insert to new partition)
+// and hence we might have to do reduce By key before ingesting to RLI partition.
+if (dataWriteConfig.getRecordIndexUpdatePartitionPath()) {
+  recordKeyDelegatePairs = writeStatuses.map(writeStatus -> writeStatus.getWrittenRecordDelegates().stream()
+  .map(recordDelegate -> Pair.of(recordDelegate.getRecordKey(), recordDelegate)))
+  .flatMapToPair(Stream::iterator)
+  .reduceByKey((recordDelegate1, recordDelegate2) -> {
+if (recordDelegate1.getRecordKey().equals(recordDelegate2.getRecordKey())) {
+  if (recordDelegate1.getNewLocation().isPresent() && recordDelegate2.getNewLocation().isPresent()) {
+throw new HoodieIOException("Both version of records does not have location set. Record V1 " + recordDelegate1.toString()
++ ", Record V2 " + recordDelegate2.toString());
+  }
+  if (recordDelegate1.getNewLocation().isPresent()) {
+return recordDelegate1;
+  } else {
+// if record delegate 1 does not have location set, record delegate 2 should have location set.
+return recordDelegate2;
+  }
 } else {
-  // should not come here, one of the above must have a new location set
-  return null;
+  return recordDelegate1;
 }
-  } else {
-return recordDelegate1;
-  }
-}, 1)
+  }, Math.max(1, writeStatuses.getNumPartitions()));
+} else {

Review Comment:
   `Math.max(1, xxx)` is redundant if `xxx` is always >= 1; maybe just use `writeStatuses.getNumPartitions()`.
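The merge rule inside the `reduceByKey` lambda can be tried locally without Spark. This is a hedged sketch. `Delegate`, `RecordIndexMergeSketch`, `pickLocated`, and `reduceByKey` are hypothetical names, and a `Collectors.toMap` merge function stands in for `HoodiePairData.reduceByKey`, so there is no partition count involved. As in the diff, it throws when both versions carry a location. Otherwise it keeps the version that has one.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/** Hypothetical stand-in for a written-record delegate: a record key plus an optional new file id. */
final class Delegate {
  final String recordKey;
  final Optional<String> newFileId;

  Delegate(String recordKey, Optional<String> newFileId) {
    this.recordKey = recordKey;
    this.newFileId = newFileId;
  }

  @Override
  public String toString() {
    return recordKey + "@" + newFileId.orElse("<deleted>");
  }
}

final class RecordIndexMergeSketch {
  // Merge two delegates that share a record key. With partition-path updates, one is
  // expected to be the delete in the old partition (no location) and the other the
  // insert into the new partition (has a location).
  static Delegate pickLocated(Delegate d1, Delegate d2) {
    if (d1.newFileId.isPresent() && d2.newFileId.isPresent()) {
      throw new IllegalStateException(
          "Both versions of the record have a location set: " + d1 + ", " + d2);
    }
    return d1.newFileId.isPresent() ? d1 : d2;
  }

  // Local analogue of reduceByKey: key by record key and merge duplicates with pickLocated.
  static Map<String, Delegate> reduceByKey(List<Delegate> delegates) {
    return delegates.stream()
        .collect(Collectors.toMap(d -> d.recordKey, d -> d, RecordIndexMergeSketch::pickLocated));
  }
}
```

On the parallelism argument: `Math.max(1, n)` only changes the result when `n` can be 0, so the guard matters only if `getNumPartitions()` can report zero partitions.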


