danny0405 commented on code in PR #9035:
URL: https://github.com/apache/hudi/pull/9035#discussion_r1238326624


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -901,6 +901,9 @@ private void startCommit(String instantTime, String actionType, HoodieTableMetaC
       metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, actionType,
               instantTime));
     }
+
+    // populate marker directory for the commit.
+    WriteMarkersFactory.get(config.getMarkersType(), createTable(config, hadoopConf), instantTime).createMarkerDir();
   }

Review Comment:
   We can refactor the API to
   
   ```java
   public static WriteMarkers get(MarkerType markerType, HoodieTableMetaClient metaClient, String instantTime)
   ```
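   
   With that overload, the new call site in `startCommit` could simply pass the `metaClient` it already has in scope, e.g. (sketch only):
   
   ```java
   // sketch: avoids calling createTable(config, hadoopConf) just to reach the meta client
   WriteMarkersFactory.get(config.getMarkersType(), metaClient, instantTime).createMarkerDir();
   ```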



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -512,6 +512,7 @@ public List<WriteStatus> close() {
         status.getStat().setFileSizeInBytes(logFileSize);
       }
 
+      createCompletedMarkerFile(partitionPath, baseInstantTime);

Review Comment:
   Can we create the file only when necessary, i.e. when `enforceFinalizeWriteCheck()` or `enforceCompletionMarkerCheck()` is true?
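   
   i.e. roughly this guard (a sketch only, reusing the config methods named above):
   
   ```java
   // only write the completion marker when one of the checks that consumes it is enabled
   if (config.enforceFinalizeWriteCheck() || config.enforceCompletionMarkerCheck()) {
     createCompletedMarkerFile(partitionPath, baseInstantTime);
   }
   ```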



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -138,9 +139,35 @@ protected Path makeNewFilePath(String partitionPath, String fileName) {
    *
    * @param partitionPath Partition path
    */
-  protected void createMarkerFile(String partitionPath, String dataFileName) {
-    WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
-        .create(partitionPath, dataFileName, getIOType(), config, fileId, hoodieTable.getMetaClient().getActiveTimeline());
+  protected void createInProgressMarkerFile(String partitionPath, String dataFileName, String markerInstantTime) {
+    WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+    if (!writeMarkers.doesMarkerDirExist()) {
+      throw new HoodieIOException(String.format("Marker root directory absent : %s/%s (%s)",
+          partitionPath, dataFileName, markerInstantTime));
+    }
+    if (config.enforceFinalizeWriteCheck()
+        && writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath("", "FINALIZE_WRITE", markerInstantTime, IOType.CREATE))) {
+      throw new HoodieCorruptedDataException("Reconciliation for instant " + instantTime + " is completed, job is trying to re-write the data files.");

Review Comment:
   Not sure what the purpose of this check is; it seems a file with `FINALIZE_WRITE` is never created anywhere in the code.
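   
   If the intent is a fail-fast guard, presumably something along these lines would have to run when reconciliation/finalizeWrite begins. This is only a sketch of that reading; `createCompletionMarker` is a hypothetical helper and `table` stands for whatever `HoodieTable` is in scope there, neither is an API shown in this diff:
   
   ```java
   // hypothetical: write the FINALIZE_WRITE sentinel when reconciliation starts, so that
   // late/speculative tasks hitting the check in createInProgressMarkerFile fail fast.
   // createCompletionMarker is an assumed helper, not something present in this PR.
   WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), table, instantTime);
   writeMarkers.createCompletionMarker("", "FINALIZE_WRITE", instantTime, IOType.CREATE);
   ```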



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java:
##########
@@ -135,18 +155,42 @@ private String translateMarkerToDataPath(String markerPath) {
     return stripMarkerSuffix(rPath);
   }
 
+  public static String stripMarkerSuffix(String path) {
+    return path.substring(0, path.indexOf(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN));
+  }
+
+  public static String stripOldStyleMarkerSuffix(String path) {
+    // marker file was created by older version of Hudi, with INPROGRESS_MARKER_EXTN (f1_w1_c1.marker).
+    // Rename to data file by replacing .marker with .parquet.
+    return String.format("%s%s", path.substring(0, path.indexOf(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN)),
+        HoodieFileFormat.PARQUET.getFileExtension());
+  }
+
   @Override
   public Set<String> allMarkerFilePaths() throws IOException {
     Set<String> markerFiles = new HashSet<>();
     if (doesMarkerDirExist()) {
       FSUtils.processFiles(fs, markerDirPath.toString(), fileStatus -> {
-        markerFiles.add(MarkerUtils.stripMarkerFolderPrefix(fileStatus.getPath().toString(), basePath, instantTime));
+        // Only the inprogres markerFiles are to be included here
+        if (fileStatus.getPath().toString().contains(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN)) {

Review Comment:
   Why only include the in-progress marker files here?
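   
   For contrast, roughly what also collecting the completion markers would look like; `COMPLETION_MARKER_EXTN` is an assumed counterpart constant, not a name taken from this diff:
   
   ```java
   // sketch only: keep both in-progress and completion markers in allMarkerFilePaths
   // (COMPLETION_MARKER_EXTN is an assumed name for the completion-marker extension)
   String pathStr = fileStatus.getPath().toString();
   if (pathStr.contains(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN)
       || pathStr.contains(HoodieTableMetaClient.COMPLETION_MARKER_EXTN)) {
     markerFiles.add(MarkerUtils.stripMarkerFolderPrefix(pathStr, basePath, instantTime));
   }
   ```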



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java:
##########
@@ -86,6 +89,17 @@ public TimelineServerBasedWriteMarkers(HoodieTable table, String instantTime) {
     this.timeoutSecs = timeoutSecs;
   }
 
+  @Override
+  protected Path getMarkerPath(String partitionPath, String dataFileName, IOType type) {
+    return new Path(partitionPath, getMarkerFileName(dataFileName, type));
+  }

Review Comment:
   Why override the partition path as relative?
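   
   For reference, the contrast the question is getting at, roughly sketched below; the second form is an assumption about what the non-overridden resolution looks like, anchored only on the `markerDirPath` field this class already uses elsewhere:
   
   ```java
   // overridden here: marker path is relative, resolved only against the partition
   Path relativePath = new Path(partitionPath, getMarkerFileName(dataFileName, type));
   // presumed default behaviour: rooted under the per-instant marker directory
   Path absolutePath = new Path(new Path(markerDirPath, partitionPath), getMarkerFileName(dataFileName, type));
   ```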



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -612,6 +612,20 @@ public class HoodieWriteConfig extends HoodieConfig {
       .sinceVersion("0.10.0")
       .withDocumentation("File Id Prefix provider class, that implements `org.apache.hudi.fileid.FileIdPrefixProvider`");
 
+  public static final ConfigProperty<String> ENFORCE_COMPLETION_MARKER_CHECKS = ConfigProperty
+      .key("hoodie.markers.enforce.completion.checks")
+      .defaultValue("false")
+      .sinceVersion("0.10.0")
+      .withDocumentation("Prevents the creation of duplicate data files, when multiple spark tasks are racing to "
+          + "create data files and a completed data file is already present");
+
+  public static final ConfigProperty<String> ENFORCE_FINALIZE_WRITE_CHECK = ConfigProperty
+      .key("hoodie.markers.enforce.finalize.write.check")
+      .defaultValue("false")
+      .sinceVersion("0.10.0")

Review Comment:
   Fix the `sinceVersion`.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -441,6 +441,9 @@ public List<WriteStatus> close() {
 
       performMergeDataValidationCheck(writeStatus);
 
+      // createCompleteMarkerFile throws hoodieException, if marker directory is not present.
+      createCompletedMarkerFile(partitionPath, this.instantTime);

Review Comment:
   Ditto: create when necessary.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java:
##########
@@ -119,7 +139,7 @@ public Set<String> createdAndMergedDataPaths(HoodieEngineContext context, int pa
         while (itr.hasNext()) {
           FileStatus status = itr.next();
           String pathStr = status.getPath().toString();
-          if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) {
+          if (pathStr.contains(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) {
             result.add(translateMarkerToDataPath(pathStr));

Review Comment:
   Why do we skip handling the completion marker files here?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java:
##########
@@ -217,6 +218,8 @@ public List<WriteStatus> close() {
 
       setupWriteStatus();
 
+      // createCompleteMarkerFile throws hoodieException, if marker directory is not present.
+      createCompletedMarkerFile(partitionPath, this.instantTime);

Review Comment:
   Ditto: create when necessary.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java:
##########
@@ -132,6 +153,25 @@ public Set<String> allMarkerFilePaths() {
     }
   }
 
+  @Override
+  public void createMarkerDir() throws HoodieIOException {
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Map<String, String> paramsMap = new HashMap<>();
+    paramsMap.put(MARKER_DIR_PATH_PARAM, markerDirPath.toString());

Review Comment:
   Do we really need an HTTP request to create the marker dir here? The timeline server is already located on the driver, and we already create a marker dir when starting a new instant.
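   
   A rough sketch of the alternative: create the directory directly on the driver rather than via an HTTP round trip. This assumes a Hadoop `Configuration`, here called `hadoopConf`, is reachable from this class:
   
   ```java
   // illustrative only: plain filesystem mkdir instead of a timeline-server request
   FileSystem fs = FSUtils.getFs(markerDirPath.toString(), hadoopConf);
   try {
     if (!fs.exists(markerDirPath)) {
       fs.mkdirs(markerDirPath);
     }
   } catch (IOException e) {
     throw new HoodieIOException("Failed to create marker directory " + markerDirPath, e);
   }
   ```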


