This is an automated email from the ASF dual-hosted git repository. gvvinblade pushed a commit to branch ignite-12248 in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push: new 2f7e98a fix exception handling on root node 2f7e98a is described below commit 2f7e98ae80a2e6bb4002fe2a2412507e8d67f958 Author: Igor Seliverstov <gvvinbl...@gmail.com> AuthorDate: Mon Sep 14 17:13:05 2020 +0300 fix exception handling on root node --- .../query/calcite/exec/ExecutionServiceImpl.java | 2 +- .../query/calcite/exec/rel/AbstractNode.java | 36 ++++++++++++---------- .../query/calcite/exec/rel/AggregateNode.java | 2 +- .../exec/rel/CorrelatedNestedLoopJoinNode.java | 2 +- .../query/calcite/exec/rel/FilterNode.java | 2 +- .../processors/query/calcite/exec/rel/Inbox.java | 10 ++---- .../query/calcite/exec/rel/ModifyNode.java | 2 +- .../query/calcite/exec/rel/NestedLoopJoinNode.java | 26 ++++++++-------- .../processors/query/calcite/exec/rel/Node.java | 2 +- .../processors/query/calcite/exec/rel/Outbox.java | 8 ++--- .../query/calcite/exec/rel/ProjectNode.java | 2 +- .../query/calcite/exec/rel/RootNode.java | 8 ++--- .../query/calcite/exec/rel/ScanNode.java | 6 ++-- .../query/calcite/exec/rel/SortNode.java | 2 +- .../query/calcite/exec/rel/UnionAllNode.java | 2 +- 15 files changed, 56 insertions(+), 56 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java index 99b0ea5..6a1966e 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java @@ -1028,7 +1028,7 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut running.remove(ctx.queryId()); // 2) close local fragment - root.onClose(); + root.closeInternal(); // 3) close remote fragments for (UUID nodeId : remotes) { diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java index c47321f..f021516 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java @@ -103,7 +103,7 @@ public abstract class AbstractNode<Row> implements Node<Row> { if (isClosed()) return; - onClose(); + closeInternal(); if (!F.isEmpty(sources())) sources().forEach(U::closeQuiet); @@ -111,7 +111,7 @@ public abstract class AbstractNode<Row> implements Node<Row> { /** {@inheritDoc} */ @Override public void rewind() { - onRewind(); + rewindInternal(); if (!F.isEmpty(sources())) sources().forEach(Node::rewind); @@ -122,36 +122,40 @@ public abstract class AbstractNode<Row> implements Node<Row> { this.downstream = downstream; } - /** */ - protected abstract void onRewind(); - /** * Processes given exception. * * @param e Exception. */ public void onError(Throwable e) { - assert downstream() != null; - - if (e instanceof ExecutionCancelledException) { + if (e instanceof ExecutionCancelledException) U.warn(context().planningContext().logger(), "Execution is cancelled.", e); + else + onErrorInternal(e); + } - return; - } + /** */ + protected void closeInternal() { + closed = true; + } + + /** */ + protected abstract void rewindInternal(); + + /** */ + protected void onErrorInternal(Throwable e) { + Downstream<Row> downstream = downstream(); + + assert downstream != null; try { - downstream().onError(e); + downstream.onError(e); } finally { U.closeQuiet(this); } } - /** */ - protected void onClose() { - closed = true; - } - /** * @return {@code true} if the subtree is canceled. */ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java index 8d300b6..80911e0 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java @@ -157,7 +157,7 @@ public class AggregateNode<Row> extends AbstractNode<Row> implements SingleNode< } /** {@inheritDoc} */ - @Override protected void onRewind() { + @Override protected void rewindInternal() { requested = 0; waiting = 0; groupings.forEach(grouping -> grouping.groups.clear()); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java index c88a9a8..d102be1 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java @@ -113,7 +113,7 @@ public class CorrelatedNestedLoopJoinNode<Row> extends AbstractNode<Row> { } /** {@inheritDoc} */ - @Override protected void onRewind() { + @Override protected void rewindInternal() { leftInBuf = null; rightInBuf = null; diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java index ba8f5fe..06703eb 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java @@ -118,7 +118,7 @@ public class FilterNode<Row> extends AbstractNode<Row> implements SingleNode<Row } /** {@inheritDoc} */ - @Override protected void onRewind() { + @Override protected void rewindInternal() { requested = 0; waiting = 0; inBuf.clear(); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java index 58c084e..3ea53fc 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java @@ -137,8 +137,8 @@ public class Inbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Singl } /** {@inheritDoc} */ - @Override public void onClose() { - super.onClose(); + @Override public void closeInternal() { + super.closeInternal(); registry.unregister(this); } @@ -154,7 +154,7 @@ public class Inbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Singl } /** {@inheritDoc} */ - @Override protected void onRewind() { + @Override protected void rewindInternal() { throw new UnsupportedOperationException(); } @@ -168,8 +168,6 @@ public class Inbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Singl */ public void onBatchReceived(UUID src, int batchId, boolean last, List<Row> rows) { try { - checkState(); - Buffer buf = getOrCreateBuffer(src); boolean waitingBefore = buf.check() == State.WAITING; @@ -187,8 +185,6 @@ public class Inbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Singl /** */ private void doPush() { try { - checkState(); - push(); } catch (Exception e) { diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java index 5336481..a89255f 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java @@ -157,7 +157,7 @@ public class ModifyNode<Row> extends AbstractNode<Row> implements SingleNode<Row } /** {@inheritDoc} */ - @Override protected void onRewind() { + @Override protected void rewindInternal() { throw new UnsupportedOperationException(); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java index c65dc68..9a90d36 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java @@ -103,7 +103,7 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractNode<Row> { } /** {@inheritDoc} */ - @Override protected void onRewind() { + @Override protected void rewindInternal() { requested = 0; waitingLeft = 0; waitingRight = 0; @@ -292,11 +292,11 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractNode<Row> { } /** {@inheritDoc} */ - @Override protected void onRewind() { + @Override protected void rewindInternal() { left = null; rightIdx = 0; - super.onRewind(); + super.rewindInternal(); } /** */ @@ -369,12 +369,12 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractNode<Row> { } /** */ - @Override protected void onRewind() { + @Override protected void rewindInternal() { matched = false; left = null; rightIdx = 0; - super.onRewind(); + super.rewindInternal(); } /** {@inheritDoc} */ @@ -465,13 +465,13 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractNode<Row> { } /** {@inheritDoc} */ - @Override protected void onRewind() { + @Override protected void rewindInternal() { left = null; rightNotMatchedIndexes.clear(); lastPushedInd = 0; rightIdx = 0; - super.onRewind(); + super.rewindInternal(); } /** {@inheritDoc} */ @@ -593,14 +593,14 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractNode<Row> { } /** {@inheritDoc} */ - @Override protected void onRewind() { + @Override protected void rewindInternal() { left = null; leftMatched = false; rightNotMatchedIndexes.clear(); lastPushedInd = 0; rightIdx = 0; - super.onRewind(); + super.rewindInternal(); } /** {@inheritDoc} */ @@ -718,11 +718,11 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractNode<Row> { } /** {@inheritDoc} */ - @Override protected void onRewind() { + @Override protected void rewindInternal() { left = null; rightIdx = 0; - super.onRewind(); + super.rewindInternal(); } /** {@inheritDoc} */ @@ -786,11 +786,11 @@ public abstract class NestedLoopJoinNode<Row> extends AbstractNode<Row> { } /** */ - @Override protected void onRewind() { + @Override protected void rewindInternal() { left = null; rightIdx = 0; - super.onRewind(); + super.rewindInternal(); } /** {@inheritDoc} */ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java index fb1c1c1..d0d8e51 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java @@ -24,7 +24,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext /** * Represents a node of execution tree. * - * <p/><b>Note</b>: except several cases (like consumer node and mailboxes), {@link Node#request(int)}, {@link Node#cancel()}, + * <p/><b>Note</b>: except several cases (like consumer node and mailboxes), {@link Node#request(int)}, {@link Node#close()} ()}, * {@link Downstream#push(Object)} and {@link Downstream#end()} methods should be used from one single thread. */ public interface Node<Row> extends AutoCloseable { diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java index 25a4673..35e61f9 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java @@ -165,7 +165,7 @@ public class Outbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Sing } /** {@inheritDoc} */ - @Override public void onError(Throwable e) { + @Override protected void onErrorInternal(Throwable e) { U.error(context().planningContext().logger(), "Error occurred during execution: " + X.getFullStackTrace(e)); @@ -182,8 +182,8 @@ public class Outbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Sing } /** {@inheritDoc} */ - @Override public void onClose() { - super.onClose(); + @Override public void closeInternal() { + super.closeInternal(); registry.unregister(this); @@ -198,7 +198,7 @@ public class Outbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Sing } /** {@inheritDoc} */ - @Override protected void onRewind() { + @Override protected void rewindInternal() { throw new UnsupportedOperationException(); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java index 5f3205e..dec2710 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java @@ -40,7 +40,7 @@ public class ProjectNode<Row> extends AbstractNode<Row> implements SingleNode<Ro } /** {@inheritDoc} */ - @Override protected void onRewind() { + @Override protected void rewindInternal() { // No-op. } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java index 825fe42..f38c613 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java @@ -104,7 +104,7 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, } if (onClose == null) - onClose(); + closeInternal(); else onClose.run(); } @@ -115,7 +115,7 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, } /** {@inheritDoc} */ - @Override public void onClose() { + @Override public void closeInternal() { context().execute(() -> { buff.clear(); @@ -184,7 +184,7 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, } /** {@inheritDoc} */ - @Override public void onError(Throwable e) { + @Override protected void onErrorInternal(Throwable e) { if (!ex.compareAndSet(null, e)) ex.get().addSuppressed(e); @@ -226,7 +226,7 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, } /** {@inheritDoc} */ - @Override protected void onRewind() { + @Override protected void rewindInternal() { throw new UnsupportedOperationException(); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java index 3e82b63..7576c02 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java @@ -80,8 +80,8 @@ public class ScanNode<Row> extends AbstractNode<Row> implements SingleNode<Row> } /** {@inheritDoc} */ - @Override public void onClose() { - super.onClose(); + @Override public void closeInternal() { + super.closeInternal(); Commons.closeQuiet(it); it = null; @@ -89,7 +89,7 @@ public class ScanNode<Row> extends AbstractNode<Row> implements SingleNode<Row> } /** {@inheritDoc} */ - @Override protected void onRewind() { + @Override protected void rewindInternal() { Commons.closeQuiet(it); it = null; } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java index 5c686fc..3ab1148 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java @@ -50,7 +50,7 @@ public class SortNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, } /** {@inheritDoc} */ - @Override protected void onRewind() { + @Override protected void rewindInternal() { requested = 0; waiting = 0; rows.clear(); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/UnionAllNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/UnionAllNode.java index 9935b68..0d1a6d3 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/UnionAllNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/UnionAllNode.java @@ -98,7 +98,7 @@ public class UnionAllNode<Row> extends AbstractNode<Row> implements Downstream<R } /** {@inheritDoc} */ - @Override protected void onRewind() { + @Override protected void rewindInternal() { curSrc = 0; waiting = 0; }