lw309637554 commented on a change in pull request #2379: URL: https://github.com/apache/hudi/pull/2379#discussion_r551986565
########## File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java ########## @@ -682,6 +693,58 @@ public void testInlineClustering() throws Exception { }); } + private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath, + String clusteringInstantTime, boolean runSchedule) { + HoodieClusteringJob.Config config = new HoodieClusteringJob.Config(); + config.basePath = basePath; + config.clusteringInstantTime = clusteringInstantTime; + config.runSchedule = runSchedule; + config.propsFilePath = dfsBasePath + "/clusteringjob.properties"; + return config; + } + + @Test + public void testHoodieAsyncClusteringJob() throws Exception { + String tableBasePath = dfsBasePath + "/asyncClustering"; + // Keep it higher than batch-size to test continuous mode + int totalRecords = 3000; + + // Initial bulk insert + HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT); + cfg.continuousMode = true; + cfg.tableType = HoodieTableType.COPY_ON_WRITE.name(); + cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); + cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP)); + cfg.configs.add(String.format("%s=true", HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY)); + HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); + deltaStreamerTestRunner(ds, cfg, (r) -> { + TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs); + // for not confiict with delta streamer commit, just add 3600s + String clusterInstantTime = HoodieActiveTimeline.COMMIT_FORMATTER + .format(new Date(System.currentTimeMillis() + 3600 * 1000)); + LOG.info("Cluster instant time " + clusterInstantTime); + HoodieClusteringJob.Config scheduleClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath, + clusterInstantTime, true); + HoodieClusteringJob scheduleClusteringJob = new HoodieClusteringJob(jsc, 
scheduleClusteringConfig); + int scheduleClusteringResult = scheduleClusteringJob.cluster(scheduleClusteringConfig.retry); + if (scheduleClusteringResult == 0) { + LOG.info("Schedule clustering success, now cluster"); + HoodieClusteringJob.Config clusterClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath, + clusterInstantTime, false); + HoodieClusteringJob clusterClusteringJob = new HoodieClusteringJob(jsc, clusterClusteringConfig); + clusterClusteringJob.cluster(clusterClusteringConfig.retry); + LOG.info("Cluster success"); + } else { + LOG.warn("Schedule clustering failed"); + } + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(this.dfs.getConf(), tableBasePath, true); + int pendingReplaceSize = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length; + int completeReplaceSize = metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length; + System.out.println("PendingReplaceSize=" + pendingReplaceSize + ",completeReplaceSize = " + completeReplaceSize); + return completeReplaceSize > 0; Review comment: Because if completeReplaceSize is always <= 0, the runner will throw a timeout exception. I have now added an assert for completeReplaceSize == 1. Since this unit test mainly exercises async clustering scheduling and execution, asserting on completeReplaceSize is sufficient; record-level verification is already covered by the clustering unit tests. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org