This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 40d4f489389 [HUDI-7577] Avoid MDT compaction instant time conflicts (#10992)
40d4f489389 is described below

commit 40d4f489389083e3c6d69954361d3de4aec8186a
Author: Danny Chan <yuzhao....@gmail.com>
AuthorDate: Tue Apr 16 09:12:19 2024 +0800

    [HUDI-7577] Avoid MDT compaction instant time conflicts (#10992)
---
 .../metadata/HoodieBackedTableMetadataWriter.java  |  6 +++++-
 .../hudi/client/TestJavaHoodieBackedMetadata.java  | 23 +++++++++++-----------
 .../functional/TestHoodieBackedMetadata.java       | 23 +++++++++++-----------
 .../table/timeline/HoodieInstantTimeGenerator.java | 13 ++++++------
 4 files changed, 36 insertions(+), 29 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index dea317e60b7..a541de03cb3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -53,6 +53,7 @@ import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -1357,7 +1358,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
     // The compaction planner will manage to filter out the log files that finished with greater completion time.
     // see BaseHoodieCompactionPlanGenerator.generateCompactionPlan for more details.
     final String compactionInstantTime = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
-        .findInstantsBeforeOrEquals(latestDeltacommitTime).firstInstant().map(HoodieInstant::getTimestamp)
+        .findInstantsBeforeOrEquals(latestDeltacommitTime).firstInstant()
+        // minus the pending instant time by 1 millisecond to avoid conflict in case when this pending instant was finally been committed
+        // as a delta_commit in MDT.
+        .map(instant -> HoodieInstantTimeGenerator.instantTimeMinusMillis(instant.getTimestamp(), 1L))
         .orElse(writeClient.createNewInstantTime(false));
 
     // we need to avoid checking compaction w/ same instant again.
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index 35319a6e403..736eee97e85 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -559,34 +559,35 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase {
             .withMaxNumDeltaCommitsBeforeCompaction(4)
             .build()).build();
     initWriteConfigAndMetatableWriter(writeConfig, true);
-    doWriteOperation(testTable, "0000001", INSERT);
-    String commitInstant = "0000002";
-    doWriteOperation(testTable, commitInstant, INSERT);
+    doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT);
+    doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT);
 
-    // test multi-writer scenario. lets add 1,2,3,4 where 1,2,4 succeeded, but 3 is still inflight. so latest delta commit in MDT is 4, while 3 is still pending
+    // test multi-writer scenario. let's add 1,2,3,4 where 1,2,4 succeeded, but 3 is still inflight. so latest delta commit in MDT is 4, while 3 is still pending
     // in DT and not seen by MDT yet. compaction should not trigger until 3 goes to completion.
 
     // create an inflight commit for 3
-    HoodieCommitMetadata inflightCommitMeta = testTable.doWriteOperation("0000003", UPSERT, emptyList(),
+    String inflightInstant = metaClient.createNewInstantTime();
+    HoodieCommitMetadata inflightCommitMeta = testTable.doWriteOperation(inflightInstant, UPSERT, emptyList(),
         asList("p1", "p2"), 2, false, true);
-    doWriteOperation(testTable, "0000004");
+    doWriteOperation(testTable, metaClient.createNewInstantTime());
     HoodieTableMetadata tableMetadata = metadata(writeConfig, context);
     // verify that compaction of metadata table does not kick in.
     assertFalse(tableMetadata.getLatestCompactionTime().isPresent());
-    doWriteOperation(testTable, "0000005", INSERT);
-    doWriteOperation(testTable, "0000006", INSERT);
-    doWriteOperation(testTable, "0000007", INSERT);
+    doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT);
+    doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT);
+    doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT);
 
     tableMetadata = metadata(writeConfig, context);
     // verify that compaction of metadata table should kick in.
     assertTrue(tableMetadata.getLatestCompactionTime().isPresent(), "Compaction of metadata table does not kick in");
+    assertEquals(HoodieInstantTimeGenerator.instantTimeMinusMillis(inflightInstant, 1L), tableMetadata.getLatestCompactionTime().get());
 
     // move inflight to completed
-    testTable.moveInflightCommitToComplete("0000003", inflightCommitMeta);
+    testTable.moveInflightCommitToComplete(inflightInstant, inflightCommitMeta);
 
     // we have to add another commit for compaction to trigger. if not, latest delta commit in MDT is 7, but the new incoming i.e 3 is still inflight in DT while "3"
     // is getting applied to MDT.
