This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 65866c4  [HUDI-1276] [HUDI-1459] Make Clustering/ReplaceCommit and 
Metadata table be compatible (#2422)
65866c4 is described below

commit 65866c45ec04820b01ab701e7de5cf6a406d2a8e
Author: vinoth chandar <vinothchan...@users.noreply.github.com>
AuthorDate: Sat Jan 9 16:53:34 2021 -0800

    [HUDI-1276] [HUDI-1459] Make Clustering/ReplaceCommit and Metadata table be 
compatible (#2422)
    
    * [HUDI-1276] [HUDI-1459] Make Clustering/ReplaceCommit and Metadata table 
be compatible
    
    * Use filesystemview and json format from metadata. Add tests
    
    Co-authored-by: Satish Kotha <satishko...@uber.com>
---
 .../hudi/table/HoodieTimelineArchiveLog.java       |   4 +-
 .../action/clean/BaseCleanActionExecutor.java      |   4 +-
 .../hudi/table/action/clean/CleanPlanner.java      |  77 ++++++++++----
 .../hudi/table/action/rollback/RollbackUtils.java  |   1 +
 .../hudi/metadata/TestHoodieBackedMetadata.java    |  11 ++
 .../java/org/apache/hudi/table/TestCleaner.java    | 112 ++++++++++++++++++++-
 .../table/timeline/TimelineMetadataUtils.java      |   7 +-
 .../table/view/AbstractTableFileSystemView.java    |  20 ++++
 .../table/view/PriorityBasedFileSystemView.java    |  10 ++
 .../view/RemoteHoodieTableFileSystemView.java      |  30 ++++++
 .../common/table/view/TableFileSystemView.java     |  12 ++-
 .../apache/hudi/common/util/ClusteringUtils.java   |   2 +-
 .../hudi/metadata/HoodieTableMetadataUtil.java     |   7 ++
 .../table/view/TestHoodieTableFileSystemView.java  |   7 ++
 .../hudi/common/testutils/HoodieTestTable.java     |   5 +
 .../timeline/service/FileSystemViewHandler.java    |  15 +++
 .../service/handlers/FileSliceHandler.java         |  10 ++
 17 files changed, 301 insertions(+), 33 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
index 50967b1..3f4c271 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
@@ -290,10 +290,10 @@ public class HoodieTimelineArchiveLog<T extends 
HoodieAvroPayload, I, K, O> {
       LOG.info("Wrapper schema " + wrapperSchema.toString());
       List<IndexedRecord> records = new ArrayList<>();
       for (HoodieInstant hoodieInstant : instants) {
+        // TODO HUDI-1518 Cleaner now takes care of removing replaced file 
groups. This call to deleteReplacedFileGroups can be removed.
         boolean deleteSuccess = deleteReplacedFileGroups(context, 
hoodieInstant);
         if (!deleteSuccess) {
-          // throw error and stop archival if deleting replaced file groups 
failed.
-          throw new HoodieCommitException("Unable to delete file(s) for " + 
hoodieInstant.getFileName());
+          LOG.warn("Unable to delete file(s) for " + 
hoodieInstant.getFileName() + ", replaced files possibly deleted by cleaner");
         }
         try {
           deleteAnyLeftOverMarkerFiles(context, hoodieInstant);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
index 18e638e..786bf3e 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
@@ -21,9 +21,9 @@ package org.apache.hudi.table.action.clean;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.model.HoodieActionInstant;
+import org.apache.hudi.avro.model.HoodieCleanFileInfo;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
-import org.apache.hudi.avro.model.HoodieCleanFileInfo;
 import org.apache.hudi.common.HoodieCleanStat;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
@@ -72,7 +72,7 @@ public abstract class BaseCleanActionExecutor<T extends 
HoodieRecordPayload, I,
       List<String> partitionsToClean = 
planner.getPartitionPathsToClean(earliestInstant);
 
       if (partitionsToClean.isEmpty()) {
-        LOG.info("Nothing to clean here. It is already clean");
+        LOG.info("Nothing to clean here.");
         return 
HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
       }
       LOG.info("Total Partitions to clean : " + partitionsToClean.size() + ", 
with policy " + config.getCleanerPolicy());
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 31d433d..321f248 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -111,14 +112,14 @@ public class CleanPlanner<T extends HoodieRecordPayload, 
I, K, O> implements Ser
   /**
    * Returns list of partitions where clean operations needs to be performed.
    *
-   * @param newInstantToRetain New instant to be retained after this cleanup 
operation
+   * @param earliestRetainedInstant New instant to be retained after this 
cleanup operation
    * @return list of partitions to scan for cleaning
    * @throws IOException when underlying file-system throws this exception
    */
-  public List<String> getPartitionPathsToClean(Option<HoodieInstant> 
newInstantToRetain) throws IOException {
+  public List<String> getPartitionPathsToClean(Option<HoodieInstant> 
earliestRetainedInstant) throws IOException {
     switch (config.getCleanerPolicy()) {
       case KEEP_LATEST_COMMITS:
-        return getPartitionPathsForCleanByCommits(newInstantToRetain);
+        return getPartitionPathsForCleanByCommits(earliestRetainedInstant);
       case KEEP_LATEST_FILE_VERSIONS:
         return getPartitionPathsForFullCleaning();
       default:
@@ -168,10 +169,16 @@ public class CleanPlanner<T extends HoodieRecordPayload, 
I, K, O> implements Ser
             cleanMetadata.getEarliestCommitToRetain()) && 
HoodieTimeline.compareTimestamps(instant.getTimestamp(),
             HoodieTimeline.LESSER_THAN, 
newInstantToRetain.get().getTimestamp())).flatMap(instant -> {
               try {
-                HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-                    
.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
-                        HoodieCommitMetadata.class);
-                return 
commitMetadata.getPartitionToWriteStats().keySet().stream();
+                if 
(HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
+                  HoodieReplaceCommitMetadata replaceCommitMetadata = 
HoodieReplaceCommitMetadata.fromBytes(
+                      
hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), 
HoodieReplaceCommitMetadata.class);
+                  return 
Stream.concat(replaceCommitMetadata.getPartitionToReplaceFileIds().keySet().stream(),
 replaceCommitMetadata.getPartitionToWriteStats().keySet().stream());
+                } else {
+                  HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+                      
.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
+                          HoodieCommitMetadata.class);
+                  return 
commitMetadata.getPartitionToWriteStats().keySet().stream();
+                }
               } catch (IOException e) {
                 throw new HoodieIOException(e.getMessage(), e);
               }
@@ -196,13 +203,17 @@ public class CleanPlanner<T extends HoodieRecordPayload, 
I, K, O> implements Ser
   private List<CleanFileInfo> getFilesToCleanKeepingLatestVersions(String 
partitionPath) {
     LOG.info("Cleaning " + partitionPath + ", retaining latest " + 
config.getCleanerFileVersionsRetained()
         + " file versions. ");
-    List<HoodieFileGroup> fileGroups = 
fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
     List<CleanFileInfo> deletePaths = new ArrayList<>();
     // Collect all the datafiles savepointed by all the savepoints
     List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
         .flatMap(this::getSavepointedDataFiles)
         .collect(Collectors.toList());
 
+    // In this scenario, we will assume that, once replaced, a file group 
automatically becomes eligible for cleaning completely.
+    // In other words, the file versions only apply to the active file groups.
+    deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, 
partitionPath, Option.empty()));
+
+    List<HoodieFileGroup> fileGroups = 
fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
     for (HoodieFileGroup fileGroup : fileGroups) {
       int keepVersions = config.getCleanerFileVersionsRetained();
       // do not cleanup slice required for pending compaction
@@ -226,18 +237,7 @@ public class CleanPlanner<T extends HoodieRecordPayload, 
I, K, O> implements Ser
       // Delete the remaining files
       while (fileSliceIterator.hasNext()) {
         FileSlice nextSlice = fileSliceIterator.next();
-        if (nextSlice.getBaseFile().isPresent()) {
-          HoodieBaseFile dataFile = nextSlice.getBaseFile().get();
-          deletePaths.add(new CleanFileInfo(dataFile.getPath(), false));
-          if (dataFile.getBootstrapBaseFile().isPresent() && 
config.shouldCleanBootstrapBaseFile()) {
-            deletePaths.add(new 
CleanFileInfo(dataFile.getBootstrapBaseFile().get().getPath(), true));
-          }
-        }
-        if (hoodieTable.getMetaClient().getTableType() == 
HoodieTableType.MERGE_ON_READ) {
-          // If merge on read, then clean the log files for the commits as well
-          deletePaths.addAll(nextSlice.getLogFiles().map(lf -> new 
CleanFileInfo(lf.getPath().toString(), false))
-              .collect(Collectors.toList()));
-        }
+        deletePaths.addAll(getCleanFileInfoForSlice(nextSlice));
       }
     }
     return deletePaths;
@@ -269,7 +269,11 @@ public class CleanPlanner<T extends HoodieRecordPayload, 
I, K, O> implements Ser
 
     // determine if we have enough commits, to start cleaning.
     if (commitTimeline.countInstants() > commitsRetained) {
-      HoodieInstant earliestCommitToRetain = getEarliestCommitToRetain().get();
+      Option<HoodieInstant> earliestCommitToRetainOption = 
getEarliestCommitToRetain();
+      HoodieInstant earliestCommitToRetain = 
earliestCommitToRetainOption.get();
+      // all replaced file groups before earliestCommitToRetain are eligible 
to clean
+      deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, 
partitionPath, earliestCommitToRetainOption));
+      // add active files
       List<HoodieFileGroup> fileGroups = 
fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
       for (HoodieFileGroup fileGroup : fileGroups) {
         List<FileSlice> fileSliceList = 
fileGroup.getAllFileSlices().collect(Collectors.toList());
@@ -322,6 +326,20 @@ public class CleanPlanner<T extends HoodieRecordPayload, 
I, K, O> implements Ser
     }
     return deletePaths;
   }
+  
+  private List<CleanFileInfo> getReplacedFilesEligibleToClean(List<String> 
savepointedFiles, String partitionPath, Option<HoodieInstant> 
earliestCommitToRetain) {
+    final Stream<HoodieFileGroup> replacedGroups;
+    if (earliestCommitToRetain.isPresent()) {
+      replacedGroups = 
fileSystemView.getReplacedFileGroupsBefore(earliestCommitToRetain.get().getTimestamp(),
 partitionPath);
+    } else {
+      replacedGroups = fileSystemView.getAllReplacedFileGroups(partitionPath);
+    }
+    return replacedGroups.flatMap(HoodieFileGroup::getAllFileSlices)
+        // do not delete savepointed files  (archival will make sure 
corresponding replacecommit file is not deleted)
+        .filter(slice -> !slice.getBaseFile().isPresent() || 
!savepointedFiles.contains(slice.getBaseFile().get().getFileName()))
+        .flatMap(slice -> getCleanFileInfoForSlice(slice).stream())
+        .collect(Collectors.toList());
+  }
 
   /**
    * Gets the latest version < instantTime. This version file could still be 
used by queries.
@@ -339,6 +357,23 @@ public class CleanPlanner<T extends HoodieRecordPayload, 
I, K, O> implements Ser
     return null;
   }
 
+  private List<CleanFileInfo> getCleanFileInfoForSlice(FileSlice nextSlice) {
+    List<CleanFileInfo> cleanPaths = new ArrayList<>();
+    if (nextSlice.getBaseFile().isPresent()) {
+      HoodieBaseFile dataFile = nextSlice.getBaseFile().get();
+      cleanPaths.add(new CleanFileInfo(dataFile.getPath(), false));
+      if (dataFile.getBootstrapBaseFile().isPresent() && 
config.shouldCleanBootstrapBaseFile()) {
+        cleanPaths.add(new 
CleanFileInfo(dataFile.getBootstrapBaseFile().get().getPath(), true));
+      }
+    }
+    if (hoodieTable.getMetaClient().getTableType() == 
HoodieTableType.MERGE_ON_READ) {
+      // If merge on read, then clean the log files for the commits as well
+      cleanPaths.addAll(nextSlice.getLogFiles().map(lf -> new 
CleanFileInfo(lf.getPath().toString(), false))
+          .collect(Collectors.toList()));
+    }
+    return cleanPaths;
+  }
+
   /**
    * Returns files to be cleaned for the given partitionPath based on cleaning 
policy.
    */
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
index 18f284e..ee7f4dd 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
@@ -122,6 +122,7 @@ public class RollbackUtils {
       List<ListingBasedRollbackRequest> partitionRollbackRequests = new 
ArrayList<>();
       switch (instantToRollback.getAction()) {
         case HoodieTimeline.COMMIT_ACTION:
+        case HoodieTimeline.REPLACE_COMMIT_ACTION:
           LOG.info("Rolling back commit action.");
           partitionRollbackRequests.add(
               
ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath));
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
index 32cec71..5932236 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.metadata;
 
+import org.apache.hudi.client.HoodieWriteResult;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -498,6 +499,15 @@ public class TestHoodieBackedMetadata extends 
HoodieClientTestHarness {
       writeStatuses = client.upsert(jsc.parallelize(records, 1), 
newCommitTime).collect();
       assertNoWriteErrors(writeStatuses);
       assertFalse(metadata(client).isInSync());
+      
+      // insert overwrite to test replacecommit
+      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      client.startCommitWithTime(newCommitTime, 
HoodieTimeline.REPLACE_COMMIT_ACTION);
+      records = dataGen.generateInserts(newCommitTime, 5);
+      HoodieWriteResult replaceResult = 
client.insertOverwrite(jsc.parallelize(records, 1), newCommitTime);
+      writeStatuses = replaceResult.getWriteStatuses().collect();
+      assertNoWriteErrors(writeStatuses);
+      assertFalse(metadata(client).isInSync());
     }
 
     // Enable metadata table and ensure it is synced
@@ -800,6 +810,7 @@ public class TestHoodieBackedMetadata extends 
HoodieClientTestHarness {
 
         // FileSystemView should expose the same data
         List<HoodieFileGroup> fileGroups = 
tableView.getAllFileGroups(partition).collect(Collectors.toList());
+        
fileGroups.addAll(tableView.getAllReplacedFileGroups(partition).collect(Collectors.toList()));
 
         fileGroups.forEach(g -> 
LogManager.getLogger(TestHoodieBackedMetadata.class).info(g));
         fileGroups.forEach(g -> g.getAllBaseFiles().forEach(b -> 
LogManager.getLogger(TestHoodieBackedMetadata.class).info(b)));
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 69c6f98..3a5d737 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.model.HoodieActionInstant;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
@@ -38,9 +40,11 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -57,6 +61,7 @@ import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -65,9 +70,6 @@ import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.SparkHoodieIndex;
 import org.apache.hudi.table.action.clean.CleanPlanner;
 import org.apache.hudi.testutils.HoodieClientTestBase;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -76,6 +78,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
+import scala.Tuple3;
 
 import java.io.File;
 import java.io.IOException;
@@ -96,8 +99,6 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import scala.Tuple3;
-
 import static 
org.apache.hudi.common.testutils.HoodieTestTable.makeIncrementalCommitTimes;
 import static 
org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime;
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
@@ -687,6 +688,107 @@ public class TestCleaner extends HoodieClientTestBase {
     assertTrue(testTable.baseFileExists(p0, "002", file1P0));
     assertTrue(testTable.logFileExists(p0, "002", file1P0, 4));
   }
+  
+  @Test
+  public void testCleanWithReplaceCommits() throws Exception {
+    HoodieWriteConfig config = 
HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
+        .build();
+
+    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+    String p0 = "2020/01/01";
+    String p1 = "2020/01/02";
+
+    // make 1 commit, with 1 file per partition
+    String file1P0C0 = UUID.randomUUID().toString();
+    String file1P1C0 = UUID.randomUUID().toString();
+    testTable.addInflightCommit("00000000000001").withBaseFilesInPartition(p0, 
file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
+
+    HoodieCommitMetadata commitMetadata = generateCommitMetadata(
+        Collections.unmodifiableMap(new HashMap<String, List<String>>() {
+          {
+            put(p0, CollectionUtils.createImmutableList(file1P0C0));
+            put(p1, CollectionUtils.createImmutableList(file1P1C0));
+          }
+        })
+    );
+    metaClient.getActiveTimeline().saveAsComplete(
+        new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, 
"00000000000001"),
+        
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config);
+    assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions 
and clean any files");
+    assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+    assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+
+    // make next replacecommit, with 1 clustering operation. logically delete 
p0. No change to p1
+    Map<String, String> partitionAndFileId002 = 
testTable.forReplaceCommit("00000000000002").getFileIdsWithBaseFilesInPartitions(p0);
+    String file2P0C1 = partitionAndFileId002.get(p0);
+    testTable.addReplaceCommit("00000000000002", 
generateReplaceCommitMetadata(p0, file1P0C0, file2P0C1));
+
+    // run cleaner
+    List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config);
+    assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions 
and clean any files");
+    assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
+    assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+    assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+
+    // make next replacecommit, with 1 clustering operation. Replace data in 
p1. No change to p0
+    Map<String, String> partitionAndFileId003 = 
testTable.forReplaceCommit("00000000000003").getFileIdsWithBaseFilesInPartitions(p1);
+    String file3P1C2 = partitionAndFileId003.get(p1);
+    testTable.addReplaceCommit("00000000000003", 
generateReplaceCommitMetadata(p1, file1P1C0, file3P1C2));
+
+    // run cleaner
+    List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config);
+    assertEquals(0, hoodieCleanStatsThree.size(), "Must not scan any 
partitions and clean any files");
+    assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
+    assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+    assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2));
+    assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+
+    // make next replacecommit, with 1 clustering operation. Replace data in 
p0 again
+    Map<String, String> partitionAndFileId004 = 
testTable.forReplaceCommit("00000000000004").getFileIdsWithBaseFilesInPartitions(p0);
+    String file4P0C3 = partitionAndFileId004.get(p0);
+    testTable.addReplaceCommit("00000000000004", 
generateReplaceCommitMetadata(p0, file2P0C1, file4P0C3));
+
+    // run cleaner
+    List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config);
+    assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
+    assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
+    assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2));
+    assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+    // file1P1C0 still stays because it's not replaced until 3 and it's the only 
version available
+    assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+
+    // make next replacecommit, with 1 clustering operation. Replace all data 
in p1. no new files created
+    Map<String, String> partitionAndFileId005 = 
testTable.forReplaceCommit("00000000000005").getFileIdsWithBaseFilesInPartitions(p1);
+    String file4P1C4 = partitionAndFileId005.get(p1);
+    testTable.addReplaceCommit("00000000000005", 
generateReplaceCommitMetadata(p1, file3P1C2, file4P1C4));
+    
+    List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, 2);
+    assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
+    assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
+    assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2));
+    assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+    assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+  }
+  
+  private HoodieReplaceCommitMetadata generateReplaceCommitMetadata(String 
partition, String replacedFileId, String newFileId) {
+    HoodieReplaceCommitMetadata replaceMetadata = new 
HoodieReplaceCommitMetadata();
+    replaceMetadata.addReplaceFileId(partition, replacedFileId);
+    replaceMetadata.setOperationType(WriteOperationType.CLUSTER);
+    if (!StringUtils.isNullOrEmpty(newFileId)) {
+      HoodieWriteStat writeStat = new HoodieWriteStat();
+      writeStat.setPartitionPath(partition);
+      writeStat.setPath(newFileId);
+      writeStat.setFileId(newFileId);
+      replaceMetadata.addWriteStat(partition, writeStat);
+    }
+    return replaceMetadata;
+  }
 
   @Test
   public void testCleanMetadataUpgradeDowngrade() {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
index 32e60c3..962d69d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
@@ -33,6 +33,7 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieInstantInfo;
+import org.apache.hudi.avro.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -158,10 +159,14 @@ public class TimelineMetadataUtils {
     return deserializeAvroMetadata(bytes, HoodieSavepointMetadata.class);
   }
 
-  public static HoodieRequestedReplaceMetadata 
deserializeRequestedReplaceMetadta(byte[] bytes) throws IOException {
+  public static HoodieRequestedReplaceMetadata 
deserializeRequestedReplaceMetadata(byte[] bytes) throws IOException {
     return deserializeAvroMetadata(bytes, 
HoodieRequestedReplaceMetadata.class);
   }
 
+  public static HoodieReplaceCommitMetadata 
deserializeHoodieReplaceMetadata(byte[] bytes) throws IOException {
+    return deserializeAvroMetadata(bytes, HoodieReplaceCommitMetadata.class);
+  }
+
   public static <T extends SpecificRecordBase> T 
deserializeAvroMetadata(byte[] bytes, Class<T> clazz)
       throws IOException {
     DatumReader<T> reader = new SpecificDatumReader<>(clazz);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index 65e9231..3f45715 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -62,6 +62,7 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS;
 
@@ -691,6 +692,16 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
   }
 
   @Override
+  public Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String 
maxCommitTime, String partitionPath) {
+    return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> 
isFileGroupReplacedBefore(fg.getFileGroupId(), maxCommitTime));
+  }
+
+  @Override
+  public Stream<HoodieFileGroup> getAllReplacedFileGroups(String 
partitionPath) {
+    return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> 
isFileGroupReplaced(fg.getFileGroupId()));
+  }
+
+  @Override
   public final Stream<Pair<HoodieFileGroupId, HoodieInstant>> 
