nsivabalan commented on a change in pull request #2421:
URL: https://github.com/apache/hudi/pull/2421#discussion_r554055131



##########
File path: hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
##########
@@ -181,10 +187,113 @@ public void testRestoreInstants() throws Exception {
 
     // verify modified partitions included cleaned data
     List<String> partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10));
-    assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4", "5"}));
+    assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4", "5"}));
 
     partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsInRange("1", "4"));
-    assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4"}));
+    assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4"}));
+  }
+
+  @Test
+  public void testHoodieRestoreMetadataSerDeser() throws IOException {

Review comment:
       Run this test locally to reproduce the ser/deser issue.

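
   As a point of reference, a minimal sketch of what such a round-trip check can look like, assuming HoodieRestoreMetadata is an Avro-generated SpecificRecord (the helper name, wiring and the model's import path below are assumptions for illustration, not the actual test in this PR):
   ```
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;

   import org.apache.avro.io.BinaryDecoder;
   import org.apache.avro.io.BinaryEncoder;
   import org.apache.avro.io.DecoderFactory;
   import org.apache.avro.io.EncoderFactory;
   import org.apache.avro.specific.SpecificDatumReader;
   import org.apache.avro.specific.SpecificDatumWriter;
   import org.apache.hudi.avro.model.HoodieRestoreMetadata; // assumed package of the generated Avro model

   // Illustrative helper: serialize the metadata to Avro binary and read it back.
   // A ser/deser bug typically shows up here as an exception or a field mismatch.
   public static HoodieRestoreMetadata roundTrip(HoodieRestoreMetadata metadata) throws IOException {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
     new SpecificDatumWriter<>(HoodieRestoreMetadata.class).write(metadata, encoder);
     encoder.flush();

     BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(baos.toByteArray(), null);
     return new SpecificDatumReader<>(HoodieRestoreMetadata.class).read(null, decoder);
   }
   ```
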
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -164,6 +164,7 @@ private void init(HoodieRecord record) {
         // Since the actual log file written to can be different based on when rollover happens, we use the
         // base file to denote some log appends happened on a slice. writeToken will still fence concurrent
         // writers.
+        // TODO? are these sufficient?

Review comment:
       Created a follow-up ticket: https://issues.apache.org/jira/browse/HUDI-1517. This will not be fixed in this release.

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -116,14 +116,31 @@ public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteC
                   .withDeletedFileResults(filesToDeletedStatus).build());
         }
         case APPEND_ROLLBACK_BLOCK: {
+          // collect all log files that is supposed to be deleted with this rollback
+          String baseCommit = rollbackRequest.getLatestBaseInstant().get();

Review comment:
       The existing FSUtils.getAllLogFiles() expects a fileId to be passed in. Hence I have created a new method in FSUtils that lists all log files regardless of fileId and then filters by the base commit passed in (see the sketch below).
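
   A minimal sketch of that listing-then-filtering idea, assuming a helper of roughly this shape (the method name and the exact filter are illustrative, not the code added in this PR):
   ```
   import java.io.IOException;
   import java.util.Arrays;
   import java.util.List;
   import java.util.stream.Collectors;

   import org.apache.hadoop.fs.FileStatus;
   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hadoop.fs.Path;
   import org.apache.hudi.common.model.HoodieLogFile;

   // Illustrative helper: list every log file under the partition (no fileId filter),
   // then keep only those whose base commit matches the instant being rolled back.
   public static List<HoodieLogFile> getAllLogFilesForBaseCommit(FileSystem fs, Path partitionPath,
       String baseCommitTime) throws IOException {
     FileStatus[] statuses = fs.listStatus(partitionPath,
         path -> path.getName().contains(HoodieLogFile.DELTA_EXTENSION));
     return Arrays.stream(statuses)
         .map(HoodieLogFile::new)
         .filter(logFile -> baseCommitTime.equals(logFile.getBaseCommitTime()))
         .collect(Collectors.toList());
   }
   ```
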

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
##########
@@ -794,7 +788,19 @@ private void validateMetadata(SparkRDDWriteClient client) throws IOException {
         if ((fsFileNames.size() != metadataFilenames.size()) || (!fsFileNames.equals(metadataFilenames))) {
           LOG.info("*** File system listing = " + Arrays.toString(fsFileNames.toArray()));
           LOG.info("*** Metadata listing = " + Arrays.toString(metadataFilenames.toArray()));
+
+          for (String fileName : fsFileNames) {
+            if (!metadataFilenames.contains(fileName)) {
+              LOG.error(partition + "FsFilename " + fileName + " not found in Meta data");

Review comment:
       It's intentional. Instead of printing the entire list for both actual and expected, this prints out just the diff (sketched below).
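
   A minimal sketch of the intent, covering both directions of the diff (the reverse loop is an assumption about how the rest of the hunk looks, not a quote from it):
   ```
   // Log only the entries that differ, instead of dumping both full listings.
   for (String fileName : fsFileNames) {
     if (!metadataFilenames.contains(fileName)) {
       LOG.error(partition + ": FS filename " + fileName + " not found in metadata");
     }
   }
   for (String fileName : metadataFilenames) {
     if (!fsFileNames.contains(fileName)) {
       LOG.error(partition + ": metadata filename " + fileName + " not found on the file system");
     }
   }
   ```
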

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -225,25 +216,23 @@
   /**
    * Extracts information about the deleted and append files from the {@code HoodieRollbackMetadata}.
    *
-   * During a rollback files may be deleted (COW, MOR) or rollback blocks be appended (MOR only) to files. This
-   * function will extract this change file for each partition.
+   * During a rollback files may be deleted (COW, MOR) or rollback blocks be appended (MOR only) to files. This function will extract this change file for each partition.
    *
    * @param rollbackMetadata {@code HoodieRollbackMetadata}
    * @param partitionToDeletedFiles The {@code Map} to fill with files deleted per partition.
    * @param partitionToAppendedFiles The {@code Map} to fill with files appended per partition and their sizes.
    */
   private static void processRollbackMetadata(HoodieRollbackMetadata rollbackMetadata,
-                                              Map<String, List<String>> partitionToDeletedFiles,
-                                              Map<String, Map<String, Long>> partitionToAppendedFiles,
-                                              Option<String> lastSyncTs) {
+      Map<String, List<String>> partitionToDeletedFiles,
+      Map<String, Map<String, Long>> partitionToAppendedFiles,
+      Option<String> lastSyncTs) {
 
     rollbackMetadata.getPartitionMetadata().values().forEach(pm -> {
       // Has this rollback produced new files?
-      boolean hasAppendFiles = pm.getAppendFiles().values().stream().mapToLong(Long::longValue).sum() > 0;
+      boolean hasAppendFiles = pm.getRollbackLogFiles() != null ? pm.getRollbackLogFiles().values().stream().mapToLong(Long::longValue).sum() > 0 : false;

Review comment:
       This is part of the Avro deserialization: getRollbackLogFiles() can come back null. I tried making it return an empty collection instead, but that didn't work (see the simplified guard below).
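
   For reference, the same guard written without the ternary (purely illustrative; the actual change is the diff above):
   ```
   // Treat a null (not deserialized) rollbackLogFiles map as "no appended log files".
   Map<String, Long> rollbackLogFiles = pm.getRollbackLogFiles();
   boolean hasAppendFiles = rollbackLogFiles != null
       && rollbackLogFiles.values().stream().mapToLong(Long::longValue).sum() > 0;
   ```
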

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java
##########
@@ -102,8 +112,10 @@ public void testMergeOnReadRollback() throws Exception {
         .withMarkerFile("partA", f2, IOType.APPEND)
         .withMarkerFile("partB", f4, IOType.APPEND);
 
+    HoodieWriteConfig writeConfig = getConfig();
     // when
-    List<HoodieRollbackStat> stats = new SparkMarkerBasedRollbackStrategy(HoodieSparkTable.create(getConfig(), context, metaClient), context, getConfig(), "002")
+    HoodieTable morTable = HoodieSparkTable.create(writeConfig, context, metaClient);
+    List<HoodieRollbackStat> stats = new SparkMarkerBasedRollbackStrategy(morTable, context, writeConfig, "002")

Review comment:
       This test is currently failing with the exception below:
   ```
   java.lang.IllegalArgumentException: Wrong FS: file://localhost:53936/user/sivabala/partA, expected: file:///
        at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:649)
        at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:82)
        at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:427)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1517)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1557)
        at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:674)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1517)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1557)
        at org.apache.hudi.common.fs.HoodieWrapperFileSystem.lambda$listStatus$19(HoodieWrapperFileSystem.java:580)
        at org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeMetrics(HoodieWrapperFileSystem.java:100)
        at org.apache.hudi.common.fs.HoodieWrapperFileSystem.listStatus(HoodieWrapperFileSystem.java:579)
        at org.apache.hudi.common.fs.FSUtils.getAllLogFiles(FSUtils.java:428)
        at org.apache.hudi.table.action.rollback.SparkMarkerBasedRollbackStrategy.getWrittenLogFileSizeMap(SparkMarkerBasedRollbackStrategy.java:90)
        at org.apache.hudi.table.action.rollback.AbstractMarkerBasedRollbackStrategy.undoAppend(AbstractMarkerBasedRollbackStrategy.java:93)
        at org.apache.hudi.table.action.rollback.SparkMarkerBasedRollbackStrategy.lambda$execute$d4509179$1(SparkMarkerBasedRollbackStrategy.java:73)
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:193)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   ```
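
   For context, the "Wrong FS" failure above is Hadoop's FileSystem.checkPath() rejecting a path whose authority does not match the file system doing the listing. A minimal standalone reproduction (the path is illustrative and unrelated to the actual fix):
   ```
   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hadoop.fs.Path;

   public class WrongFsRepro {
     public static void main(String[] args) throws Exception {
       Configuration conf = new Configuration();
       // The local file system expects "file:///" paths with no authority component.
       FileSystem localFs = FileSystem.getLocal(conf);
       // A path carrying a host:port authority fails FileSystem.checkPath()
       // before the listing even starts, producing the same IllegalArgumentException.
       Path badPath = new Path("file://localhost:53936/tmp/partA");
       localFs.listStatus(badPath);
     }
   }
   ```
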




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

