This is an automated email from the ASF dual-hosted git repository. danny0405 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 40d4f489389 [HUDI-7577] Avoid MDT compaction instant time conflicts (#10992) 40d4f489389 is described below commit 40d4f489389083e3c6d69954361d3de4aec8186a Author: Danny Chan <yuzhao....@gmail.com> AuthorDate: Tue Apr 16 09:12:19 2024 +0800 [HUDI-7577] Avoid MDT compaction instant time conflicts (#10992) --- .../metadata/HoodieBackedTableMetadataWriter.java | 6 +++++- .../hudi/client/TestJavaHoodieBackedMetadata.java | 23 +++++++++++----------- .../functional/TestHoodieBackedMetadata.java | 23 +++++++++++----------- .../table/timeline/HoodieInstantTimeGenerator.java | 13 ++++++------ 4 files changed, 36 insertions(+), 29 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index dea317e60b7..a541de03cb3 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -53,6 +53,7 @@ import org.apache.hudi.common.table.log.block.HoodieDeleteBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; @@ -1357,7 +1358,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM // The compaction planner will manage to filter out the log files that finished with greater completion time. // see BaseHoodieCompactionPlanGenerator.generateCompactionPlan for more details. final String compactionInstantTime = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested() - .findInstantsBeforeOrEquals(latestDeltacommitTime).firstInstant().map(HoodieInstant::getTimestamp) + .findInstantsBeforeOrEquals(latestDeltacommitTime).firstInstant() + // minus the pending instant time by 1 millisecond to avoid conflict in case when this pending instant was finally been committed + // as a delta_commit in MDT. + .map(instant -> HoodieInstantTimeGenerator.instantTimeMinusMillis(instant.getTimestamp(), 1L)) .orElse(writeClient.createNewInstantTime(false)); // we need to avoid checking compaction w/ same instant again. diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java index 35319a6e403..736eee97e85 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java @@ -559,34 +559,35 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase { .withMaxNumDeltaCommitsBeforeCompaction(4) .build()).build(); initWriteConfigAndMetatableWriter(writeConfig, true); - doWriteOperation(testTable, "0000001", INSERT); - String commitInstant = "0000002"; - doWriteOperation(testTable, commitInstant, INSERT); + doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT); + doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT); - // test multi-writer scenario. lets add 1,2,3,4 where 1,2,4 succeeded, but 3 is still inflight. so latest delta commit in MDT is 4, while 3 is still pending + // test multi-writer scenario. let's add 1,2,3,4 where 1,2,4 succeeded, but 3 is still inflight. so latest delta commit in MDT is 4, while 3 is still pending // in DT and not seen by MDT yet. compaction should not trigger until 3 goes to completion. // create an inflight commit for 3 - HoodieCommitMetadata inflightCommitMeta = testTable.doWriteOperation("0000003", UPSERT, emptyList(), + String inflightInstant = metaClient.createNewInstantTime(); + HoodieCommitMetadata inflightCommitMeta = testTable.doWriteOperation(inflightInstant, UPSERT, emptyList(), asList("p1", "p2"), 2, false, true); - doWriteOperation(testTable, "0000004"); + doWriteOperation(testTable, metaClient.createNewInstantTime()); HoodieTableMetadata tableMetadata = metadata(writeConfig, context); // verify that compaction of metadata table does not kick in. assertFalse(tableMetadata.getLatestCompactionTime().isPresent()); - doWriteOperation(testTable, "0000005", INSERT); - doWriteOperation(testTable, "0000006", INSERT); - doWriteOperation(testTable, "0000007", INSERT); + doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT); + doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT); + doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT); tableMetadata = metadata(writeConfig, context); // verify that compaction of metadata table should kick in. assertTrue(tableMetadata.getLatestCompactionTime().isPresent(), "Compaction of metadata table does not kick in"); + assertEquals(HoodieInstantTimeGenerator.instantTimeMinusMillis(inflightInstant, 1L), tableMetadata.getLatestCompactionTime().get()); // move inflight to completed - testTable.moveInflightCommitToComplete("0000003", inflightCommitMeta); + testTable.moveInflightCommitToComplete(inflightInstant, inflightCommitMeta); // we have to add another commit for compaction to trigger. if not, latest delta commit in MDT is 7, but the new incoming i.e 3 is still inflight in DT while "3" // is getting applied to MDT. - doWriteOperation(testTable, "0000008", INSERT); + doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT); // verify compaction kicked in now tableMetadata = metadata(writeConfig, context); assertTrue(tableMetadata.getLatestCompactionTime().isPresent()); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 5c5d3f73267..951f4c9277d 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -872,34 +872,35 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { .withMaxNumDeltaCommitsBeforeCompaction(4) .build()).build(); initWriteConfigAndMetatableWriter(writeConfig, true); - doWriteOperation(testTable, "0000001", INSERT); - String commitInstant = "0000002"; - doWriteOperation(testTable, commitInstant, INSERT); + doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT); + doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT); - // test multi-writer scenario. lets add 1,2,3,4 where 1,2,4 succeeded, but 3 is still inflight. so latest delta commit in MDT is 4, while 3 is still pending + // test multi-writer scenario. let's add 1,2,3,4 where 1,2,4 succeeded, but 3 is still inflight. so latest delta commit in MDT is 4, while 3 is still pending // in DT and not seen by MDT yet. compaction should not trigger until 3 goes to completion. // create an inflight commit for 3 - HoodieCommitMetadata inflightCommitMeta = testTable.doWriteOperation("0000003", UPSERT, emptyList(), + String inflightInstant = metaClient.createNewInstantTime(); + HoodieCommitMetadata inflightCommitMeta = testTable.doWriteOperation(inflightInstant, UPSERT, emptyList(), asList("p1", "p2"), 2, false, true); - doWriteOperation(testTable, "0000004"); + doWriteOperation(testTable, metaClient.createNewInstantTime()); HoodieTableMetadata tableMetadata = metadata(writeConfig, context); // verify that compaction of metadata table does not kick in. assertFalse(tableMetadata.getLatestCompactionTime().isPresent()); - doWriteOperation(testTable, "0000005", INSERT); - doWriteOperation(testTable, "0000006", INSERT); - doWriteOperation(testTable, "0000007", INSERT); + doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT); + doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT); + doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT); tableMetadata = metadata(writeConfig, context); // verify that compaction of metadata table should kick in. assertTrue(tableMetadata.getLatestCompactionTime().isPresent(), "Compaction of metadata table should kick in"); + assertEquals(HoodieInstantTimeGenerator.instantTimeMinusMillis(inflightInstant, 1L), tableMetadata.getLatestCompactionTime().get()); // move inflight to completed - testTable.moveInflightCommitToComplete("0000003", inflightCommitMeta); + testTable.moveInflightCommitToComplete(inflightInstant, inflightCommitMeta); // we have to add another commit for compaction to trigger. if not, latest delta commit in MDT is 7, but the new incoming i.e 3 is still inflight in DT while "3" // is getting applied to MDT. - doWriteOperation(testTable, "0000008", INSERT); + doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT); // verify compaction kicked in now tableMetadata = metadata(writeConfig, context); assertTrue(tableMetadata.getLatestCompactionTime().isPresent()); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java index efa9c6f120a..80354195b72 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java @@ -106,17 +106,18 @@ public class HoodieInstantTimeGenerator { } public static String instantTimeMinusMillis(String timestamp, long milliseconds) { + final String timestampInMillis = fixInstantTimeCompatibility(timestamp); try { - String timestampInMillis = fixInstantTimeCompatibility(timestamp); - // To work with tests, that generate arbitrary timestamps, we need to pad the timestamp with 0s. - if (timestampInMillis.length() < MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH) { - return String.format("%0" + timestampInMillis.length() + "d", 0); - } LocalDateTime dt = LocalDateTime.parse(timestampInMillis, MILLIS_INSTANT_TIME_FORMATTER); ZoneId zoneId = HoodieTimelineTimeZone.UTC.equals(commitTimeZone) ? ZoneId.of("UTC") : ZoneId.systemDefault(); return MILLIS_INSTANT_TIME_FORMATTER.format(dt.atZone(zoneId).toInstant().minusMillis(milliseconds).atZone(zoneId).toLocalDateTime()); } catch (DateTimeParseException e) { - throw new HoodieException(e); + // To work with tests, that generate arbitrary timestamps, we need to pad the timestamp with 0s. + if (isValidInstantTime(timestampInMillis)) { + return String.format("%0" + timestampInMillis.length() + "d", Long.parseLong(timestampInMillis) - milliseconds); + } else { + throw new HoodieException(e); + } } }