getFileGroupsInPendingClustering() {
     try {
       readLock.lock();
@@ -1041,6 +1052,15 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
     return isFileGroupReplacedBeforeOrOn(fileGroupId, 
instants.stream().max(Comparator.naturalOrder()).get());
   }
 
+  private boolean isFileGroupReplacedBefore(HoodieFileGroupId fileGroupId, 
String instant) {
+    Option<HoodieInstant> hoodieInstantOption = getReplaceInstant(fileGroupId);
+    if (!hoodieInstantOption.isPresent()) {
+      return false;
+    }
+
+    return HoodieTimeline.compareTimestamps(instant, GREATER_THAN, 
hoodieInstantOption.get().getTimestamp());
+  }
+  
   private boolean isFileGroupReplacedBeforeOrOn(HoodieFileGroupId fileGroupId, 
String instant) {
     Option<HoodieInstant> hoodieInstantOption = getReplaceInstant(fileGroupId);
     if (!hoodieInstantOption.isPresent()) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
index f7244ee..3783d00 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
@@ -200,6 +200,16 @@ public class PriorityBasedFileSystemView implements SyncableFileSystemView, Seri
   }
 
   @Override
+  public Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String maxCommitTime, String partitionPath) {
+    return execute(maxCommitTime, partitionPath, preferredView::getReplacedFileGroupsBefore, secondaryView::getReplacedFileGroupsBefore);
+  }
+
+  @Override
+  public Stream<HoodieFileGroup> getAllReplacedFileGroups(String partitionPath) {
+    return execute(partitionPath, preferredView::getAllReplacedFileGroups, secondaryView::getAllReplacedFileGroups);
+  }
+
+  @Override
   public Stream<Pair<String, CompactionOperation>> getPendingCompactionOperations() {
     return execute(preferredView::getPendingCompactionOperations, secondaryView::getPendingCompactionOperations);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
index 91a28a8..23b0536 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -91,6 +91,12 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
   public static final String ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON =
       String.format("%s/%s", BASE_URL, "filegroups/replaced/beforeoron/");
 
+  public static final String ALL_REPLACED_FILEGROUPS_BEFORE =
+      String.format("%s/%s", BASE_URL, "filegroups/replaced/before/");
+
+  public static final String ALL_REPLACED_FILEGROUPS_PARTITION =
+      String.format("%s/%s", BASE_URL, "filegroups/replaced/partition/");
+  
   public static final String PENDING_CLUSTERING_FILEGROUPS = String.format("%s/%s", BASE_URL, "clustering/pending/");
 
 
@@ -380,6 +386,30 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
     }
   }
 
