Karl-WangSK commented on a change in pull request #2260:
URL: https://github.com/apache/hudi/pull/2260#discussion_r570748423
##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
##########
@@ -58,36 +62,97 @@ public SparkScheduleCompactionActionExecutor(HoodieEngineContext context,
   @Override
   protected HoodieCompactionPlan scheduleCompaction() {
     LOG.info("Checking if compaction needs to be run on " + config.getBasePath());
+    // judge if we need to compact according to num delta commits and time elapsed
+    boolean compactable = needCompact(config.getInlineCompactTriggerStrategy());
+    if (compactable) {
+      LOG.info("Generating compaction plan for merge on read table " + config.getBasePath());

Review comment:
   Yes, just exchange the order.

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
##########
@@ -58,36 +62,97 @@ public SparkScheduleCompactionActionExecutor(HoodieEngineContext context,
   @Override
   protected HoodieCompactionPlan scheduleCompaction() {
     LOG.info("Checking if compaction needs to be run on " + config.getBasePath());
+    // judge if we need to compact according to num delta commits and time elapsed
+    boolean compactable = needCompact(config.getInlineCompactTriggerStrategy());
+    if (compactable) {
+      LOG.info("Generating compaction plan for merge on read table " + config.getBasePath());
+      HoodieSparkMergeOnReadTableCompactor compactor = new HoodieSparkMergeOnReadTableCompactor();
+      try {
+        SyncableFileSystemView fileSystemView = (SyncableFileSystemView) table.getSliceView();
+        Set<HoodieFileGroupId> fgInPendingCompactionAndClustering = fileSystemView.getPendingCompactionOperations()
+            .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId())
+            .collect(Collectors.toSet());
+        // exclude files in pending clustering from compaction.
+        fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
+        return compactor.generateCompactionPlan(context, table, config, instantTime, fgInPendingCompactionAndClustering);
+      } catch (IOException e) {
+        throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e);
+      }
+    }
+
+    return new HoodieCompactionPlan();
+  }
+
+  public Tuple2<Integer, String> getLastDeltaCommitInfo(CompactionTriggerStrategy compactionTriggerStrategy) {
     Option<HoodieInstant> lastCompaction = table.getActiveTimeline().getCommitTimeline()
         .filterCompletedInstants().lastInstant();
-    String lastCompactionTs = "0";
+    HoodieTimeline deltaCommits = table.getActiveTimeline().getDeltaCommitTimeline();
+
+    String lastCompactionTs;
+    int deltaCommitsSinceLastCompaction = 0;
     if (lastCompaction.isPresent()) {
       lastCompactionTs = lastCompaction.get().getTimestamp();
+    } else {
+      lastCompactionTs = deltaCommits.firstInstant().get().getTimestamp();
     }
+    if (compactionTriggerStrategy != CompactionTriggerStrategy.TIME_ELAPSED) {
+      if (lastCompaction.isPresent()) {
+        deltaCommitsSinceLastCompaction = deltaCommits.findInstantsAfter(lastCompactionTs, Integer.MAX_VALUE).countInstants();
+      } else {
+        deltaCommitsSinceLastCompaction = deltaCommits.findInstantsAfterOrEquals(lastCompactionTs, Integer.MAX_VALUE).countInstants();
+      }
+    }
+    return new Tuple2(deltaCommitsSinceLastCompaction, lastCompactionTs);

Review comment:
   Sure.
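The needCompact(config.getInlineCompactTriggerStrategy()) call in the first hunk gates plan generation on the configured CompactionTriggerStrategy. Below is a minimal, self-contained sketch of that kind of decision logic using plain Java types instead of Hudi's config and timeline classes; only TIME_ELAPSED appears in the diff, so the other enum constants and the threshold parameters here are illustrative assumptions, not Hudi API.

    import java.time.Duration;
    import java.time.Instant;

    public class CompactionTriggerSketch {

      // Only TIME_ELAPSED is confirmed by the diff; the rest are assumed variants.
      enum TriggerStrategy { NUM_COMMITS, TIME_ELAPSED, NUM_AND_TIME, NUM_OR_TIME }

      static boolean needCompact(TriggerStrategy strategy,
                                 int deltaCommitsSinceLastCompaction,
                                 Instant lastCompactionTime,
                                 int maxDeltaCommits,
                                 Duration maxElapsed) {
        // The two raw signals: accumulated delta commits, and time since last compaction.
        boolean enoughCommits = deltaCommitsSinceLastCompaction >= maxDeltaCommits;
        boolean enoughTime = Duration.between(lastCompactionTime, Instant.now())
            .compareTo(maxElapsed) >= 0;
        switch (strategy) {
          case NUM_COMMITS:  return enoughCommits;
          case TIME_ELAPSED: return enoughTime;
          case NUM_AND_TIME: return enoughCommits && enoughTime;
          case NUM_OR_TIME:  return enoughCommits || enoughTime;
          default:           return false;
        }
      }

      public static void main(String[] args) {
        // e.g. compact once 5 delta commits have accumulated OR an hour has passed.
        boolean compactable = needCompact(TriggerStrategy.NUM_OR_TIME,
            7, Instant.now().minus(Duration.ofMinutes(10)), 5, Duration.ofHours(1));
        System.out.println("compactable = " + compactable); // true: commit count met
      }
    }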
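The two branches in getLastDeltaCommitInfo(...) differ only in whether the baseline timestamp itself is counted: after a completed compaction at some instant, only delta commits strictly after it matter (findInstantsAfter), while with no compaction yet the baseline is the first delta commit, which must itself be included (findInstantsAfterOrEquals). A plain-Java sketch of that counting rule, with a hypothetical helper standing in for HoodieTimeline:

    import java.util.List;
    import java.util.Optional;

    public class DeltaCommitCountSketch {

      // Stand-in for the timeline counting; timestamps are lexicographically
      // ordered instant strings, as Hudi instant times are.
      static int deltaCommitsSinceLastCompaction(List<String> deltaCommitTimestamps,
                                                 Optional<String> lastCompactionTs) {
        String baseline = lastCompactionTs.orElse(deltaCommitTimestamps.get(0));
        // No completed compaction => count from the first delta commit inclusively,
        // mirroring the diff's switch to findInstantsAfterOrEquals.
        boolean inclusive = !lastCompactionTs.isPresent();
        return (int) deltaCommitTimestamps.stream()
            .filter(ts -> inclusive ? ts.compareTo(baseline) >= 0
                                    : ts.compareTo(baseline) > 0)
            .count();
      }

      public static void main(String[] args) {
        List<String> deltas = List.of("001", "003", "004");
        // A compaction completed at 002: only 003 and 004 count.
        System.out.println(deltaCommitsSinceLastCompaction(deltas, Optional.of("002"))); // 2
        // No compaction yet: all three delta commits count.
        System.out.println(deltaCommitsSinceLastCompaction(deltas, Optional.empty()));   // 3
      }
    }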
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org