bvaradar commented on code in PR #7143:
URL: https://github.com/apache/hudi/pull/7143#discussion_r1155230011


##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -229,17 +238,93 @@ protected List<FileSlice> 
getInputFileSlices(PartitionPath partition) {
   }
 
   private Map<PartitionPath, List<FileSlice>> 
loadFileSlicesForPartitions(List<PartitionPath> partitions) {
-    FileStatus[] allFiles = listPartitionPathFiles(partitions);
+    Pair<Map<Path, FileStatus[]>, Map<String, FileStatus[]>> 
partitionFilesPair = listPartitionPathFiles(partitions);
     HoodieTimeline activeTimeline = getActiveTimeline();
     Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
 
-    HoodieTableFileSystemView fileSystemView =
-        new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles);
-
     Option<String> queryInstant = specifiedQueryInstant.or(() -> 
latestInstant.map(HoodieInstant::getTimestamp));
 
     validate(activeTimeline, queryInstant);
 
+    int parallelism = 
Integer.parseInt(String.valueOf(configProperties.getOrDefault(HoodieCommonConfig.TABLE_LOADING_PARALLELISM.key(),
+        HoodieCommonConfig.TABLE_LOADING_PARALLELISM.defaultValue())));

Review Comment:
   This needs to be disabled by default. We should enable parallelism only 
after this code has had sufficient runway in use.



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -229,17 +238,93 @@ protected List<FileSlice> 
getInputFileSlices(PartitionPath partition) {
   }
 
   private Map<PartitionPath, List<FileSlice>> 
loadFileSlicesForPartitions(List<PartitionPath> partitions) {
-    FileStatus[] allFiles = listPartitionPathFiles(partitions);
+    Pair<Map<Path, FileStatus[]>, Map<String, FileStatus[]>> 
partitionFilesPair = listPartitionPathFiles(partitions);
     HoodieTimeline activeTimeline = getActiveTimeline();
     Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
 
-    HoodieTableFileSystemView fileSystemView =
-        new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles);
-
     Option<String> queryInstant = specifiedQueryInstant.or(() -> 
latestInstant.map(HoodieInstant::getTimestamp));
 
     validate(activeTimeline, queryInstant);
 
