[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2022-01-13 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r784462454



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
##
@@ -248,11 +253,32 @@ private HoodieInstant readCommit(GenericRecord record, 
boolean loadDetails) {
   break;
 }
   }
+    } catch (Exception originalException) {
+      // Merging small archive files may leave behind an incomplete archive file, which will cause an exception.
+      // We need to ignore this kind of exception here.
+      try {
+        Path planPath = new Path(metaClient.getArchivePath(), "mergeArchivePlan");
+        HoodieWrapperFileSystem fileSystem = metaClient.getFs();
+        if (fileSystem.exists(planPath)) {
+          HoodieMergeArchiveFilePlan plan = TimelineMetadataUtils.deserializeAvroMetadata(FileIOUtils.readDataFromPath(fileSystem, planPath).get(), HoodieMergeArchiveFilePlan.class);
+          String mergedArchiveFileName = plan.getMergedArchiveFileName();
+          if (!StringUtils.isNullOrEmpty(mergedArchiveFileName) && fs.getPath().getName().equalsIgnoreCase(mergedArchiveFileName)) {
+            LOG.warn("Caught an exception caused by reading the incomplete merged archive file " + mergedArchiveFileName + ". Ignore it here.");
+            continue;
+          }
+        }
+        throw originalException;
+      } catch (Exception e) {
+        // If anything goes wrong while parsing the merge archive plan, we need to throw the original exception.
+        // For example, a corrupted archive file and a corrupted plan may both exist.
+        throw originalException;
+      }

Review comment:
   Hi @nsivabalan and @yihua 
   The common concern is the incomplete/duplicate data left behind when the last merge of small archive files fails and the current Hudi writer/commit is configured with archive file merging disabled.
   
   Ideally we should check for and clean up this dirty data before every archive run.
   
   The reasons I think we need this switch before doing the cleanup work are:
   1. This is a new feature, so it is safer to guard it with a default of false here.
   2. I am pretty worried about multi-writer scenarios; at least this gives us a way to ensure that only one writer performs the merge.
   
   As for making sure that the incomplete data does no damage when loading the archived timeline until the next cleanup (a small sketch follows this list):
   1. We use a HashSet to avoid duplicate instants while loading archived instants.
   2. We use this try-catch to deal with the exception caused by loading an incomplete merged small-archive file.
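   To illustrate safeguard 1, here is a minimal, self-contained sketch. It does not use the actual Hudi classes: `SimpleInstant` is a hypothetical stand-in for `HoodieInstant` whose equality is defined on (state, action, timestamp), mirroring the `equals()` quoted elsewhere in this thread, with a matching `hashCode` assumed.
   ```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

public class InstantDedupSketch {

  // Hypothetical stand-in for HoodieInstant: equality on (state, action, timestamp).
  static final class SimpleInstant {
    final String state;
    final String action;
    final String timestamp;

    SimpleInstant(String state, String action, String timestamp) {
      this.state = state;
      this.action = action;
      this.timestamp = timestamp;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof SimpleInstant)) {
        return false;
      }
      SimpleInstant that = (SimpleInstant) o;
      return Objects.equals(state, that.state)
          && Objects.equals(action, that.action)
          && Objects.equals(timestamp, that.timestamp);
    }

    @Override
    public int hashCode() {
      return Objects.hash(state, action, timestamp);
    }
  }

  public static void main(String[] args) {
    // The same instant shows up twice: once from a leftover small archive file
    // and once from the merged archive file.
    SimpleInstant fromSmallFile = new SimpleInstant("COMPLETED", "commit", "20220113101010");
    SimpleInstant fromMergedFile = new SimpleInstant("COMPLETED", "commit", "20220113101010");

    Set<SimpleInstant> instantsInRange = new HashSet<>(Arrays.asList(fromSmallFile, fromMergedFile));

    // Prints 1: the duplicate collapses, so each archived instant is loaded exactly once.
    System.out.println(instantsInRange.size());
  }
}
   ```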
   
   As a next step, maybe we can take care of multi-writer scenarios, let this run stably for some time in my staging/production environments, and finally remove this strict restriction around `verifyLastMergeArchiveFilesIfNecessary` here :)
   
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2022-01-11 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r782002899



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -134,12 +161,222 @@ public boolean archiveIfRequired(HoodieEngineContext 
context) throws IOException
 LOG.info("No Instants to archive");
   }
 
+  if (config.getArchiveAutoMergeEnable() && 
!StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+mergeArchiveFilesIfNecessary(context);
+  }
   return success;
 } finally {
   close();
 }
   }
 
+  /**
+   * Here Hoodie can merge the small archive files into a new, larger one.
+   * Only used for filesystems that do not support the append operation.
+   * The whole merge-small-archive-files operation has four stages:
+   * 1. Build the merge plan with the merge candidates / merged file name info.
+   * 2. Do the merge.
+   * 3. Delete all the candidates.
+   * 4. Delete the merge plan.
+   * @param context HoodieEngineContext
+   * @throws IOException
+   */
+  private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
+    Path planPath = new Path(metaClient.getArchivePath(), mergeArchivePlanName);
+    // Flush any remaining content and open a new writer
+    reOpenWriter();
+    // List all archive files
+    FileStatus[] fsStatuses = metaClient.getFs().globStatus(
+        new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
+    // Sort files by version suffix in reverse (implies reverse chronological order)
+    Arrays.sort(fsStatuses, new HoodieArchivedTimeline.ArchiveFileVersionComparator());
+
+    int archiveFilesMergeBatch = config.getArchiveFilesMergeBatchSize();
+    long smallFileLimitBytes = config.getArchiveMergeSmallFileLimitBytes();
+
+    List<FileStatus> mergeCandidate = getMergeCandidates(smallFileLimitBytes, fsStatuses);
+
+    if (mergeCandidate.size() >= archiveFilesMergeBatch) {
+      List<String> candidateFiles = mergeCandidate.stream().map(fs -> fs.getPath().toString()).collect(Collectors.toList());
+      // Build the merge plan before merging the archive files
+      String logFileName = computeLogFileName();
+      buildArchiveMergePlan(candidateFiles, planPath, logFileName);
+      // Merge the archive files
+      mergeArchiveFiles(mergeCandidate);
+      // After the merge, delete the small archive files
+      deleteFilesParallelize(metaClient, candidateFiles, context, true);
+      // Finally, delete the archiveMergePlan, which marks the merge of the small archive files as succeeded
+      metaClient.getFs().delete(planPath, false);
+    }
+  }
+
+  /**
+   * Find the latest 'huge archive file' index as a break point and only check/merge the newer archive files.
+   * This is because we need to keep the original order of the archive files, which is important when loading archived instants with a time filter.
+   * {@link HoodieArchivedTimeline} loadInstants(TimeRangeFilter filter, boolean loadInstantDetails, Function commitsFilter)
+   * @param smallFileLimitBytes small file size limit in bytes
+   * @param fsStatuses archive file statuses, sorted by version suffix in reverse
+   * @return merge candidates
+   */
+  private List<FileStatus> getMergeCandidates(long smallFileLimitBytes, FileStatus[] fsStatuses) {
+    int index = 0;
+    for (; index < fsStatuses.length; index++) {
+      if (fsStatuses[index].getLen() > smallFileLimitBytes) {
+        break;
+      }
+    }
+    return Arrays.stream(fsStatuses).limit(index).collect(Collectors.toList());
+  }
+
+  /**
+   * Get the final archive file name to write, based on whether the storage scheme supports append or not.
+   */
+  private String computeLogFileName() throws IOException {
+    String logWriteToken = writer.getLogFile().getLogWriteToken();
+    HoodieLogFile hoodieLogFile = writer.getLogFile().rollOver(metaClient.getFs(), logWriteToken);
+    return hoodieLogFile.getFileName();
+  }
+
+  /**
+   * Check for and resolve any failed, unfinished merge of small archive files.
+   * @param context HoodieEngineContext used to parallelize deletion of the small archive files if necessary.
+   * @throws IOException
+   */
+  private void verifyLastMergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
+    if (config.getArchiveAutoMergeEnable() && !StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {

Review comment:
   Changed. Mentioned below.








[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2022-01-11 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r782002120



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -134,12 +161,222 @@ public boolean archiveIfRequired(HoodieEngineContext 
context) throws IOException
 LOG.info("No Instants to archive");
   }
 
+  if (config.getArchiveAutoMergeEnable() && 
!StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+mergeArchiveFilesIfNecessary(context);
+  }
   return success;
 } finally {
   close();
 }
   }
 
+  /**
+   * Here Hoodie can merge the small archive files into a new, larger one.
+   * Only used for filesystems that do not support the append operation.
+   * The whole merge-small-archive-files operation has four stages:
+   * 1. Build the merge plan with the merge candidates / merged file name info.
+   * 2. Do the merge.
+   * 3. Delete all the candidates.
+   * 4. Delete the merge plan.
+   * @param context HoodieEngineContext
+   * @throws IOException
+   */
+  private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
+    Path planPath = new Path(metaClient.getArchivePath(), mergeArchivePlanName);
+    // Flush any remaining content and open a new writer
+    reOpenWriter();
+    // List all archive files
+    FileStatus[] fsStatuses = metaClient.getFs().globStatus(
+        new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
+    // Sort files by version suffix in reverse (implies reverse chronological order)
+    Arrays.sort(fsStatuses, new HoodieArchivedTimeline.ArchiveFileVersionComparator());
+
+    int archiveFilesMergeBatch = config.getArchiveFilesMergeBatchSize();
+    long smallFileLimitBytes = config.getArchiveMergeSmallFileLimitBytes();
+
+    List<FileStatus> mergeCandidate = getMergeCandidates(smallFileLimitBytes, fsStatuses);
+
+    if (mergeCandidate.size() >= archiveFilesMergeBatch) {
+      List<String> candidateFiles = mergeCandidate.stream().map(fs -> fs.getPath().toString()).collect(Collectors.toList());
+      // Build the merge plan before merging the archive files
+      String logFileName = computeLogFileName();
+      buildArchiveMergePlan(candidateFiles, planPath, logFileName);
+      // Merge the archive files
+      mergeArchiveFiles(mergeCandidate);
+      // After the merge, delete the small archive files
+      deleteFilesParallelize(metaClient, candidateFiles, context, true);
+      // Finally, delete the archiveMergePlan, which marks the merge of the small archive files as succeeded
+      metaClient.getFs().delete(planPath, false);
+    }
+  }
+
+  /**
+   * Find the latest 'huge archive file' index as a break point and only check/merge the newer archive files.
+   * This is because we need to keep the original order of the archive files, which is important when loading archived instants with a time filter.
+   * {@link HoodieArchivedTimeline} loadInstants(TimeRangeFilter filter, boolean loadInstantDetails, Function commitsFilter)
+   * @param smallFileLimitBytes small file size limit in bytes
+   * @param fsStatuses archive file statuses, sorted by version suffix in reverse
+   * @return merge candidates
+   */
+  private List<FileStatus> getMergeCandidates(long smallFileLimitBytes, FileStatus[] fsStatuses) {
+    int index = 0;
+    for (; index < fsStatuses.length; index++) {
+      if (fsStatuses[index].getLen() > smallFileLimitBytes) {
+        break;
+      }
+    }
+    return Arrays.stream(fsStatuses).limit(index).collect(Collectors.toList());
+  }
+
+  /**
+   * Get the final archive file name to write, based on whether the storage scheme supports append or not.
+   */
+  private String computeLogFileName() throws IOException {
+    String logWriteToken = writer.getLogFile().getLogWriteToken();
+    HoodieLogFile hoodieLogFile = writer.getLogFile().rollOver(metaClient.getFs(), logWriteToken);
+    return hoodieLogFile.getFileName();
+  }
+
+  /**
+   * Check for and resolve any failed, unfinished merge of small archive files.
+   * @param context HoodieEngineContext used to parallelize deletion of the small archive files if necessary.
+   * @throws IOException
+   */
+  private void verifyLastMergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
+    if (config.getArchiveAutoMergeEnable() && !StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+      Path planPath = new Path(metaClient.getArchivePath(), mergeArchivePlanName);
+      HoodieWrapperFileSystem fs = metaClient.getFs();
+      // If the plan exists, the last merge of small archive files failed.
+      // We need to revert or complete the last action.
+      if (fs.exists(planPath)) {
+        HoodieMergeArchiveFilePlan plan = TimelineMetadataUtils.deserializeAvroMetadata(readDataFromPath(planPath).get(), HoodieMergeArchiveFilePlan.class);
+        Path mergedArchiveFile = new Path(metaClient.getArchivePath(), plan.getMergedArchiveFileName());
+        List candidates = 

[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2022-01-11 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r782001297



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -134,12 +161,222 @@ public boolean archiveIfRequired(HoodieEngineContext 
context) throws IOException
 LOG.info("No Instants to archive");
   }
 
+  if (config.getArchiveAutoMergeEnable() && 
!StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+mergeArchiveFilesIfNecessary(context);
+  }
   return success;
 } finally {
   close();
 }
   }
 
+  /**
+   * Here Hoodie can merge the small archive files into a new, larger one.
+   * Only used for filesystems that do not support the append operation.
+   * The whole merge-small-archive-files operation has four stages:
+   * 1. Build the merge plan with the merge candidates / merged file name info.
+   * 2. Do the merge.
+   * 3. Delete all the candidates.
+   * 4. Delete the merge plan.
+   * @param context HoodieEngineContext
+   * @throws IOException
+   */
+  private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
+    Path planPath = new Path(metaClient.getArchivePath(), mergeArchivePlanName);
+    // Flush any remaining content and open a new writer
+    reOpenWriter();
+    // List all archive files
+    FileStatus[] fsStatuses = metaClient.getFs().globStatus(
+        new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
+    // Sort files by version suffix in reverse (implies reverse chronological order)
+    Arrays.sort(fsStatuses, new HoodieArchivedTimeline.ArchiveFileVersionComparator());
+
+    int archiveFilesMergeBatch = config.getArchiveFilesMergeBatchSize();
+    long smallFileLimitBytes = config.getArchiveMergeSmallFileLimitBytes();
+
+    List<FileStatus> mergeCandidate = getMergeCandidates(smallFileLimitBytes, fsStatuses);
+
+    if (mergeCandidate.size() >= archiveFilesMergeBatch) {
+      List<String> candidateFiles = mergeCandidate.stream().map(fs -> fs.getPath().toString()).collect(Collectors.toList());

Review comment:
   Sure. https://issues.apache.org/jira/browse/HUDI-3212








[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2022-01-11 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r781998272



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -134,12 +161,222 @@ public boolean archiveIfRequired(HoodieEngineContext 
context) throws IOException
 LOG.info("No Instants to archive");
   }
 
+  if (config.getArchiveAutoMergeEnable() && 
!StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+mergeArchiveFilesIfNecessary(context);
+  }
   return success;
 } finally {
   close();
 }
   }
 
+  /**
+   * Here Hoodie can merge the small archive files into a new, larger one.
+   * Only used for filesystems that do not support the append operation.
+   * The whole merge-small-archive-files operation has four stages:
+   * 1. Build the merge plan with the merge candidates / merged file name info.
+   * 2. Do the merge.
+   * 3. Delete all the candidates.
+   * 4. Delete the merge plan.
+   * @param context HoodieEngineContext
+   * @throws IOException
+   */
+  private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
+    Path planPath = new Path(metaClient.getArchivePath(), mergeArchivePlanName);
+    // Flush any remaining content and open a new writer
+    reOpenWriter();
+    // List all archive files
+    FileStatus[] fsStatuses = metaClient.getFs().globStatus(
+        new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
+    // Sort files by version suffix in reverse (implies reverse chronological order)
+    Arrays.sort(fsStatuses, new HoodieArchivedTimeline.ArchiveFileVersionComparator());
+
+    int archiveFilesMergeBatch = config.getArchiveFilesMergeBatchSize();
+    long smallFileLimitBytes = config.getArchiveMergeSmallFileLimitBytes();
+
+    List<FileStatus> mergeCandidate = getMergeCandidates(smallFileLimitBytes, fsStatuses);
+
+    if (mergeCandidate.size() >= archiveFilesMergeBatch) {
+      List<String> candidateFiles = mergeCandidate.stream().map(fs -> fs.getPath().toString()).collect(Collectors.toList());
+      // Build the merge plan before merging the archive files
+      String logFileName = computeLogFileName();
+      buildArchiveMergePlan(candidateFiles, planPath, logFileName);
+      // Merge the archive files
+      mergeArchiveFiles(mergeCandidate);
+      // After the merge, delete the small archive files
+      deleteFilesParallelize(metaClient, candidateFiles, context, true);
+      // Finally, delete the archiveMergePlan, which marks the merge of the small archive files as succeeded
+      metaClient.getFs().delete(planPath, false);
+    }
+  }
+
+  /**
+   * Find the latest 'huge archive file' index as a break point and only check/merge the newer archive files.
+   * This is because we need to keep the original order of the archive files, which is important when loading archived instants with a time filter.
+   * {@link HoodieArchivedTimeline} loadInstants(TimeRangeFilter filter, boolean loadInstantDetails, Function commitsFilter)
+   * @param smallFileLimitBytes small file size limit in bytes
+   * @param fsStatuses archive file statuses, sorted by version suffix in reverse
+   * @return merge candidates
+   */
+  private List<FileStatus> getMergeCandidates(long smallFileLimitBytes, FileStatus[] fsStatuses) {
+    int index = 0;
+    for (; index < fsStatuses.length; index++) {
+      if (fsStatuses[index].getLen() > smallFileLimitBytes) {
+        break;
+      }
+    }
+    return Arrays.stream(fsStatuses).limit(index).collect(Collectors.toList());
+  }
+
+  /**
+   * Get the final archive file name to write, based on whether the storage scheme supports append or not.
+   */
+  private String computeLogFileName() throws IOException {
+    String logWriteToken = writer.getLogFile().getLogWriteToken();
+    HoodieLogFile hoodieLogFile = writer.getLogFile().rollOver(metaClient.getFs(), logWriteToken);
+    return hoodieLogFile.getFileName();
+  }
+
+  /**
+   * Check for and resolve any failed, unfinished merge of small archive files.
+   * @param context HoodieEngineContext used to parallelize deletion of the small archive files if necessary.
+   * @throws IOException
+   */
+  private void verifyLastMergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
+    if (config.getArchiveAutoMergeEnable() && !StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+      Path planPath = new Path(metaClient.getArchivePath(), mergeArchivePlanName);
+      HoodieWrapperFileSystem fs = metaClient.getFs();
+      // If the plan exists, the last merge of small archive files failed.
+      // We need to revert or complete the last action.
+      if (fs.exists(planPath)) {
+        HoodieMergeArchiveFilePlan plan = TimelineMetadataUtils.deserializeAvroMetadata(readDataFromPath(planPath).get(), HoodieMergeArchiveFilePlan.class);
+        Path mergedArchiveFile = new Path(metaClient.getArchivePath(), plan.getMergedArchiveFileName());
+        List candidates = 

[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2022-01-11 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r781997631



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -134,12 +161,222 @@ public boolean archiveIfRequired(HoodieEngineContext 
context) throws IOException
 LOG.info("No Instants to archive");
   }
 
+  if (config.getArchiveAutoMergeEnable() && 
!StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+mergeArchiveFilesIfNecessary(context);
+  }
   return success;
 } finally {
   close();
 }
   }
 
+  /**
+   * Here Hoodie can merge the small archive files into a new, larger one.
+   * Only used for filesystems that do not support the append operation.
+   * The whole merge-small-archive-files operation has four stages:
+   * 1. Build the merge plan with the merge candidates / merged file name info.
+   * 2. Do the merge.
+   * 3. Delete all the candidates.
+   * 4. Delete the merge plan.
+   * @param context HoodieEngineContext
+   * @throws IOException
+   */
+  private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
+    Path planPath = new Path(metaClient.getArchivePath(), mergeArchivePlanName);
+    // Flush any remaining content and open a new writer
+    reOpenWriter();
+    // List all archive files
+    FileStatus[] fsStatuses = metaClient.getFs().globStatus(
+        new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
+    // Sort files by version suffix in reverse (implies reverse chronological order)
+    Arrays.sort(fsStatuses, new HoodieArchivedTimeline.ArchiveFileVersionComparator());
+
+    int archiveFilesMergeBatch = config.getArchiveFilesMergeBatchSize();
+    long smallFileLimitBytes = config.getArchiveMergeSmallFileLimitBytes();
+
+    List<FileStatus> mergeCandidate = getMergeCandidates(smallFileLimitBytes, fsStatuses);
+
+    if (mergeCandidate.size() >= archiveFilesMergeBatch) {
+      List<String> candidateFiles = mergeCandidate.stream().map(fs -> fs.getPath().toString()).collect(Collectors.toList());
+      // Build the merge plan before merging the archive files
+      String logFileName = computeLogFileName();
+      buildArchiveMergePlan(candidateFiles, planPath, logFileName);
+      // Merge the archive files
+      mergeArchiveFiles(mergeCandidate);
+      // After the merge, delete the small archive files
+      deleteFilesParallelize(metaClient, candidateFiles, context, true);
+      // Finally, delete the archiveMergePlan, which marks the merge of the small archive files as succeeded
+      metaClient.getFs().delete(planPath, false);
+    }
+  }
+
+  /**
+   * Find the latest 'huge archive file' index as a break point and only check/merge the newer archive files.
+   * This is because we need to keep the original order of the archive files, which is important when loading archived instants with a time filter.
+   * {@link HoodieArchivedTimeline} loadInstants(TimeRangeFilter filter, boolean loadInstantDetails, Function commitsFilter)
+   * @param smallFileLimitBytes small file size limit in bytes
+   * @param fsStatuses archive file statuses, sorted by version suffix in reverse
+   * @return merge candidates
+   */
+  private List<FileStatus> getMergeCandidates(long smallFileLimitBytes, FileStatus[] fsStatuses) {
+    int index = 0;
+    for (; index < fsStatuses.length; index++) {
+      if (fsStatuses[index].getLen() > smallFileLimitBytes) {
+        break;
+      }
+    }
+    return Arrays.stream(fsStatuses).limit(index).collect(Collectors.toList());
+  }
+
+  /**
+   * Get the final archive file name to write, based on whether the storage scheme supports append or not.
+   */
+  private String computeLogFileName() throws IOException {
+    String logWriteToken = writer.getLogFile().getLogWriteToken();
+    HoodieLogFile hoodieLogFile = writer.getLogFile().rollOver(metaClient.getFs(), logWriteToken);
+    return hoodieLogFile.getFileName();
+  }
+
+  /**
+   * Check for and resolve any failed, unfinished merge of small archive files.
+   * @param context HoodieEngineContext used to parallelize deletion of the small archive files if necessary.
+   * @throws IOException
+   */
+  private void verifyLastMergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
+    if (config.getArchiveAutoMergeEnable() && !StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+      Path planPath = new Path(metaClient.getArchivePath(), mergeArchivePlanName);
+      HoodieWrapperFileSystem fs = metaClient.getFs();
+      // If the plan exists, the last merge of small archive files failed.
+      // We need to revert or complete the last action.
+      if (fs.exists(planPath)) {
+        HoodieMergeArchiveFilePlan plan = TimelineMetadataUtils.deserializeAvroMetadata(readDataFromPath(planPath).get(), HoodieMergeArchiveFilePlan.class);

Review comment:
   Nice catch! We need to take care of the case where parsing the merge plan fails.
   Changed.





[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2022-01-11 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r781995912



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -134,12 +161,222 @@ public boolean archiveIfRequired(HoodieEngineContext 
context) throws IOException
 LOG.info("No Instants to archive");
   }
 
+  if (config.getArchiveAutoMergeEnable() && 
!StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+mergeArchiveFilesIfNecessary(context);
+  }
   return success;
 } finally {
   close();
 }
   }
 
+  /**
+   * Here Hoodie can merge the small archive files into a new larger one.
+   * Only used for filesystems that do not support the append operation.
+   * The whole merge-small-archive-files operation has four stages:

Review comment:
   Changed.

##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -134,12 +161,222 @@ public boolean archiveIfRequired(HoodieEngineContext 
context) throws IOException
 LOG.info("No Instants to archive");
   }
 
+  if (config.getArchiveAutoMergeEnable() && 
!StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+mergeArchiveFilesIfNecessary(context);
+  }
   return success;
 } finally {
   close();
 }
   }
 
+  /**
+   * Here Hoodie can merge the small archive files into a new larger one.
+   * Only used for filesystems that do not support the append operation.
+   * The whole merge-small-archive-files operation has four stages:

Review comment:
   Changed.

##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -134,12 +161,222 @@ public boolean archiveIfRequired(HoodieEngineContext 
context) throws IOException
 LOG.info("No Instants to archive");
   }
 
+  if (config.getArchiveAutoMergeEnable() && 
!StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+mergeArchiveFilesIfNecessary(context);
+  }
   return success;
 } finally {
   close();
 }
   }
 
+  /**
+   * Here Hoodie can merge the small archive files into a new larger one.
+   * Only used for filesystems that do not support the append operation.
+   * The whole merge-small-archive-files operation has four stages:
+   * 1. Build merge plan with merge candidates/merged file name infos.
+   * 2. Do merge.
+   * 3. Delete all the candidates.
+   * 4. Delete the merge plan.
+   * @param context HoodieEngineContext
+   * @throws IOException
+   */
+  private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) 
throws IOException {
+Path planPath = new Path(metaClient.getArchivePath(), 
mergeArchivePlanName);
+// Flush any remaining content and open a new writer

Review comment:
   Changed.

##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -134,12 +161,222 @@ public boolean archiveIfRequired(HoodieEngineContext 
context) throws IOException
 LOG.info("No Instants to archive");
   }
 
+  if (config.getArchiveAutoMergeEnable() && 
!StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+mergeArchiveFilesIfNecessary(context);
+  }
   return success;
 } finally {
   close();
 }
   }
 
+  /**
+   * Here Hoodie can merge the small archive files into a new larger one.
+   * Only used for filesystems that do not support the append operation.
+   * The whole merge-small-archive-files operation has four stages:
+   * 1. Build merge plan with merge candidates/merged file name infos.
+   * 2. Do merge.
+   * 3. Delete all the candidates.
+   * 4. Delete the merge plan.
+   * @param context HoodieEngineContext
+   * @throws IOException
+   */
+  private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) 
throws IOException {
+Path planPath = new Path(metaClient.getArchivePath(), 
mergeArchivePlanName);
+// Flush any remaining content and open a new writer
+reOpenWriter();
+// List all archive files
+FileStatus[] fsStatuses = metaClient.getFs().globStatus(
+new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
+// Sort files by version suffix in reverse (implies reverse chronological 
order)
+Arrays.sort(fsStatuses, new 
HoodieArchivedTimeline.ArchiveFileVersionComparator());
+
+int archiveFilesMergeBatch = config.getArchiveFilesMergeBatchSize();
+long smallFileLimitBytes = config.getArchiveMergeSmallFileLimitBytes();
+
+List<FileStatus> mergeCandidate = getMergeCandidates(smallFileLimitBytes, fsStatuses);
+
+if (mergeCandidate.size() >= archiveFilesMergeBatch) {
+  List<String> candidateFiles = mergeCandidate.stream().map(fs -> 

[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2022-01-11 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r781995466



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##
@@ -249,6 +249,24 @@
   + "record size estimate compute dynamically based on commit 
metadata. "
   + " This is critical in computing the insert parallelism and 
bin-packing inserts into small files.");
 
+  public static final ConfigProperty<String> ARCHIVE_FILES_MERGE_BATCH_SIZE = ConfigProperty
+      .key("hoodie.archive.files.merge.batch.size")
+      .defaultValue(String.valueOf(10))
+      .withDocumentation("The number of small archive files to be merged at once.");
+
+  public static final ConfigProperty<String> ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES = ConfigProperty
+      .key("hoodie.archive.merge.small.file.limit.bytes")
+      .defaultValue(String.valueOf(20 * 1024 * 1024))
+      .withDocumentation("This config sets the archive file size limit below which an archive file becomes a candidate to be selected as a small file for merging.");
+
+  public static final ConfigProperty<String> ARCHIVE_AUTO_MERGE_ENABLE = ConfigProperty
+      .key("hoodie.archive.auto.merge.enable")
+      .defaultValue("false")
+      .withDocumentation("When enabled, Hoodie will automatically merge several small archive files into a larger one. It's"
+          + " useful when the storage scheme doesn't support the append operation.");
+
+
+

Review comment:
   Changed.
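   For reference, a minimal sketch of how the three archive-merge settings above could be supplied as writer properties. Only the config keys and defaults come from this diff; the surrounding `Properties` usage is illustrative, and how the properties are wired into the Hudi write config is left to however the job already builds it.
   ```java
import java.util.Properties;

public class ArchiveMergeConfigSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Keys come from the ConfigProperty definitions above; numeric values are the documented defaults.
    props.setProperty("hoodie.archive.auto.merge.enable", "true");      // opt in; the default is false
    props.setProperty("hoodie.archive.files.merge.batch.size", "10");   // merge at most 10 small archive files at once
    props.setProperty("hoodie.archive.merge.small.file.limit.bytes",
        String.valueOf(20 * 1024 * 1024));                              // files under 20 MB count as "small"

    // These properties would then be passed into the Hudi write config as usual.
    props.forEach((k, v) -> System.out.println(k + " = " + v));
  }
}
   ```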

##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##
@@ -249,6 +249,24 @@
   + "record size estimate compute dynamically based on commit 
metadata. "
   + " This is critical in computing the insert parallelism and 
bin-packing inserts into small files.");
 
+  public static final ConfigProperty<String> ARCHIVE_FILES_MERGE_BATCH_SIZE = ConfigProperty
+      .key("hoodie.archive.files.merge.batch.size")
+      .defaultValue(String.valueOf(10))
+      .withDocumentation("The number of small archive files to be merged at once.");
+
+  public static final ConfigProperty<String> ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES = ConfigProperty
+      .key("hoodie.archive.merge.small.file.limit.bytes")
+      .defaultValue(String.valueOf(20 * 1024 * 1024))
+      .withDocumentation("This config sets the archive file size limit below which an archive file becomes a candidate to be selected as a small file for merging.");
+
+  public static final ConfigProperty<String> ARCHIVE_AUTO_MERGE_ENABLE = ConfigProperty
+      .key("hoodie.archive.auto.merge.enable")
+      .defaultValue("false")
+      .withDocumentation("When enabled, Hoodie will automatically merge several small archive files into a larger one. It's"
+          + " useful when the storage scheme doesn't support the append operation.");

Review comment:
   Changed.

##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##
@@ -249,6 +249,24 @@
   + "record size estimate compute dynamically based on commit 
metadata. "
   + " This is critical in computing the insert parallelism and 
bin-packing inserts into small files.");
 
+  public static final ConfigProperty<String> ARCHIVE_FILES_MERGE_BATCH_SIZE = ConfigProperty
+      .key("hoodie.archive.files.merge.batch.size")
+      .defaultValue(String.valueOf(10))
+      .withDocumentation("The number of small archive files to be merged at once.");

Review comment:
   Changed.

##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##
@@ -249,6 +249,24 @@
   + "record size estimate compute dynamically based on commit 
metadata. "
   + " This is critical in computing the insert parallelism and 
bin-packing inserts into small files.");
 
+  public static final ConfigProperty<String> ARCHIVE_FILES_MERGE_BATCH_SIZE = ConfigProperty
+      .key("hoodie.archive.files.merge.batch.size")

Review comment:
   Changed.

##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##
@@ -249,6 +249,24 @@
   + "record size estimate compute dynamically based on commit 
metadata. "
   + " This is critical in computing the insert parallelism and 
bin-packing inserts into small files.");
 
+  public static final ConfigProperty<String> ARCHIVE_FILES_MERGE_BATCH_SIZE = ConfigProperty
+      .key("hoodie.archive.files.merge.batch.size")
+      .defaultValue(String.valueOf(10))
+      .withDocumentation("The number of small archive files to be merged at once.");
+
+  public static final ConfigProperty<String> ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES = ConfigProperty
+      .key("hoodie.archive.merge.small.file.limit.bytes")
+      .defaultValue(String.valueOf(20 * 1024 * 1024))
+      .withDocumentation("This config sets the archive file size limit below which an archive 

[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2022-01-11 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r781948726



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
##
@@ -248,11 +253,32 @@ private HoodieInstant readCommit(GenericRecord record, 
boolean loadDetails) {
   break;
 }
   }
+    } catch (Exception originalException) {
+      // Merging small archive files may leave behind an incomplete archive file, which will cause an exception.
+      // We need to ignore this kind of exception here.
+      try {
+        Path planPath = new Path(metaClient.getArchivePath(), "mergeArchivePlan");
+        HoodieWrapperFileSystem fileSystem = metaClient.getFs();
+        if (fileSystem.exists(planPath)) {
+          HoodieMergeArchiveFilePlan plan = TimelineMetadataUtils.deserializeAvroMetadata(FileIOUtils.readDataFromPath(fileSystem, planPath).get(), HoodieMergeArchiveFilePlan.class);
+          String mergedArchiveFileName = plan.getMergedArchiveFileName();
+          if (!StringUtils.isNullOrEmpty(mergedArchiveFileName) && fs.getPath().getName().equalsIgnoreCase(mergedArchiveFileName)) {
+            LOG.warn("Caught an exception caused by reading the incomplete merged archive file " + mergedArchiveFileName + ". Ignore it here.");
+            continue;
+          }
+        }
+        throw originalException;
+      } catch (Exception e) {
+        // If anything goes wrong while parsing the merge archive plan, we need to throw the original exception.
+        // For example, a corrupted archive file and a corrupted plan may both exist.
+        throw originalException;
+      }

Review comment:
   We use this code to check whether the originalException was caused by a corrupted mergedArchiveFile and, if so, ignore it.
   Anything else needs to be thrown again.








[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2022-01-04 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r777924466



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
##
@@ -218,7 +219,7 @@ private HoodieInstant readCommit(GenericRecord record, 
boolean loadDetails) {
   // Sort files by version suffix in reverse (implies reverse 
chronological order)
   Arrays.sort(fsStatuses, new ArchiveFileVersionComparator());
 
-  List<HoodieInstant> instantsInRange = new ArrayList<>();
+  Set<HoodieInstant> instantsInRange = new HashSet<>();

Review comment:
   ```
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    HoodieInstant that = (HoodieInstant) o;
    return state == that.state && Objects.equals(action, that.action) && Objects.equals(timestamp, that.timestamp);
  }
   ```








[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2022-01-04 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r777896842



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
##
@@ -218,7 +219,7 @@ private HoodieInstant readCommit(GenericRecord record, 
boolean loadDetails) {
   // Sort files by version suffix in reverse (implies reverse 
chronological order)
   Arrays.sort(fsStatuses, new ArchiveFileVersionComparator());
 
-  List<HoodieInstant> instantsInRange = new ArrayList<>();
+  Set<HoodieInstant> instantsInRange = new HashSet<>();

Review comment:
   Maybe we could use a HashSet to avoid duplicate instants while loading archived instants.
   This relies on there being no identical HoodieInstant in the Hoodie timeline.








[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2022-01-04 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r777896842



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
##
@@ -218,7 +219,7 @@ private HoodieInstant readCommit(GenericRecord record, 
boolean loadDetails) {
   // Sort files by version suffix in reverse (implies reverse 
chronological order)
   Arrays.sort(fsStatuses, new ArchiveFileVersionComparator());
 
-  List<HoodieInstant> instantsInRange = new ArrayList<>();
+  Set<HoodieInstant> instantsInRange = new HashSet<>();

Review comment:
   Maybe we could use a HashSet to avoid duplicate instants.
   This relies on there being no identical HoodieInstant in the Hoodie timeline.








[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2022-01-04 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r777896842



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
##
@@ -218,7 +219,7 @@ private HoodieInstant readCommit(GenericRecord record, 
boolean loadDetails) {
   // Sort files by version suffix in reverse (implies reverse 
chronological order)
   Arrays.sort(fsStatuses, new ArchiveFileVersionComparator());
 
-  List<HoodieInstant> instantsInRange = new ArrayList<>();
+  Set<HoodieInstant> instantsInRange = new HashSet<>();

Review comment:
   Maybe we could use a HashSet to avoid duplicate instants.
   This relies on there being no duplicate instant time in the Hoodie timeline.








[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2022-01-04 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r777901212



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -134,12 +161,199 @@ public boolean archiveIfRequired(HoodieEngineContext 
context) throws IOException
 LOG.info("No Instants to archive");
   }
 
+  if (config.getArchiveAutoMergeEnable()) {
+mergeArchiveFilesIfNecessary(context);
+  }
   return success;
 } finally {
   close();
 }
   }
 
+  private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) 
throws IOException {
+Path planPath = new Path(metaClient.getArchivePath(), 
mergeArchivePlanName);
+// Flush any remaining content and open a new writer
+reOpenWriter();
+// List all archive files
+FileStatus[] fsStatuses = metaClient.getFs().globStatus(

Review comment:
   Nice catch here. I just found out that it may be important to keep the original instant order of the small archive files:
   
https://github.com/apache/hudi/blob/b5f05fd153df29a8be377404a14a0ced2f00b4bf/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java#L219
   When loading archived instants, Hoodie uses this order to skip reading unnecessary archive files:
   
https://github.com/apache/hudi/blob/b5f05fd153df29a8be377404a14a0ced2f00b4bf/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java#L243
   
   So I just use the same ordering comparator here (a small sketch of this ordering follows below).
   What do you think? :)
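   To make the ordering concrete, here is a small stand-alone sketch. It is not the actual `ArchiveFileVersionComparator`; it simply assumes archive file names of the form `.commits_.archive.<version>` and sorts them by the version suffix in reverse, which is the property the time-filtered loading relies on.
   ```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class ArchiveOrderSketch {
  // Extract the numeric version suffix from names like ".commits_.archive.12" (assumed naming).
  static int versionOf(String fileName) {
    return Integer.parseInt(fileName.substring(fileName.lastIndexOf('.') + 1));
  }

  public static void main(String[] args) {
    List<String> files = Arrays.asList(
        ".commits_.archive.2", ".commits_.archive.10", ".commits_.archive.1");

    // Reverse version order implies reverse chronological order, so newer files are visited first
    // and a time-range filter can stop reading once it falls out of range.
    List<String> sorted = files.stream()
        .sorted(Comparator.comparingInt(ArchiveOrderSketch::versionOf).reversed())
        .collect(Collectors.toList());

    System.out.println(sorted); // [.commits_.archive.10, .commits_.archive.2, .commits_.archive.1]
  }
}
   ```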








[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2022-01-04 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r777892769



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -134,12 +161,199 @@ public boolean archiveIfRequired(HoodieEngineContext 
context) throws IOException
 LOG.info("No Instants to archive");
   }
 
+  if (config.getArchiveAutoMergeEnable()) {
+mergeArchiveFilesIfNecessary(context);
+  }
   return success;
 } finally {
   close();
 }
   }
 
+  private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
+    Path planPath = new Path(metaClient.getArchivePath(), mergeArchivePlanName);
+    // Flush any remaining content and open a new writer
+    reOpenWriter();
+    // List all archive files
+    FileStatus[] fsStatuses = metaClient.getFs().globStatus(
+        new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
+    List<FileStatus> mergeCandidate = new ArrayList<>();
+    int archiveFilesCompactBatch = config.getArchiveFilesMergeBatchSize();
+    long smallFileLimitBytes = config.getArchiveMergeSmallFileLimitBytes();
+
+    for (FileStatus fs : fsStatuses) {
+      if (fs.getLen() < smallFileLimitBytes) {
+        mergeCandidate.add(fs);
+      }
+      if (mergeCandidate.size() >= archiveFilesCompactBatch) {
+        break;
+      }
+    }
+
+    if (mergeCandidate.size() >= archiveFilesCompactBatch) {
+      List<String> candidateFiles = mergeCandidate.stream().map(fs -> fs.getPath().toString()).collect(Collectors.toList());
+      // Build the merge plan before merging the archive files
+      String logFileName = computeLogFileName();
+      buildArchiveMergePlan(candidateFiles, planPath, logFileName);
+      // Merge the archive files
+      mergeArchiveFiles(mergeCandidate);
+      // After the merge, delete the small archive files
+      deleteFilesParallelize(metaClient, candidateFiles, context, true);
+      // Finally, delete the archiveMergePlan, which marks the merge of the small archive files as succeeded
+      metaClient.getFs().delete(planPath, false);
+    }
+  }
+
+  /**
+   * Get the final archive file name to write, based on whether the storage scheme supports append or not.
+   */
+  private String computeLogFileName() throws IOException {
+    if (!StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+      String logWriteToken = writer.getLogFile().getLogWriteToken();
+      HoodieLogFile hoodieLogFile = writer.getLogFile().rollOver(metaClient.getFs(), logWriteToken);
+      return hoodieLogFile.getFileName();
+    } else {
+      return writer.getLogFile().getFileName();
+    }
+  }
+
+  /**
+   * Check for and resolve any failed, unfinished merge of small archive files.
+   * @param context HoodieEngineContext used to parallelize deletion of the small archive files if necessary.
+   * @throws IOException
+   */
+  private void verifyLastMergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
+    if (config.getArchiveAutoMergeEnable()) {

Review comment:
   On enabling and disabling:
   1. If it is disabled gracefully, nothing is left behind by this patch.
   2. If it is disabled ungracefully, duplicate archive instant information may be left behind until it is enabled again, but I think this causes no damage because we use a HashSet when loading archived instants, as changed in this PR.
   
   The reasons I think we need this switch before doing the cleanup work are:
   1. This is a new feature, so it is safer to guard it with a default of false here.
   2. I am pretty worried about multi-writer scenarios; at least this gives us a way to ensure that only one writer performs the merge.
   
   As a next step, maybe we can take care of multi-writer scenarios and also remove this switch here :)








[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2022-01-04 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r777898257



##
File path: hudi-common/src/main/avro/HoodieMergeArchiveFilePlan.avsc
##
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+   "namespace":"org.apache.hudi.avro.model",
+   "type":"record",
+   "name":"HoodieMergeArchiveFilePlan",
+   "fields":[

Review comment:
   Added.








[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2022-01-04 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r777896842



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
##
@@ -218,7 +219,7 @@ private HoodieInstant readCommit(GenericRecord record, 
boolean loadDetails) {
   // Sort files by version suffix in reverse (implies reverse 
chronological order)
   Arrays.sort(fsStatuses, new ArchiveFileVersionComparator());
 
-  List<HoodieInstant> instantsInRange = new ArrayList<>();
+  Set<HoodieInstant> instantsInRange = new HashSet<>();

Review comment:
   Maybe we could use a HashSet to avoid duplicate instants.








[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2022-01-04 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r777895427



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -134,12 +161,199 @@ public boolean archiveIfRequired(HoodieEngineContext 
context) throws IOException
 LOG.info("No Instants to archive");
   }
 
+  if (config.getArchiveAutoMergeEnable()) {
+mergeArchiveFilesIfNecessary(context);
+  }
   return success;
 } finally {
   close();
 }
   }
 
+  private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) 
throws IOException {

Review comment:
   Added. Thanks a lot for your review!








[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2022-01-04 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r777895319



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -134,12 +161,199 @@ public boolean archiveIfRequired(HoodieEngineContext 
context) throws IOException
 LOG.info("No Instants to archive");
   }
 
+  if (config.getArchiveAutoMergeEnable()) {
+mergeArchiveFilesIfNecessary(context);
+  }
   return success;
 } finally {
   close();
 }
   }
 
+  private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) 
throws IOException {
+Path planPath = new Path(metaClient.getArchivePath(), 
mergeArchivePlanName);
+// Flush any remaining content and open a new writer
+reOpenWriter();
+// List all archive files
+FileStatus[] fsStatuses = metaClient.getFs().globStatus(
+new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
+List<FileStatus> mergeCandidate = new ArrayList<>();
+int archiveFilesCompactBatch = config.getArchiveFilesMergeBatchSize();
+long smallFileLimitBytes = config.getArchiveMergeSmallFileLimitBytes();
+
+for (FileStatus fs: fsStatuses) {
+  if (fs.getLen() < smallFileLimitBytes) {
+mergeCandidate.add(fs);
+  }
+  if (mergeCandidate.size() >= archiveFilesCompactBatch) {
+break;
+  }
+}
+
+if (mergeCandidate.size() >= archiveFilesCompactBatch) {
+  List<String> candidateFiles = mergeCandidate.stream().map(fs -> fs.getPath().toString()).collect(Collectors.toList());
+  // Build the merge plan before merging the archive files
+  String logFileName = computeLogFileName();
+  buildArchiveMergePlan(candidateFiles, planPath, logFileName);
+  // merge archive files
+  mergeArchiveFiles(mergeCandidate);
+  // after merge, delete the small archive files.
+  deleteFilesParallelize(metaClient, candidateFiles, context, true);

Review comment:
   Just create a new log file here.








[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2022-01-04 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r777895112



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -134,12 +161,199 @@ public boolean archiveIfRequired(HoodieEngineContext 
context) throws IOException
 LOG.info("No Instants to archive");
   }
 
+  if (config.getArchiveAutoMergeEnable()) {
+mergeArchiveFilesIfNecessary(context);
+  }
   return success;
 } finally {
   close();
 }
   }
 
+  private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) 
throws IOException {
+Path planPath = new Path(metaClient.getArchivePath(), 
mergeArchivePlanName);
+// Flush any remaining content and open a new writer
+reOpenWriter();
+// List all archive files
+FileStatus[] fsStatuses = metaClient.getFs().globStatus(
+new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
+List<FileStatus> mergeCandidate = new ArrayList<>();
+int archiveFilesCompactBatch = config.getArchiveFilesMergeBatchSize();
+long smallFileLimitBytes = config.getArchiveMergeSmallFileLimitBytes();
+
+for (FileStatus fs: fsStatuses) {
+  if (fs.getLen() < smallFileLimitBytes) {
+mergeCandidate.add(fs);
+  }
+  if (mergeCandidate.size() >= archiveFilesCompactBatch) {
+break;
+  }
+}
+
+if (mergeCandidate.size() >= archiveFilesCompactBatch) {
+  List<String> candidateFiles = mergeCandidate.stream().map(fs -> fs.getPath().toString()).collect(Collectors.toList());
+  // Build the merge plan before merging the archive files
+  String logFileName = computeLogFileName();
+  buildArchiveMergePlan(candidateFiles, planPath, logFileName);
+  // merge archive files
+  mergeArchiveFiles(mergeCandidate);
+  // after merge, delete the small archive files.
+  deleteFilesParallelize(metaClient, candidateFiles, context, true);
+  // finally, delete archiveMergePlan, which means the merge of small archive files succeeded.
+  metaClient.getFs().delete(planPath, false);
+}
+  }
+
+  /**
+   * Get the final written archive file name, based on whether the storage scheme supports append.
+   */
+  private String computeLogFileName() throws IOException {
+if (!StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+  String logWriteToken = writer.getLogFile().getLogWriteToken();
+  HoodieLogFile hoodieLogFile = 
writer.getLogFile().rollOver(metaClient.getFs(), logWriteToken);
+  return hoodieLogFile.getFileName();
+} else {
+  return writer.getLogFile().getFileName();

Review comment:
   > Do we create a new log block in log10 which will contain all merged log blocks from log1 to log10? Or should we explicitly create a new log file (log11)?
   
   Nice catch! Option 2 is better: we create a new log11 each time we merge small archive files.
   
   > Do we even need to consider adding this new feature for storage schemes where append is supported. In other words, should we consider enabling this feature just for storage schemes where append is not supported, and leave it as a no-op for storage schemes where appends are supported.
   
   It's better to enable this feature only for storage schemes where append is not supported.
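   For illustration, a small self-contained sketch of that gating idea (the scheme list below is an assumption for the example, not the real StorageSchemes table):

import java.util.Set;

// Sketch only: plan the small-archive-file merge just when the underlying
// storage scheme cannot append (e.g. object stores).
public class MergeGate {
  private static final Set<String> APPEND_SUPPORTED = Set.of("hdfs", "viewfs");

  static boolean shouldMergeSmallArchiveFiles(String scheme, boolean mergeEnabled) {
    return mergeEnabled && !APPEND_SUPPORTED.contains(scheme.toLowerCase());
  }

  public static void main(String[] args) {
    System.out.println(shouldMergeSmallArchiveFiles("s3a", true));  // true: no append support
    System.out.println(shouldMergeSmallArchiveFiles("hdfs", true)); // false: append is available
  }
}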




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2022-01-04 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r777892769



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -134,12 +161,199 @@ public boolean archiveIfRequired(HoodieEngineContext 
context) throws IOException
 LOG.info("No Instants to archive");
   }
 
+  if (config.getArchiveAutoMergeEnable()) {
+mergeArchiveFilesIfNecessary(context);
+  }
   return success;
 } finally {
   close();
 }
   }
 
+  private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) 
throws IOException {
+Path planPath = new Path(metaClient.getArchivePath(), 
mergeArchivePlanName);
+// Flush remaining content if it exists and open a new writer
+reOpenWriter();
+// List all archive files
+FileStatus[] fsStatuses = metaClient.getFs().globStatus(
+new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
+List<FileStatus> mergeCandidate = new ArrayList<>();
+int archiveFilesCompactBatch = config.getArchiveFilesMergeBatchSize();
+long smallFileLimitBytes = config.getArchiveMergeSmallFileLimitBytes();
+
+for (FileStatus fs: fsStatuses) {
+  if (fs.getLen() < smallFileLimitBytes) {
+mergeCandidate.add(fs);
+  }
+  if (mergeCandidate.size() >= archiveFilesCompactBatch) {
+break;
+  }
+}
+
+if (mergeCandidate.size() >= archiveFilesCompactBatch) {
+  List<String> candidateFiles = mergeCandidate.stream().map(fs -> 
fs.getPath().toString()).collect(Collectors.toList());
+  // build the merge plan before merging archive files
+  String logFileName = computeLogFileName();
+  buildArchiveMergePlan(candidateFiles, planPath, logFileName);
+  // merge archive files
+  mergeArchiveFiles(mergeCandidate);
+  // after merge, delete the small archive files.
+  deleteFilesParallelize(metaClient, candidateFiles, context, true);
+  // finally, delete archiveMergePlan, which means the merge of small archive files succeeded.
+  metaClient.getFs().delete(planPath, false);
+}
+  }
+
+  /**
+   * Get the final written archive file name, based on whether the storage scheme supports append.
+   */
+  private String computeLogFileName() throws IOException {
+if (!StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+  String logWriteToken = writer.getLogFile().getLogWriteToken();
+  HoodieLogFile hoodieLogFile = 
writer.getLogFile().rollOver(metaClient.getFs(), logWriteToken);
+  return hoodieLogFile.getFileName();
+} else {
+  return writer.getLogFile().getFileName();
+}
+  }
+
+  /**
+   * Check for and resolve any failed, unfinished merge of small archive files.
+   * @param context HoodieEngineContext used to delete small archive files in parallel if necessary.
+   * @throws IOException
+   */
+  private void verifyLastMergeArchiveFilesIfNecessary(HoodieEngineContext 
context) throws IOException {
+if (config.getArchiveAutoMergeEnable()) {

Review comment:
   When the feature is enabled and then disabled:
   1. If it is disabled gracefully, this patch leaves nothing behind.
   2. If it is disabled ungracefully, duplicate archived instant information may be left behind until it is enabled again, but I think that causes no damage because this PR uses a HashSet when loading archived instants.
   
   Why I think we need this switch before doing the clean-up work:
   1. This is a new feature, so it is safer with a default of false.
   2. I am pretty worried about multi-writer scenarios; at least this gives us a way to ensure only one writer performs the merge work.
   
   As a next step, maybe we can handle multi-writer properly and then remove this switch :)
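   As a rough illustration of the HashSet point above (standalone Java with a simplified Instant type; not the actual HoodieArchivedTimeline code):

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative only: if an interrupted merge leaves the same archived instant in
// both an old small file and the merged file, collecting instants into a set
// keyed by (timestamp, action) drops the duplicates while loading.
public class ArchivedInstantLoader {
  record Instant(String timestamp, String action) {}

  static List<Instant> loadDeduplicated(List<Instant> fromAllArchiveFiles) {
    Set<Instant> seen = new HashSet<>();
    List<Instant> result = new ArrayList<>();
    for (Instant instant : fromAllArchiveFiles) {
      if (seen.add(instant)) {   // add() returns false for duplicates
        result.add(instant);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<Instant> raw = List.of(
        new Instant("20220101010101", "commit"),
        new Instant("20220101010101", "commit"),   // duplicate from a leftover small file
        new Instant("20220101020202", "clean"));
    System.out.println(loadDeduplicated(raw).size()); // 2
  }
}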




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2022-01-04 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r777887971



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -106,6 +116,23 @@ private Writer openWriter() {
 }
   }
 
+  public void reOpenWriter() {
+try {
+  if (this.writer != null) {
+this.writer.close();

Review comment:
   Sure thing, changed. I also reworked the reOpenWriter function, which the unit tests need, to remove duplicated code.
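   A minimal sketch of the close-then-reopen pattern being discussed, using plain java.nio rather than the Hudi HoodieLogFormat.Writer API:

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Sketch only: flush and close the current writer, then hand back a fresh one,
// so pending content is durable before any merge work starts.
public class ReopenableWriter {
  private BufferedWriter writer;
  private final Path target;

  ReopenableWriter(Path target) {
    this.target = target;
  }

  BufferedWriter reOpenWriter() throws IOException {
    if (writer != null) {
      writer.close();          // flushes any buffered content
      writer = null;
    }
    writer = Files.newBufferedWriter(target,
        StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    return writer;
  }

  public static void main(String[] args) throws IOException {
    ReopenableWriter w = new ReopenableWriter(Path.of("archive.log"));
    w.reOpenWriter().write("entry\n");
    w.reOpenWriter(); // closes (flushes) the old writer and opens a new one
  }
}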




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2021-12-22 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r773735747



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##
@@ -249,6 +249,21 @@
   + "record size estimate compute dynamically based on commit 
metadata. "
   + " This is critical in computing the insert parallelism and 
bin-packing inserts into small files.");
 
+  public static final ConfigProperty ARCHIVE_MAX_FILES = ConfigProperty
+  .key("hoodie.archive.max.files")
+  .noDefaultValue()
+  .withDocumentation("The numbers of kept archive files under archived.");
+
+  public static final ConfigProperty ARCHIVE_AUTO_TRIM_ENABLE = 
ConfigProperty

Review comment:
   Sure, I will compact these small archive files instead of deleting them. This may need some design work.
   Will update the code and design ASAP.
   Thanks a lot for your review.
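   As a rough sketch of what "compact small archive files instead of deleting" can look like, here is the candidate-selection step in standalone Java (FileInfo stands in for Hadoop's FileStatus; the thresholds and names are illustrative, not the final Hudi code):

import java.util.ArrayList;
import java.util.List;

// Sketch only: collect archive files smaller than a size limit until a batch
// size is reached, and merge only when a full batch of small files exists.
public class MergeCandidateSelector {
  record FileInfo(String path, long length) {}

  static List<FileInfo> selectCandidates(List<FileInfo> archiveFiles,
                                         long smallFileLimitBytes,
                                         int mergeBatchSize) {
    List<FileInfo> candidates = new ArrayList<>();
    for (FileInfo f : archiveFiles) {
      if (f.length() < smallFileLimitBytes) {
        candidates.add(f);
      }
      if (candidates.size() >= mergeBatchSize) {
        break;
      }
    }
    // Only worth merging when a full batch of small files exists.
    return candidates.size() >= mergeBatchSize ? candidates : List.of();
  }

  public static void main(String[] args) {
    List<FileInfo> files = List.of(
        new FileInfo("archive.1", 10_000), new FileInfo("archive.2", 12_000),
        new FileInfo("archive.3", 900_000), new FileInfo("archive.4", 8_000));
    // 100 KB small-file limit, batch of 3 -> archive.1, archive.2, archive.4
    System.out.println(selectCandidates(files, 100_000L, 3));
  }
}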




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2021-12-16 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r771087760



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##
@@ -249,6 +249,21 @@
   + "record size estimate compute dynamically based on commit 
metadata. "
   + " This is critical in computing the insert parallelism and 
bin-packing inserts into small files.");
 
+  public static final ConfigProperty MAX_ARCHIVE_FILES_TO_KEEP_PROP = 
ConfigProperty
+  .key("hoodie.max.archive.files")
+  .noDefaultValue()
+  .withDocumentation("The numbers of kept archive files under archived.");
+
+  public static final ConfigProperty AUTO_TRIM_ARCHIVE_FILES_DROP = 
ConfigProperty
+  .key("hoodie.auto.trim.archive.files")

Review comment:
   Changed.

##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##
@@ -249,6 +249,21 @@
   + "record size estimate compute dynamically based on commit 
metadata. "
   + " This is critical in computing the insert parallelism and 
bin-packing inserts into small files.");
 
+  public static final ConfigProperty MAX_ARCHIVE_FILES_TO_KEEP_PROP = 
ConfigProperty
+  .key("hoodie.max.archive.files")

Review comment:
   Changed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2021-12-16 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r770425205



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -140,6 +146,48 @@ public boolean archiveIfRequired(HoodieEngineContext 
context) throws IOException
 }
   }
 
+  private void trimArchiveFilesIfNecessary(HoodieEngineContext context) throws 
IOException {
+Stream<HoodieLogFile> allLogFiles = 
FSUtils.getAllLogFiles(metaClient.getFs(),
+archiveFilePath.getParent(),
+archiveFilePath.getName(),
+HoodieArchivedLogFile.ARCHIVE_EXTENSION,
+"");
+List<HoodieLogFile> sortedLogFilesList = 
allLogFiles.sorted(HoodieLogFile.getReverseLogFileComparator()).collect(Collectors.toList());
+if (!sortedLogFilesList.isEmpty()) {
+  List<String> skipped = 
sortedLogFilesList.stream().skip(maxArchiveFilesToKeep).map(HoodieLogFile::getPath).map(Path::toString).collect(Collectors.toList());
+  if (!skipped.isEmpty()) {
+LOG.info("Deleting archive files :  " + skipped);
+context.setJobStatus(this.getClass().getSimpleName(), "Delete archive 
files");
+Map result = deleteFilesParallelize(metaClient, 
skipped, context, true);

Review comment:
   Changed.

##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -140,6 +146,48 @@ public boolean archiveIfRequired(HoodieEngineContext 
context) throws IOException
 }
   }
 
+  private void trimArchiveFilesIfNecessary(HoodieEngineContext context) throws 
IOException {
+Stream<HoodieLogFile> allLogFiles = 
FSUtils.getAllLogFiles(metaClient.getFs(),
+archiveFilePath.getParent(),
+archiveFilePath.getName(),
+HoodieArchivedLogFile.ARCHIVE_EXTENSION,
+"");
+List<HoodieLogFile> sortedLogFilesList = 
allLogFiles.sorted(HoodieLogFile.getReverseLogFileComparator()).collect(Collectors.toList());
+if (!sortedLogFilesList.isEmpty()) {
+  List<String> skipped = 
sortedLogFilesList.stream().skip(maxArchiveFilesToKeep).map(HoodieLogFile::getPath).map(Path::toString).collect(Collectors.toList());

Review comment:
   Changed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2021-12-16 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r770425379



##
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
##
@@ -183,6 +217,41 @@ public void testArchiveTableWithArchival(boolean 
enableMetadata) throws Exceptio
 }
   }
 
+  @ParameterizedTest
+  @MethodSource("testArchiveTableWithArchivalTrim")
+  public void testArchiveTableWithArchivalCleanUp(boolean enableMetadata, 
boolean enableArchiveTrim, int archiveFilesToKeep) throws Exception {
+HashSet<String> archiveFilesExisted = new HashSet<>();
+ArrayList<String> currentExistArchiveFiles = new ArrayList<>();
+HoodieWriteConfig writeConfig = 
initTestTableAndGetWriteConfig(enableMetadata, 2, 3, 2, enableArchiveTrim, 
archiveFilesToKeep);
+String archivePath = metaClient.getArchivePath();
+for (int i = 1; i < 10; i++) {
+  testTable.doWriteOperation("000" + i, WriteOperationType.UPSERT, i 
== 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", 
"p2"), 2);
+  // trigger archival
+  archiveAndGetCommitsList(writeConfig);
+  RemoteIterator<LocatedFileStatus> iter = 
metaClient.getFs().listFiles(new Path(archivePath), false);
+  ArrayList<String> files = new ArrayList<>();
+  while (iter.hasNext()) {
+files.add(iter.next().getPath().toString());
+  }
+  archiveFilesExisted.addAll(files);
+  currentExistArchiveFiles = files;
+}
+
+assertEquals(archiveFilesToKeep, currentExistArchiveFiles.size());
+
+if (enableArchiveTrim) {
+  // sort archive files path
+  List<String> sorted = 
archiveFilesExisted.stream().sorted().collect(Collectors.toList());
+  List<String> archiveFilesDeleted = sorted.subList(0, 3 - 
archiveFilesToKeep);
+  List<String> archiveFilesKept = sorted.subList(3 - archiveFilesToKeep, 
sorted.size());
+
+  // assert older archive files are deleted
+  assertFalse(currentExistArchiveFiles.containsAll(archiveFilesDeleted));
+  // assert most recent archive files are preserved
+  assertTrue(currentExistArchiveFiles.containsAll(archiveFilesKept));
+}

Review comment:
   Added.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2021-12-16 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r770425009



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##
@@ -249,6 +249,19 @@
   + "record size estimate compute dynamically based on commit 
metadata. "
   + " This is critical in computing the insert parallelism and 
bin-packing inserts into small files.");
 
+  public static final ConfigProperty MAX_ARCHIVE_FILES_TO_KEEP_PROP = 
ConfigProperty
+  .key("hoodie.max.archive.files")
+  .defaultValue("10")
+  .withDocumentation("The numbers of kept archive files under archived.");
+
+  public static final ConfigProperty AUTO_TRIM_ARCHIVE_FILES_DROP = 
ConfigProperty
+  .key("hoodie.auto.trim.archive.files")
+  .defaultValue("false")
+  .withDocumentation("When enabled, Hoodie will keep the most recent " + 
MAX_ARCHIVE_FILES_TO_KEEP_PROP.key()
+  + " archive files and delete older one which lose part of archived 
instants information.");

Review comment:
   Hmm, since all the archive-related configs such as `hoodie.archive.automatic`, `hoodie.commits.archival.batch`, `hoodie.keep.min.commits`, etc. already live in `HoodieCompactionConfig`, maybe it's better to keep these there as well :)
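   For readers unfamiliar with the pattern, here is a stripped-down sketch of a ConfigProperty-style holder; this is not the real org.apache.hudi.common.config.ConfigProperty, and the key name below is taken from this discussion (it was renamed later in the PR):

// Sketch only: a key, an optional default, and a documentation string,
// kept alongside the other archival configs.
public final class SimpleConfigProperty<T> {
  private final String key;
  private final T defaultValue;   // may be null for "no default"
  private final String doc;

  private SimpleConfigProperty(String key, T defaultValue, String doc) {
    this.key = key;
    this.defaultValue = defaultValue;
    this.doc = doc;
  }

  static <T> SimpleConfigProperty<T> of(String key, T defaultValue, String doc) {
    return new SimpleConfigProperty<>(key, defaultValue, doc);
  }

  public String key() { return key; }
  public T defaultValue() { return defaultValue; }
  public String doc() { return doc; }

  // Example constant, grouped with the other archive knobs as discussed above.
  static final SimpleConfigProperty<String> AUTO_TRIM_ARCHIVE_FILES =
      of("hoodie.auto.trim.archive.files", "false",
         "Whether to trim/merge old archive files (key name from this thread; illustrative).");
}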




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2021-12-16 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r770423255



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##
@@ -249,6 +249,19 @@
   + "record size estimate compute dynamically based on commit 
metadata. "
   + " This is critical in computing the insert parallelism and 
bin-packing inserts into small files.");
 
+  public static final ConfigProperty MAX_ARCHIVE_FILES_TO_KEEP_PROP = 
ConfigProperty
+  .key("hoodie.max.archive.files")
+  .defaultValue("10")
+  .withDocumentation("The numbers of kept archive files under archived.");
+
+  public static final ConfigProperty AUTO_TRIM_ARCHIVE_FILES_DROP = 
ConfigProperty
+  .key("hoodie.auto.trim.archive.files")
+  .defaultValue("false")
+  .withDocumentation("When enabled, Hoodie will keep the most recent " + 
MAX_ARCHIVE_FILES_TO_KEEP_PROP.key()

Review comment:
   Appreciate it. Changed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2021-12-16 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r770422959



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##
@@ -249,6 +249,19 @@
   + "record size estimate compute dynamically based on commit 
metadata. "
   + " This is critical in computing the insert parallelism and 
bin-packing inserts into small files.");
 
+  public static final ConfigProperty MAX_ARCHIVE_FILES_TO_KEEP_PROP = 
ConfigProperty
+  .key("hoodie.max.archive.files")
+  .defaultValue("10")

Review comment:
   Sure, thing. Changed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2021-12-14 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r769306591



##
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
##
@@ -183,6 +217,24 @@ public void testArchiveTableWithArchival(boolean 
enableMetadata) throws Exceptio
 }
   }
 
+  @ParameterizedTest
+  @MethodSource("testArchiveTableWithArchivalCleanUp")
+  public void testArchiveTableWithArchivalCleanUp(boolean enableMetadata, 
boolean enableArchiveClean, int archiveFilesToKeep) throws Exception {
+HoodieWriteConfig writeConfig = 
initTestTableAndGetWriteConfig(enableMetadata, 2, 3, 2, enableArchiveClean, 
archiveFilesToKeep);
+for (int i = 1; i < 10; i++) {
+  testTable.doWriteOperation("000" + i, WriteOperationType.UPSERT, i 
== 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", 
"p2"), 2);
+  // trigger archival
+  archiveAndGetCommitsList(writeConfig);
+}
+String archivePath = metaClient.getArchivePath();
+RemoteIterator<LocatedFileStatus> iter = metaClient.getFs().listFiles(new 
Path(archivePath), false);
+ArrayList<LocatedFileStatus> files = new ArrayList<>();
+while (iter.hasNext()) {
+  files.add(iter.next());
+}
+assertEquals(archiveFilesToKeep, files.size());

Review comment:
   Changed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4078: [HUDI-2833] Clean up unused archive files instead of expanding indefinitely.

2021-12-14 Thread GitBox


zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r769306460



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##
@@ -249,6 +249,18 @@
   + "record size estimate compute dynamically based on commit 
metadata. "
   + " This is critical in computing the insert parallelism and 
bin-packing inserts into small files.");
 
+  public static final ConfigProperty ARCHIVE_FILES_TO_KEEP_PROP = 
ConfigProperty
+  .key("hoodie.keep.archive.files")
+  .defaultValue("10")
+  .withDocumentation("The numbers of kept archive files under archived");
+
+  public static final ConfigProperty CLEAN_ARCHIVE_FILE_ENABLE_DROP = 
ConfigProperty
+  .key("hoodie.archive.clean.enable")

Review comment:
   Thanks for your review. Changed to `hoodie.auto.trim.archive.files` and `hoodie.max.archive.files`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org