bvaradar commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r489225350



##########
File path: 
hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteResult.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.spark.api.java.JavaRDD;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Result of a write operation.
+ */
+public class HoodieWriteResult implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+  private static final long RANDOM_SEED = 9038412832L;

Review comment:
       Is this needed?

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -370,4 +430,17 @@ private IndexedRecord convertToAvroRecord(HoodieTimeline commitTimeline, HoodieI
     avroMetaData.getExtraMetadata().put(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY, "");
     return avroMetaData;
   }
+
+  public static org.apache.hudi.avro.model.HoodieReplaceCommitMetadata convertReplaceCommitMetadata(

Review comment:
       Move this to a ReplaceHelper class?

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+
+    TableFileSystemView fileSystemView = this.table.getFileSystemView();
+    ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+    Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView
+        .getReplacedFileGroupsBeforeOrOn(instant.getTimestamp());
+
+    fileGroupsToDelete.forEach(fg -> {
+      fg.getAllRawFileSlices().forEach(fileSlice -> {
+        fileSlice.getBaseFile().map(baseFile -> deletePath(baseFile.getFileStatus().getPath(), instant));
+        fileSlice.getLogFiles().forEach(logFile -> deletePath(logFile.getPath(), instant));
+      });
+    });
+  }
+
+  /**
+   * Because we are creating new 'HoodieTable' and FileSystemView objects in this class constructor,
+   * partition view may not be loaded correctly.
+   * Reload all partitions modified by REPLACE action
+   *
+   * TODO find a better way to pass the FileSystemView to this class.
+   */
+  private void ensureReplacedPartitionsLoadedCorrectly(HoodieInstant instant, TableFileSystemView fileSystemView) {
+    Option<HoodieInstant> replaceInstantOption = metaClient.getActiveTimeline().getCompletedAndReplaceTimeline()
+        .filter(replaceInstant -> replaceInstant.getTimestamp().equals(instant.getTimestamp())).firstInstant();
+
+    replaceInstantOption.ifPresent(replaceInstant -> {
+      try {
+        HoodieReplaceCommitMetadata metadata = HoodieReplaceCommitMetadata.fromBytes(
+            metaClient.getActiveTimeline().getInstantDetails(replaceInstant).get(),
+            HoodieReplaceCommitMetadata.class);
+
+        metadata.getPartitionToReplaceStats().keySet().forEach(partition -> fileSystemView.getAllFileGroups(partition));
+      } catch (IOException e) {
+        throw new HoodieCommitException("Failed to archive because cannot delete replace files", e);
+      }
+    });
+  }
+
+  private boolean deletePath(Path path, HoodieInstant instant) {
+    try {
+      LOG.info("Deleting " + path + " before archiving " + instant);
+      metaClient.getFs().delete(path);

Review comment:
       This could become a performance issue when we are deleting a lot of replaced files. The HoodieTimelineArchiveLog.archive() method takes a JavaSparkContext, right? Could we use it to delete these files in parallel?
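A minimal sketch of deleting in parallel rather than one file at a time. It uses a JDK ExecutorService over local java.nio paths purely as a stand-in so that it runs without Spark or Hadoop; inside archive() the analogous approach would be to distribute the paths with JavaSparkContext.parallelize and delete through the table's FileSystem on the executors. ParallelDeleteSketch and deleteAll are hypothetical names.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical helper: deletes many files concurrently instead of one at a time. */
public class ParallelDeleteSketch {

  /**
   * Deletes every path on a fixed-size thread pool and returns how many files were removed.
   * Paths that no longer exist are skipped rather than treated as errors.
   */
  public static int deleteAll(List<Path> paths, int parallelism)
      throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(parallelism);
    try {
      List<Future<Boolean>> results = new ArrayList<>();
      for (Path path : paths) {
        // Each delete is an independent task, so slow storage round trips overlap.
        Callable<Boolean> task = () -> Files.deleteIfExists(path);
        results.add(pool.submit(task));
      }
      int deleted = 0;
      for (Future<Boolean> result : results) {
        // get() rethrows any IOException from the task, wrapped in an ExecutionException.
        if (result.get()) {
          deleted++;
        }
      }
      return deleted;
    } finally {
      pool.shutdown();
    }
  }
}
```

Checking every Future means a failed delete surfaces as an ExecutionException rather than being silently dropped.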

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java
##########
@@ -251,6 +262,28 @@ private void addRollbackInstant(HoodieTimeline timeline, HoodieInstant instant)
     LOG.info("Done Syncing rollback instant (" + instant + ")");
   }
 
+  /**
+   * Add newly found REPLACE instant.
+   *
+   * @param timeline Hoodie Timeline
+   * @param instant REPLACE Instant
+   */
+  private void addReplaceInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {

Review comment:
       @satishkotha: We need to handle the case where we restore a .replace instant in the incremental timeline file-system view.
   
   About the implementation:
   In addRestoreInstant(), we need to look at HoodieRestoreMetadata.instantsToRollback and, for each instant of .replace type, remove the replaced file-group mapping kept in the file-system view. We would need a reverse mapping from instant to file-group ID, and also a way to identify which entries in HoodieRestoreMetadata.instantsToRollback are replace instants. Currently, we only store commit timestamps in HoodieRestoreMetadata.instantsToRollback.
   I think it would be useful to add a field to HoodieRestoreCommitMetadata and HoodieRollbackCommitMetadata that stores both the timestamp and the commit action type, and use it here.
   
   Since we only read committed replace actions, rollback is fine, though.
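A minimal sketch of the reverse mapping and the action-tagged rollback entries described above, using plain Java collections. ReplacedFileGroupIndex, InstantRef and the "replace" action string are hypothetical names for illustration, not Hudi classes; a real change would live inside IncrementalTimelineSyncFileSystemView and read the action type from the restore metadata.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical index for the replace mapping in a file-system view, kept in both directions
 * so that a restore can drop everything a given replace instant introduced.
 */
public class ReplacedFileGroupIndex {

  // Assumption: action name matching the ".replace" instants discussed in the review.
  public static final String REPLACE_ACTION = "replace";

  /** Hypothetical (timestamp, action) pair that a restore/rollback metadata entry could carry. */
  public static final class InstantRef {
    final String timestamp;
    final String action;

    public InstantRef(String timestamp, String action) {
      this.timestamp = timestamp;
      this.action = action;
    }
  }

  // file-group id -> timestamp of the replace instant that replaced it
  private final Map<String, String> fileGroupToReplaceInstant = new HashMap<>();
  // replace instant timestamp -> file-group ids it replaced (the reverse mapping)
  private final Map<String, Set<String>> replaceInstantToFileGroups = new HashMap<>();

  public void addReplaceInstant(String instantTime, List<String> replacedFileGroupIds) {
    for (String fileGroupId : replacedFileGroupIds) {
      fileGroupToReplaceInstant.put(fileGroupId, instantTime);
      replaceInstantToFileGroups.computeIfAbsent(instantTime, k -> new HashSet<>()).add(fileGroupId);
    }
  }

  /** Drops only the mappings introduced by this instant, found via the reverse map. */
  public void removeReplaceInstant(String instantTime) {
    Set<String> fileGroupIds = replaceInstantToFileGroups.remove(instantTime);
    if (fileGroupIds == null) {
      return;
    }
    for (String fileGroupId : fileGroupIds) {
      // Two-argument remove leaves the entry alone if a different instant owns it.
      fileGroupToReplaceInstant.remove(fileGroupId, instantTime);
    }
  }

  /** On restore, only entries tagged with the replace action affect this index. */
  public void applyRestore(List<InstantRef> instantsToRollback) {
    for (InstantRef ref : instantsToRollback) {
      if (REPLACE_ACTION.equals(ref.action)) {
        removeReplaceInstant(ref.timestamp);
      }
    }
  }

  public boolean isReplaced(String fileGroupId) {
    return fileGroupToReplaceInstant.containsKey(fileGroupId);
  }
}
```

Carrying the action next to the timestamp is what lets applyRestore skip plain commits without re-reading each instant's metadata.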

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+
+    TableFileSystemView fileSystemView = this.table.getFileSystemView();
+    ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+    Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView
+        .getReplacedFileGroupsBeforeOrOn(instant.getTimestamp());
+
+    fileGroupsToDelete.forEach(fg -> {

Review comment:
       +1

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieReplaceCommitMetadata.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * All the metadata that gets stored along with a commit.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class HoodieReplaceCommitMetadata extends HoodieCommitMetadata {
+  private static final Logger LOG = LogManager.getLogger(HoodieReplaceCommitMetadata.class);
+  protected Map<String, List<String>> partitionToReplaceFileIds;
+
+  // for ser/deser
+  public HoodieReplaceCommitMetadata() {
+    this(false);
+  }
+
+  public HoodieReplaceCommitMetadata(boolean compacted) {
+    super(compacted);
+    partitionToReplaceFileIds = new HashMap<>();
+  }
+
+  public void setPartitionToReplaceFileIds(Map<String, List<String>> partitionToReplaceFileIds) {
+    this.partitionToReplaceFileIds = partitionToReplaceFileIds;
+  }
+
+  public void addReplaceFileId(String partitionPath, String fileId) {
+    if (!partitionToReplaceFileIds.containsKey(partitionPath)) {
+      partitionToReplaceFileIds.put(partitionPath, new ArrayList<>());
+    }
+    partitionToReplaceFileIds.get(partitionPath).add(fileId);
+  }
+
+  public List<String> getReplaceFileIds(String partitionPath) {
+    return partitionToReplaceFileIds.get(partitionPath);
+  }
+
+  public Map<String, List<String>> getPartitionToReplaceFileIds() {
+    return partitionToReplaceFileIds;
+  }
+
+  public String toJsonString() throws IOException {

Review comment:
       nit: Can you add the @Override annotation?
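For reference, a compilable illustration of the nit, using simplified stand-in classes rather than the real HoodieCommitMetadata and HoodieReplaceCommitMetadata (OverrideSketch and the *Stub names are hypothetical). With @Override present, the compiler checks that toJsonString() still matches a superclass method, so a signature drift in the parent becomes a compile error instead of a silent new method.

```java
import java.io.IOException;

public class OverrideSketch {

  /** Stand-in for HoodieCommitMetadata. */
  public static class CommitMetadataStub {
    public String toJsonString() throws IOException {
      return "{}";
    }
  }

  /** Stand-in for HoodieReplaceCommitMetadata. */
  public static class ReplaceCommitMetadataStub extends CommitMetadataStub {
    // @Override makes the compiler verify this really overrides the parent method;
    // a typo such as toJSONString() would then fail to compile instead of adding a new method.
    @Override
    public String toJsonString() throws IOException {
      return "{\"partitionToReplaceFileIds\":{}}";
    }
  }
}
```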
   

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java
##########
@@ -251,6 +262,28 @@ private void addRollbackInstant(HoodieTimeline timeline, HoodieInstant instant)
     LOG.info("Done Syncing rollback instant (" + instant + ")");
   }
 
+  /**
+   * Add newly found REPLACE instant.
+   *
+   * @param timeline Hoodie Timeline
+   * @param instant REPLACE Instant
+   */
+  private void addReplaceInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {

Review comment:
       Also, can you add test cases to TestIncrementalFSViewSync for both addReplaceInstant and removeReplaceInstant in the incremental file-system view?

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
     }
   }
 
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+
+    TableFileSystemView fileSystemView = this.table.getFileSystemView();
+    ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+    Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView

Review comment:
       Only commit instants older than the oldest pending compaction are allowed to be archived. If we encoded the entire file group in the replace metadata, we would have race conditions with pending compactions. So I think it is safer to determine the file group at archive time, when it is guaranteed that any pending compaction has completed.
   
   Regarding the need for ensureReplacedPartitionsLoadedCorrectly: if you look at how pending compactions are handled in the file-system view, they are eagerly loaded whenever the view is constructed. This seems to be the case for replace metadata too. So why do we need to trigger the loading from outside?
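The ordering constraint in the first paragraph can be sketched as a filter over instant times. This assumes fixed-width, timestamp-style instant strings whose lexicographic order matches chronological order, and it ignores the other archival limits Hudi applies; ArchivalGuardSketch and archivableInstants are hypothetical names, not Hudi's timeline API.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class ArchivalGuardSketch {

  /**
   * Returns the completed instants that may be archived: those strictly older than the
   * oldest pending compaction. With no pending compaction, every completed instant qualifies.
   * Assumes instant times compare correctly as plain strings (fixed-width timestamps).
   */
  public static List<String> archivableInstants(List<String> completedInstants,
                                                List<String> pendingCompactionInstants) {
    Optional<String> oldestPending = pendingCompactionInstants.stream().min(String::compareTo);
    return completedInstants.stream()
        .filter(ts -> !oldestPending.isPresent() || ts.compareTo(oldestPending.get()) < 0)
        .sorted()
        .collect(Collectors.toList());
  }
}
```

Replaced file groups for the instants this returns would then be resolved at archive time, as the comment suggests, rather than being frozen into the replace metadata when the commit is written.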




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

