prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1194268603


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1097,87 +1165,76 @@ protected void cleanIfNecessary(BaseHoodieWriteClient 
writeClient, String instan
     // Trigger cleaning with suffixes based on the same instant time. This 
ensures that any future
     // delta commits synced over will not have an instant time lesser than the 
last completed instant on the
     // metadata table.
-    writeClient.clean(instantTime + "002");
+    
writeClient.clean(HoodieTableMetadataUtil.createCleanTimestamp(instantTime));
     writeClient.lazyRollbackFailedIndexing();
   }
 
   /**
-   * This is invoked to initialize metadata table for a dataset.
-   * Initial commit has special handling mechanism due to its scale compared 
to other regular commits.
-   * During cold startup, the list of files to be committed can be huge.
-   * So creating a HoodieCommitMetadata out of these large number of files,
-   * and calling the existing update(HoodieCommitMetadata) function does not 
scale well.
-   * Hence, we have a special commit just for the initialization scenario.
+   * Validates the timeline for both main and metadata tables.
    */
-  private void initialCommit(String createInstantTime, 
List<MetadataPartitionType> partitionTypes) {
-    // List all partitions in the basePath of the containing dataset
-    LOG.info("Initializing metadata table by using file listings in " + 
dataWriteConfig.getBasePath());
-    engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing 
metadata table by listing files and partitions: " + 
dataWriteConfig.getTableName());
-
-    Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap 
= new HashMap<>();
-
-    // skip file system listing to populate metadata records if it's a fresh 
table.
-    // this is applicable only if the table already has N commits and metadata 
is enabled at a later point in time.
-    if (createInstantTime.equals(SOLO_COMMIT_TIMESTAMP)) { // 
SOLO_COMMIT_TIMESTAMP will be the initial commit time in MDT for a fresh table.
-      // If not, last completed commit in data table will be chosen as the 
initial commit time.
-      LOG.info("Triggering empty Commit to metadata to initialize");
-    } else {
-      List<DirectoryInfo> partitionInfoList = 
listAllPartitions(dataMetaClient);
-      Map<String, Map<String, Long>> partitionToFilesMap = 
partitionInfoList.stream()
-          .map(p -> {
-            String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
-            return Pair.of(partitionName, p.getFileNameToSizeMap());
-          })
-          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-
-      int totalDataFilesCount = 
partitionToFilesMap.values().stream().mapToInt(Map::size).sum();
-      List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet());
-
-      if (partitionTypes.contains(MetadataPartitionType.FILES)) {
-        // Record which saves the list of all partitions
-        HoodieRecord allPartitionRecord = 
HoodieMetadataPayload.createPartitionListRecord(partitions);
-        HoodieData<HoodieRecord> filesPartitionRecords = 
getFilesPartitionRecords(createInstantTime, partitionInfoList, 
allPartitionRecord);
-        ValidationUtils.checkState(filesPartitionRecords.count() == 
(partitions.size() + 1));
-        partitionToRecordsMap.put(MetadataPartitionType.FILES, 
filesPartitionRecords);
-      }
-
-      if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && 
totalDataFilesCount > 0) {
-        final HoodieData<HoodieRecord> recordsRDD = 
HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
-            engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams(), createInstantTime);
-        partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, 
recordsRDD);
-      }
-
-      if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && 
totalDataFilesCount > 0) {
-        final HoodieData<HoodieRecord> recordsRDD = 
HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
-            engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams());
-        partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, 
recordsRDD);
-      }
-      LOG.info("Committing " + partitions.size() + " partitions and " + 
totalDataFilesCount + " files to metadata");
+  private boolean validateTimelineBeforeSchedulingCompaction(Option<String> 
inFlightInstantTimestamp, String latestDeltacommitTime) {
+    // There should not be any incomplete instants on MDT
+    HoodieActiveTimeline metadataTimeline = 
metadataMetaClient.reloadActiveTimeline();
+    List<HoodieInstant> pendingInstantsOnMetadataTable = 
metadataTimeline.filterInflightsAndRequested().getInstants();
+    if (!pendingInstantsOnMetadataTable.isEmpty()) {
+      LOG.info(String.format(
+              "Cannot compact MDT as there are %d inflight instants: %s",
+              pendingInstantsOnMetadataTable.size(), 
Arrays.toString(pendingInstantsOnMetadataTable.toArray())));
+      metrics.ifPresent(m -> 
m.incrementMetric(HoodieMetadataMetrics.SKIP_TABLE_SERVICES, 1));
+      return false;
     }
 
-    commit(createInstantTime, partitionToRecordsMap, false);
-  }
+    // There should not be any incomplete instants on dataset
+    HoodieActiveTimeline datasetTimeline = 
dataMetaClient.reloadActiveTimeline();
+    List<HoodieInstant> pendingInstantsOnDataset = 
datasetTimeline.filterInflightsAndRequested().getInstantsAsStream()
+            .filter(i -> !inFlightInstantTimestamp.isPresent() || 
!i.getTimestamp().equals(inFlightInstantTimestamp.get()))
+            .collect(Collectors.toList());
+    if (!pendingInstantsOnDataset.isEmpty()) {
+      LOG.info(String.format(
+              "Cannot compact MDT as there are %d inflight instants on dataset 
before latest deltacommit %s: %s",
+              pendingInstantsOnDataset.size(), latestDeltacommitTime, 
Arrays.toString(pendingInstantsOnDataset.toArray())));
+      metrics.ifPresent(m -> 
m.incrementMetric(HoodieMetadataMetrics.SKIP_TABLE_SERVICES, 1));
+      return false;
+    }
 
