nsivabalan commented on a change in pull request #3873:
URL: https://github.com/apache/hudi/pull/3873#discussion_r740454888



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -645,4 +612,83 @@ protected void doClean(AbstractHoodieWriteClient writeClient, String instantTime
     // metadata table.
     writeClient.clean(instantTime + "002");
   }
+
+  /**
+   * Commit the {@code HoodieRecord}s to Metadata Table as a new delta-commit.
+   *
+   */
+  protected abstract void commit(List<HoodieRecord> records, String partitionName, String instantTime);
+
+  /**
+   * Commit the partition-to-file listing information to Metadata Table as a new delta-commit.
+   *
+   */
+  protected abstract void commit(List<DirectoryInfo> dirInfoList, String createInstantTime);
+
+
+  /**
+   * A class which represents a directory and the files and directories inside it.
+   *
+   * A {@code DirectoryInfo} object saves the name of the partition and the limited properties of each file
+   * required for bootstrapping the metadata table. Saving limited properties reduces the total memory footprint when
+   * a very large number of files are present in the dataset being bootstrapped.
+   */
+  public static class DirectoryInfo implements Serializable {
+    // Relative path of the directory (relative to the base directory)
+    private String relativePath;
+    // List of filenames within this partition
+    private List<String> filenames;
+    // Lengths of the corresponding files
+    private List<Long> filelengths;
+    // List of directories within this partition
+    private List<Path> subdirs = new ArrayList<>();
+    // Is this a HUDI partition
+    private boolean isPartition = false;
+
+    public DirectoryInfo(String relativePath, FileStatus[] fileStatus) {
+      this.relativePath = relativePath;
+
+      // Pre-allocate with the maximum length possible
+      filenames = new ArrayList<>(fileStatus.length);
+      filelengths = new ArrayList<>(fileStatus.length);
+
+      for (FileStatus status : fileStatus) {
+        if (status.isDirectory()) {
+          this.subdirs.add(status.getPath());
+        } else if (status.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
+          // Presence of partition meta file implies this is a HUDI partition
+          this.isPartition = true;
+        } else if (FSUtils.isDataFile(status.getPath())) {
+          // Regular HUDI data file (base file or log file)
+          filenames.add(status.getPath().getName());
+          filelengths.add(status.getLen());
+        }
+      }
+    }
+
+    public String getRelativePath() {
+      return relativePath;
+    }
+
+    public int getTotalFiles() {
+      return filenames.size();
+    }
+
+    public boolean isPartition() {
+      return isPartition;
+    }
+
+    public List<Path> getSubdirs() {
+      return subdirs;
+    }
+
+    // Returns a map of filenames mapped to their lengths
+    public Map<String, Long> getFileMap() {

Review comment:
       If the caller is always going to be interested in a map of file name to length, can we populate the map directly in the constructor of DirectoryInfo() instead of keeping two separate lists?
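    A minimal sketch of that suggestion, assuming the two lists collapse into a single filename-to-length map (the field name and the unmodifiable view are illustrative, not from the patch):
   ```
   public static class DirectoryInfo implements Serializable {
     private final String relativePath;
     // Filenames mapped directly to their lengths, built once at construction
     private final Map<String, Long> filenameToSizeMap = new HashMap<>();
     // List of directories within this partition
     private final List<Path> subdirs = new ArrayList<>();
     private boolean isPartition = false;

     public DirectoryInfo(String relativePath, FileStatus[] fileStatus) {
       this.relativePath = relativePath;
       for (FileStatus status : fileStatus) {
         if (status.isDirectory()) {
           subdirs.add(status.getPath());
         } else if (status.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
           // Presence of the partition meta file implies this is a HUDI partition
           isPartition = true;
         } else if (FSUtils.isDataFile(status.getPath())) {
           // Populate the map directly instead of keeping two parallel lists
           filenameToSizeMap.put(status.getPath().getName(), status.getLen());
         }
       }
     }

     public Map<String, Long> getFileMap() {
       return Collections.unmodifiableMap(filenameToSizeMap);
     }
   }
   ```
    getTotalFiles() would then simply return filenameToSizeMap.size(), and the pre-allocation of the two lists goes away.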

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -419,52 +394,53 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
    * @param dataMetaClient
   * @return Map of partition names to a list of FileStatus for all the files in the partition
    */
-  private Map<String, List<FileStatus>> getPartitionsToFilesMapping(HoodieTableMetaClient dataMetaClient) {
+  private List<DirectoryInfo> listAllPartitions(HoodieTableMetaClient datasetMetaClient) {
     List<Path> pathsToList = new LinkedList<>();
     pathsToList.add(new Path(dataWriteConfig.getBasePath()));
 
-    Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
+    List<DirectoryInfo> foundPartitionsList = new LinkedList<>();
     final int fileListingParallelism = metadataWriteConfig.getFileListingParallelism();
     SerializableConfiguration conf = new SerializableConfiguration(dataMetaClient.getHadoopConf());
     final String dirFilterRegex = dataWriteConfig.getMetadataConfig().getDirectoryFilterRegex();
+    final String datasetBasePath = dataMetaClient.getBasePath();
 
     while (!pathsToList.isEmpty()) {
-      int listingParallelism = Math.min(fileListingParallelism, pathsToList.size());
+      // In each round we will list a section of directories
+      int numDirsToList = Math.min(fileListingParallelism, pathsToList.size());
       // List all directories in parallel
-      List<Pair<Path, FileStatus[]>> dirToFileListing = engineContext.map(pathsToList, path -> {
+      List<DirectoryInfo> foundDirsList = engineContext.map(pathsToList.subList(0, numDirsToList), path -> {
         FileSystem fs = path.getFileSystem(conf.get());
-        return Pair.of(path, fs.listStatus(path));
-      }, listingParallelism);
-      pathsToList.clear();
+        String relativeDirPath = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), path);
+        return new DirectoryInfo(relativeDirPath, fs.listStatus(path));
+      }, numDirsToList);
+
+      pathsToList = new LinkedList<>(pathsToList.subList(numDirsToList, pathsToList.size()));
 
       // If the listing reveals a directory, add it to queue. If the listing reveals a hoodie partition, add it to
       // the results.
-      dirToFileListing.forEach(p -> {
-        if (!dirFilterRegex.isEmpty() && p.getLeft().getName().matches(dirFilterRegex)) {
-          LOG.info("Ignoring directory " + p.getLeft() + " which matches the filter regex " + dirFilterRegex);
-          return;
+      for (DirectoryInfo dirInfo : foundDirsList) {
+        if (!dirFilterRegex.isEmpty()) {
+          final String relativePath = dirInfo.getRelativePath();
+          if (!relativePath.isEmpty()) {
+            Path partitionPath = new Path(datasetBasePath, relativePath);
+            if (partitionPath.getName().matches(dirFilterRegex)) {
+              LOG.info("Ignoring directory " + partitionPath + " which matches the filter regex " + dirFilterRegex);
+              continue;
+            }
+          }
         }
 
-        List<FileStatus> filesInDir = Arrays.stream(p.getRight()).parallel()
-            .filter(fs -> !fs.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))
-            .collect(Collectors.toList());
-
-        if (p.getRight().length > filesInDir.size()) {
-          String partitionName = FSUtils.getRelativePartitionPath(new Path(dataMetaClient.getBasePath()), p.getLeft());
-          // deal with Non-partition table, we should exclude .hoodie
-          partitionToFileStatus.put(partitionName, filesInDir.stream()
-              .filter(f -> !f.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)).collect(Collectors.toList()));
+        if (dirInfo.isPartition()) {
+          // Add to result
+          foundPartitionsList.add(dirInfo);
         } else {
           // Add sub-dirs to the queue
-          pathsToList.addAll(Arrays.stream(p.getRight())
-              .filter(fs -> fs.isDirectory() && !fs.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
-              .map(fs -> fs.getPath())
-              .collect(Collectors.toList()));
+          pathsToList.addAll(dirInfo.getSubdirs());

Review comment:
       Also, I see this in master before this patch 
   ```
   if (p.getRight().length > filesInDir.size()) {
             String partitionName = FSUtils.getRelativePartitionPath(new Path(dataMetaClient.getBasePath()), p.getLeft());
             // deal with Non-partition table, we should exclude .hoodie
             partitionToFileStatus.put(partitionName, filesInDir.stream()
                 .filter(f -> !f.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)).collect(Collectors.toList()));
           }
   ```
    May I know how we are handling the same in this patch?
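    For reference, one hypothetical way the new DirectoryInfo constructor could preserve that exclusion (whether the patch actually does this is the open question here):
   ```
   // Hypothetical guard in the constructor loop, mirroring master's filter on
   // HoodieTableMetaClient.METAFOLDER_NAME so .hoodie is neither listed nor queued.
   for (FileStatus status : fileStatus) {
     if (status.isDirectory()
         && !status.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
       subdirs.add(status.getPath());
     }
     // ... existing metafile / data-file branches unchanged ...
   }
   ```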

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -419,52 +394,53 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
    * @param dataMetaClient
   * @return Map of partition names to a list of FileStatus for all the files in the partition
    */
-  private Map<String, List<FileStatus>> getPartitionsToFilesMapping(HoodieTableMetaClient dataMetaClient) {
+  private List<DirectoryInfo> listAllPartitions(HoodieTableMetaClient datasetMetaClient) {
     List<Path> pathsToList = new LinkedList<>();
     pathsToList.add(new Path(dataWriteConfig.getBasePath()));
 
-    Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
+    List<DirectoryInfo> foundPartitionsList = new LinkedList<>();
     final int fileListingParallelism = metadataWriteConfig.getFileListingParallelism();
     SerializableConfiguration conf = new SerializableConfiguration(dataMetaClient.getHadoopConf());
     final String dirFilterRegex = dataWriteConfig.getMetadataConfig().getDirectoryFilterRegex();
+    final String datasetBasePath = dataMetaClient.getBasePath();
 
     while (!pathsToList.isEmpty()) {
-      int listingParallelism = Math.min(fileListingParallelism, pathsToList.size());
+      // In each round we will list a section of directories
+      int numDirsToList = Math.min(fileListingParallelism, pathsToList.size());
       // List all directories in parallel
-      List<Pair<Path, FileStatus[]>> dirToFileListing = engineContext.map(pathsToList, path -> {
+      List<DirectoryInfo> foundDirsList = engineContext.map(pathsToList.subList(0, numDirsToList), path -> {

Review comment:
       Minor: can we name this `processedDirectories`?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -419,52 +394,53 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
    * @param dataMetaClient
   * @return Map of partition names to a list of FileStatus for all the files in the partition
    */
-  private Map<String, List<FileStatus>> getPartitionsToFilesMapping(HoodieTableMetaClient dataMetaClient) {
+  private List<DirectoryInfo> listAllPartitions(HoodieTableMetaClient datasetMetaClient) {
     List<Path> pathsToList = new LinkedList<>();
     pathsToList.add(new Path(dataWriteConfig.getBasePath()));
 
-    Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
+    List<DirectoryInfo> foundPartitionsList = new LinkedList<>();
     final int fileListingParallelism = metadataWriteConfig.getFileListingParallelism();
     SerializableConfiguration conf = new SerializableConfiguration(dataMetaClient.getHadoopConf());
     final String dirFilterRegex = dataWriteConfig.getMetadataConfig().getDirectoryFilterRegex();
+    final String datasetBasePath = dataMetaClient.getBasePath();
 
     while (!pathsToList.isEmpty()) {
-      int listingParallelism = Math.min(fileListingParallelism, pathsToList.size());
+      // In each round we will list a section of directories
+      int numDirsToList = Math.min(fileListingParallelism, pathsToList.size());
       // List all directories in parallel
-      List<Pair<Path, FileStatus[]>> dirToFileListing = engineContext.map(pathsToList, path -> {
+      List<DirectoryInfo> foundDirsList = engineContext.map(pathsToList.subList(0, numDirsToList), path -> {
         FileSystem fs = path.getFileSystem(conf.get());
-        return Pair.of(path, fs.listStatus(path));
-      }, listingParallelism);
-      pathsToList.clear();
+        String relativeDirPath = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), path);
+        return new DirectoryInfo(relativeDirPath, fs.listStatus(path));
+      }, numDirsToList);
+
+      pathsToList = new LinkedList<>(pathsToList.subList(numDirsToList, pathsToList.size()));
 
       // If the listing reveals a directory, add it to queue. If the listing reveals a hoodie partition, add it to
       // the results.
-      dirToFileListing.forEach(p -> {
-        if (!dirFilterRegex.isEmpty() && p.getLeft().getName().matches(dirFilterRegex)) {
-          LOG.info("Ignoring directory " + p.getLeft() + " which matches the filter regex " + dirFilterRegex);
-          return;
+      for (DirectoryInfo dirInfo : foundDirsList) {
+        if (!dirFilterRegex.isEmpty()) {
+          final String relativePath = dirInfo.getRelativePath();
+          if (!relativePath.isEmpty()) {
+            Path partitionPath = new Path(datasetBasePath, relativePath);
+            if (partitionPath.getName().matches(dirFilterRegex)) {
+              LOG.info("Ignoring directory " + partitionPath + " which matches the filter regex " + dirFilterRegex);
+              continue;
+            }
+          }
         }
 
-        List<FileStatus> filesInDir = Arrays.stream(p.getRight()).parallel()
-            .filter(fs -> !fs.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))
-            .collect(Collectors.toList());
-
-        if (p.getRight().length > filesInDir.size()) {
-          String partitionName = FSUtils.getRelativePartitionPath(new Path(dataMetaClient.getBasePath()), p.getLeft());
-          // deal with Non-partition table, we should exclude .hoodie
-          partitionToFileStatus.put(partitionName, filesInDir.stream()
-              .filter(f -> !f.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)).collect(Collectors.toList()));
+        if (dirInfo.isPartition()) {
+          // Add to result
+          foundPartitionsList.add(dirInfo);
         } else {
           // Add sub-dirs to the queue
-          pathsToList.addAll(Arrays.stream(p.getRight())
-              .filter(fs -> fs.isDirectory() && !fs.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
-              .map(fs -> fs.getPath())
-              .collect(Collectors.toList()));
+          pathsToList.addAll(dirInfo.getSubdirs());

Review comment:
       May I know where we are ignoring the meta paths? E.g., L460 before this patch.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -419,52 +394,53 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
    * @param dataMetaClient
   * @return Map of partition names to a list of FileStatus for all the files in the partition
    */
-  private Map<String, List<FileStatus>> getPartitionsToFilesMapping(HoodieTableMetaClient dataMetaClient) {
+  private List<DirectoryInfo> listAllPartitions(HoodieTableMetaClient datasetMetaClient) {
     List<Path> pathsToList = new LinkedList<>();
     pathsToList.add(new Path(dataWriteConfig.getBasePath()));
 
-    Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
+    List<DirectoryInfo> foundPartitionsList = new LinkedList<>();

Review comment:
       Maybe we can name it `partitionsToBootstrap`.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
##########
@@ -36,4 +49,9 @@
   public String getFileExtension() {
     return extension;
   }
+
+  public static boolean isBaseFile(Path path) {

Review comment:
       Do you think we should move this to FSUtils?
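    If it does move, a sketch of the FSUtils version, assuming the base-file formats are parquet/hfile/orc (the exact extension set is an assumption here):
   ```
   // Hypothetical FSUtils placement; checks known base-file extensions explicitly
   // rather than assuming a helper set exists on HoodieFileFormat.
   public static boolean isBaseFile(Path path) {
     String extension = FSUtils.getFileExtension(path.getName());
     return HoodieFileFormat.PARQUET.getFileExtension().equals(extension)
         || HoodieFileFormat.HFILE.getFileExtension().equals(extension)
         || HoodieFileFormat.ORC.getFileExtension().equals(extension);
   }
   ```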

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -145,15 +152,39 @@ protected void commit(List<HoodieRecord> records, String partitionName, String i
    *
   * The record is tagged with respective file slice's location based on its record key.
    */
-  private JavaRDD<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName, int numFileGroups) {
+  private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD, String partitionName, int numFileGroups) {
     List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName);
     ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));
 
-    JavaSparkContext jsc = ((HoodieSparkEngineContext) engineContext).getJavaSparkContext();
-    return jsc.parallelize(records, 1).map(r -> {
+    return recordsRDD.map(r -> {
       FileSlice slice = fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), numFileGroups));
       r.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
       return r;
     });
   }
+
+  @Override
+  protected void commit(List<DirectoryInfo> partitionInfoList, String createInstantTime) {

Review comment:
       Recently we added some abstractions to Hudi (HoodieData, HoodieJavaRDD, HoodieList). Can we re-use them to avoid duplication across Flink and Spark?
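    As a sketch, the abstract method could then be declared once against the engine-neutral type (signature is illustrative):
   ```
   // Hypothetical single declaration in HoodieBackedTableMetadataWriter,
   // replacing the per-engine List/JavaRDD commit variants.
   protected abstract void commit(HoodieData<HoodieRecord> records, String partitionName, String instantTime);
   ```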

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -145,15 +152,39 @@ protected void commit(List<HoodieRecord> records, String partitionName, String i
    *
   * The record is tagged with respective file slice's location based on its record key.
    */
-  private JavaRDD<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName, int numFileGroups) {
+  private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD, String partitionName, int numFileGroups) {
     List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName);
     ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));
 
-    JavaSparkContext jsc = ((HoodieSparkEngineContext) engineContext).getJavaSparkContext();
-    return jsc.parallelize(records, 1).map(r -> {
+    return recordsRDD.map(r -> {
       FileSlice slice = fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), numFileGroups));
       r.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
       return r;
     });
   }
+
+  @Override
+  protected void commit(List<DirectoryInfo> partitionInfoList, String createInstantTime) {

Review comment:
       We could have a single commit() method that deals with HoodieData and abstracts away the details.
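    A rough sketch of that shape, assuming HoodieData exposes a serializable map(...) transform (HoodieJavaRDD on Spark, HoodieList on the Java/Flink side); names are illustrative:
   ```
   // Hypothetical engine-neutral tagging + commit path; an engine-specific
   // subclass would only unwrap `tagged` to hand it to its write client.
   protected void commit(HoodieData<HoodieRecord> records, String partitionName, String instantTime, int numFileGroups) {
     List<FileSlice> fileSlices =
         HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName);
     HoodieData<HoodieRecord> tagged = records.map(r -> {
       FileSlice slice = fileSlices.get(
           HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), numFileGroups));
       r.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
       return r;
     });
     // e.g. on Spark: writeClient.upsertPreppedRecords(HoodieJavaRDD.getJavaRDD(tagged), instantTime)
   }
   ```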




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
