nsivabalan commented on a change in pull request #2421: URL: https://github.com/apache/hudi/pull/2421#discussion_r554055131
########## File path: hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java ########## @@ -181,10 +187,113 @@ public void testRestoreInstants() throws Exception { // verify modified partitions included cleaned data List<String> partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10)); - assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4", "5"})); + assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4", "5"})); partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsInRange("1", "4")); - assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4"})); + assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4"})); + } + + @Test + public void testHoodieRestoreMetadataSerDeser() throws IOException { Review comment: Check this test locally to reproduce the ser and deser issue. ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java ########## @@ -164,6 +164,7 @@ private void init(HoodieRecord record) { // Since the actual log file written to can be different based on when rollover happens, we use the // base file to denote some log appends happened on a slice. writeToken will still fence concurrent // writers. + // TODO? are these sufficient? Review comment: created a follow up ticket https://issues.apache.org/jira/browse/HUDI-1517. 
not fixing in this release ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java ########## @@ -116,14 +116,31 @@ public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteC .withDeletedFileResults(filesToDeletedStatus).build()); } case APPEND_ROLLBACK_BLOCK: { + // collect all log files that is supposed to be deleted with this rollback + String baseCommit = rollbackRequest.getLatestBaseInstant().get(); Review comment: The existing FSUtils.getAllLogFiles() expects a fileId to be passed in. Hence I have created a new method in FSUtils which lists all files disregarding the fileId, but later filters for the baseCommit passed in. ########## File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java ########## @@ -794,7 +788,19 @@ private void validateMetadata(SparkRDDWriteClient client) throws IOException { if ((fsFileNames.size() != metadataFilenames.size()) || (!fsFileNames.equals(metadataFilenames))) { LOG.info("*** File system listing = " + Arrays.toString(fsFileNames.toArray())); LOG.info("*** Metadata listing = " + Arrays.toString(metadataFilenames.toArray())); + + for (String fileName : fsFileNames) { + if (!metadataFilenames.contains(fileName)) { + LOG.error(partition + "FsFilename " + fileName + " not found in Meta data"); Review comment: it's intentional. Instead of printing the entire list across both actual and expected, this will print out just the diff. ########## File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java ########## @@ -225,25 +216,23 @@ /** * Extracts information about the deleted and append files from the {@code HoodieRollbackMetadata}. * - * During a rollback files may be deleted (COW, MOR) or rollback blocks be appended (MOR only) to files. This - * function will extract this change file for each partition. 
+ * During a rollback files may be deleted (COW, MOR) or rollback blocks be appended (MOR only) to files. This function will extract this change file for each partition. * * @param rollbackMetadata {@code HoodieRollbackMetadata} * @param partitionToDeletedFiles The {@code Map} to fill with files deleted per partition. * @param partitionToAppendedFiles The {@code Map} to fill with files appended per partition and their sizes. */ private static void processRollbackMetadata(HoodieRollbackMetadata rollbackMetadata, - Map<String, List<String>> partitionToDeletedFiles, - Map<String, Map<String, Long>> partitionToAppendedFiles, - Option<String> lastSyncTs) { + Map<String, List<String>> partitionToDeletedFiles, + Map<String, Map<String, Long>> partitionToAppendedFiles, + Option<String> lastSyncTs) { rollbackMetadata.getPartitionMetadata().values().forEach(pm -> { // Has this rollback produced new files? - boolean hasAppendFiles = pm.getAppendFiles().values().stream().mapToLong(Long::longValue).sum() > 0; + boolean hasAppendFiles = pm.getRollbackLogFiles() != null ? pm.getRollbackLogFiles().values().stream().mapToLong(Long::longValue).sum() > 0 : false; Review comment: this is part of the Avro deserialization. I tried changing it to return an empty value instead, but it didn't work. 
########## File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java ########## @@ -102,8 +112,10 @@ public void testMergeOnReadRollback() throws Exception { .withMarkerFile("partA", f2, IOType.APPEND) .withMarkerFile("partB", f4, IOType.APPEND); + HoodieWriteConfig writeConfig = getConfig(); // when - List<HoodieRollbackStat> stats = new SparkMarkerBasedRollbackStrategy(HoodieSparkTable.create(getConfig(), context, metaClient), context, getConfig(), "002") + HoodieTable morTable = HoodieSparkTable.create(writeConfig, context, metaClient); + List<HoodieRollbackStat> stats = new SparkMarkerBasedRollbackStrategy(morTable, context, writeConfig, "002") Review comment: This test is failing for now. ``` java.lang.IllegalArgumentException: Wrong FS: file://localhost:53936/user/sivabala/partA, expected: file:/// at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:649) at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:82) at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:427) at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1517) at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1557) at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:674) at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1517) at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1557) at org.apache.hudi.common.fs.HoodieWrapperFileSystem.lambda$listStatus$19(HoodieWrapperFileSystem.java:580) at org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeMetrics(HoodieWrapperFileSystem.java:100) at org.apache.hudi.common.fs.HoodieWrapperFileSystem.listStatus(HoodieWrapperFileSystem.java:579) at org.apache.hudi.common.fs.FSUtils.getAllLogFiles(FSUtils.java:428) at 
org.apache.hudi.table.action.rollback.SparkMarkerBasedRollbackStrategy.getWrittenLogFileSizeMap(SparkMarkerBasedRollbackStrategy.java:90) at org.apache.hudi.table.action.rollback.AbstractMarkerBasedRollbackStrategy.undoAppend(AbstractMarkerBasedRollbackStrategy.java:93) at org.apache.hudi.table.action.rollback.SparkMarkerBasedRollbackStrategy.lambda$execute$d4509179$1(SparkMarkerBasedRollbackStrategy.java:73) at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040) at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:193) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org