This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-12248 by this push:
     new 2f7e98a  fix exception handling on root node
2f7e98a is described below

commit 2f7e98ae80a2e6bb4002fe2a2412507e8d67f958
Author: Igor Seliverstov <gvvinbl...@gmail.com>
AuthorDate: Mon Sep 14 17:13:05 2020 +0300

    fix exception handling on root node
---
 .../query/calcite/exec/ExecutionServiceImpl.java   |  2 +-
 .../query/calcite/exec/rel/AbstractNode.java       | 36 ++++++++++++----------
 .../query/calcite/exec/rel/AggregateNode.java      |  2 +-
 .../exec/rel/CorrelatedNestedLoopJoinNode.java     |  2 +-
 .../query/calcite/exec/rel/FilterNode.java         |  2 +-
 .../processors/query/calcite/exec/rel/Inbox.java   | 10 ++----
 .../query/calcite/exec/rel/ModifyNode.java         |  2 +-
 .../query/calcite/exec/rel/NestedLoopJoinNode.java | 26 ++++++++--------
 .../processors/query/calcite/exec/rel/Node.java    |  2 +-
 .../processors/query/calcite/exec/rel/Outbox.java  |  8 ++---
 .../query/calcite/exec/rel/ProjectNode.java        |  2 +-
 .../query/calcite/exec/rel/RootNode.java           |  8 ++---
 .../query/calcite/exec/rel/ScanNode.java           |  6 ++--
 .../query/calcite/exec/rel/SortNode.java           |  2 +-
 .../query/calcite/exec/rel/UnionAllNode.java       |  2 +-
 15 files changed, 56 insertions(+), 56 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 99b0ea5..6a1966e 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -1028,7 +1028,7 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
                 running.remove(ctx.queryId());
 
                 // 2) close local fragment
