Karl-WangSK commented on a change in pull request #2260:
URL: https://github.com/apache/hudi/pull/2260#discussion_r559443976



##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
##########
@@ -58,36 +62,90 @@ public 
SparkScheduleCompactionActionExecutor(HoodieEngineContext context,
   @Override
   protected HoodieCompactionPlan scheduleCompaction() {
     LOG.info("Checking if compaction needs to be run on " + 
config.getBasePath());
+    // judge if we need to compact according to num delta commits and time 
elapsed
+    boolean compactable = needCompact(config.getInlineCompactType());
+    if (compactable) {
+      LOG.info("Generating compaction plan for merge on read table " + 
config.getBasePath());
+      HoodieSparkMergeOnReadTableCompactor compactor = new 
HoodieSparkMergeOnReadTableCompactor();
+      try {
+        SyncableFileSystemView fileSystemView = (SyncableFileSystemView) 
table.getSliceView();
+        Set<HoodieFileGroupId> fgInPendingCompactionAndClustering = 
fileSystemView.getPendingCompactionOperations()
+            .map(instantTimeOpPair -> 
instantTimeOpPair.getValue().getFileGroupId())
+            .collect(Collectors.toSet());
+        // exclude files in pending clustering from compaction.
+        
fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
+        return compactor.generateCompactionPlan(context, table, config, 
instantTime, fgInPendingCompactionAndClustering);
+      } catch (IOException e) {
+        throw new HoodieCompactionException("Could not schedule compaction " + 
config.getBasePath(), e);
+      }
+    }
+
+    return new HoodieCompactionPlan();
+  }
+
+  public Tuple2<Integer, String> checkCompact(CompactType compactType) {
     Option<HoodieInstant> lastCompaction = 
table.getActiveTimeline().getCommitTimeline()
-        .filterCompletedInstants().lastInstant();
-    String lastCompactionTs = "0";
+            .filterCompletedInstants().lastInstant();
+    HoodieTimeline deltaCommits = 
table.getActiveTimeline().getDeltaCommitTimeline();
+
+    String lastCompactionTs;
+    int deltaCommitsSinceLastCompaction = 0;
     if (lastCompaction.isPresent()) {
       lastCompactionTs = lastCompaction.get().getTimestamp();
+    } else {
+      lastCompactionTs = deltaCommits.firstInstant().get().getTimestamp();
     }
-
-    int deltaCommitsSinceLastCompaction = 
table.getActiveTimeline().getDeltaCommitTimeline()
-        .findInstantsAfter(lastCompactionTs, 
Integer.MAX_VALUE).countInstants();
-    if (config.getInlineCompactDeltaCommitMax() > 
deltaCommitsSinceLastCompaction) {
-      LOG.info("Not scheduling compaction as only " + 
deltaCommitsSinceLastCompaction
-          + " delta commits was found since last compaction " + 
lastCompactionTs + ". Waiting for "
-          + config.getInlineCompactDeltaCommitMax());
-      return new HoodieCompactionPlan();
+    if (compactType != CompactType.TIME_ELAPSED) {
+      if (lastCompaction.isPresent()) {
+        deltaCommitsSinceLastCompaction = 
deltaCommits.findInstantsAfter(lastCompactionTs, 
Integer.MAX_VALUE).countInstants();
+      } else {
+        deltaCommitsSinceLastCompaction = 
deltaCommits.findInstantsAfterOrEquals(lastCompactionTs, 
Integer.MAX_VALUE).countInstants();
+      }
     }
+    return new Tuple2(deltaCommitsSinceLastCompaction, lastCompactionTs);
+  }
 
-    LOG.info("Generating compaction plan for merge on read table " + 
config.getBasePath());
-    HoodieSparkMergeOnReadTableCompactor compactor = new 
HoodieSparkMergeOnReadTableCompactor();
-    try {
-      SyncableFileSystemView fileSystemView = (SyncableFileSystemView) 
table.getSliceView();
-      Set<HoodieFileGroupId> fgInPendingCompactionAndClustering = 
fileSystemView.getPendingCompactionOperations()
-          .map(instantTimeOpPair -> 
instantTimeOpPair.getValue().getFileGroupId())
-          .collect(Collectors.toSet());
-      // exclude files in pending clustering from compaction.
-      
fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
-      return compactor.generateCompactionPlan(context, table, config, 
instantTime, fgInPendingCompactionAndClustering);
+  public boolean needCompact(CompactType compactType) {
+    boolean compactable;
+    // return deltaCommitsSinceLastCompaction and lastCompactionTs
+    Tuple2<Integer, String> threshold = checkCompact(compactType);
+    switch (compactType) {
+      case COMMIT_NUM:
+        compactable = config.getInlineCompactDeltaCommitMax() <= threshold._1;
+        break;
+      case TIME_ELAPSED:
+        compactable = parseToTimestamp(threshold._2) + 
config.getInlineCompactDeltaElapsedTimeMax() <= parseToTimestamp(instantTime);
+        break;
+      case NUM_OR_TIME:
+        compactable = config.getInlineCompactDeltaCommitMax() <= threshold._1
+            || parseToTimestamp(threshold._2) + 
config.getInlineCompactDeltaElapsedTimeMax() <= parseToTimestamp(instantTime);
+        break;
+      case NUM_AND_TIME:
+        compactable = config.getInlineCompactDeltaCommitMax() <= threshold._1
+            && parseToTimestamp(threshold._2) + 
config.getInlineCompactDeltaElapsedTimeMax() <= parseToTimestamp(instantTime);
+        break;
+      default:
+        throw new HoodieCompactionException("Unsupported compact type: " + 
config.getInlineCompactType());
+    }
 
-    } catch (IOException e) {
-      throw new HoodieCompactionException("Could not schedule compaction " + 
config.getBasePath(), e);
+    if (compactable) {
+      LOG.info(String.format("Scheduling compaction: %s. Delta commits found: 
%s times, and last compaction time is %s.",

Review comment:
       ok, added !




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to