This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b3292ebf4a [HUDI-5154] Improve hudi-spark-client Lambda writing (#7127)
b3292ebf4a is described below

commit b3292ebf4a8e6e7168766a3f6f3dd8b76edf8c5e
Author: slfan1989 <55643692+slfan1...@users.noreply.github.com>
AuthorDate: Wed Nov 9 08:51:48 2022 +0800

    [HUDI-5154] Improve hudi-spark-client Lambda writing (#7127)
    
    Co-authored-by: slfan1989 <louj1988@@>
---
 .../java/org/apache/hudi/client/SparkRDDWriteClient.java     |  4 ++--
 .../run/strategy/SingleSparkJobExecutionStrategy.java        |  8 ++++----
 .../clustering/update/strategy/SparkAllowUpdateStrategy.java |  2 +-
 .../org/apache/hudi/client/utils/SparkValidatorUtils.java    | 12 +++++-------
 .../execution/bulkinsert/RDDConsistentBucketPartitioner.java |  4 +---
 .../org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java   |  4 ++--
 .../java/org/apache/hudi/metrics/DistributedRegistry.java    | 10 +++++-----
 7 files changed, 20 insertions(+), 24 deletions(-)
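
The pattern applied throughout this commit is the standard Java 8 cleanup of replacing single-method lambdas with method references. A minimal, self-contained sketch of the idea (the Item class below is a hypothetical stand-in, not a Hudi type):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MethodReferenceSketch {

  // Hypothetical stand-in for classes such as HoodieWriteStat; not part of Hudi.
  static class Item {
    private final String id;
    Item(String id) { this.id = id; }
    String getId() { return id; }
  }

  public static void main(String[] args) {
    List<Item> items = Arrays.asList(new Item("a"), new Item("b"));

    // Before: a lambda that only forwards to a getter.
    String viaLambda = items.stream()
        .map(item -> item.getId())
        .collect(Collectors.joining(","));

    // After: the equivalent method reference, as used throughout this commit.
    String viaMethodRef = items.stream()
        .map(Item::getId)
        .collect(Collectors.joining(","));

    System.out.println(viaLambda.equals(viaMethodRef)); // prints "true"
  }
}

Both forms behave identically; the method reference simply drops the redundant parameter name and reads closer to the intent.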

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index ef37dd1835..c200abee5e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -434,9 +434,9 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
     List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
         e.getValue().stream()).collect(Collectors.toList());
 
-    if (writeStats.stream().mapToLong(s -> s.getTotalWriteErrors()).sum() > 0) {
+    if (writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0) {
       throw new HoodieClusteringException("Clustering failed to write to files:"
-          + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(",")));
+          + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(HoodieWriteStat::getFileId).collect(Collectors.joining(",")));
     }
 
     final HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
index 601d2ec8a7..46d2466c5c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
@@ -53,6 +53,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -77,8 +78,7 @@ public abstract class SingleSparkJobExecutionStrategy<T extends HoodieRecordPayl
     JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext());
     final TaskContextSupplier taskContextSupplier = getEngineContext().getTaskContextSupplier();
     final SerializableSchema serializableSchema = new SerializableSchema(schema);
-    final List<ClusteringGroupInfo> clusteringGroupInfos = clusteringPlan.getInputGroups().stream().map(clusteringGroup ->
-        ClusteringGroupInfo.create(clusteringGroup)).collect(Collectors.toList());
+    final List<ClusteringGroupInfo> clusteringGroupInfos = clusteringPlan.getInputGroups().stream().map(ClusteringGroupInfo::create).collect(Collectors.toList());
 
     String umask = engineContext.hadoopConfiguration().get("fs.permissions.umask-mode");
     Broadcast<String> umaskBroadcastValue = engineContext.broadcast(umask);
@@ -121,7 +121,7 @@ public abstract class SingleSparkJobExecutionStrategy<T extends HoodieRecordPayl
 
     Iterable<List<WriteStatus>> writeStatusIterable = () -> writeStatuses;
     return StreamSupport.stream(writeStatusIterable.spliterator(), false)
-        .flatMap(writeStatusList -> writeStatusList.stream());
+        .flatMap(Collection::stream);
   }
 
 
@@ -152,7 +152,7 @@ public abstract class SingleSparkJobExecutionStrategy<T extends HoodieRecordPayl
         }
       };
 
-      return StreamSupport.stream(indexedRecords.spliterator(), false).map(record -> transform(record)).iterator();
+      return StreamSupport.stream(indexedRecords.spliterator(), false).map(this::transform).iterator();
     }).collect(Collectors.toList());
 
     return new ConcatenatingIterator<>(iteratorsForPartition);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java
index acb6d82ae1..6d819df3c2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java
@@ -44,7 +44,7 @@ public class SparkAllowUpdateStrategy<T extends HoodieRecordPayload<T>> extends
   public Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(HoodieData<HoodieRecord<T>> taggedRecordsRDD) {
     List<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = getGroupIdsWithUpdate(taggedRecordsRDD);
     Set<HoodieFileGroupId> fileGroupIdsWithUpdatesAndPendingClustering = fileGroupIdsWithRecordUpdate.stream()
-        .filter(f -> fileGroupsInPendingClustering.contains(f))
+        .filter(fileGroupsInPendingClustering::contains)
         .collect(Collectors.toSet());
     return Pair.of(taggedRecordsRDD, fileGroupIdsWithUpdatesAndPendingClustering);
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
index a6d03eae2b..4c4200d9ba 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
@@ -24,6 +24,7 @@ import org.apache.hudi.client.validator.SparkPreCommitValidator;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.view.HoodieTablePreCommitFileSystemView;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
@@ -71,8 +72,7 @@ public class SparkValidatorUtils {
       if (!writeMetadata.getWriteStats().isPresent()) {
         writeMetadata.setWriteStats(writeMetadata.getWriteStatuses().map(WriteStatus::getStat).collectAsList());
       }
-      Set<String> partitionsModified = writeMetadata.getWriteStats().get().stream().map(writeStats ->
-          writeStats.getPartitionPath()).collect(Collectors.toSet());
+      Set<String> partitionsModified = writeMetadata.getWriteStats().get().stream().map(HoodieWriteStat::getPartitionPath).collect(Collectors.toSet());
       SQLContext sqlContext = new SQLContext(HoodieSparkEngineContext.getSparkContext(context));
       // Refresh timeline to ensure validator sees the any other operations done on timeline (async operations such as other clustering/compaction/rollback)
       table.getMetaClient().reloadActiveTimeline();
@@ -80,11 +80,9 @@ public class SparkValidatorUtils {
       Dataset<Row> afterState  = getRecordsFromPendingCommits(sqlContext, partitionsModified, writeMetadata, table, instantTime).cache();
 
       Stream<SparkPreCommitValidator> validators = Arrays.stream(config.getPreCommitValidators().split(","))
-          .map(validatorClass -> {
-            return ((SparkPreCommitValidator) ReflectionUtils.loadClass(validatorClass,
-                new Class<?>[] {HoodieSparkTable.class, HoodieEngineContext.class, HoodieWriteConfig.class},
-                table, context, config));
-          });
+          .map(validatorClass -> ((SparkPreCommitValidator) ReflectionUtils.loadClass(validatorClass,
+              new Class<?>[] {HoodieSparkTable.class, HoodieEngineContext.class, HoodieWriteConfig.class},
+              table, context, config)));
 
       boolean allSuccess = validators.map(v -> runValidatorAsync(v, writeMetadata, beforeState, afterState, instantTime)).map(CompletableFuture::join)
           .reduce(true, Boolean::logicalAnd);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java
index e23723ac72..7b644938bb 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java
@@ -266,9 +266,7 @@ public class RDDConsistentBucketPartitioner<T extends HoodieRecordPayload> exten
       LOG.warn("Consistent bucket does not support global sort mode, the sort will only be done within each data partition");
     }
 
-    Comparator<HoodieKey> comparator = (Comparator<HoodieKey> & Serializable) (t1, t2) -> {
-      return t1.getRecordKey().compareTo(t2.getRecordKey());
-    };
+    Comparator<HoodieKey> comparator = (Comparator<HoodieKey> & Serializable) (t1, t2) -> t1.getRecordKey().compareTo(t2.getRecordKey());
 
     return records.mapToPair(record -> new Tuple2<>(record.getKey(), record))
         .repartitionAndSortWithinPartitions(partitioner, comparator)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
index f99bf876c9..16f47b8f8c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
@@ -427,7 +427,7 @@ public class SparkHoodieHBaseIndex extends HoodieIndex<Object, Object> {
     // Map each fileId that has inserts to a unique partition Id. This will be used while
     // repartitioning RDD<WriteStatus>
     final List<String> fileIds = writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0)
-                                   .map(w -> w.getFileId()).collect();
+                                   .map(WriteStatus::getFileId).collect();
     for (final String fileId : fileIds) {
       fileIdPartitionMap.put(fileId, partitionIndex++);
     }
@@ -445,7 +445,7 @@ public class SparkHoodieHBaseIndex extends HoodieIndex<Object, Object> {
         writeStatusRDD.mapToPair(w -> new Tuple2<>(w.getFileId(), w))
             .partitionBy(new WriteStatusPartitioner(fileIdPartitionMap,
                 this.numWriteStatusWithInserts))
-            .map(w -> w._2());
+            .map(Tuple2::_2);
     JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
     acquireQPSResourcesAndSetBatchSize(desiredQPSFraction, jsc);
     JavaRDD<WriteStatus> writeStatusJavaRDD = partitionedRDD.mapPartitionsWithIndex(updateLocationFunction(),
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java
index 60c32b34da..ca01def803 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java
@@ -52,12 +52,12 @@ public class DistributedRegistry extends AccumulatorV2<Map<String, Long>, Map<St
 
   @Override
   public void increment(String name) {
-    counters.merge(name,  1L, (oldValue, newValue) -> oldValue + newValue);
+    counters.merge(name,  1L, Long::sum);
   }
 
   @Override
   public void add(String name, long value) {
-    counters.merge(name,  value, (oldValue, newValue) -> oldValue + newValue);
+    counters.merge(name,  value, Long::sum);
   }
 
   @Override
@@ -80,13 +80,13 @@ public class DistributedRegistry extends AccumulatorV2<Map<String, Long>, Map<St
 
   @Override
   public void add(Map<String, Long> arg) {
-    arg.forEach((key, value) -> add(key, value));
+    arg.forEach(this::add);
   }
 
   @Override
   public AccumulatorV2<Map<String, Long>, Map<String, Long>> copy() {
     DistributedRegistry registry = new DistributedRegistry(name);
-    counters.forEach((key, value) -> registry.add(key, value));
+    counters.forEach(registry::add);
     return registry;
   }
 
@@ -97,7 +97,7 @@ public class DistributedRegistry extends AccumulatorV2<Map<String, Long>, Map<St
 
   @Override
   public void merge(AccumulatorV2<Map<String, Long>, Map<String, Long>> acc) {
-    acc.value().forEach((key, value) -> add(key, value));
+    acc.value().forEach(this::add);
   }
 
   @Override

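For context on the DistributedRegistry hunks above: Map.merge(key, value, Long::sum) inserts the value when the key is absent and otherwise adds it to the existing count, which is exactly what the replaced (oldValue, newValue) -> oldValue + newValue lambdas did. A minimal standalone sketch (plain java.util, independent of the Hudi accumulator):

import java.util.HashMap;
import java.util.Map;

public class CounterMergeSketch {
  public static void main(String[] args) {
    Map<String, Long> counters = new HashMap<>();

    // Key absent: 1L is stored as-is.
    counters.merge("writes", 1L, Long::sum);

    // Key present: Long::sum combines the existing value with the new one.
    counters.merge("writes", 1L, Long::sum);
    counters.merge("bytes", 42L, Long::sum);

    System.out.println(counters); // e.g. {bytes=42, writes=2}
  }
}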