-                root.onClose();
+                root.closeInternal();
 
                 // 3) close remote fragments
                 for (UUID nodeId : remotes) {
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
index c47321f..f021516 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
@@ -103,7 +103,7 @@ public abstract class AbstractNode<Row> implements 
Node<Row> {
         if (isClosed())
             return;
 
-        onClose();
+        closeInternal();
 
         if (!F.isEmpty(sources()))
             sources().forEach(U::closeQuiet);
@@ -111,7 +111,7 @@ public abstract class AbstractNode<Row> implements 
Node<Row> {
 
     /** {@inheritDoc} */
     @Override public void rewind() {
-        onRewind();
+        rewindInternal();
 
         if (!F.isEmpty(sources()))
             sources().forEach(Node::rewind);
@@ -122,36 +122,40 @@ public abstract class AbstractNode<Row> implements 
Node<Row> {
         this.downstream = downstream;
     }
 
-    /** */
-    protected abstract void onRewind();
-
     /**
      * Processes given exception.
      *
      * @param e Exception.
      */
     public void onError(Throwable e) {
-        assert downstream() != null;
-
-        if (e instanceof ExecutionCancelledException) {
+        if (e instanceof ExecutionCancelledException)
+            U.warn(context().planningContext().logger(), "Execution is cancelled.", e);
+        else
+            onErrorInternal(e);
+    }
 
-            return;
-        }
+    /** */
+    protected void closeInternal() {
+        closed = true;
+    }
+
+    /** */
+    protected abstract void rewindInternal();
+
+    /** */
+    protected void onErrorInternal(Throwable e) {
+        Downstream<Row> downstream = downstream();
+
+        assert downstream != null;
 
         try {
-            downstream().onError(e);
+            downstream.onError(e);
         }
         finally {
             U.closeQuiet(this);
         }
     }
 
-    /** */
-    protected void onClose() {
-        closed = true;
-    }
-
     /**
      * @return {@code true} if the subtree is canceled.
      */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java
index 8d300b6..80911e0 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AggregateNode.java
@@ -157,7 +157,7 @@ public class AggregateNode<Row> extends AbstractNode<Row> 
implements SingleNode<
     }
 
     /** {@inheritDoc} */
-    @Override protected void onRewind() {
+    @Override protected void rewindInternal() {
         requested = 0;
         waiting = 0;
         groupings.forEach(grouping -> grouping.groups.clear());
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java
index c88a9a8..d102be1 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/CorrelatedNestedLoopJoinNode.java
@@ -113,7 +113,7 @@ public class CorrelatedNestedLoopJoinNode<Row> extends 
AbstractNode<Row> {
     }
 
     /** {@inheritDoc} */
-    @Override protected void onRewind() {
+    @Override protected void rewindInternal() {
         leftInBuf = null;
         rightInBuf = null;
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java
index ba8f5fe..06703eb 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/FilterNode.java
@@ -118,7 +118,7 @@ public class FilterNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row
     }
 
     /** {@inheritDoc} */
-    @Override protected void onRewind() {
+    @Override protected void rewindInternal() {
         requested = 0;
         waiting = 0;
         inBuf.clear();
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
index 58c084e..3ea53fc 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
@@ -137,8 +137,8 @@ public class Inbox<Row> extends AbstractNode<Row> 
implements Mailbox<Row>, Singl
     }
 
     /** {@inheritDoc} */
-    @Override public void onClose() {
-        super.onClose();
+    @Override public void closeInternal() {
+        super.closeInternal();
 
         registry.unregister(this);
     }
@@ -154,7 +154,7 @@ public class Inbox<Row> extends AbstractNode<Row> 
implements Mailbox<Row>, Singl
     }
 
     /** {@inheritDoc} */
-    @Override protected void onRewind() {
+    @Override protected void rewindInternal() {
         throw new UnsupportedOperationException();
     }
 
@@ -168,8 +168,6 @@ public class Inbox<Row> extends AbstractNode<Row> 
implements Mailbox<Row>, Singl
      */
     public void onBatchReceived(UUID src, int batchId, boolean last, List<Row> rows) {
         try {
-            checkState();
-
             Buffer buf = getOrCreateBuffer(src);
 
             boolean waitingBefore = buf.check() == State.WAITING;
@@ -187,8 +185,6 @@ public class Inbox<Row> extends AbstractNode<Row> 
implements Mailbox<Row>, Singl
     /** */
     private void doPush() {
         try {
-            checkState();
-
             push();
         }
         catch (Exception e) {
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
index 5336481..a89255f 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
@@ -157,7 +157,7 @@ public class ModifyNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row
     }
 
     /** {@inheritDoc} */
-    @Override protected void onRewind() {
+    @Override protected void rewindInternal() {
         throw new UnsupportedOperationException();
     }
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java
index c65dc68..9a90d36 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java
@@ -103,7 +103,7 @@ public abstract class NestedLoopJoinNode<Row> extends 
AbstractNode<Row> {
     }
 
     /** {@inheritDoc} */
-    @Override protected void onRewind() {
+    @Override protected void rewindInternal() {
         requested = 0;
         waitingLeft = 0;
         waitingRight = 0;
@@ -292,11 +292,11 @@ public abstract class NestedLoopJoinNode<Row> extends 
AbstractNode<Row> {
         }
 
         /** {@inheritDoc} */
-        @Override protected void onRewind() {
+        @Override protected void rewindInternal() {
             left = null;
             rightIdx = 0;
 
-            super.onRewind();
+            super.rewindInternal();
         }
 
         /** */
@@ -369,12 +369,12 @@ public abstract class NestedLoopJoinNode<Row> extends 
AbstractNode<Row> {
         }
 
         /** */
-        @Override protected void onRewind() {
+        @Override protected void rewindInternal() {
             matched = false;
             left = null;
             rightIdx = 0;
 
-            super.onRewind();
+            super.rewindInternal();
         }
 
         /** {@inheritDoc} */
@@ -465,13 +465,13 @@ public abstract class NestedLoopJoinNode<Row> extends 
AbstractNode<Row> {
         }
 
         /** {@inheritDoc} */
-        @Override protected void onRewind() {
+        @Override protected void rewindInternal() {
             left = null;
             rightNotMatchedIndexes.clear();
             lastPushedInd = 0;
             rightIdx = 0;
 
-            super.onRewind();
+            super.rewindInternal();
         }
 
         /** {@inheritDoc} */
@@ -593,14 +593,14 @@ public abstract class NestedLoopJoinNode<Row> extends 
AbstractNode<Row> {
         }
 
         /** {@inheritDoc} */
-        @Override protected void onRewind() {
+        @Override protected void rewindInternal() {
             left = null;
             leftMatched = false;
             rightNotMatchedIndexes.clear();
             lastPushedInd = 0;
             rightIdx = 0;
 
-            super.onRewind();
+            super.rewindInternal();
         }
 
         /** {@inheritDoc} */
@@ -718,11 +718,11 @@ public abstract class NestedLoopJoinNode<Row> extends 
AbstractNode<Row> {
         }
 
         /** {@inheritDoc} */
-        @Override protected void onRewind() {
+        @Override protected void rewindInternal() {
             left = null;
             rightIdx = 0;
 
-            super.onRewind();
+            super.rewindInternal();
         }
 
         /** {@inheritDoc} */
@@ -786,11 +786,11 @@ public abstract class NestedLoopJoinNode<Row> extends 
AbstractNode<Row> {
         }
 
         /** */
-        @Override protected void onRewind() {
+        @Override protected void rewindInternal() {
             left = null;
             rightIdx = 0;
 
-            super.onRewind();
+            super.rewindInternal();
         }
 
         /** {@inheritDoc} */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
index fb1c1c1..d0d8e51 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Node.java
@@ -24,7 +24,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext
 /**
  * Represents a node of execution tree.
  *
- * <p/><b>Note</b>: except several cases (like consumer node and mailboxes), {@link Node#request(int)}, {@link Node#cancel()},
+ * <p/><b>Note</b>: except several cases (like consumer node and mailboxes), {@link Node#request(int)}, {@link Node#close()},
  * {@link Downstream#push(Object)} and {@link Downstream#end()} methods should be used from one single thread.
  */
 public interface Node<Row> extends AutoCloseable {
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
index 25a4673..35e61f9 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
@@ -165,7 +165,7 @@ public class Outbox<Row> extends AbstractNode<Row> 
implements Mailbox<Row>, Sing
     }
 
     /** {@inheritDoc} */
-    @Override public void onError(Throwable e) {
+    @Override protected void onErrorInternal(Throwable e) {
         U.error(context().planningContext().logger(),
             "Error occurred during execution: " + X.getFullStackTrace(e));
 
@@ -182,8 +182,8 @@ public class Outbox<Row> extends AbstractNode<Row> 
implements Mailbox<Row>, Sing
     }
 
     /** {@inheritDoc} */
-    @Override public void onClose() {
-        super.onClose();
+    @Override public void closeInternal() {
+        super.closeInternal();
 
         registry.unregister(this);
 
@@ -198,7 +198,7 @@ public class Outbox<Row> extends AbstractNode<Row> 
implements Mailbox<Row>, Sing
     }
 
     /** {@inheritDoc} */
-    @Override protected void onRewind() {
+    @Override protected void rewindInternal() {
         throw new UnsupportedOperationException();
     }
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java
index 5f3205e..dec2710 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ProjectNode.java
@@ -40,7 +40,7 @@ public class ProjectNode<Row> extends AbstractNode<Row> 
implements SingleNode<Ro
     }
 
     /** {@inheritDoc} */
-    @Override protected void onRewind() {
+    @Override protected void rewindInternal() {
         // No-op.
     }
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
index 825fe42..f38c613 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
@@ -104,7 +104,7 @@ public class RootNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>,
         }
 
         if (onClose == null)
-            onClose();
+            closeInternal();
         else
             onClose.run();
     }
@@ -115,7 +115,7 @@ public class RootNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>,
     }
 
     /** {@inheritDoc} */
-    @Override public void onClose() {
+    @Override public void closeInternal() {
         context().execute(() -> {
             buff.clear();
 
@@ -184,7 +184,7 @@ public class RootNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>,
     }
 
     /** {@inheritDoc} */
-    @Override public void onError(Throwable e) {
+    @Override protected void onErrorInternal(Throwable e) {
         if (!ex.compareAndSet(null, e))
             ex.get().addSuppressed(e);
 
@@ -226,7 +226,7 @@ public class RootNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>,
     }
 
     /** {@inheritDoc} */
-    @Override protected void onRewind() {
+    @Override protected void rewindInternal() {
         throw new UnsupportedOperationException();
     }
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
index 3e82b63..7576c02 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java
@@ -80,8 +80,8 @@ public class ScanNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>
     }
 
     /** {@inheritDoc} */
-    @Override public void onClose() {
-        super.onClose();
+    @Override public void closeInternal() {
+        super.closeInternal();
 
         Commons.closeQuiet(it);
         it = null;
@@ -89,7 +89,7 @@ public class ScanNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>
     }
 
     /** {@inheritDoc} */
-    @Override protected void onRewind() {
+    @Override protected void rewindInternal() {
         Commons.closeQuiet(it);
         it = null;
     }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
index 5c686fc..3ab1148 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
@@ -50,7 +50,7 @@ public class SortNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>,
     }
 
     /** {@inheritDoc} */
-    @Override protected void onRewind() {
+    @Override protected void rewindInternal() {
         requested = 0;
         waiting = 0;
         rows.clear();
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/UnionAllNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/UnionAllNode.java
index 9935b68..0d1a6d3 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/UnionAllNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/UnionAllNode.java
@@ -98,7 +98,7 @@ public class UnionAllNode<Row> extends AbstractNode<Row> 
implements Downstream<R
     }
 
     /** {@inheritDoc} */
-    @Override protected void onRewind() {
+    @Override protected void rewindInternal() {
         curSrc = 0;
         waiting = 0;
     }

Reply via email to