+  @Override
+  public Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String maxCommitTime, String partitionPath) {
+    Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
+    try {
+      List<FileGroupDTO> fileGroups = executeRequest(ALL_REPLACED_FILEGROUPS_BEFORE, paramsMap,
+          new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET);
+      return fileGroups.stream().map(dto -> FileGroupDTO.toFileGroup(dto, metaClient));
+    } catch (IOException e) {
+      throw new HoodieRemoteException(e);
+    }
+  }
+
+  @Override
+  public Stream<HoodieFileGroup> getAllReplacedFileGroups(String partitionPath) {
+    Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
+    try {
+      List<FileGroupDTO> fileGroups = executeRequest(ALL_REPLACED_FILEGROUPS_PARTITION, paramsMap,
+          new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET);
+      return fileGroups.stream().map(dto -> FileGroupDTO.toFileGroup(dto, metaClient));
+    } catch (IOException e) {
+      throw new HoodieRemoteException(e);
+    }
+  }
+
   public boolean refresh() {
     Map<String, String> paramsMap = getParams();
     try {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
index 504f95a..7330286 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
@@ -167,11 +167,21 @@ public interface TableFileSystemView {
   HoodieTimeline getTimeline();
 
   /**
-   * Stream all the replaced file groups before maxCommitTime.
+   * Stream all the replaced file groups before or on maxCommitTime for given partition.
    */
   Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath);
 
   /**
+   * Stream all the replaced file groups before maxCommitTime for given partition.
+   */
+  Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String maxCommitTime, String partitionPath);
+
+  /**
+   * Stream all the replaced file groups for given partition.
+   */
+  Stream<HoodieFileGroup> getAllReplacedFileGroups(String partitionPath);
+
+  /**
    * Filegroups that are in pending clustering.
    */
   Stream<Pair<HoodieFileGroupId, HoodieInstant>> getFileGroupsInPendingClustering();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index fcc3274..70dfa2a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -86,7 +86,7 @@ public class ClusteringUtils {
         LOG.warn("No content found in requested file for instant " + pendingReplaceInstant);
         return Option.empty();
       }
-      HoodieRequestedReplaceMetadata requestedReplaceMetadata = TimelineMetadataUtils.deserializeRequestedReplaceMetadta(content.get());
+      HoodieRequestedReplaceMetadata requestedReplaceMetadata = TimelineMetadataUtils.deserializeRequestedReplaceMetadata(content.get());
       if (WriteOperationType.CLUSTER.name().equals(requestedReplaceMetadata.getOperationType())) {
         return Option.of(Pair.of(pendingReplaceInstant, requestedReplaceMetadata.getClusteringPlan()));
       }
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 115001a..ed2a878 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -25,6 +25,7 @@ import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -92,6 +93,12 @@ public class HoodieTableMetadataUtil {
       case HoodieTimeline.SAVEPOINT_ACTION:
         // Nothing to be done here
         break;
+      case HoodieTimeline.REPLACE_COMMIT_ACTION:
+        HoodieReplaceCommitMetadata replaceMetadata = HoodieReplaceCommitMetadata.fromBytes(
+            timeline.getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
+        // Note: we only add new files created here. Replaced files are removed from metadata later by cleaner.
+        records = Option.of(convertMetadataToRecords(replaceMetadata, instant.getTimestamp()));
+        break;
       default:
         throw new HoodieException("Unknown type of action " + instant.getAction());
     }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
index 3fceee3..e103427 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
@@ -1356,6 +1356,13 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
     List<HoodieFileGroup> allReplaced = fsView.getReplacedFileGroupsBeforeOrOn("2", partitionPath1).collect(Collectors.toList());
     assertEquals(1, allReplaced.size());
     assertEquals(fileId1, allReplaced.get(0).getFileGroupId().getFileId());
+
+    allReplaced = fsView.getReplacedFileGroupsBefore("2", partitionPath1).collect(Collectors.toList());
+    assertEquals(0, allReplaced.size());
+
+    allReplaced = fsView.getAllReplacedFileGroups(partitionPath1).collect(Collectors.toList());
+    assertEquals(1, allReplaced.size());
+    assertEquals(fileId1, allReplaced.get(0).getFileGroupId().getFileId());
   }
 
   @Test
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 3663917..858e113 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -228,6 +228,11 @@ public class HoodieTestTable {
     return this;
   }
 
+  public HoodieTestTable forReplaceCommit(String instantTime) {
+    currentInstantTime = instantTime;
+    return this;
+  }
+
   public HoodieTestTable forCompaction(String instantTime) {
     currentInstantTime = instantTime;
     return this;
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java
index e008fc5..b3e860a 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java
@@ -299,6 +299,21 @@ public class FileSystemViewHandler {
       writeValueAsString(ctx, dtos);
     }, true));
 
