This is an automated email from the ASF dual-hosted git repository. alexpl pushed a commit to branch ignite-20501 in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 7476486ea567c13d71c5b37a62ecf03eac9b6ea7 Author: Aleksey Plekhanov <plehanov.a...@gmail.com> AuthorDate: Fri Oct 20 10:06:14 2023 +0300 IGNITE-20501 SQL Calcite: Fix memory leak in MailboxRegistryImpl#remotes --- .../query/calcite/exec/rel/MergeJoinNode.java | 54 ++++++++-------------- .../integration/AggregatesIntegrationTest.java | 1 + 2 files changed, 21 insertions(+), 34 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java index 0a89fa68693..35f31e8b931 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java @@ -213,6 +213,14 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { /** */ protected abstract void join() throws Exception; + /** */ + protected void checkJoinFinished() throws Exception { + if (!distributed || (waitingLeft == NOT_WAITING && waitingRight == NOT_WAITING)) { + requested = 0; + downstream().end(); + } + } + /** */ @NotNull public static <Row> MergeJoinNode<Row> create(ExecutionContext<Row> ctx, RelDataType outputRowType, RelDataType leftRowType, RelDataType rightRowType, JoinRelType joinType, Comparator<Row> comp, boolean distributed) { @@ -392,12 +400,8 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { if (requested > 0 && ((waitingLeft == NOT_WAITING && left == null && leftInBuf.isEmpty()) || (waitingRight == NOT_WAITING && right == null && rightInBuf.isEmpty() && rightMaterialization == null)) - ) { - if (!distributed || (waitingLeft == NOT_WAITING && waitingRight == NOT_WAITING)) { - requested = 0; - downstream().end(); - } - } + ) + checkJoinFinished(); } } @@ -579,12 +583,8 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { if (waitingLeft == 0) leftSource().request(waitingLeft = IN_BUFFER_SIZE); - if (requested > 0 && waitingLeft == NOT_WAITING && left == null && leftInBuf.isEmpty()) { - if (!distributed || (waitingLeft == NOT_WAITING && waitingRight == NOT_WAITING)) { - requested = 0; - downstream().end(); - } - } + if (requested > 0 && waitingLeft == NOT_WAITING && left == null && leftInBuf.isEmpty()) + checkJoinFinished(); } } @@ -775,12 +775,8 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { if (waitingLeft == 0) leftSource().request(waitingLeft = IN_BUFFER_SIZE); - if (requested > 0 && waitingRight == NOT_WAITING && right == null && rightInBuf.isEmpty() && rightMaterialization == null) { - if (!distributed || (waitingLeft == NOT_WAITING && waitingRight == NOT_WAITING)) { - requested = 0; - downstream().end(); - } - } + if (requested > 0 && waitingRight == NOT_WAITING && right == null && rightInBuf.isEmpty() && rightMaterialization == null) + checkJoinFinished(); } } @@ -1012,10 +1008,8 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { if (requested > 0 && waitingLeft == NOT_WAITING && left == null && leftInBuf.isEmpty() && waitingRight == NOT_WAITING && right == null && rightInBuf.isEmpty() && rightMaterialization == null - ) { - requested = 0; - downstream().end(); - } + ) + checkJoinFinished(); } } @@ -1089,12 +1083,8 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { if (requested > 0 && ((waitingLeft == NOT_WAITING && left == null && leftInBuf.isEmpty() || (waitingRight == NOT_WAITING && right == null && rightInBuf.isEmpty()))) - ) { - if (!distributed || (waitingLeft == NOT_WAITING && waitingRight == NOT_WAITING)) { - requested = 0; - downstream().end(); - } - } + ) + checkJoinFinished(); } } @@ -1169,12 +1159,8 @@ public abstract class MergeJoinNode<Row> extends AbstractNode<Row> { if (waitingLeft == 0) leftSource().request(waitingLeft = IN_BUFFER_SIZE); - if (requested > 0 && waitingLeft == NOT_WAITING && left == null && leftInBuf.isEmpty()) { - if (!distributed || (waitingLeft == NOT_WAITING && waitingRight == NOT_WAITING)) { - requested = 0; - downstream().end(); - } - } + if (requested > 0 && waitingLeft == NOT_WAITING && left == null && leftInBuf.isEmpty()) + checkJoinFinished(); } } } diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AggregatesIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AggregatesIntegrationTest.java index ec13938c4b8..4b910029cae 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AggregatesIntegrationTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AggregatesIntegrationTest.java @@ -229,6 +229,7 @@ public class AggregatesIntegrationTest extends AbstractBasicIntegrationTest { GridTestUtils.assertThrowsWithCause(() -> assertQuery("SELECT t._key, (SELECT x FROM " + "TABLE(system_range(1, 5))) FROM person t").check(), IllegalArgumentException.class); + GridTestUtils.assertThrowsWithCause(() -> assertQuery("SELECT t._key, (SELECT x FROM " + "TABLE(system_range(t._key, t._key + 1))) FROM person t").check(), IllegalArgumentException.class);