[GitHub] [hudi] nsivabalan commented on a change in pull request #4739: [HUDI-3365] Make sure Metadata Table records are updated appropriately on HDFS
nsivabalan commented on a change in pull request #4739:
URL: https://github.com/apache/hudi/pull/4739#discussion_r820157441

## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java

## @@ -1301,4 +1369,33 @@ public void close() {
     this.heartbeatClient.stop();
     this.txnManager.close();
   }
+
+  private void setWriteTimer(HoodieTable table) {
+    String commitType = table.getMetaClient().getCommitActionType();
+    if (commitType.equals(HoodieTimeline.COMMIT_ACTION)) {
+      writeTimer = metrics.getCommitCtx();
+    } else if (commitType.equals(HoodieTimeline.DELTA_COMMIT_ACTION)) {
+      writeTimer = metrics.getDeltaCommitCtx();
+    }
+  }
+
+  private void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> instantTime) {
+    UpgradeDowngrade upgradeDowngrade =
+        new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper);
+
+    if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {

Review comment:
@yihua: we are doing some minor refactoring related to the upgrade/downgrade invocation for the Java engine; just wanted to keep you in the loop. Prior to this patch we did not have any explicit upgrade/downgrade there, so I want to make sure the unification does not cause any issues. If you could review it once and stamp it, that would be great.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] nsivabalan commented on a change in pull request #4739: [HUDI-3365] Make sure Metadata Table records are updated appropriately on HDFS
nsivabalan commented on a change in pull request #4739:
URL: https://github.com/apache/hudi/pull/4739#discussion_r820157190

## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java

## @@ -1301,4 +1369,33 @@ public void close() {
     this.heartbeatClient.stop();
     this.txnManager.close();
   }
+
+  private void setWriteTimer(HoodieTable table) {
+    String commitType = table.getMetaClient().getCommitActionType();
+    if (commitType.equals(HoodieTimeline.COMMIT_ACTION)) {
+      writeTimer = metrics.getCommitCtx();
+    } else if (commitType.equals(HoodieTimeline.DELTA_COMMIT_ACTION)) {
+      writeTimer = metrics.getDeltaCommitCtx();
+    }
+  }
+
+  private void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> instantTime) {
+    UpgradeDowngrade upgradeDowngrade =
+        new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper);
+
+    if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {

Review comment:
I am fine with it, but I want to get a stamp from the devs who work on Flink and Java. I just don't want to miss anything, because it's not always the case that every engine follows the same logic. I checked the code and the refactoring once again; it seems fine.

@danny0405: we are doing some minor refactoring related to the upgrade/downgrade invocation for the Flink engine; just wanted to keep you in the loop.
[GitHub] [hudi] nsivabalan commented on a change in pull request #4739: [HUDI-3365] Make sure Metadata Table records are updated appropriately on HDFS
nsivabalan commented on a change in pull request #4739:
URL: https://github.com/apache/hudi/pull/4739#discussion_r818036625

## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java

## @@ -1301,4 +1369,33 @@ public void close() {
     this.heartbeatClient.stop();
     this.txnManager.close();
   }
+
+  private void setWriteTimer(HoodieTable table) {
+    String commitType = table.getMetaClient().getCommitActionType();
+    if (commitType.equals(HoodieTimeline.COMMIT_ACTION)) {
+      writeTimer = metrics.getCommitCtx();
+    } else if (commitType.equals(HoodieTimeline.DELTA_COMMIT_ACTION)) {
+      writeTimer = metrics.getDeltaCommitCtx();
+    }
+  }
+
+  private void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> instantTime) {
+    UpgradeDowngrade upgradeDowngrade =
+        new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper);
+
+    if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {

Review comment:
As per master, this code is available only in the Spark engine; Flink and Java do not have it. Even if we wish to unify, I would do that in a separate patch and get it reviewed by the experts who have worked on it. Can you move this to SparkRDDWriteClient for now?

## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java

## @@ -1246,17 +1243,88 @@ public HoodieMetrics getMetrics() {
   }

   /**
-   * Get HoodieTable and init {@link Timer.Context}.
+   * Instantiates engine-specific instance of {@link HoodieTable} as well as performs necessary
+   * bootstrapping operations (for ex, validating whether Metadata Table has to be bootstrapped)
    *
-   * @param operationType write operation type
+   * NOTE: THIS OPERATION IS EXECUTED UNDER LOCK, THEREFORE SHOULD AVOID ANY OPERATIONS
+   *       NOT REQUIRING EXTERNAL SYNCHRONIZATION
+   *
+   * @param metaClient instance of {@link HoodieTableMetaClient}
    * @param instantTime current inflight instant time
-   * @return HoodieTable
+   * @return instantiated {@link HoodieTable}
    */
-  protected abstract HoodieTable getTableAndInitCtx(WriteOperationType operationType, String instantTime);
+  protected abstract HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime);

   /**
-   * Sets write schema from last instant since deletes may not have schema set in the config.
+   * Instantiates and initializes instance of {@link HoodieTable}, performing crucial bootstrapping
+   * operations such as:
+   *
+   * NOTE: This method is engine-agnostic and SHOULD NOT be overloaded, please check on
+   * {@link #doInitTable(HoodieTableMetaClient, Option)} instead
+   *
+   * <ul>
+   *   <li>Checking whether upgrade/downgrade is required</li>
+   *   <li>Bootstrapping Metadata Table (if required)</li>
+   *   <li>Initializing metrics contexts</li>
+   * </ul>
    */
+  protected final HoodieTable initTable(WriteOperationType operationType, Option<String> instantTime) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);
+    // Setup write schemas for deletes
+    if (operationType == WriteOperationType.DELETE) {
+      setWriteSchemaForDeletes(metaClient);
+    }
+
+    HoodieTable table;
+
+    this.txnManager.beginTransaction();
+    try {
+      tryUpgrade(metaClient, instantTime);
+      table = doInitTable(metaClient, instantTime);
+    } finally {
+      this.txnManager.endTransaction();
+    }
+
+    // Validate table properties
+    metaClient.validateTableProperties(config.getProps(), operationType);
+    // Make sure that FS View is in sync
+    table.getHoodieView().sync();
+
+    switch (operationType) {
+      case INSERT:
+      case INSERT_PREPPED:
+      case UPSERT:
+      case UPSERT_PREPPED:
+      case BULK_INSERT:
+      case BULK_INSERT_PREPPED:
+      case INSERT_OVERWRITE:
+      case INSERT_OVERWRITE_TABLE:
+        setWriteTimer(table);
+        break;
+      case CLUSTER:
+        clusteringTimer = metrics.getClusteringCtx();
+        break;
+      case COMPACT:
+        compactionTimer = metrics.getCompactionCtx();
+        break;
+      default:
+    }
+
+    return table;
+  }
+
+  protected <R> R withLock(Supplier<R> s) {

Review comment:
Is this used anywhere?

## File path: hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/upgrade/JavaUpgradeDowngradeHelper.java

## @@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law
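The `withLock(Supplier)` helper questioned above follows a standard lock-guarded supplier pattern. A minimal, self-contained sketch of that shape (using a plain `ReentrantLock` as a stand-in for Hudi's transaction manager, which is an assumption for illustration, not the actual API):

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

public class WithLockSketch {
  // Stand-in for the write client's transaction lock (assumption for illustration).
  private final ReentrantLock txnLock = new ReentrantLock();

  // Mirrors the shape of the withLock(Supplier) helper quoted in the hunk above:
  // acquire the lock, compute the value, and release the lock even on failure.
  protected <R> R withLock(Supplier<R> s) {
    txnLock.lock();
    try {
      return s.get();
    } finally {
      txnLock.unlock();
    }
  }

  public static void main(String[] args) {
    WithLockSketch client = new WithLockSketch();
    String result = client.withLock(() -> "table-initialized");
    System.out.println(result); // prints table-initialized
  }
}
```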
[GitHub] [hudi] nsivabalan commented on a change in pull request #4739: [HUDI-3365] Make sure Metadata Table records are updated appropriately on HDFS
nsivabalan commented on a change in pull request #4739:
URL: https://github.com/apache/hudi/pull/4739#discussion_r80407

## File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java

## @@ -362,28 +361,33 @@ private HoodieMetadataColumnStats combineColumnStatsMetadatat(HoodieMetadataPayl
     return filesystemMetadata.entrySet().stream().filter(e -> e.getValue().getIsDeleted() == isDeleted);
   }

-  private Map<String, HoodieMetadataFileInfo> combineFilesystemMetadata(HoodieMetadataPayload previousRecord) {
+  private Map<String, HoodieMetadataFileInfo> combineFileSystemMetadata(HoodieMetadataPayload previousRecord) {
     Map<String, HoodieMetadataFileInfo> combinedFileInfo = new HashMap<>();
+
+    // First, add all files listed in the previous record
     if (previousRecord.filesystemMetadata != null) {
       combinedFileInfo.putAll(previousRecord.filesystemMetadata);
     }

+    // Second, merge in the files listed in the new record
     if (filesystemMetadata != null) {
-      filesystemMetadata.forEach((filename, fileInfo) -> {
-        // If the filename wasnt present then we carry it forward
-        if (!combinedFileInfo.containsKey(filename)) {
-          combinedFileInfo.put(filename, fileInfo);
-        } else {
-          if (fileInfo.getIsDeleted()) {
-            // file deletion
-            combinedFileInfo.remove(filename);
-          } else {
-            // file appends.
-            combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) -> {
-              return new HoodieMetadataFileInfo(oldFileInfo.getSize() + newFileInfo.getSize(), false);
-            });
-          }
-        }
+      validatePayload(type, filesystemMetadata);
+
+      filesystemMetadata.forEach((key, fileInfo) -> {
+        combinedFileInfo.merge(key, fileInfo,
+            // Combine previous record w/ the new one, new records taking precedence over
+            // the old one
+            //
+            // NOTE: That if previous listing contains the file that is being deleted by the tombstone
+            //       record (`IsDeleted` = true) in the new one, we simply delete the file from the resulting
+            //       listing as well as drop the tombstone itself.
+            //       However, if file is not present in the previous record we have to persist tombstone
+            //       record in the listing to make sure we carry forward information that this file
+            //       was deleted. This special case could occur since the merging flow is 2-stage:
+            //         - First we merge records from all of the delta log-files
+            //         - Then we merge records from base-files with the delta ones (coming as a result
+            //           of the previous step)
+            (oldFileInfo, newFileInfo) -> newFileInfo.getIsDeleted() ? null : newFileInfo);

Review comment:
Thanks a lot for capturing the gist. So I assume you plan to fix the merge function to take max(old size, new size)?
[GitHub] [hudi] nsivabalan commented on a change in pull request #4739: [HUDI-3365] Make sure Metadata Table records are updated appropriately on HDFS
nsivabalan commented on a change in pull request #4739:
URL: https://github.com/apache/hudi/pull/4739#discussion_r802253573

## File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java

## @@ -362,28 +361,33 @@ private HoodieMetadataColumnStats combineColumnStatsMetadatat(HoodieMetadataPayl
     return filesystemMetadata.entrySet().stream().filter(e -> e.getValue().getIsDeleted() == isDeleted);
   }

-  private Map<String, HoodieMetadataFileInfo> combineFilesystemMetadata(HoodieMetadataPayload previousRecord) {
+  private Map<String, HoodieMetadataFileInfo> combineFileSystemMetadata(HoodieMetadataPayload previousRecord) {
     Map<String, HoodieMetadataFileInfo> combinedFileInfo = new HashMap<>();
+
+    // First, add all files listed in the previous record
    if (previousRecord.filesystemMetadata != null) {
       combinedFileInfo.putAll(previousRecord.filesystemMetadata);
     }

+    // Second, merge in the files listed in the new record
     if (filesystemMetadata != null) {
-      filesystemMetadata.forEach((filename, fileInfo) -> {
-        // If the filename wasnt present then we carry it forward
-        if (!combinedFileInfo.containsKey(filename)) {
-          combinedFileInfo.put(filename, fileInfo);
-        } else {
-          if (fileInfo.getIsDeleted()) {
-            // file deletion
-            combinedFileInfo.remove(filename);
-          } else {
-            // file appends.
-            combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) -> {
-              return new HoodieMetadataFileInfo(oldFileInfo.getSize() + newFileInfo.getSize(), false);
-            });
-          }
-        }
+      validatePayload(type, filesystemMetadata);
+
+      filesystemMetadata.forEach((key, fileInfo) -> {
+        combinedFileInfo.merge(key, fileInfo,
+            // Combine previous record w/ the new one, new records taking precedence over
+            // the old one
+            //
+            // NOTE: That if previous listing contains the file that is being deleted by the tombstone
+            //       record (`IsDeleted` = true) in the new one, we simply delete the file from the resulting
+            //       listing as well as drop the tombstone itself.
+            //       However, if file is not present in the previous record we have to persist tombstone
+            //       record in the listing to make sure we carry forward information that this file
+            //       was deleted. This special case could occur since the merging flow is 2-stage:
+            //         - First we merge records from all of the delta log-files
+            //         - Then we merge records from base-files with the delta ones (coming as a result
+            //           of the previous step)
+            (oldFileInfo, newFileInfo) -> newFileInfo.getIsDeleted() ? null : newFileInfo);

Review comment:
I agree, but the scenarios you quoted are unlikely to happen in general:

a. The cleaner will not touch the latest file slice, where regular writes go (MOR table).
b. In a COW rollback there is no log-file concept, so a rollback and a commit touching the same file is very unlikely.

So for two concurrent writes where one deletes a file while the other updates it or adds a new one, I can't think of a valid scenario; maybe there is one, but we would need to think hard to come up with it. However, the scenario of a rollback and concurrent updates to the same file slice might happen in a MOR table.

Also, the fix being considered, i.e. taking the max file size rather than the latest file size, should not cause any regression and could help thwart some of these scenarios. Do you see any issues specifically with going with the max of the file sizes in the merge function? At least for the rollback-plus-regular-writer MOR use-case we have to make the fix; I don't think we can ignore that use-case.
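The max-of-sizes resolution discussed above can be sketched in isolation. This is a minimal, self-contained illustration using simplified stand-in classes (not the actual Hudi `HoodieMetadataFileInfo`/payload API): regardless of the order in which the two writers' records reach the metadata table, taking the max of the sizes resolves to the larger (and, since log files only grow, more recent) size.

```java
import java.util.HashMap;
import java.util.Map;

public class MaxSizeMergeSketch {
  // Simplified stand-in for Hudi's per-file metadata record (assumption for illustration).
  static final class FileInfo {
    final long size;
    final boolean isDeleted;
    FileInfo(long size, boolean isDeleted) { this.size = size; this.isDeleted = isDeleted; }
  }

  // Merge the incoming record into the previous listing: a tombstone drops the
  // file; otherwise keep the max of both sizes, since log-file sizes only grow.
  static Map<String, FileInfo> combine(Map<String, FileInfo> previous, Map<String, FileInfo> incoming) {
    Map<String, FileInfo> combined = new HashMap<>(previous);
    incoming.forEach((file, info) -> combined.merge(file, info,
        (oldInfo, newInfo) -> newInfo.isDeleted
            ? null // mapping to null removes the entry from the combined listing
            : new FileInfo(Math.max(oldInfo.size, newInfo.size), false)));
    return combined;
  }

  public static void main(String[] args) {
    Map<String, FileInfo> prev = new HashMap<>();
    prev.put("file1.log", new FileInfo(300, false)); // writer2's later append applied first
    Map<String, FileInfo> next = new HashMap<>();
    next.put("file1.log", new FileInfo(200, false)); // writer1's smaller size arrives late
    System.out.println(combine(prev, next).get("file1.log").size); // prints 300
  }
}
```

With last-write-wins semantics the out-of-order arrival would have resolved to 200; the max-based combiner yields 300 in either arrival order.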
[GitHub] [hudi] nsivabalan commented on a change in pull request #4739: [HUDI-3365] Make sure Metadata Table records are updated appropriately on HDFS
nsivabalan commented on a change in pull request #4739:
URL: https://github.com/apache/hudi/pull/4739#discussion_r802159013

## File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java

## @@ -362,28 +361,33 @@ private HoodieMetadataColumnStats combineColumnStatsMetadatat(HoodieMetadataPayl
     return filesystemMetadata.entrySet().stream().filter(e -> e.getValue().getIsDeleted() == isDeleted);
   }

-  private Map<String, HoodieMetadataFileInfo> combineFilesystemMetadata(HoodieMetadataPayload previousRecord) {
+  private Map<String, HoodieMetadataFileInfo> combineFileSystemMetadata(HoodieMetadataPayload previousRecord) {
     Map<String, HoodieMetadataFileInfo> combinedFileInfo = new HashMap<>();
+
+    // First, add all files listed in the previous record
     if (previousRecord.filesystemMetadata != null) {
       combinedFileInfo.putAll(previousRecord.filesystemMetadata);
     }

+    // Second, merge in the files listed in the new record
     if (filesystemMetadata != null) {
-      filesystemMetadata.forEach((filename, fileInfo) -> {
-        // If the filename wasnt present then we carry it forward
-        if (!combinedFileInfo.containsKey(filename)) {
-          combinedFileInfo.put(filename, fileInfo);
-        } else {
-          if (fileInfo.getIsDeleted()) {
-            // file deletion
-            combinedFileInfo.remove(filename);
-          } else {
-            // file appends.
-            combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) -> {
-              return new HoodieMetadataFileInfo(oldFileInfo.getSize() + newFileInfo.getSize(), false);
-            });
-          }
-        }
+      validatePayload(type, filesystemMetadata);
+
+      filesystemMetadata.forEach((key, fileInfo) -> {
+        combinedFileInfo.merge(key, fileInfo,
+            // Combine previous record w/ the new one, new records taking precedence over
+            // the old one
+            //
+            // NOTE: That if previous listing contains the file that is being deleted by the tombstone
+            //       record (`IsDeleted` = true) in the new one, we simply delete the file from the resulting
+            //       listing as well as drop the tombstone itself.
+            //       However, if file is not present in the previous record we have to persist tombstone
+            //       record in the listing to make sure we carry forward information that this file
+            //       was deleted. This special case could occur since the merging flow is 2-stage:
+            //         - First we merge records from all of the delta log-files
+            //         - Then we merge records from base-files with the delta ones (coming as a result
+            //           of the previous step)
+            (oldFileInfo, newFileInfo) -> newFileInfo.getIsDeleted() ? null : newFileInfo);

Review comment:
Ok, let me take a stab at whether this is a valid scenario; it could be applicable only on HDFS. Say there are concurrent writes to a log file by two different writers: one is doing a rollback, and the other is appending a log block. Say writer1 (doing the rollback) updates the log file first and records a size of, say, 200, and later writer2 appends to the same log file and records a size of 300.

Even though the order in which these writers appended to the file is writer1 followed by writer2, it is not guaranteed that the same order is maintained when their updates reach the metadata table. For various reasons writer2 could complete its write earlier (for example, writer2 updates only one file group, whereas writer1 updates some 100 file groups) and apply its changes to the metadata table before writer1. Wouldn't the final resolved size from metadata be 200 in that case, once both writers are done updating the MDT?
[GitHub] [hudi] nsivabalan commented on a change in pull request #4739: [HUDI-3365] Make sure Metadata Table records are updated appropriately on HDFS
nsivabalan commented on a change in pull request #4739:
URL: https://github.com/apache/hudi/pull/4739#discussion_r802157120

## File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java

## @@ -362,28 +375,33 @@ private HoodieMetadataColumnStats combineColumnStatsMetadatat(HoodieMetadataPayl
     return filesystemMetadata.entrySet().stream().filter(e -> e.getValue().getIsDeleted() == isDeleted);
   }

-  private Map<String, HoodieMetadataFileInfo> combineFilesystemMetadata(HoodieMetadataPayload previousRecord) {
+  private Map<String, HoodieMetadataFileInfo> combineFileSystemMetadata(HoodieMetadataPayload previousRecord) {
     Map<String, HoodieMetadataFileInfo> combinedFileInfo = new HashMap<>();
+
+    // First, add all files listed in the previous record
     if (previousRecord.filesystemMetadata != null) {
       combinedFileInfo.putAll(previousRecord.filesystemMetadata);
     }

+    // Second, merge in the files listed in the new record
     if (filesystemMetadata != null) {
-      filesystemMetadata.forEach((filename, fileInfo) -> {
-        // If the filename wasnt present then we carry it forward
-        if (!combinedFileInfo.containsKey(filename)) {
-          combinedFileInfo.put(filename, fileInfo);
-        } else {
-          if (fileInfo.getIsDeleted()) {
-            // file deletion
-            combinedFileInfo.remove(filename);
-          } else {
-            // file appends.
-            combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) -> {
-              return new HoodieMetadataFileInfo(oldFileInfo.getSize() + newFileInfo.getSize(), false);
-            });
-          }
-        }
+      validatePayload(type, filesystemMetadata);
+
+      filesystemMetadata.forEach((key, fileInfo) -> {
+        combinedFileInfo.merge(key, fileInfo,
+            // Combine previous record w/ the new one, new records taking precedence over
+            // the old one
+            //
+            // NOTE: That if previous listing contains the file that is being deleted by the tombstone
+            //       record (`IsDeleted` = true) in the new one, we simply delete the file from the resulting
+            //       listing as well as drop the tombstone itself.
+            //       However, if file is not present in the previous record we have to persist tombstone
+            //       record in the listing to make sure we carry forward information that this file
+            //       was deleted. This special case could occur since the merging flow is 2-stage:
+            //         - First we merge records from all of the delta log-files
+            //         - Then we merge records from base-files with the delta ones (coming as a result
+            //           of the previous step)
+            (oldFileInfo, newFileInfo) -> newFileInfo.getIsDeleted() ? null : newFileInfo);

Review comment:
Guess Prashanth meant to check the oldFileInfo; the validate you are referring to validates the new incoming filesystemMetadata.

## File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java

## @@ -362,28 +361,33 @@ private HoodieMetadataColumnStats combineColumnStatsMetadatat(HoodieMetadataPayl
     return filesystemMetadata.entrySet().stream().filter(e -> e.getValue().getIsDeleted() == isDeleted);
   }

-  private Map<String, HoodieMetadataFileInfo> combineFilesystemMetadata(HoodieMetadataPayload previousRecord) {
+  private Map<String, HoodieMetadataFileInfo> combineFileSystemMetadata(HoodieMetadataPayload previousRecord) {
     Map<String, HoodieMetadataFileInfo> combinedFileInfo = new HashMap<>();
+
+    // First, add all files listed in the previous record
     if (previousRecord.filesystemMetadata != null) {
       combinedFileInfo.putAll(previousRecord.filesystemMetadata);
     }

+    // Second, merge in the files listed in the new record
     if (filesystemMetadata != null) {
-      filesystemMetadata.forEach((filename, fileInfo) -> {
-        // If the filename wasnt present then we carry it forward
-        if (!combinedFileInfo.containsKey(filename)) {
-          combinedFileInfo.put(filename, fileInfo);
-        } else {
-          if (fileInfo.getIsDeleted()) {
-            // file deletion
-            combinedFileInfo.remove(filename);
-          } else {
-            // file appends.
-            combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) -> {
-              return new HoodieMetadataFileInfo(oldFileInfo.getSize() + newFileInfo.getSize(), false);
-            });
-          }
-        }
+      validatePayload(type, filesystemMetadata);
+
+      filesystemMetadata.forEach((key, fileInfo) -> {
+        combinedFileInfo.merge(key, fileInfo,
+            // Combine previous record w/ the new one, new records taking precedence over
+            // the old one
+            //
+            // NOTE: That if previous listing contains the file that is being deleted by the tombstone
+            //       record (`IsDeleted` = true) in the new one, we simply delete the file from the resulting
+            //
[GitHub] [hudi] nsivabalan commented on a change in pull request #4739: [HUDI-3365] Make sure Metadata Table records are updated appropriately on HDFS
nsivabalan commented on a change in pull request #4739:
URL: https://github.com/apache/hudi/pull/4739#discussion_r800214841

## File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java

## @@ -362,28 +361,33 @@ private HoodieMetadataColumnStats combineColumnStatsMetadatat(HoodieMetadataPayl
     return filesystemMetadata.entrySet().stream().filter(e -> e.getValue().getIsDeleted() == isDeleted);
   }

-  private Map<String, HoodieMetadataFileInfo> combineFilesystemMetadata(HoodieMetadataPayload previousRecord) {
+  private Map<String, HoodieMetadataFileInfo> combineFileSystemMetadata(HoodieMetadataPayload previousRecord) {
     Map<String, HoodieMetadataFileInfo> combinedFileInfo = new HashMap<>();
+
+    // First, add all files listed in the previous record
     if (previousRecord.filesystemMetadata != null) {
       combinedFileInfo.putAll(previousRecord.filesystemMetadata);
     }

+    // Second, merge in the files listed in the new record
     if (filesystemMetadata != null) {
-      filesystemMetadata.forEach((filename, fileInfo) -> {
-        // If the filename wasnt present then we carry it forward
-        if (!combinedFileInfo.containsKey(filename)) {
-          combinedFileInfo.put(filename, fileInfo);
-        } else {
-          if (fileInfo.getIsDeleted()) {
-            // file deletion
-            combinedFileInfo.remove(filename);
-          } else {
-            // file appends.
-            combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) -> {
-              return new HoodieMetadataFileInfo(oldFileInfo.getSize() + newFileInfo.getSize(), false);
-            });
-          }
-        }
+      validatePayload(type, filesystemMetadata);
+
+      filesystemMetadata.forEach((key, fileInfo) -> {
+        combinedFileInfo.merge(key, fileInfo,
+            // Combine previous record w/ the new one, new records taking precedence over
+            // the old one
+            //
+            // NOTE: That if previous listing contains the file that is being deleted by the tombstone
+            //       record (`IsDeleted` = true) in the new one, we simply delete the file from the resulting
+            //       listing as well as drop the tombstone itself.
+            //       However, if file is not present in the previous record we have to persist tombstone
+            //       record in the listing to make sure we carry forward information that this file
+            //       was deleted. This special case could occur since the merging flow is 2-stage:
+            //         - First we merge records from all of the delta log-files
+            //         - Then we merge records from base-files with the delta ones (coming as a result
+            //           of the previous step)
+            (oldFileInfo, newFileInfo) -> newFileInfo.getIsDeleted() ? null : newFileInfo);

Review comment:
@prashantwason: aren't there any multi-writer scenarios where a new commit could report a file with a lower size compared to a previous commit?
[GitHub] [hudi] nsivabalan commented on a change in pull request #4739: [HUDI-3365] Make sure Metadata Table records are updated appropriately on HDFS
nsivabalan commented on a change in pull request #4739:
URL: https://github.com/apache/hudi/pull/4739#discussion_r800214756

## File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java

## @@ -362,28 +361,33 @@ private HoodieMetadataColumnStats combineColumnStatsMetadatat(HoodieMetadataPayl
     return filesystemMetadata.entrySet().stream().filter(e -> e.getValue().getIsDeleted() == isDeleted);
   }

-  private Map<String, HoodieMetadataFileInfo> combineFilesystemMetadata(HoodieMetadataPayload previousRecord) {
+  private Map<String, HoodieMetadataFileInfo> combineFileSystemMetadata(HoodieMetadataPayload previousRecord) {
     Map<String, HoodieMetadataFileInfo> combinedFileInfo = new HashMap<>();
+
+    // First, add all files listed in the previous record
     if (previousRecord.filesystemMetadata != null) {
       combinedFileInfo.putAll(previousRecord.filesystemMetadata);
     }

+    // Second, merge in the files listed in the new record
     if (filesystemMetadata != null) {
-      filesystemMetadata.forEach((filename, fileInfo) -> {
-        // If the filename wasnt present then we carry it forward
-        if (!combinedFileInfo.containsKey(filename)) {
-          combinedFileInfo.put(filename, fileInfo);
-        } else {
-          if (fileInfo.getIsDeleted()) {
-            // file deletion
-            combinedFileInfo.remove(filename);
-          } else {
-            // file appends.
-            combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) -> {
-              return new HoodieMetadataFileInfo(oldFileInfo.getSize() + newFileInfo.getSize(), false);
-            });
-          }
-        }
+      validatePayload(type, filesystemMetadata);
+
+      filesystemMetadata.forEach((key, fileInfo) -> {
+        combinedFileInfo.merge(key, fileInfo,
+            // Combine previous record w/ the new one, new records taking precedence over
+            // the old one
+            //
+            // NOTE: That if previous listing contains the file that is being deleted by the tombstone
+            //       record (`IsDeleted` = true) in the new one, we simply delete the file from the resulting
+            //       listing as well as drop the tombstone itself.
+            //       However, if file is not present in the previous record we have to persist tombstone
+            //       record in the listing to make sure we carry forward information that this file
+            //       was deleted. This special case could occur since the merging flow is 2-stage:
+            //         - First we merge records from all of the delta log-files
+            //         - Then we merge records from base-files with the delta ones (coming as a result
+            //           of the previous step)
+            (oldFileInfo, newFileInfo) -> newFileInfo.getIsDeleted() ? null : newFileInfo);

Review comment:
Don't we need to take the max of either of the sizes?
[GitHub] [hudi] nsivabalan commented on a change in pull request #4739: [HUDI-3365] Make sure Metadata Table records are updated appropriately on HDFS
nsivabalan commented on a change in pull request #4739:
URL: https://github.com/apache/hudi/pull/4739#discussion_r798098988

## File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java

## @@ -215,19 +215,16 @@ public HoodieMetadataPayload preCombine(HoodieMetadataPayload previousRecord) {

     if (filesystemMetadata != null) {
       filesystemMetadata.forEach((filename, fileInfo) -> {
-        // If the filename wasnt present then we carry it forward
-        if (!combinedFileInfo.containsKey(filename)) {
-          combinedFileInfo.put(filename, fileInfo);
+        if (fileInfo.getIsDeleted()) {
+          combinedFileInfo.remove(filename);
         } else {
-          if (fileInfo.getIsDeleted()) {
-            // file deletion
-            combinedFileInfo.remove(filename);
-          } else {
-            // file appends.
-            combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) -> {
-              return new HoodieMetadataFileInfo(oldFileInfo.getSize() + newFileInfo.getSize(), false);
-            });
-          }
+          // NOTE: There are 2 possible cases here:
+          //   - New file is created: in that case we're simply adding its info
+          //   - File is appended to (only log-files of MOR tables on supported FS): in that case
+          //     we simply pick the info w/ largest file-size as the most recent one, since file's
+          //     sizes are increasing monotonically (meaning that the larger file-size is more recent one)
+          combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) ->

Review comment:
   merge func takes care of adding an entry for the first time and hence remove L219 and 220?
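The reviewer's point can be shown standalone: `Map.merge` inserts the mapping itself when the key is absent, so an explicit `containsKey`/`put` pre-check before it is redundant. In this sketch, plain `Long` sizes stand in for the actual `HoodieMetadataFileInfo` type, and the max-size rule models the "larger file-size is more recent" comment from the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Standalone illustration of the review point above: Map.merge handles the
// first occurrence of a key on its own, and for repeated appends the larger
// (hence more recent) size wins, since log-file sizes grow monotonically.
public class MergeSemanticsSketch {
    // Record one observed size for a file, keeping the maximum seen so far.
    static void recordSize(Map<String, Long> sizes, String filename, long size) {
        sizes.merge(filename, size, Math::max);
    }

    public static void main(String[] args) {
        Map<String, Long> sizes = new HashMap<>();
        recordSize(sizes, "log.1", 100L); // first occurrence: inserted, no containsKey needed
        recordSize(sizes, "log.1", 250L); // append: max(100, 250) = 250 kept
        recordSize(sizes, "log.1", 180L); // a stale smaller size does not win
        System.out.println(sizes.get("log.1")); // prints 250
    }
}
```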
## File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java

## @@ -87,40 +89,58 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont

    * @return a list of metadata table records
    */
   public static List<HoodieRecord> convertMetadataToRecords(HoodieCommitMetadata commitMetadata, String instantTime) {
-    List<HoodieRecord> records = new LinkedList<>();
-    List<String> allPartitions = new LinkedList<>();
-    commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> {
-      final String partition = partitionStatName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionStatName;
-      allPartitions.add(partition);
-
-      Map<String, Long> newFiles = new HashMap<>(writeStats.size());
-      writeStats.forEach(hoodieWriteStat -> {
-        String pathWithPartition = hoodieWriteStat.getPath();
-        if (pathWithPartition == null) {
-          // Empty partition
-          LOG.warn("Unable to find path in write stat to update metadata table " + hoodieWriteStat);
-          return;
-        }
-
-        int offset = partition.equals(NON_PARTITIONED_NAME) ? (pathWithPartition.startsWith("/") ? 1 : 0) : partition.length() + 1;
-        String filename = pathWithPartition.substring(offset);
-        long totalWriteBytes = newFiles.containsKey(filename)
-            ? newFiles.get(filename) + hoodieWriteStat.getTotalWriteBytes()
-            : hoodieWriteStat.getTotalWriteBytes();
-        newFiles.put(filename, totalWriteBytes);
-      });
-      // New files added to a partition
-      HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(
-          partition, Option.of(newFiles), Option.empty());
-      records.add(record);
-    });
+    List<HoodieRecord> records = new ArrayList<>(commitMetadata.getPartitionToWriteStats().size());
+
+    // Add record bearing partitions list
+    ArrayList<String> partitionsList = new ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet());
+
+    records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsList));
+
+    // New files added to a partition
+    List<HoodieRecord<HoodieMetadataPayload>> updatedFilesRecords =
+        commitMetadata.getPartitionToWriteStats().entrySet()
+            .stream()
+            .map(entry -> {
+              String partitionStatName = entry.getKey();
+              List<HoodieWriteStat> writeStats = entry.getValue();
+
+              String partition = partitionStatName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionStatName;
+
+              HashMap<String, Long> updatedFilesToSizesMapping =
+                  writeStats.stream().reduce(new HashMap<>(writeStats.size()),
+                      (map, stat) -> {
+                        String pathWithPartition = stat.getPath();
+                        if (pathWithPartition == null) {
+                          // Empty partition
+                          LOG.warn("Unable to find path in write stat to update metadata table " + stat);
+                          return map;
+                        }
+
+                        int offset = partition.equals(NON_PARTITIONED_NAME)
+                            ? (pathWithPartition.startsWith("/") ? 1 : 0)
+                            : partition.len
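The per-partition fold in this hunk, which turns a partition's write stats into a filename-to-total-bytes map while skipping stats without a path, can be sketched in isolation. `WriteStat` below is a minimal stand-in for `HoodieWriteStat`, and the sketch omits the non-partitioned special case and the final record creation that the real `convertMetadataToRecords` performs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the fold shown in the diff above: write stats reduce into a
// filename -> total written bytes map for one partition.
public class WriteStatFoldSketch {
    // Minimal stand-in for HoodieWriteStat: only the fields the fold needs.
    static class WriteStat {
        final String path;          // path including the partition prefix; may be null
        final long totalWriteBytes;

        WriteStat(String path, long totalWriteBytes) {
            this.path = path;
            this.totalWriteBytes = totalWriteBytes;
        }
    }

    static Map<String, Long> filesToSizes(String partition, List<WriteStat> writeStats) {
        Map<String, Long> newFiles = new HashMap<>();
        for (WriteStat stat : writeStats) {
            if (stat.path == null) {
                continue; // the real code logs a warning here and skips the stat
            }
            // Strip the "partition/" prefix to recover the bare file name.
            String filename = stat.path.substring(partition.length() + 1);
            // Several stats may touch the same file; their written bytes accumulate.
            newFiles.merge(filename, stat.totalWriteBytes, Long::sum);
        }
        return newFiles;
    }

    public static void main(String[] args) {
        List<WriteStat> stats = new ArrayList<>();
        stats.add(new WriteStat("2022/03/01/f1.parquet", 100L));
        stats.add(new WriteStat("2022/03/01/f1.parquet", 50L));
        stats.add(new WriteStat(null, 10L)); // no path: skipped
        System.out.println(filesToSizes("2022/03/01", stats)); // prints {f1.parquet=150}
    }
}
```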