danny0405 commented on code in PR #9035: URL: https://github.com/apache/hudi/pull/9035#discussion_r1238326624
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java: ########## @@ -901,6 +901,9 @@ private void startCommit(String instantTime, String actionType, HoodieTableMetaC metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, actionType, instantTime)); } + + // populate marker directory for the commit. + WriteMarkersFactory.get(config.getMarkersType(), createTable(config, hadoopConf), instantTime).createMarkerDir(); } Review Comment: We can refactor the API to ```java public static WriteMarkers get(MarkerType markerType, HoodieTableMetaClient metaClient, String instantTime) ``` ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java: ########## @@ -512,6 +512,7 @@ public List<WriteStatus> close() { status.getStat().setFileSizeInBytes(logFileSize); } + createCompletedMarkerFile(partitionPath, baseInstantTime); Review Comment: Can we create the file only if necessary, i.e. when `enforceFinalizeWriteCheck()` or `enforceCompletionMarkerCheck()` is true?
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java: ########## @@ -138,9 +139,35 @@ protected Path makeNewFilePath(String partitionPath, String fileName) { * * @param partitionPath Partition path */ - protected void createMarkerFile(String partitionPath, String dataFileName) { - WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime) - .create(partitionPath, dataFileName, getIOType(), config, fileId, hoodieTable.getMetaClient().getActiveTimeline()); + protected void createInProgressMarkerFile(String partitionPath, String dataFileName, String markerInstantTime) { + WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime); + if (!writeMarkers.doesMarkerDirExist()) { + throw new HoodieIOException(String.format("Marker root directory absent : %s/%s (%s)", + partitionPath, dataFileName, markerInstantTime)); + } + if (config.enforceFinalizeWriteCheck() + && writeMarkers.markerExists(writeMarkers.getCompletionMarkerPath("", "FINALIZE_WRITE", markerInstantTime, IOType.CREATE))) { + throw new HoodieCorruptedDataException("Reconciliation for instant " + instantTime + " is completed, job is trying to re-write the data files."); Review Comment: Not sure what the purpose of this check is; it seems the file with `FINALIZE_WRITE` is never created in the code. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java: ########## @@ -135,18 +155,42 @@ private String translateMarkerToDataPath(String markerPath) { return stripMarkerSuffix(rPath); } + public static String stripMarkerSuffix(String path) { + return path.substring(0, path.indexOf(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN)); + } + + public static String stripOldStyleMarkerSuffix(String path) { + // marker file was created by older version of Hudi, with INPROGRESS_MARKER_EXTN (f1_w1_c1.marker). + // Rename to data file by replacing .marker with .parquet. 
+ return String.format("%s%s", path.substring(0, path.indexOf(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN)), + HoodieFileFormat.PARQUET.getFileExtension()); + } + @Override public Set<String> allMarkerFilePaths() throws IOException { Set<String> markerFiles = new HashSet<>(); if (doesMarkerDirExist()) { FSUtils.processFiles(fs, markerDirPath.toString(), fileStatus -> { - markerFiles.add(MarkerUtils.stripMarkerFolderPrefix(fileStatus.getPath().toString(), basePath, instantTime)); + // Only the inprogres markerFiles are to be included here + if (fileStatus.getPath().toString().contains(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN)) { Review Comment: Why only include in-progress marker files? ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java: ########## @@ -86,6 +89,17 @@ public TimelineServerBasedWriteMarkers(HoodieTable table, String instantTime) { this.timeoutSecs = timeoutSecs; } + @Override + protected Path getMarkerPath(String partitionPath, String dataFileName, IOType type) { + return new Path(partitionPath, getMarkerFileName(dataFileName, type)); + } Review Comment: Why override the partition path as relative? 
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java: ########## @@ -612,6 +612,20 @@ public class HoodieWriteConfig extends HoodieConfig { .sinceVersion("0.10.0") .withDocumentation("File Id Prefix provider class, that implements `org.apache.hudi.fileid.FileIdPrefixProvider`"); + public static final ConfigProperty<String> ENFORCE_COMPLETION_MARKER_CHECKS = ConfigProperty + .key("hoodie.markers.enforce.completion.checks") + .defaultValue("false") + .sinceVersion("0.10.0") + .withDocumentation("Prevents the creation of duplicate data files, when multiple spark tasks are racing to " + + "create data files and a completed data file is already present"); + + public static final ConfigProperty<String> ENFORCE_FINALIZE_WRITE_CHECK = ConfigProperty + .key("hoodie.markers.enforce.finalize.write.check") + .defaultValue("false") + .sinceVersion("0.10.0") Review Comment: Fix the `sinceVersion`. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java: ########## @@ -441,6 +441,9 @@ public List<WriteStatus> close() { performMergeDataValidationCheck(writeStatus); + // createCompleteMarkerFile throws hoodieException, if marker directory is not present. + createCompletedMarkerFile(partitionPath, this.instantTime); Review Comment: Ditto: create when necessary. 
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java: ########## @@ -119,7 +139,7 @@ public Set<String> createdAndMergedDataPaths(HoodieEngineContext context, int pa while (itr.hasNext()) { FileStatus status = itr.next(); String pathStr = status.getPath().toString(); - if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) { + if (pathStr.contains(HoodieTableMetaClient.INPROGRESS_MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) { result.add(translateMarkerToDataPath(pathStr)); Review Comment: Why do we ignore the handling of completion marker files? ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java: ########## @@ -217,6 +218,8 @@ public List<WriteStatus> close() { setupWriteStatus(); + // createCompleteMarkerFile throws hoodieException, if marker directory is not present. + createCompletedMarkerFile(partitionPath, this.instantTime); Review Comment: Ditto: create when necessary. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java: ########## @@ -132,6 +153,25 @@ public Set<String> allMarkerFilePaths() { } } + @Override + public void createMarkerDir() throws HoodieIOException { + HoodieTimer timer = new HoodieTimer().startTimer(); + Map<String, String> paramsMap = new HashMap<>(); + paramsMap.put(MARKER_DIR_PATH_PARAM, markerDirPath.toString()); Review Comment: Do we need an HTTP request for creating the marker dir then? The server is already located on the driver, and we already create a marker dir when starting a new instant. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org