This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 96b2359fda4d5b55e51e8f347228fe2f7cf6b24f
Author: ffcchi <fengfei...@gmail.com>
AuthorDate: Mon Mar 30 01:14:38 2020 -0600

    [HUDI-724] Parallelize getSmallFiles for partitions (#1421)
    
    Co-authored-by: Feichi Feng <feicf...@amazon.com>
---
 .../org/apache/hudi/client/HoodieWriteClient.java  |  4 +--
 .../apache/hudi/table/HoodieCopyOnWriteTable.java  | 37 ++++++++++++++++------
 .../apache/hudi/table/HoodieMergeOnReadTable.java  | 12 +++----
 .../java/org/apache/hudi/table/HoodieTable.java    |  4 +--
 .../apache/hudi/table/TestCopyOnWriteTable.java    |  2 +-
 5 files changed, 36 insertions(+), 23 deletions(-)
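
    The heart of the patch is in HoodieCopyOnWriteTable.UpsertPartitioner: instead of
    calling getSmallFiles(partitionPath) one partition at a time inside assignInserts,
    the partitioner now receives a JavaSparkContext, runs the small-file lookups as a
    Spark job with one task per partition path, and collects the results into a map
    before assigning insert buckets. Below is a minimal, self-contained sketch of that
    pattern, assuming a local Spark context; the ParallelSmallFileLookup class and the
    lookupSmallFiles helper are illustrative stand-ins for the real Hudi code, not part
    of this commit.

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.PairFunction;

    import scala.Tuple2;

    public class ParallelSmallFileLookup {

      // Hypothetical stand-in for UpsertPartitioner#getSmallFiles(String partitionPath),
      // which scans one partition for data files below the configured small-file limit.
      static List<String> lookupSmallFiles(String partitionPath) {
        return Collections.emptyList();
      }

      // One Spark task per partition path; results are collected back to the driver as a
      // map, mirroring the getSmallFilesForPartitions method added by this commit.
      static Map<String, List<String>> smallFilesByPartition(JavaSparkContext jsc, List<String> partitionPaths) {
        if (partitionPaths == null || partitionPaths.isEmpty()) {
          return Collections.emptyMap();
        }
        return jsc.parallelize(partitionPaths, partitionPaths.size())
            .mapToPair((PairFunction<String, String, List<String>>)
                path -> new Tuple2<>(path, lookupSmallFiles(path)))
            .collectAsMap();
      }

      public static void main(String[] args) {
        try (JavaSparkContext jsc = new JavaSparkContext("local[*]", "small-file-lookup-sketch")) {
          Map<String, List<String>> result =
              smallFilesByPartition(jsc, Arrays.asList("2020/03/28", "2020/03/29", "2020/03/30"));
          result.forEach((partition, files) -> System.out.println(partition + " => " + files));
        }
      }
    }

    Sizing the RDD with parallelize(partitionPaths, partitionPaths.size()) pins one
    partition path per Spark task, so the per-partition file listings run concurrently
    across executors rather than serially on the driver.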

diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index e201487..90bc9b3 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -480,9 +480,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
 
   private Partitioner getPartitioner(HoodieTable table, boolean isUpsert, WorkloadProfile profile) {
     if (isUpsert) {
-      return table.getUpsertPartitioner(profile);
+      return table.getUpsertPartitioner(profile, jsc);
     } else {
-      return table.getInsertPartitioner(profile);
+      return table.getInsertPartitioner(profile, jsc);
     }
   }
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
index 82b08b7..4c91c77 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
@@ -81,6 +81,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.spark.api.java.function.PairFunction;
 import scala.Tuple2;
 
 /**
@@ -142,16 +143,16 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
   }
 
   @Override
-  public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
+  public Partitioner getUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
     if (profile == null) {
       throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
     }
-    return new UpsertPartitioner(profile);
+    return new UpsertPartitioner(profile, jsc);
   }
 
   @Override
-  public Partitioner getInsertPartitioner(WorkloadProfile profile) {
-    return getUpsertPartitioner(profile);
+  public Partitioner getInsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
+    return getUpsertPartitioner(profile, jsc);
   }
 
   @Override
@@ -569,14 +570,14 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
      */
     protected HoodieRollingStatMetadata rollingStatMetadata;
 
-    UpsertPartitioner(WorkloadProfile profile) {
+    UpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
       updateLocationToBucket = new HashMap<>();
       partitionPathToInsertBuckets = new HashMap<>();
       bucketInfoMap = new HashMap<>();
       globalStat = profile.getGlobalStat();
       rollingStatMetadata = getRollingStats();
       assignUpdates(profile);
-      assignInserts(profile);
+      assignInserts(profile, jsc);
 
       LOG.info("Total Buckets :" + totalBuckets + ", buckets info => " + 
bucketInfoMap + ", \n"
           + "Partition to insert buckets => " + partitionPathToInsertBuckets + 
", \n"
@@ -602,18 +603,24 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
       return bucket;
     }
 
-    private void assignInserts(WorkloadProfile profile) {
+    private void assignInserts(WorkloadProfile profile, JavaSparkContext jsc) {
       // for new inserts, compute buckets depending on how many records we have for each partition
       Set<String> partitionPaths = profile.getPartitionPaths();
       long averageRecordSize =
           averageBytesPerRecord(metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
               config.getCopyOnWriteRecordSizeEstimate());
       LOG.info("AvgRecordSize => " + averageRecordSize);
+
+      Map<String, List<SmallFile>> partitionSmallFilesMap =
+              getSmallFilesForPartitions(new ArrayList<String>(partitionPaths), jsc);
+
       for (String partitionPath : partitionPaths) {
         WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
         if (pStat.getNumInserts() > 0) {
 
-          List<SmallFile> smallFiles = getSmallFiles(partitionPath);
+          List<SmallFile> smallFiles = partitionSmallFilesMap.get(partitionPath);
+          this.smallFiles.addAll(smallFiles);
+
           LOG.info("For partitionPath : " + partitionPath + " Small Files => " 
+ smallFiles);
 
           long totalUnassignedInserts = pStat.getNumInserts();
@@ -675,6 +682,18 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
       }
     }
 
+    private Map<String, List<SmallFile>> getSmallFilesForPartitions(List<String> partitionPaths, JavaSparkContext jsc) {
+
+      Map<String, List<SmallFile>> partitionSmallFilesMap = new HashMap<>();
+      if (partitionPaths != null && partitionPaths.size() > 0) {
+        JavaRDD<String> partitionPathRdds = jsc.parallelize(partitionPaths, partitionPaths.size());
+        partitionSmallFilesMap = partitionPathRdds.mapToPair((PairFunction<String, String, List<SmallFile>>)
+            partitionPath -> new Tuple2<>(partitionPath, getSmallFiles(partitionPath))).collectAsMap();
+      }
+
+      return partitionSmallFilesMap;
+    }
+
     /**
      * Returns a list of small files in the given partition path.
      */
@@ -697,8 +716,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
             sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
             sf.sizeBytes = file.getFileSize();
             smallFileLocations.add(sf);
-            // Update the global small files list
-            smallFiles.add(sf);
           }
         }
       }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
index 50d41b3..938a5fd 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
@@ -89,11 +89,11 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
   }
 
   @Override
-  public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
+  public Partitioner getUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
     if (profile == null) {
       throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
     }
-    mergeOnReadUpsertPartitioner = new MergeOnReadUpsertPartitioner(profile);
+    mergeOnReadUpsertPartitioner = new MergeOnReadUpsertPartitioner(profile, jsc);
     return mergeOnReadUpsertPartitioner;
   }
 
@@ -323,8 +323,8 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
    */
   class MergeOnReadUpsertPartitioner extends HoodieCopyOnWriteTable.UpsertPartitioner {
 
-    MergeOnReadUpsertPartitioner(WorkloadProfile profile) {
-      super(profile);
+    MergeOnReadUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
+      super(profile, jsc);
     }
 
     @Override
@@ -374,16 +374,12 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
             sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
             sf.sizeBytes = getTotalFileSize(smallFileSlice);
             smallFileLocations.add(sf);
-            // Update the global small files list
-            smallFiles.add(sf);
           } else {
             HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
             sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
                 FSUtils.getFileIdFromLogPath(logFile.getPath()));
             sf.sizeBytes = getTotalFileSize(smallFileSlice);
             smallFileLocations.add(sf);
-            // Update the global small files list
-            smallFiles.add(sf);
           }
         }
       }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index 2e73ef0..ad0196c 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -115,12 +115,12 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
   /**
    * Provides a partitioner to perform the upsert operation, based on the workload profile.
    */
-  public abstract Partitioner getUpsertPartitioner(WorkloadProfile profile);
+  public abstract Partitioner getUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc);
 
   /**
    * Provides a partitioner to perform the insert operation, based on the workload profile.
    */
-  public abstract Partitioner getInsertPartitioner(WorkloadProfile profile);
+  public abstract Partitioner getInsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc);
 
   /**
    * Return whether this HoodieTable implementation can benefit from workload profiling.
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
index 95248a4..ec64080 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
@@ -415,7 +415,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     records.addAll(updateRecords);
     WorkloadProfile profile = new WorkloadProfile(jsc.parallelize(records));
     HoodieCopyOnWriteTable.UpsertPartitioner partitioner =
-        (HoodieCopyOnWriteTable.UpsertPartitioner) table.getUpsertPartitioner(profile);
+        (HoodieCopyOnWriteTable.UpsertPartitioner) table.getUpsertPartitioner(profile, jsc);
     assertEquals("Update record should have gone to the 1 update partition", 0, partitioner.getPartition(
         new Tuple2<>(updateRecords.get(0).getKey(), Option.ofNullable(updateRecords.get(0).getCurrentLocation()))));
     return partitioner;
