danny0405 commented on code in PR #8088:
URL: https://github.com/apache/hudi/pull/8088#discussion_r1132010791


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1029,23 +1029,63 @@ protected HoodieData<HoodieRecord> 
prepRecords(Map<MetadataPartitionType,
   /**
    *  Perform a compaction on the Metadata Table.
    *
-   * Cases to be handled:
-   *   1. We cannot perform compaction if there are previous inflight 
operations on the dataset. This is because
-   *      a compacted metadata base file at time Tx should represent all the 
actions on the dataset till time Tx.
-   *
-   *   2. In multi-writer scenario, a parallel operation with a greater 
instantTime may have completed creating a
-   *      deltacommit.
+   * <p>Cases to be handled:
+   * <ol>
+   *   <li>We cannot perform compaction if there are previous inflight 
operations on the dataset. This is because
+   *   a compacted metadata base file at time Tx should represent all the 
actions on the dataset till time Tx;</li>
+   *   <li>In multi-writer scenario, a parallel operation with a greater 
instantTime may have completed creating a
+   *   deltacommit.</li>
+   * </ol>
    */
   protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String 
instantTime) {
     // finish off any pending compactions if any from previous attempt.
     writeClient.runAnyPendingCompactions();
 
-    String latestDeltaCommitTimeInMetadataTable = 
metadataMetaClient.reloadActiveTimeline()
+    HoodieTimeline metadataCompletedDeltaCommitTimeline = 
metadataMetaClient.reloadActiveTimeline()
         .getDeltaCommitTimeline()
-        .filterCompletedInstants()
+        .filterCompletedInstants();
+    String latestDeltaCommitTimeInMetadataTable = 
metadataCompletedDeltaCommitTimeline
         .lastInstant().orElseThrow(() -> new HoodieMetadataException("No 
completed deltacommit in metadata table"))
         .getTimestamp();
-    List<HoodieInstant> pendingInstants = 
dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
+    Set<String> metadataCompletedDeltaCommits = 
metadataCompletedDeltaCommitTimeline.getInstantsAsStream()
+        .map(HoodieInstant::getTimestamp)
+        .collect(Collectors.toSet());
+    // pending compactions in DT should not block the compaction of MDT.
+    // a pending compaction on the DT(for MOR table, this is a common case)
+    // could cause the MDT compaction not been triggered in time,
+    // the slow compaction progress of MDT can further affect the timeline 
archiving of DT,
+    // which would result in both timelines from DT and MDT can not be 
archived timely,
+    // that is how the small file issues from both the DT and MDT timelines 
emerge.
+
+    // why we could filter out the compaction commit that has not been 
committed into the MDT?
+
+    // there are 2 preconditions that need to address first:
+    // 1. only the write commits (commit, delta_commit, replace_commit) can 
trigger the MDT compaction;
+    // 2. the MDT is always committed before the DT.
+
+    // there are 3 cases we want to analyze for a compaction instant from DT:
+    // 1. both the DT and MDT does not commit the instant;
+    //    1.1 the compaction in DT is normal, it just lags long time to finish;
+    //    1.2 some error happens to the compaction procedure.
+    // 2. the MDT committed the compaction instant, while the DT hadn't;
+    //    2.1 the job crashed suddenly while the compactor tries to commit to 
the DT right after the MDT has been committed;
+    //    2.2 the job has been canceled manually right after the MDT has been 
committed.
+    // 3. both the DT and MDT commit the instant.
+
+    // the 3rd case should be okay, now let's analyze the first 2 cases:
+    //
+    // the 1st case: if the instant has not been committed yet, the compaction 
of MDT would just ignore the instant,
+    // so the pending instant can not be compacted into the HFile, the instant 
should also not be archived by both of the DT and the MDT(that is how the 
archival mechanism works),
+    // the log reader of MDT would ignore the instant correctly, the result 
view should work!
+
+    // the 2nd case: we can not trigger compact, because once the MDT 
triggers, the MDT archiver can then archive the instant, but this instant has 
not been committed in the DT,
+    // the MDT reader can not filter out the instant correctly, another reason 
is once the instant is compacted into HFile, the subsequent rollback from DT 
may try to look up
+    // the files to be rolled back, an exception could throw(although the 
default behavior is not to throws).
+

Review Comment:
   Let me explain the procedure a little more with a demo:
   
   ```java
   delta_c1 (F3, F4) (MDT)
   delta_c1 (F1, F2) (DT)
   
   c2.inflight (compaction triggers in DT)
   
   delta_c3 (F7, F8) (MDT)
   delta_c3 (F5, F6) (DT)
   
   c2 (F7, F8) (compaction complete in MDT)
   c2 failes to commit to DT
   
   delta_c4 (F9, F10) (MDT)
   -- can we trigger MDT compaction here? The answer is yes
   1. c2 in DT would block the archiving of C2 in MDT
   2. the MDT reader would ignore the C2 too because it is filtered by the c2 
on DT timeline, so the compaction does not include c2
   delta_c4 (F11, F12) (DT)
   
   r5 (to rollback c2) (MDT)
   -F7, -F8
   r5 (to rollback c2) (DT)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to