This is an automated email from the ASF dual-hosted git repository.

vaughn pushed a commit to branch zy_dev
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-toolchain.git
commit 8f1bfe33981cb476f7d6416b795b1dff20389290
Author: vaughn.zhang <[email protected]>
AuthorDate: Thu Apr 6 18:30:03 2023 +0800

    chore: improve spark parallel
---
 .../loader/spark/HugeGraphSparkLoader.java         | 65 +++++++++++++---------
 1 file changed, 38 insertions(+), 27 deletions(-)

diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java
index 60c7837f..b26003b0 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java
@@ -63,6 +63,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import scala.collection.JavaConverters;
 
@@ -77,11 +80,10 @@ public class HugeGraphSparkLoader implements Serializable {
         HugeGraphSparkLoader loader;
         try {
             loader = new HugeGraphSparkLoader(args);
+            loader.load();
         } catch (Throwable e) {
             Printer.printError("Failed to start loading", e);
-            return;
         }
-        loader.load();
     }
 
     public HugeGraphSparkLoader(String[] args) {
@@ -89,7 +91,7 @@ public class HugeGraphSparkLoader implements Serializable {
         this.builders = new HashMap<>();
     }
 
-    public void load() {
+    public void load() throws ExecutionException, InterruptedException {
         LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
         List<InputStruct> structs = mapping.structs();
         boolean sinkType = this.loadOptions.sinkType;
@@ -123,35 +125,44 @@ public class HugeGraphSparkLoader implements Serializable {
         SparkContext sc = session.sparkContext();
 
         LongAccumulator totalInsertSuccess = sc.longAccumulator("totalInsertSuccess");
+        List<Future<Void>> futures = new ArrayList<>(structs.size());
+
         for (InputStruct struct : structs) {
-            LOG.info("\n Initializes the accumulator corresponding to the {} ",
-                     struct.input().asFileSource().path());
-            LoadDistributeMetrics loadDistributeMetrics = new LoadDistributeMetrics(struct);
-            loadDistributeMetrics.init(sc);
-            LOG.info("\n Start to load data, data info is: \t {} ",
-                     struct.input().asFileSource().path());
-            Dataset<Row> ds = read(session, struct);
-            if (sinkType) {
-                LOG.info("\n Start to load data using spark apis \n");
-                ds.foreachPartition((Iterator<Row> p) -> {
-                    LoadContext context = initPartition(this.loadOptions, struct);
-                    p.forEachRemaining((Row row) -> {
-                        loadRow(struct, row, p, context);
+            Future<Void> future = Executors.newCachedThreadPool().submit(() -> {
+                LOG.info("\n Initializes the accumulator corresponding to the {} ",
+                         struct.input().asFileSource().path());
+                LoadDistributeMetrics loadDistributeMetrics = new LoadDistributeMetrics(struct);
+                loadDistributeMetrics.init(sc);
+                LOG.info("\n Start to load data, data info is: \t {} ",
+                         struct.input().asFileSource().path());
+                Dataset<Row> ds = read(session, struct);
+                if (sinkType) {
+                    LOG.info("\n Start to load data using spark apis \n");
+                    ds.foreachPartition((Iterator<Row> p) -> {
+                        LoadContext context = initPartition(this.loadOptions, struct);
+                        p.forEachRemaining((Row row) -> {
+                            loadRow(struct, row, p, context);
+                        });
+                        context.close();
                     });
-                    context.close();
-                });
-            } else {
-                LOG.info("\n Start to load data using spark bulkload \n");
-                // gen-hfile
-                HBaseDirectLoader directLoader = new HBaseDirectLoader(loadOptions, struct,
-                                                                       loadDistributeMetrics);
-                directLoader.bulkload(ds);
+                } else {
+                    LOG.info("\n Start to load data using spark bulkload \n");
+                    // gen-hfile
+                    HBaseDirectLoader directLoader = new HBaseDirectLoader(loadOptions, struct,
+                                                                           loadDistributeMetrics);
+                    directLoader.bulkload(ds);
 
-            }
-            collectLoadMetrics(loadDistributeMetrics, totalInsertSuccess);
-            LOG.info("\n Finished load {} data ", struct.input().asFileSource().path());
+                }
+                collectLoadMetrics(loadDistributeMetrics, totalInsertSuccess);
+                LOG.info("\n Finished load {} data ", struct.input().asFileSource().path());
+            });
+            futures.add(future);
         }
+        for (Future<Void> future : futures) {
+            future.get();
+        }
+
         Long totalInsertSuccessCnt = totalInsertSuccess.value();
         LOG.info("\n ------------The data load task is complete-------------------\n" +
                  "\n insertSuccessCnt:\t {} \n ---------------------------------------------\n",
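
Note on the pattern above: the change wraps each InputStruct's load in a task submitted to an
executor, collects the resulting Futures, and then calls get() on each one so that load() blocks
until every struct has finished and rethrows any task failure as an ExecutionException. Below is a
minimal standalone sketch of that submit-then-await pattern; the loadStruct() helper is a
hypothetical stand-in for the real read/foreachPartition/bulkload work, and unlike the diff (which
creates a new cached pool on every loop iteration) the sketch shares one executor and shuts it
down when all tasks are done.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelLoadSketch {

    // Hypothetical stand-in for the per-struct work (read the dataset, then
    // sink rows via the Spark API or bulkload HFiles).
    static void loadStruct(String struct) {
        System.out.println("loading " + struct);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        List<String> structs = Arrays.asList("vertex-input", "edge-input");
        ExecutorService executor = Executors.newCachedThreadPool();
        List<Future<?>> futures = new ArrayList<>(structs.size());
        try {
            // Submit one task per input struct so the structs load concurrently.
            for (String struct : structs) {
                futures.add(executor.submit(() -> loadStruct(struct)));
            }
            // Await completion; get() blocks and wraps any task failure
            // in an ExecutionException.
            for (Future<?> future : futures) {
                future.get();
            }
        } finally {
            executor.shutdown();
        }
    }
}

A cached thread pool grows threads on demand and reuses idle ones, which fits this workload:
the number of input structs is small and each task is long-running.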
