[ https://issues.apache.org/jira/browse/HUDI-7518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vinoth Chandar updated HUDI-7518:
---------------------------------
    Description: 
When the same file deletion is applied repeatedly to the partition file list in the files 
partition of the MDT, the current HoodieMetadataPayload merging logic drops the deletion 
altogether. As a result, a file that has been deleted from the file system, and should 
therefore be removed from the MDT file listing, is still left in the MDT, because of the 
following logic:
{code:java}
private Map<String, HoodieMetadataFileInfo> combineFileSystemMetadata(HoodieMetadataPayload previousRecord) {
    Map<String, HoodieMetadataFileInfo> combinedFileInfo = new HashMap<>();

    // First, add all files listed in the previous record
    if (previousRecord.filesystemMetadata != null) {
      combinedFileInfo.putAll(previousRecord.filesystemMetadata);
    }

    // Second, merge in the files listed in the new record
    if (filesystemMetadata != null) {
      validatePayload(type, filesystemMetadata);

      filesystemMetadata.forEach((key, fileInfo) -> {
        combinedFileInfo.merge(key, fileInfo,
            (oldFileInfo, newFileInfo) ->
                newFileInfo.getIsDeleted()
                    ? null
                    : new HoodieMetadataFileInfo(Math.max(newFileInfo.getSize(), oldFileInfo.getSize()), false));
      });
    } {code}
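The crux is that the remapping function passed to Map.merge returns null for a delete, and HashMap.merge removes the mapping entirely when the remapping function returns null. The following self-contained sketch, which uses a simplified stand-in for HoodieMetadataFileInfo rather than the actual Hudi class, shows how a deletion marker carried over from the previous record is wiped out when the new record deletes the same file again:
{code:java}
import java.util.HashMap;
import java.util.Map;

public class RepeatedDeleteMergeDemo {
  // Simplified stand-in for HoodieMetadataFileInfo: just a size and a deletion flag
  record FileInfo(long size, boolean isDeleted) {}

  public static void main(String[] args) {
    // Hypothetical file name for illustration only
    String file = "7f6b146e-...-20240303214408245.parquet";

    // The previous record already carries a deletion marker for the file
    Map<String, FileInfo> combinedFileInfo = new HashMap<>();
    combinedFileInfo.put(file, new FileInfo(0, true));

    // The new record deletes the same file; the remapping function returns null,
    // so HashMap.merge removes the mapping instead of keeping the deletion marker
    Map<String, FileInfo> newRecord = Map.of(file, new FileInfo(0, true));
    newRecord.forEach((key, fileInfo) ->
        combinedFileInfo.merge(key, fileInfo,
            (oldInfo, newInfo) -> newInfo.isDeleted()
                ? null
                : new FileInfo(Math.max(newInfo.size(), oldInfo.size()), false)));

    // Prints "{}": the deletion has vanished, so the file later reappears when this
    // merged result is combined with the base file listing that still contains it
    System.out.println(combinedFileInfo);
  }
}
{code}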
Here's a concrete example of how this bug causes the ingestion to fail:

(1) A data file and its file group are replaced by clustering.  The data file is 
still on the file system and in the MDT file listing.

(2) A cleaner plan is generated to delete the data file.

(3) The cleaner plan is executed for the first time and fails before committing due to 
a Spark job shutdown.

(4) The ingestion continues and succeeds, and another cleaner plan is generated 
containing the same data file/file group to delete.

(5) The first cleaner plan is then successfully executed, recording the deletion in the 
file list through a metadata payload that is appended to a log file in the MDT, 
e.g.,
{code:java}
HoodieMetadataPayload {key=partition, type=2, Files: {creations=[], deletions=[7f6b146e-cd43-4fd3-9ce0-118232562569-0_63-29223-5579389_20240303214408245.parquet], }}{code}
(6) The second cleaner plan is also successfully executed, recording a deletion of the 
same data file through another metadata payload, which is appended to a subsequent log 
file in the same file slice in the MDT, e.g.,
{code:java}
HoodieMetadataPayload {key=partition, type=2, Files: {creations=[], deletions=[7f6b146e-cd43-4fd3-9ce0-118232562569-0_63-29223-5579389_20240303214408245.parquet], }}{code}
(7) The replacecommit corresponding to the clustering is archived, since the cleaner 
has deleted the replaced file groups.

(8) When the MDT is read or MDT compaction runs, merging these two metadata payloads 
with identical deletes produces an empty deletion list, so the data file is never 
removed from the partition file list in the MDT:
{code:java}
HoodieMetadataPayload {key=partition, type=2, Files: {creations=[], deletions=[], }}{code}
The expected behavior is to keep the data file in the "deletions" field (one possible 
correction is sketched after this walkthrough).
(9) On the next upsert and indexing, the deleted data file is still served by the file 
system view backed by the MDT (e.g., 
"7f6b146e-cd43-4fd3-9ce0-118232562569-0_63-29223-5579389_20240303214408245.parquet"), 
but the file cannot be found on the file system, causing the ingestion to fail.
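One possible direction for the fix, sketched below against the simplified FileInfo stand-in from the earlier demo, is to keep the deletion marker when the previous entry is itself a delete, so that a repeated delete does not erase the original one; the actual change adopted in HoodieMetadataPayload may differ:
{code:java}
// Illustrative remapping function only; the real HoodieMetadataFileInfo type and the
// exact fix in HoodieMetadataPayload#combineFileSystemMetadata may look different.
static FileInfo combine(FileInfo oldInfo, FileInfo newInfo) {
  if (newInfo.isDeleted()) {
    // A delete on top of a delete keeps the deletion marker instead of dropping the
    // mapping, so the delete still applies once merged against the base file listing.
    return oldInfo.isDeleted() ? new FileInfo(0, true) : null;
  }
  // Otherwise behave as before: keep the larger size and mark the file as present.
  return new FileInfo(Math.max(newInfo.size(), oldInfo.size()), false);
}
{code}
With such a change, merging the payloads from steps (5) and (6) would retain the data file in the "deletions" field, and the file would be dropped from the listing when the deletion is finally reconciled with the base file during compaction.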

> Fix HoodieMetadataPayload merging logic around repeated deletes
> ---------------------------------------------------------------
>
>                 Key: HUDI-7518
>                 URL: https://issues.apache.org/jira/browse/HUDI-7518
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Ethan Guo
>            Assignee: Ethan Guo
>            Priority: Blocker
>             Fix For: 0.15.0, 1.0.0
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