-  private HoodieData<HoodieRecord> getFilesPartitionRecords(String 
createInstantTime, List<DirectoryInfo> partitionInfoList, HoodieRecord 
allPartitionRecord) {
-    HoodieData<HoodieRecord> filesPartitionRecords = 
engineContext.parallelize(Arrays.asList(allPartitionRecord), 1);
-    if (partitionInfoList.isEmpty()) {
-      return filesPartitionRecords;
+    // Check if the inflight commit is greater than all the completed commits.
+    Option<HoodieInstant> lastCompletedInstant = 
dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
+    if (!lastCompletedInstant.isPresent()) {
+      LOG.info("Last completed commit is not present.");
+      return false;
+    }
+    if 
(HoodieTimeline.compareTimestamps(lastCompletedInstant.get().getTimestamp(),
+            HoodieTimeline.GREATER_THAN, inFlightInstantTimestamp.get())) {
+      // Completed commits validation failed.
+      LOG.info(String.format(
+              "Cannot compact MDT as there is %s that is greater than inflight 
instant: %s",
+              lastCompletedInstant.get(), inFlightInstantTimestamp.get()));
+      return false;
     }
 
-    HoodieData<HoodieRecord> fileListRecords = 
engineContext.parallelize(partitionInfoList, 
partitionInfoList.size()).map(partitionInfo -> {
-      Map<String, Long> fileNameToSizeMap = 
partitionInfo.getFileNameToSizeMap();
-      // filter for files that are part of the completed commits
-      Map<String, Long> validFileNameToSizeMap = 
fileNameToSizeMap.entrySet().stream().filter(fileSizePair -> {
-        String commitTime = FSUtils.getCommitTime(fileSizePair.getKey());
-        return HoodieTimeline.compareTimestamps(commitTime, 
HoodieTimeline.LESSER_THAN_OR_EQUALS, createInstantTime);
-      }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    String latestDeltaCommitTimeInMetadataTable = 
metadataMetaClient.reloadActiveTimeline()

Review Comment:
   Removed the reload, since the caller already reloads the timeline before
   computing the latestDeltaCommitTimeInMetadataTable argument.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to