xtern commented on code in PR #1581:
URL: https://github.com/apache/ignite-3/pull/1581#discussion_r1089053233
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java:
##########
@@ -576,36 +572,48 @@ private void submitFragment(String initiatorNode, String
fragmentString, Fragmen
private AsyncCursor<List<Object>> execute(MultiStepPlan plan) {
taskExecutor.execute(() -> {
- plan.init(mappingSrvc, new
MappingQueryContext(localNode.name()));
+ try {
+ plan.init(mappingSrvc, new
MappingQueryContext(localNode.name()));
- List<Fragment> fragments = plan.fragments();
+ List<Fragment> fragments = plan.fragments();
- // we rely on the fact that the very first fragment is a root.
Otherwise we need to handle
- // the case when a non-root fragment will fail before the root
is processed.
- assert !nullOrEmpty(fragments) &&
fragments.get(0).rootFragment() : fragments;
+ // we rely on the fact that the very first fragment is a
root. Otherwise we need to handle
+ // the case when a non-root fragment will fail before the
root is processed.
+ assert !nullOrEmpty(fragments) &&
fragments.get(0).rootFragment() : fragments;
- // start remote execution
- try {
+ // start remote execution
for (Fragment fragment : fragments) {
- if (fragment.rootFragment()) {
- assert rootFragmentId == null;
+ try {
+ if (fragment.rootFragment()) {
+ assert rootFragmentId == null;
- rootFragmentId = fragment.fragmentId();
- }
+ rootFragmentId = fragment.fragmentId();
+ }
- FragmentDescription fragmentDesc = new
FragmentDescription(
- fragment.fragmentId(),
- plan.mapping(fragment),
- plan.target(fragment),
- plan.remotes(fragment)
- );
+ FragmentDescription fragmentDesc = new
FragmentDescription(
+ fragment.fragmentId(),
+ plan.mapping(fragment),
+ plan.target(fragment),
+ plan.remotes(fragment)
+ );
+
+ for (String nodeName : fragmentDesc.nodeNames()) {
+ sendFragment(nodeName, fragment, fragmentDesc);
+ }
+ } catch (Throwable t0) {
+ if (fragment.rootFragment()) {
+ throw t0;
+ }
+
+ root.thenAccept(exec -> exec.onError(t0));
- for (String nodeName : fragmentDesc.nodeNames()) {
- sendFragment(nodeName, fragment, fragmentDesc);
+ break;
}
}
- } catch (Throwable e) {
- root.thenAccept(root -> root.onError(e));
+ } catch (Throwable t) {
+ LOG.error("An error occurred while executing the query.",
t);
+
+ root.completeExceptionally(t);
Review Comment:
Fixed it a bit to handle such a scenario :thinking:
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]