nsivabalan commented on a change in pull request #1858:
URL: https://github.com/apache/hudi/pull/1858#discussion_r461213552



##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
##########
@@ -159,24 +161,32 @@ private void rollBackIndex() {
     LOG.info("Index rolled back for commits " + instantToRollback);
   }
 
-  public List<HoodieRollbackStat> doRollbackAndGetStats() {
-    final String instantTimeToRollback = instantToRollback.getTimestamp();
-    final boolean isPendingCompaction = Objects.equals(HoodieTimeline.COMPACTION_ACTION, instantToRollback.getAction())
-        && !instantToRollback.isCompleted();
-    validateSavepointRollbacks();
-    if (!isPendingCompaction) {
-      validateRollbackCommitSequence();
-    }
-
-    try {
-      List<HoodieRollbackStat> stats = executeRollback();
-      LOG.info("Rolled back inflight instant " + instantTimeToRollback);
+  public List<HoodieRollbackStat> mayBeRollbackAndGetStats(boolean doDelete) {
+    if (doDelete) {
+      final String instantTimeToRollback = instantToRollback.getTimestamp();
+      final boolean isPendingCompaction = Objects.equals(HoodieTimeline.COMPACTION_ACTION, instantToRollback.getAction())
+          && !instantToRollback.isCompleted();
+      validateSavepointRollbacks();
       if (!isPendingCompaction) {
-        rollBackIndex();
+        validateRollbackCommitSequence();
+      }
+
+      try {
+        List<HoodieRollbackStat> stats = executeRollback(doDelete);
+        LOG.info("Rolled back inflight instant " + instantTimeToRollback);
+        if (!isPendingCompaction) {
+          rollBackIndex();
+        }
+        return stats;
+      } catch (IOException e) {
+        throw new HoodieIOException("Unable to execute rollback ", e);
+      }
+    } else {
+      try {
+        return executeRollback(doDelete);

Review comment:
       This is the else branch, where we just collect stats without deleting anything.

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
##########
@@ -159,24 +161,32 @@ private void rollBackIndex() {
     LOG.info("Index rolled back for commits " + instantToRollback);
   }
 
-  public List<HoodieRollbackStat> doRollbackAndGetStats() {
-    final String instantTimeToRollback = instantToRollback.getTimestamp();
-    final boolean isPendingCompaction = Objects.equals(HoodieTimeline.COMPACTION_ACTION, instantToRollback.getAction())
-        && !instantToRollback.isCompleted();
-    validateSavepointRollbacks();
-    if (!isPendingCompaction) {
-      validateRollbackCommitSequence();
-    }
-
-    try {
-      List<HoodieRollbackStat> stats = executeRollback();
-      LOG.info("Rolled back inflight instant " + instantTimeToRollback);
+  public List<HoodieRollbackStat> mayBeRollbackAndGetStats(boolean doDelete) {

Review comment:
       @vinothchandar : I have added a flag here to indicate whether the delete has to be done or just
       stats need to be collected. Since I don't want to duplicate code, I tried my best to re-use the
       existing path. If you can think of any other way, let me know.
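
       To make the two modes concrete, here is a minimal caller-side sketch (constructor args follow the
       usage in UpgradeDowngradeHelper; treat it as an illustrative fragment, not final code):

           // doDelete = true  -> validate, delete files, roll back the index (old doRollbackAndGetStats behavior)
           // doDelete = false -> only list the files that would be rolled back (used by the upgrade step)
           BaseRollbackActionExecutor executor =
               new CopyOnWriteRollbackActionExecutor(jsc, table.getConfig(), table, "", instantToRollback, false);
           List<HoodieRollbackStat> stats = executor.mayBeRollbackAndGetStats(false); // stats only, no deletes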

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/UpgradeDowngradeHelper.java
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.io.IOType;
+import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Helper class to assist in upgrading/downgrading Hoodie when there is a version change.
+ */
+public class UpgradeDowngradeHelper {
+
+  private static final Logger LOG = LogManager.getLogger(UpgradeDowngradeHelper.class);
+  public static final String HOODIE_ORIG_PROPERTY_FILE = "hoodie.properties.orig";
+
+  /**
+   * Perform upgrade or downgrade steps if required, and update the table version if need be.
+   * <p>
+   * Starting with version 0.6.0, this upgrade/downgrade step will be added to all write paths. Essentially, if a dataset was
+   * created using any pre-0.6.0 release (e.g. 0.5.3) and Hoodie is upgraded to 0.6.0, the Hoodie table version gets bumped to 1
+   * and some upgrade steps need to be executed before doing any writes. Similarly, if a dataset was created with Hoodie 0.6.0
+   * (Hoodie table version 1) and Hoodie is then downgraded to a pre-0.6.0 release (Hoodie table version 0), some downgrade
+   * steps need to be executed before proceeding with any writes.
+   * At a high level, these are the steps performed:
+   * Step1 : Read the current Hoodie table version and the table version from the hoodie.properties file
+   * Step2 : Check for residues from a previous upgrade/downgrade
+   * Step3 : If there are no residues, check for a version mismatch and, if found, perform the upgrade/downgrade
+   * Step4 : If there are residues, clean them up and skip the upgrade/downgrade, since those steps would have completed last time
+   * Step5 : Copy hoodie.properties -> hoodie.properties.orig
+   * Step6 : Update the hoodie.properties file with the current table version
+   * Step7 : Delete hoodie.properties.orig
+   * </p>
+   *
+   * @param metaClient instance of {@link HoodieTableMetaClient} to use
+   * @param toVersion version to which upgrade or downgrade has to be done.
+   */
+  public static void doUpgradeOrDowngrade(HoodieTableMetaClient metaClient, HoodieTableVersion toVersion,
+      HoodieWriteConfig config, JavaSparkContext jsc) throws IOException {
+    // Fetch version from property file and current version
+    HoodieTableVersion versionFromPropertyFile = metaClient.getTableConfig().getHoodieTableVersionFromPropertyFile();
+
+    Path metaPath = new Path(metaClient.getMetaPath());
+    Path originalHoodiePropertyFile = getOrigHoodiePropertyFilePath(metaPath.toString());
+
+    boolean updateTableVersionInPropertyFile = false;
+
+    if (metaClient.getFs().exists(originalHoodiePropertyFile)) {
+      // if hoodie.properties.orig exists, rename to hoodie.properties and skip the upgrade/downgrade step
+      metaClient.getFs().rename(originalHoodiePropertyFile, getHoodiePropertyFilePath(metaPath.toString()));
+      updateTableVersionInPropertyFile = true;
+    } else {
+      // upgrade or downgrade if there is a version mismatch
+      if (versionFromPropertyFile != toVersion) {
+        updateTableVersionInPropertyFile = true;
+        if (versionFromPropertyFile == HoodieTableVersion.ZERO && toVersion == HoodieTableVersion.ONE) {
+          upgradeFromZeroToOne(config, jsc.hadoopConfiguration(), jsc);
+        } else if (versionFromPropertyFile == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.ZERO) {
+          downgradeFromOneToZero();
+        } else {
+          throw new HoodieException("Illegal state wrt table versions. Version from property file "
+              + versionFromPropertyFile + " and current version " + toVersion);
+        }
+      }
+    }
+
+    /**
+     * If the table version needs to be updated in the hoodie.properties file:
+     * Step1: Copy hoodie.properties to hoodie.properties.orig
+     * Step2: Add table.version to hoodie.properties
+     * Step3: Delete hoodie.properties.orig
+     */
+    if (updateTableVersionInPropertyFile) {
+      updateTableVersionInHoodiePropertyFile(metaClient, toVersion);
+    }
+  }
+
+  /**
+   * Upgrade steps to be done to upgrade from hoodie table version 0 to 1.
+   */
+  private static void upgradeFromZeroToOne(HoodieWriteConfig config, Configuration hadoopConf, JavaSparkContext jsc) {
+    // fetch pending commit info
+    HoodieTable table = HoodieTable.create(config, hadoopConf);
+    HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
+    List<String> commits = inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp)
+        .collect(Collectors.toList());
+    for (String commit : commits) {
+      // for every pending commit, delete the old marker files and re-create marker files in the new format
+      recreateMarkerFiles(commit, table, jsc);
+    }
+  }
+
+  public static void recreateMarkerFiles(final String commitInstantTime, HoodieTable table, JavaSparkContext jsc)
+      throws HoodieRollbackException {
+    try {
+      Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
+          .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
+          .findFirst());
+      if (commitInstantOpt.isPresent()) {
+        MarkerFiles markerFiles = new MarkerFiles(table, commitInstantTime);
+        markerFiles.quietDeleteMarkerDir();
+
+        List<HoodieRollbackStat> rollbackStats = new CopyOnWriteRollbackActionExecutor(jsc, table.getConfig(), table, "",
+            commitInstantOpt.get(), false).mayBeRollbackAndGetStats(false);
+
+        for (HoodieRollbackStat rollbackStat : rollbackStats) {
+          for (FileStatus fileStatus : rollbackStat.getFilesToRollback()) {
+            String path = fileStatus.getPath().toString();
+            String dataFileName = path.substring(path.lastIndexOf("/") + 1);
+            markerFiles.create(rollbackStat.getPartitionPath(), dataFileName,
+                path.endsWith(table.getBaseFileExtension()) ? IOType.CREATE : IOType.MERGE);

Review comment:
       Yet to figure out how to differentiate CREATE from MERGE using only the FileStatus.
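
       One direction I am considering (untested sketch; the helper below is hypothetical and the
       file-system view call is an assumption on my part): treat a base file as CREATE if no earlier
       file slice exists for the same file id before this commit, else MERGE:

           // Hypothetical: infer the IOType by checking for an older base file with the same file id.
           private static IOType inferBaseFileIoType(HoodieTable table, String partitionPath, String dataFileName, String commit) {
             String fileId = FSUtils.getFileId(dataFileName);
             boolean hasOlderVersion = table.getBaseFileOnlyView()
                 .getLatestBaseFilesBeforeOrOn(partitionPath, commit) // assumed file-system view API
                 .anyMatch(bf -> bf.getFileId().equals(fileId) && !bf.getCommitTime().equals(commit));
             return hasOlderVersion ? IOType.MERGE : IOType.CREATE;
           }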

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/HoodieRollbackStat.java
##########
@@ -39,12 +40,15 @@
   // Count of HoodieLogFile to commandBlocks written for a particular rollback
   private final Map<FileStatus, Long> commandBlocksCount;
 
+  private final List<FileStatus> filesToRollback;

Review comment:
       Have added this to hold the full FileStatus of each file, to be used during the upgrade.
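
       Rough shape of how it flows (the builder method name below is a placeholder; only the
       getFilesToRollback() getter is visible in this diff):

           // Rollback helpers record the full FileStatus of every file they would touch,
           // and the upgrade step reads them back to re-create markers.
           HoodieRollbackStat stat = HoodieRollbackStat.newBuilder()
               .withPartitionPath(partitionPath)
               .withFilesToRollback(filesToRollback) // placeholder builder method
               .build();
           for (FileStatus fileStatus : stat.getFilesToRollback()) {
             LOG.info("Would roll back " + fileStatus.getPath());
           }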

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/UpgradeDowngradeHelper.java
##########
@@ -0,0 +1,202 @@
+  public static void recreateMarkerFiles(final String commitInstantTime, HoodieTable table, JavaSparkContext jsc)
+      throws HoodieRollbackException {
+    try {
+      Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
+          .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
+          .findFirst());
+      if (commitInstantOpt.isPresent()) {
+        MarkerFiles markerFiles = new MarkerFiles(table, commitInstantTime);
+        markerFiles.quietDeleteMarkerDir();
+
+        List<HoodieRollbackStat> rollbackStats = new CopyOnWriteRollbackActionExecutor(jsc, table.getConfig(), table, "",
+            commitInstantOpt.get(), false).mayBeRollbackAndGetStats(false);
+
+        for (HoodieRollbackStat rollbackStat : rollbackStats) {
+          for (FileStatus fileStatus : rollbackStat.getFilesToRollback()) {
+            String path = fileStatus.getPath().toString();
+            String dataFileName = path.substring(path.lastIndexOf("/") + 1);
+            markerFiles.create(rollbackStat.getPartitionPath(), dataFileName,
+                path.endsWith(table.getBaseFileExtension()) ? IOType.CREATE : IOType.MERGE);
+          }
+          for (FileStatus fileStatus : rollbackStat.getCommandBlocksCount().keySet()) {

Review comment:
       I assume every entry in commandBlocks will be an APPEND. Correct me if I 
am wrong. 
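
       i.e., the log-file side would look like this (a sketch of what I have in mind, assuming APPEND
       is always correct here):

           // Every log file that had a rollback command block recorded against it
           // gets its marker re-created as an APPEND.
           for (FileStatus fileStatus : rollbackStat.getCommandBlocksCount().keySet()) {
             String path = fileStatus.getPath().toString();
             String logFileName = path.substring(path.lastIndexOf("/") + 1);
             markerFiles.create(rollbackStat.getPartitionPath(), logFileName, IOType.APPEND);
           }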

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -130,39 +137,55 @@ public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteC
               1L
           );
           return new Tuple2<>(rollbackRequest.getPartitionPath(),
-                  HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
-                          .withRollbackBlockAppendResults(filesToNumBlocksRollback).build());
+              HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
+                  .withRollbackBlockAppendResults(filesToNumBlocksRollback).build());
         }
         default:
          throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
       }
-    }).reduceByKey(RollbackUtils::mergeRollbackStat).map(Tuple2::_2).collect();
+    });
   }
 
 
-
   /**
   * Common method used for cleaning out base files under a partition path during rollback of a set of commits.
    */
-  private Map<FileStatus, Boolean> deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
-                                                      String partitionPath, PathFilter filter) throws IOException {
+  private Map<FileStatus, Boolean> deleteBaseAndLogFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
+      String commit, String partitionPath, boolean doDelete) throws IOException {
     LOG.info("Cleaning path " + partitionPath);
+    String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+    SerializablePathFilter filter = (path) -> {
+      if (path.toString().endsWith(basefileExtension)) {
+        String fileCommitTime = FSUtils.getCommitTime(path.getName());
+        return commit.equals(fileCommitTime);
+      } else if (FSUtils.isLogFile(path)) {
+        // Since the baseCommitTime is the only commit for new log files, it's okay here
+        String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
+        return commit.equals(fileCommitTime);
+      }
+      return false;
+    };
+
     final Map<FileStatus, Boolean> results = new HashMap<>();
     FileSystem fs = metaClient.getFs();
     FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
     for (FileStatus file : toBeDeleted) {
-      boolean success = fs.delete(file.getPath(), false);
-      results.put(file, success);
-      LOG.info("Delete file " + file.getPath() + "\t" + success);
+      if (doDelete) {
+        boolean success = fs.delete(file.getPath(), false);
+        results.put(file, success);
+        LOG.info("Delete file " + file.getPath() + "\t" + success);
+      } else {
+        results.put(file, true);

Review comment:
       In case of just collecting stats, all files are added to the success list.

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -68,34 +69,38 @@ public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteC
    * Performs all rollback actions that we have collected in parallel.
    */
   public List<HoodieRollbackStat> performRollback(JavaSparkContext jsc, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
-    SerializablePathFilter filter = (path) -> {

Review comment:
       This filter was used only within one method, so I moved it into the respective method.

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
##########
@@ -100,8 +108,13 @@ public CopyOnWriteRollbackActionExecutor(JavaSparkContext jsc,
   }
 
   @Override
-  protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
+  protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback, boolean doDelete) {
     List<ListingBasedRollbackRequest> rollbackRequests = generateRollbackRequestsByListing();
-    return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(jsc, instantToRollback, rollbackRequests);
+    ListingBasedRollbackHelper listingBasedRollbackHelper = new ListingBasedRollbackHelper(table.getMetaClient(), config);
+    if (doDelete) {
+      return listingBasedRollbackHelper.performRollback(jsc, instantToRollback, rollbackRequests);

Review comment:
       Split ListingBasedRollbackHelper into two APIs, performRollback and collectRollbackStats, where
       the first calls into the second. For an actual rollback we call performRollback, and when
       collecting stats for the upgrade we call collectRollbackStats. I am repurposing
       HoodieRollbackStat to hold the paths of the files to be rolled back.
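
       Roughly this shape (the shared private method name is a placeholder, and the doDelete plumbing
       is simplified here):

           // performRollback: list files AND actually delete / write rollback command blocks.
           public List<HoodieRollbackStat> performRollback(JavaSparkContext jsc, HoodieInstant instantToRollback,
               List<ListingBasedRollbackRequest> rollbackRequests) {
             return collectRollbackStats(jsc, instantToRollback, rollbackRequests, true);
           }

           // collectRollbackStats: compute, per partition, what a rollback would touch.
           public List<HoodieRollbackStat> collectRollbackStats(JavaSparkContext jsc, HoodieInstant instantToRollback,
               List<ListingBasedRollbackRequest> rollbackRequests, boolean doDelete) {
             return maybeDeleteAndCollectStats(jsc, instantToRollback, rollbackRequests, doDelete) // placeholder name
                 .reduceByKey(RollbackUtils::mergeRollbackStat).map(Tuple2::_2).collect();
           }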

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
##########
@@ -130,39 +137,55 @@ public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteC
   /**
   * Common method used for cleaning out base files under a partition path during rollback of a set of commits.
    */
-  private Map<FileStatus, Boolean> deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
-                                                      String partitionPath, PathFilter filter) throws IOException {
+  private Map<FileStatus, Boolean> deleteBaseAndLogFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,

Review comment:
       Can you help me understand how to differentiate between CREATE and MERGE in these code blocks?

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
##########
@@ -80,10 +82,16 @@ public CopyOnWriteRollbackActionExecutor(JavaSparkContext jsc,
     if (!resolvedInstant.isRequested()) {
       // delete all the data files for this commit
       LOG.info("Clean out all base files generated for commit: " + 
resolvedInstant);
-      stats = getRollbackStrategy().execute(resolvedInstant);
+      if(doDelete) {
+        stats = getRollbackStrategy().execute(resolvedInstant);
+      } else{
+        stats = executeRollbackUsingFileListing(resolvedInstant, false);

Review comment:
       And if doDelete is false, we call into executeRollbackUsingFileListing directly.

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
##########
@@ -159,24 +161,32 @@ private void rollBackIndex() {
     LOG.info("Index rolled back for commits " + instantToRollback);
   }
 
-  public List<HoodieRollbackStat> doRollbackAndGetStats() {
-    final String instantTimeToRollback = instantToRollback.getTimestamp();
-    final boolean isPendingCompaction = Objects.equals(HoodieTimeline.COMPACTION_ACTION, instantToRollback.getAction())
-        && !instantToRollback.isCompleted();
-    validateSavepointRollbacks();
-    if (!isPendingCompaction) {
-      validateRollbackCommitSequence();
-    }
-
-    try {
-      List<HoodieRollbackStat> stats = executeRollback();
-      LOG.info("Rolled back inflight instant " + instantTimeToRollback);
+  public List<HoodieRollbackStat> mayBeRollbackAndGetStats(boolean doDelete) {
+    if (doDelete) {
+      final String instantTimeToRollback = instantToRollback.getTimestamp();
+      final boolean isPendingCompaction = Objects.equals(HoodieTimeline.COMPACTION_ACTION, instantToRollback.getAction())

Review comment:
       @vinothchandar : I forgot to remind you yesterday when we discussed moving the collectStats
       method to a separate class and calling it directly for the upgrade. These validation steps
       (validateSavepointRollbacks, validateRollbackCommitSequence) might be required there as well,
       right? So a few bits and pieces of this class are needed in the upgrade step too.
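
       Something like this is what I had in mind, if we do extract it (sketch only; pieces lifted from
       the method above):

           // Stats-only entry point that still runs the same validations,
           // callable from the upgrade step without deleting anything.
           public List<HoodieRollbackStat> collectRollbackStats() {
             validateSavepointRollbacks();
             final boolean isPendingCompaction = Objects.equals(HoodieTimeline.COMPACTION_ACTION, instantToRollback.getAction())
                 && !instantToRollback.isCompleted();
             if (!isPendingCompaction) {
               validateRollbackCommitSequence();
             }
             try {
               return executeRollback(false); // list only, no deletes
             } catch (IOException e) {
               throw new HoodieIOException("Unable to collect rollback stats ", e);
             }
           }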




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

