guanziyue commented on code in PR #9523:
URL: https://github.com/apache/hudi/pull/9523#discussion_r1305913149


##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java:
##########
@@ -343,27 +359,73 @@ public void testSimpleInsertsGeneratedIntoLogFiles() throws Exception {
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
       JavaRDD<HoodieRecord> recordsRDD = jsc().parallelize(records, 1);
       JavaRDD<WriteStatus> statuses = writeClient.insert(recordsRDD, newCommitTime);
+      long expectedLogFileNum = statuses.map(writeStatus -> (HoodieDeltaWriteStat) writeStatus.getStat())
+          .flatMap(deltaWriteStat -> deltaWriteStat.getLogFiles().iterator())
+          .count();
+      // inject a fake log file to test marker file creation for log files
+      HoodieDeltaWriteStat correctWriteStat = (HoodieDeltaWriteStat) statuses.map(WriteStatus::getStat).take(1).get(0);
+      assertTrue(FSUtils.isLogFile(new Path(correctWriteStat.getPath())));
+      HoodieLogFile correctLogFile = new HoodieLogFile(correctWriteStat.getPath());
+      String correctWriteToken = FSUtils.getWriteTokenFromLogPath(correctLogFile.getPath());
+
+      final String fakeToken = generateFakeWriteToken(correctWriteToken);
+
+      final WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(),
+          HoodieSparkTable.create(config, context()), newCommitTime);
+      HoodieLogFormat.Writer fakeLogWriter = HoodieLogFormat.newWriterBuilder()
+          .onParentPath(FSUtils.getPartitionPath(config.getBasePath(), correctWriteStat.getPartitionPath()))
+          .withFileId(correctWriteStat.getFileId()).overBaseCommit(newCommitTime)
+          .withLogVersion(correctLogFile.getLogVersion())
+          .withFileSize(0L)
+          .withSizeThreshold(config.getLogFileMaxSize()).withFs(fs())
+          .withRolloverLogWriteToken(fakeToken)
+          .withLogWriteToken(fakeToken)
+          .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+          .withLogWriteCallback(new HoodieLogFileWriteCallback() {
+            @Override
+            public boolean preLogFileOpen(HoodieLogFile logFileToAppend) {
+              return writeMarkers.create(correctWriteStat.getPartitionPath(), logFileToAppend.getFileName(), IOType.APPEND).isPresent();
+            }
+
+            @Override
+            public boolean preLogFileCreate(HoodieLogFile logFileToCreate) {
+              return writeMarkers.create(correctWriteStat.getPartitionPath(), logFileToCreate.getFileName(), IOType.APPEND).isPresent();
+            }
+          }).build();
+      AppendResult fakeAppendResult = fakeLogWriter.appendBlock(getLogBlock(records, config.getSchema()));

Review Comment:
   Fixed by the new commit.
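   
   For reference, a minimal sketch of what the `generateFakeWriteToken` helper used above could look like; the helper is not shown in this hunk, so this is an assumption rather than the PR's actual implementation. It only needs to produce a write token that differs from the real one, so the injected log file is distinguishable from genuine writer output:
   
       // Hypothetical helper, assuming Hudi's "taskPartitionId-stageId-taskAttemptId"
       // write token format produced by FSUtils.makeWriteToken.
       private String generateFakeWriteToken(String correctWriteToken) {
         String fakeToken = FSUtils.makeWriteToken(99, 99, 99); // arbitrary ids
         // guard against an (unlikely) collision with the real token
         return fakeToken.equals(correctWriteToken) ? FSUtils.makeWriteToken(98, 98, 98) : fakeToken;
       }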



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java:
##########
@@ -343,27 +359,73 @@ public void testSimpleInsertsGeneratedIntoLogFiles() throws Exception {
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
       JavaRDD<HoodieRecord> recordsRDD = jsc().parallelize(records, 1);
       JavaRDD<WriteStatus> statuses = writeClient.insert(recordsRDD, newCommitTime);
+      long expectedLogFileNum = statuses.map(writeStatus -> (HoodieDeltaWriteStat) writeStatus.getStat())
+          .flatMap(deltaWriteStat -> deltaWriteStat.getLogFiles().iterator())
+          .count();
+      // inject a fake log file to test marker file creation for log files
+      HoodieDeltaWriteStat correctWriteStat = (HoodieDeltaWriteStat) statuses.map(WriteStatus::getStat).take(1).get(0);
+      assertTrue(FSUtils.isLogFile(new Path(correctWriteStat.getPath())));
+      HoodieLogFile correctLogFile = new HoodieLogFile(correctWriteStat.getPath());
+      String correctWriteToken = FSUtils.getWriteTokenFromLogPath(correctLogFile.getPath());
+
+      final String fakeToken = generateFakeWriteToken(correctWriteToken);
+
+      final WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(),
+          HoodieSparkTable.create(config, context()), newCommitTime);
+      HoodieLogFormat.Writer fakeLogWriter = HoodieLogFormat.newWriterBuilder()
+          .onParentPath(FSUtils.getPartitionPath(config.getBasePath(), correctWriteStat.getPartitionPath()))
+          .withFileId(correctWriteStat.getFileId()).overBaseCommit(newCommitTime)
+          .withLogVersion(correctLogFile.getLogVersion())
+          .withFileSize(0L)
+          .withSizeThreshold(config.getLogFileMaxSize()).withFs(fs())
+          .withRolloverLogWriteToken(fakeToken)
+          .withLogWriteToken(fakeToken)
+          .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+          .withLogWriteCallback(new HoodieLogFileWriteCallback() {
+            @Override
+            public boolean preLogFileOpen(HoodieLogFile logFileToAppend) {
+              return writeMarkers.create(correctWriteStat.getPartitionPath(), logFileToAppend.getFileName(), IOType.APPEND).isPresent();
+            }
+
+            @Override
+            public boolean preLogFileCreate(HoodieLogFile logFileToCreate) {
+              return writeMarkers.create(correctWriteStat.getPartitionPath(), logFileToCreate.getFileName(), IOType.APPEND).isPresent();
+            }
+          }).build();
+      AppendResult fakeAppendResult = fakeLogWriter.appendBlock(getLogBlock(records, config.getSchema()));
+      fakeLogWriter.close();
+      // check that a marker was generated for the fake log file
+      assertTrue(writeMarkers.allMarkerFilePaths().stream().anyMatch(marker -> marker.contains(fakeToken)));
+      SyncableFileSystemView unCommittedFsView = getFileSystemViewWithUnCommittedSlices(metaClient);
+      // check that the fake log file is visible in the uncommitted file system view
+      assertTrue(unCommittedFsView.getAllFileSlices(correctWriteStat.getPartitionPath())
+          .flatMap(FileSlice::getLogFiles).map(HoodieLogFile::getPath)
+          .anyMatch(path -> path.getName().equals(fakeAppendResult.logFile().getPath().getName())));
       writeClient.commit(newCommitTime, statuses);
 
       HoodieTable table = HoodieSparkTable.create(config, context(), metaClient);
       table.getHoodieView().sync();
       TableFileSystemView.SliceView tableRTFileSystemView = table.getSliceView();
-
+      // get the log file count from the file system view
       long numLogFiles = 0;
       for (String partitionPath : dataGen.getPartitionPaths()) {
         List<FileSlice> allSlices = tableRTFileSystemView.getLatestFileSlices(partitionPath).collect(Collectors.toList());
         assertEquals(0, allSlices.stream().filter(fileSlice -> fileSlice.getBaseFile().isPresent()).count());
         assertTrue(allSlices.stream().anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0));
-        long logFileCount = allSlices.stream().filter(fileSlice -> fileSlice.getLogFiles().count() > 0).count();
+        long logFileCount = allSlices.stream().mapToLong(fileSlice -> fileSlice.getLogFiles().count()).sum();
         if (logFileCount > 0) {
           // check the log versions start from the base version
           assertTrue(allSlices.stream().map(slice -> slice.getLogFiles().findFirst().get().getLogVersion())
               .allMatch(version -> version.equals(HoodieLogFile.LOGFILE_BASE_VERSION)));
         }
         numLogFiles += logFileCount;
       }
-
-      assertTrue(numLogFiles > 0);
+      // check that the log file count from the file system covers both valid and invalid log files

Review Comment:
   Fixed by the new commit.
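   
   For context, the hunk is truncated right after the comment about covering both valid and invalid log files, so the assertion that follows is not visible here. A plausible sketch of what it would check, under the assumption that exactly one fake log file was injected earlier:
   
       // numLogFiles comes from listing via the file system view, so it should see
       // every log file reported by the genuine write statuses (expectedLogFileNum)
       // plus the single fake log file injected before the commit.
       assertEquals(expectedLogFileNum + 1, numLogFiles);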



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
