nsivabalan commented on a change in pull request #3590:
URL: https://github.com/apache/hudi/pull/3590#discussion_r716249167



##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -99,83 +94,94 @@ protected void initialize(HoodieEngineContext engineContext, HoodieTableMetaClie
   @Override
   protected void commit(List<HoodieRecord> records, String partitionName, String instantTime) {
     ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled");
-    JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName);
+    JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName, 1);
 
     try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) {
-      writeClient.startCommitWithTime(instantTime);
+      if (!metaClient.getActiveTimeline().filterCompletedInstants().containsInstant(instantTime)) {
+        // this commit is being applied to the metadata table for the first time
+        writeClient.startCommitWithTime(instantTime);
+      } else {
+        // this code path handles a re-attempted commit that was committed to the metadata table, but failed on the dataset.
+        // for example, say compaction c1 succeeded in the metadata table on its first attempt, but failed before committing to the data table.
+        // when retried, the data table will first roll back the pending compaction. that rollback is applied to the metadata table too, but all
+        // changes to the metadata table are upserts, so only a new delta commit is created.
+        // once the rollback completes, the compaction is retried and eventually hits this code block, where its instant time is
+        // already part of the completed timeline. so we have to manually remove the completed instant and proceed.
+        // it is for the same reason that withAllowMultiWriteOnSameInstant is enabled for the metadata table.
+        HoodieInstant alreadyCompletedInstant = metaClient.getActiveTimeline().filterCompletedInstants()
+            .filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant().get();
+        FSUtils.deleteInstantFile(metaClient.getFs(), metaClient.getMetaPath(), alreadyCompletedInstant);
+        metaClient.reloadActiveTimeline();
+      }
       List<WriteStatus> statuses = writeClient.upsertPreppedRecords(recordRDD, instantTime).collect();
       statuses.forEach(writeStatus -> {
         if (writeStatus.hasErrors()) {
           throw new HoodieMetadataException("Failed to commit metadata table records at instant " + instantTime);
         }
       });
-      // trigger cleaning, compaction, with suffixes based on the same instant time. This ensures that any future
-      // delta commits synced over will not have an instant time lesser than the last completed instant on the
-      // metadata table.
-      if (writeClient.scheduleCompactionAtInstant(instantTime + "001", Option.empty())) {
-        writeClient.compact(instantTime + "001");
-      }
-      writeClient.clean(instantTime + "002");
+
+      // reload timeline
+      metaClient.reloadActiveTimeline();
+
+      compactIfNecessary(writeClient, instantTime);
+      doClean(writeClient, instantTime);
     }
 
     // Update total size of the metadata and count of base/log files
-    metrics.ifPresent(m -> {
-      try {
-        Map<String, String> stats = m.getStats(false, metaClient, metadata);
-        m.updateMetrics(Long.parseLong(stats.get(HoodieMetadataMetrics.STAT_TOTAL_BASE_FILE_SIZE)),
-            Long.parseLong(stats.get(HoodieMetadataMetrics.STAT_TOTAL_LOG_FILE_SIZE)),
-            Integer.parseInt(stats.get(HoodieMetadataMetrics.STAT_COUNT_BASE_FILES)),
-            Integer.parseInt(stats.get(HoodieMetadataMetrics.STAT_COUNT_LOG_FILES)));
-      } catch (HoodieIOException e) {
-        LOG.error("Could not publish metadata size metrics", e);
-      }
-    });
+    metrics.ifPresent(m -> m.updateSizeMetrics(metaClient, metadata));
   }
 
   /**
-   * Tag each record with the location.
+   * Perform a compaction on the Metadata Table.
+   *
+   * Cases to be handled:
+   *   1. We cannot perform compaction if there are previous inflight operations on the dataset. This is because
+   *      a compacted metadata base file at time Tx should represent all the actions on the dataset till time Tx.
    *
-   * Since we only read the latest base file in a partition, we tag the records with the instant time of the latest
-   * base file.
+   *   2. In a multi-writer scenario, a parallel operation with a greater instantTime may have completed, creating a
+   *      deltacommit.
    */
-  private JavaRDD<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName) {
-    HoodieTable table = HoodieSparkTable.create(metadataWriteConfig, engineContext);
-    TableFileSystemView.SliceView fsView = table.getSliceView();
-    List<HoodieBaseFile> baseFiles = fsView.getLatestFileSlices(partitionName)
-        .map(FileSlice::getBaseFile)
-        .filter(Option::isPresent)
-        .map(Option::get)
-        .collect(Collectors.toList());
-
-    // All the metadata fits within a single base file
-    if (partitionName.equals(MetadataPartitionType.FILES.partitionPath())) {
-      if (baseFiles.size() > 1) {
-        throw new HoodieMetadataException("Multiple base files found in 
metadata partition");
-      }
+  private void compactIfNecessary(SparkRDDWriteClient writeClient, String instantTime) {
+    String latestDeltacommitTime = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
+        .lastInstant().get().getTimestamp();
+    List<HoodieInstant> pendingInstants = datasetMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
+        .findInstantsBefore(latestDeltacommitTime).getInstants().collect(Collectors.toList());
+
+    if (!pendingInstants.isEmpty()) {
+      LOG.info(String.format("Cannot compact metadata table as there are %d 
inflight instants before latest deltacommit %s: %s",
+          pendingInstants.size(), latestDeltacommitTime, 
Arrays.toString(pendingInstants.toArray())));
+      return;
     }
 
-    JavaSparkContext jsc = ((HoodieSparkEngineContext) engineContext).getJavaSparkContext();
-    String fileId;
-    String instantTime;
-    if (!baseFiles.isEmpty()) {
-      fileId = baseFiles.get(0).getFileId();
-      instantTime = baseFiles.get(0).getCommitTime();
-    } else {
-      // If there is a log file then we can assume that it has the data
-      List<HoodieLogFile> logFiles = fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath())
-          .map(FileSlice::getLatestLogFile)
-          .filter(Option::isPresent)
-          .map(Option::get)
-          .collect(Collectors.toList());
-      if (logFiles.isEmpty()) {
-        // No base and log files. All are new inserts
-        return jsc.parallelize(records, 1);
-      }
-
-      fileId = logFiles.get(0).getFileId();
-      instantTime = logFiles.get(0).getBaseCommitTime();
+    // Trigger compaction with suffixes based on the same instant time. This ensures that any future
+    // delta commits synced over will not have an instant time lesser than the last completed instant on the
+    // metadata table.
+    final String compactionInstantTime = latestDeltacommitTime + "001";
+    if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {

Review comment:
       We will not have or use the data table's write client in this layer at all, so I prefer to leave it as is. But I have fixed all the other namings.
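
       For readers following the thread, here is a minimal, self-contained sketch of the guard-then-compact flow that the new compactIfNecessary method above implements. It is an illustration only, not the PR's code: the Timeline and WriteClient interfaces below are hypothetical stand-ins for Hudi's HoodieActiveTimeline and SparkRDDWriteClient.

    import java.util.List;

    class MetadataCompactionSketch {
      // Hypothetical stand-in for the timeline queries used in the PR.
      interface Timeline {
        String latestCompletedDeltacommitTime();
        List<String> pendingInstantsBefore(String instantTime);
      }

      // Hypothetical stand-in for the metadata table's write client.
      interface WriteClient {
        boolean scheduleCompactionAt(String instantTime);
        void compact(String instantTime);
      }

      static void compactIfNecessary(WriteClient writeClient, Timeline dataset, Timeline metadata) {
        String latestDeltacommit = metadata.latestCompletedDeltacommitTime();

        // Case 1 from the javadoc: a compacted base file at time Tx must reflect every
        // dataset action up to Tx, so skip compaction while earlier work is inflight.
        List<String> pending = dataset.pendingInstantsBefore(latestDeltacommit);
        if (!pending.isEmpty()) {
          System.out.printf("Skipping compaction: %d inflight instants before %s%n",
              pending.size(), latestDeltacommit);
          return;
        }

        // Suffixing the deltacommit time keeps the compaction instant strictly greater
        // than the last synced deltacommit, so deltacommits synced over later (which
        // reuse dataset instant times) can never sort before it.
        String compactionInstant = latestDeltacommit + "001";
        if (writeClient.scheduleCompactionAt(compactionInstant)) {
          writeClient.compact(compactionInstant);
        }
      }
    }

       The removed code used the same suffix trick with "002" for the clean action, for the same ordering reason.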




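       Similarly, a simplified sketch of the retry handling at the top of the hunk, where an instant that already completed on the metadata table (but failed on the dataset) is removed so the same instant time can be replayed. Again, MetaClient and WriteClient here are hypothetical stand-ins, not Hudi's API.

    import java.util.Optional;

    class MetadataCommitRetrySketch {
      // Hypothetical stand-in for the metadata table's meta client.
      interface MetaClient {
        // Returns the completed instant with this timestamp, if any.
        Optional<String> findCompletedInstant(String instantTime);
        void deleteInstantFile(String instant);
        void reloadActiveTimeline();
      }

      // Hypothetical stand-in for the metadata table's write client.
      interface WriteClient {
        void startCommitWithTime(String instantTime);
        void upsertPrepped(String instantTime);
      }

      static void commit(MetaClient metaClient, WriteClient writeClient, String instantTime) {
        Optional<String> completed = metaClient.findCompletedInstant(instantTime);
        if (!completed.isPresent()) {
          // First time this dataset commit is applied to the metadata table.
          writeClient.startCommitWithTime(instantTime);
        } else {
          // Re-attempt: the instant completed on the metadata table but the dataset
          // commit failed. Because all metadata writes are idempotent upserts, we can
          // drop the completed instant file and replay under the same instant time.
          metaClient.deleteInstantFile(completed.get());
          metaClient.reloadActiveTimeline();
        }
        writeClient.upsertPrepped(instantTime);
      }
    }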