nsivabalan commented on code in PR #9523:
URL: https://github.com/apache/hudi/pull/9523#discussion_r1304953163


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java:
##########
@@ -130,6 +132,53 @@ public Set<String> 
createdAndMergedDataPaths(HoodieEngineContext context, int pa
     return dataFiles;
   }
 
+  public Set<String> appendedLogPaths(HoodieEngineContext context, int 
parallelism) throws IOException {

Review Comment:
   we might need to fix createdAndMergedDataPaths to not return any log file markers. For example, we could have CREATE markers for log files, and due to spark retries, we could have more than one.
   So, essentially, we change createdAndMergedDataPaths to return just data files and not log files.
   
   here within appendedLogPaths, we can return any marker for log files (irrespective of whether it is CREATE or APPEND). A rough sketch of that split follows.
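   A rough sketch of the two filters, for discussion only (FSUtils.isLogFile, WriteMarkers.stripMarkerSuffix, IOType and HoodieTableMetaClient.MARKER_EXTN are taken from this diff; the helper class and method names are illustrative, not the proposed implementation):

```java
import java.util.function.Predicate;

import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.table.marker.WriteMarkers;

final class MarkerFilters {
  private MarkerFilters() {
  }

  // createdAndMergedDataPaths: keep only markers whose target is a base file, so a
  // CREATE marker written for a log file (possible with spark task retries, and
  // possibly more than one of them) is excluded.
  static Predicate<String> dataFileMarkers() {
    return pathStr -> pathStr.contains(HoodieTableMetaClient.MARKER_EXTN)
        && !pathStr.endsWith(IOType.APPEND.name())
        && !FSUtils.isLogFile(new Path(WriteMarkers.stripMarkerSuffix(pathStr)));
  }

  // appendedLogPaths: keep any marker whose target is a log file, irrespective of
  // whether the marker type is CREATE (rollover) or APPEND.
  static Predicate<String> logFileMarkers() {
    return pathStr -> pathStr.contains(HoodieTableMetaClient.MARKER_EXTN)
        && FSUtils.isLogFile(new Path(WriteMarkers.stripMarkerSuffix(pathStr)));
  }
}
```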



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -276,4 +283,33 @@ protected static Option<IndexedRecord> 
toAvroRecord(HoodieRecord record, Schema
       return Option.empty();
     }
   }
+
+  protected class AppendLogWriteCallback implements HoodieLogFileWriteCallback 
{
+    // here we distinguish log files created from log files being appended. 
Considering following scenario:
+    // An appending task write to log file.
+    // (1) append to existing file file_instant_writetoken1.log.1
+    // (2) rollover and create file file_instant_writetoken2.log.2
+    // Then this task failed and retry by a new task.
+    // (3) append to existing file file_instant_writetoken1.log.1
+    // (4) rollover and create file file_instant_writetoken3.log.2
+    // finally file_instant_writetoken2.log.2 should not be committed to hudi, 
we use marker file to delete it.

Review Comment:
   minor: this documentation needs to be fixed.
   we need to say that we will commit all files to hudi and sync any duplicated files to the Metadata table (if enabled) as well. A possible rewording is sketched below.
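   A possible rewording, sketched from this suggestion (the reconciliation behaviour described here is taken from this review thread, not verified against the final code):

```java
// A retried task may roll over with a new write token, so the same file slice can
// end up with more than one log file after retries. We do not delete such extra
// log files: all log files are committed to hudi, and any duplicated files are
// synced to the Metadata table (if enabled) as well. The markers created by this
// callback are what lets the commit step reconcile those files later.
```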



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseSparkDeltaCommitActionExecutor.java:
##########
@@ -60,6 +80,78 @@ public 
BaseSparkDeltaCommitActionExecutor(HoodieSparkEngineContext context, Hood
     super(context, config, table, instantTime, operationType, extraMetadata);
   }
 
+  protected void commit(Option<Map<String, String>> extraMetadata, 
HoodieWriteMetadata<HoodieData<WriteStatus>> result, List<HoodieWriteStat> 
writeStats) {
+    String actionType = getCommitActionType();
+    LOG.info("Committing " + instantTime + ", action Type " + actionType + ", 
operation Type " + operationType);
+    result.setCommitted(true);
+    result.setWriteStats(writeStats);
+    // Finalize write
+    finalizeWrite(instantTime, writeStats, result);
+    try {
+      HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+      HoodieCommitMetadata metadata = addMissingLogFileIfNeeded(result);
+      writeTableMetadata(metadata, result.getWriteStatuses(), actionType);
+      activeTimeline.saveAsComplete(new HoodieInstant(true, 
getCommitActionType(), instantTime),
+          Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+      LOG.info("Committed " + instantTime);
+      result.setCommitMetadata(Option.of(metadata));
+    } catch (IOException e) {
+      throw new HoodieCommitException("Failed to complete commit " + 
config.getBasePath() + " at time " + instantTime,
+          e);
+    }
+  }
+
+  /* In spark mor table, any failed spark task may generate log files which 
are not included in write status.
+   * We need to add these to CommitMetadata so that it will be synced to MDT 
and make MDT has correct file info.
+   */
+  private HoodieCommitMetadata 
addMissingLogFileIfNeeded(HoodieWriteMetadata<HoodieData<WriteStatus>> result) 
throws IOException {

Review Comment:
   we can move these to static utility methods so that we can re-use the code
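   For illustration, the shared logic could sit behind something like the following static helper (class and method names are hypothetical; only the parameter types are taken from this diff), so SparkRDDWriteClient and BaseSparkDeltaCommitActionExecutor both call one copy:

```java
import java.io.IOException;

import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;

public final class MissingLogFileUtils {
  private MissingLogFileUtils() {
  }

  /**
   * Adds write stats for log files that have APPEND markers but are absent from the
   * commit metadata (e.g. left behind by failed spark task attempts), so the metadata
   * table sees every file on storage. Returns the (possibly) amended metadata.
   */
  public static HoodieCommitMetadata appendMissingLogFiles(HoodieEngineContext context,
                                                           HoodieTable table,
                                                           HoodieWriteConfig config,
                                                           String commitActionType,
                                                           String instantTime,
                                                           HoodieCommitMetadata metadata) throws IOException {
    // body would be the logic currently duplicated in SparkRDDWriteClient and
    // BaseSparkDeltaCommitActionExecutor; returned unchanged in this sketch
    return metadata;
  }
}
```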



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java:
##########
@@ -103,35 +120,47 @@ public List<HoodieRollbackRequest> 
getRollbackRequests(HoodieInstant instantToRo
     }
   }
 
-  protected HoodieRollbackRequest getRollbackRequestForAppend(HoodieInstant 
instantToRollback, String markerFilePath) throws IOException {
-    Path baseFilePathForAppend = new Path(basePath, markerFilePath);
-    String fileId = FSUtils.getFileIdFromFilePath(baseFilePathForAppend);
-    String baseCommitTime = 
FSUtils.getCommitTime(baseFilePathForAppend.getName());
-    String relativePartitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePath), baseFilePathForAppend.getParent());
-    Path partitionPath = FSUtils.getPartitionPath(config.getBasePath(), 
relativePartitionPath);
-
-    // NOTE: Since we're rolling back incomplete Delta Commit, it only could 
have appended its
-    //       block to the latest log-file
-    // TODO(HUDI-1517) use provided marker-file's path instead
-    Option<HoodieLogFile> latestLogFileOption = 
FSUtils.getLatestLogFile(table.getMetaClient().getFs(), partitionPath, fileId,
-        HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime);
-
-    // Log file can be deleted if the commit to rollback is also the commit 
that created the fileGroup
-    if (latestLogFileOption.isPresent() && 
baseCommitTime.equals(instantToRollback.getTimestamp())) {
-      Path fullDeletePath = new Path(partitionPath, 
latestLogFileOption.get().getFileName());
-      return new HoodieRollbackRequest(relativePartitionPath, EMPTY_STRING, 
EMPTY_STRING,
-          Collections.singletonList(fullDeletePath.toString()),
-          Collections.emptyMap());
+  protected HoodieRollbackRequest getRollbackRequestForAppend(String 
markerFilePath) throws IOException {
+    Path filePath = new Path(basePath, markerFilePath);
+    String fileId;
+    String baseCommitTime;
+    String relativePartitionPath;
+    Option<HoodieLogFile> latestLogFileOption;
+
+    // Old marker files may be generated from base file name before HUDI-1517. 
keep compatible with them.
+    // TODO: deprecated in HUDI-1517, may be removed in the future. 
@guanziyue.gzy
+    if (FSUtils.isBaseFile(filePath)) {

Review Comment:
   oh, this is already backwards compatible?
   i.e. during upgrade we don't need to re-create marker files because we can handle a mix of marker files.
   nice.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -104,6 +124,84 @@ public boolean commit(String instantTime, 
JavaRDD<WriteStatus> writeStatuses, Op
     return commitStats(instantTime, HoodieJavaRDD.of(writeStatuses), 
writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds, 
extraPreCommitFunc);
   }
 
+  protected void commit(HoodieTable table, String commitActionType, String 
instantTime, HoodieCommitMetadata metadata,
+                        List<HoodieWriteStat> stats, HoodieData<WriteStatus> 
writeStatuses) throws IOException {
+    LOG.info("Committing " + instantTime + " action " + commitActionType);
+    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+    HoodieCommitMetadata fixedCommitMetadata = 
addMissingLogFileIfNeeded(table, commitActionType, instantTime, metadata);
+    // Finalize write
+    finalizeWrite(table, instantTime, stats);
+    // do save internal schema to support Implicitly add columns in write 
process
+    if 
(!fixedCommitMetadata.getExtraMetadata().containsKey(SerDeHelper.LATEST_SCHEMA)
+        && fixedCommitMetadata.getExtraMetadata().containsKey(SCHEMA_KEY) && 
table.getConfig().getSchemaEvolutionEnable()) {
+      saveInternalSchema(table, instantTime, fixedCommitMetadata);
+    }
+    // update Metadata table
+    writeTableMetadata(table, instantTime, fixedCommitMetadata, writeStatuses);
+    activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType, 
instantTime),
+        
Option.of(fixedCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+  }
+
+  /* In spark mor table, any failed spark task may generate log files which 
are not included in write status.
+   * We need to add these to CommitMetadata so that it will be synced to MDT 
and make MDT has correct file info.
+   */
+  private HoodieCommitMetadata addMissingLogFileIfNeeded(HoodieTable table, 
String commitActionType, String instantTime,
+                                                         HoodieCommitMetadata 
commitMetadata) throws IOException {
+    if 
(!table.getMetaClient().getTableConfig().getTableType().equals(HoodieTableType.MERGE_ON_READ)
+        || !commitActionType.equals(HoodieActiveTimeline.DELTA_COMMIT_ACTION)) 
{
+      return commitMetadata;
+    }
+
+    HoodieCommitMetadata metadata = commitMetadata;
+    WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(), 
table, instantTime);
+    // if there is log files in this delta commit, we search any invalid log 
files generated by failed spark task
+    boolean hasLogFileInDeltaCommit = metadata.getPartitionToWriteStats()
+        .values().stream().flatMap(List::stream)
+        .anyMatch(writeStat -> FSUtils.isLogFile(new 
Path(config.getBasePath(), writeStat.getPath()).getName()));
+    if (hasLogFileInDeltaCommit) {
+      // get all log files generated by log mark file
+      Set<String> logFilesMarkerPath = new 
HashSet<>(markers.appendedLogPaths(context, 
config.getFinalizeWriteParallelism()));
+
+      // remove valid log files
+      for (Map.Entry<String, List<HoodieWriteStat>> partitionAndWriteStats : 
metadata.getPartitionToWriteStats().entrySet()) {
+        for (HoodieWriteStat hoodieWriteStat : 
partitionAndWriteStats.getValue()) {
+          logFilesMarkerPath.remove(hoodieWriteStat.getPath());
+        }
+      }
+
+      // remaining are invalid log files, let's generate write stat for them
+      if (logFilesMarkerPath.size() > 0) {
+        context.setJobStatus(this.getClass().getSimpleName(), "generate 
writeStat for missing log files");
+        List<Option<HoodieDeltaWriteStat>> fakeLogFileWriteStat = 
context.map(new ArrayList<>(logFilesMarkerPath), (logFilePath) -> {
+          FileSystem fileSystem = table.getMetaClient().getFs();
+          FileStatus fileStatus;
+          try {
+            fileStatus = fileSystem.getFileStatus(new 
Path(config.getBasePath(), logFilePath));
+          } catch (FileNotFoundException fileNotFoundException) {
+            return Option.empty();
+          }
+
+          HoodieDeltaWriteStat writeStat = new HoodieDeltaWriteStat();
+          HoodieLogFile logFile = new HoodieLogFile(fileStatus);
+          writeStat.setPath(logFilePath);
+          writeStat.setFileId(logFile.getFileId());
+          writeStat.setTotalWriteBytes(logFile.getFileSize());
+          writeStat.setFileSizeInBytes(logFile.getFileSize());
+          writeStat.setLogVersion(logFile.getLogVersion());
+          
writeStat.setLogFiles(Collections.singletonList(logFile.getPath().toString()));
+          writeStat.setPartitionPath(FSUtils.getRelativePartitionPath(new 
Path(config.getBasePath()), fileStatus.getPath().getParent()));
+          return Option.of(writeStat);
+        }, config.getFinalizeWriteParallelism());
+
+        // add these write stat to commit meta

Review Comment:
   let's add documentation (comments) around why a file could be missing here, i.e. a marker refers to a file, but the actual file is missing. One possible wording is sketched below.
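   One possible wording for that comment, based on my reading of this thread rather than a confirmed explanation:

```java
// Why a marker may point at a file that does not exist on storage: the marker is
// created (via the log write callback) before the log file itself is opened and
// written, so a task that dies in between, or an attempt that never got to write,
// leaves a marker with no backing file. Such entries are skipped here by returning
// Option.empty() when getFileStatus throws FileNotFoundException.
```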
   



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java:
##########
@@ -343,27 +359,73 @@ public void testSimpleInsertsGeneratedIntoLogFiles() 
throws Exception {
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
       JavaRDD<HoodieRecord> recordsRDD = jsc().parallelize(records, 1);
       JavaRDD<WriteStatus> statuses = writeClient.insert(recordsRDD, 
newCommitTime);
+      long expectedLogFileNum = statuses.map(writeStatus -> 
(HoodieDeltaWriteStat) writeStatus.getStat())
+          .flatMap(deltaWriteStat -> deltaWriteStat.getLogFiles().iterator())
+          .count();
+      // inject a fake log file to test marker file for log file
+      HoodieDeltaWriteStat correctWriteStat = (HoodieDeltaWriteStat) 
statuses.map(WriteStatus::getStat).take(1).get(0);
+      assertTrue(FSUtils.isLogFile(new Path(correctWriteStat.getPath())));
+      HoodieLogFile correctLogFile = new 
HoodieLogFile(correctWriteStat.getPath());
+      String correctWriteToken = 
FSUtils.getWriteTokenFromLogPath(correctLogFile.getPath());
+
+      final String fakeToken = generateFakeWriteToken(correctWriteToken);
+
+      final WriteMarkers writeMarkers = 
WriteMarkersFactory.get(config.getMarkersType(),
+          HoodieSparkTable.create(config, context()), newCommitTime);
+      HoodieLogFormat.Writer fakeLogWriter = HoodieLogFormat.newWriterBuilder()
+          .onParentPath(FSUtils.getPartitionPath(config.getBasePath(), 
correctWriteStat.getPartitionPath()))
+          
.withFileId(correctWriteStat.getFileId()).overBaseCommit(newCommitTime)
+          .withLogVersion(correctLogFile.getLogVersion())
+          .withFileSize(0L)
+          .withSizeThreshold(config.getLogFileMaxSize()).withFs(fs())
+          .withRolloverLogWriteToken(fakeToken)
+          .withLogWriteToken(fakeToken)
+          .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+          .withLogWriteCallback(new HoodieLogFileWriteCallback() {
+            @Override
+            public boolean preLogFileOpen(HoodieLogFile logFileToAppend) {
+              return writeMarkers.create(correctWriteStat.getPartitionPath(), 
logFileToAppend.getFileName(), IOType.APPEND).isPresent();
+            }
+
+            @Override
+            public boolean preLogFileCreate(HoodieLogFile logFileToCreate) {
+              return writeMarkers.create(correctWriteStat.getPartitionPath(), 
logFileToCreate.getFileName(), IOType.APPEND).isPresent();
+            }
+          }).build();
+      AppendResult fakeAppendResult = 
fakeLogWriter.appendBlock(getLogBlock(records, config.getSchema()));

Review Comment:
   let's not call it fake.
   let's name these additionalLogWriter, additionalAppendResult, etc.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java:
##########
@@ -130,6 +132,53 @@ public Set<String> 
createdAndMergedDataPaths(HoodieEngineContext context, int pa
     return dataFiles;
   }
 
+  public Set<String> appendedLogPaths(HoodieEngineContext context, int 
parallelism) throws IOException {
+    Set<String> logFiles = new HashSet<>();
+
+    FileStatus[] topLevelStatuses = fs.listStatus(markerDirPath);
+    List<String> subDirectories = new ArrayList<>();
+    for (FileStatus topLevelStatus: topLevelStatuses) {
+      if (topLevelStatus.isFile()) {
+        String pathStr = topLevelStatus.getPath().toString();
+        if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && 
pathStr.endsWith(IOType.APPEND.name())) {
+          logFiles.add(translateMarkerToDataPath(pathStr));
+        }
+      } else {
+        subDirectories.add(topLevelStatus.getPath().toString());
+      }
+    }
+
+    if (subDirectories.size() > 0) {

Review Comment:
   there is an opportunity for code re-use here.
   I understand you want to quickly put out the patch, but before we land, let's revisit this for code re-use. One possible shape is sketched below.
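   One possible shape, sketched as a member method of DirectWriteMarkers (it relies on the existing fs, markerDirPath and translateMarkerToDataPath members; imports are omitted; the sub-directory branch would reuse the parallel listing already present in createdAndMergedDataPaths):

```java
private Set<String> markerPathsMatching(HoodieEngineContext context, int parallelism,
                                        Set<String> ioTypeSuffixes) throws IOException {
  Set<String> paths = new HashSet<>();
  List<String> subDirectories = new ArrayList<>();
  for (FileStatus status : fs.listStatus(markerDirPath)) {
    if (status.isFile()) {
      String pathStr = status.getPath().toString();
      if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN)
          && ioTypeSuffixes.stream().anyMatch(pathStr::endsWith)) {
        paths.add(translateMarkerToDataPath(pathStr));
      }
    } else {
      subDirectories.add(status.getPath().toString());
    }
  }
  if (!subDirectories.isEmpty()) {
    // same parallel sub-directory listing as createdAndMergedDataPaths, with the
    // suffix check above applied to each marker found; omitted here for brevity
  }
  return paths;
}

// appendedLogPaths (or createdAndAppendedLogFilePaths) then reduces to a one-liner:
public Set<String> appendedLogPaths(HoodieEngineContext context, int parallelism) throws IOException {
  return markerPathsMatching(context, parallelism, Collections.singleton(IOType.APPEND.name()));
}
```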



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java:
##########
@@ -78,22 +82,35 @@ public List<HoodieRollbackRequest> 
getRollbackRequests(HoodieInstant instantToRo
       return context.map(markerPaths, markerFilePath -> {
         String typeStr = 
markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
         IOType type = IOType.valueOf(typeStr);
+        String partitionFilePath = 
WriteMarkers.stripMarkerSuffix(markerFilePath);
+        Path fullFilePath = new Path(basePath, partitionFilePath);
+        String partitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePath), fullFilePath.getParent());
         switch (type) {
           case MERGE:
           case CREATE:
-            String fileToDelete = 
WriteMarkers.stripMarkerSuffix(markerFilePath);
-            Path fullDeletePath = new Path(basePath, fileToDelete);
-            String partitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePath), fullDeletePath.getParent());
-            return new HoodieRollbackRequest(partitionPath, EMPTY_STRING, 
EMPTY_STRING,
-                Collections.singletonList(fullDeletePath.toString()),
+            String fileId = null;
+            String baseInstantTime = null;
+            if (FSUtils.isBaseFile(fullFilePath)) {

Review Comment:
   one thing to remember: we need to create markers for log files during upgrade.
   i.e. just before the upgrade, if there was a failed commit, the log files produced by it would not have markers in the new format. So, during upgrade we might need to track them down and create markers in this new format before rollback kicks in.
   
   not required in this patch itself, but in a follow-up patch. A rough sketch follows.
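   Not needed in this patch, but a very rough sketch of what that follow-up could look like (imports omitted; WriteMarkersFactory.get, WriteMarkers.create, FSUtils.getPartitionPath and FSUtils.isLogFile are used as elsewhere in this diff; the attribution check is a placeholder, and how the touched partitions are discovered is left out):

```java
// Recreate log-file markers in the new (log-file-name based) format for a failed
// commit before marker-based rollback runs during upgrade.
static void recreateLogFileMarkers(HoodieTable table, HoodieInstant failedInstant,
                                   List<String> partitionsTouched) throws IOException {
  FileSystem fs = table.getMetaClient().getFs();
  WriteMarkers markers = WriteMarkersFactory.get(
      table.getConfig().getMarkersType(), table, failedInstant.getTimestamp());
  for (String partitionPath : partitionsTouched) {
    Path fullPartitionPath = FSUtils.getPartitionPath(table.getConfig().getBasePath(), partitionPath);
    for (FileStatus status : fs.listStatus(fullPartitionPath)) {
      if (FSUtils.isLogFile(status.getPath()) && writtenByInstant(status, failedInstant)) {
        // APPEND is used for all log files, matching the convention in this patch
        markers.create(partitionPath, status.getPath().getName(), IOType.APPEND);
      }
    }
  }
}

// Placeholder: attributing a log file to the failed commit needs real logic (e.g.
// inspecting the file slice or log blocks); intentionally left abstract here.
static boolean writtenByInstant(FileStatus logFileStatus, HoodieInstant failedInstant) {
  throw new UnsupportedOperationException("attribution logic TBD");
}
```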
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java:
##########
@@ -78,22 +82,35 @@ public List<HoodieRollbackRequest> 
getRollbackRequests(HoodieInstant instantToRo
       return context.map(markerPaths, markerFilePath -> {
         String typeStr = 
markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
         IOType type = IOType.valueOf(typeStr);
+        String partitionFilePath = 
WriteMarkers.stripMarkerSuffix(markerFilePath);
+        Path fullFilePath = new Path(basePath, partitionFilePath);
+        String partitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePath), fullFilePath.getParent());
         switch (type) {
           case MERGE:
           case CREATE:
-            String fileToDelete = 
WriteMarkers.stripMarkerSuffix(markerFilePath);
-            Path fullDeletePath = new Path(basePath, fileToDelete);
-            String partitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePath), fullDeletePath.getParent());
-            return new HoodieRollbackRequest(partitionPath, EMPTY_STRING, 
EMPTY_STRING,
-                Collections.singletonList(fullDeletePath.toString()),
+            String fileId = null;
+            String baseInstantTime = null;
+            if (FSUtils.isBaseFile(fullFilePath)) {
+              HoodieBaseFile baseFileToDelete = new 
HoodieBaseFile(fullFilePath.toString());
+              fileId = baseFileToDelete.getFileId();
+              baseInstantTime = baseFileToDelete.getCommitTime();
+            } else if (FSUtils.isLogFile(fullFilePath)) {
+              // TODO: HUDI-1517 may distinguish log file created from log 
file being appended in the future @guanziyue
+              // Now it should not have create type
+              checkArgument(type != IOType.CREATE, "Log file should not 
support create io type now");
+              checkArgument(type != IOType.MERGE, "Log file should not support 
merge io type");

Review Comment:
   oh I see, we are creating APPENDs for all log files irrespective of whether the file was created or appended to.
   makes sense to not complicate things.
   I also initially thought about LOG_CREATE and LOG_APPEND.
   but the purpose of these markers is reconciliation, and for log files we are not going to delete anything; we treat everything as appends or newly created files. So not a bad idea to go with APPEND.
   



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -104,6 +124,84 @@ public boolean commit(String instantTime, 
JavaRDD<WriteStatus> writeStatuses, Op
     return commitStats(instantTime, HoodieJavaRDD.of(writeStatuses), 
writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds, 
extraPreCommitFunc);
   }
 
+  protected void commit(HoodieTable table, String commitActionType, String 
instantTime, HoodieCommitMetadata metadata,
+                        List<HoodieWriteStat> stats, HoodieData<WriteStatus> 
writeStatuses) throws IOException {
+    LOG.info("Committing " + instantTime + " action " + commitActionType);
+    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+    HoodieCommitMetadata fixedCommitMetadata = 
addMissingLogFileIfNeeded(table, commitActionType, instantTime, metadata);
+    // Finalize write
+    finalizeWrite(table, instantTime, stats);
+    // do save internal schema to support Implicitly add columns in write 
process
+    if 
(!fixedCommitMetadata.getExtraMetadata().containsKey(SerDeHelper.LATEST_SCHEMA)
+        && fixedCommitMetadata.getExtraMetadata().containsKey(SCHEMA_KEY) && 
table.getConfig().getSchemaEvolutionEnable()) {
+      saveInternalSchema(table, instantTime, fixedCommitMetadata);
+    }
+    // update Metadata table
+    writeTableMetadata(table, instantTime, fixedCommitMetadata, writeStatuses);
+    activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType, 
instantTime),
+        
Option.of(fixedCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+  }
+
+  /* In spark mor table, any failed spark task may generate log files which 
are not included in write status.
+   * We need to add these to CommitMetadata so that it will be synced to MDT 
and make MDT has correct file info.
+   */
+  private HoodieCommitMetadata addMissingLogFileIfNeeded(HoodieTable table, 
String commitActionType, String instantTime,
+                                                         HoodieCommitMetadata 
commitMetadata) throws IOException {
+    if 
(!table.getMetaClient().getTableConfig().getTableType().equals(HoodieTableType.MERGE_ON_READ)
+        || !commitActionType.equals(HoodieActiveTimeline.DELTA_COMMIT_ACTION)) 
{
+      return commitMetadata;
+    }
+
+    HoodieCommitMetadata metadata = commitMetadata;
+    WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(), 
table, instantTime);
+    // if there is log files in this delta commit, we search any invalid log 
files generated by failed spark task
+    boolean hasLogFileInDeltaCommit = metadata.getPartitionToWriteStats()
+        .values().stream().flatMap(List::stream)
+        .anyMatch(writeStat -> FSUtils.isLogFile(new 
Path(config.getBasePath(), writeStat.getPath()).getName()));
+    if (hasLogFileInDeltaCommit) {
+      // get all log files generated by log mark file
+      Set<String> logFilesMarkerPath = new 
HashSet<>(markers.appendedLogPaths(context, 
config.getFinalizeWriteParallelism()));
+
+      // remove valid log files
+      for (Map.Entry<String, List<HoodieWriteStat>> partitionAndWriteStats : 
metadata.getPartitionToWriteStats().entrySet()) {
+        for (HoodieWriteStat hoodieWriteStat : 
partitionAndWriteStats.getValue()) {
+          logFilesMarkerPath.remove(hoodieWriteStat.getPath());
+        }
+      }
+
+      // remaining are invalid log files, let's generate write stat for them
+      if (logFilesMarkerPath.size() > 0) {
+        context.setJobStatus(this.getClass().getSimpleName(), "generate 
writeStat for missing log files");
+        List<Option<HoodieDeltaWriteStat>> fakeLogFileWriteStat = 
context.map(new ArrayList<>(logFilesMarkerPath), (logFilePath) -> {

Review Comment:
   let's rename this; maybe additionalLogFileWriteStat.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -104,6 +124,84 @@ public boolean commit(String instantTime, 
JavaRDD<WriteStatus> writeStatuses, Op
     return commitStats(instantTime, HoodieJavaRDD.of(writeStatuses), 
writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds, 
extraPreCommitFunc);
   }
 
+  protected void commit(HoodieTable table, String commitActionType, String 
instantTime, HoodieCommitMetadata metadata,
+                        List<HoodieWriteStat> stats, HoodieData<WriteStatus> 
writeStatuses) throws IOException {
+    LOG.info("Committing " + instantTime + " action " + commitActionType);
+    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+    HoodieCommitMetadata fixedCommitMetadata = 
addMissingLogFileIfNeeded(table, commitActionType, instantTime, metadata);
+    // Finalize write
+    finalizeWrite(table, instantTime, stats);
+    // do save internal schema to support Implicitly add columns in write 
process
+    if 
(!fixedCommitMetadata.getExtraMetadata().containsKey(SerDeHelper.LATEST_SCHEMA)
+        && fixedCommitMetadata.getExtraMetadata().containsKey(SCHEMA_KEY) && 
table.getConfig().getSchemaEvolutionEnable()) {
+      saveInternalSchema(table, instantTime, fixedCommitMetadata);
+    }
+    // update Metadata table
+    writeTableMetadata(table, instantTime, fixedCommitMetadata, writeStatuses);
+    activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType, 
instantTime),
+        
Option.of(fixedCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+  }
+
+  /* In spark mor table, any failed spark task may generate log files which 
are not included in write status.
+   * We need to add these to CommitMetadata so that it will be synced to MDT 
and make MDT has correct file info.
+   */
+  private HoodieCommitMetadata addMissingLogFileIfNeeded(HoodieTable table, 
String commitActionType, String instantTime,
+                                                         HoodieCommitMetadata 
commitMetadata) throws IOException {
+    if 
(!table.getMetaClient().getTableConfig().getTableType().equals(HoodieTableType.MERGE_ON_READ)
+        || !commitActionType.equals(HoodieActiveTimeline.DELTA_COMMIT_ACTION)) 
{
+      return commitMetadata;
+    }
+
+    HoodieCommitMetadata metadata = commitMetadata;
+    WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(), 
table, instantTime);
+    // if there is log files in this delta commit, we search any invalid log 
files generated by failed spark task
+    boolean hasLogFileInDeltaCommit = metadata.getPartitionToWriteStats()
+        .values().stream().flatMap(List::stream)
+        .anyMatch(writeStat -> FSUtils.isLogFile(new 
Path(config.getBasePath(), writeStat.getPath()).getName()));
+    if (hasLogFileInDeltaCommit) {
+      // get all log files generated by log mark file
+      Set<String> logFilesMarkerPath = new 
HashSet<>(markers.appendedLogPaths(context, 
config.getFinalizeWriteParallelism()));
+
+      // remove valid log files
+      for (Map.Entry<String, List<HoodieWriteStat>> partitionAndWriteStats : 
metadata.getPartitionToWriteStats().entrySet()) {
+        for (HoodieWriteStat hoodieWriteStat : 
partitionAndWriteStats.getValue()) {
+          logFilesMarkerPath.remove(hoodieWriteStat.getPath());
+        }
+      }
+
+      // remaining are invalid log files, let's generate write stat for them
+      if (logFilesMarkerPath.size() > 0) {
+        context.setJobStatus(this.getClass().getSimpleName(), "generate 
writeStat for missing log files");
+        List<Option<HoodieDeltaWriteStat>> fakeLogFileWriteStat = 
context.map(new ArrayList<>(logFilesMarkerPath), (logFilePath) -> {
+          FileSystem fileSystem = table.getMetaClient().getFs();
+          FileStatus fileStatus;
+          try {
+            fileStatus = fileSystem.getFileStatus(new 
Path(config.getBasePath(), logFilePath));

Review Comment:
   let's move this file status / file size deduction to a private method.
   maybe we want to optimize by doing one listing per partition; for HDFS we can add different logic, but for cloud stores the per-file calls will result in too many remote calls. A sketch of the per-partition approach follows.
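   A minimal sketch of the per-partition variant (imports omitted; method and variable names are illustrative; FSUtils.getRelativePartitionPath and FSUtils.getPartitionPath are used as elsewhere in this diff, and the non-partitioned-table edge case is ignored):

```java
// One listStatus per affected partition instead of one getFileStatus per missing
// log file; sizes are then looked up from the returned map.
private Map<String, FileStatus> statMissingLogFilesByPartition(FileSystem fs, String basePath,
                                                               Set<String> missingLogFilePaths) throws IOException {
  // group relative log-file paths by their partition directory
  Map<String, List<String>> byPartition = missingLogFilePaths.stream()
      .collect(Collectors.groupingBy(p ->
          FSUtils.getRelativePartitionPath(new Path(basePath), new Path(basePath, p).getParent())));

  Map<String, FileStatus> statusByRelativePath = new HashMap<>();
  for (Map.Entry<String, List<String>> entry : byPartition.entrySet()) {
    Set<String> wanted = new HashSet<>(entry.getValue());
    FileStatus[] statuses;
    try {
      // one remote call per partition
      statuses = fs.listStatus(FSUtils.getPartitionPath(basePath, entry.getKey()));
    } catch (FileNotFoundException e) {
      continue; // partition directory itself is gone; all its files stay absent
    }
    for (FileStatus status : statuses) {
      String relative = entry.getKey() + "/" + status.getPath().getName();
      if (wanted.contains(relative)) {
        statusByRelativePath.put(relative, status);
      }
    }
  }
  // files that were never actually written simply stay absent from the map
  return statusByRelativePath;
}
```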
   



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseSparkDeltaCommitActionExecutor.java:
##########
@@ -60,6 +80,78 @@ public 
BaseSparkDeltaCommitActionExecutor(HoodieSparkEngineContext context, Hood
     super(context, config, table, instantTime, operationType, extraMetadata);
   }
 
+  protected void commit(Option<Map<String, String>> extraMetadata, 
HoodieWriteMetadata<HoodieData<WriteStatus>> result, List<HoodieWriteStat> 
writeStats) {

Review Comment:
   I see why we are overriding the entire commit method here.
   can we introduce a method called `appendMetadataForMissingFiles()`?
   the default in BaseSparkCommitActionExecutor would be a no-op (return the original commit metadata as-is).
   
   here in BaseSparkDeltaCommitActionExecutor, instead of overriding the entire commit method, we can override just appendMetadataForMissingFiles. A sketch of that shape follows.
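   Sketched, with the hook name taken from this suggestion (where exactly the shared commit() calls it is an assumption; imports omitted):

```java
// BaseSparkCommitActionExecutor: default hook is a no-op that returns the metadata untouched.
protected HoodieCommitMetadata appendMetadataForMissingFiles(HoodieCommitMetadata metadata) {
  return metadata;
}

// BaseSparkDeltaCommitActionExecutor: only the hook is overridden, so finalizeWrite,
// writeTableMetadata and saveAsComplete stay in the shared commit() of the base class.
@Override
protected HoodieCommitMetadata appendMetadataForMissingFiles(HoodieCommitMetadata metadata) {
  try {
    // the existing addMissingLogFileIfNeeded logic, adapted to take the metadata directly
    return addMissingLogFileIfNeeded(metadata);
  } catch (IOException e) {
    throw new HoodieCommitException("Failed to reconcile missing log files for " + instantTime, e);
  }
}
```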



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java:
##########
@@ -130,6 +132,53 @@ public Set<String> 
createdAndMergedDataPaths(HoodieEngineContext context, int pa
     return dataFiles;
   }
 
+  public Set<String> appendedLogPaths(HoodieEngineContext context, int 
parallelism) throws IOException {

Review Comment:
   maybe we can name this createdAndAppendedLogFilePaths



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java:
##########
@@ -844,6 +847,48 @@ private Pair<List<HoodieRecord>, List<HoodieRecord>> 
twoUpsertCommitDataWithTwoP
     return Pair.of(records, records2);
   }
 
+  /**
+   * Since how markers are generated for log file changed in Version Six, we 
regenerate markers in the way version zero do.
+   *
+   * @param table instance of {@link HoodieTable}
+   */
+  private void prepForUpgradeFromZeroToOne(HoodieTable table) throws 
IOException {
+    List<HoodieInstant> instantsToBeParsed  =
+        metaClient.getActiveTimeline()
+        .getCommitsTimeline()
+        .getInstantsAsStream()
+        .collect(Collectors.toList());
+    for (HoodieInstant instant : instantsToBeParsed) {
+      WriteMarkers writeMarkers =
+          WriteMarkersFactory.get(table.getConfig().getMarkersType(), table, 
instant.getTimestamp());
+      Set<String> oldMarkers = writeMarkers.allMarkerFilePaths();
+      boolean hasAppendMarker = oldMarkers.stream().anyMatch(marker -> 
marker.contains(IOType.APPEND.name())

Review Comment:
   why both APPEND and CREATE?
   I guess APPEND alone should suffice, right?
   we need to re-generate markers only for log files, right?



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java:
##########
@@ -343,27 +359,73 @@ public void testSimpleInsertsGeneratedIntoLogFiles() 
throws Exception {
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
       JavaRDD<HoodieRecord> recordsRDD = jsc().parallelize(records, 1);
       JavaRDD<WriteStatus> statuses = writeClient.insert(recordsRDD, 
newCommitTime);
+      long expectedLogFileNum = statuses.map(writeStatus -> 
(HoodieDeltaWriteStat) writeStatus.getStat())
+          .flatMap(deltaWriteStat -> deltaWriteStat.getLogFiles().iterator())
+          .count();
+      // inject a fake log file to test marker file for log file
+      HoodieDeltaWriteStat correctWriteStat = (HoodieDeltaWriteStat) 
statuses.map(WriteStatus::getStat).take(1).get(0);
+      assertTrue(FSUtils.isLogFile(new Path(correctWriteStat.getPath())));
+      HoodieLogFile correctLogFile = new 
HoodieLogFile(correctWriteStat.getPath());
+      String correctWriteToken = 
FSUtils.getWriteTokenFromLogPath(correctLogFile.getPath());
+
+      final String fakeToken = generateFakeWriteToken(correctWriteToken);
+
+      final WriteMarkers writeMarkers = 
WriteMarkersFactory.get(config.getMarkersType(),
+          HoodieSparkTable.create(config, context()), newCommitTime);
+      HoodieLogFormat.Writer fakeLogWriter = HoodieLogFormat.newWriterBuilder()
+          .onParentPath(FSUtils.getPartitionPath(config.getBasePath(), 
correctWriteStat.getPartitionPath()))
+          
.withFileId(correctWriteStat.getFileId()).overBaseCommit(newCommitTime)
+          .withLogVersion(correctLogFile.getLogVersion())
+          .withFileSize(0L)
+          .withSizeThreshold(config.getLogFileMaxSize()).withFs(fs())
+          .withRolloverLogWriteToken(fakeToken)
+          .withLogWriteToken(fakeToken)
+          .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+          .withLogWriteCallback(new HoodieLogFileWriteCallback() {
+            @Override
+            public boolean preLogFileOpen(HoodieLogFile logFileToAppend) {
+              return writeMarkers.create(correctWriteStat.getPartitionPath(), 
logFileToAppend.getFileName(), IOType.APPEND).isPresent();
+            }
+
+            @Override
+            public boolean preLogFileCreate(HoodieLogFile logFileToCreate) {
+              return writeMarkers.create(correctWriteStat.getPartitionPath(), 
logFileToCreate.getFileName(), IOType.APPEND).isPresent();
+            }
+          }).build();
+      AppendResult fakeAppendResult = 
fakeLogWriter.appendBlock(getLogBlock(records, config.getSchema()));
+      fakeLogWriter.close();
+      // check marker for fake log generated
+      assertTrue(writeMarkers.allMarkerFilePaths().stream().anyMatch(marker -> 
marker.contains(fakeToken)));
+      SyncableFileSystemView unCommittedFsView = 
getFileSystemViewWithUnCommittedSlices(metaClient);
+      // check fake log generated
+      
assertTrue(unCommittedFsView.getAllFileSlices(correctWriteStat.getPartitionPath())
+          .flatMap(FileSlice::getLogFiles).map(HoodieLogFile::getPath)
+          .anyMatch(path -> 
path.getName().equals(fakeAppendResult.logFile().getPath().getName())));
       writeClient.commit(newCommitTime, statuses);
 
       HoodieTable table = HoodieSparkTable.create(config, context(), 
metaClient);
       table.getHoodieView().sync();
       TableFileSystemView.SliceView tableRTFileSystemView = 
table.getSliceView();
-
+      // get log file number from filesystem view
       long numLogFiles = 0;
       for (String partitionPath : dataGen.getPartitionPaths()) {
         List<FileSlice> allSlices = 
tableRTFileSystemView.getLatestFileSlices(partitionPath).collect(Collectors.toList());
         assertEquals(0, allSlices.stream().filter(fileSlice -> 
fileSlice.getBaseFile().isPresent()).count());
         assertTrue(allSlices.stream().anyMatch(fileSlice -> 
fileSlice.getLogFiles().count() > 0));
-        long logFileCount = allSlices.stream().filter(fileSlice -> 
fileSlice.getLogFiles().count() > 0).count();
+        long logFileCount = allSlices.stream().mapToLong(fileSlice -> 
fileSlice.getLogFiles().count()).sum();
         if (logFileCount > 0) {
           // check the log versions start from the base version
           assertTrue(allSlices.stream().map(slice -> 
slice.getLogFiles().findFirst().get().getLogVersion())
               .allMatch(version -> 
version.equals(HoodieLogFile.LOGFILE_BASE_VERSION)));
         }
         numLogFiles += logFileCount;
       }
-
-      assertTrue(numLogFiles > 0);
+      // check log file number in file system cover both valid log file and 
invalid log file

Review Comment:
   minor: "check log file number in file system to cover all log files including additional log files created with spark task retries".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
