imbajin commented on code in PR #683:
URL: https://github.com/apache/incubator-hugegraph-toolchain/pull/683#discussion_r2422432634
##########
hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java:
##########
@@ -109,42 +203,62 @@ public boolean load() {
// Print load summary
Printer.printSummary(this.context);
} catch (Throwable t) {
+ this.context.occurredError();
+
+ if (t instanceof ServerException) {
+ ServerException e = (ServerException) t;
+ String logMessage =
+ "Log ServerException: \n" + e.exception() + "\n";
+ if (e.trace() != null) {
+ logMessage += StringUtils.join((List<String>) e.trace(),
+ "\n");
+ }
+ LOG.warn(logMessage);
+ }
+
RuntimeException e = LoadUtil.targetRuntimeException(t);
Printer.printError("Failed to load", e);
Review Comment:
**Exception Handling Issue**: Adding `e.printStackTrace()` in production
code is not recommended. Stack traces should be logged via the logging
framework, not printed to stderr.
**Recommendation**: Remove the printStackTrace() call and rely on proper
logging:
```java
RuntimeException e = LoadUtil.targetRuntimeException(t);
Printer.printError("Failed to load", e);
LOG.error("Load failed with exception", e); // Use logger instead
throw e;
```
##########
hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java:
##########
@@ -200,27 +592,141 @@ private void loadInputs(List<InputStruct> structs) {
}
}
- private void loadStructs(List<InputStruct> structs) {
- // Load input structs one by one
+ private List<InputTaskItem> prepareTaskItems(List<InputStruct> structs,
+ boolean scatter) {
+ ArrayList<InputTaskItem> tasks = new ArrayList<>();
+ int curFile = 0;
+ int curIndex = 0;
for (InputStruct struct : structs) {
- if (this.context.stopped()) {
- break;
- }
if (struct.skip()) {
continue;
}
- // Create and init InputReader, fetch next batch lines
- try (InputReader reader = InputReader.create(struct.input())) {
- // Init reader
- reader.init(this.context, struct);
- // Load data from current input mapping
- this.loadStruct(struct, reader);
+
+ // Create and init InputReader
+ try {
+ LOG.info("Start loading: '{}'", struct);
+
+ InputReader reader = InputReader.create(struct.input());
+ List<InputReader> readerList = reader.multiReaders() ?
+ reader.split() :
+ ImmutableList.of(reader);
+
+ LOG.info("total {} found in '{}'", readerList.size(), struct);
+ tasks.ensureCapacity(tasks.size() + readerList.size());
+ int seq = 0;
+ for (InputReader r : readerList) {
+ if (curFile >= this.context.options().startFile &&
+ (this.context.options().endFile == -1 ||
+ curFile < this.context.options().endFile)) {
+ // Load data from current input mapping
+ tasks.add(new InputTaskItem(struct, r, seq, curIndex));
+ } else {
+ r.close();
+ }
+ seq += 1;
+ curFile += 1;
+ }
+ if (this.context.options().endFile != -1 &&
+ curFile >= this.context.options().endFile) {
+ break;
+ }
} catch (InitException e) {
throw new LoadException("Failed to init input reader", e);
}
+ curIndex += 1;
+ }
+ // sort by seqNumber to allow scatter loading from different sources
+ if (scatter) {
+ tasks.sort(new Comparator<InputTaskItem>() {
+ @Override
+ public int compare(InputTaskItem o1, InputTaskItem o2) {
+ if (o1.structIndex == o2.structIndex) {
+ return o1.seqNumber - o2.seqNumber;
+ } else {
+ return o1.structIndex - o2.structIndex;
+ }
+ }
+ });
+ }
+
+ return tasks;
+ }
+
+ private void loadStructs(List<InputStruct> structs) {
+ int parallelCount = this.context.options().parallelCount;
+ if (structs.size() == 0) {
+ return;
+ }
+ if (parallelCount <= 0) {
+ parallelCount = structs.size();
+ }
+
+ boolean scatter = this.context.options().scatterSources;
+
+ LOG.info("{} threads for loading {} structs, from {} to {} in {} mode",
+ parallelCount, structs.size(),
this.context.options().startFile,
+ this.context.options().endFile,
+ scatter ? "scatter" : "sequencial");
+
+ this.loadService = ExecutorUtil.newFixedThreadPool(parallelCount,
+ "loader");
+
+ List<InputTaskItem> taskItems = prepareTaskItems(structs, scatter);
+
+ List<CompletableFuture<Void>> loadTasks = new ArrayList<>();
+
+ for (InputTaskItem item : taskItems) {
+ // Init reader
+ item.reader.init(this.context, item.struct);
+ // Load data from current input mapping
+ loadTasks.add(
+ this.asyncLoadStruct(item.struct, item.reader,
+ this.loadService));
+ }
+
+ LOG.info("waiting for loading finish {}", loadTasks.size());
+ // wait for finish
+ try {
+ CompletableFuture.allOf(loadTasks.toArray(new
CompletableFuture[0]))
+ .join();
+ } catch (CompletionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof ParseException) {
+ throw (ParseException) cause;
+ } else if (cause instanceof LoadException) {
Review Comment:
**Concurrency Issue**: The exception handling in `asyncLoadStruct` wraps and
re-throws exceptions, but `CompletableFuture.allOf(...).join()` surfaces at
most one failure per call, so the exception context of the other failed tasks
is lost.
**Issue**: When multiple tasks fail concurrently, `join()` propagates only a
single exception (wrapped in a `CompletionException`); the failures of the
remaining tasks are silently dropped unless they are collected explicitly.
**Recommendation**: Consider collecting all exceptions:
```java
List<Throwable> exceptions = new ArrayList<>();
for (CompletableFuture<Void> future : loadTasks) {
try {
future.join();
} catch (CompletionException e) {
exceptions.add(e.getCause() != null ? e.getCause() : e);
}
}
if (!exceptions.isEmpty()) {
    LoadException aggregated = new LoadException("Multiple load failures occurred");
exceptions.forEach(aggregated::addSuppressed);
throw aggregated;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]