Karl-WangSK commented on a change in pull request #2260:
URL: https://github.com/apache/hudi/pull/2260#discussion_r570746552



##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
##########
@@ -58,36 +62,97 @@ public 
SparkScheduleCompactionActionExecutor(HoodieEngineContext context,
   @Override
   protected HoodieCompactionPlan scheduleCompaction() {
     LOG.info("Checking if compaction needs to be run on " + 
config.getBasePath());
+    // judge if we need to compact according to num delta commits and time 
elapsed
+    boolean compactable = 
needCompact(config.getInlineCompactTriggerStrategy());
+    if (compactable) {
+      LOG.info("Generating compaction plan for merge on read table " + 
config.getBasePath());
+      HoodieSparkMergeOnReadTableCompactor compactor = new 
HoodieSparkMergeOnReadTableCompactor();
+      try {
+        SyncableFileSystemView fileSystemView = (SyncableFileSystemView) 
table.getSliceView();
+        Set<HoodieFileGroupId> fgInPendingCompactionAndClustering = 
fileSystemView.getPendingCompactionOperations()
+            .map(instantTimeOpPair -> 
instantTimeOpPair.getValue().getFileGroupId())
+            .collect(Collectors.toSet());
+        // exclude files in pending clustering from compaction.
+        
fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
+        return compactor.generateCompactionPlan(context, table, config, 
instantTime, fgInPendingCompactionAndClustering);
+      } catch (IOException e) {
+        throw new HoodieCompactionException("Could not schedule compaction " + 
config.getBasePath(), e);
+      }
+    }
+
+    return new HoodieCompactionPlan();
+  }
+
+  public Tuple2<Integer, String> 
getLastDeltaCommitInfo(CompactionTriggerStrategy compactionTriggerStrategy) {
     Option<HoodieInstant> lastCompaction = 
table.getActiveTimeline().getCommitTimeline()
         .filterCompletedInstants().lastInstant();
-    String lastCompactionTs = "0";
+    HoodieTimeline deltaCommits = 
table.getActiveTimeline().getDeltaCommitTimeline();
+
+    String lastCompactionTs;
+    int deltaCommitsSinceLastCompaction = 0;
     if (lastCompaction.isPresent()) {
       lastCompactionTs = lastCompaction.get().getTimestamp();
+    } else {
+      lastCompactionTs = deltaCommits.firstInstant().get().getTimestamp();
     }
+    if (compactionTriggerStrategy != CompactionTriggerStrategy.TIME_ELAPSED) {
+      if (lastCompaction.isPresent()) {

Review comment:
       yes. I think so. wdyt? @wangxianghu 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to