[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.
zhangyue19921010 commented on a change in pull request #4078: URL: https://github.com/apache/hudi/pull/4078#discussion_r784462454

## File path: hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java

## @@ -248,11 +253,32 @@ private HoodieInstant readCommit(GenericRecord record, boolean loadDetails) {

          break;
        }
      }
    } catch (Exception originalException) {
      // Merging small archive files may leave an incomplete archive file behind, which causes an exception here.
      // That kind of exception needs to be ignored.
      try {
        Path planPath = new Path(metaClient.getArchivePath(), "mergeArchivePlan");
        HoodieWrapperFileSystem fileSystem = metaClient.getFs();
        if (fileSystem.exists(planPath)) {
          HoodieMergeArchiveFilePlan plan = TimelineMetadataUtils.deserializeAvroMetadata(FileIOUtils.readDataFromPath(fileSystem, planPath).get(), HoodieMergeArchiveFilePlan.class);
          String mergedArchiveFileName = plan.getMergedArchiveFileName();
          if (!StringUtils.isNullOrEmpty(mergedArchiveFileName) && fs.getPath().getName().equalsIgnoreCase(mergedArchiveFileName)) {
            LOG.warn("Caught exception while reading the incomplete merged archive file " + mergedArchiveFileName + ". Ignoring it here.");
            continue;
          }
        }
        throw originalException;
      } catch (Exception e) {
        // If anything goes wrong while parsing the merge-archive plan, we need to throw the original exception,
        // e.g. when a corrupted archive file and a corrupted plan both exist.
        throw originalException;
      }

Review comment: Hi @nsivabalan and @yihua. The common concern is the incomplete/duplicate data left behind when the last merge of small archive files failed and the current Hudi writer/commit is configured with archive-file merging disabled. Ideally we need to check and clean up dirty data before every archive. The reasons I think we need this switch before doing the clean-up work are: 1. This is a new feature, so it is safer with a default-false control here. 2. I am pretty worried about multi-writer scenarios; at the very least we need a way to ensure that only one writer can do the merge work. As for making sure that incomplete data does no damage when loading the archived timeline until the next clean-up: 1. We use a HashSet to avoid duplicate instants while loading archived instants. 2. We use this try-catch to handle exceptions caused by loading incomplete merged small archive files. As the next step, maybe we can take care of multi-writer support, let this run stably for some time in my staging/production environment, and finally remove this strict restriction in `verifyLastMergeArchiveFilesIfNecessary` here :)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
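[Editorial note: the HashSet de-duplication mentioned in point 1 above can be sketched as follows. This is a minimal, hypothetical example; the instant timestamps are made up and it does not use Hudi's actual instant types.]

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class DedupInstantsSketch {
  public static void main(String[] args) {
    // If a failed merge left both the old small archive files and the new merged
    // file on disk, the same instant can be read twice while loading the archived
    // timeline; collecting into a set keeps only one copy of each instant.
    List<String> loaded = List.of("20220101", "20220102", "20220102", "20220103");
    Set<String> instants = new LinkedHashSet<>(loaded);
    System.out.println(instants); // [20220101, 20220102, 20220103]
  }
}
```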
zhangyue19921010 commented on a change in pull request #4078: URL: https://github.com/apache/hudi/pull/4078#discussion_r782002899

## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java

## @@ -134,12 +161,222 @@ public boolean archiveIfRequired(HoodieEngineContext context) throws IOException

      LOG.info("No Instants to archive");
    }
    if (config.getArchiveAutoMergeEnable() && !StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
      mergeArchiveFilesIfNecessary(context);
    }
    return success;
  } finally {
    close();
  }
}

/**
 * Here Hoodie can merge the small archive files into a new, larger one.
 * Only used for filesystems that do not support the append operation.
 * The whole merge-small-archive-files operation has four stages:
 * 1. Build the merge plan with the merge candidates / merged file name info.
 * 2. Do the merge.
 * 3. Delete all the candidates.
 * 4. Delete the merge plan.
 * @param context HoodieEngineContext
 * @throws IOException
 */
private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
  Path planPath = new Path(metaClient.getArchivePath(), mergeArchivePlanName);
  // Flush any remaining content and open a new writer
  reOpenWriter();
  // List all archive files
  FileStatus[] fsStatuses = metaClient.getFs().globStatus(
      new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
  // Sort files by version suffix in reverse (implies reverse chronological order)
  Arrays.sort(fsStatuses, new HoodieArchivedTimeline.ArchiveFileVersionComparator());

  int archiveFilesMergeBatch = config.getArchiveFilesMergeBatchSize();
  long smallFileLimitBytes = config.getArchiveMergeSmallFileLimitBytes();

  List<FileStatus> mergeCandidate = getMergeCandidates(smallFileLimitBytes, fsStatuses);

  if (mergeCandidate.size() >= archiveFilesMergeBatch) {
    List<String> candidateFiles = mergeCandidate.stream().map(fs -> fs.getPath().toString()).collect(Collectors.toList());
    // before merging archive files, build the merge plan
    String logFileName = computeLogFileName();
    buildArchiveMergePlan(candidateFiles, planPath, logFileName);
    // merge archive files
    mergeArchiveFiles(mergeCandidate);
    // after the merge, delete the small archive files
    deleteFilesParallelize(metaClient, candidateFiles, context, true);
    // finally, delete the archiveMergePlan, which marks the merge operation as succeeded
    metaClient.getFs().delete(planPath, false);
  }
}

/**
 * Find the latest 'huge archive file' index as a break point and only check/merge newer archive files,
 * because we need to keep the original order of archive files, which is important when loading archived
 * instants with a time filter:
 * {@link HoodieArchivedTimeline} loadInstants(TimeRangeFilter filter, boolean loadInstantDetails, Function commitsFilter)
 * @param smallFileLimitBytes small file limit in bytes
 * @param fsStatuses archive files sorted by version suffix in reverse
 * @return merge candidates
 */
private List<FileStatus> getMergeCandidates(long smallFileLimitBytes, FileStatus[] fsStatuses) {
  int index = 0;
  for (; index < fsStatuses.length; index++) {
    if (fsStatuses[index].getLen() > smallFileLimitBytes) {
      break;
    }
  }
  return Arrays.stream(fsStatuses).limit(index).collect(Collectors.toList());
}

/**
 * Get the final written archive file name based on whether the storage scheme supports append.
 */
private String computeLogFileName() throws IOException {
  String logWriteToken = writer.getLogFile().getLogWriteToken();
  HoodieLogFile hoodieLogFile = writer.getLogFile().rollOver(metaClient.getFs(), logWriteToken);
  return hoodieLogFile.getFileName();
}

/**
 * Check for, and resolve, any failed and unfinished merge-small-archive-files operation.
 * @param context HoodieEngineContext, used to parallelize deleting small archive files if necessary.
 * @throws IOException
 */
private void verifyLastMergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
  if (config.getArchiveAutoMergeEnable() && !StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {

Review comment: Changed. Mentioned below.
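[Editorial note: the four-stage, crash-safe merge protocol described in the javadoc above can be simulated with plain files. This is a hypothetical, simplified sketch using java.nio.file instead of Hudi's HoodieWrapperFileSystem; the file names and plan format are made up for illustration. The key idea is that the plan file is written first and deleted last, so a leftover plan on disk marks an interrupted merge.]

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class MergeProtocolSketch {

  // Stage 1 writes the plan before anything else, so a crash during stages 2-4
  // leaves the plan behind as evidence that recovery is needed.
  static void mergeSmallArchives(Path archiveDir, List<Path> candidates, Path merged)
      throws IOException {
    Path plan = archiveDir.resolve("mergeArchivePlan");
    // 1. Build the merge plan (records the merged file name).
    Files.write(plan, List.of("merged=" + merged.getFileName()));
    // 2. Do the merge: concatenate all candidate contents into the new file.
    try (BufferedWriter out = Files.newBufferedWriter(merged)) {
      for (Path c : candidates) {
        for (String line : Files.readAllLines(c)) {
          out.write(line);
          out.newLine();
        }
      }
    }
    // 3. Delete all the candidates.
    for (Path c : candidates) {
      Files.delete(c);
    }
    // 4. Delete the plan, which marks the whole operation as succeeded.
    Files.delete(plan);
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("archive");
    Path a = Files.write(dir.resolve(".commits_.archive.1"), List.of("instant-1"));
    Path b = Files.write(dir.resolve(".commits_.archive.2"), List.of("instant-2"));
    Path merged = dir.resolve(".commits_.archive.3");
    mergeSmallArchives(dir, List.of(a, b), merged);
    // On success the plan is gone and the merged file holds both instants.
    System.out.println(Files.exists(dir.resolve("mergeArchivePlan"))); // false
    System.out.println(Files.readAllLines(merged)); // [instant-1, instant-2]
  }
}
```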
zhangyue19921010 commented on a change in pull request #4078: URL: https://github.com/apache/hudi/pull/4078#discussion_r782002120

## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java

## @@ -134,12 +161,222 @@ public boolean archiveIfRequired(HoodieEngineContext context) throws IOException

private void verifyLastMergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
  if (config.getArchiveAutoMergeEnable() && !StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
    Path planPath = new Path(metaClient.getArchivePath(), mergeArchivePlanName);
    HoodieWrapperFileSystem fs = metaClient.getFs();
    // If the plan exists, the last merge of small archive files failed;
    // we need to revert or complete the last action.
    if (fs.exists(planPath)) {
      HoodieMergeArchiveFilePlan plan = TimelineMetadataUtils.deserializeAvroMetadata(readDataFromPath(planPath).get(), HoodieMergeArchiveFilePlan.class);
      Path mergedArchiveFile = new Path(metaClient.getArchivePath(), plan.getMergedArchiveFileName());
      List candidates =
zhangyue19921010 commented on a change in pull request #4078: URL: https://github.com/apache/hudi/pull/4078#discussion_r782001297

## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java

## @@ -134,12 +161,222 @@ public boolean archiveIfRequired(HoodieEngineContext context) throws IOException

  if (mergeCandidate.size() >= archiveFilesMergeBatch) {
    List<String> candidateFiles = mergeCandidate.stream().map(fs -> fs.getPath().toString()).collect(Collectors.toList());

Review comment: Sure. https://issues.apache.org/jira/browse/HUDI-3212
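[Editorial note: the break-point scan in `getMergeCandidates`, quoted in full earlier in this digest, can be exercised in isolation. This is a minimal, hypothetical sketch that stands in plain file lengths for `FileStatus` objects; the inputs are assumed to be pre-sorted by version suffix in reverse, as in the PR.]

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MergeCandidatesSketch {
  // Scan until the first "huge" archive file; everything before that break point
  // is a merge candidate. Stopping there preserves the original file ordering,
  // which matters when loading archived instants with a time filter.
  static List<Long> getMergeCandidates(long smallFileLimitBytes, long[] lengths) {
    int index = 0;
    for (; index < lengths.length; index++) {
      if (lengths[index] > smallFileLimitBytes) {
        break;
      }
    }
    return Arrays.stream(lengths).limit(index).boxed().collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // Limit 100: the first two small files are candidates; the scan stops at 500,
    // so the trailing 10-byte file is NOT merged, even though it is small.
    System.out.println(getMergeCandidates(100, new long[] {50, 80, 500, 10})); // [50, 80]
  }
}
```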
zhangyue19921010 commented on a change in pull request #4078: URL: https://github.com/apache/hudi/pull/4078#discussion_r781998272

## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java

## @@ -134,12 +161,222 @@ public boolean archiveIfRequired(HoodieEngineContext context) throws IOException

private void verifyLastMergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
  if (config.getArchiveAutoMergeEnable() && !StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
    Path planPath = new Path(metaClient.getArchivePath(), mergeArchivePlanName);
    HoodieWrapperFileSystem fs = metaClient.getFs();
    // If the plan exists, the last merge of small archive files failed;
    // we need to revert or complete the last action.
    if (fs.exists(planPath)) {
      HoodieMergeArchiveFilePlan plan = TimelineMetadataUtils.deserializeAvroMetadata(readDataFromPath(planPath).get(), HoodieMergeArchiveFilePlan.class);
      Path mergedArchiveFile = new Path(metaClient.getArchivePath(), plan.getMergedArchiveFileName());
      List candidates =
zhangyue19921010 commented on a change in pull request #4078: URL: https://github.com/apache/hudi/pull/4078#discussion_r781997631

## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java

## @@ -134,12 +161,222 @@ public boolean archiveIfRequired(HoodieEngineContext context) throws IOException

      if (fs.exists(planPath)) {
        HoodieMergeArchiveFilePlan plan = TimelineMetadataUtils.deserializeAvroMetadata(readDataFromPath(planPath).get(), HoodieMergeArchiveFilePlan.class);

Review comment: Nice catch! We need to handle the case where parsing the merge plan fails. Changed.
zhangyue19921010 commented on a change in pull request #4078: URL: https://github.com/apache/hudi/pull/4078#discussion_r781995912

## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java

## @@ -134,12 +161,222 @@ public boolean archiveIfRequired(HoodieEngineContext context) throws IOException

  /**
   * Here Hoodie can merge the small archive files into a new, larger one.
   * Only used for filesystems that do not support the append operation.
   * The whole merge-small-archive-files operation has four stages:

Review comment: Changed.

## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java

## @@ -134,12 +161,222 @@ public boolean archiveIfRequired(HoodieEngineContext context) throws IOException

  private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
    Path planPath = new Path(metaClient.getArchivePath(), mergeArchivePlanName);
    // Flush any remaining content and open a new writer

Review comment: Changed.
zhangyue19921010 commented on a change in pull request #4078: URL: https://github.com/apache/hudi/pull/4078#discussion_r781995466

## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java

@@ -249,6 +249,24 @@
      + "record size estimate computed dynamically based on commit metadata. "
      + " This is critical in computing the insert parallelism and bin-packing inserts into small files.");

+  public static final ConfigProperty ARCHIVE_FILES_MERGE_BATCH_SIZE = ConfigProperty
+      .key("hoodie.archive.files.merge.batch.size")
+      .defaultValue(String.valueOf(10))
+      .withDocumentation("The number of small archive files merged at once.");
+
+  public static final ConfigProperty ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES = ConfigProperty
+      .key("hoodie.archive.merge.small.file.limit.bytes")
+      .defaultValue(String.valueOf(20 * 1024 * 1024))
+      .withDocumentation("This config sets the archive file size limit below which an archive file becomes a candidate to be selected as a small file.");
+
+  public static final ConfigProperty ARCHIVE_AUTO_MERGE_ENABLE = ConfigProperty
+      .key("hoodie.archive.auto.merge.enable")
+      .defaultValue("false")
+      .withDocumentation("When enabled, Hudi will auto-merge several small archive files into a larger one. It's"
+          + " useful when the storage scheme doesn't support the append operation.");

Review comment: Changed.
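The three settings above gate the merge behavior: a feature switch, a batch size, and a small-file threshold. A minimal sketch of how such keys might be read from a plain `java.util.Properties` bag — this `Properties`-based wiring is illustrative only, not Hudi's actual `HoodieWriteConfig` builder; the key names and defaults come from the diff above:

```java
import java.util.Properties;

public class ArchiveMergeConfigSketch {
    // Key names and defaults mirror the config diff above.
    public static final String ENABLE_KEY = "hoodie.archive.auto.merge.enable";
    public static final String BATCH_KEY = "hoodie.archive.files.merge.batch.size";
    public static final String LIMIT_KEY = "hoodie.archive.merge.small.file.limit.bytes";

    /** Feature switch, off by default for safety (a new feature). */
    public static boolean mergeEnabled(Properties p) {
        return Boolean.parseBoolean(p.getProperty(ENABLE_KEY, "false"));
    }

    /** How many small archive files are merged in one round. */
    public static int mergeBatchSize(Properties p) {
        return Integer.parseInt(p.getProperty(BATCH_KEY, "10"));
    }

    /** Files below this size are merge candidates (20 MB by default). */
    public static long smallFileLimitBytes(Properties p) {
        return Long.parseLong(p.getProperty(LIMIT_KEY, String.valueOf(20 * 1024 * 1024)));
    }
}
```

Keeping the switch default at `false` matches the reviewer discussion later in this thread: the merge is opt-in until multi-writer concerns are resolved.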
[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.
zhangyue19921010 commented on a change in pull request #4078: URL: https://github.com/apache/hudi/pull/4078#discussion_r781948726

## File path: hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java

@@ -248,11 +253,32 @@ private HoodieInstant readCommit(GenericRecord record, boolean loadDetails) {
        break;
      }
    }
+ } catch (Exception originalException) {
+   // Merging small archive files may leave an uncompleted archive file behind, which will cause an exception.
+   // We need to ignore that kind of exception here.
+   try {
+     Path planPath = new Path(metaClient.getArchivePath(), "mergeArchivePlan");
+     HoodieWrapperFileSystem fileSystem = metaClient.getFs();
+     if (fileSystem.exists(planPath)) {
+       HoodieMergeArchiveFilePlan plan = TimelineMetadataUtils.deserializeAvroMetadata(FileIOUtils.readDataFromPath(fileSystem, planPath).get(), HoodieMergeArchiveFilePlan.class);
+       String mergedArchiveFileName = plan.getMergedArchiveFileName();
+       if (!StringUtils.isNullOrEmpty(mergedArchiveFileName) && fs.getPath().getName().equalsIgnoreCase(mergedArchiveFileName)) {
+         LOG.warn("Caught exception while reading uncompleted merged archive file " + mergedArchiveFileName + ". Ignoring it here.");
+         continue;
+       }
+     }
+     throw originalException;
+   } catch (Exception e) {
+     // If anything goes wrong while parsing the merge archive plan, we need to throw the original exception.
+     // For example, a corrupted archive file and a corrupted plan may both exist.
+     throw originalException;
+   }

Review comment: We use this code to check whether originalException was caused by a corrupted merged archive file, and ignore it if so. Anything else needs to be thrown again.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
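The intent of the catch block above can be restated as a small decision function: swallow a read failure only when an in-flight merge plan exists and names exactly the archive file we failed on; in every other case (no plan, empty or corrupted plan, different file) rethrow the original exception. A dependency-free sketch of that decision — in the real code the plan is deserialized via `TimelineMetadataUtils`; here the plan's state is passed in directly, which is a simplification for illustration:

```java
public class MergePlanCheck {
    /**
     * Decide whether a read failure on archiveFileName can be ignored.
     * It is ignorable only if a merge plan exists and the file we failed on
     * is the (possibly half-written) merge target the plan names.
     */
    public static boolean isIgnorableReadFailure(boolean planExists,
                                                 String mergedArchiveFileName,
                                                 String archiveFileName) {
        if (!planExists) {
            return false; // no merge in flight: the failure is a real problem
        }
        if (mergedArchiveFileName == null || mergedArchiveFileName.isEmpty()) {
            return false; // empty/corrupted plan: rethrow the original exception
        }
        // Same case-insensitive name match as the diff above.
        return archiveFileName.equalsIgnoreCase(mergedArchiveFileName);
    }
}
```

This mirrors why the inner `catch` rethrows `originalException` rather than the parsing exception: a corrupted plan must not mask the real read failure.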
zhangyue19921010 commented on a change in pull request #4078: URL: https://github.com/apache/hudi/pull/4078#discussion_r777924466

## File path: hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java

@@ -218,7 +219,7 @@ private HoodieInstant readCommit(GenericRecord record, boolean loadDetails) {
   // Sort files by version suffix in reverse (implies reverse chronological order)
   Arrays.sort(fsStatuses, new ArchiveFileVersionComparator());
-  List instantsInRange = new ArrayList<>();
+  Set instantsInRange = new HashSet<>();

Review comment:
```
@Override
public boolean equals(Object o) {
  if (this == o) {
    return true;
  }
  if (o == null || getClass() != o.getClass()) {
    return false;
  }
  HoodieInstant that = (HoodieInstant) o;
  return state == that.state
      && Objects.equals(action, that.action)
      && Objects.equals(timestamp, that.timestamp);
}
```
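The `equals` override quoted above only makes `HashSet` deduplication work if `hashCode` is overridden consistently over the same fields (state, action, timestamp). A minimal stand-in class — not the real `HoodieInstant`, whose state is an enum and whose fields are richer — showing the equals/hashCode pair and the resulting dedup of instants loaded twice from overlapping archive files:

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

public class InstantKey {
    final String state;     // e.g. "COMPLETED"
    final String action;    // e.g. "commit"
    final String timestamp; // e.g. "20211201123000"

    public InstantKey(String state, String action, String timestamp) {
        this.state = state;
        this.action = action;
        this.timestamp = timestamp;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        InstantKey that = (InstantKey) o;
        return Objects.equals(state, that.state)
                && Objects.equals(action, that.action)
                && Objects.equals(timestamp, that.timestamp);
    }

    @Override
    public int hashCode() {
        // Must hash exactly the fields equals() compares, or HashSet lookups break.
        return Objects.hash(state, action, timestamp);
    }

    /** Count distinct instants; duplicates from overlapping archive files collapse. */
    public static int distinctCount(InstantKey[] keys) {
        Set<InstantKey> set = new HashSet<>();
        for (InstantKey k : keys) {
            set.add(k);
        }
        return set.size();
    }
}
```

This is why switching `instantsInRange` from `List` to `Set` is safe against the duplicates an ungraceful merge failure can leave behind.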
zhangyue19921010 commented on a change in pull request #4078: URL: https://github.com/apache/hudi/pull/4078#discussion_r777896842

## File path: hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java

@@ -218,7 +219,7 @@ private HoodieInstant readCommit(GenericRecord record, boolean loadDetails) {
   // Sort files by version suffix in reverse (implies reverse chronological order)
   Arrays.sort(fsStatuses, new ArchiveFileVersionComparator());
-  List instantsInRange = new ArrayList<>();
+  Set instantsInRange = new HashSet<>();

Review comment: Maybe we could use a HashSet to avoid duplicate instants while loading archived instants. This relies on there being no identical HoodieInstant in the Hudi timeline.
zhangyue19921010 commented on a change in pull request #4078: URL: https://github.com/apache/hudi/pull/4078#discussion_r777901212

## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java

@@ -134,12 +161,199 @@ public boolean archiveIfRequired(HoodieEngineContext context) throws IOException
      LOG.info("No Instants to archive");
    }
+   if (config.getArchiveAutoMergeEnable()) {
+     mergeArchiveFilesIfNecessary(context);
+   }
    return success;
  } finally {
    close();
  }
}

+ private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
+   Path planPath = new Path(metaClient.getArchivePath(), mergeArchivePlanName);
+   // Flush any remaining content and open a new writer
+   reOpenWriter();
+   // List all archive files
+   FileStatus[] fsStatuses = metaClient.getFs().globStatus(

Review comment: Nice catch here. I just found that it may be important to keep the original instant order of the small archive files. https://github.com/apache/hudi/blob/b5f05fd153df29a8be377404a14a0ced2f00b4bf/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java#L219 When loading archived instants, Hudi uses this order to skip reading unnecessary archive files: https://github.com/apache/hudi/blob/b5f05fd153df29a8be377404a14a0ced2f00b4bf/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java#L243 So the compactor here just uses the same order. What do you think? :)
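The ordering this comment relies on comes from sorting archive files by their numeric version suffix in reverse (newest first), which is what lets the loader stop early once it is past the requested instant range. A sketch of that comparator over bare file names — the `.commits_.archive.N` shape follows the glob in the diff, but real archive log names may carry extra tokens, so the parsing here is a simplifying assumption:

```java
import java.util.Arrays;
import java.util.Comparator;

public class ArchiveVersionSort {
    /** Extract the numeric version suffix of a name like ".commits_.archive.N". */
    static int version(String fileName) {
        int lastDot = fileName.lastIndexOf('.');
        return Integer.parseInt(fileName.substring(lastDot + 1));
    }

    /** Sort highest version first, implying reverse chronological order. */
    public static String[] sortReverse(String[] files) {
        String[] sorted = files.clone();
        sorted = Arrays.copyOf(sorted, sorted.length);
        Arrays.sort(sorted, Comparator.comparingInt(ArchiveVersionSort::version).reversed());
        return sorted;
    }
}
```

Note that a plain lexicographic sort would order `.10` before `.2`; comparing the parsed integer is what makes the "reverse chronological" assumption hold, and why a merge compactor must preserve the same ordering.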
zhangyue19921010 commented on a change in pull request #4078: URL: https://github.com/apache/hudi/pull/4078#discussion_r777892769

## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java

@@ -134,12 +161,199 @@ public boolean archiveIfRequired(HoodieEngineContext context) throws IOException
      LOG.info("No Instants to archive");
    }
+   if (config.getArchiveAutoMergeEnable()) {
+     mergeArchiveFilesIfNecessary(context);
+   }
    return success;
  } finally {
    close();
  }
}

+ private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
+   Path planPath = new Path(metaClient.getArchivePath(), mergeArchivePlanName);
+   // Flush any remaining content and open a new writer
+   reOpenWriter();
+   // List all archive files
+   FileStatus[] fsStatuses = metaClient.getFs().globStatus(
+       new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
+   List mergeCandidate = new ArrayList<>();
+   int archiveFilesCompactBatch = config.getArchiveFilesMergeBatchSize();
+   long smallFileLimitBytes = config.getArchiveMergeSmallFileLimitBytes();
+
+   for (FileStatus fs : fsStatuses) {
+     if (fs.getLen() < smallFileLimitBytes) {
+       mergeCandidate.add(fs);
+     }
+     if (mergeCandidate.size() >= archiveFilesCompactBatch) {
+       break;
+     }
+   }
+
+   if (mergeCandidate.size() >= archiveFilesCompactBatch) {
+     List candidateFiles = mergeCandidate.stream().map(fs -> fs.getPath().toString()).collect(Collectors.toList());
+     // before merging archive files, build the merge plan
+     String logFileName = computeLogFileName();
+     buildArchiveMergePlan(candidateFiles, planPath, logFileName);
+     // merge archive files
+     mergeArchiveFiles(mergeCandidate);
+     // after the merge, delete the small archive files
+     deleteFilesParallelize(metaClient, candidateFiles, context, true);
+     // finally, delete the archiveMergePlan, which means the merge of small archive files succeeded
+     metaClient.getFs().delete(planPath, false);
+   }
+ }
+
+ /**
+  * Get the final written archive file name based on whether the storage scheme supports append.
+  */
+ private String computeLogFileName() throws IOException {
+   if (!StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+     String logWriteToken = writer.getLogFile().getLogWriteToken();
+     HoodieLogFile hoodieLogFile = writer.getLogFile().rollOver(metaClient.getFs(), logWriteToken);
+     return hoodieLogFile.getFileName();
+   } else {
+     return writer.getLogFile().getFileName();
+   }
+ }
+
+ /**
+  * Check and resolve any failed, unfinished merge of small archive files.
+  * @param context HoodieEngineContext used to delete small archive files in parallel if necessary.
+  * @throws IOException
+  */
+ private void verifyLastMergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
+   if (config.getArchiveAutoMergeEnable()) {

Review comment: On enabling and disabling:
1. If the feature is disabled gracefully, this patch leaves nothing behind.
2. If it is disabled ungracefully, duplicate archived instant information may be left behind until the feature is enabled again, but I think that causes no damage, since this PR uses a HashSet when loading archived instants.

Why I think we need this switch before doing the clean-up work:
1. This is a new feature; it is safer with a default of false controlling it.
2. I am pretty worried about multi-writer here; at least we have a way to ensure only one writer does the merge work. As a next step, maybe we can handle multi-writer and also remove this switch. :)
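The candidate-selection loop in the diff above reduces to: scan archive files in order, collect those under the size limit, stop once the batch size is reached, and merge only if a full batch was found. A dependency-free sketch of that selection — plain file lengths stand in for Hadoop `FileStatus`, and returning indices instead of statuses is an illustrative simplification:

```java
import java.util.ArrayList;
import java.util.List;

public class MergeCandidateSelector {
    /**
     * Returns the indices of files to merge, or an empty list if fewer than
     * batchSize small files exist (the merge round is then skipped entirely).
     */
    public static List<Integer> select(long[] fileLengths, long smallFileLimitBytes, int batchSize) {
        List<Integer> candidates = new ArrayList<>();
        for (int i = 0; i < fileLengths.length; i++) {
            if (fileLengths[i] < smallFileLimitBytes) {
                candidates.add(i);
            }
            if (candidates.size() >= batchSize) {
                break; // at most one batch per archive round, as in the diff
            }
        }
        // Merge only when a full batch of small files is available.
        return candidates.size() >= batchSize ? candidates : new ArrayList<>();
    }
}
```

Bounding the work to one batch per archive round keeps each merge cheap; leftover small files simply become candidates on the next round.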
zhangyue19921010 commented on a change in pull request #4078: URL: https://github.com/apache/hudi/pull/4078#discussion_r777898257

## File path: hudi-common/src/main/avro/HoodieMergeArchiveFilePlan.avsc

@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "namespace":"org.apache.hudi.avro.model",
+  "type":"record",
+  "name":"HoodieMergeArchiveFilePlan",
+  "fields":[

Review comment: Added.
zhangyue19921010 commented on a change in pull request #4078: URL: https://github.com/apache/hudi/pull/4078#discussion_r777895427

## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java

@@ -134,12 +161,199 @@ public boolean archiveIfRequired(HoodieEngineContext context) throws IOException
      LOG.info("No Instants to archive");
    }
+   if (config.getArchiveAutoMergeEnable()) {
+     mergeArchiveFilesIfNecessary(context);
+   }
    return success;
  } finally {
    close();
  }
}

+ private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {

Review comment: Added. Thanks a lot for your review.
zhangyue19921010 commented on a change in pull request #4078: URL: https://github.com/apache/hudi/pull/4078#discussion_r777895319

## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java

@@ -134,12 +161,199 @@ public boolean archiveIfRequired(HoodieEngineContext context) throws IOException
      LOG.info("No Instants to archive");
    }
+   if (config.getArchiveAutoMergeEnable()) {
+     mergeArchiveFilesIfNecessary(context);
+   }
    return success;
  } finally {
    close();
  }
}

+ private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
+   Path planPath = new Path(metaClient.getArchivePath(), mergeArchivePlanName);
+   // Flush any remaining content and open a new writer
+   reOpenWriter();
+   // List all archive files
+   FileStatus[] fsStatuses = metaClient.getFs().globStatus(
+       new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
+   List mergeCandidate = new ArrayList<>();
+   int archiveFilesCompactBatch = config.getArchiveFilesMergeBatchSize();
+   long smallFileLimitBytes = config.getArchiveMergeSmallFileLimitBytes();
+
+   for (FileStatus fs : fsStatuses) {
+     if (fs.getLen() < smallFileLimitBytes) {
+       mergeCandidate.add(fs);
+     }
+     if (mergeCandidate.size() >= archiveFilesCompactBatch) {
+       break;
+     }
+   }
+
+   if (mergeCandidate.size() >= archiveFilesCompactBatch) {
+     List candidateFiles = mergeCandidate.stream().map(fs -> fs.getPath().toString()).collect(Collectors.toList());
+     // before merging archive files, build the merge plan
+     String logFileName = computeLogFileName();
+     buildArchiveMergePlan(candidateFiles, planPath, logFileName);
+     // merge archive files
+     mergeArchiveFiles(mergeCandidate);
+     // after the merge, delete the small archive files
+     deleteFilesParallelize(metaClient, candidateFiles, context, true);

Review comment: Just create a new log file here.
zhangyue19921010 commented on a change in pull request #4078: URL: https://github.com/apache/hudi/pull/4078#discussion_r777895112

## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java

@@ -134,12 +161,199 @@ public boolean archiveIfRequired(HoodieEngineContext context) throws IOException
      LOG.info("No Instants to archive");
    }
+   if (config.getArchiveAutoMergeEnable()) {
+     mergeArchiveFilesIfNecessary(context);
+   }
    return success;
  } finally {
    close();
  }
}

+ private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
+   Path planPath = new Path(metaClient.getArchivePath(), mergeArchivePlanName);
+   // Flush any remaining content and open a new writer
+   reOpenWriter();
+   // List all archive files
+   FileStatus[] fsStatuses = metaClient.getFs().globStatus(
+       new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
+   List mergeCandidate = new ArrayList<>();
+   int archiveFilesCompactBatch = config.getArchiveFilesMergeBatchSize();
+   long smallFileLimitBytes = config.getArchiveMergeSmallFileLimitBytes();
+
+   for (FileStatus fs : fsStatuses) {
+     if (fs.getLen() < smallFileLimitBytes) {
+       mergeCandidate.add(fs);
+     }
+     if (mergeCandidate.size() >= archiveFilesCompactBatch) {
+       break;
+     }
+   }
+
+   if (mergeCandidate.size() >= archiveFilesCompactBatch) {
+     List candidateFiles = mergeCandidate.stream().map(fs -> fs.getPath().toString()).collect(Collectors.toList());
+     // before merging archive files, build the merge plan
+     String logFileName = computeLogFileName();
+     buildArchiveMergePlan(candidateFiles, planPath, logFileName);
+     // merge archive files
+     mergeArchiveFiles(mergeCandidate);
+     // after the merge, delete the small archive files
+     deleteFilesParallelize(metaClient, candidateFiles, context, true);
+     // finally, delete the archiveMergePlan, which means the merge of small archive files succeeded
+     metaClient.getFs().delete(planPath, false);
+   }
+ }
+
+ /**
+  * Get the final written archive file name based on whether the storage scheme supports append.
+  */
+ private String computeLogFileName() throws IOException {
+   if (!StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+     String logWriteToken = writer.getLogFile().getLogWriteToken();
+     HoodieLogFile hoodieLogFile = writer.getLogFile().rollOver(metaClient.getFs(), logWriteToken);
+     return hoodieLogFile.getFileName();
+   } else {
+     return writer.getLogFile().getFileName();

Review comment:
> Do we create a new log block in log10 which will contain all merged log blocks from log1 to log10? Or should we explicitly create a new log file (log11)?

Nice catch! Option 2 is better: we create a new log11 each time we merge small archive files.

> Do we even need to consider adding this new feature for storage schemes where append is supported? In other words, should we consider enabling this feature just for storage schemes where append is not supported, and leave it as a no-op where appends are supported?

It's better to enable this feature only for storage schemes where append is not supported.
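The file-name decision discussed above can be sketched as: on storage without append support, roll the log version forward so the merge target is a brand-new file (the "log11" of the discussion), never a file other readers may already hold open; otherwise keep the current file name. A simplified model over bare version numbers — the real code goes through `HoodieLogFile.rollOver` and `StorageSchemes.isAppendSupported`, so the integer versions here are an illustrative stand-in:

```java
public class MergeTargetName {
    /**
     * Pick the archive log version the merged output should be written to,
     * given whether the storage scheme supports append and the version the
     * writer currently points at.
     */
    public static int targetVersion(boolean appendSupported, int currentVersion) {
        if (!appendSupported) {
            // No append (e.g. object stores): roll over to a fresh file so the
            // merged output lands in a file that did not exist before.
            return currentVersion + 1;
        }
        // Append supported: keep writing the current file.
        return currentVersion;
    }
}
```

Writing the merge target into a fresh file is also what makes the recovery path workable: the plan records the target name, so a half-written target can be recognized and ignored on read.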
zhangyue19921010 commented on a change in pull request #4078: URL: https://github.com/apache/hudi/pull/4078#discussion_r777887971

## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java

@@ -106,6 +116,23 @@ private Writer openWriter() {
   }
 }
+ public void reOpenWriter() {
+   try {
+     if (this.writer != null) {
+       this.writer.close();

Review comment: Sure thing, changed. Also changed the reOpenWriter function, which needs to be used in UTs, to remove duplicated code.
zhangyue19921010 commented on a change in pull request #4078: URL: https://github.com/apache/hudi/pull/4078#discussion_r773735747

## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java

@@ -249,6 +249,21 @@
      + "record size estimate computed dynamically based on commit metadata. "
      + " This is critical in computing the insert parallelism and bin-packing inserts into small files.");

+  public static final ConfigProperty ARCHIVE_MAX_FILES = ConfigProperty
+      .key("hoodie.archive.max.files")
+      .noDefaultValue()
+      .withDocumentation("The number of archive files to keep under the archived folder.");
+
+  public static final ConfigProperty ARCHIVE_AUTO_TRIM_ENABLE = ConfigProperty

Review comment: Sure, we will compact these small archive files instead of deleting them. This may need some design work. Will upgrade the code and design ASAP. Thanks a lot for your review.
zhangyue19921010 commented on a change in pull request #4078: URL: https://github.com/apache/hudi/pull/4078#discussion_r771087760

## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java

@@ -249,6 +249,21 @@
      + "record size estimate computed dynamically based on commit metadata. "
      + " This is critical in computing the insert parallelism and bin-packing inserts into small files.");

+  public static final ConfigProperty MAX_ARCHIVE_FILES_TO_KEEP_PROP = ConfigProperty
+      .key("hoodie.max.archive.files")
+      .noDefaultValue()
+      .withDocumentation("The number of archive files to keep under the archived folder.");
+
+  public static final ConfigProperty AUTO_TRIM_ARCHIVE_FILES_DROP = ConfigProperty
+      .key("hoodie.auto.trim.archive.files")

Review comment: Changed.
zhangyue19921010 commented on a change in pull request #4078: URL: https://github.com/apache/hudi/pull/4078#discussion_r770425205

## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java

@@ -140,6 +146,48 @@ public boolean archiveIfRequired(HoodieEngineContext context) throws IOException
     }
   }

+  private void trimArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
+    Stream<HoodieLogFile> allLogFiles = FSUtils.getAllLogFiles(metaClient.getFs(),
+        archiveFilePath.getParent(),
+        archiveFilePath.getName(),
+        HoodieArchivedLogFile.ARCHIVE_EXTENSION,
+        "");
+    List<HoodieLogFile> sortedLogFilesList = allLogFiles.sorted(HoodieLogFile.getReverseLogFileComparator()).collect(Collectors.toList());
+    if (!sortedLogFilesList.isEmpty()) {
+      List<String> skipped = sortedLogFilesList.stream().skip(maxArchiveFilesToKeep).map(HoodieLogFile::getPath).map(Path::toString).collect(Collectors.toList());
+      if (!skipped.isEmpty()) {
+        LOG.info("Deleting archive files : " + skipped);
+        context.setJobStatus(this.getClass().getSimpleName(), "Delete archive files");
+        Map result = deleteFilesParallelize(metaClient, skipped, context, true);

Review comment: Changed.

## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java

@@ -140,6 +146,48 @@ public boolean archiveIfRequired(HoodieEngineContext context) throws IOException
     }
   }

+  private void trimArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
+    Stream<HoodieLogFile> allLogFiles = FSUtils.getAllLogFiles(metaClient.getFs(),
+        archiveFilePath.getParent(),
+        archiveFilePath.getName(),
+        HoodieArchivedLogFile.ARCHIVE_EXTENSION,
+        "");
+    List<HoodieLogFile> sortedLogFilesList = allLogFiles.sorted(HoodieLogFile.getReverseLogFileComparator()).collect(Collectors.toList());
+    if (!sortedLogFilesList.isEmpty()) {
+      List<String> skipped = sortedLogFilesList.stream().skip(maxArchiveFilesToKeep).map(HoodieLogFile::getPath).map(Path::toString).collect(Collectors.toList());

Review comment: Changed.
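The trim logic in the diff above boils down to: sort archive files newest-first, skip the newest maxArchiveFilesToKeep, and treat the remainder as deletable. A minimal runnable sketch of that sort-and-skip step, using plain strings with lexicographic order as a stand-in for HoodieLogFile and its reverse log-file comparator (the file names are illustrative, not Hudi's exact naming):

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Sketch of the sort-and-skip step from trimArchiveFilesIfNecessary:
// order archive files newest-first, keep maxToKeep, return the rest for deletion.
public class TrimSketch {

    static List<String> filesToDelete(List<String> archiveFiles, int maxToKeep) {
        return archiveFiles.stream()
                .sorted(Comparator.reverseOrder()) // newest first under this naming scheme
                .skip(maxToKeep)                   // the newest maxToKeep files survive
                .collect(Collectors.toList());     // everything older is eligible for deletion
    }

    public static void main(String[] args) {
        List<String> files = List.of(
                ".commits_.archive.1", ".commits_.archive.2",
                ".commits_.archive.3", ".commits_.archive.4");
        // Keep the 2 newest; the 2 oldest come back for deletion.
        System.out.println(filesToDelete(files, 2)); // [.commits_.archive.2, .commits_.archive.1]
    }
}
```

Note that skip(n) on a stream shorter than n simply yields an empty stream, so a table with fewer archive files than the keep threshold deletes nothing.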
zhangyue19921010 commented on a change in pull request #4078: URL: https://github.com/apache/hudi/pull/4078#discussion_r770425379

## File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java

@@ -183,6 +217,41 @@ public void testArchiveTableWithArchival(boolean enableMetadata) throws Exceptio
     }
   }

+  @ParameterizedTest
+  @MethodSource("testArchiveTableWithArchivalTrim")
+  public void testArchiveTableWithArchivalCleanUp(boolean enableMetadata, boolean enableArchiveTrim, int archiveFilesToKeep) throws Exception {
+    HashSet<String> archiveFilesExisted = new HashSet<>();
+    ArrayList<String> currentExistArchiveFiles = new ArrayList<>();
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(enableMetadata, 2, 3, 2, enableArchiveTrim, archiveFilesToKeep);
+    String archivePath = metaClient.getArchivePath();
+    for (int i = 1; i < 10; i++) {
+      testTable.doWriteOperation("000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+      // trigger archival
+      archiveAndGetCommitsList(writeConfig);
+      RemoteIterator<LocatedFileStatus> iter = metaClient.getFs().listFiles(new Path(archivePath), false);
+      ArrayList<String> files = new ArrayList<>();
+      while (iter.hasNext()) {
+        files.add(iter.next().getPath().toString());
+      }
+      archiveFilesExisted.addAll(files);
+      currentExistArchiveFiles = files;
+    }
+
+    assertEquals(archiveFilesToKeep, currentExistArchiveFiles.size());
+
+    if (enableArchiveTrim) {
+      // sort archive files path
+      List<String> sorted = archiveFilesExisted.stream().sorted().collect(Collectors.toList());
+      List<String> archiveFilesDeleted = sorted.subList(0, 3 - archiveFilesToKeep);
+      List<String> archiveFilesKept = sorted.subList(3 - archiveFilesToKeep, sorted.size());
+
+      // assert older archive files are deleted
+      assertFalse(currentExistArchiveFiles.containsAll(archiveFilesDeleted));
+      // assert most recent archive files are preserved
+      assertTrue(currentExistArchiveFiles.containsAll(archiveFilesKept));
+    }

Review comment: Added.
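The assertion strategy in the test above can be distilled as: sort every archive file ever observed, treat the oldest entries as expected-deleted and the newest archiveFilesToKeep entries as expected-kept, then check the current listing against both. A small sketch of that split; the test hard-codes the total as 3, while the sketch generalizes it to the list size, and the file names are illustrative:

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch of the kept/deleted split asserted in the test above: sort every
// archive file ever observed (oldest-first under this naming), then the first
// size - keep entries should be deleted and the last keep entries retained.
public class ArchiveAssertSketch {

    static List<String> expectedDeleted(List<String> everExisted, int keep) {
        List<String> sorted = everExisted.stream().sorted().collect(Collectors.toList());
        return sorted.subList(0, sorted.size() - keep);
    }

    static List<String> expectedKept(List<String> everExisted, int keep) {
        List<String> sorted = everExisted.stream().sorted().collect(Collectors.toList());
        return sorted.subList(sorted.size() - keep, sorted.size());
    }

    public static void main(String[] args) {
        List<String> everExisted = List.of("archive.1", "archive.2", "archive.3");
        List<String> current = List.of("archive.2", "archive.3"); // state after trimming
        int keep = 2;

        // Mirrors assertFalse(current.containsAll(deleted)) and
        // assertTrue(current.containsAll(kept)) from the test.
        if (current.containsAll(expectedDeleted(everExisted, keep))) {
            throw new AssertionError("old archive files were not deleted");
        }
        if (!current.containsAll(expectedKept(everExisted, keep))) {
            throw new AssertionError("recent archive files are missing");
        }
        System.out.println("assertions passed");
    }
}
```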
zhangyue19921010 commented on a change in pull request #4078: URL: https://github.com/apache/hudi/pull/4078#discussion_r770425009

## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java

@@ -249,6 +249,19 @@
       + "record size estimate compute dynamically based on commit metadata. "
       + " This is critical in computing the insert parallelism and bin-packing inserts into small files.");

+  public static final ConfigProperty<String> MAX_ARCHIVE_FILES_TO_KEEP_PROP = ConfigProperty
+      .key("hoodie.max.archive.files")
+      .defaultValue("10")
+      .withDocumentation("The number of archive files to keep under the archived folder.");
+
+  public static final ConfigProperty<String> AUTO_TRIM_ARCHIVE_FILES_DROP = ConfigProperty
+      .key("hoodie.auto.trim.archive.files")
+      .defaultValue("false")
+      .withDocumentation("When enabled, Hoodie will keep the most recent " + MAX_ARCHIVE_FILES_TO_KEEP_PROP.key()
+          + " archive files and delete older ones, losing part of the archived instants information.");

Review comment: Hmm, since all the archive-related configs such as `hoodie.archive.automatic`, `hoodie.commits.archival.batch`, `hoodie.keep.min.commits`, etc. already live in `HoodieCompactionConfig`, maybe it's better to keep these in the same place :)
zhangyue19921010 commented on a change in pull request #4078: URL: https://github.com/apache/hudi/pull/4078#discussion_r770423255

## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java

@@ -249,6 +249,19 @@
       + "record size estimate compute dynamically based on commit metadata. "
       + " This is critical in computing the insert parallelism and bin-packing inserts into small files.");

+  public static final ConfigProperty<String> MAX_ARCHIVE_FILES_TO_KEEP_PROP = ConfigProperty
+      .key("hoodie.max.archive.files")
+      .defaultValue("10")
+      .withDocumentation("The number of archive files to keep under the archived folder.");
+
+  public static final ConfigProperty<String> AUTO_TRIM_ARCHIVE_FILES_DROP = ConfigProperty
+      .key("hoodie.auto.trim.archive.files")
+      .defaultValue("false")
+      .withDocumentation("When enabled, Hoodie will keep the most recent " + MAX_ARCHIVE_FILES_TO_KEEP_PROP.key()

Review comment: Appreciate it. Changed.
zhangyue19921010 commented on a change in pull request #4078: URL: https://github.com/apache/hudi/pull/4078#discussion_r770422959

## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java

@@ -249,6 +249,19 @@
       + "record size estimate compute dynamically based on commit metadata. "
       + " This is critical in computing the insert parallelism and bin-packing inserts into small files.");

+  public static final ConfigProperty<String> MAX_ARCHIVE_FILES_TO_KEEP_PROP = ConfigProperty
+      .key("hoodie.max.archive.files")
+      .defaultValue("10")

Review comment: Sure thing. Changed.
zhangyue19921010 commented on a change in pull request #4078: URL: https://github.com/apache/hudi/pull/4078#discussion_r769306591

## File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java

@@ -183,6 +217,24 @@ public void testArchiveTableWithArchival(boolean enableMetadata) throws Exceptio
     }
   }

+  @ParameterizedTest
+  @MethodSource("testArchiveTableWithArchivalCleanUp")
+  public void testArchiveTableWithArchivalCleanUp(boolean enableMetadata, boolean enableArchiveClean, int archiveFilesToKeep) throws Exception {
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(enableMetadata, 2, 3, 2, enableArchiveClean, archiveFilesToKeep);
+    for (int i = 1; i < 10; i++) {
+      testTable.doWriteOperation("000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+      // trigger archival
+      archiveAndGetCommitsList(writeConfig);
+    }
+    String archivePath = metaClient.getArchivePath();
+    RemoteIterator<LocatedFileStatus> iter = metaClient.getFs().listFiles(new Path(archivePath), false);
+    ArrayList<LocatedFileStatus> files = new ArrayList<>();
+    while (iter.hasNext()) {
+      files.add(iter.next());
+    }
+    assertEquals(archiveFilesToKeep, files.size());

Review comment: Changed.
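The listing-and-count check in the test above can be sketched with java.nio.file standing in for Hudi's HoodieWrapperFileSystem and Hadoop's RemoteIterator; Files.list is non-recursive, analogous to fs.listFiles(archivePath, false). The file names below are illustrative:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Sketch of the listing-and-count check in the archival test above.
public class ArchiveListingSketch {

    // Non-recursive directory listing, analogous to fs.listFiles(archivePath, false).
    static List<String> listArchiveFiles(Path archiveDir) throws IOException {
        try (Stream<Path> entries = Files.list(archiveDir)) {
            return entries.filter(Files::isRegularFile)
                    .map(p -> p.getFileName().toString())
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("archived");
        Files.createFile(dir.resolve(".commits_.archive.1"));
        Files.createFile(dir.resolve(".commits_.archive.2"));

        List<String> files = listArchiveFiles(dir);
        // Mirrors assertEquals(archiveFilesToKeep, files.size()) with keep = 2.
        System.out.println(files.size()); // 2
    }
}
```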
zhangyue19921010 commented on a change in pull request #4078: URL: https://github.com/apache/hudi/pull/4078#discussion_r769306460

## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java

@@ -249,6 +249,18 @@
       + "record size estimate compute dynamically based on commit metadata. "
       + " This is critical in computing the insert parallelism and bin-packing inserts into small files.");

+  public static final ConfigProperty<String> ARCHIVE_FILES_TO_KEEP_PROP = ConfigProperty
+      .key("hoodie.keep.archive.files")
+      .defaultValue("10")
+      .withDocumentation("The number of archive files to keep under the archived folder.");
+
+  public static final ConfigProperty<String> CLEAN_ARCHIVE_FILE_ENABLE_DROP = ConfigProperty
+      .key("hoodie.archive.clean.enable")

Review comment: Thanks for your review. Changed to `hoodie.auto.trim.archive.files` and `hoodie.max.archive.files`.