guanziyue commented on code in PR #9523: URL: https://github.com/apache/hudi/pull/9523#discussion_r1305913149
########## hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java: ########## @@ -343,27 +359,73 @@ public void testSimpleInsertsGeneratedIntoLogFiles() throws Exception { List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100); JavaRDD<HoodieRecord> recordsRDD = jsc().parallelize(records, 1); JavaRDD<WriteStatus> statuses = writeClient.insert(recordsRDD, newCommitTime); + long expectedLogFileNum = statuses.map(writeStatus -> (HoodieDeltaWriteStat) writeStatus.getStat()) + .flatMap(deltaWriteStat -> deltaWriteStat.getLogFiles().iterator()) + .count(); + // inject a fake log file to test marker file for log file + HoodieDeltaWriteStat correctWriteStat = (HoodieDeltaWriteStat) statuses.map(WriteStatus::getStat).take(1).get(0); + assertTrue(FSUtils.isLogFile(new Path(correctWriteStat.getPath()))); + HoodieLogFile correctLogFile = new HoodieLogFile(correctWriteStat.getPath()); + String correctWriteToken = FSUtils.getWriteTokenFromLogPath(correctLogFile.getPath()); + + final String fakeToken = generateFakeWriteToken(correctWriteToken); + + final WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), + HoodieSparkTable.create(config, context()), newCommitTime); + HoodieLogFormat.Writer fakeLogWriter = HoodieLogFormat.newWriterBuilder() + .onParentPath(FSUtils.getPartitionPath(config.getBasePath(), correctWriteStat.getPartitionPath())) + .withFileId(correctWriteStat.getFileId()).overBaseCommit(newCommitTime) + .withLogVersion(correctLogFile.getLogVersion()) + .withFileSize(0L) + .withSizeThreshold(config.getLogFileMaxSize()).withFs(fs()) + .withRolloverLogWriteToken(fakeToken) + .withLogWriteToken(fakeToken) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION) + .withLogWriteCallback(new HoodieLogFileWriteCallback() { + @Override + public boolean preLogFileOpen(HoodieLogFile logFileToAppend) { + return writeMarkers.create(correctWriteStat.getPartitionPath(), 
logFileToAppend.getFileName(), IOType.APPEND).isPresent(); + } + + @Override + public boolean preLogFileCreate(HoodieLogFile logFileToCreate) { + return writeMarkers.create(correctWriteStat.getPartitionPath(), logFileToCreate.getFileName(), IOType.APPEND).isPresent(); + } + }).build(); + AppendResult fakeAppendResult = fakeLogWriter.appendBlock(getLogBlock(records, config.getSchema())); Review Comment: fixed by new commit. ########## hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java: ########## @@ -343,27 +359,73 @@ public void testSimpleInsertsGeneratedIntoLogFiles() throws Exception { List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100); JavaRDD<HoodieRecord> recordsRDD = jsc().parallelize(records, 1); JavaRDD<WriteStatus> statuses = writeClient.insert(recordsRDD, newCommitTime); + long expectedLogFileNum = statuses.map(writeStatus -> (HoodieDeltaWriteStat) writeStatus.getStat()) + .flatMap(deltaWriteStat -> deltaWriteStat.getLogFiles().iterator()) + .count(); + // inject a fake log file to test marker file for log file + HoodieDeltaWriteStat correctWriteStat = (HoodieDeltaWriteStat) statuses.map(WriteStatus::getStat).take(1).get(0); + assertTrue(FSUtils.isLogFile(new Path(correctWriteStat.getPath()))); + HoodieLogFile correctLogFile = new HoodieLogFile(correctWriteStat.getPath()); + String correctWriteToken = FSUtils.getWriteTokenFromLogPath(correctLogFile.getPath()); + + final String fakeToken = generateFakeWriteToken(correctWriteToken); + + final WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), + HoodieSparkTable.create(config, context()), newCommitTime); + HoodieLogFormat.Writer fakeLogWriter = HoodieLogFormat.newWriterBuilder() + .onParentPath(FSUtils.getPartitionPath(config.getBasePath(), correctWriteStat.getPartitionPath())) + .withFileId(correctWriteStat.getFileId()).overBaseCommit(newCommitTime) + 
.withLogVersion(correctLogFile.getLogVersion()) + .withFileSize(0L) + .withSizeThreshold(config.getLogFileMaxSize()).withFs(fs()) + .withRolloverLogWriteToken(fakeToken) + .withLogWriteToken(fakeToken) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION) + .withLogWriteCallback(new HoodieLogFileWriteCallback() { + @Override + public boolean preLogFileOpen(HoodieLogFile logFileToAppend) { + return writeMarkers.create(correctWriteStat.getPartitionPath(), logFileToAppend.getFileName(), IOType.APPEND).isPresent(); + } + + @Override + public boolean preLogFileCreate(HoodieLogFile logFileToCreate) { + return writeMarkers.create(correctWriteStat.getPartitionPath(), logFileToCreate.getFileName(), IOType.APPEND).isPresent(); + } + }).build(); + AppendResult fakeAppendResult = fakeLogWriter.appendBlock(getLogBlock(records, config.getSchema())); + fakeLogWriter.close(); + // check marker for fake log generated + assertTrue(writeMarkers.allMarkerFilePaths().stream().anyMatch(marker -> marker.contains(fakeToken))); + SyncableFileSystemView unCommittedFsView = getFileSystemViewWithUnCommittedSlices(metaClient); + // check fake log generated + assertTrue(unCommittedFsView.getAllFileSlices(correctWriteStat.getPartitionPath()) + .flatMap(FileSlice::getLogFiles).map(HoodieLogFile::getPath) + .anyMatch(path -> path.getName().equals(fakeAppendResult.logFile().getPath().getName()))); writeClient.commit(newCommitTime, statuses); HoodieTable table = HoodieSparkTable.create(config, context(), metaClient); table.getHoodieView().sync(); TableFileSystemView.SliceView tableRTFileSystemView = table.getSliceView(); - + // get log file number from filesystem view long numLogFiles = 0; for (String partitionPath : dataGen.getPartitionPaths()) { List<FileSlice> allSlices = tableRTFileSystemView.getLatestFileSlices(partitionPath).collect(Collectors.toList()); assertEquals(0, allSlices.stream().filter(fileSlice -> fileSlice.getBaseFile().isPresent()).count()); 
assertTrue(allSlices.stream().anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0)); - long logFileCount = allSlices.stream().filter(fileSlice -> fileSlice.getLogFiles().count() > 0).count(); + long logFileCount = allSlices.stream().mapToLong(fileSlice -> fileSlice.getLogFiles().count()).sum(); if (logFileCount > 0) { // check the log versions start from the base version assertTrue(allSlices.stream().map(slice -> slice.getLogFiles().findFirst().get().getLogVersion()) .allMatch(version -> version.equals(HoodieLogFile.LOGFILE_BASE_VERSION))); } numLogFiles += logFileCount; } - - assertTrue(numLogFiles > 0); + // check log file number in file system cover both valid log file and invalid log file Review Comment: fixed by new commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org