nsivabalan commented on a change in pull request #4739:
URL: https://github.com/apache/hudi/pull/4739#discussion_r802157120



##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -362,28 +375,33 @@ private HoodieMetadataColumnStats combineColumnStatsMetadatat(HoodieMetadataPayl
    return filesystemMetadata.entrySet().stream().filter(e -> e.getValue().getIsDeleted() == isDeleted);
  }
 
-  private Map<String, HoodieMetadataFileInfo> combineFilesystemMetadata(HoodieMetadataPayload previousRecord) {
+  private Map<String, HoodieMetadataFileInfo> combineFileSystemMetadata(HoodieMetadataPayload previousRecord) {
    Map<String, HoodieMetadataFileInfo> combinedFileInfo = new HashMap<>();
+
+    // First, add all files listed in the previous record
    if (previousRecord.filesystemMetadata != null) {
      combinedFileInfo.putAll(previousRecord.filesystemMetadata);
    }
 
+    // Second, merge in the files listed in the new record
    if (filesystemMetadata != null) {
-      filesystemMetadata.forEach((filename, fileInfo) -> {
-        // If the filename wasnt present then we carry it forward
-        if (!combinedFileInfo.containsKey(filename)) {
-          combinedFileInfo.put(filename, fileInfo);
-        } else {
-          if (fileInfo.getIsDeleted()) {
-            // file deletion
-            combinedFileInfo.remove(filename);
-          } else {
-            // file appends.
-            combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) -> {
-              return new HoodieMetadataFileInfo(oldFileInfo.getSize() + newFileInfo.getSize(), false);
-            });
-          }
-        }
+      validatePayload(type, filesystemMetadata);
+
+      filesystemMetadata.forEach((key, fileInfo) -> {
+        combinedFileInfo.merge(key, fileInfo,
+            // Combine previous record w/ the new one, new records taking precedence over
+            // the old one
+            //
+            // NOTE: That if previous listing contains the file that is being deleted by the tombstone
+            //       record (`IsDeleted` = true) in the new one, we simply delete the file from the resulting
+            //       listing as well as drop the tombstone itself.
+            //       However, if file is not present in the previous record we have to persist tombstone
+            //       record in the listing to make sure we carry forward information that this file
+            //       was deleted. This special case could occur since the merging flow is 2-stage:
+            //          - First we merge records from all of the delta log-files
+            //          - Then we merge records from base-files with the delta ones (coming as a result
+            //          of the previous step)
+            (oldFileInfo, newFileInfo) -> newFileInfo.getIsDeleted() ? null : newFileInfo);

Review comment:
       I guess Prashanth meant checking the `oldFileInfo`; the validation you are referring to validates only the new incoming `filesystemMetadata`.
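   For context on the merge semantics in question: `Map.merge` removes the mapping when the remapping function returns `null`, which is what drops both the file and its tombstone in the diff above, while a tombstone with no previous entry survives. A minimal, self-contained sketch of that behavior (the `FileInfo` record, class name, and file names are illustrative stand-ins, not the actual Hudi types; a check on the old side of the merge, as suggested, would sit inside the lambda):

   import java.util.HashMap;
   import java.util.Map;

   public class MergeTombstoneDemo {
     // Illustrative stand-in for HoodieMetadataFileInfo.
     record FileInfo(long size, boolean isDeleted) {}

     public static void main(String[] args) {
       Map<String, FileInfo> combined = new HashMap<>();
       combined.put("f1.log", new FileInfo(100, false)); // present in previous record

       // New record: tombstones for f1.log (known) and f2.log (unknown).
       Map<String, FileInfo> incoming = Map.of(
           "f1.log", new FileInfo(0, true),
           "f2.log", new FileInfo(0, true));

       incoming.forEach((key, fileInfo) ->
           combined.merge(key, fileInfo,
               // A validation of oldInfo (the previous listing's entry) could go here.
               // Returning null removes the existing mapping: the file and its
               // tombstone are both dropped from the resulting listing.
               (oldInfo, newInfo) -> newInfo.isDeleted() ? null : newInfo));

       // f1.log is gone; f2.log keeps its tombstone since there was no previous
       // entry to merge against (the remapping function never runs for it).
       System.out.println(combined); // {f2.log=FileInfo[size=0, isDeleted=true]}
     }
   }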

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -362,28 +361,33 @@ private HoodieMetadataColumnStats combineColumnStatsMetadatat(HoodieMetadataPayl
    return filesystemMetadata.entrySet().stream().filter(e -> e.getValue().getIsDeleted() == isDeleted);
  }
 
-  private Map<String, HoodieMetadataFileInfo> combineFilesystemMetadata(HoodieMetadataPayload previousRecord) {
+  private Map<String, HoodieMetadataFileInfo> combineFileSystemMetadata(HoodieMetadataPayload previousRecord) {
    Map<String, HoodieMetadataFileInfo> combinedFileInfo = new HashMap<>();
+
+    // First, add all files listed in the previous record
    if (previousRecord.filesystemMetadata != null) {
      combinedFileInfo.putAll(previousRecord.filesystemMetadata);
    }
 
+    // Second, merge in the files listed in the new record
    if (filesystemMetadata != null) {
-      filesystemMetadata.forEach((filename, fileInfo) -> {
-        // If the filename wasnt present then we carry it forward
-        if (!combinedFileInfo.containsKey(filename)) {
-          combinedFileInfo.put(filename, fileInfo);
-        } else {
-          if (fileInfo.getIsDeleted()) {
-            // file deletion
-            combinedFileInfo.remove(filename);
-          } else {
-            // file appends.
-            combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) -> {
-              return new HoodieMetadataFileInfo(oldFileInfo.getSize() + newFileInfo.getSize(), false);
-            });
-          }
-        }
+      validatePayload(type, filesystemMetadata);
+
+      filesystemMetadata.forEach((key, fileInfo) -> {
+        combinedFileInfo.merge(key, fileInfo,
+            // Combine previous record w/ the new one, new records taking precedence over
+            // the old one
+            //
+            // NOTE: That if previous listing contains the file that is being deleted by the tombstone
+            //       record (`IsDeleted` = true) in the new one, we simply delete the file from the resulting
+            //       listing as well as drop the tombstone itself.
+            //       However, if file is not present in the previous record we have to persist tombstone
+            //       record in the listing to make sure we carry forward information that this file
+            //       was deleted. This special case could occur since the merging flow is 2-stage:
+            //          - First we merge records from all of the delta log-files
+            //          - Then we merge records from base-files with the delta ones (coming as a result
+            //          of the previous step)
+            (oldFileInfo, newFileInfo) -> newFileInfo.getIsDeleted() ? null : newFileInfo);

Review comment:
       Ok, let me take a stab at whether this is a valid scenario; it would apply only to HDFS.
   
   Let's say two different writers write concurrently to the same log file: writer1 is doing a rollback and writer2 is appending a log block. Writer1 updates the log file first and sees a size of, say, 200; writer2 then appends to the same log file and sees a size of 300. Even though the writers appended to the file in the order writer1 followed by writer2, there is no guarantee that the same order is maintained when their updates reach the metadata table. For various reasons, writer2 could complete its write earlier and apply its changes to the metadata table before writer1 does.
   
   Wouldn't we then revert to the smaller size when writer1 updates the metadata table with 200 as the size?
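   A minimal sketch of that ordering concern, assuming a simple last-writer-wins merge on size (the file name, sizes, and combiners are illustrative, not Hudi's actual reconciliation logic):

   import java.util.HashMap;
   import java.util.Map;

   public class OutOfOrderSizeDemo {
     public static void main(String[] args) {
       // Metadata-table listing: log file -> last reported size.
       Map<String, Long> listing = new HashMap<>();

       // writer2 (saw size 300 after its append) reaches the metadata table first...
       listing.merge("file1.log", 300L, (oldSize, newSize) -> newSize);
       // ...then writer1 (saw size 200 right after its rollback) applies its update.
       listing.merge("file1.log", 200L, (oldSize, newSize) -> newSize);

       // Last-writer-wins regresses the recorded size to 200 even though
       // the file on HDFS is actually 300 bytes.
       System.out.println(listing.get("file1.log")); // 200

       // Taking the max instead would avoid the regression, under the
       // assumption that HDFS log files only ever grow (appends, no truncation).
       listing.merge("file1.log", 300L, Math::max);
       System.out.println(listing.get("file1.log")); // 300
     }
   }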
   



