danny0405 commented on code in PR #9229:
URL: https://github.com/apache/hudi/pull/9229#discussion_r1277329137


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java:
##########
@@ -265,11 +267,36 @@ private int doCompact(JavaSparkContext jsc) throws Exception {
          cfg.compactionInstantTime = firstCompactionInstant.get().getTimestamp();
          LOG.info("Found the earliest scheduled compaction instant which will be executed: "
               + cfg.compactionInstantTime);
-        } else {
-          LOG.info("There is no scheduled compaction in the table.");
-          return 0;
+        }
+
+        // update cfg.compactionInstantTime if finding an expired instant
+        List<HoodieInstant> inflightInstants = metaClient.getActiveTimeline()
+            .filterPendingCompactionTimeline().filterInflights().getInstants();
+        List<String> expiredInstants = inflightInstants.stream().filter(instant -> {
+          try {
+            return client.getHeartbeatClient().isHeartbeatExpired(instant.getTimestamp());
+          } catch (IOException io) {
+            LOG.info("Failed to check heartbeat for instant " + instant);
+          }
+          return false;
+        }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+
+        if (!expiredInstants.isEmpty()) {
+          cfg.compactionInstantTime = expiredInstants.get(0);
+          LOG.info("Found expired compaction instant, update the earliest scheduled compaction instant which will be executed: "
+              + cfg.compactionInstantTime);
         }
       }
+
+      // do nothing if cfg.compactionInstantTime still is null
+      if (StringUtils.isNullOrEmpty(cfg.compactionInstantTime)) {
+        LOG.info("There is no scheduled compaction in the table.");
+        return 0;
+      }
+
+      // start a heartbeat for the instant
+      client.getHeartbeatClient().start(cfg.compactionInstantTime);
+

Review Comment:
   The heartbeat was previously used for lazy cleaning for multi-writers:
   
   ```java
       if (config.getFailedWritesCleanPolicy().isLazy()) {
         this.heartbeatClient.start(instantTime);
       }
   ```
   
   Now it is used for table service rollback, which makes sense to me. We just need to ensure the heartbeat client keeps working during the whole lifecycle of the compaction job.
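
One way to picture the lifecycle requirement is the rough sketch below. It does not use Hudi's heartbeat client: `SimpleHeartbeat`, `runWithHeartbeat`, and the interval values are hypothetical names and numbers chosen only for illustration. The idea is to start the heartbeat before the job, keep it beating while the job runs, and stop it in a `finally` block so it is released whether the job succeeds or throws.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Illustrative only: a hypothetical stand-in, not Hudi's heartbeat client.
final class HeartbeatLifecycleSketch {

  /** Hypothetical in-memory heartbeat that records the time of its last beat. */
  public static final class SimpleHeartbeat implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(r -> {
          Thread t = new Thread(r, "heartbeat");
          t.setDaemon(true); // do not keep the JVM alive just for heartbeats
          return t;
        });
    private final AtomicLong lastBeatMillis = new AtomicLong(0L);

    /** Records a first beat immediately, then beats at a fixed interval. */
    public void start(long intervalMillis) {
      lastBeatMillis.set(System.currentTimeMillis());
      scheduler.scheduleAtFixedRate(
          () -> lastBeatMillis.set(System.currentTimeMillis()),
          intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /** True when no beat has been recorded within maxSilenceMillis. */
    public boolean isExpired(long maxSilenceMillis) {
      return System.currentTimeMillis() - lastBeatMillis.get() > maxSilenceMillis;
    }

    /** True until close() has been called. */
    public boolean isRunning() {
      return !scheduler.isShutdown();
    }

    @Override
    public void close() {
      scheduler.shutdownNow();
    }
  }

  /** Runs the job with the heartbeat active for its whole duration. */
  public static <T> T runWithHeartbeat(SimpleHeartbeat heartbeat, long intervalMillis, Supplier<T> job) {
    heartbeat.start(intervalMillis);
    try {
      return job.get();
    } finally {
      heartbeat.close(); // stop beating on success and on failure alike
    }
  }

  public static void main(String[] args) {
    SimpleHeartbeat heartbeat = new SimpleHeartbeat();
    // The supplier stands in for the compaction job.
    int exitCode = runWithHeartbeat(heartbeat, 100L, () -> 0);
    System.out.println("exit code: " + exitCode + ", heartbeat running: " + heartbeat.isRunning());
  }
}
```

In this sketch, another process would treat the instant as abandoned only once `isExpired` returns true. That is why the heartbeat has to outlive every step of the job rather than just its start.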


