This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 96b2359fda4d5b55e51e8f347228fe2f7cf6b24f
Author: ffcchi <fengfei...@gmail.com>
AuthorDate: Mon Mar 30 01:14:38 2020 -0600

    [HUDI-724] Parallelize getSmallFiles for partitions (#1421)

    Co-authored-by: Feichi Feng <feicf...@amazon.com>
---
 .../org/apache/hudi/client/HoodieWriteClient.java  |  4 +--
 .../apache/hudi/table/HoodieCopyOnWriteTable.java  | 37 ++++++++++++++++------
 .../apache/hudi/table/HoodieMergeOnReadTable.java  | 12 +++----
 .../java/org/apache/hudi/table/HoodieTable.java    |  4 +--
 .../apache/hudi/table/TestCopyOnWriteTable.java    |  2 +-
 5 files changed, 36 insertions(+), 23 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index e201487..90bc9b3 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -480,9 +480,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
 
   private Partitioner getPartitioner(HoodieTable table, boolean isUpsert, WorkloadProfile profile) {
     if (isUpsert) {
-      return table.getUpsertPartitioner(profile);
+      return table.getUpsertPartitioner(profile, jsc);
     } else {
-      return table.getInsertPartitioner(profile);
+      return table.getInsertPartitioner(profile, jsc);
     }
   }
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
index 82b08b7..4c91c77 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
@@ -81,6 +81,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.spark.api.java.function.PairFunction;
 import scala.Tuple2;
 
 /**
@@ -142,16 +143,16 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
   }
 
   @Override
-  public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
+  public Partitioner getUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
     if (profile == null) {
       throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
     }
-    return new UpsertPartitioner(profile);
+    return new UpsertPartitioner(profile, jsc);
   }
 
   @Override
-  public Partitioner getInsertPartitioner(WorkloadProfile profile) {
-    return getUpsertPartitioner(profile);
+  public Partitioner getInsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
+    return getUpsertPartitioner(profile, jsc);
   }
 
   @Override
@@ -569,14 +570,14 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
      */
     protected HoodieRollingStatMetadata rollingStatMetadata;
 
-    UpsertPartitioner(WorkloadProfile profile) {
+    UpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
       updateLocationToBucket = new HashMap<>();
       partitionPathToInsertBuckets = new HashMap<>();
       bucketInfoMap = new HashMap<>();
       globalStat = profile.getGlobalStat();
       rollingStatMetadata = getRollingStats();
       assignUpdates(profile);
-      assignInserts(profile);
+      assignInserts(profile, jsc);
 
       LOG.info("Total Buckets :" + totalBuckets + ", buckets info => " + bucketInfoMap + ", \n"
           + "Partition to insert buckets => " + partitionPathToInsertBuckets + ", \n"
@@ -602,18 +603,24 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
       return bucket;
     }
 
-    private void assignInserts(WorkloadProfile profile) {
+    private void assignInserts(WorkloadProfile profile, JavaSparkContext jsc) {
       // for new inserts, compute buckets depending on how many records we have for each partition
       Set<String> partitionPaths = profile.getPartitionPaths();
       long averageRecordSize =
           averageBytesPerRecord(metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
               config.getCopyOnWriteRecordSizeEstimate());
       LOG.info("AvgRecordSize => " + averageRecordSize);
+
+      Map<String, List<SmallFile>> partitionSmallFilesMap =
+          getSmallFilesForPartitions(new ArrayList<String>(partitionPaths), jsc);
+
       for (String partitionPath : partitionPaths) {
         WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
         if (pStat.getNumInserts() > 0) {
 
-          List<SmallFile> smallFiles = getSmallFiles(partitionPath);
+          List<SmallFile> smallFiles = partitionSmallFilesMap.get(partitionPath);
+          this.smallFiles.addAll(smallFiles);
+
           LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles);
 
           long totalUnassignedInserts = pStat.getNumInserts();
@@ -675,6 +682,18 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
       }
     }
 
+    private Map<String, List<SmallFile>> getSmallFilesForPartitions(List<String> partitionPaths, JavaSparkContext jsc) {
+
+      Map<String, List<SmallFile>> partitionSmallFilesMap = new HashMap<>();
+      if (partitionPaths != null && partitionPaths.size() > 0) {
+        JavaRDD<String> partitionPathRdds = jsc.parallelize(partitionPaths, partitionPaths.size());
+        partitionSmallFilesMap = partitionPathRdds.mapToPair((PairFunction<String, String, List<SmallFile>>)
+            partitionPath -> new Tuple2<>(partitionPath, getSmallFiles(partitionPath))).collectAsMap();
+      }
+
+      return partitionSmallFilesMap;
+    }
+
     /**
      * Returns a list of small files in the given partition path.
      */
@@ -697,8 +716,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
             sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
             sf.sizeBytes = file.getFileSize();
             smallFileLocations.add(sf);
-            // Update the global small files list
-            smallFiles.add(sf);
           }
         }
       }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
index 50d41b3..938a5fd 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
@@ -89,11 +89,11 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
   }
 
   @Override
-  public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
+  public Partitioner getUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
     if (profile == null) {
       throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
     }
-    mergeOnReadUpsertPartitioner = new MergeOnReadUpsertPartitioner(profile);
+    mergeOnReadUpsertPartitioner = new MergeOnReadUpsertPartitioner(profile, jsc);
     return mergeOnReadUpsertPartitioner;
   }
 
@@ -323,8 +323,8 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
    */
   class MergeOnReadUpsertPartitioner extends HoodieCopyOnWriteTable.UpsertPartitioner {
 
-    MergeOnReadUpsertPartitioner(WorkloadProfile profile) {
-      super(profile);
+    MergeOnReadUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc) {
+      super(profile, jsc);
     }
 
     @Override
@@ -374,16 +374,12 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
             sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
             sf.sizeBytes = getTotalFileSize(smallFileSlice);
             smallFileLocations.add(sf);
-            // Update the global small files list
-            smallFiles.add(sf);
           } else {
             HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
             sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
                 FSUtils.getFileIdFromLogPath(logFile.getPath()));
             sf.sizeBytes = getTotalFileSize(smallFileSlice);
             smallFileLocations.add(sf);
-            // Update the global small files list
-            smallFiles.add(sf);
           }
         }
       }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index 2e73ef0..ad0196c 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -115,12 +115,12 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
   /**
    * Provides a partitioner to perform the upsert operation, based on the workload profile.
    */
-  public abstract Partitioner getUpsertPartitioner(WorkloadProfile profile);
+  public abstract Partitioner getUpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc);
 
   /**
    * Provides a partitioner to perform the insert operation, based on the workload profile.
    */
-  public abstract Partitioner getInsertPartitioner(WorkloadProfile profile);
+  public abstract Partitioner getInsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc);
 
   /**
    * Return whether this HoodieTable implementation can benefit from workload profiling.
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
index 95248a4..ec64080 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
@@ -415,7 +415,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     records.addAll(updateRecords);
     WorkloadProfile profile = new WorkloadProfile(jsc.parallelize(records));
     HoodieCopyOnWriteTable.UpsertPartitioner partitioner =
-        (HoodieCopyOnWriteTable.UpsertPartitioner) table.getUpsertPartitioner(profile);
+        (HoodieCopyOnWriteTable.UpsertPartitioner) table.getUpsertPartitioner(profile, jsc);
     assertEquals("Update record should have gone to the 1 update partition", 0, partitioner.getPartition(
         new Tuple2<>(updateRecords.get(0).getKey(), Option.ofNullable(updateRecords.get(0).getCurrentLocation()))));
     return partitioner;
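
For context, the heart of this commit is the new getSmallFilesForPartitions helper: instead of the driver calling getSmallFiles(partitionPath) serially in a loop, each partition's file listing is fanned out as its own Spark task and the results are collected back as a map. The standalone sketch below shows the same parallelize/mapToPair/collectAsMap pattern in isolation; the partition paths and lookupSmallFiles stub are hypothetical stand-ins for Hudi's WorkloadProfile partitions and HoodieCopyOnWriteTable#getSmallFiles, not the committed code.

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class ParallelPartitionLookupSketch {

  // Hypothetical stand-in for getSmallFiles(partitionPath); in Hudi this queries the
  // file system view for one partition, which is the slow per-partition step being parallelized.
  private static List<String> lookupSmallFiles(String partitionPath) {
    return Arrays.asList(partitionPath + "/file1", partitionPath + "/file2");
  }

  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("small-files-sketch").setMaster("local[*]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      // Hypothetical partition paths; the real code derives these from the WorkloadProfile.
      List<String> partitionPaths = Arrays.asList("2020/03/28", "2020/03/29", "2020/03/30");

      // One RDD slice per partition path, so each lookup can run as a separate task,
      // mirroring jsc.parallelize(partitionPaths, partitionPaths.size()) in the patch.
      JavaPairRDD<String, List<String>> smallFilePairs =
          jsc.parallelize(partitionPaths, partitionPaths.size())
              .mapToPair(path -> new Tuple2<>(path, lookupSmallFiles(path)));

      // collectAsMap() brings the per-partition results back to the driver as a Map,
      // the same shape assignInserts() consumes via partitionSmallFilesMap.get(partitionPath).
      Map<String, List<String>> smallFilesByPartition = smallFilePairs.collectAsMap();
      smallFilesByPartition.forEach((path, files) ->
          System.out.println(path + " => " + files));
    }
  }
}

This also explains why the patch deletes the "smallFiles.add(sf)" bookkeeping inside getSmallFiles and adds "this.smallFiles.addAll(smallFiles)" in assignInserts: once getSmallFiles runs inside executor tasks, mutating the driver-side smallFiles list from within it would be lost with the executor, so the global list has to be populated on the driver after collectAsMap() returns.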