nsivabalan commented on a change in pull request #4739:
URL: https://github.com/apache/hudi/pull/4739#discussion_r802157120
##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -362,28 +375,33 @@ private HoodieMetadataColumnStats combineColumnStatsMetadatat(HoodieMetadataPayl
     return filesystemMetadata.entrySet().stream().filter(e -> e.getValue().getIsDeleted() == isDeleted);
   }

-  private Map<String, HoodieMetadataFileInfo> combineFilesystemMetadata(HoodieMetadataPayload previousRecord) {
+  private Map<String, HoodieMetadataFileInfo> combineFileSystemMetadata(HoodieMetadataPayload previousRecord) {
     Map<String, HoodieMetadataFileInfo> combinedFileInfo = new HashMap<>();
+
+    // First, add all files listed in the previous record
     if (previousRecord.filesystemMetadata != null) {
       combinedFileInfo.putAll(previousRecord.filesystemMetadata);
     }

+    // Second, merge in the files listed in the new record
     if (filesystemMetadata != null) {
-      filesystemMetadata.forEach((filename, fileInfo) -> {
-        // If the filename wasnt present then we carry it forward
-        if (!combinedFileInfo.containsKey(filename)) {
-          combinedFileInfo.put(filename, fileInfo);
-        } else {
-          if (fileInfo.getIsDeleted()) {
-            // file deletion
-            combinedFileInfo.remove(filename);
-          } else {
-            // file appends.
-            combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) -> {
-              return new HoodieMetadataFileInfo(oldFileInfo.getSize() + newFileInfo.getSize(), false);
-            });
-          }
-        }
+      validatePayload(type, filesystemMetadata);
+
+      filesystemMetadata.forEach((key, fileInfo) -> {
+        combinedFileInfo.merge(key, fileInfo,
+            // Combine previous record w/ the new one, new records taking precedence over
+            // the old one
+            //
+            // NOTE: That if previous listing contains the file that is being deleted by the tombstone
+            // record (`IsDeleted` = true) in the new one, we simply delete the file from the resulting
+            // listing as well as drop the tombstone itself.
+            // However, if file is not present in the previous record we have to persist tombstone
+            // record in the listing to make sure we carry forward information that this file
+            // was deleted. This special case could occur since the merging flow is 2-stage:
+            //   - First we merge records from all of the delta log-files
+            //   - Then we merge records from base-files with the delta ones (coming as a result
+            //     of the previous step)
+            (oldFileInfo, newFileInfo) -> newFileInfo.getIsDeleted() ? null : newFileInfo);

Review comment:
   I guess Prashanth meant checking the oldFileInfo; the validation you are referring to only covers the new incoming filesystemMetadata.
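   Roughly, a check on the old entry would have to live inside the merge combinator itself. A hedged sketch of what that could look like is below; the specific condition and the exception are placeholders for illustration, not a concrete proposal for this PR:

   ```java
   filesystemMetadata.forEach((key, fileInfo) ->
       combinedFileInfo.merge(key, fileInfo, (oldFileInfo, newFileInfo) -> {
         // Hypothetical sanity check on the previous (old) entry, which the
         // validatePayload call on the incoming filesystemMetadata does not cover
         if (oldFileInfo.getIsDeleted() && oldFileInfo.getSize() > 0) {
           throw new IllegalStateException("Tombstone with non-zero size for file: " + key);
         }
         // Same semantics as in the PR: a new tombstone drops the file from the
         // listing, otherwise the new record wins
         return newFileInfo.getIsDeleted() ? null : newFileInfo;
       }));
   ```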
##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -362,28 +361,33 @@ private HoodieMetadataColumnStats combineColumnStatsMetadatat(HoodieMetadataPayl
     return filesystemMetadata.entrySet().stream().filter(e -> e.getValue().getIsDeleted() == isDeleted);
   }

-  private Map<String, HoodieMetadataFileInfo> combineFilesystemMetadata(HoodieMetadataPayload previousRecord) {
+  private Map<String, HoodieMetadataFileInfo> combineFileSystemMetadata(HoodieMetadataPayload previousRecord) {
     Map<String, HoodieMetadataFileInfo> combinedFileInfo = new HashMap<>();
+
+    // First, add all files listed in the previous record
     if (previousRecord.filesystemMetadata != null) {
      combinedFileInfo.putAll(previousRecord.filesystemMetadata);
     }

+    // Second, merge in the files listed in the new record
     if (filesystemMetadata != null) {
-      filesystemMetadata.forEach((filename, fileInfo) -> {
-        // If the filename wasnt present then we carry it forward
-        if (!combinedFileInfo.containsKey(filename)) {
-          combinedFileInfo.put(filename, fileInfo);
-        } else {
-          if (fileInfo.getIsDeleted()) {
-            // file deletion
-            combinedFileInfo.remove(filename);
-          } else {
-            // file appends.
-            combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) -> {
-              return new HoodieMetadataFileInfo(oldFileInfo.getSize() + newFileInfo.getSize(), false);
-            });
-          }
-        }
+      validatePayload(type, filesystemMetadata);
+
+      filesystemMetadata.forEach((key, fileInfo) -> {
+        combinedFileInfo.merge(key, fileInfo,
+            // Combine previous record w/ the new one, new records taking precedence over
+            // the old one
+            //
+            // NOTE: That if previous listing contains the file that is being deleted by the tombstone
+            // record (`IsDeleted` = true) in the new one, we simply delete the file from the resulting
+            // listing as well as drop the tombstone itself.
+            // However, if file is not present in the previous record we have to persist tombstone
+            // record in the listing to make sure we carry forward information that this file
+            // was deleted. This special case could occur since the merging flow is 2-stage:
+            //   - First we merge records from all of the delta log-files
+            //   - Then we merge records from base-files with the delta ones (coming as a result
+            //     of the previous step)
+            (oldFileInfo, newFileInfo) -> newFileInfo.getIsDeleted() ? null : newFileInfo);

Review comment:
   Ok, let me take a stab at whether this is a valid scenario; it would apply only to HDFS. Say two different writers concurrently write to the same log file: one is doing a rollback and the other is appending a log block. Writer1 (doing the rollback) updates the log file first and sees a size of, say, 200, and writer2 then appends to the same log file and sees a size of 300. Even though the writers appended to the file in the order writer1 followed by writer2, there is no guarantee that the same order is maintained when their updates reach the metadata table. For various reasons writer2 could complete its write earlier and apply its changes to the metadata table before writer1. Wouldn't we then revert to the smaller size (200) when writer1 later updates the metadata table?
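   To make the concern concrete, here is a minimal, self-contained sketch of the "new record wins" combinator applied out of order. `FileInfo`, the class name, and the file key are stand-ins for illustration only (not Hudi code), and the 200/300 sizes are the hypothetical numbers from the scenario above:

   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class OutOfOrderMergeSketch {
     // Stand-in for HoodieMetadataFileInfo (hypothetical, for illustration only)
     record FileInfo(long size, boolean isDeleted) {}

     public static void main(String[] args) {
       Map<String, FileInfo> listing = new HashMap<>();

       // writer2's metadata update lands first, recording the larger size (300)
       listing.merge("file.log.1", new FileInfo(300, false),
           (oldInfo, newInfo) -> newInfo.isDeleted() ? null : newInfo);

       // writer1's (earlier) update lands later with the smaller size (200);
       // with "new record wins" semantics the combined listing regresses to 200
       listing.merge("file.log.1", new FileInfo(200, false),
           (oldInfo, newInfo) -> newInfo.isDeleted() ? null : newInfo);

       System.out.println(listing.get("file.log.1").size()); // prints 200, not 300
     }
   }
   ```

   Whether taking the latest size, the max, or the sum is correct depends on whether the recorded sizes are absolute or deltas; the sketch only shows that with latest-wins semantics the final size depends on the order in which the writers' updates reach the metadata table.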