kbuci commented on code in PR #18012:
URL: https://github.com/apache/hudi/pull/18012#discussion_r2874951668


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -306,21 +308,47 @@ protected HoodieWriteMetadata<O> compact(String 
compactionInstantTime, boolean s
    * @return Collection of Write Status
    */
   protected HoodieWriteMetadata<O> compact(HoodieTable<?, I, ?, T> table, 
String compactionInstantTime, boolean shouldComplete) {
-    HoodieTimeline pendingCompactionTimeline = 
table.getActiveTimeline().filterPendingCompactionTimeline();
     InstantGenerator instantGenerator = 
table.getMetaClient().getInstantGenerator();
     HoodieInstant inflightInstant = 
instantGenerator.getCompactionInflightInstant(compactionInstantTime);
-    if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
-      table.rollbackInflightCompaction(inflightInstant, commitToRollback -> 
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false), 
txnManager);
-      table.getMetaClient().reloadActiveTimeline();
+    try {
+      // Transaction serves to ensure only one compact job for this instant 
will start heartbeat, and any other concurrent
+      // compact job will abort if they attempt to execute compact before 
heartbeat expires
+      // Note that as long as all jobs for this table use this API for compact 
with auto-commit, then this alone should prevent
+      // compact rollbacks from running concurrently to compact commits.
+      txnManager.beginStateChange(Option.of(inflightInstant), 
txnManager.getLastCompletedTransactionOwner());
+      try {
+        if (!this.heartbeatClient.isHeartbeatExpired(compactionInstantTime)) {
+          throw new HoodieLockException("Cannot compact instant " + 
compactionInstantTime + " due to heartbeat by concurrent writer/job");
+        }
+      } catch (IOException e) {
+        throw new HoodieHeartbeatException("Error accessing heartbeat of 
instant to compact " + compactionInstantTime, e);
+      }
+      if 
(!table.getMetaClient().reloadActiveTimeline().filterPendingCompactionTimeline().containsInstant(compactionInstantTime))
 {

Review Comment:
   In between the start of this function and acquiring the table lock, another 
writer might have attempted the compaction plan. This current writer needs to 
reload the timeline (now that is has started the heartbeat and confirm that 
it's the sole writer currently executing the plan) to see what is the current 
state `REQUESTED/INFLIGHT/COMMITTED` of the compaction plan.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to