kbuci commented on code in PR #10965:
URL: https://github.com/apache/hudi/pull/10965#discussion_r1556157237


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -1135,8 +1138,36 @@ protected void 
completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable
    */
   protected HoodieWriteMetadata<O> compact(String compactionInstantTime, 
boolean shouldComplete) {
     HoodieTable table = createTable(config, context.getHadoopConf().get());
+    Option<HoodieInstant> instantToCompactOption = 
Option.fromJavaOptional(table.getActiveTimeline()
+        .filterCompletedAndCompactionInstants()
+        .getInstants()
+        .stream()
+        .filter(instant -> 
HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), compactionInstantTime))
+        .findFirst());
+    try {
+      // Transaction serves to ensure only one compact job for this instant 
will start heartbeat, and any other concurrent
+      // compact job will abort if they attempt to execute compact before 
heartbeat expires
+      // Note that as long as all jobs for this table use this API for 
compact, then this alone should prevent
+      // compact rollbacks from running concurrently to compact commits.
+      txnManager.beginTransaction(instantToCompactOption, 
txnManager.getLastCompletedTransactionOwner());

Review Comment:
   > 1.2) if the heartbeat does not expire, just can the execution of this run 
and log a wanning log there.
   Just to clarify, do you mean throwing an exception in this run? Since I'm 
not sure if we can make the current run a no-op if a concurrent heartbeat is 
detected, since 
   a) I'm not sure what HoodieWriteMetadata value to return if we make this a 
no-op
   b) If we don't explicitly throw an exception and fail, then the caller will 
assume compaction happened succesfully or already happened. This wouldn't be a 
correct assumption, since the other concurrent writer (that is currently 
executing this compact plan) may either fail or take a long time to finish.
   
   > if the state if still REQUESTED, we can execute it direcly?
   Ah that's a good point, it might actually be safe for two jobs to execute a 
compact plan at same time as long as neither of them are doing a roll back. 
Despite that though, I don't think it's safe to skip acquiring+starting 
heartbeat even if the compact plan only has .requested , since the following 
(unlikely) scenario can happen still:
   1. Table has a compact plan C.requested created in timeline
   2. Job (A) calls compact on C. It starts a heartbeat of C and then starts 
executing C
   3. Job (B) calls compact on C. Although it sees a heartbeat for C, since C 
has no C.inflight it starts executing C
   4. Job (A) and/or Job (B) create a C.inflight
   5. Job (A) fails. 
   6. Heartbeat that Job (A) created expires
   7. Job (C) calls compact on C. It sees that there is a C.inflight and no 
heartbeat (because Job (B) did not start any heartbeat). Therefore, it starts 
executing C, and rolls back the existing C.inflight
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -1135,8 +1138,36 @@ protected void 
completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable
    */
   protected HoodieWriteMetadata<O> compact(String compactionInstantTime, 
boolean shouldComplete) {
     HoodieTable table = createTable(config, context.getHadoopConf().get());
+    Option<HoodieInstant> instantToCompactOption = 
Option.fromJavaOptional(table.getActiveTimeline()
+        .filterCompletedAndCompactionInstants()
+        .getInstants()
+        .stream()
+        .filter(instant -> 
HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), compactionInstantTime))
+        .findFirst());
+    try {
+      // Transaction serves to ensure only one compact job for this instant 
will start heartbeat, and any other concurrent
+      // compact job will abort if they attempt to execute compact before 
heartbeat expires
+      // Note that as long as all jobs for this table use this API for 
compact, then this alone should prevent
+      // compact rollbacks from running concurrently to compact commits.
+      txnManager.beginTransaction(instantToCompactOption, 
txnManager.getLastCompletedTransactionOwner());

Review Comment:
   > 1.2) if the heartbeat does not expire, just can the execution of this run 
and log a wanning log there.
   
   Just to clarify, do you mean throwing an exception in this run? Since I'm 
not sure if we can make the current run a no-op if a concurrent heartbeat is 
detected, since 
   a) I'm not sure what HoodieWriteMetadata value to return if we make this a 
no-op
   b) If we don't explicitly throw an exception and fail, then the caller will 
assume compaction happened succesfully or already happened. This wouldn't be a 
correct assumption, since the other concurrent writer (that is currently 
executing this compact plan) may either fail or take a long time to finish.
   
   > if the state if still REQUESTED, we can execute it direcly?
   
   Ah that's a good point, it might actually be safe for two jobs to execute a 
compact plan at same time as long as neither of them are doing a roll back. 
Despite that though, I don't think it's safe to skip acquiring+starting 
heartbeat even if the compact plan only has .requested , since the following 
(unlikely) scenario can happen still:
   1. Table has a compact plan C.requested created in timeline
   2. Job (A) calls compact on C. It starts a heartbeat of C and then starts 
executing C
   3. Job (B) calls compact on C. Although it sees a heartbeat for C, since C 
has no C.inflight it starts executing C
   4. Job (A) and/or Job (B) create a C.inflight
   5. Job (A) fails. 
   6. Heartbeat that Job (A) created expires
   7. Job (C) calls compact on C. It sees that there is a C.inflight and no 
heartbeat (because Job (B) did not start any heartbeat). Therefore, it starts 
executing C, and rolls back the existing C.inflight
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to