This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch ignite-20501
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 7476486ea567c13d71c5b37a62ecf03eac9b6ea7
Author: Aleksey Plekhanov <plehanov.a...@gmail.com>
AuthorDate: Fri Oct 20 10:06:14 2023 +0300

    IGNITE-20501 SQL Calcite: Fix memory leak in MailboxRegistryImpl#remotes
---
 .../query/calcite/exec/rel/MergeJoinNode.java      | 54 ++++++++--------------
 .../integration/AggregatesIntegrationTest.java     |  1 +
 2 files changed, 21 insertions(+), 34 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
index 0a89fa68693..35f31e8b931 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
@@ -213,6 +213,14 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> {
     /** */
     protected abstract void join() throws Exception;
 
+    /** Zeroes {@code requested} and ends downstream, unless {@code distributed} is set and a side is not {@code NOT_WAITING}. */
+    protected void checkJoinFinished() throws Exception {
+        if (!distributed || (waitingLeft == NOT_WAITING && waitingRight == 
NOT_WAITING)) {
+            requested = 0;
+            downstream().end();
+        }
+    }
+
     /** */
     @NotNull public static <Row> MergeJoinNode<Row> 
create(ExecutionContext<Row> ctx, RelDataType outputRowType, RelDataType 
leftRowType,
         RelDataType rightRowType, JoinRelType joinType, Comparator<Row> comp, 
boolean distributed) {
@@ -392,12 +400,8 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> {
 
             if (requested > 0 && ((waitingLeft == NOT_WAITING && left == null 
&& leftInBuf.isEmpty())
                 || (waitingRight == NOT_WAITING && right == null && 
rightInBuf.isEmpty() && rightMaterialization == null))
-            ) {
-                if (!distributed || (waitingLeft == NOT_WAITING && 
waitingRight == NOT_WAITING)) {
-                    requested = 0;
-                    downstream().end();
-                }
-            }
+            )
+                checkJoinFinished();
         }
     }
 
@@ -579,12 +583,8 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> {
             if (waitingLeft == 0)
                 leftSource().request(waitingLeft = IN_BUFFER_SIZE);
 
-            if (requested > 0 && waitingLeft == NOT_WAITING && left == null && 
leftInBuf.isEmpty()) {
-                if (!distributed || (waitingLeft == NOT_WAITING && 
waitingRight == NOT_WAITING)) {
-                    requested = 0;
-                    downstream().end();
-                }
-            }
+            if (requested > 0 && waitingLeft == NOT_WAITING && left == null && 
leftInBuf.isEmpty())
+                checkJoinFinished();
         }
     }
 
@@ -775,12 +775,8 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> {
             if (waitingLeft == 0)
                 leftSource().request(waitingLeft = IN_BUFFER_SIZE);
 
-            if (requested > 0 && waitingRight == NOT_WAITING && right == null 
&& rightInBuf.isEmpty() && rightMaterialization == null) {
-                if (!distributed || (waitingLeft == NOT_WAITING && 
waitingRight == NOT_WAITING)) {
-                    requested = 0;
-                    downstream().end();
-                }
-            }
+            if (requested > 0 && waitingRight == NOT_WAITING && right == null 
&& rightInBuf.isEmpty() && rightMaterialization == null)
+                checkJoinFinished();
         }
     }
 
@@ -1012,10 +1008,8 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> {
 
             if (requested > 0 && waitingLeft == NOT_WAITING && left == null && 
leftInBuf.isEmpty()
                 && waitingRight == NOT_WAITING && right == null && 
rightInBuf.isEmpty() && rightMaterialization == null
-            ) {
-                requested = 0;
-                downstream().end();
-            }
+            )
+                checkJoinFinished();
         }
     }
 
@@ -1089,12 +1083,8 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> {
 
             if (requested > 0 && ((waitingLeft == NOT_WAITING && left == null 
&& leftInBuf.isEmpty()
                 || (waitingRight == NOT_WAITING && right == null && 
rightInBuf.isEmpty())))
-            ) {
-                if (!distributed || (waitingLeft == NOT_WAITING && 
waitingRight == NOT_WAITING)) {
-                    requested = 0;
-                    downstream().end();
-                }
-            }
+            )
+                checkJoinFinished();
         }
     }
 
@@ -1169,12 +1159,8 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> {
             if (waitingLeft == 0)
                 leftSource().request(waitingLeft = IN_BUFFER_SIZE);
 
-            if (requested > 0 && waitingLeft == NOT_WAITING && left == null && 
leftInBuf.isEmpty()) {
-                if (!distributed || (waitingLeft == NOT_WAITING && 
waitingRight == NOT_WAITING)) {
-                    requested = 0;
-                    downstream().end();
-                }
-            }
+            if (requested > 0 && waitingLeft == NOT_WAITING && left == null && 
leftInBuf.isEmpty())
+                checkJoinFinished();
         }
     }
 }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AggregatesIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AggregatesIntegrationTest.java
index ec13938c4b8..4b910029cae 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AggregatesIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AggregatesIntegrationTest.java
@@ -229,6 +229,7 @@ public class AggregatesIntegrationTest extends AbstractBasicIntegrationTest {
 
         GridTestUtils.assertThrowsWithCause(() -> assertQuery("SELECT t._key, 
(SELECT x FROM " +
                 "TABLE(system_range(1, 5))) FROM person t").check(), 
IllegalArgumentException.class);
+
         GridTestUtils.assertThrowsWithCause(() -> assertQuery("SELECT t._key, 
(SELECT x FROM " +
                 "TABLE(system_range(t._key, t._key + 1))) FROM person 
t").check(), IllegalArgumentException.class);
 

Reply via email to