This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new b3292ebf4a [HUDI-5154] Improve hudi-spark-client Lambda writing (#7127) b3292ebf4a is described below commit b3292ebf4a8e6e7168766a3f6f3dd8b76edf8c5e Author: slfan1989 <55643692+slfan1...@users.noreply.github.com> AuthorDate: Wed Nov 9 08:51:48 2022 +0800 [HUDI-5154] Improve hudi-spark-client Lambda writing (#7127) Co-authored-by: slfan1989 <louj1988@@> --- .../java/org/apache/hudi/client/SparkRDDWriteClient.java | 4 ++-- .../run/strategy/SingleSparkJobExecutionStrategy.java | 8 ++++---- .../clustering/update/strategy/SparkAllowUpdateStrategy.java | 2 +- .../org/apache/hudi/client/utils/SparkValidatorUtils.java | 12 +++++------- .../execution/bulkinsert/RDDConsistentBucketPartitioner.java | 4 +--- .../org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java | 4 ++-- .../java/org/apache/hudi/metrics/DistributedRegistry.java | 10 +++++----- 7 files changed, 20 insertions(+), 24 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index ef37dd1835..c200abee5e 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -434,9 +434,9 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e -> e.getValue().stream()).collect(Collectors.toList()); - if (writeStats.stream().mapToLong(s -> s.getTotalWriteErrors()).sum() > 0) { + if (writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0) { throw new HoodieClusteringException("Clustering failed to write to files:" - + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(s -> 
s.getFileId()).collect(Collectors.joining(","))); + + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(HoodieWriteStat::getFileId).collect(Collectors.joining(","))); } final HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java index 601d2ec8a7..46d2466c5c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java @@ -53,6 +53,7 @@ import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.broadcast.Broadcast; import java.io.IOException; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -77,8 +78,7 @@ public abstract class SingleSparkJobExecutionStrategy<T extends HoodieRecordPayl JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext()); final TaskContextSupplier taskContextSupplier = getEngineContext().getTaskContextSupplier(); final SerializableSchema serializableSchema = new SerializableSchema(schema); - final List<ClusteringGroupInfo> clusteringGroupInfos = clusteringPlan.getInputGroups().stream().map(clusteringGroup -> - ClusteringGroupInfo.create(clusteringGroup)).collect(Collectors.toList()); + final List<ClusteringGroupInfo> clusteringGroupInfos = clusteringPlan.getInputGroups().stream().map(ClusteringGroupInfo::create).collect(Collectors.toList()); String umask = engineContext.hadoopConfiguration().get("fs.permissions.umask-mode"); Broadcast<String> umaskBroadcastValue = engineContext.broadcast(umask); @@ 
-121,7 +121,7 @@ public abstract class SingleSparkJobExecutionStrategy<T extends HoodieRecordPayl Iterable<List<WriteStatus>> writeStatusIterable = () -> writeStatuses; return StreamSupport.stream(writeStatusIterable.spliterator(), false) - .flatMap(writeStatusList -> writeStatusList.stream()); + .flatMap(Collection::stream); } @@ -152,7 +152,7 @@ public abstract class SingleSparkJobExecutionStrategy<T extends HoodieRecordPayl } }; - return StreamSupport.stream(indexedRecords.spliterator(), false).map(record -> transform(record)).iterator(); + return StreamSupport.stream(indexedRecords.spliterator(), false).map(this::transform).iterator(); }).collect(Collectors.toList()); return new ConcatenatingIterator<>(iteratorsForPartition); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java index acb6d82ae1..6d819df3c2 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java @@ -44,7 +44,7 @@ public class SparkAllowUpdateStrategy<T extends HoodieRecordPayload<T>> extends public Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(HoodieData<HoodieRecord<T>> taggedRecordsRDD) { List<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = getGroupIdsWithUpdate(taggedRecordsRDD); Set<HoodieFileGroupId> fileGroupIdsWithUpdatesAndPendingClustering = fileGroupIdsWithRecordUpdate.stream() - .filter(f -> fileGroupsInPendingClustering.contains(f)) + .filter(fileGroupsInPendingClustering::contains) .collect(Collectors.toSet()); return Pair.of(taggedRecordsRDD, fileGroupIdsWithUpdatesAndPendingClustering); } diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java index a6d03eae2b..4c4200d9ba 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java @@ -24,6 +24,7 @@ import org.apache.hudi.client.validator.SparkPreCommitValidator; import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.BaseFile; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.view.HoodieTablePreCommitFileSystemView; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.StringUtils; @@ -71,8 +72,7 @@ public class SparkValidatorUtils { if (!writeMetadata.getWriteStats().isPresent()) { writeMetadata.setWriteStats(writeMetadata.getWriteStatuses().map(WriteStatus::getStat).collectAsList()); } - Set<String> partitionsModified = writeMetadata.getWriteStats().get().stream().map(writeStats -> - writeStats.getPartitionPath()).collect(Collectors.toSet()); + Set<String> partitionsModified = writeMetadata.getWriteStats().get().stream().map(HoodieWriteStat::getPartitionPath).collect(Collectors.toSet()); SQLContext sqlContext = new SQLContext(HoodieSparkEngineContext.getSparkContext(context)); // Refresh timeline to ensure validator sees the any other operations done on timeline (async operations such as other clustering/compaction/rollback) table.getMetaClient().reloadActiveTimeline(); @@ -80,11 +80,9 @@ public class SparkValidatorUtils { Dataset<Row> afterState = getRecordsFromPendingCommits(sqlContext, partitionsModified, writeMetadata, table, instantTime).cache(); Stream<SparkPreCommitValidator> validators = 
Arrays.stream(config.getPreCommitValidators().split(",")) - .map(validatorClass -> { - return ((SparkPreCommitValidator) ReflectionUtils.loadClass(validatorClass, - new Class<?>[] {HoodieSparkTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, - table, context, config)); - }); + .map(validatorClass -> ((SparkPreCommitValidator) ReflectionUtils.loadClass(validatorClass, + new Class<?>[] {HoodieSparkTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, + table, context, config))); boolean allSuccess = validators.map(v -> runValidatorAsync(v, writeMetadata, beforeState, afterState, instantTime)).map(CompletableFuture::join) .reduce(true, Boolean::logicalAnd); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java index e23723ac72..7b644938bb 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java @@ -266,9 +266,7 @@ public class RDDConsistentBucketPartitioner<T extends HoodieRecordPayload> exten LOG.warn("Consistent bucket does not support global sort mode, the sort will only be done within each data partition"); } - Comparator<HoodieKey> comparator = (Comparator<HoodieKey> & Serializable) (t1, t2) -> { - return t1.getRecordKey().compareTo(t2.getRecordKey()); - }; + Comparator<HoodieKey> comparator = (Comparator<HoodieKey> & Serializable) (t1, t2) -> t1.getRecordKey().compareTo(t2.getRecordKey()); return records.mapToPair(record -> new Tuple2<>(record.getKey(), record)) .repartitionAndSortWithinPartitions(partitioner, comparator) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java index f99bf876c9..16f47b8f8c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java @@ -427,7 +427,7 @@ public class SparkHoodieHBaseIndex extends HoodieIndex<Object, Object> { // Map each fileId that has inserts to a unique partition Id. This will be used while // repartitioning RDD<WriteStatus> final List<String> fileIds = writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0) - .map(w -> w.getFileId()).collect(); + .map(WriteStatus::getFileId).collect(); for (final String fileId : fileIds) { fileIdPartitionMap.put(fileId, partitionIndex++); } @@ -445,7 +445,7 @@ public class SparkHoodieHBaseIndex extends HoodieIndex<Object, Object> { writeStatusRDD.mapToPair(w -> new Tuple2<>(w.getFileId(), w)) .partitionBy(new WriteStatusPartitioner(fileIdPartitionMap, this.numWriteStatusWithInserts)) - .map(w -> w._2()); + .map(Tuple2::_2); JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context); acquireQPSResourcesAndSetBatchSize(desiredQPSFraction, jsc); JavaRDD<WriteStatus> writeStatusJavaRDD = partitionedRDD.mapPartitionsWithIndex(updateLocationFunction(), diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java index 60c32b34da..ca01def803 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java @@ -52,12 +52,12 @@ public class DistributedRegistry extends AccumulatorV2<Map<String, Long>, Map<St @Override public void increment(String name) { - counters.merge(name, 1L, (oldValue, newValue) -> oldValue + 
newValue); + counters.merge(name, 1L, Long::sum); } @Override public void add(String name, long value) { - counters.merge(name, value, (oldValue, newValue) -> oldValue + newValue); + counters.merge(name, value, Long::sum); } @Override @@ -80,13 +80,13 @@ public class DistributedRegistry extends AccumulatorV2<Map<String, Long>, Map<St @Override public void add(Map<String, Long> arg) { - arg.forEach((key, value) -> add(key, value)); + arg.forEach(this::add); } @Override public AccumulatorV2<Map<String, Long>, Map<String, Long>> copy() { DistributedRegistry registry = new DistributedRegistry(name); - counters.forEach((key, value) -> registry.add(key, value)); + counters.forEach(registry::add); return registry; } @@ -97,7 +97,7 @@ public class DistributedRegistry extends AccumulatorV2<Map<String, Long>, Map<St @Override public void merge(AccumulatorV2<Map<String, Long>, Map<String, Long>> acc) { - acc.value().forEach((key, value) -> add(key, value)); + acc.value().forEach(this::add); } @Override