nsivabalan commented on code in PR #18012:
URL: https://github.com/apache/hudi/pull/18012#discussion_r2886373283


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -309,21 +311,40 @@ protected HoodieWriteMetadata<O> compact(String 
compactionInstantTime, boolean s
    * @return Collection of Write Status
    */
   protected HoodieWriteMetadata<O> compact(HoodieTable<?, I, ?, T> table, 
String compactionInstantTime, boolean shouldComplete) {
-    HoodieTimeline pendingCompactionTimeline = 
table.getActiveTimeline().filterPendingCompactionTimeline();
     InstantGenerator instantGenerator = 
table.getMetaClient().getInstantGenerator();
     HoodieInstant inflightInstant = 
instantGenerator.getCompactionInflightInstant(compactionInstantTime);
-    if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
-      table.rollbackInflightCompaction(inflightInstant, commitToRollback -> 
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false), 
txnManager);
-      table.getMetaClient().reloadActiveTimeline();
+    try {
+      // Transaction serves to ensure only one compact job for this instant 
will start heartbeat, and any other concurrent
+      // compact job will abort if they attempt to execute compact before 
heartbeat expires
+      // Note that as long as all jobs for this table use this API for compact 
with auto-commit, then this alone should prevent
+      // compact rollbacks from running concurrently to compact commits.
+      txnManager.beginStateChange(Option.of(inflightInstant), 
txnManager.getLastCompletedTransactionOwner());
+      startExecutionHeartbeat(compactionInstantTime);
+      if 
(!table.getMetaClient().reloadActiveTimeline().filterPendingCompactionTimeline().containsInstant(compactionInstantTime))
 {
+        throw new HoodieException("Requested compaction instant " + 
compactionInstantTime + " is not present as pending or already completed in the 
active timeline.");
+      }
+      this.heartbeatClient.start(compactionInstantTime);

Review Comment:
   our txn manager is abstracted out properly.
   even for single writer mode, it all neats abstracts out. 
   but the heart beat client is not. 
   so, we should avoid starting heart beat for single writer mode right? 
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1299,4 +1324,14 @@ protected void 
updateColumnsToIndexWithColStats(HoodieTableMetaClient metaClient
   protected void releaseResources(String instantTime) {
     // do nothing here
   }
+
+  private void startExecutionHeartbeat(String instantTime) {

Review Comment:
   the naming signifies, we are going to start a heart heat here. 
   how about `validateHeartBeat` or something on those lines.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -306,21 +308,47 @@ protected HoodieWriteMetadata<O> compact(String 
compactionInstantTime, boolean s
    * @return Collection of Write Status
    */
   protected HoodieWriteMetadata<O> compact(HoodieTable<?, I, ?, T> table, 
String compactionInstantTime, boolean shouldComplete) {
-    HoodieTimeline pendingCompactionTimeline = 
table.getActiveTimeline().filterPendingCompactionTimeline();
     InstantGenerator instantGenerator = 
table.getMetaClient().getInstantGenerator();
     HoodieInstant inflightInstant = 
instantGenerator.getCompactionInflightInstant(compactionInstantTime);
-    if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
-      table.rollbackInflightCompaction(inflightInstant, commitToRollback -> 
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false), 
txnManager);
-      table.getMetaClient().reloadActiveTimeline();
+    try {
+      // Transaction serves to ensure only one compact job for this instant 
will start heartbeat, and any other concurrent
+      // compact job will abort if they attempt to execute compact before 
heartbeat expires
+      // Note that as long as all jobs for this table use this API for compact 
with auto-commit, then this alone should prevent
+      // compact rollbacks from running concurrently to compact commits.
+      txnManager.beginStateChange(Option.of(inflightInstant), 
txnManager.getLastCompletedTransactionOwner());
+      try {
+        if (!this.heartbeatClient.isHeartbeatExpired(compactionInstantTime)) {
+          throw new HoodieLockException("Cannot compact instant " + 
compactionInstantTime + " due to heartbeat by concurrent writer/job");
+        }
+      } catch (IOException e) {
+        throw new HoodieHeartbeatException("Error accessing heartbeat of 
instant to compact " + compactionInstantTime, e);
+      }
+      if 
(!table.getMetaClient().reloadActiveTimeline().filterPendingCompactionTimeline().containsInstant(compactionInstantTime))
 {

Review Comment:
   gotcha.
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -360,32 +381,36 @@ public void commitCompaction(String 
compactionInstantTime, HoodieWriteMetadata<O
    * Commit Compaction and track metrics.
    */
   protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable 
table, String compactionCommitTime, List<HoodieWriteStat> 
partialMetadataWriteStats) {
-    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect 
compaction write status and commit compaction: " + config.getTableName());
-    List<HoodieWriteStat> writeStats = metadata.getWriteStats();
-    handleWriteErrors(writeStats, TableServiceType.COMPACT);
-    InstantGenerator instantGenerator = 
table.getMetaClient().getInstantGenerator();
-    final HoodieInstant compactionInstant = 
instantGenerator.getCompactionInflightInstant(compactionCommitTime);
     try {
-      this.txnManager.beginStateChange(Option.of(compactionInstant), 
Option.empty());
-      finalizeWrite(table, compactionCommitTime, writeStats);
-      // commit to data table after committing to metadata table.
-      writeToMetadataTable(table, compactionCommitTime, metadata, 
partialMetadataWriteStats);
-      log.info("Committing Compaction {}", compactionCommitTime);
-      CompactHelpers.getInstance().completeInflightCompaction(table, 
compactionCommitTime, metadata);
-      log.debug("Compaction {} finished with result: {}", 
compactionCommitTime, metadata);
+      this.context.setJobStatus(this.getClass().getSimpleName(), "Collect 
compaction write status and commit compaction: " + config.getTableName());
+      List<HoodieWriteStat> writeStats = metadata.getWriteStats();
+      handleWriteErrors(writeStats, TableServiceType.COMPACT);
+      InstantGenerator instantGenerator = 
table.getMetaClient().getInstantGenerator();
+      final HoodieInstant compactionInstant = 
instantGenerator.getCompactionInflightInstant(compactionCommitTime);
+      try {
+        this.txnManager.beginStateChange(Option.of(compactionInstant), 
Option.empty());
+        finalizeWrite(table, compactionCommitTime, writeStats);
+        // commit to data table after committing to metadata table.
+        writeToMetadataTable(table, compactionCommitTime, metadata, 
partialMetadataWriteStats);
+        log.info("Committing Compaction {}", compactionCommitTime);
+        CompactHelpers.getInstance().completeInflightCompaction(table, 
compactionCommitTime, metadata);
+        log.debug("Compaction {} finished with result: {}", 
compactionCommitTime, metadata);
+      } finally {
+        this.txnManager.endStateChange(Option.of(compactionInstant));
+        releaseResources(compactionCommitTime);
+      }
+      WriteMarkersFactory.get(config.getMarkersType(), table, 
compactionCommitTime)
+          .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+      if (compactionTimer != null) {
+        long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
+        
TimelineUtils.parseDateFromInstantTimeSafely(compactionCommitTime).ifPresent(parsedInstant
 ->
+            metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, 
metadata, COMPACTION_ACTION)
+        );
+      }
+      log.info("Compacted successfully on commit {}", compactionCommitTime);
     } finally {
-      this.txnManager.endStateChange(Option.of(compactionInstant));
-      releaseResources(compactionCommitTime);
-    }
-    WriteMarkersFactory.get(config.getMarkersType(), table, 
compactionCommitTime)
-        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-    if (compactionTimer != null) {
-      long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
-      
TimelineUtils.parseDateFromInstantTimeSafely(compactionCommitTime).ifPresent(parsedInstant
 ->
-          metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, 
metadata, COMPACTION_ACTION)
-      );
+      this.heartbeatClient.stop(compactionCommitTime);

Review Comment:
   this is the only change in L385 to 412 right? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to