This is an automated email from the ASF dual-hosted git repository.
ming pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-toolchain.git
The following commit(s) were added to refs/heads/master by this push:
new cf1312e9 chore: improve spark parallel (#450)
cf1312e9 is described below
commit cf1312e9be69756a083ef25301fece41396701bb
Author: vaughn <[email protected]>
AuthorDate: Fri Apr 7 09:49:48 2023 +0800
chore: improve spark parallel (#450)
---
.../loader/spark/HugeGraphSparkLoader.java | 69 +++++++++++++---------
1 file changed, 42 insertions(+), 27 deletions(-)
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java
index 60c7837f..849c818c 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java
@@ -63,6 +63,10 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import scala.collection.JavaConverters;
@@ -73,23 +77,25 @@ public class HugeGraphSparkLoader implements Serializable {
 
     private final LoadOptions loadOptions;
     private final Map<ElementBuilder, List<GraphElement>> builders;
+    private final ExecutorService executor;
+
     public static void main(String[] args) {
         HugeGraphSparkLoader loader;
         try {
             loader = new HugeGraphSparkLoader(args);
+            loader.load();
         } catch (Throwable e) {
             Printer.printError("Failed to start loading", e);
-            return;
         }
-        loader.load();
     }
 
     public HugeGraphSparkLoader(String[] args) {
         this.loadOptions = LoadOptions.parseOptions(args);
         this.builders = new HashMap<>();
+        this.executor = Executors.newCachedThreadPool();
     }
 
-    public void load() {
+    public void load() throws ExecutionException, InterruptedException {
         LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
         List<InputStruct> structs = mapping.structs();
         boolean sinkType = this.loadOptions.sinkType;
@@ -123,35 +129,44 @@ public class HugeGraphSparkLoader implements Serializable {
         SparkContext sc = session.sparkContext();
         LongAccumulator totalInsertSuccess =
                 sc.longAccumulator("totalInsertSuccess");
+        List<Future<?>> futures = new ArrayList<>(structs.size());
+
         for (InputStruct struct : structs) {
-            LOG.info("\n Initializes the accumulator corresponding to the {} ",
-                     struct.input().asFileSource().path());
-            LoadDistributeMetrics loadDistributeMetrics = new LoadDistributeMetrics(struct);
-            loadDistributeMetrics.init(sc);
-            LOG.info("\n Start to load data, data info is: \t {} ",
-                     struct.input().asFileSource().path());
-            Dataset<Row> ds = read(session, struct);
-            if (sinkType) {
-                LOG.info("\n Start to load data using spark apis \n");
-                ds.foreachPartition((Iterator<Row> p) -> {
-                    LoadContext context = initPartition(this.loadOptions, struct);
-                    p.forEachRemaining((Row row) -> {
-                        loadRow(struct, row, p, context);
+            Future<?> future = this.executor.submit(() -> {
+                LOG.info("\n Initializes the accumulator corresponding to the {} ",
+                         struct.input().asFileSource().path());
+                LoadDistributeMetrics loadDistributeMetrics = new LoadDistributeMetrics(struct);
+                loadDistributeMetrics.init(sc);
+                LOG.info("\n Start to load data, data info is: \t {} ",
+                         struct.input().asFileSource().path());
+                Dataset<Row> ds = read(session, struct);
+                if (sinkType) {
+                    LOG.info("\n Start to load data using spark apis \n");
+                    ds.foreachPartition((Iterator<Row> p) -> {
+                        LoadContext context = initPartition(this.loadOptions, struct);
+                        p.forEachRemaining((Row row) -> {
+                            loadRow(struct, row, p, context);
+                        });
+                        context.close();
                     });
-                    context.close();
-                });
-            } else {
-                LOG.info("\n Start to load data using spark bulkload \n");
-                // gen-hfile
-                HBaseDirectLoader directLoader = new HBaseDirectLoader(loadOptions, struct,
-                                                                       loadDistributeMetrics);
-                directLoader.bulkload(ds);
+                } else {
+                    LOG.info("\n Start to load data using spark bulkload \n");
+                    // gen-hfile
+                    HBaseDirectLoader directLoader = new HBaseDirectLoader(loadOptions, struct,
+                                                                           loadDistributeMetrics);
+                    directLoader.bulkload(ds);
-            }
-            collectLoadMetrics(loadDistributeMetrics, totalInsertSuccess);
-            LOG.info("\n Finished load {} data ", struct.input().asFileSource().path());
+                }
+                collectLoadMetrics(loadDistributeMetrics, totalInsertSuccess);
+                LOG.info("\n Finished load {} data ", struct.input().asFileSource().path());
+            });
+            futures.add(future);
         }
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
         Long totalInsertSuccessCnt = totalInsertSuccess.value();
         LOG.info("\n ------------The data load task is complete-------------------\n" +
                  "\n insertSuccessCnt:\t {} \n ---------------------------------------------\n",
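
For readers skimming the diff: the change wraps each InputStruct's load in a
task submitted to a cached thread pool and then joins the resulting futures,
so the Spark jobs for independent input structs run concurrently instead of
one after another. Below is a minimal, self-contained sketch of that
submit-then-join pattern under simplified assumptions; ParallelLoadSketch and
loadStruct() are hypothetical stand-ins for the per-struct work shown above
(metrics init, read, sink), not part of the loader's actual API.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class ParallelLoadSketch {

        // Hypothetical stand-in for the per-struct work in the diff above
        // (init metrics, read the Dataset, sink via Spark APIs or bulkload).
        private static void loadStruct(String struct) {
            System.out.println("loading " + struct);
        }

        public static void main(String[] args)
                throws ExecutionException, InterruptedException {
            List<String> structs = List.of("vertex.csv", "edge.csv");

            // Same pattern as the commit: a cached pool grows on demand,
            // so each submitted struct gets its own worker thread.
            ExecutorService executor = Executors.newCachedThreadPool();
            List<Future<?>> futures = new ArrayList<>(structs.size());
            for (String struct : structs) {
                futures.add(executor.submit(() -> loadStruct(struct)));
            }

            // Join phase: get() blocks until each task finishes and rethrows
            // any task failure wrapped in an ExecutionException, which is why
            // load() now declares ExecutionException and InterruptedException.
            for (Future<?> future : futures) {
                future.get();
            }
            executor.shutdown();
        }
    }

Within the hunks shown, the loader never shuts the pool down explicitly; the
sketch calls executor.shutdown() only so the demo JVM exits promptly.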