wangxianghu commented on a change in pull request #2260:
URL: https://github.com/apache/hudi/pull/2260#discussion_r553699091



##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
##########
@@ -60,17 +63,32 @@ protected HoodieCompactionPlan scheduleCompaction() {
     LOG.info("Checking if compaction needs to be run on " + config.getBasePath());
     Option<HoodieInstant> lastCompaction = table.getActiveTimeline().getCommitTimeline()
         .filterCompletedInstants().lastInstant();
-    String lastCompactionTs = "0";
+    HoodieTimeline deltaCommits = table.getActiveTimeline().getDeltaCommitTimeline();
+    String lastCompactionTs;
+    int deltaCommitsSinceLastCompaction;
     if (lastCompaction.isPresent()) {
       lastCompactionTs = lastCompaction.get().getTimestamp();
+      deltaCommitsSinceLastCompaction = deltaCommits.findInstantsAfter(lastCompactionTs, Integer.MAX_VALUE).countInstants();
+    } else {
+      lastCompactionTs = deltaCommits.firstInstant().get().getTimestamp();
+      deltaCommitsSinceLastCompaction = deltaCommits.findInstantsAfterOrEquals(lastCompactionTs, Integer.MAX_VALUE).countInstants();
     }
-
-    int deltaCommitsSinceLastCompaction = table.getActiveTimeline().getDeltaCommitTimeline()
-        .findInstantsAfter(lastCompactionTs, Integer.MAX_VALUE).countInstants();
-    if (config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction) {
-      LOG.info("Not scheduling compaction as only " + deltaCommitsSinceLastCompaction
-          + " delta commits was found since last compaction " + lastCompactionTs + ". Waiting for "
-          + config.getInlineCompactDeltaCommitMax());
+    // judge if we need to compact according to num delta commits and time elapsed
+    boolean numCommitEnabled = config.getInlineCompactDeltaNumCommitEnabled();
+    boolean timeEnabled = config.getInlineCompactDeltaElapsedEnabled();
+    boolean compactable;
+    if (numCommitEnabled && !timeEnabled) {
+      compactable = config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction;
+    } else if (!numCommitEnabled && timeEnabled) {
+      compactable = parseToTimestamp(lastCompactionTs) + config.getInlineCompactDeltaElapsedTimeMax() > parseToTimestamp(instantTime);
+    } else {
+      compactable = config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction
+          && parseToTimestamp(lastCompactionTs) + config.getInlineCompactDeltaElapsedTimeMax() > parseToTimestamp(instantTime);
+    }

Review comment:
   Since we have added flags for both commit-count-style and time-elapsed-style compaction, maybe we should check the flags first to make sure at least one of them is enabled. If neither is, fall back to compacting by number of delta commits as the default (with a warn log).
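   To illustrate the suggested fallback, here is a minimal sketch, assuming the two booleans come from `config.getInlineCompactDeltaNumCommitEnabled()` and `config.getInlineCompactDeltaElapsedEnabled()` as in the diff. `CompactionFlagResolver` and `resolve` are hypothetical names, and the sketch uses `java.util.logging` rather than the executor's log4j logger so it runs standalone.

```java
import java.util.logging.Logger;

/**
 * Hypothetical sketch: resolve which compaction triggers are active.
 * If neither flag is enabled, fall back to the commit-count trigger and warn.
 */
public class CompactionFlagResolver {
  private static final Logger LOG = Logger.getLogger(CompactionFlagResolver.class.getName());

  /** Immutable holder for the resolved flags. */
  public static final class ResolvedFlags {
    public final boolean numCommitEnabled;
    public final boolean timeEnabled;

    ResolvedFlags(boolean numCommitEnabled, boolean timeEnabled) {
      this.numCommitEnabled = numCommitEnabled;
      this.timeEnabled = timeEnabled;
    }
  }

  public static ResolvedFlags resolve(boolean numCommitEnabled, boolean timeEnabled) {
    if (!numCommitEnabled && !timeEnabled) {
      // Neither trigger configured: default to commit-count style, with a warning.
      LOG.warning("Neither num-commits nor time-elapsed compaction is enabled; "
          + "falling back to num-commits compaction.");
      return new ResolvedFlags(true, false);
    }
    // At least one trigger is enabled: keep the user's settings as-is.
    return new ResolvedFlags(numCommitEnabled, timeEnabled);
  }

  public static void main(String[] args) {
    ResolvedFlags flags = resolve(false, false);
    System.out.println(flags.numCommitEnabled + " " + flags.timeEnabled); // true false
  }
}
```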
   
   Besides, there are four cases to handle here:
   1. compact based on the number of delta commits only;
   2. compact based on elapsed time only;
   3. compact when both the commit-count and the elapsed-time requirements are met;
   4. compact when either one of them is met.
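   A minimal sketch of how the four cases could be expressed as one strategy value instead of two booleans. `TriggerStrategy`, its constants, and `shouldCompact` are hypothetical names, not part of this PR. The sketch assumes "compact" means a threshold has been reached (`>=`) and that the elapsed time has already been converted to seconds from the instant timestamps (the diff's `parseToTimestamp` helper is not reproduced here).

```java
/**
 * Hypothetical sketch of the four trigger conditions listed in the review.
 * Names are illustrative only; they are not existing APIs from this PR.
 */
public class CompactionTrigger {

  public enum TriggerStrategy {
    NUM_COMMITS,   // 1. delta-commit count only
    TIME_ELAPSED,  // 2. elapsed time only
    NUM_AND_TIME,  // 3. both thresholds must be reached
    NUM_OR_TIME    // 4. either threshold is enough
  }

  /**
   * Returns true if a compaction should be scheduled.
   *
   * @param deltaCommitsSince  delta commits since the last compaction
   * @param maxDeltaCommits    commit-count threshold
   * @param elapsedSeconds     seconds since the last compaction (assumed precomputed)
   * @param maxElapsedSeconds  elapsed-time threshold in seconds
   */
  public static boolean shouldCompact(TriggerStrategy strategy,
                                      int deltaCommitsSince, int maxDeltaCommits,
                                      long elapsedSeconds, long maxElapsedSeconds) {
    boolean numReached = deltaCommitsSince >= maxDeltaCommits;
    boolean timeReached = elapsedSeconds >= maxElapsedSeconds;
    switch (strategy) {
      case NUM_COMMITS:
        return numReached;
      case TIME_ELAPSED:
        return timeReached;
      case NUM_AND_TIME:
        return numReached && timeReached;
      case NUM_OR_TIME:
        return numReached || timeReached;
      default:
        throw new IllegalArgumentException("Unknown strategy: " + strategy);
    }
  }

  public static void main(String[] args) {
    // 5 delta commits (threshold 5), 600 s elapsed (threshold 3600 s)
    System.out.println(shouldCompact(TriggerStrategy.NUM_AND_TIME, 5, 5, 600, 3600)); // false
    System.out.println(shouldCompact(TriggerStrategy.NUM_OR_TIME, 5, 5, 600, 3600));  // true
  }
}
```

   One possible advantage of a single enum setting is that the "neither enabled" state cannot occur, so the warn-and-fallback check suggested above would reduce to picking a default strategy value.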
   
   WDYT @Karl-WangSK  cc @yanghua 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

