kbuci commented on code in PR #10965: URL: https://github.com/apache/hudi/pull/10965#discussion_r1556395911
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -1135,8 +1138,34 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable
    */
   protected HoodieWriteMetadata<O> compact(String compactionInstantTime, boolean shouldComplete) {
     HoodieTable table = createTable(config, context.getHadoopConf().get());
+    Option<HoodieInstant> instantToCompactOption = Option.fromJavaOptional(table.getActiveTimeline()
+        .filterCompletedAndCompactionInstants()
+        .getInstants()
+        .stream()
+        .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), compactionInstantTime))
+        .findFirst());
+    try {
+      // Transaction serves to ensure only one compact job for this instant will start heartbeat, and any other concurrent
+      // compact job will abort if they attempt to execute compact before heartbeat expires
+      // Note that as long as all jobs for this table use this API for compact, then this alone should prevent
+      // compact rollbacks from running concurrently to compact commits.
+      txnManager.beginTransaction(instantToCompactOption, txnManager.getLastCompletedTransactionOwner());
+      try {
+        if (!this.heartbeatClient.isHeartbeatExpired(compactionInstantTime)) {
+          throw new HoodieLockException("Cannot compact instant " + compactionInstantTime + " due to heartbeat by existing job");
+        }
+      } catch (IOException e) {
+        throw new HoodieHeartbeatException("Error accessing heartbeat of instant to compact " + compactionInstantTime, e);
+      }
+      this.heartbeatClient.start(compactionInstantTime);
+    } finally {
+      txnManager.endTransaction(txnManager.getCurrentTransactionOwner());
+    }
     preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient());
-    return tableServiceClient.compact(compactionInstantTime, shouldComplete);
+    HoodieWriteMetadata compactMetadata = tableServiceClient.compact(compactionInstantTime, shouldComplete);
+    this.heartbeatClient.stop(compactionInstantTime, true);

Review Comment:
   I was looking into a UT failure in `org.apache.hudi.table.functional.TestHoodieSparkMergeOnReadTableInsertUpdateDelete#testRepeatedRollbackOfCompaction`, where two compact executions of the same instant time are called back to back (my understanding is that this is supposed to verify that the second compact is a no-op and succeeds upon seeing that the plan is already committed). I realized that with this change, the second compact call was failing because it called `isHeartbeatExpired` and saw an active heartbeat (from the first attempt) still running, despite the fact that here we stop the heartbeat after successfully completing the compact.

   The reason that `isHeartbeatExpired` was unexpectedly `false` here is that
   1. `isHeartbeatExpired` will return false if the instant time is too recent, even if the heartbeat has been stopped (in the in-memory mapping).
   2. When `org.apache.hudi.client.heartbeat.HoodieHeartbeatClient#stop(java.lang.String)` is called (by the first compact call in the UT), the heartbeat file is deleted and the heartbeat in the in-memory mapping is stopped (as expected). But this means that the heartbeat cannot be started again (even if (1) is resolved), since the heartbeat API doesn't allow a caller to start a heartbeat that is present in the in-memory mapping and has the `heartbeatStopped` flag set to true.

   To get around this issue, I added another API to the heartbeat API, similar to stop, except that it removes the desired heartbeat from the in-memory mapping (forcing any future compact call in the same job to re-read the heartbeat files from DFS and create a new heartbeat in the in-memory mapping). I'm not sure whether there might be a better approach here, though. I assume the existing functionality isn't a bug, as it makes sense for commits that cannot be repeatedly re-executed (like ingestion COMMITs), and I assume the issue here stems from the fact that for compact we potentially need to restart stopped heartbeats repeatedly.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
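
The two heartbeat behaviours described in the review comment, and the workaround, can be illustrated with a small toy model. This is only a sketch under stated assumptions: `ToyHeartbeatClient`, its fields, the manual clock, and the method name `stopAndForget` are all hypothetical and are not Hudi's `HoodieHeartbeatClient`; the "DFS" is simulated with an in-process map.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model only (hypothetical); this is NOT Hudi's HoodieHeartbeatClient. */
class ToyHeartbeatClient {
  private static final class Heartbeat {
    boolean stopped;          // mirrors the idea of a "heartbeatStopped" flag
    long lastHeartbeatMillis; // time of the most recent heartbeat
  }

  // In-memory mapping: instant time -> heartbeat state held by this job.
  private final Map<String, Heartbeat> inMemory = new HashMap<>();
  // Stand-in for heartbeat files on DFS: instant time -> last modified time.
  private final Map<String, Long> heartbeatFiles = new HashMap<>();
  private final long toleranceMillis;
  private long nowMillis = 0L; // manual clock so the demo is deterministic

  ToyHeartbeatClient(long toleranceMillis) {
    this.toleranceMillis = toleranceMillis;
  }

  void advanceClock(long millis) {
    nowMillis += millis;
  }

  /** Starts a heartbeat; refuses to restart one that is still mapped but stopped. */
  void start(String instantTime) {
    Heartbeat hb = inMemory.get(instantTime);
    if (hb != null && hb.stopped) {
      throw new IllegalStateException("Heartbeat for " + instantTime + " was already stopped");
    }
    if (hb == null) {
      hb = new Heartbeat();
      inMemory.put(instantTime, hb);
    }
    hb.lastHeartbeatMillis = nowMillis;
    heartbeatFiles.put(instantTime, nowMillis);
  }

  /** Marks the heartbeat stopped and deletes its file, but keeps the map entry. */
  void stop(String instantTime) {
    Heartbeat hb = inMemory.get(instantTime);
    if (hb != null) {
      hb.stopped = true;
    }
    heartbeatFiles.remove(instantTime);
  }

  /** Hypothetical workaround: stop, then also drop the in-memory entry. */
  void stopAndForget(String instantTime) {
    stop(instantTime);
    inMemory.remove(instantTime);
  }

  /** Expired means: no known heartbeat, or the last one is older than the tolerance. */
  boolean isHeartbeatExpired(String instantTime) {
    Heartbeat hb = inMemory.get(instantTime);
    Long last = (hb != null) ? Long.valueOf(hb.lastHeartbeatMillis) : heartbeatFiles.get(instantTime);
    if (last == null) {
      return true; // nothing in memory and no file: treat as expired
    }
    return nowMillis - last > toleranceMillis;
  }

  public static void main(String[] args) {
    ToyHeartbeatClient client = new ToyHeartbeatClient(60_000L);
    client.start("001");
    client.stop("001");
    // (1) Stopped, but the in-memory timestamp is still recent.
    System.out.println(client.isHeartbeatExpired("001")); // prints "false"
    // (2) Restarting a stopped, still-mapped heartbeat is rejected.
    try {
      client.start("001");
    } catch (IllegalStateException e) {
      System.out.println("restart rejected");
    }
    // Workaround: forget the entry so a later start creates a fresh heartbeat.
    client.stopAndForget("001");
    System.out.println(client.isHeartbeatExpired("001")); // prints "true"
    client.start("001");
  }
}
```

In this model the second compact attempt would succeed only after the entry is removed from the in-memory mapping, which matches the workaround proposed in the comment; whether the real client should behave this way is the open question the comment raises.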