+    app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_BEFORE, new ViewHandler(ctx -> {
+      List<FileGroupDTO> dtos = sliceHandler.getReplacedFileGroupsBefore(
+          ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
+          ctx.queryParam(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM,""),
+          ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""));
+      writeValueAsString(ctx, dtos);
+    }, true));
+
+    app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_PARTITION, new ViewHandler(ctx -> {
+      List<FileGroupDTO> dtos = sliceHandler.getAllReplacedFileGroups(
+          ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
+          ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""));
+      writeValueAsString(ctx, dtos);
+    }, true));
+
     app.get(RemoteHoodieTableFileSystemView.PENDING_CLUSTERING_FILEGROUPS, new ViewHandler(ctx -> {
       List<ClusteringOpDTO> dtos = sliceHandler.getFileGroupsInPendingClustering(
           ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow());
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
index 18c5eb1..2180e4e 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
@@ -94,6 +94,16 @@ public class FileSliceHandler extends Handler {
         .collect(Collectors.toList());
   }
 
+  public List<FileGroupDTO> getReplacedFileGroupsBefore(String basePath, String maxCommitTime, String partitionPath) {
+    return viewManager.getFileSystemView(basePath).getReplacedFileGroupsBefore(maxCommitTime, partitionPath).map(FileGroupDTO::fromFileGroup)
+        .collect(Collectors.toList());
+  }
+  
+  public List<FileGroupDTO> getAllReplacedFileGroups(String basePath, String partitionPath) {
+    return viewManager.getFileSystemView(basePath).getAllReplacedFileGroups(partitionPath).map(FileGroupDTO::fromFileGroup)
+        .collect(Collectors.toList());
+  }
+
   public List<ClusteringOpDTO> getFileGroupsInPendingClustering(String basePath) {
     return viewManager.getFileSystemView(basePath).getFileGroupsInPendingClustering()
         .map(fgInstant -> ClusteringOpDTO.fromClusteringOp(fgInstant.getLeft(), fgInstant.getRight()))

Reply via email to