+    int parallelism = 
Integer.parseInt(String.valueOf(configProperties.getOrDefault(HoodieCommonConfig.TABLE_LOADING_PARALLELISM.key(),
+        HoodieCommonConfig.TABLE_LOADING_PARALLELISM.defaultValue())));
+
+    Map<PartitionPath, List<FileSlice>> cachedAllInputFileSlices;
+    long buildCacheFileSlicesLocalStart = System.currentTimeMillis();
+    if (parallelism > 0 && partitions.size() > 0) {
+
+      // convert Map<Path, FileStatus[]> to Map<String, FileStatus[]>
+      Map<String, FileStatus[]> left = 
partitionFilesPair.getLeft().entrySet().stream().map(entry -> {
+        String partitionPath = entry.getKey().toString();
+        FileStatus[] statuses = entry.getValue();
+        return Pair.of(partitionPath, statuses);
+      }).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+      Map<String, FileStatus[]> partitionFiles = combine(left, 
partitionFilesPair.getRight());
+
+      cachedAllInputFileSlices = 
buildCacheFileSlicesLocalParallel(parallelism, partitions, partitionFiles, 
activeTimeline, queryInstant);
+    } else {
+      FileStatus[] allFiles = 
combine(flatMap(partitionFilesPair.getLeft().values()), 
flatMap(partitionFilesPair.getRight().values()));
+      HoodieTableFileSystemView fileSystemView =
+          new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles);
+
+      cachedAllInputFileSlices =  getCandidateFileSlices(partitions, 
queryInstant, fileSystemView);
+    }
+
+    long buildCacheFileSlicesLocalEnd = System.currentTimeMillis();
+    LOG.info(String.format("Build cache file slices, spent: %d ms", 
buildCacheFileSlicesLocalEnd - buildCacheFileSlicesLocalStart));
+
+    return cachedAllInputFileSlices;
+  }
+
+  private Map<PartitionPath, List<FileSlice>> 
buildCacheFileSlicesLocalParallel(int parallelism, List<PartitionPath> 
partitions, Map<String, FileStatus[]> partitionFiles,
+                                                                               
 HoodieTimeline activeTimeline, Option<String> queryInstant) {
+    HashMap<PartitionPath, List<FileSlice>> res = new HashMap<>();
+    parallelism = Math.max(1, Math.min(parallelism, partitionFiles.size()));
+    int totalPartitions = partitionFiles.size();
+    int cursor = 0;
+    int step = totalPartitions / parallelism;
+
+    ExecutorService pool =  Executors.newFixedThreadPool((parallelism + 1));
+    ArrayList<CompletableFuture<Map<PartitionPath, List<FileSlice>>>> 
futureList = new ArrayList<>(parallelism + 1);
+
+    while (cursor + step <= totalPartitions) {

Review Comment:
   Can we use simple Java Streams (parallelStream) to parallelize here, 
instead of subdividing and then using parallel streams?



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -229,17 +238,93 @@ protected List<FileSlice> 
getInputFileSlices(PartitionPath partition) {
   }
 
   private Map<PartitionPath, List<FileSlice>> 
loadFileSlicesForPartitions(List<PartitionPath> partitions) {
-    FileStatus[] allFiles = listPartitionPathFiles(partitions);
+    Pair<Map<Path, FileStatus[]>, Map<String, FileStatus[]>> 
partitionFilesPair = listPartitionPathFiles(partitions);
     HoodieTimeline activeTimeline = getActiveTimeline();
     Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
 
-    HoodieTableFileSystemView fileSystemView =
-        new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles);
-
     Option<String> queryInstant = specifiedQueryInstant.or(() -> 
latestInstant.map(HoodieInstant::getTimestamp));
 
     validate(activeTimeline, queryInstant);
 
+    int parallelism = 
Integer.parseInt(String.valueOf(configProperties.getOrDefault(HoodieCommonConfig.TABLE_LOADING_PARALLELISM.key(),
+        HoodieCommonConfig.TABLE_LOADING_PARALLELISM.defaultValue())));
+
+    Map<PartitionPath, List<FileSlice>> cachedAllInputFileSlices;

Review Comment:
   Rename to allCachedInputFileSlices



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -229,17 +238,93 @@ protected List<FileSlice> 
getInputFileSlices(PartitionPath partition) {
   }
 
   private Map<PartitionPath, List<FileSlice>> 
loadFileSlicesForPartitions(List<PartitionPath> partitions) {
-    FileStatus[] allFiles = listPartitionPathFiles(partitions);
+    Pair<Map<Path, FileStatus[]>, Map<String, FileStatus[]>> 
partitionFilesPair = listPartitionPathFiles(partitions);
     HoodieTimeline activeTimeline = getActiveTimeline();
     Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
 
-    HoodieTableFileSystemView fileSystemView =
-        new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles);
-
     Option<String> queryInstant = specifiedQueryInstant.or(() -> 
latestInstant.map(HoodieInstant::getTimestamp));
 
     validate(activeTimeline, queryInstant);
 
+    int parallelism = 
Integer.parseInt(String.valueOf(configProperties.getOrDefault(HoodieCommonConfig.TABLE_LOADING_PARALLELISM.key(),
+        HoodieCommonConfig.TABLE_LOADING_PARALLELISM.defaultValue())));
+
+    Map<PartitionPath, List<FileSlice>> cachedAllInputFileSlices;
+    long buildCacheFileSlicesLocalStart = System.currentTimeMillis();
+    if (parallelism > 0 && partitions.size() > 0) {
+
+      // convert Map<Path, FileStatus[]> to Map<String, FileStatus[]>
+      Map<String, FileStatus[]> left = 
partitionFilesPair.getLeft().entrySet().stream().map(entry -> {

Review Comment:
   Wouldn't it be simpler to have the return type of listPartitionPathFiles use 
String instead of Path for partitionPath? We could then avoid these transformations.



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -229,17 +238,93 @@ protected List<FileSlice> 
getInputFileSlices(PartitionPath partition) {
   }
 
   private Map<PartitionPath, List<FileSlice>> 
loadFileSlicesForPartitions(List<PartitionPath> partitions) {
-    FileStatus[] allFiles = listPartitionPathFiles(partitions);
+    Pair<Map<Path, FileStatus[]>, Map<String, FileStatus[]>> 
partitionFilesPair = listPartitionPathFiles(partitions);

Review Comment:
   From the code change, it does not look like you need to differentiate cached 
and fetched file statuses. Can you keep them as one return value?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to