-    doWriteOperation(testTable, "0000008", INSERT);
+    doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT);
     // verify compaction kicked in now
     tableMetadata = metadata(writeConfig, context);
     assertTrue(tableMetadata.getLatestCompactionTime().isPresent());
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 5c5d3f73267..951f4c9277d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -872,34 +872,35 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
             .withMaxNumDeltaCommitsBeforeCompaction(4)
             .build()).build();
     initWriteConfigAndMetatableWriter(writeConfig, true);
-    doWriteOperation(testTable, "0000001", INSERT);
-    String commitInstant = "0000002";
-    doWriteOperation(testTable, commitInstant, INSERT);
+    doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT);
+    doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT);
 
-    // test multi-writer scenario. lets add 1,2,3,4 where 1,2,4 succeeded, but 3 is still inflight. so latest delta commit in MDT is 4, while 3 is still pending
+    // test multi-writer scenario. let's add 1,2,3,4 where 1,2,4 succeeded, but 3 is still inflight. so latest delta commit in MDT is 4, while 3 is still pending
     // in DT and not seen by MDT yet. compaction should not trigger until 3 goes to completion.
 
     // create an inflight commit for 3
-    HoodieCommitMetadata inflightCommitMeta = testTable.doWriteOperation("0000003", UPSERT, emptyList(),
+    String inflightInstant = metaClient.createNewInstantTime();
+    HoodieCommitMetadata inflightCommitMeta = testTable.doWriteOperation(inflightInstant, UPSERT, emptyList(),
         asList("p1", "p2"), 2, false, true);
-    doWriteOperation(testTable, "0000004");
+    doWriteOperation(testTable, metaClient.createNewInstantTime());
     HoodieTableMetadata tableMetadata = metadata(writeConfig, context);
     // verify that compaction of metadata table does not kick in.
     assertFalse(tableMetadata.getLatestCompactionTime().isPresent());
-    doWriteOperation(testTable, "0000005", INSERT);
-    doWriteOperation(testTable, "0000006", INSERT);
-    doWriteOperation(testTable, "0000007", INSERT);
+    doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT);
+    doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT);
+    doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT);
 
     tableMetadata = metadata(writeConfig, context);
     // verify that compaction of metadata table should kick in.
     assertTrue(tableMetadata.getLatestCompactionTime().isPresent(), "Compaction of metadata table should kick in");
+    assertEquals(HoodieInstantTimeGenerator.instantTimeMinusMillis(inflightInstant, 1L), tableMetadata.getLatestCompactionTime().get());
 
     // move inflight to completed
-    testTable.moveInflightCommitToComplete("0000003", inflightCommitMeta);
+    testTable.moveInflightCommitToComplete(inflightInstant, inflightCommitMeta);
 
     // we have to add another commit for compaction to trigger. if not, latest delta commit in MDT is 7, but the new incoming i.e 3 is still inflight in DT while "3"
     // is getting applied to MDT.
-    doWriteOperation(testTable, "0000008", INSERT);
+    doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT);
     // verify compaction kicked in now
     tableMetadata = metadata(writeConfig, context);
     assertTrue(tableMetadata.getLatestCompactionTime().isPresent());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
index efa9c6f120a..80354195b72 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
@@ -106,17 +106,18 @@ public class HoodieInstantTimeGenerator {
   }
 
   public static String instantTimeMinusMillis(String timestamp, long milliseconds) {
+    final String timestampInMillis = fixInstantTimeCompatibility(timestamp);
     try {
-      String timestampInMillis = fixInstantTimeCompatibility(timestamp);
-      // To work with tests, that generate arbitrary timestamps, we need to pad the timestamp with 0s.
-      if (timestampInMillis.length() < MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH) {
-        return String.format("%0" + timestampInMillis.length() + "d", 0);
-      }
       LocalDateTime dt = LocalDateTime.parse(timestampInMillis, MILLIS_INSTANT_TIME_FORMATTER);
       ZoneId zoneId = HoodieTimelineTimeZone.UTC.equals(commitTimeZone) ? ZoneId.of("UTC") : ZoneId.systemDefault();
       return MILLIS_INSTANT_TIME_FORMATTER.format(dt.atZone(zoneId).toInstant().minusMillis(milliseconds).atZone(zoneId).toLocalDateTime());
     } catch (DateTimeParseException e) {
-      throw new HoodieException(e);
+      // To work with tests, that generate arbitrary timestamps, we need to pad the timestamp with 0s.
+      if (isValidInstantTime(timestampInMillis)) {
+        return String.format("%0" + timestampInMillis.length() + "d", Long.parseLong(timestampInMillis) - milliseconds);
+      } else {
+        throw new HoodieException(e);
+      }
     }
   }
 

Reply via email to