kbuci commented on code in PR #10965:
URL: https://github.com/apache/hudi/pull/10965#discussion_r1556395911


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -1135,8 +1138,34 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable
    */
   protected HoodieWriteMetadata<O> compact(String compactionInstantTime, boolean shouldComplete) {
     HoodieTable table = createTable(config, context.getHadoopConf().get());
+    Option<HoodieInstant> instantToCompactOption = Option.fromJavaOptional(table.getActiveTimeline()
+        .filterCompletedAndCompactionInstants()
+        .getInstants()
+        .stream()
+        .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), compactionInstantTime))
+        .findFirst());
+    try {
+      // Transaction serves to ensure only one compact job for this instant will start heartbeat, and any other concurrent
+      // compact job will abort if they attempt to execute compact before heartbeat expires
+      // Note that as long as all jobs for this table use this API for compact, then this alone should prevent
+      // compact rollbacks from running concurrently to compact commits.
+      txnManager.beginTransaction(instantToCompactOption, txnManager.getLastCompletedTransactionOwner());
+      try {
+        if (!this.heartbeatClient.isHeartbeatExpired(compactionInstantTime)) {
+          throw new HoodieLockException("Cannot compact instant " + compactionInstantTime + " due to heartbeat by existing job");
+        }
+      } catch (IOException e) {
+        throw new HoodieHeartbeatException("Error accessing heartbeat of instant to compact " + compactionInstantTime, e);
+      }
+      this.heartbeatClient.start(compactionInstantTime);
+    } finally {
+      txnManager.endTransaction(txnManager.getCurrentTransactionOwner());
+    }
     preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient());
-    return tableServiceClient.compact(compactionInstantTime, shouldComplete);
+    HoodieWriteMetadata compactMetadata = tableServiceClient.compact(compactionInstantTime, shouldComplete);
+    this.heartbeatClient.stop(compactionInstantTime, true);

Review Comment:
   I was looking into a UT failure in `org.apache.hudi.table.functional.TestHoodieSparkMergeOnReadTableInsertUpdateDelete#testRepeatedRollbackOfCompaction`, where compaction of the same instant time is executed twice, back to back (my understanding is that this is supposed to verify that the second compact is a no-op and succeeds upon seeing that the plan is already committed).
   I realized that with this change, the second compact call was failing because `isHeartbeatExpired` reported an active heartbeat (from the first attempt) still running, even though we stop the heartbeat here after successfully completing the compaction. `isHeartbeatExpired` unexpectedly returned `false` because:
   1. `isHeartbeatExpired` returns `false` if the instant time is too recent, even if the heartbeat has been stopped (in the in-memory mapping).
   2. When `org.apache.hudi.client.heartbeat.HoodieHeartbeatClient#stop(java.lang.String)` is called (by the first compact call in the UT), the heartbeat file is deleted and the heartbeat in the in-memory mapping is stopped (as expected). But this means that the heartbeat cannot be started again (even if (1) is resolved), since the heartbeat API doesn't allow a caller to start a heartbeat that is present in the in-memory mapping and has its `heartbeatStopped` flag set to true.
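   To make the interaction of (1) and (2) concrete, here is a minimal, hypothetical model of the heartbeat bookkeeping as described above. It is not Hudi's `HoodieHeartbeatClient`: the class `HeartbeatModel`, its `Entry` type, the `beginCompact` helper (which mirrors the guard added in the diff), explicit millisecond timestamps, and the 60 s expiry interval are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of points 1 and 2 above. This is NOT Hudi's
// HoodieHeartbeatClient; class, field, and method names are illustrative only.
class HeartbeatModel {
  private static final class Entry {
    final long lastHeartbeatMillis;
    boolean stopped;

    Entry(long lastHeartbeatMillis) {
      this.lastHeartbeatMillis = lastHeartbeatMillis;
    }
  }

  // In-memory mapping: instant time -> heartbeat entry.
  private final Map<String, Entry> inMemory = new HashMap<>();
  private final long maxIntervalMillis;

  HeartbeatModel(long maxIntervalMillis) {
    this.maxIntervalMillis = maxIntervalMillis;
  }

  void start(String instant, long nowMillis) {
    Entry existing = inMemory.get(instant);
    if (existing != null && existing.stopped) {
      // Point 2: a stopped entry that is still mapped cannot be restarted.
      throw new IllegalStateException("Cannot restart stopped heartbeat for " + instant);
    }
    inMemory.put(instant, new Entry(nowMillis));
  }

  void stop(String instant) {
    Entry existing = inMemory.get(instant);
    if (existing != null) {
      existing.stopped = true; // flag is set, but the entry stays in the map
    }
  }

  boolean isHeartbeatExpired(String instant, long nowMillis) {
    Entry existing = inMemory.get(instant);
    if (existing == null) {
      return true; // assumption: no known heartbeat counts as expired
    }
    // Point 1: only elapsed time is checked; the stopped flag is ignored.
    return nowMillis - existing.lastHeartbeatMillis > maxIntervalMillis;
  }

  // Mirrors the guard added in the diff: refuse to compact while a heartbeat looks live.
  void beginCompact(String instant, long nowMillis) {
    if (!isHeartbeatExpired(instant, nowMillis)) {
      throw new IllegalStateException(
          "Cannot compact instant " + instant + " due to heartbeat by existing job");
    }
    start(instant, nowMillis);
  }

  public static void main(String[] args) {
    HeartbeatModel hb = new HeartbeatModel(60_000L);
    hb.beginCompact("001", 0L); // first compact: starts a heartbeat
    hb.stop("001");             // first compact completes and stops it
    try {
      hb.beginCompact("001", 1_000L); // second compact, one second later
    } catch (IllegalStateException e) {
      System.out.println("second compact rejected: " + e.getMessage());
    }
  }
}
```

   In this model, once the expiry window has passed the check from (1) succeeds, but `start` still throws, which is the situation described in (2).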
   
   To get around this issue, I added another method to the heartbeat API, similar to `stop`, except that it also removes the desired heartbeat from the in-memory mapping (forcing any future compact call in the same job to re-read the heartbeat files from DFS and create a new heartbeat in the in-memory mapping). I'm not sure whether there is a better approach here, though. I assume this existing behavior isn't a bug, since it makes sense for commits that cannot be repeatedly re-executed (like ingestion COMMITs), and that the issue stems from the fact that compaction may need to repeatedly restart stopped heartbeats.
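   A rough sketch of how the workaround differs from `stop`, under a simplified model rather than the real heartbeat client: `RemovableHeartbeatModel` and `stopAndRemove` are hypothetical names (the comment does not name the new method), heartbeat files on DFS are simulated with a `Map`, and treating a missing file as expired is an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the workaround: "stopAndRemove" is an illustrative name,
// not the method added in the PR, and DFS heartbeat files are simulated by a map.
class RemovableHeartbeatModel {
  private static final class Entry {
    final long lastHeartbeatMillis;
    boolean stopped;

    Entry(long lastHeartbeatMillis) {
      this.lastHeartbeatMillis = lastHeartbeatMillis;
    }
  }

  private final Map<String, Entry> inMemory = new HashMap<>(); // in-memory mapping
  private final Map<String, Long> dfsFiles = new HashMap<>();  // simulated heartbeat files
  private final long maxIntervalMillis;

  RemovableHeartbeatModel(long maxIntervalMillis) {
    this.maxIntervalMillis = maxIntervalMillis;
  }

  void start(String instant, long nowMillis) {
    Entry existing = inMemory.get(instant);
    if (existing != null && existing.stopped) {
      throw new IllegalStateException("Cannot restart stopped heartbeat for " + instant);
    }
    inMemory.put(instant, new Entry(nowMillis));
    dfsFiles.put(instant, nowMillis);
  }

  // Existing behaviour: delete the file, keep the entry flagged as stopped.
  void stop(String instant) {
    dfsFiles.remove(instant);
    Entry existing = inMemory.get(instant);
    if (existing != null) {
      existing.stopped = true;
    }
  }

  // Proposed behaviour: delete the file and forget the in-memory entry, so the next
  // check in this job falls back to the (simulated) heartbeat files.
  void stopAndRemove(String instant) {
    dfsFiles.remove(instant);
    inMemory.remove(instant);
  }

  boolean isHeartbeatExpired(String instant, long nowMillis) {
    Long last;
    Entry existing = inMemory.get(instant);
    if (existing != null) {
      last = existing.lastHeartbeatMillis;
    } else {
      last = dfsFiles.get(instant); // re-read from the simulated DFS
    }
    // Assumption: a missing heartbeat file counts as expired.
    return last == null || nowMillis - last > maxIntervalMillis;
  }

  public static void main(String[] args) {
    RemovableHeartbeatModel hb = new RemovableHeartbeatModel(60_000L);
    hb.start("001", 0L);
    hb.stopAndRemove("001");
    System.out.println(hb.isHeartbeatExpired("001", 1_000L)); // true: no entry, no file
    hb.start("001", 1_000L); // restarting is allowed again
  }
}
```

   Running the same sequence with `stop` instead of `stopAndRemove` reports `false` and the restart throws, matching the UT failure; the trade-off is that once the entry is removed, the in-memory mapping no longer remembers that this job already stopped the heartbeat.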



-- 
This is an automated message from the Apache Git Service.