nsivabalan commented on code in PR #9523: URL: https://github.com/apache/hudi/pull/9523#discussion_r1304953163
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java: ########## @@ -130,6 +132,53 @@ public Set<String> createdAndMergedDataPaths(HoodieEngineContext context, int pa return dataFiles; } + public Set<String> appendedLogPaths(HoodieEngineContext context, int parallelism) throws IOException { Review Comment: we might need to fix createdAndMergedDataPaths to not return any log file markers. for eg, we could have CREATE markers for log files. and due to spark retries, we could have more than 1. So, essentially we change createdAndMergedDataPaths to return just data files and not an log files). here within appendedLogPaths, we can return any marker for log files(irrespective of whether its CREATE or APPEND) ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java: ########## @@ -276,4 +283,33 @@ protected static Option<IndexedRecord> toAvroRecord(HoodieRecord record, Schema return Option.empty(); } } + + protected class AppendLogWriteCallback implements HoodieLogFileWriteCallback { + // here we distinguish log files created from log files being appended. Considering following scenario: + // An appending task write to log file. + // (1) append to existing file file_instant_writetoken1.log.1 + // (2) rollover and create file file_instant_writetoken2.log.2 + // Then this task failed and retry by a new task. + // (3) append to existing file file_instant_writetoken1.log.1 + // (4) rollover and create file file_instant_writetoken3.log.2 + // finally file_instant_writetoken2.log.2 should not be committed to hudi, we use marker file to delete it. Review Comment: minor: this documentation needs to be fixed. we need to say, we will commit all files to hudi and sync all duplicated files if any to Metadata table (if enabled) as well. 
########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseSparkDeltaCommitActionExecutor.java: ########## @@ -60,6 +80,78 @@ public BaseSparkDeltaCommitActionExecutor(HoodieSparkEngineContext context, Hood super(context, config, table, instantTime, operationType, extraMetadata); } + protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<HoodieData<WriteStatus>> result, List<HoodieWriteStat> writeStats) { + String actionType = getCommitActionType(); + LOG.info("Committing " + instantTime + ", action Type " + actionType + ", operation Type " + operationType); + result.setCommitted(true); + result.setWriteStats(writeStats); + // Finalize write + finalizeWrite(instantTime, writeStats, result); + try { + HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); + HoodieCommitMetadata metadata = addMissingLogFileIfNeeded(result); + writeTableMetadata(metadata, result.getWriteStatuses(), actionType); + activeTimeline.saveAsComplete(new HoodieInstant(true, getCommitActionType(), instantTime), + Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); + LOG.info("Committed " + instantTime); + result.setCommitMetadata(Option.of(metadata)); + } catch (IOException e) { + throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime, + e); + } + } + + /* In spark mor table, any failed spark task may generate log files which are not included in write status. + * We need to add these to CommitMetadata so that it will be synced to MDT and make MDT has correct file info. 
+ */ + private HoodieCommitMetadata addMissingLogFileIfNeeded(HoodieWriteMetadata<HoodieData<WriteStatus>> result) throws IOException { Review Comment: we can move these to static utility methods so that we can re-use the code ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java: ########## @@ -103,35 +120,47 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo } } - protected HoodieRollbackRequest getRollbackRequestForAppend(HoodieInstant instantToRollback, String markerFilePath) throws IOException { - Path baseFilePathForAppend = new Path(basePath, markerFilePath); - String fileId = FSUtils.getFileIdFromFilePath(baseFilePathForAppend); - String baseCommitTime = FSUtils.getCommitTime(baseFilePathForAppend.getName()); - String relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), baseFilePathForAppend.getParent()); - Path partitionPath = FSUtils.getPartitionPath(config.getBasePath(), relativePartitionPath); - - // NOTE: Since we're rolling back incomplete Delta Commit, it only could have appended its - // block to the latest log-file - // TODO(HUDI-1517) use provided marker-file's path instead - Option<HoodieLogFile> latestLogFileOption = FSUtils.getLatestLogFile(table.getMetaClient().getFs(), partitionPath, fileId, - HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime); - - // Log file can be deleted if the commit to rollback is also the commit that created the fileGroup - if (latestLogFileOption.isPresent() && baseCommitTime.equals(instantToRollback.getTimestamp())) { - Path fullDeletePath = new Path(partitionPath, latestLogFileOption.get().getFileName()); - return new HoodieRollbackRequest(relativePartitionPath, EMPTY_STRING, EMPTY_STRING, - Collections.singletonList(fullDeletePath.toString()), - Collections.emptyMap()); + protected HoodieRollbackRequest getRollbackRequestForAppend(String markerFilePath) throws IOException { 
+ Path filePath = new Path(basePath, markerFilePath); + String fileId; + String baseCommitTime; + String relativePartitionPath; + Option<HoodieLogFile> latestLogFileOption; + + // Old marker files may be generated from base file name before HUDI-1517. keep compatible with them. + // TODO: deprecated in HUDI-1517, may be removed in the future. @guanziyue.gzy + if (FSUtils.isBaseFile(filePath)) { Review Comment: Oh, this is already backwards compatible? I.e., during upgrade we don't need to re-create marker files, because we can handle a mix of marker files. Nice. ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java: ########## @@ -104,6 +124,84 @@ public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Op return commitStats(instantTime, HoodieJavaRDD.of(writeStatuses), writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds, extraPreCommitFunc); } + protected void commit(HoodieTable table, String commitActionType, String instantTime, HoodieCommitMetadata metadata, + List<HoodieWriteStat> stats, HoodieData<WriteStatus> writeStatuses) throws IOException { + LOG.info("Committing " + instantTime + " action " + commitActionType); + HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); + HoodieCommitMetadata fixedCommitMetadata = addMissingLogFileIfNeeded(table, commitActionType, instantTime, metadata); + // Finalize write + finalizeWrite(table, instantTime, stats); + // do save internal schema to support Implicitly add columns in write process + if (!fixedCommitMetadata.getExtraMetadata().containsKey(SerDeHelper.LATEST_SCHEMA) + && fixedCommitMetadata.getExtraMetadata().containsKey(SCHEMA_KEY) && table.getConfig().getSchemaEvolutionEnable()) { + saveInternalSchema(table, instantTime, fixedCommitMetadata); + } + // update Metadata table + writeTableMetadata(table, instantTime, fixedCommitMetadata, writeStatuses); + activeTimeline.saveAsComplete(new HoodieInstant(true,
commitActionType, instantTime), + Option.of(fixedCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); + } + + /* In spark mor table, any failed spark task may generate log files which are not included in write status. + * We need to add these to CommitMetadata so that it will be synced to MDT and make MDT has correct file info. + */ + private HoodieCommitMetadata addMissingLogFileIfNeeded(HoodieTable table, String commitActionType, String instantTime, + HoodieCommitMetadata commitMetadata) throws IOException { + if (!table.getMetaClient().getTableConfig().getTableType().equals(HoodieTableType.MERGE_ON_READ) + || !commitActionType.equals(HoodieActiveTimeline.DELTA_COMMIT_ACTION)) { + return commitMetadata; + } + + HoodieCommitMetadata metadata = commitMetadata; + WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(), table, instantTime); + // if there is log files in this delta commit, we search any invalid log files generated by failed spark task + boolean hasLogFileInDeltaCommit = metadata.getPartitionToWriteStats() + .values().stream().flatMap(List::stream) + .anyMatch(writeStat -> FSUtils.isLogFile(new Path(config.getBasePath(), writeStat.getPath()).getName())); + if (hasLogFileInDeltaCommit) { + // get all log files generated by log mark file + Set<String> logFilesMarkerPath = new HashSet<>(markers.appendedLogPaths(context, config.getFinalizeWriteParallelism())); + + // remove valid log files + for (Map.Entry<String, List<HoodieWriteStat>> partitionAndWriteStats : metadata.getPartitionToWriteStats().entrySet()) { + for (HoodieWriteStat hoodieWriteStat : partitionAndWriteStats.getValue()) { + logFilesMarkerPath.remove(hoodieWriteStat.getPath()); + } + } + + // remaining are invalid log files, let's generate write stat for them + if (logFilesMarkerPath.size() > 0) { + context.setJobStatus(this.getClass().getSimpleName(), "generate writeStat for missing log files"); + List<Option<HoodieDeltaWriteStat>> fakeLogFileWriteStat = 
context.map(new ArrayList<>(logFilesMarkerPath), (logFilePath) -> { + FileSystem fileSystem = table.getMetaClient().getFs(); + FileStatus fileStatus; + try { + fileStatus = fileSystem.getFileStatus(new Path(config.getBasePath(), logFilePath)); + } catch (FileNotFoundException fileNotFoundException) { + return Option.empty(); + } + + HoodieDeltaWriteStat writeStat = new HoodieDeltaWriteStat(); + HoodieLogFile logFile = new HoodieLogFile(fileStatus); + writeStat.setPath(logFilePath); + writeStat.setFileId(logFile.getFileId()); + writeStat.setTotalWriteBytes(logFile.getFileSize()); + writeStat.setFileSizeInBytes(logFile.getFileSize()); + writeStat.setLogVersion(logFile.getLogVersion()); + writeStat.setLogFiles(Collections.singletonList(logFile.getPath().toString())); + writeStat.setPartitionPath(FSUtils.getRelativePartitionPath(new Path(config.getBasePath()), fileStatus.getPath().getParent())); + return Option.of(writeStat); + }, config.getFinalizeWriteParallelism()); + + // add these write stat to commit meta Review Comment: Let's add documentation (comments) explaining why a file could be missing here. I mean, a marker refers to a file, but the actual file is missing.
########## hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java: ########## @@ -343,27 +359,73 @@ public void testSimpleInsertsGeneratedIntoLogFiles() throws Exception { List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100); JavaRDD<HoodieRecord> recordsRDD = jsc().parallelize(records, 1); JavaRDD<WriteStatus> statuses = writeClient.insert(recordsRDD, newCommitTime); + long expectedLogFileNum = statuses.map(writeStatus -> (HoodieDeltaWriteStat) writeStatus.getStat()) + .flatMap(deltaWriteStat -> deltaWriteStat.getLogFiles().iterator()) + .count(); + // inject a fake log file to test marker file for log file + HoodieDeltaWriteStat correctWriteStat = (HoodieDeltaWriteStat) statuses.map(WriteStatus::getStat).take(1).get(0); + assertTrue(FSUtils.isLogFile(new Path(correctWriteStat.getPath()))); + HoodieLogFile correctLogFile = new HoodieLogFile(correctWriteStat.getPath()); + String correctWriteToken = FSUtils.getWriteTokenFromLogPath(correctLogFile.getPath()); + + final String fakeToken = generateFakeWriteToken(correctWriteToken); + + final WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), + HoodieSparkTable.create(config, context()), newCommitTime); + HoodieLogFormat.Writer fakeLogWriter = HoodieLogFormat.newWriterBuilder() + .onParentPath(FSUtils.getPartitionPath(config.getBasePath(), correctWriteStat.getPartitionPath())) + .withFileId(correctWriteStat.getFileId()).overBaseCommit(newCommitTime) + .withLogVersion(correctLogFile.getLogVersion()) + .withFileSize(0L) + .withSizeThreshold(config.getLogFileMaxSize()).withFs(fs()) + .withRolloverLogWriteToken(fakeToken) + .withLogWriteToken(fakeToken) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION) + .withLogWriteCallback(new HoodieLogFileWriteCallback() { + @Override + public boolean preLogFileOpen(HoodieLogFile logFileToAppend) { + return writeMarkers.create(correctWriteStat.getPartitionPath(), 
logFileToAppend.getFileName(), IOType.APPEND).isPresent(); + } + + @Override + public boolean preLogFileCreate(HoodieLogFile logFileToCreate) { + return writeMarkers.create(correctWriteStat.getPartitionPath(), logFileToCreate.getFileName(), IOType.APPEND).isPresent(); + } + }).build(); + AppendResult fakeAppendResult = fakeLogWriter.appendBlock(getLogBlock(records, config.getSchema())); Review Comment: Let's not call it fake. Let's name these additionalLogWriter, additionalAppendResult, etc. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java: ########## @@ -130,6 +132,53 @@ public Set<String> createdAndMergedDataPaths(HoodieEngineContext context, int pa return dataFiles; } + public Set<String> appendedLogPaths(HoodieEngineContext context, int parallelism) throws IOException { + Set<String> logFiles = new HashSet<>(); + + FileStatus[] topLevelStatuses = fs.listStatus(markerDirPath); + List<String> subDirectories = new ArrayList<>(); + for (FileStatus topLevelStatus: topLevelStatuses) { + if (topLevelStatus.isFile()) { + String pathStr = topLevelStatus.getPath().toString(); + if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && pathStr.endsWith(IOType.APPEND.name())) { + logFiles.add(translateMarkerToDataPath(pathStr)); + } + } else { + subDirectories.add(topLevelStatus.getPath().toString()); + } + } + + if (subDirectories.size() > 0) { Review Comment: There is an opportunity for code re-use here. I understand you want to quickly put out the patch.
but before we land, let's try to re-use code. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java: ########## @@ -78,22 +82,35 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo return context.map(markerPaths, markerFilePath -> { String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1); IOType type = IOType.valueOf(typeStr); + String partitionFilePath = WriteMarkers.stripMarkerSuffix(markerFilePath); + Path fullFilePath = new Path(basePath, partitionFilePath); + String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), fullFilePath.getParent()); switch (type) { case MERGE: case CREATE: - String fileToDelete = WriteMarkers.stripMarkerSuffix(markerFilePath); - Path fullDeletePath = new Path(basePath, fileToDelete); - String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), fullDeletePath.getParent()); - return new HoodieRollbackRequest(partitionPath, EMPTY_STRING, EMPTY_STRING, - Collections.singletonList(fullDeletePath.toString()), + String fileId = null; + String baseInstantTime = null; + if (FSUtils.isBaseFile(fullFilePath)) { Review Comment: One thing to remember is that we need to create markers for log files during upgrade. I.e., if there was a failed commit just before the upgrade, the log files produced by it are not in the new format. So, during upgrade we might need to track them down and create markers in this new format before rollback kicks in. This is not required in this patch itself, but in a follow-up patch.
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java: ########## @@ -78,22 +82,35 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo return context.map(markerPaths, markerFilePath -> { String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1); IOType type = IOType.valueOf(typeStr); + String partitionFilePath = WriteMarkers.stripMarkerSuffix(markerFilePath); + Path fullFilePath = new Path(basePath, partitionFilePath); + String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), fullFilePath.getParent()); switch (type) { case MERGE: case CREATE: - String fileToDelete = WriteMarkers.stripMarkerSuffix(markerFilePath); - Path fullDeletePath = new Path(basePath, fileToDelete); - String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), fullDeletePath.getParent()); - return new HoodieRollbackRequest(partitionPath, EMPTY_STRING, EMPTY_STRING, - Collections.singletonList(fullDeletePath.toString()), + String fileId = null; + String baseInstantTime = null; + if (FSUtils.isBaseFile(fullFilePath)) { + HoodieBaseFile baseFileToDelete = new HoodieBaseFile(fullFilePath.toString()); + fileId = baseFileToDelete.getFileId(); + baseInstantTime = baseFileToDelete.getCommitTime(); + } else if (FSUtils.isLogFile(fullFilePath)) { + // TODO: HUDI-1517 may distinguish log file created from log file being appended in the future @guanziyue + // Now it should not have create type + checkArgument(type != IOType.CREATE, "Log file should not support create io type now"); + checkArgument(type != IOType.MERGE, "Log file should not support merge io type"); Review Comment: oh I see, we are creating APPENDS for all log files irrespective of whether it was created or appended to. makese sense to not complicate things. I also, initially thought about LOG_CREATE and LOG_APPEND. 
but these markers are only used during reconciliation, and for log files we are not going to delete anything; we treat everything as appends or newly created files. So it is not a bad idea to go with APPEND. ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java: ########## @@ -104,6 +124,84 @@ public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Op return commitStats(instantTime, HoodieJavaRDD.of(writeStatuses), writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds, extraPreCommitFunc); } + protected void commit(HoodieTable table, String commitActionType, String instantTime, HoodieCommitMetadata metadata, + List<HoodieWriteStat> stats, HoodieData<WriteStatus> writeStatuses) throws IOException { + LOG.info("Committing " + instantTime + " action " + commitActionType); + HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); + HoodieCommitMetadata fixedCommitMetadata = addMissingLogFileIfNeeded(table, commitActionType, instantTime, metadata); + // Finalize write + finalizeWrite(table, instantTime, stats); + // do save internal schema to support Implicitly add columns in write process + if (!fixedCommitMetadata.getExtraMetadata().containsKey(SerDeHelper.LATEST_SCHEMA) + && fixedCommitMetadata.getExtraMetadata().containsKey(SCHEMA_KEY) && table.getConfig().getSchemaEvolutionEnable()) { + saveInternalSchema(table, instantTime, fixedCommitMetadata); + } + // update Metadata table + writeTableMetadata(table, instantTime, fixedCommitMetadata, writeStatuses); + activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType, instantTime), + Option.of(fixedCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); + } + + /* In spark mor table, any failed spark task may generate log files which are not included in write status. + * We need to add these to CommitMetadata so that it will be synced to MDT and make MDT has correct file info.
+ */ + private HoodieCommitMetadata addMissingLogFileIfNeeded(HoodieTable table, String commitActionType, String instantTime, + HoodieCommitMetadata commitMetadata) throws IOException { + if (!table.getMetaClient().getTableConfig().getTableType().equals(HoodieTableType.MERGE_ON_READ) + || !commitActionType.equals(HoodieActiveTimeline.DELTA_COMMIT_ACTION)) { + return commitMetadata; + } + + HoodieCommitMetadata metadata = commitMetadata; + WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(), table, instantTime); + // if there is log files in this delta commit, we search any invalid log files generated by failed spark task + boolean hasLogFileInDeltaCommit = metadata.getPartitionToWriteStats() + .values().stream().flatMap(List::stream) + .anyMatch(writeStat -> FSUtils.isLogFile(new Path(config.getBasePath(), writeStat.getPath()).getName())); + if (hasLogFileInDeltaCommit) { + // get all log files generated by log mark file + Set<String> logFilesMarkerPath = new HashSet<>(markers.appendedLogPaths(context, config.getFinalizeWriteParallelism())); + + // remove valid log files + for (Map.Entry<String, List<HoodieWriteStat>> partitionAndWriteStats : metadata.getPartitionToWriteStats().entrySet()) { + for (HoodieWriteStat hoodieWriteStat : partitionAndWriteStats.getValue()) { + logFilesMarkerPath.remove(hoodieWriteStat.getPath()); + } + } + + // remaining are invalid log files, let's generate write stat for them + if (logFilesMarkerPath.size() > 0) { + context.setJobStatus(this.getClass().getSimpleName(), "generate writeStat for missing log files"); + List<Option<HoodieDeltaWriteStat>> fakeLogFileWriteStat = context.map(new ArrayList<>(logFilesMarkerPath), (logFilePath) -> { Review Comment: lets rename this. 
maybe additionalLogFileWriteStat. ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java: ########## @@ -104,6 +124,84 @@ public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Op return commitStats(instantTime, HoodieJavaRDD.of(writeStatuses), writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds, extraPreCommitFunc); } + protected void commit(HoodieTable table, String commitActionType, String instantTime, HoodieCommitMetadata metadata, + List<HoodieWriteStat> stats, HoodieData<WriteStatus> writeStatuses) throws IOException { + LOG.info("Committing " + instantTime + " action " + commitActionType); + HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); + HoodieCommitMetadata fixedCommitMetadata = addMissingLogFileIfNeeded(table, commitActionType, instantTime, metadata); + // Finalize write + finalizeWrite(table, instantTime, stats); + // do save internal schema to support Implicitly add columns in write process + if (!fixedCommitMetadata.getExtraMetadata().containsKey(SerDeHelper.LATEST_SCHEMA) + && fixedCommitMetadata.getExtraMetadata().containsKey(SCHEMA_KEY) && table.getConfig().getSchemaEvolutionEnable()) { + saveInternalSchema(table, instantTime, fixedCommitMetadata); + } + // update Metadata table + writeTableMetadata(table, instantTime, fixedCommitMetadata, writeStatuses); + activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType, instantTime), + Option.of(fixedCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); + } + + /* In spark mor table, any failed spark task may generate log files which are not included in write status. + * We need to add these to CommitMetadata so that it will be synced to MDT and make MDT has correct file info.
+ */ + private HoodieCommitMetadata addMissingLogFileIfNeeded(HoodieTable table, String commitActionType, String instantTime, + HoodieCommitMetadata commitMetadata) throws IOException { + if (!table.getMetaClient().getTableConfig().getTableType().equals(HoodieTableType.MERGE_ON_READ) + || !commitActionType.equals(HoodieActiveTimeline.DELTA_COMMIT_ACTION)) { + return commitMetadata; + } + + HoodieCommitMetadata metadata = commitMetadata; + WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(), table, instantTime); + // if there is log files in this delta commit, we search any invalid log files generated by failed spark task + boolean hasLogFileInDeltaCommit = metadata.getPartitionToWriteStats() + .values().stream().flatMap(List::stream) + .anyMatch(writeStat -> FSUtils.isLogFile(new Path(config.getBasePath(), writeStat.getPath()).getName())); + if (hasLogFileInDeltaCommit) { + // get all log files generated by log mark file + Set<String> logFilesMarkerPath = new HashSet<>(markers.appendedLogPaths(context, config.getFinalizeWriteParallelism())); + + // remove valid log files + for (Map.Entry<String, List<HoodieWriteStat>> partitionAndWriteStats : metadata.getPartitionToWriteStats().entrySet()) { + for (HoodieWriteStat hoodieWriteStat : partitionAndWriteStats.getValue()) { + logFilesMarkerPath.remove(hoodieWriteStat.getPath()); + } + } + + // remaining are invalid log files, let's generate write stat for them + if (logFilesMarkerPath.size() > 0) { + context.setJobStatus(this.getClass().getSimpleName(), "generate writeStat for missing log files"); + List<Option<HoodieDeltaWriteStat>> fakeLogFileWriteStat = context.map(new ArrayList<>(logFilesMarkerPath), (logFilePath) -> { + FileSystem fileSystem = table.getMetaClient().getFs(); + FileStatus fileStatus; + try { + fileStatus = fileSystem.getFileStatus(new Path(config.getBasePath(), logFilePath)); Review Comment: lets move this file status/file size deduction to a private method. 
Maybe we want to optimize by doing one listing per partition. For HDFS, we can add different logic, but for cloud stores this will result in too many remote calls. ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseSparkDeltaCommitActionExecutor.java: ########## @@ -60,6 +80,78 @@ public BaseSparkDeltaCommitActionExecutor(HoodieSparkEngineContext context, Hood super(context, config, table, instantTime, operationType, extraMetadata); } + protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<HoodieData<WriteStatus>> result, List<HoodieWriteStat> writeStats) { Review Comment: I see why we are overriding the entire commit method here. Can we introduce a method called `appendMetadataForMissingFiles()`? The default in BaseSparkCommitActionExecutor will be a no-op (it will return the original commit metadata as is). Here in BaseSparkDeltaCommitActionExecutor, instead of overriding the entire commit method, we can override just appendMetadataForMissingFiles. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java: ########## @@ -130,6 +132,53 @@ public Set<String> createdAndMergedDataPaths(HoodieEngineContext context, int pa return dataFiles; } + public Set<String> appendedLogPaths(HoodieEngineContext context, int parallelism) throws IOException { Review Comment: Maybe we can name this createdAndAppendedLogFilePaths. ########## hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java: ########## @@ -844,6 +847,48 @@ private Pair<List<HoodieRecord>, List<HoodieRecord>> twoUpsertCommitDataWithTwoP return Pair.of(records, records2); } + /** + * Since how markers are generated for log file changed in Version Six, we regenerate markers in the way version zero do.
+ * + * @param table instance of {@link HoodieTable} + */ + private void prepForUpgradeFromZeroToOne(HoodieTable table) throws IOException { + List<HoodieInstant> instantsToBeParsed = + metaClient.getActiveTimeline() + .getCommitsTimeline() + .getInstantsAsStream() + .collect(Collectors.toList()); + for (HoodieInstant instant : instantsToBeParsed) { + WriteMarkers writeMarkers = + WriteMarkersFactory.get(table.getConfig().getMarkersType(), table, instant.getTimestamp()); + Set<String> oldMarkers = writeMarkers.allMarkerFilePaths(); + boolean hasAppendMarker = oldMarkers.stream().anyMatch(marker -> marker.contains(IOType.APPEND.name()) Review Comment: Why both APPEND and CREATE? I guess APPEND alone should suffice, right? We need to re-generate markers only for log files, right? ########## hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java: ########## @@ -343,27 +359,73 @@ public void testSimpleInsertsGeneratedIntoLogFiles() throws Exception { List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100); JavaRDD<HoodieRecord> recordsRDD = jsc().parallelize(records, 1); JavaRDD<WriteStatus> statuses = writeClient.insert(recordsRDD, newCommitTime); + long expectedLogFileNum = statuses.map(writeStatus -> (HoodieDeltaWriteStat) writeStatus.getStat()) + .flatMap(deltaWriteStat -> deltaWriteStat.getLogFiles().iterator()) + .count(); + // inject a fake log file to test marker file for log file + HoodieDeltaWriteStat correctWriteStat = (HoodieDeltaWriteStat) statuses.map(WriteStatus::getStat).take(1).get(0); + assertTrue(FSUtils.isLogFile(new Path(correctWriteStat.getPath()))); + HoodieLogFile correctLogFile = new HoodieLogFile(correctWriteStat.getPath()); + String correctWriteToken = FSUtils.getWriteTokenFromLogPath(correctLogFile.getPath()); + + final String fakeToken = generateFakeWriteToken(correctWriteToken); + + final WriteMarkers writeMarkers =
WriteMarkersFactory.get(config.getMarkersType(), + HoodieSparkTable.create(config, context()), newCommitTime); + HoodieLogFormat.Writer fakeLogWriter = HoodieLogFormat.newWriterBuilder() + .onParentPath(FSUtils.getPartitionPath(config.getBasePath(), correctWriteStat.getPartitionPath())) + .withFileId(correctWriteStat.getFileId()).overBaseCommit(newCommitTime) + .withLogVersion(correctLogFile.getLogVersion()) + .withFileSize(0L) + .withSizeThreshold(config.getLogFileMaxSize()).withFs(fs()) + .withRolloverLogWriteToken(fakeToken) + .withLogWriteToken(fakeToken) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION) + .withLogWriteCallback(new HoodieLogFileWriteCallback() { + @Override + public boolean preLogFileOpen(HoodieLogFile logFileToAppend) { + return writeMarkers.create(correctWriteStat.getPartitionPath(), logFileToAppend.getFileName(), IOType.APPEND).isPresent(); + } + + @Override + public boolean preLogFileCreate(HoodieLogFile logFileToCreate) { + return writeMarkers.create(correctWriteStat.getPartitionPath(), logFileToCreate.getFileName(), IOType.APPEND).isPresent(); + } + }).build(); + AppendResult fakeAppendResult = fakeLogWriter.appendBlock(getLogBlock(records, config.getSchema())); + fakeLogWriter.close(); + // check marker for fake log generated + assertTrue(writeMarkers.allMarkerFilePaths().stream().anyMatch(marker -> marker.contains(fakeToken))); + SyncableFileSystemView unCommittedFsView = getFileSystemViewWithUnCommittedSlices(metaClient); + // check fake log generated + assertTrue(unCommittedFsView.getAllFileSlices(correctWriteStat.getPartitionPath()) + .flatMap(FileSlice::getLogFiles).map(HoodieLogFile::getPath) + .anyMatch(path -> path.getName().equals(fakeAppendResult.logFile().getPath().getName()))); writeClient.commit(newCommitTime, statuses); HoodieTable table = HoodieSparkTable.create(config, context(), metaClient); table.getHoodieView().sync(); TableFileSystemView.SliceView tableRTFileSystemView = table.getSliceView(); - + // get log file 
number from filesystem view long numLogFiles = 0; for (String partitionPath : dataGen.getPartitionPaths()) { List<FileSlice> allSlices = tableRTFileSystemView.getLatestFileSlices(partitionPath).collect(Collectors.toList()); assertEquals(0, allSlices.stream().filter(fileSlice -> fileSlice.getBaseFile().isPresent()).count()); assertTrue(allSlices.stream().anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0)); - long logFileCount = allSlices.stream().filter(fileSlice -> fileSlice.getLogFiles().count() > 0).count(); + long logFileCount = allSlices.stream().mapToLong(fileSlice -> fileSlice.getLogFiles().count()).sum(); if (logFileCount > 0) { // check the log versions start from the base version assertTrue(allSlices.stream().map(slice -> slice.getLogFiles().findFirst().get().getLogVersion()) .allMatch(version -> version.equals(HoodieLogFile.LOGFILE_BASE_VERSION))); } numLogFiles += logFileCount; } - - assertTrue(numLogFiles > 0); + // check log file number in file system cover both valid log file and invalid log file Review Comment: minor. "check log file number in file system to cover all log files including additional log files created with spark task retries". -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org